some refactoring...

Signed-off-by: Alex A. Naanou <alex.nanou@gmail.com>
This commit is contained in:
Alex A. Naanou 2015-12-08 06:56:54 +03:00
parent 449d8ceb21
commit 9de88ea5db
2 changed files with 124 additions and 68 deletions

View File

@ -92,19 +92,24 @@ module.Queue = actions.Actions({
},
// NOTE: these are sparse...
ready: null,
running: null,
done: null,
failed: null,
__ready: null,
__running: null,
__done: null,
__failed: null,
get length(){
var lists = ['ready', 'running', 'done', 'failed']
var sum = 0
while(lists.length > 0){
var l = this[lists.pop()]
sum += l == null ? 0 : l.len
}
return sum
var that = this
//return ['__ready', '__running', '__done', '__failed']
return ['__ready', '__running']
.reduce(function(a, b){
return (typeof(a) == typeof(1) ? a
: that[a] ? that[a].len
: 0)
+ (typeof(b) == typeof(1) ? b
: that[b] ? that[b].len
: 0)
}, 0)
},
set length(val){},
@ -118,16 +123,29 @@ module.Queue = actions.Actions({
return this._state || 'stopped'
},
// general events...
// general task life cycle events...
//
// NOTE: these are not intended to be called by the user...
// NOTE: .on('taskQueued') is just a more uniform shorthand for
// .on('enqueue') with one subtle difference, the former does
// not "wrap" the .enqueue(..) method with .pre/.post sub events
// and runs atomic as all other task events.
taskQueued: ['', function(){}],
taskUnqueued: ['', function(){}],
// NOTE: binding to this is not the same as to .unqueue(..), this will
// get triggered once per each task deleted and not per method
// call...
taskDropped: ['', function(){}],
taskStarted: ['', function(){}],
taskDelayed: ['', function(){}],
taskFailed: ['', function(){}],
taskDone: ['', function(){}],
// task manipulation actions...
//
// A task can be either a Promise/A+ or a function. In the case of
// a function this will work sync.
//
// NOTE: these and task events are partly redundent....
enqueue: ['',
function(a, b, c){
@ -142,23 +160,29 @@ module.Queue = actions.Actions({
var mode = b
}
mode = mode || this.config['default-queue-mode']
var ready = this.ready = this.ready || []
var ready = this.__ready = this.__ready || []
// XXX check if task is a task...
// XXX else, wrap in a task...
ready.push([tag, task, mode])
this.taskQueued(tag, task, mode)
// restart in case the queue was depleted...
this._run()
this.taskQueued(tag, task, mode)
}],
unqueue: ['',
function(a, b){
var ready = this.ready
var that = this
var ready = this.__ready
// empty queue...
if(ready == null || ready.length == 0){
if(ready == null || ready.len == 0){
return
}
// special case -- drop all...
if(a == '*'){
ready.splice(0, ready.length)
return
}
@ -178,19 +202,17 @@ module.Queue = actions.Actions({
// only task given...
: tag == null ? e[1] === task
// both task and tag given...
: e[0] != tag && e[1] === task){
: e[0] == tag && e[1] === task){
delete ready[i]
that.taskDropped(e[0], e[1], e[2])
}
})
// XXX
this.taskUnqueued(tag, task)
}],
delay: ['',
function(a, b){
var ready = this.ready
var ready = this.__ready
// empty queue...
if(ready == null || ready.length == 0){
if(ready == null || ready.len == 0){
return
}
@ -227,9 +249,6 @@ module.Queue = actions.Actions({
// restart in case the queue was depleted...
this._run()
// XXX
this.taskDelayed(tag, task)
}],
// Run the queue...
@ -237,11 +256,11 @@ module.Queue = actions.Actions({
// This is not intended for direct use...
//
// This can run in one of two ways:
// 1) run until the .ready queue is completely depleted
// 1) run until the .__ready queue is completely depleted
// This can occur for very fast or sync tasks, essentially
// each iteration will replenish the .running pool until there
// each iteration will replenish the .__running pool until there
// are not task to run.
// 2) load the .running pool and exit
// 2) load the .__running pool and exit
// The first task to finish will run this again to replenish
// the pool.
//
@ -250,61 +269,96 @@ module.Queue = actions.Actions({
//
// XXX need to handle retries correctly, at this point all errors just
// drop to failed and retry counter is incremented, there is no
// flow back to .running
// flow back to .__running
// XXX this shifts the .__ready, this may cause a race with .unqueue(..)
// and .delay(..)
// really do not like setting this up with a for in or .forEach(..)
// as they will really complicate continuous operation...
_run: ['',
function(){
if(this._running){
if(this.__is_running){
return
}
var that = this
var size = this.config['running-pool-size']
this.running = this.running || []
this.__running = this.__running || []
// NOTE: the function in the look here is to clock some
// values in a closure for reuse in promise state
// handlers...
while(this.ready && this.ready.len > 0
// NOTE: we are not using .forEach(..) here because we need
// to stop at abstract places and to see the list live...
while(this.__ready && this.__ready.len > 0
&& this.state == 'running'
&& (this.running && this.running.len || 0) < size){ (function(){
var elem = ready.shift()
var task = elem[0]
that._running = true
&& (this.__running && this.__running.len || 0) < size){ (function(){
that.running.push(elem)
// XXX this might race...
var elem = that.__ready.shift()
if(elem == null){
return
}
task()
// retry or move to failed...
.catch(function(){
// pop self of .running
delete that.running[that.running.indexOf(elem)]
var task = elem[1]
that.__is_running = true
// push self to .failed
var failed = that.failed = that.failed || []
that.__running.push(elem)
// increment retry count...
elem[3] = (elem[3] || 0) + 1
// start the task...
res = task()
that.taskStarted(elem[0], task)
// XXX check task mode and re-queue if needed...
// XXX
failed.push(elem)
})
// push to done and ._run some more...
.then(function(){
// pop self of .running
delete that.running[that.running.indexOf(elem)]
// Promise/A+
if(res && res.then){
res
// retry or move to failed...
.catch(function(){
// pop self of .__running
delete that.__running[that.__running.indexOf(elem)]
// push self to .done
var done = that.done = that.done || []
// push self to .__failed
var failed = that.__failed = that.__failed || []
done.push(elem)
// increment retry count...
elem[3] = (elem[3] || 0) + 1
// run some more...
that._run()
})
// XXX check task mode and re-queue if needed...
// XXX
failed.push(elem)
that.taskFailed(elem[0], task)
})
// push to done and ._run some more...
.then(function(){
// pop self of .__running
delete that.__running[that.__running.indexOf(elem)]
// push self to .__done
var done = that.__done = that.__done || []
done.push(elem)
// run some more...
that._run()
that.taskDone(elem[0], task)
})
// other...
} else {
// pop self of .__running
delete that.__running[that.__running.indexOf(elem)]
// push self to .__done
var done = that.__done = that.__done || []
done.push(elem)
that.taskDone(elem[0], task)
// run some more...
that._run()
}
})() }
that._running = false
delete that.__is_running
}],
// state manipulation actions...
@ -321,10 +375,10 @@ module.Queue = actions.Actions({
clear: ['',
function(){
this.stop()
delete this.ready
delete this.running
delete this.failed
delete this.done
delete this.__ready
delete this.__running
delete this.__failed
delete this.__done
}],
})
@ -334,6 +388,7 @@ module.Queue = actions.Actions({
// XXX
/**********************************************************************
* vim:set ts=4 sw=4 : */
return module })

View File

@ -50,6 +50,7 @@ function makeConstructor(name, a, b){
var obj = {}
obj.__proto__ = _constructor.prototype
// XXX for some reason this does not resolve from .__proto__
// XXX this also is a regular attr and not a prop...
obj.constructor = _constructor
//obj.__proto__.constructor = _constructor