From 5253cef6656ac3fe9ec3f2315fe22f9ceb11c517 Mon Sep 17 00:00:00 2001 From: "Alex A. Naanou" Date: Mon, 23 Nov 2020 02:59:18 +0300 Subject: [PATCH] more refactoring... Signed-off-by: Alex A. Naanou --- Array.js | 30 ++++--- Promise.js | 2 - package.json | 2 +- runner.js | 222 +++++++++++++++++++++++++++++++-------------------- 4 files changed, 153 insertions(+), 103 deletions(-) diff --git a/Array.js b/Array.js index e923dac..45f1f2d 100644 --- a/Array.js +++ b/Array.js @@ -19,7 +19,12 @@ var generator = require('./generator') /*********************************************************************/ +var STOP = +module.STOP = + object.STOP + +/* var StopIteration = module.StopIteration = object.Constructor('StopIteration', Error, { @@ -31,6 +36,7 @@ module.StopIteration = __init__: function(msg){ this.msg = msg }, }) +//*/ @@ -38,20 +44,20 @@ module.StopIteration = // Mixins... // Equivalent to .map(..) / .filter(..) / .reduce(..) / .. with support for -// StopIteration... +// STOP... // // NOTE: these add almost no overhead to the iteration. // NOTE: these will not return a partial result if stopped. // -// XXX should these return a partial result on StopIteration? +// XXX should these return a partial result on STOP? var wrapIterFunc = function(iter){ return function(func){ try { return this[iter](...arguments) } catch(err){ - if(err === StopIteration){ + if(err === STOP){ return - } else if( err instanceof StopIteration){ + } else if( err instanceof STOP){ return err.msg } throw err } } } @@ -87,14 +93,14 @@ var wrapIterFunc = function(iter){ // '20C' - number of chunks // // -// StopIteration can be thrown in func or chunk_handler at any time to +// STOP can be thrown in func or chunk_handler at any time to // abort iteration, this will reject the promise. // // // The main goal of this is to not block the runtime while processing a // very long array by interrupting the processing with a timeout... // -// XXX should these return a partial result on StopIteration? +// XXX should these return a partial result on STOP? // XXX add generators: // .map(..) / .filter(..) / .reduce(..) // ...the basis here should be the chunks, i.e. each cycle should @@ -133,11 +139,11 @@ var makeChunkIter = function(iter, wrapper){ postChunk && postChunk.call(this, this, res, 0) return Promise.all(res) - // handle StopIteration... + // handle STOP... } catch(err){ - if(err === StopIteration){ + if(err === STOP){ return Promise.reject() - } else if( err instanceof StopIteration){ + } else if( err instanceof STOP){ return Promise.reject(err.msg) } throw err } } @@ -158,11 +164,11 @@ var makeChunkIter = function(iter, wrapper){ chunk.map(function([i, v]){ return v }), val, chunk[0][0]) - // handle StopIteration... + // handle STOP... } catch(err){ - if(err === StopIteration){ + if(err === STOP){ return reject() - } else if( err instanceof StopIteration){ + } else if( err instanceof STOP){ return reject(err.msg) } throw err } diff --git a/Promise.js b/Promise.js index 3c44378..77f8b0e 100644 --- a/Promise.js +++ b/Promise.js @@ -9,8 +9,6 @@ var object = require('ig-object') -//var {StopIteration} = require('./Array') - /*********************************************************************/ diff --git a/package.json b/package.json index f604e74..3fa5cf4 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "ig-types", - "version": "3.7.15", + "version": "4.0.0", "description": "Generic JavaScript types and type extensions...", "main": "main.js", "scripts": { diff --git a/runner.js b/runner.js index 5c25650..60724b3 100644 --- a/runner.js +++ b/runner.js @@ -28,8 +28,15 @@ var events = require('./event') /*********************************************************************/ +module.STOP = object.STOP + + + +//--------------------------------------------------------------------- + // XXX need to configure to run a specific amount of jobjs on each start... -// XXX do we need an async mode -- exec ._run(..) in a setTimeout(.., 0)??? +// XXX do we need an async mode -- exec .__run_tasks__(..) in a +// setTimeout(.., 0)??? var Queue = module.Queue = object.Constructor('Queue', Array, { // create a running queue... @@ -45,7 +52,22 @@ module.Queue = object.Constructor('Queue', Array, { auto_stop: false, + // + // This can be: + // 'wait' - wait fot the sun-queue to stop + // 'unwind' - run sub-task and requeue parent + // + // XXX do we need this??? + // XXX should the nested queue decide??? + // ...how??? + sub_queue: 'unwind', + + // + // This can be: + // 'running' + // 'stopped' + // __state: null, get state(){ return this.__state @@ -56,11 +78,6 @@ module.Queue = object.Constructor('Queue', Array, { } else if(value == 'stopped'){ this.stop() } }, - // XXX - run: function(count){ - // XXX start, run count tasks and stop... - }, - // events/actions - state transitions... // // XXX would be nice to run a number of tasks... @@ -69,14 +86,13 @@ module.Queue = object.Constructor('Queue', Array, { if(this.state == 'running'){ return handle(false) } this.__state = 'running' - this._run() }), + this.__run_tasks__() }), stop: events.Event('stop', function(handle){ // can't stop while not running... if(this.state == 'stopped'){ return handle(false) } this.__state = 'stopped' }), - // events/actions - state transitions... // clear: events.Event(function(handler){ @@ -89,111 +105,58 @@ module.Queue = object.Constructor('Queue', Array, { taskCompleted: events.Event('taskCompleted'), queueEmpty: events.Event('queueEmpty'), + // helpers... // - // XXX how do we reference the tasks here??? - // - indexes - // - ranges -- simelar to .slice(..) - // - by value - // XXX + // move tasks to head/tail of queue resp. prioritize: function(...tasks){ return this.sortAs(tasks) }, - // same as prioritize but adds stuff to the tail... delay: function(...tasks){ return this.sortAs(tasks, true) }, - + + // Runner API... // - // .__run_task__(task) + // Run the given task type... + // .__run_task__(task[, next]) + // -> STOP + // -> STOP(value) // -> promise // -> func // -> queue // -> ... // - // XXX should this support a task being a queue ??? - // XXX do we actually need this?? - // ...should this include result processing??? - __run_task__: function(task){ + __run_task__: function(task, next){ return typeof(task) == 'function' ? task() - : task instanceof Queue ? - // XXX should we run one item or trigger and wait for event??? - // ...should this be an option??? + : (task instanceof Queue + && this.sub_queue == 'unwind') ? + (task.run(next), task) + : (task instanceof Queue + && this.sub_queue == 'wait') ? task.start() : task }, - - - // main runner... + // + // Hanlde 'running' state... + // .__run_tasks__() + // -> this // // NOTE: we do not store the exec results... // NOTE: not intended for direct use and will likely have no effect // if called directly... - // - // XXX should this support a task being returning a queue??? - // XXX need to configure this to run a number of tasks only... __running: null, - _run: function(){ + __run_tasks__: function(){ + var that = this + // if we are not running stop immidiately... if(this.state != 'running'){ return this } - var that = this - var running = this.__running = this.__running || [] - // handle queue... while(this.length > 0 && this.state == 'running' - && running.length < (this.pool_size || Infinity) ){ - - var task = this.shift() - // XXX BUG this executes the task for some reson... - this.trigger('taskStarting', task) - - // run... - var res = this.__run_task__(task) - - // pool async (promise) task... - if(typeof((res || {}).finally) == 'function' - // one post handler is enough... - && !running.includes(res)){ - running.push(res) - res.finally(function(){ - // remove from running... - running.splice(0, running.length, - // NOTE: there can be multiple occurences of res... - ...running - .filter(function(e){ return e !== res })) - // finishup... - that - // XXX BUG this executes the task for some reson... - .trigger('taskCompleted', task, res) - ._run() }) - - // re-queue tasks... - // XXX revise... - } else if(typeof(res) == 'function'){ - this.push(res) - - // queue... - } else if(res instanceof Queue){ - // XXX should this be done on stop or on .length == 0??? - if(queue.state == 'stopped'){ - this.trigger('taskCompleted', task, res) - - } else { - running.push(res) - res.stop(function(){ - // XXX not done yet -- re-queue... - res.length > 0 - && that.push(res) - // XXX remove from running... - // XXX trigger event... - }) } - - // completed sync task... - } else { - // XXX BUG this executes the task for some reson... - this.trigger('taskCompleted', task, res) } } + && (this.__running || []).length < (this.pool_size || Infinity) ){ + this.run(this.__run_tasks__.bind(this)) } // empty queue -> pole or stop... // @@ -201,7 +164,7 @@ module.Queue = object.Constructor('Queue', Array, { // - the pool is full // - the queue is empty // NOTE: we do not care about stopping the timer when changing - // state as ._run() will stop itself... + // state as .__run_tasks__() will stop itself... // // XXX will this be collected by the GC if it is polling??? if(this.length == 0 @@ -214,8 +177,88 @@ module.Queue = object.Constructor('Queue', Array, { // pole... : (this.poling_delay && setTimeout( - this._run.bind(this), + this.__run_tasks__.bind(this), this.poling_delay || 200)) } + return this }, + + // run one task from queue... + // + // NOTE: this does not care about .state... + run: function(next){ + var running = this.__running = this.__running || [] + + // can't run... + if(this.length == 0 + || running.length >= this.pool_size ){ + return this } + + var cleanupRunning = function(res){ + running.splice(0, running.length, + // NOTE: there can be multiple occurences of res... + ...running + .filter(function(e){ return e !== res })) } + + var task = this.shift() + + this.trigger('taskStarting', task) + + // run... + var res = this.__run_task__(task, next) + + // handle stop... + var stop = res === module.STOP + || res instanceof module.STOP + res = res instanceof module.STOP ? + res.value + : res + stop + && this.stop() + + // handle task results... + // + // queue -- as a set of tasks... + if(res instanceof Queue + && this.sub_queue == 'unwind'){ + if(res.length > 0){ + this.push(res) } + this.trigger('taskCompleted', task, res) + + // queue -- as a task... + } else if(res instanceof Queue + && this.sub_queue == 'wait'){ + if(res.state == 'stopped'){ + this.trigger('taskCompleted', task, res) + + } else { + running.push(res) + res.stop(function(){ + cleanupRunning(res) + // not done yet -- re-queue... + res.length > 0 + && that.push(res) + that.trigger('taskCompleted', task, res) + !stop && next + && next() }) } + + // pool async (promise) task... + } else if(typeof((res || {}).finally) == 'function' + // one post handler is enough... + && !running.includes(res)){ + running.push(res) + res.finally(function(){ + cleanupRunning(res) + // finishup... + that.trigger('taskCompleted', task, res) + !stop && next + && next() }) + + // re-queue tasks... + } else if(typeof(res) == 'function'){ + this.push(res) + + // completed sync task... + } else { + this.trigger('taskCompleted', task, res) } return this }, @@ -233,11 +276,14 @@ module.Queue = object.Constructor('Queue', Array, { // -> queue // __init__: function(options){ - if(this[0] instanceof Object + // options... + if(!(this[0] instanceof Queue) + && this[0] instanceof Object && typeof(this[0]) != 'function' && typeof(this[0].finally) != 'function'){ Object.assign(this, this.shift()) } - this._run() }, + // see if we need to start... + this.__run_tasks__() }, }))