Skip to content
This repository has been archived by the owner on Nov 12, 2017. It is now read-only.

Add Scheduler #1

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/ScheduledTask.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/** @license MIT License (c) copyright 2016 original author or authors */

export default class ScheduledTask {
constructor (delay, period, task, scheduler) {
this.time = delay
this.period = period
this.task = task
this.scheduler = scheduler
this.active = true
}

run () {
return this.task.run(this.time)
}

error (e) {
return this.task.error(this.time, e)
}

cancel () {
this.scheduler.cancel(this)
return this.task.dispose()
}
}
180 changes: 180 additions & 0 deletions src/Scheduler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/** @license MIT License (c) copyright 2016 original author or authors */

import ScheduledTask from './ScheduledTask'
import binarySearch from './binarySearch'
import { findIndex, removeAll } from '@most/prelude'

export default class Scheduler {
constructor (timer) {
this.timer = timer

this._timer = null
this._nextArrival = 0
this._tasks = []

var self = this
this._runReadyTasksBound = function () {
self._runReadyTasks(self.now())
}
}

now () {
return this.timer.now()
}

asap (task) {
return this.schedule(0, -1, task)
}

delay (delay, task) {
return this.schedule(delay, -1, task)
}

periodic (period, task) {
return this.schedule(0, period, task)
}

schedule (delay, period, task) {
var now = this.now()
var st = new ScheduledTask(now + Math.max(0, delay), period, task, this)

insertByTime(st, this._tasks)
this._scheduleNextRun(now)
return st
}

cancel (task) {
task.active = false
var i = binarySearch(task.time, this._tasks)

if (i >= 0 && i < this._tasks.length) {
var at = findIndex(task, this._tasks[i].events)
if (at >= 0) {
this._tasks[i].events.splice(at, 1)
this._reschedule()
}
}
}

cancelAll (f) {
for (var i = 0; i < this._tasks.length; ++i) {
removeAllFrom(f, this._tasks[i])
}
this._reschedule()
}

_reschedule () {
if (this._tasks.length === 0) {
this._unschedule()
} else {
this._scheduleNextRun(this.now())
}
}

_unschedule () {
this.timer.clearTimer(this._timer)
this._timer = null
}

_scheduleNextRun (now) { // eslint-disable-line complexity
if (this._tasks.length === 0) {
return
}

var nextArrival = this._tasks[0].time

if (this._timer === null) {
this._scheduleNextArrival(nextArrival, now)
} else if (nextArrival < this._nextArrival) {
this._unschedule()
this._scheduleNextArrival(nextArrival, now)
}
}

_scheduleNextArrival (nextArrival, now) {
this._nextArrival = nextArrival
var delay = Math.max(0, nextArrival - now)
this._timer = this.timer.setTimer(this._runReadyTasksBound, delay)
}

_runReadyTasks (now) {
this._timer = null

this._tasks = this._findAndRunTasks(now)

this._scheduleNextRun(this.now())
}

_findAndRunTasks (now) {
var tasks = this._tasks
var l = tasks.length
var i = 0

while (i < l && tasks[i].time <= now) {
++i
}

this._tasks = tasks.slice(i)

// Run all ready tasks
for (var j = 0; j < i; ++j) {
this._tasks = runTasks(tasks[j], this._tasks)
}
return this._tasks
}
}

function removeAllFrom (f, timeslot) {
timeslot.events = removeAll(f, timeslot.events)
}

function runTasks (timeslot, tasks) { // eslint-disable-line complexity
var events = timeslot.events
for (var i = 0; i < events.length; ++i) {
var task = events[i]

if (task.active) {
runTask(task)

// Reschedule periodic repeating tasks
// Check active again, since a task may have canceled itself
if (task.period >= 0) {
task.time = task.time + task.period
insertByTime(task, tasks)
}
}
}

return tasks
}

function runTask (task) {
try {
return task.run()
} catch (e) {
return task.error(e)
}
}

function insertByTime (task, timeslots) { // eslint-disable-line complexity
var l = timeslots.length

if (l === 0) {
timeslots.push(newTimeslot(task.time, [task]))
return
}

var i = binarySearch(task.time, timeslots)

if (i >= l) {
timeslots.push(newTimeslot(task.time, [task]))
} else if (task.time === timeslots[i].time) {
timeslots[i].events.push(task)
} else {
timeslots.splice(i, 0, newTimeslot(task.time, [task]))
}
}

function newTimeslot (t, events) {
return { time: t, events: events }
}
21 changes: 21 additions & 0 deletions src/binarySearch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/** @license MIT License (c) copyright 2016 original author or authors */

export default function binarySearch (t, sortedArray) { // eslint-disable-line complexity
var lo = 0
var hi = sortedArray.length
var mid, y

while (lo < hi) {
mid = Math.floor((lo + hi) / 2)
y = sortedArray[mid]

if (t === y.time) {
return mid
} else if (t < y.time) {
hi = mid
} else {
lo = mid + 1
}
}
return hi
}
3 changes: 2 additions & 1 deletion src/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/** @license MIT License (c) copyright 2016 original author or authors */

import Stream from './Stream'
import Scheduler from './Scheduler'

export { Stream }
export { Stream, Scheduler }