refactoring + minor stuff...

Signed-off-by: Alex A. Naanou <alex.nanou@gmail.com>
This commit is contained in:
Alex A. Naanou 2020-12-03 05:17:59 +03:00
parent b440426bb6
commit 6d5d56b510
2 changed files with 70 additions and 24 deletions

View File

@ -1,6 +1,6 @@
{
"name": "ig-types",
"version": "5.0.34",
"version": "5.0.35",
"description": "Generic JavaScript types and type extensions...",
"main": "main.js",
"scripts": {

View File

@ -63,41 +63,65 @@ object.Constructor('Queue', Array, {
return this({ state: 'running' }, ...tasks) },
}, events.EventMixin('flat', {
// config...
// Config...
//
// Number of tasks to be running at the same time...
pool_size: 8,
// Number of tasks to run in a row before letting go of the exec
// frame...
// Number of tasks to run before letting go of the exec frame...
pause_after_sync: 4,
// XXX revise defaults...
busy_timeout: 50,
poling_timeout: 200,
pause_timeout: 0,
auto_stop: false,
// Start synchronously...
//
// NOTE: this affects the start only, all other timeouts apply as-is...
sync_start: false,
catch_errors: true,
// NOTE: if true this is sync only untill the pool is filled or task
// list is depleted...
sync_start: false,
// XXX
unique_items: false,
// If true, stop after queue is depleted...
auto_stop: false,
// Sub-queue handling mode...
//
// This can be:
// 'wait' - wait fot the sun-queue to stop
// 'unwind' - run sub-task and requeue parent
//
// XXX do we need this???
// XXX should the nested queue decide???
// ...how???
// XXX should the nested queue decide??? ...how???
sub_queue: 'unwind',
// If true only add unique items to queue...
// XXX not implemented yet...
// ...how should this be done?
// - keep a set of seen elements and check against it?
// - check against the queue contents?
unique_items: false,
// Timeouts...
//
// Time to wait when pool is full...
// if 'auto', wait the average task time * .busy_timeout_scale.
// XXX revise defaults...
busy_timeout: 50,
//busy_timeout: 'auto',
busy_timeout_scale: 5,
// Time to wait between checks for new tasks in an empty queue...
poling_timeout: 200,
// Time to pause after a set of .pause_after_sync sync tasks...
pause_timeout: 0,
// Runtime statistics...
//
// To disable set to false
//
// NOTE: this, if true, will get replaced with the stats...
runtime_stats: true,
//
// This can be:
@ -236,7 +260,12 @@ object.Constructor('Queue', Array, {
this.poling_timeout
// busy poling -- pool full...
: c < pause ?
this.busy_timeout
//this.busy_timeout
(this.runtime_stats && this.busy_timeout == 'auto' ?
(this.runtime_stats.avg_t || 50) * (this.busy_timeout_scale || 2)
: this.busy_timeout == 'auto' ?
50 * (this.busy_timeout_scale || 2)
: this.busy_timeout)
// pause -- let other stuff run...
: (this.pause_timeout || 0)
@ -264,6 +293,7 @@ object.Constructor('Queue', Array, {
// run one task from queue...
// NOTE: this does not care about .state...
// XXX revise error handling...
// XXX add task runtime stat...
runTask: function(next){
var that = this
var running = this.__running = this.__running || []
@ -274,18 +304,33 @@ object.Constructor('Queue', Array, {
return this }
// closure: running, task, res, stop, next...
var taskCompleted = function(){
// calculate runtime statistics...
if(that.runtime_stats){
var x = Date.now() - t0
var s = that.runtime_stats =
that.runtime_stats
|| {max_t: x, min_t: x, avg_t: x, count: 0}
s.max_t = Math.max(s.max_t, x)
s.min_t = Math.min(s.min_t, x)
var i = ++s.count
var a = s.avg_t
s.avg_t = a + (x - a)/i }
that.trigger('taskCompleted', task, res) }
var runningDone = function(){
running.splice(0, running.length,
// NOTE: there can be multiple occurences of res...
...running
.filter(function(e){ return e !== res }))
that.trigger('taskCompleted', task, res)
taskCompleted()
!stop && next
&& next() }
var task = this.shift()
this.trigger('taskStarting', task)
var t0 = this.runtime_stats && Date.now()
// run...
// catch and pass errors to .taskFailed(...)
@ -323,13 +368,13 @@ object.Constructor('Queue', Array, {
&& this.sub_queue == 'unwind'){
if(res.length > 0){
this.push(res) }
this.trigger('taskCompleted', task, res)
taskCompleted()
// queue -- as a task...
} else if(res instanceof Queue
&& this.sub_queue == 'wait'){
if(res.state == 'stopped'){
this.trigger('taskCompleted', task, res)
taskCompleted()
} else {
running.push(res)
@ -348,12 +393,12 @@ object.Constructor('Queue', Array, {
// re-queue tasks...
} else if(typeof(res) == 'function'){
that.trigger('taskCompleted', task, res)
taskCompleted()
this.push(res)
// completed sync task...
} else {
this.trigger('taskCompleted', task, res) }
taskCompleted() }
this.length == 0
&& this.trigger('queueEmpty')
@ -393,6 +438,7 @@ object.Constructor('Queue', Array, {
tasks.length > 0
&& this.trigger('tasksAdded', tasks)
return res },
// shorthands...
add: function(...tasks){
this.push(...tasks)