more refactoring...

Signed-off-by: Alex A. Naanou <alex.nanou@gmail.com>
This commit is contained in:
Alex A. Naanou 2020-11-23 02:59:18 +03:00
parent 98e5346450
commit 5253cef665
4 changed files with 153 additions and 103 deletions

View File

@ -19,7 +19,12 @@ var generator = require('./generator')
/*********************************************************************/
var STOP =
module.STOP =
object.STOP
/*
var StopIteration =
module.StopIteration =
object.Constructor('StopIteration', Error, {
@ -31,6 +36,7 @@ module.StopIteration =
__init__: function(msg){
this.msg = msg },
})
//*/
@ -38,20 +44,20 @@ module.StopIteration =
// Mixins...
// Equivalent to .map(..) / .filter(..) / .reduce(..) / .. with support for
// StopIteration...
// STOP...
//
// NOTE: these add almost no overhead to the iteration.
// NOTE: these will not return a partial result if stopped.
//
// XXX should these return a partial result on StopIteration?
// XXX should these return a partial result on STOP?
var wrapIterFunc = function(iter){
return function(func){
try {
return this[iter](...arguments)
} catch(err){
if(err === StopIteration){
if(err === STOP){
return
} else if( err instanceof StopIteration){
} else if( err instanceof STOP){
return err.msg }
throw err } } }
@ -87,14 +93,14 @@ var wrapIterFunc = function(iter){
// '20C' - number of chunks
//
//
// StopIteration can be thrown in func or chunk_handler at any time to
// STOP can be thrown in func or chunk_handler at any time to
// abort iteration, this will reject the promise.
//
//
// The main goal of this is to not block the runtime while processing a
// very long array by interrupting the processing with a timeout...
//
// XXX should these return a partial result on StopIteration?
// XXX should these return a partial result on STOP?
// XXX add generators:
// .map(..) / .filter(..) / .reduce(..)
// ...the basis here should be the chunks, i.e. each cycle should
@ -133,11 +139,11 @@ var makeChunkIter = function(iter, wrapper){
postChunk
&& postChunk.call(this, this, res, 0)
return Promise.all(res)
// handle StopIteration...
// handle STOP...
} catch(err){
if(err === StopIteration){
if(err === STOP){
return Promise.reject()
} else if( err instanceof StopIteration){
} else if( err instanceof STOP){
return Promise.reject(err.msg) }
throw err } }
@ -158,11 +164,11 @@ var makeChunkIter = function(iter, wrapper){
chunk.map(function([i, v]){ return v }),
val,
chunk[0][0])
// handle StopIteration...
// handle STOP...
} catch(err){
if(err === StopIteration){
if(err === STOP){
return reject()
} else if( err instanceof StopIteration){
} else if( err instanceof STOP){
return reject(err.msg) }
throw err }

View File

@ -9,8 +9,6 @@
var object = require('ig-object')
//var {StopIteration} = require('./Array')
/*********************************************************************/

View File

@ -1,6 +1,6 @@
{
"name": "ig-types",
"version": "3.7.15",
"version": "4.0.0",
"description": "Generic JavaScript types and type extensions...",
"main": "main.js",
"scripts": {

222
runner.js
View File

@ -28,8 +28,15 @@ var events = require('./event')
/*********************************************************************/
module.STOP = object.STOP
//---------------------------------------------------------------------
// XXX need to configure to run a specific amount of jobjs on each start...
// XXX do we need an async mode -- exec ._run(..) in a setTimeout(.., 0)???
// XXX do we need an async mode -- exec .__run_tasks__(..) in a
// setTimeout(.., 0)???
var Queue =
module.Queue = object.Constructor('Queue', Array, {
// create a running queue...
@ -45,7 +52,22 @@ module.Queue = object.Constructor('Queue', Array, {
auto_stop: false,
//
// This can be:
// 'wait' - wait fot the sun-queue to stop
// 'unwind' - run sub-task and requeue parent
//
// XXX do we need this???
// XXX should the nested queue decide???
// ...how???
sub_queue: 'unwind',
//
// This can be:
// 'running'
// 'stopped'
//
__state: null,
get state(){
return this.__state
@ -56,11 +78,6 @@ module.Queue = object.Constructor('Queue', Array, {
} else if(value == 'stopped'){
this.stop() } },
// XXX
run: function(count){
// XXX start, run count tasks and stop...
},
// events/actions - state transitions...
//
// XXX would be nice to run a number of tasks...
@ -69,14 +86,13 @@ module.Queue = object.Constructor('Queue', Array, {
if(this.state == 'running'){
return handle(false) }
this.__state = 'running'
this._run() }),
this.__run_tasks__() }),
stop: events.Event('stop', function(handle){
// can't stop while not running...
if(this.state == 'stopped'){
return handle(false) }
this.__state = 'stopped' }),
// events/actions - state transitions...
//
clear: events.Event(function(handler){
@ -89,111 +105,58 @@ module.Queue = object.Constructor('Queue', Array, {
taskCompleted: events.Event('taskCompleted'),
queueEmpty: events.Event('queueEmpty'),
// helpers...
//
// XXX how do we reference the tasks here???
// - indexes
// - ranges -- simelar to .slice(..)
// - by value
// XXX
// move tasks to head/tail of queue resp.
prioritize: function(...tasks){
return this.sortAs(tasks) },
// same as prioritize but adds stuff to the tail...
delay: function(...tasks){
return this.sortAs(tasks, true) },
// Runner API...
//
// .__run_task__(task)
// Run the given task type...
// .__run_task__(task[, next])
// -> STOP
// -> STOP(value)
// -> promise
// -> func
// -> queue
// -> ...
//
// XXX should this support a task being a queue ???
// XXX do we actually need this??
// ...should this include result processing???
__run_task__: function(task){
__run_task__: function(task, next){
return typeof(task) == 'function' ?
task()
: task instanceof Queue ?
// XXX should we run one item or trigger and wait for event???
// ...should this be an option???
: (task instanceof Queue
&& this.sub_queue == 'unwind') ?
(task.run(next), task)
: (task instanceof Queue
&& this.sub_queue == 'wait') ?
task.start()
: task },
// main runner...
//
// Hanlde 'running' state...
// .__run_tasks__()
// -> this
//
// NOTE: we do not store the exec results...
// NOTE: not intended for direct use and will likely have no effect
// if called directly...
//
// XXX should this support a task being returning a queue???
// XXX need to configure this to run a number of tasks only...
__running: null,
_run: function(){
__run_tasks__: function(){
var that = this
// if we are not running stop immidiately...
if(this.state != 'running'){
return this }
var that = this
var running = this.__running = this.__running || []
// handle queue...
while(this.length > 0
&& this.state == 'running'
&& running.length < (this.pool_size || Infinity) ){
var task = this.shift()
// XXX BUG this executes the task for some reson...
this.trigger('taskStarting', task)
// run...
var res = this.__run_task__(task)
// pool async (promise) task...
if(typeof((res || {}).finally) == 'function'
// one post handler is enough...
&& !running.includes(res)){
running.push(res)
res.finally(function(){
// remove from running...
running.splice(0, running.length,
// NOTE: there can be multiple occurences of res...
...running
.filter(function(e){ return e !== res }))
// finishup...
that
// XXX BUG this executes the task for some reson...
.trigger('taskCompleted', task, res)
._run() })
// re-queue tasks...
// XXX revise...
} else if(typeof(res) == 'function'){
this.push(res)
// queue...
} else if(res instanceof Queue){
// XXX should this be done on stop or on .length == 0???
if(queue.state == 'stopped'){
this.trigger('taskCompleted', task, res)
} else {
running.push(res)
res.stop(function(){
// XXX not done yet -- re-queue...
res.length > 0
&& that.push(res)
// XXX remove from running...
// XXX trigger event...
}) }
// completed sync task...
} else {
// XXX BUG this executes the task for some reson...
this.trigger('taskCompleted', task, res) } }
&& (this.__running || []).length < (this.pool_size || Infinity) ){
this.run(this.__run_tasks__.bind(this)) }
// empty queue -> pole or stop...
//
@ -201,7 +164,7 @@ module.Queue = object.Constructor('Queue', Array, {
// - the pool is full
// - the queue is empty
// NOTE: we do not care about stopping the timer when changing
// state as ._run() will stop itself...
// state as .__run_tasks__() will stop itself...
//
// XXX will this be collected by the GC if it is polling???
if(this.length == 0
@ -214,8 +177,88 @@ module.Queue = object.Constructor('Queue', Array, {
// pole...
: (this.poling_delay
&& setTimeout(
this._run.bind(this),
this.__run_tasks__.bind(this),
this.poling_delay || 200)) }
return this },
// run one task from queue...
//
// NOTE: this does not care about .state...
run: function(next){
var running = this.__running = this.__running || []
// can't run...
if(this.length == 0
|| running.length >= this.pool_size ){
return this }
var cleanupRunning = function(res){
running.splice(0, running.length,
// NOTE: there can be multiple occurences of res...
...running
.filter(function(e){ return e !== res })) }
var task = this.shift()
this.trigger('taskStarting', task)
// run...
var res = this.__run_task__(task, next)
// handle stop...
var stop = res === module.STOP
|| res instanceof module.STOP
res = res instanceof module.STOP ?
res.value
: res
stop
&& this.stop()
// handle task results...
//
// queue -- as a set of tasks...
if(res instanceof Queue
&& this.sub_queue == 'unwind'){
if(res.length > 0){
this.push(res) }
this.trigger('taskCompleted', task, res)
// queue -- as a task...
} else if(res instanceof Queue
&& this.sub_queue == 'wait'){
if(res.state == 'stopped'){
this.trigger('taskCompleted', task, res)
} else {
running.push(res)
res.stop(function(){
cleanupRunning(res)
// not done yet -- re-queue...
res.length > 0
&& that.push(res)
that.trigger('taskCompleted', task, res)
!stop && next
&& next() }) }
// pool async (promise) task...
} else if(typeof((res || {}).finally) == 'function'
// one post handler is enough...
&& !running.includes(res)){
running.push(res)
res.finally(function(){
cleanupRunning(res)
// finishup...
that.trigger('taskCompleted', task, res)
!stop && next
&& next() })
// re-queue tasks...
} else if(typeof(res) == 'function'){
this.push(res)
// completed sync task...
} else {
this.trigger('taskCompleted', task, res) }
return this },
@ -233,11 +276,14 @@ module.Queue = object.Constructor('Queue', Array, {
// -> queue
//
__init__: function(options){
if(this[0] instanceof Object
// options...
if(!(this[0] instanceof Queue)
&& this[0] instanceof Object
&& typeof(this[0]) != 'function'
&& typeof(this[0].finally) != 'function'){
Object.assign(this, this.shift()) }
this._run() },
// see if we need to start...
this.__run_tasks__() },
}))