types.js/runner.js

388 lines
9.8 KiB
JavaScript
Raw Normal View History

/**********************************************************************
*
*
*
*
* XXX would be helpful to define a task api...
* task('abort') vs. task.abort(), task state, ...etc.
* then define Task and TaskQueue(Queue) and extended api to:
* - task state introspection
* - stop/resume tasks (or task queue?)
* - serialize tasks
* - ...
* would be nice to make the task just a slightly extended or better
* defined function/generator, ideally to make them interchangable...
*
**********************************************/ /* c8 ignore next 2 */
((typeof define)[0]=='u'?function(f){module.exports=f(require)}:define)
(function(require){ var module={} // make module AMD/node compatible...
/*********************************************************************/
var object = require('ig-object')
require('./Array')
require('./Promise')
var events = require('./event')
/*********************************************************************/
module.STOP = object.STOP
//---------------------------------------------------------------------
// XXX need to configure to run a specific amount of jobjs on each start...
// XXX do we need an async mode -- exec .__run_tasks__(..) in a
// setTimeout(.., 0)???
var Queue =
module.Queue =
object.Constructor('Queue', Array, {
// create a running queue...
runTasks: function(...tasks){
return this({ state: 'running' }, ...tasks) },
}, events.EventMixin('flat', {
// config...
//
pool_size: 8,
poling_delay: 200,
auto_stop: false,
//
// This can be:
// 'wait' - wait fot the sun-queue to stop
// 'unwind' - run sub-task and requeue parent
//
// XXX do we need this???
// XXX should the nested queue decide???
// ...how???
sub_queue: 'unwind',
//
// This can be:
// 'running'
// 'stopped'
//
__state: null,
get state(){
return this.__state
|| 'stopped' },
set state(value){
if(value == 'running'){
this.start()
} else if(value == 'stopped'){
this.stop() } },
// events/actions - state transitions...
//
// XXX would be nice to run a number of tasks...
start: events.Event('start', function(handle){
// can't start while running...
if(this.state == 'running'){
return handle(false) }
this.__state = 'running'
this.__run_tasks__() }),
stop: events.Event('stop', function(handle){
// can't stop while not running...
if(this.state == 'stopped'){
return handle(false) }
this.__state = 'stopped' }),
// events/actions - state transitions...
//
clear: events.Event(function(handler){
this.splice(0, this.length) }),
// events...
//
taskStarting: events.Event('taskStarting'),
taskCompleted: events.Event('taskCompleted'),
queueEmpty: events.Event('queueEmpty'),
// NOTE: each handler will get called once when the next time the
// queue is emptied...
// XXX revise...
then: function(func){
var that = this
return new Promise(function(resolve, reject){
that.one('queueEmpty', function(){
resolve(func()) }) }) },
// helpers...
//
// move tasks to head/tail of queue resp.
prioritize: function(...tasks){
return this.sortAs(tasks) },
delay: function(...tasks){
return this.sortAs(tasks, true) },
// Runner API...
//
// Run the given task type...
// .__run_task__(task[, next])
// -> STOP
// -> STOP(value)
// -> queue
// -> promise
// -> func
// -> ...
//
// NOTE: this intentionally does not handle results as that whould
// require this to also handle events, and other runtime stuff...
// ...to add a new task/result type either handle the non-standard
// result here or wrap it into a standard return value like a
// promise...
__run_task__: function(task, next){
return typeof(task) == 'function' ?
task()
: (task instanceof Queue
&& this.sub_queue == 'unwind') ?
(task.runTask(next), task)
: (task instanceof Queue
&& this.sub_queue == 'wait') ?
task.start()
: task },
//
// Hanlde 'running' state (async)...
// .__run_tasks__()
// -> this
//
// NOTE: we do not store the exec results...
// NOTE: not intended for direct use and will likely have no effect
// if called directly...
__running: null,
__run_tasks__: function(){
var that = this
this.state == 'running'
&& setTimeout(function(){
// handle queue...
while(this.length > 0
&& this.state == 'running'
&& (this.__running || []).length < (this.pool_size || Infinity) ){
this.runTask(this.__run_tasks__.bind(this)) }
// empty queue -> pole or stop...
//
// NOTE: we endup here in two cases:
// - the pool is full
// - the queue is empty
// NOTE: we do not care about stopping the timer when changing
// state as .__run_tasks__() will stop itself...
//
// XXX will this be collected by the GC if it is polling???
if(this.length == 0
&& this.state == 'running'){
this.auto_stop ?
// auto-stop...
this.stop()
// pole...
: (this.poling_delay
&& setTimeout(
this.__run_tasks__.bind(this),
this.poling_delay || 200)) } }.bind(this), 0)
return this },
// run one task from queue...
//
// NOTE: this does not care about .state...
runTask: function(next){
var running = this.__running = this.__running || []
// can't run...
if(this.length == 0
|| running.length >= this.pool_size ){
return this }
// closure: running, task, res, stop, next...
var runningDone = function(){
running.splice(0, running.length,
// NOTE: there can be multiple occurences of res...
...running
.filter(function(e){ return e !== res }))
that.trigger('taskCompleted', task, res)
!stop && next
&& next() }
var task = this.shift()
this.trigger('taskStarting', task)
// run...
var res = this.__run_task__(task, next)
// handle stop...
var stop = res === module.STOP
|| res instanceof module.STOP
res = res instanceof module.STOP ?
res.value
: res
stop
&& this.stop()
// handle task results...
//
// queue -- as a set of tasks...
if(res instanceof Queue
&& this.sub_queue == 'unwind'){
if(res.length > 0){
this.push(res) }
this.trigger('taskCompleted', task, res)
// queue -- as a task...
} else if(res instanceof Queue
&& this.sub_queue == 'wait'){
if(res.state == 'stopped'){
this.trigger('taskCompleted', task, res)
} else {
running.push(res)
res.stop(function(){
// not fully done yet -- re-queue...
res.length > 0
&& that.push(res)
runningDone() }) }
// pool async (promise) task...
} else if(typeof((res || {}).finally) == 'function'
// one post handler is enough...
&& !running.includes(res)){
running.push(res)
res.finally(function(){
runningDone() })
// re-queue tasks...
} else if(typeof(res) == 'function'){
that.trigger('taskCompleted', task, res)
this.push(res)
// completed sync task...
} else {
this.trigger('taskCompleted', task, res) }
this.length == 0
&& this.trigger('queueEmpty')
return this },
// constructor argument handling...
//
// Queue()
// -> queue
//
// Queue(..,tasks)
// -> queue
//
// Queue(options)
// Queue(options, ..,tasks)
// -> queue
//
__init__: function(options){
// options...
if(!(this[0] instanceof Queue)
&& this[0] instanceof Object
&& typeof(this[0]) != 'function'
&& typeof(this[0].finally) != 'function'){
Object.assign(this, this.shift()) }
// see if we need to start...
this.__run_tasks__() },
}))
//---------------------------------------------------------------------
// Task manager...
//
// goal:
// externally manage long running functions/promises/etc
//
// enteties:
// task
// - wrap a function/promise
// - pass the function/promise a reciver
// - return a controller (store in manager)
// manager
// - container for tasks
// - multiplex actions to tasks
//
var TaskMixin =
object.Mixin('TaskMixin', 'soft', {
stop: function(){
this.send('stop', ...arguments) },
})
// XXX should this be a Queue???
var TaskManager =
module.TaskManager =
object.Constructor('TaskManager', Array, events.EventMixin('flat', {
// XXX each task should also trigger this when stopping and this
// should not result in this and tasks infinitely playing
// ping-pong...
stop: events.Event('stop',
function(task='all'){
this.forEach(function(task){
;(task == 'all'
|| task == '*'
|| task === task)
&& task.stop() }) }),
done: events.Event('done'),
Task: function(task, ...args){
var that = this
// normalize handler...
var handler =
// queue...
// NOTE: queue is task-compatible...
task instanceof Queue ?
task.start()
// interactive...
: task && task.then && task.stop ?
task
: TaskMixin(
// dumb promise -- will ignore all the messages...
// XXX should we complain about this???
task instanceof Promise ?
Promise.interactive(
function(resolve, reject, onmsg){
task.then(resolve, reject) })
// function...
: Promise.interactive(
function(resolve, reject, onmsg){
resolve(task(onmsg, ...args)) }))
this.push(handler)
// handle task done...
handler
.then(function(res){
that.splice(that.indexOf(handler), 1)
that.trigger('done', task, res)
that.length == 0
&& that.done('all') })
// XXX or should we return this???
return handler },
}))
/**********************************************************************
* vim:set ts=4 sw=4 : */ return module })