ironing out issues with chained queueus...

Signed-off-by: Alex A. Naanou <alex.nanou@gmail.com>
This commit is contained in:
Alex A. Naanou 2020-12-16 03:50:06 +03:00
parent 00988feb36
commit 1824eeddb2
2 changed files with 62 additions and 18 deletions

View File

@ -2548,7 +2548,10 @@ function(title, func){
return Object.assign(
this.tasks.Task(...pre_args, func.bind(this), ...args),
// make this searchable by .tasks.named(..)...
{ name: action.name }) }),
{
__session_task__: action.__session_task__,
name: action.name,
}) }),
{
title,
toString: function(){
@ -2617,7 +2620,10 @@ function(title, func){
var res = func.call(that, ...args)
resolve(res)
return res }),
{ title: action.name }) }) }),
{
__session_task__: action.__session_task__,
title: action.name,
}) }) }),
{
title,
toString: function(){
@ -2681,6 +2687,12 @@ function(title, func){
// XXX might be a good idea to split this into a generic and domain parts
// and move the generic part into types/runner...
// XXX check if item is already in queue...
// XXX BUG: when chaining queues, aborting both queues works fine, aborting
// one for some reason does not lead to aborting the other...
// ...this can be fixed by clearing the queue on abort but that is
// a stub, need to figure out the real reason...
// see: .exampleChainedQueueHandler([1,2,'abort',4])
// ...'abort' will stop pre-processing but will not prevent processing...
var queueHandler =
module.queueHandler =
function(title, func){
@ -2726,6 +2738,7 @@ function(title, func){
{},
opts || {},
{
__session_task__: action.__session_task__,
// XXX not sure about this...
//auto_stop: true,
handler: function([item, args]){
@ -2742,18 +2755,29 @@ function(title, func){
items.map(function(e){
return [e, ...args] })
: [items, ...args]))
// make a promise...
return new Promise(function(resolve, reject){
q.then(resolve, reject) }) } }
return q.promise() } }
// pre-process args...
arg_handler
&& (inputs = arg_handler.call(this, q, inputs[0], ...inputs.slice(1)))
&& (inputs = arg_handler.call(this, q, ...inputs))
// run...
return (inputs instanceof Promise
|| inputs instanceof runner.Queue) ?
inputs.then(function(items){
return run([items, ...args]) })
return inputs instanceof Promise ?
// XXX BUG? .then(resolve) is triggered even if inputs was
// stopped (should not resolve) or aborted (should reject)...
inputs.then(
function(items){
return run([items, ...args]) },
function(){
q && q.abort() })
: inputs instanceof runner.Queue ?
inputs
.on('stop', function(){
q && q.stop() })
.on('abort', function(){
q && q.abort() })
.then(function(items){
return run([items, ...args]) })
: run(inputs) }),
{
title,
@ -2809,7 +2833,10 @@ var TaskActions = actions.Actions({
// session tasks are stopped when the index is cleared...
// XXX need to get running tasks by action name...
get sessionTasks(){
return this.tasks.titled(...this.sessionTaskActions) },
//return this.tasks.titled(...this.sessionTaskActions) },
return this.tasks
.filter(function(task){
return task.__session_task__ }) },
// Queue (task)...
@ -2863,7 +2890,10 @@ var TaskActions = actions.Actions({
if(queue == null){
var abort = function(){
options.nonAbortable
|| queue.clear() }
|| queue
// XXX abort or stop???
.abort()
.clear() }
var cleanup = function(){
return function(){
queue.stop()
@ -2895,8 +2925,10 @@ var TaskActions = actions.Actions({
.on('taskFailed', function(evt, t, err){
this.logger && this.logger.emit('skipped'+suffix, t, err) })
.on('stop', function(){
this.logger
&& this.logger.emit('reset')
this.logger && this.logger.emit('reset')
this.clear() })
.on('abort', function(){
this.logger && this.logger.emit('reset')
this.clear() })
// cleanup...
queue
@ -2924,7 +2956,9 @@ module.Tasks = ImageGridFeatures.Feature({
handlers: [
// stop session tasks...
['clear',
'sessionTasks.stop'],
// XXX BUG: for some reason calling .abort here does not work...
//'sessionTasks.stop'],
'sessionTasks.abort'],
],
})

View File

@ -344,12 +344,22 @@ var ExampleActions = actions.Actions({
exampleChainedQueueHandler: ['- Test/',
core.queueHandler('Main queue',
core.queueHandler('Sub queue',
// pre-prepare the inputs (sync)...
function(outer_queue, inner_queue, items, ...args){
console.log('### PRE-PREP', items, ...args)
return [items, ...args] },
function(item, ...args){
console.log('### PREP', item, ...args)
return [items, outer_queue, ...args] },
//return [items, inner_queue, ...args] },
// prepare inputs (async/queue)...
function(item, q, ...args){
console.log('### PREP', q.state, item, ...args)
item == 'abort'
&& q.abort()
&& console.log('### ABORT', q)
item == 'stop'
&& q.stop()
&& console.log('### STOP')
return item+1 }),
// handle inputs (async/queue)...
function(item, ...args){
console.log('### HANDLE', item, ...args)
return item*2 }) ],