mirror of
https://github.com/flynx/types.js.git
synced 2025-10-28 10:00:08 +00:00
996 lines
27 KiB
JavaScript
996 lines
27 KiB
JavaScript
/**********************************************************************
|
|
*
|
|
*
|
|
*
|
|
*
|
|
* XXX would be helpful to define a task api...
|
|
* task('abort') vs. task.abort(), task state, ...etc.
|
|
* then define Task and TaskQueue(Queue) and extended api to:
|
|
* - task state introspection
|
|
* - stop/resume tasks (or task queue?)
|
|
* - serialize tasks
|
|
* - ...
|
|
* would be nice to make the task just a slightly extended or better
|
|
* defined function/generator, ideally to make them interchangable...
|
|
* XXX BUG: there seems to be an odd bug somplace here where each item
|
|
* gets processed once per "thread"... (ImageGrid.Viewer export)
|
|
*
|
|
**********************************************/ /* c8 ignore next 2 */
|
|
((typeof define)[0]=='u'?function(f){module.exports=f(require)}:define)
|
|
(function(require){ var module={} // make module AMD/node compatible...
|
|
/*********************************************************************/
|
|
|
|
var object = require('ig-object')
|
|
|
|
require('./Array')
|
|
require('./Promise')
|
|
|
|
var events = require('./event')
|
|
|
|
|
|
|
|
/*********************************************************************/
|
|
|
|
module.STOP = object.STOP
|
|
|
|
module.SKIP = {doc: 'skip queue item',}
|
|
|
|
|
|
//---------------------------------------------------------------------
|
|
// Queue...
|
|
//
|
|
// A means to manage execution of large-ish number of small tasks...
|
|
//
|
|
// A queue is a list of async functions that get executed in order and
|
|
// not more than .pool_size can run at a time, i.e. new tasks get
|
|
// started only only when tasks in the running pool either finish or
|
|
// release their spot in the pool.
|
|
//
|
|
// XXX would be nice to:
|
|
// - nest queues -- DONE
|
|
// - chain queues
|
|
// - weave queues -- a-la generator's .map(..) etc.
|
|
// XXX need to configure to run a specific amount of jobs on each start...
|
|
// XXX document the Queue({handler: e => e*e}, 1,2,3,4) use-case...
|
|
// ...this is essentially a .map(..) variant...
|
|
var Queue =
|
|
module.Queue =
|
|
object.Constructor('Queue', Array, {
|
|
// create a running queue...
|
|
//
|
|
runTasks: function(...tasks){
|
|
if(typeof(tasks[0]) != 'function'
|
|
&& !(tasks[0] instanceof Queue)
|
|
&& typeof(tasks[0].finally) != 'function'){
|
|
var [options, ...tasks] = arguments }
|
|
return this(
|
|
Object.assign({},
|
|
options || {},
|
|
{ state: 'running' }),
|
|
...tasks) },
|
|
|
|
// Create a handler queue...
|
|
//
|
|
// Queue.handle(func, ...data)
|
|
// Queue.handle(options, func, ...data)
|
|
// -> queue
|
|
//
|
|
// NOTE: func(..) should be compatible with .handler(..) instance method...
|
|
// NOTE: this is a shorthand for:
|
|
// Queue({handler: func, ...}, ...data)
|
|
handle: function(handler, ...data){
|
|
// NOTE: this is a simpler test than in .runTasks(..) above because
|
|
// here we are expecting a function as the first arg in the
|
|
// general case while above a non-task is the exception..
|
|
if(typeof(handler) != 'function'){
|
|
var [options, handler, ...data] = arguments }
|
|
return this(
|
|
Object.assign({},
|
|
options || {},
|
|
{handler}),
|
|
...data) },
|
|
|
|
}, events.EventMixin('flat', {
|
|
// Config...
|
|
//
|
|
// Number of tasks to be running at the same time...
|
|
pool_size: 8,
|
|
|
|
// Number of tasks to run before letting go of the exec frame...
|
|
pause_after_sync: 4,
|
|
|
|
// XXX
|
|
auto_start: false,
|
|
|
|
// Start synchronously...
|
|
//
|
|
// NOTE: this affects the start only, all other timeouts apply as-is...
|
|
sync_start: false,
|
|
|
|
catch_errors: true,
|
|
|
|
// If true, stop after queue is depleted...
|
|
auto_stop: false,
|
|
|
|
// NOTE: for long running queues this may get quite big...
|
|
collect_results: true,
|
|
|
|
// Sub-queue handling mode...
|
|
//
|
|
// This can be:
|
|
// 'wait' - wait fot the sun-queue to stop
|
|
// 'unwind' - run sub-task and requeue parent
|
|
//
|
|
// XXX do we need this???
|
|
// XXX should the nested queue decide??? ...how???
|
|
sub_queue: 'unwind',
|
|
|
|
// If true only add unique items to queue...
|
|
// XXX not implemented yet...
|
|
// ...how should this be done?
|
|
// - keep a set of seen elements and check against it?
|
|
// - check against the queue contents?
|
|
unique_items: false,
|
|
|
|
// Timeouts...
|
|
//
|
|
// Time to wait when pool is full...
|
|
// if 'auto', wait the average task time * .busy_timeout_scale.
|
|
// XXX revise defaults...
|
|
busy_timeout: 50,
|
|
|
|
//busy_timeout: 'auto',
|
|
busy_timeout_scale: 5,
|
|
|
|
// Time to wait between checks for new tasks in an empty queue...
|
|
poling_timeout: 200,
|
|
|
|
// Time to pause after a set of .pause_after_sync sync tasks...
|
|
pause_timeout: 0,
|
|
|
|
// Runtime statistics...
|
|
//
|
|
// To disable set to false
|
|
//
|
|
// NOTE: this, if true, will get replaced with the stats...
|
|
runtime_stats: true,
|
|
|
|
|
|
//
|
|
// This can be:
|
|
// 'running'
|
|
// 'stopped'
|
|
//
|
|
__state: null,
|
|
get state(){
|
|
return this.__state
|
|
|| 'stopped' },
|
|
set state(value){
|
|
if(value == 'running'){
|
|
this.start()
|
|
} else if(value == 'stopped'){
|
|
this.stop()
|
|
} else if(value == 'aborted'){
|
|
this.abort() } },
|
|
|
|
// events/actions - state transitions...
|
|
//
|
|
// NOTE: to start synchronously call .start(true), this will not
|
|
// affect further operation...
|
|
//
|
|
// XXX would be nice to run a specific number of tasks and stop...
|
|
// XXX might be a good idea to let the user set .__wait_for_items...
|
|
// XXX should we wait for items on empty?
|
|
__wait_for_items: null,
|
|
start: events.Event('start', function(handle, sync){
|
|
// first start -- wait for items...
|
|
if('__state' in this){
|
|
this.__wait_for_items = true }
|
|
// can't start while running...
|
|
if(this.__state == 'running' || this.__state == 'aborted'){
|
|
return handle(false) }
|
|
this.__state = 'running'
|
|
// XXX if empty start polling...
|
|
this.__run_tasks__(sync) }),
|
|
stop: events.Event('stop', function(handle){
|
|
// can't stop while not running...
|
|
if(this.state == 'stopped' || this.state == 'aborted'){
|
|
return handle(false) }
|
|
this.__state = 'stopped' }),
|
|
|
|
// events...
|
|
//
|
|
// .tasksAdded(func(evt, [task, ..]))
|
|
// .taskStarting(func(evt, task))
|
|
// .taskCompleted(func(evt, task, res))
|
|
// .queueEmpty(func(evt))
|
|
//
|
|
tasksAdded: events.PureEvent('tasksAdded'),
|
|
taskStarting: events.PureEvent('taskStarting'),
|
|
taskCompleted: events.PureEvent('taskCompleted'),
|
|
taskFailed: events.PureEvent('taskFailed'),
|
|
queueEmpty: events.PureEvent('queueEmpty'),
|
|
|
|
|
|
// Runner API...
|
|
//
|
|
// Run the given task type...
|
|
// .handler(task[, next])
|
|
// -> STOP
|
|
// -> STOP(value)
|
|
// -> SKIP
|
|
// -> queue
|
|
// -> promise
|
|
// -> func
|
|
// -> ...
|
|
//
|
|
// NOTE: this intentionally does not handle results as that whould
|
|
// require this to also handle events, and other runtime stuff...
|
|
// ...to add a new task/result type either handle the non-standard
|
|
// result here or wrap it into a standard return value like a
|
|
// promise...
|
|
handler: function(task, next){
|
|
return typeof(task) == 'function' ?
|
|
task()
|
|
: (task instanceof Queue
|
|
&& this.sub_queue == 'unwind') ?
|
|
(task.runTask(next), task)
|
|
: (task instanceof Queue
|
|
&& this.sub_queue == 'wait') ?
|
|
task.start()
|
|
: task },
|
|
//
|
|
// Hanlde 'running' state (async)...
|
|
// .__run_tasks__()
|
|
// -> this
|
|
//
|
|
// NOTE: .sync_start affects only the first run...
|
|
// NOTE: we do not store the exec results...
|
|
// NOTE: not intended for direct use and will likely have no effect
|
|
// if called directly...
|
|
//
|
|
// XXX will this be collected by the GC if it is polling???
|
|
__running: null,
|
|
__run_tasks__: function(sync){
|
|
var that = this
|
|
sync = sync == null ?
|
|
this.sync_start
|
|
: sync == 'async' ?
|
|
false
|
|
: !!sync
|
|
|
|
var run = function(){
|
|
var c = 0
|
|
var pause = this.pause_after_sync
|
|
var running = this.__running || []
|
|
|
|
// run queue...
|
|
while(this.length > 0
|
|
&& this.state == 'running'
|
|
// do not exceed pool size...
|
|
&& running.length < this.pool_size
|
|
// do not run too many sync tasks without a break...
|
|
&& (pause == null
|
|
|| c < pause)){
|
|
var p = running.length
|
|
delete this.__wait_for_items
|
|
|
|
this.runTask(this.__run_tasks__.bind(this))
|
|
|
|
// NOTE: only count sync stuff that does not get added
|
|
// to the pool...
|
|
p == running.length
|
|
&& c++ }
|
|
|
|
// empty queue -> pole or stop...
|
|
//
|
|
// NOTE: we endup here in two cases:
|
|
// - the pool is full
|
|
// - the queue is empty
|
|
// NOTE: we do not care about stopping the timer when changing
|
|
// state as .__run_tasks__() will stop itself...
|
|
if(this.state == 'running'){
|
|
var timeout =
|
|
// idle -- empty queue...
|
|
this.length == 0 ?
|
|
this.poling_timeout
|
|
// busy poling -- pool full...
|
|
: c < pause ?
|
|
//this.busy_timeout
|
|
(this.runtime_stats && this.busy_timeout == 'auto' ?
|
|
(this.runtime_stats.avg_t || 50) * (this.busy_timeout_scale || 2)
|
|
: this.busy_timeout == 'auto' ?
|
|
50 * (this.busy_timeout_scale || 2)
|
|
: this.busy_timeout)
|
|
// pause -- let other stuff run...
|
|
: (this.pause_timeout || 0)
|
|
|
|
;(this.length == 0
|
|
&& this.auto_stop
|
|
&& !this.__wait_for_items) ?
|
|
// auto-stop...
|
|
this.__onempty__()
|
|
// pole / pause...
|
|
: timeout != null
|
|
&& setTimeout(
|
|
this.__run_tasks__.bind(this), timeout) } }.bind(this)
|
|
|
|
this.state == 'running'
|
|
&& (sync ?
|
|
run()
|
|
: setTimeout(run, 0))
|
|
return this },
|
|
__onempty__: function(){
|
|
var that = this
|
|
this.poling_timeout != null ?
|
|
// wait a bit then stop if still empty...
|
|
setTimeout(function(){
|
|
that.length > 0 ?
|
|
that.__run_tasks__()
|
|
: that.stop()
|
|
}, this.poling_timeout)
|
|
// stop now...
|
|
: this.stop()
|
|
return this },
|
|
// run one task from queue...
|
|
// NOTE: this does not care about .state...
|
|
// XXX revise error handling...
|
|
// XXX ABORT: added nested abort support...
|
|
__results: null,
|
|
runTask: function(next){
|
|
var that = this
|
|
var running = this.__running = this.__running || []
|
|
|
|
// can't run...
|
|
if(this.length == 0
|
|
|| running.length >= this.pool_size ){
|
|
return this }
|
|
|
|
// closure: running, task, res, stop, next...
|
|
var taskCompleted = function(){
|
|
// calculate runtime statistics...
|
|
if(that.runtime_stats){
|
|
var x = Date.now() - t0
|
|
var s = that.runtime_stats =
|
|
that.runtime_stats
|
|
|| {max_t: x, min_t: x, avg_t: x, count: 0}
|
|
s.max_t = Math.max(s.max_t, x)
|
|
s.min_t = Math.min(s.min_t, x)
|
|
var i = ++s.count
|
|
var a = s.avg_t
|
|
s.avg_t = a + (x - a)/i }
|
|
// report...
|
|
that.trigger('taskCompleted', task, res) }
|
|
var fail = {doc: 'fail runningDone(..)'}
|
|
// XXX should this also remove .__running when empty???
|
|
var runningDone = function(mode){
|
|
running.splice(0, running.length,
|
|
// NOTE: there can be multiple occurrences of res...
|
|
...running
|
|
.filter(function(e){ return e !== res }))
|
|
mode === fail
|
|
|| taskCompleted()
|
|
!stop && next
|
|
&& next() }
|
|
|
|
var task = this.shift()
|
|
|
|
this.trigger('taskStarting', task)
|
|
var t0 = this.runtime_stats && Date.now()
|
|
|
|
// run...
|
|
// catch and pass errors to .taskFailed(...)
|
|
if(this.catch_errors){
|
|
var err
|
|
try {
|
|
var res = this.handler(task, next)
|
|
|
|
} catch(err){
|
|
this.trigger('taskFailed', task, err) }
|
|
|
|
// promise result...
|
|
// XXX is the err test here needed???
|
|
res
|
|
&& err === undefined
|
|
&& res.catch
|
|
&& res.catch(function(err){
|
|
that.trigger('taskFailed', task, err) })
|
|
|
|
// let errors rain...
|
|
} else {
|
|
var res = this.handler(task, next) }
|
|
|
|
// handle stop...
|
|
var stop = res === module.STOP
|
|
|| res instanceof module.STOP
|
|
res = res instanceof module.STOP ?
|
|
res.value
|
|
: res === module.STOP ?
|
|
undefined
|
|
: res
|
|
|
|
// collect results...
|
|
this.collect_results
|
|
&& res !== module.SKIP
|
|
&& (this.__results = this.__results ?? []).push(res)
|
|
|
|
// handle task results...
|
|
//
|
|
// queue -- as a set of tasks...
|
|
if(res instanceof Queue
|
|
&& this.sub_queue == 'unwind'){
|
|
if(res.length > 0){
|
|
this.push(res) }
|
|
taskCompleted()
|
|
|
|
// queue -- as a task...
|
|
} else if(res instanceof Queue
|
|
&& this.sub_queue == 'wait'){
|
|
if(res.state == 'stopped'){
|
|
taskCompleted()
|
|
|
|
} else {
|
|
running.push(res)
|
|
res.stop(function(){
|
|
// not fully done yet -- re-queue...
|
|
res.length > 0
|
|
&& that.push(res)
|
|
runningDone() }) }
|
|
|
|
// pool async (promise) task...
|
|
// XXX REVISE...
|
|
// XXX do we need to report errors???
|
|
} else if(typeof((res || {}).then) == 'function'
|
|
// one post handler is enough...
|
|
// XXX will this prevent some tasks from reporting???
|
|
&& !running.includes(res)){
|
|
running.push(res)
|
|
//res.finally(runningDone)
|
|
res.then(
|
|
runningDone,
|
|
...(this.catch_errors ?
|
|
[function(err){
|
|
runningDone(fail)
|
|
that.trigger('taskFailed', task, err) }]
|
|
// let errors propagate...
|
|
: []))
|
|
|
|
// func -> re-queue tasks...
|
|
} else if(typeof(res) == 'function'){
|
|
taskCompleted()
|
|
this.push(res)
|
|
|
|
// completed sync task...
|
|
} else {
|
|
taskCompleted() }
|
|
|
|
this.length == 0
|
|
&& this.trigger('queueEmpty')
|
|
|
|
stop
|
|
&& this.stop()
|
|
|
|
return res },
|
|
|
|
|
|
// helpers...
|
|
//
|
|
// move tasks to head/tail of queue resp.
|
|
prioritize: function(...tasks){
|
|
return this.sortAs(tasks) },
|
|
delay: function(...tasks){
|
|
return this.sortAs(tasks, true) },
|
|
|
|
|
|
// edit/add API...
|
|
//
|
|
// trigger .tasksAdded(..) on relevant methods...
|
|
//
|
|
// NOTE: adding tasks via the [..] notation will not trigger the
|
|
// event...
|
|
// NOTE: the events will not be triggered on no-op calls...
|
|
//
|
|
// XXX add methods that can shorten the queue (like .pop()/.shift()/..)
|
|
// to test and trigger .queueEmpty()
|
|
// ...this is not and will not be done on polling as that would
|
|
// introduce issues -- queue can change between task runs... (revise!)
|
|
push: function(...tasks){
|
|
var res = object.parentCall(Queue.prototype.push, this, ...tasks)
|
|
tasks.length > 0
|
|
&& this.trigger('tasksAdded', tasks)
|
|
return res },
|
|
unsift: function(...tasks){
|
|
var res = object.parentCall(Queue.prototype.unshift, this, ...tasks)
|
|
tasks.length > 0
|
|
&& this.trigger('tasksAdded', tasks)
|
|
return res },
|
|
splice: function(...args){
|
|
var l = this.length
|
|
var res = object.parentCall(Queue.prototype.splice, this, ...args)
|
|
var tasks = args.slice(2)
|
|
tasks.length > 0
|
|
&& this.trigger('tasksAdded', tasks)
|
|
// length changed...
|
|
l != 0 && this.length == 0
|
|
&& this.trigger('queueEmpty')
|
|
return res },
|
|
|
|
// shorthands...
|
|
//
|
|
// NOTE: this helps get around the argument number limitation in JS...
|
|
add: function(tasks){
|
|
tasks = tasks instanceof Array ?
|
|
tasks
|
|
: [tasks]
|
|
// handle too large a number of args...
|
|
var MAX_ARGS = 10000
|
|
if(tasks.length > MAX_ARGS){
|
|
while(tasks.length > 0){
|
|
this.push(...tasks.splice(0, MAX_ARGS)) }
|
|
return this }
|
|
this.push(...tasks)
|
|
return this },
|
|
// NOTE: this will also clear the results cache...
|
|
clear: function(full=false){
|
|
full
|
|
&& (delete this.__results)
|
|
this.splice(0, this.length) },
|
|
|
|
|
|
// constructor argument handling...
|
|
//
|
|
// Queue()
|
|
// -> queue
|
|
//
|
|
// Queue(..,tasks)
|
|
// -> queue
|
|
//
|
|
// Queue(options)
|
|
// Queue(options, ..,tasks)
|
|
// -> queue
|
|
//
|
|
__init__: function(options){
|
|
// options...
|
|
if(!(this[0] instanceof Queue)
|
|
&& this[0] instanceof Object
|
|
&& typeof(this[0]) != 'function'
|
|
// XXX do we need this test???
|
|
&& typeof(this[0].finally) != 'function'){
|
|
Object.assign(this, this.shift()) }
|
|
this.length > 0
|
|
&& this.trigger('tasksAdded', [...this])
|
|
// see if we need to start...
|
|
this.__run_tasks__() },
|
|
}))
|
|
|
|
|
|
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
|
// Like Queue(..) but adds terminal states and conversion to promises...
|
|
//
|
|
// XXX should this .freeze()??/
|
|
// XXX find a better name...
|
|
var FinalizableQueue =
|
|
module.FinalizableQueue =
|
|
object.Constructor('FinalizableQueue', Queue, {
|
|
auto_stop: true,
|
|
|
|
// not for direct use...
|
|
// NOTE: we are intentionally not freezing .__results to allow them
|
|
// to be consumed by the client...
|
|
// XXX is this correct???
|
|
__freeze: function(){
|
|
Object.freeze(this)
|
|
return this },
|
|
|
|
__onempty__: function(){
|
|
return this.trigger('done') },
|
|
|
|
done: events.Event('done', function(handle){
|
|
// abort only once...
|
|
if(this.state == 'aborted'
|
|
|| this.state == 'done'){
|
|
return handle(false) }
|
|
this.__state = 'done'
|
|
this.__freeze() }),
|
|
abort: events.Event('abort', function(handle){
|
|
// abort only once...
|
|
if(this.state == 'aborted'
|
|
|| this.state == 'done'){
|
|
return handle(false) }
|
|
this.__state = 'aborted'
|
|
this.__freeze() }),
|
|
|
|
// NOTE: each handler will get called once when the next time the
|
|
// queue is emptied...
|
|
promise: function(){
|
|
return this.__promise },
|
|
then: function(onresolve, onreject){
|
|
return this.promise().then(...arguments) },
|
|
catch: function(onreject){
|
|
return this.promise().catch(...arguments) },
|
|
|
|
__init__: function(options){
|
|
var that = this
|
|
this.__promise = new Promise(function(resolve, reject){
|
|
that
|
|
.one('done', function(){
|
|
resolve(...(that.collect_results ?
|
|
[that.__results ?? []]
|
|
: [])) })
|
|
.one('abort', reject) })
|
|
return object.parentCall(FinalizableQueue.prototype.__init__, this, ...arguments) },
|
|
})
|
|
|
|
|
|
|
|
|
|
//---------------------------------------------------------------------
|
|
// Task manager...
|
|
|
|
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
|
// Helpres...
|
|
|
|
// Task ticket...
|
|
//
|
|
// This lets the client control the task object and receive messages
|
|
// from it.
|
|
//
|
|
// NOTE: this is not intended for direct use...
|
|
var TaskTicket =
|
|
// XXX do we let the user see this???
|
|
module.TaskTicket =
|
|
object.Constructor('TaskTicket', Promise, {
|
|
__data: null,
|
|
|
|
title: null,
|
|
task: null,
|
|
|
|
get state(){
|
|
return this.__data.state },
|
|
|
|
resolve: function(...args){
|
|
if(this.__data.state == 'pending'){
|
|
this.__data.state = 'resolved'
|
|
this.__data.resolve(...args) }
|
|
return this },
|
|
reject: function(...args){
|
|
if(this.__data.state == 'pending'){
|
|
this.__data.state = 'rejected'
|
|
this.__data.reject(...args) }
|
|
return this },
|
|
onmessage: function(msg, func){
|
|
this.__data.onmessage(
|
|
typeof(msg) == 'function' ?
|
|
msg
|
|
: function(m, ...args){
|
|
m == msg
|
|
&& func(...args) })
|
|
return this },
|
|
|
|
then: Promise.iter.prototype.then,
|
|
|
|
__new__: function(_, title, resolve, reject, onmessage, task){
|
|
var handlers
|
|
var resolver = arguments[1]
|
|
|
|
var obj = Reflect.construct(
|
|
TaskTicket.__proto__,
|
|
[function(resolve, reject){
|
|
handlers = {resolve, reject}
|
|
// NOTE: this is here to support builtin .then(..)
|
|
typeof(resolver) == 'function'
|
|
&& resolver(resolve, reject) }],
|
|
TaskTicket)
|
|
// if we got a resolver then it's an internal constructor we are
|
|
// not using (likely in base .then(..)) so there is no point in
|
|
// moving on...
|
|
// NOTE: this may be a potential source of bugs so we need to
|
|
// keep tracking this (XXX)
|
|
if(typeof(resolver) == 'function'){
|
|
return obj }
|
|
|
|
// bind this to external resolve/reject...
|
|
obj.then(
|
|
function(){
|
|
resolve(...arguments) },
|
|
function(){
|
|
reject(...arguments) })
|
|
// setup state...
|
|
obj.title = title
|
|
obj.task = task
|
|
Object.defineProperty(obj, '__data', {
|
|
value: {
|
|
resolve: handlers.resolve,
|
|
reject: handlers.reject,
|
|
onmessage,
|
|
state: 'pending',
|
|
},
|
|
enumerable: false,
|
|
})
|
|
return obj },
|
|
})
|
|
|
|
|
|
// NOTE: this is not intended for direct use...
|
|
var TaskMixin =
|
|
module.TaskMixin =
|
|
object.Mixin('TaskMixin', 'soft', {
|
|
// standard messages...
|
|
stop: function(){
|
|
this.send('stop', ...arguments) },
|
|
})
|
|
|
|
|
|
|
|
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
|
// Task manager...
|
|
//
|
|
// Externally manage/influence long running tasks...
|
|
//
|
|
// A task can be:
|
|
// - Promise.interactive(..)
|
|
// - Queue(..)
|
|
// - function(ticket, ..)
|
|
// - object supporting task protocol
|
|
//
|
|
//
|
|
// The task is controlled by passing messages, default messages include:
|
|
// - .stop(..)
|
|
//
|
|
//
|
|
// Task protocol:
|
|
// .then(..) - registers a completion handler (a-la Promise)
|
|
// .stop(..) - triggers a task to stop
|
|
//
|
|
//
|
|
// NOTE: we should keep the API here similar to Queue...
|
|
// ...but this is not a queue in principle (internal vs. external
|
|
// management) so we'll also need to keep them different enough to
|
|
// avoid confusion...
|
|
//
|
|
// XXX should a task manager have a pool size???
|
|
// ...if yes it would be fun to use the queue to manage the pool...
|
|
// XXX revise .abort(..)
|
|
var TaskManager =
|
|
module.TaskManager =
|
|
object.Constructor('TaskManager', Array, events.EventMixin('flat', {
|
|
__task_ticket__: TaskTicket,
|
|
__task_mixin__: TaskMixin,
|
|
|
|
// settings...
|
|
//
|
|
// if true start/end times will be set on the task:
|
|
// .time_started
|
|
// .time_ended
|
|
record_times: true,
|
|
|
|
// if true the task will be started sync before .Task(..) is returns..
|
|
//
|
|
// NOTE: this is not recommended as the default as this can block the
|
|
// manager...
|
|
sync_start: false,
|
|
|
|
|
|
//
|
|
// .titled(title)
|
|
// .titled(title, ..)
|
|
// -> manager
|
|
//
|
|
titled: function(title){
|
|
if(title == 'all' || title == '*'){
|
|
return this }
|
|
var titles = new Set([...arguments])
|
|
return this
|
|
.filter(function(task){
|
|
return titles.has(task.title)
|
|
|| titles.has(task.name) }) },
|
|
|
|
// actions...
|
|
//
|
|
// commands to test as methods...
|
|
// i.e. task.send(cmd, ...args) -> task[cmd](...args)
|
|
__send_commands__: [ 'stop', 'abort'],
|
|
send: function(title, ...args){
|
|
var that = this
|
|
if(title == 'all' || title == '*'){
|
|
;[...this].forEach(function(task){
|
|
'send' in task ?
|
|
task.send(...args)
|
|
: that.__send_commands__.includes(args[0]) ?
|
|
task[args[0]](...args.slice(1))
|
|
// XXX
|
|
: console.warn('.send(..): can\'t .send(..) to:', task)
|
|
})
|
|
return this }
|
|
return this.titled(
|
|
...(title instanceof Array) ?
|
|
title
|
|
: [title])
|
|
.send('all', ...args) },
|
|
// XXX should this be an event???
|
|
// the best way to go would be to proxy this to task-specific
|
|
// variants and register handlers on the tasks...
|
|
// ...should work with .on(..) / ... and other event methods...
|
|
stop: function(title='all'){
|
|
this.send(title, 'stop')
|
|
return this },
|
|
// XXX
|
|
abort: function(title='all'){
|
|
this.send(title, 'abort')
|
|
return this },
|
|
|
|
// events...
|
|
//
|
|
// XXX need to be able to bind to proxy events...
|
|
done: events.PureEvent('done'),
|
|
error: events.PureEvent('error'),
|
|
tasksDone: events.PureEvent('tasksDone'),
|
|
|
|
|
|
// Create/start a task...
|
|
//
|
|
// Create a task...
|
|
// .Task(task)
|
|
// .Task(title, task)
|
|
// -> task-handler
|
|
//
|
|
//
|
|
// Create a function task...
|
|
// .Task(func, ..)
|
|
// .Task(title, func, ..)
|
|
// -> task-handler
|
|
//
|
|
// func(ticket, ..)
|
|
//
|
|
//
|
|
// A task can be:
|
|
// - Promise.cooperative instance
|
|
// - Queue instance
|
|
// - function
|
|
// - Promise instance
|
|
//
|
|
// The task-manager is a Promise.interactive(..) instance with
|
|
// TaskMixin added.
|
|
//
|
|
// The ticket is a TaskTicket instance, see it for reference...
|
|
//
|
|
//
|
|
//
|
|
// We can also force a specific task to start sync/async regardless
|
|
// of the .sync_start setting:
|
|
//
|
|
// .Task('sync', task)
|
|
// .Task('sync', title, task)
|
|
// .Task(title, 'sync', task)
|
|
// -> task-handler
|
|
//
|
|
// .Task('async', task)
|
|
// .Task('async', title, task)
|
|
// .Task(title, 'async', task)
|
|
// -> task-handler
|
|
//
|
|
//
|
|
// sync/async start mode apply only to function tasks and tasks that
|
|
// have a .start() method like Queue's...
|
|
//
|
|
//
|
|
// NOTE: 'sync' more for a blocking task will block the task manager.
|
|
// NOTE: only function tasks accept args.
|
|
// NOTE: the task is started as soon as it is accepted.
|
|
// NOTE: tasks trigger events only on the task-manager instance that
|
|
// they were created in...
|
|
// XXX try and make all event handlers to be registered in the
|
|
// task itself and all the manager events just be proxies
|
|
// to tasks...
|
|
// ...this needs to work with all the event methods...
|
|
Task: function(title, task, ...args){
|
|
var that = this
|
|
var _args = [...arguments]
|
|
|
|
// parse args...
|
|
var sync_start = this.sync_start
|
|
if(title == 'sync' || title == 'async'){
|
|
;[sync_start, title, task, ...args] = _args
|
|
sync_start = sync_start == 'sync'
|
|
|
|
} else if(task == 'sync' || task == 'async'){
|
|
;[title, sync_start, task, ...args] = _args
|
|
sync_start = sync_start == 'sync' }
|
|
|
|
// anonymous task...
|
|
if(typeof(title) != typeof('str')){
|
|
;[task, ...args] = _args
|
|
title = null }
|
|
|
|
// normalize handler...
|
|
var run
|
|
var handler =
|
|
// queue...
|
|
// NOTE: queue is task-compatible...
|
|
task instanceof FinalizableQueue ?
|
|
task
|
|
// task protocol...
|
|
: task && task.then
|
|
&& (task.stop || task.send) ?
|
|
task
|
|
: this.__task_mixin__(
|
|
// interactive promise...
|
|
task instanceof Promise.interactive ?
|
|
task
|
|
// dumb promise -- will ignore all the messages...
|
|
// XXX should we complain about this???
|
|
: task instanceof Promise ?
|
|
Promise.interactive(
|
|
function(resolve, reject, onmsg){
|
|
// NOTE: since this is a promise, we can't
|
|
// stop it externally...
|
|
onmsg(function(msg){
|
|
msg == 'stop'
|
|
&& reject('stop') })
|
|
task.then(resolve, reject) })
|
|
// function...
|
|
: Promise.interactive(
|
|
function(resolve, reject, onmessage){
|
|
// NOTE: we need to start this a bit later hence
|
|
// we wrap this into run(..) and call it when
|
|
// the context is ready...
|
|
run = function(){
|
|
var res =
|
|
task(
|
|
that.__task_ticket__(title, resolve, reject, onmessage, handler),
|
|
...args)
|
|
// NOTE: if the client calls resolve(..) this
|
|
// second resolve(..) call has no effect,
|
|
// and the same is true with reject...
|
|
// XXX is double binding like this (i.e.
|
|
// ticket + .then()) a good idea???
|
|
res instanceof Promise
|
|
&& res.then(resolve, reject) } }))
|
|
// set handler title...
|
|
// NOTE: this will override the title of the handler if it was
|
|
// set before...
|
|
if(title){
|
|
handler.title
|
|
&& console.warn(
|
|
'TaskManager.Task(..): task title already defined:', handler.title,
|
|
'overwriting with:', title)
|
|
Object.assign(handler, {title}) }
|
|
|
|
this.push(handler)
|
|
|
|
this.record_times
|
|
&& (handler.time_started = Date.now())
|
|
|
|
// handle task manager state...
|
|
var cleanup = function(evt){
|
|
return function(res){
|
|
that.record_times
|
|
&& (handler.time_ended = Date.now())
|
|
that.splice(that.indexOf(handler), 1)
|
|
that.trigger(evt, task, res)
|
|
that.length == 0
|
|
&& that.trigger('tasksDone') } }
|
|
handler
|
|
.then(cleanup('done'), cleanup('error'))
|
|
|
|
// start...
|
|
var start = function(){
|
|
run ?
|
|
run()
|
|
: task.start ?
|
|
task.start()
|
|
: null }
|
|
// trigger task start...
|
|
sync_start ?
|
|
start()
|
|
: setTimeout(start, 0)
|
|
|
|
return handler },
|
|
}))
|
|
|
|
|
|
|
|
|
|
/**********************************************************************
|
|
* vim:set ts=4 sw=4 : */ return module })
|