reworked queued action + now .cacheMetadata(..) is queued...

Signed-off-by: Alex A. Naanou <alex.nanou@gmail.com>
This commit is contained in:
Alex A. Naanou 2020-12-02 18:49:45 +03:00
parent 71b9b444cb
commit 7122c619f4
6 changed files with 251 additions and 268 deletions

View File

@ -2520,6 +2520,7 @@ function(func){
// ...would also be nice to automate this via a chunk iterator so // ...would also be nice to automate this via a chunk iterator so
// as not to block... // as not to block...
// XXX handle errors... (???) // XXX handle errors... (???)
// XXX revise logging and logger passing...
var queuedAction = var queuedAction =
module.queuedAction = module.queuedAction =
function(name, func){ function(name, func){
@ -2543,25 +2544,69 @@ function(name, func){
}) } }) }
//
//
// queueHandler(name[, opts][, arg_handler], func)
// -> action
//
//
// arg_handler(...args)
// -> [items, ...args]
//
//
// action(items, ...args)
// -> promise
//
// action('sync', items, ...args)
// -> promise
//
//
// func(item, ...args)
// -> res
//
//
// XXX should 'sync' set .sync_start or just .runTask(..)
// XXX check if item is already in queue...
var queueHandler = var queueHandler =
module.queueHandler = module.queueHandler =
function(name, func){ function(name, func){
var args = [...arguments] var args = [...arguments]
func = args.pop() func = args.pop()
var arg_handler =
typeof(args.last()) == 'function'
&& args.pop()
var [name, opts] = args var [name, opts] = args
return object.mixin( return object.mixin(
Queued(function(items, ...args){ Queued(function(items, ...args){
var that = this var that = this
return new Promise(function(resolve, reject){ // sync start...
if(arguments[0] == 'sync'){
var [sync, items, ...args] = arguments }
// prep queue...
var q = that.queue(name, var q = that.queue(name,
Object.assign( Object.assign(
{}, {},
opts || {}, opts || {},
{ handler: function(item){ { handler: function(item){
return func.call(that, item, ...args) } })) return func.call(that, item, ...args) } }))
sync
&& (q.sync_start = true)
// pre-process args...
arg_handler
&& ([items, ...args] =
arg_handler.call(this, q, items, ...args))
// fill the queue...
q.push(...(items instanceof Array ? items : [items])) q.push(...(items instanceof Array ? items : [items]))
q.then(resolve, reject) }) }), // make a promise...
var res = new Promise(function(resolve, reject){
q.then(resolve, reject) })
// sync start...
// NOTE: we need to explicitly .start() here because the
// queue could be waiting for a timeout
sync
&& q.start()
return res }),
{ {
toString: function(){ toString: function(){
return `core.queueHandler('${name}',\n\t${ return `core.queueHandler('${name}',\n\t${
@ -2653,13 +2698,13 @@ var TaskActions = actions.Actions({
get queues(){ get queues(){
return (this.__queues = this.__queues || {}) }, return (this.__queues = this.__queues || {}) },
// XXX revise signature... // XXX revise logging and logger passing...
// XXX need better error flow... // XXX need better error flow...
queue: doc('Get or create a queue task', queue: doc('Get or create a queue task',
doc`Get or create a queue task... doc`Get or create a queue task...
.queue(name) .queue(name)
.queue(name[, options][, logger]) .queue(name, options)
-> queue -> queue
If a queue with the given name already exits it will be returned If a queue with the given name already exits it will be returned
@ -2675,7 +2720,7 @@ var TaskActions = actions.Actions({
NOTE: for queue-specific options see ig-types/runner's Queue(..) NOTE: for queue-specific options see ig-types/runner's Queue(..)
`, `,
function(name, options, logger){ function(name, options){
var that = this var that = this
var queue = this.queues[name] var queue = this.queues[name]
@ -2692,21 +2737,25 @@ var TaskActions = actions.Actions({
// && logger.emit('close') // && logger.emit('close')
delete that.queues[name] } } delete that.queues[name] } }
var logger = logger || this.logger options = options || {}
var logger = options.logger || this.logger
//logger = logger && logger.push(name) //logger = logger && logger.push(name)
logger = logger logger = logger
&& logger.push(name, {onclose: abort, quiet: !!options.quiet}) && logger.push(name, {onclose: abort, quiet: !!options.quiet})
logger
&& (options.logger = logger)
queue = this.queues[name] = queue = this.queues[name] =
runner.Queue(options || {}) runner.Queue(options || {})
// setup logging... // setup logging...
if(logger){
queue queue
.on('tasksAdded', function(evt, t){ logger.emit('added', t) }) .on('tasksAdded', function(evt, t){
.on('taskCompleted', function(evt, t){ logger.emit('done', t) }) this.logger && this.logger.emit('added', t) })
.on('taskFailed', function(evt, t, err){ logger.emit('skipped', t, err) }) .on('taskCompleted', function(evt, t){
queue.logger = logger } this.logger && this.logger.emit('done', t) })
.on('taskFailed', function(evt, t, err){
this.logger && this.logger.emit('skipped', t, err) })
// cleanup... // cleanup...
queue queue
.then( .then(

View File

@ -280,7 +280,25 @@ var ExampleActions = actions.Actions({
this.exampleQueuedAction(timeout) } }], this.exampleQueuedAction(timeout) } }],
exampleQueueHandlerAction: ['- Test/', exampleQueueHandlerAction: ['- Test/',
core.queueHandler('exampleQueueHandlerAction', {quiet: true}, core.queueHandler('exampleQueueHandlerAction',
{quiet: true},
function(item, ...args){
console.log('Queue handler action!!', item, ...args)
return new Promise(function(resolve){
setTimeout(resolve, 100) }) })],
exampleQueueHandlerActionWArgs: ['- Test/',
core.queueHandler('exampleQueueHandlerActionWArgs',
{quiet: true},
function(queue, from=0, to=100, ...args){
var items = []
var reverse = from > to
reverse
&& ([from, to] = [to+1, from+1])
for(var i=from; i<to; i++){
items.push(i) }
reverse
&& items.reverse()
return [items, ...args] },
function(item, ...args){ function(item, ...args){
console.log('Queue handler action!!', item, ...args) console.log('Queue handler action!!', item, ...args)
return new Promise(function(resolve){ return new Promise(function(resolve){

View File

@ -115,7 +115,17 @@ var MetadataReaderActions = actions.Actions({
NOTE: also see: .cacheMetadata(..) NOTE: also see: .cacheMetadata(..)
`, `,
core.queueHandler('Read image metadata', function(image, force){ core.queueHandler('Read image metadata',
function(image, force){
return [
images == 'all' ?
this.images.keys()
: images == 'loaded' ?
this.data.getImages('loaded')
: images,
force,
] },
function(image, force){
var that = this var that = this
var gid = this.data.getImage(image) var gid = this.data.getImage(image)
@ -177,7 +187,7 @@ var MetadataReaderActions = actions.Actions({
resolve(data) }) }) }) })], resolve(data) }) }) }) })],
readAllMetadata: ['File/Read all metadata', readAllMetadata: ['File/Read all metadata',
'readMetadata: images.gids ...'], 'readMetadata: "all" ...'],
// XXX take image Metadata and write it to target... // XXX take image Metadata and write it to target...
writeMetadata: ['- Image/Set metadata data', writeMetadata: ['- Image/Set metadata data',

View File

@ -376,7 +376,7 @@ var SharpActions = actions.Actions({
&& Math.max(m.width, m.height) < size) && Math.max(m.width, m.height) < size)
|| (fit == 'outside' || (fit == 'outside'
&& Math.min(m.width, m.height) < size)){ && Math.min(m.width, m.height) < size)){
logger && logger.emit('skipping', gid) //logger && logger.emit('skipping', gid)
return } return }
// continue... // continue...
return img }) return img })
@ -396,7 +396,7 @@ var SharpActions = actions.Actions({
fse.removeSync(to) fse.removeSync(to)
// skip... // skip...
} else { } else {
logger && logger.emit('skipping', gid) //logger && logger.emit('skipping', gid)
return } } return } }
// write... // write...
@ -588,9 +588,8 @@ var SharpActions = actions.Actions({
Promise.reject('aborted') Promise.reject('aborted')
: res.flat() }) })], : res.flat() }) })],
// XXX will this be better off as a queueHandler(..) ???
// XXX add support for offloading the processing to a thread/worker... // XXX add support for offloading the processing to a thread/worker...
__cache_metadata_reading: null, // XXX revise logging and logger passing...
cacheMetadata: ['- Sharp|Image/', cacheMetadata: ['- Sharp|Image/',
core.doc`Cache metadata core.doc`Cache metadata
@ -638,117 +637,37 @@ var SharpActions = actions.Actions({
NOTE: this will effectively update metadata format to the new spec... NOTE: this will effectively update metadata format to the new spec...
NOTE: for info on full metadata format see: .readMetadata(..) NOTE: for info on full metadata format see: .readMetadata(..)
`, `,
core.sessionTaskAction('cacheMetadata', function(ticket, images, logger){ core.queueHandler('Cache image metadata',
{quiet: true},
// parse args...
function(queue, image, logger){
var force = false
if(image === true){
var [force, image, logger] = arguments }
return [
image == 'all' ?
this.images.keys()
: image == 'loaded' ?
this.data.getImages('loaded')
: (image || 'current'),
force,
// XXX
logger || this.logger,
] },
function(image, force, logger){
var that = this var that = this
// setup runtime interactions... // XXX cache the image data???
// var gid = this.data.getImage(image)
// NOTE: we will resolve the ticket when we are fully done var img = this.images[gid]
// and not on stop...
var STOP = false
ticket
.onmessage('stop', function(){
STOP = true })
.then(function(){
// close progress bar...
// NOTE: if we have multiple tasks let the last one
// close the progress bar...
if(that.tasks.titled(ticket.title).length == 0){
logger
&& logger.emit('close') }
that.off('clear', on_close)
// cleanup...
delete that.__cache_metadata_reading })
// clear the progress bar for the next session...
var on_close
this.one('clear', on_close = function(){
logger && logger.emit('close') })
// universal task abort...
// NOTE: this will abort all the tasks of this type...
var abort = function(){
that.tasks.stop(ticket.title) }
var CHUNK_SIZE = 4
// handle logging and processing list...
// NOTE: these will maintain .__cache_metadata_reading helping
// avoid processing an image more than once at the same
// time...
var done = function(gid, msg){
logger && logger.emit(msg || 'done', gid)
if(that.__cache_metadata_reading){
that.__cache_metadata_reading.delete(gid) }
return gid }
var skipping = function(gid){
return done(gid, 'skipping') }
var force = false
if(images === true){
force = true
images = null
} else if(logger === true){
force = true
logger = arguments[2] }
// NOTE: we are caching this to avoid messing things up when
// loading before this was finished...
var cached_images = this.images
// get/normalize images...
//images = images || this.current
images = images
|| 'current'
// keywords...
images =
images == 'all' ?
this.data.getImages('all')
: images == 'loaded' ?
(this.ribbons ?
this.ribbons.getImageGIDs()
: this.data.getImages('all'))
: images == 'current' ?
this.current
: images
images = (images instanceof Array ?
images
: [images])
.filter(function(gid){
return !that.__cache_metadata_reading
|| !that.__cache_metadata_reading.has(gid) })
logger = logger !== false ?
(logger || this.logger)
: false
logger = logger
&& logger.push('Caching image metadata', {onclose: abort, quiet: true})
logger && logger.emit('queued', images)
/*/ XXX set this to tmp for .location.load =='loadImages'
// XXX add preview cache directory...
// - user defined path
// - cleanable
// partially (remove orphans) / full...
// - not sure how to index...
var base_path = that.location.load == 'loadIndex' ?
null
: tmp
/*/
var base_path
//*/
return images
.mapChunks(CHUNK_SIZE, function(gid){
// abort...
if(STOP){
throw Array.STOP('aborted') }
var img = cached_images[gid]
var path = img && that.getImagePath(gid) var path = img && that.getImagePath(gid)
;(that.__cache_metadata_reading =
that.__cache_metadata_reading || new Set()) // XXX
.add(gid) //var base_path = that.location.load == 'loadIndex' ?
// null
// : tmp
//var base_path = img && img.base_path
var base_path
// skip... // skip...
if(!(img && path if(!(img && path
@ -758,17 +677,14 @@ var SharpActions = actions.Actions({
&& img.flipped == null) && img.flipped == null)
// update metadata... // update metadata...
|| (img.metadata || {}).ImageGridMetadata == null))){ || (img.metadata || {}).ImageGridMetadata == null))){
skipping(gid)
return } return }
// XXX handle/report errors...
return sharp(that.getImagePath(gid)) return sharp(that.getImagePath(gid))
.metadata() .metadata()
.catch(function(){
skipping(gid) })
.then(function(metadata){ .then(function(metadata){
// no metadata... // no metadata...
if(metadata == null){ if(metadata == null){
skipping(gid)
return } return }
var o = normalizeOrientation(metadata.orientation) var o = normalizeOrientation(metadata.orientation)
@ -821,20 +737,10 @@ var SharpActions = actions.Actions({
that.ribbons that.ribbons
&& that.ribbons.updateImage(gid) && that.ribbons.updateImage(gid)
return done(gid) }) }) return gid }) })],
.then(function(res){ cacheAllMetadata: ['- Sharp/Image/',
ticket.resolve(res)
// XXX do we need this???
return res == 'aborted' ?
Promise.reject('aborted')
: res
}) })],
cacheAllMetadata: ['- Sharp|Image/',
core.doc`Cache all metadata
NOTE: this is a shorthand to .cacheMetadata('all', ..)`,
'cacheMetadata: "all" ...'], 'cacheMetadata: "all" ...'],
// shorthands... // shorthands...
// XXX do we need these??? // XXX do we need these???
// ...better have a task manager UI... // ...better have a task manager UI...
@ -889,7 +795,7 @@ module.Sharp = core.ImageGridFeatures.Feature({
// drawn... // drawn...
;((this.images[gid] || {}).metadata || {}).ImageGridMetadata ;((this.images[gid] || {}).metadata || {}).ImageGridMetadata
|| this.cacheMetadata('sync', gid, false) || this.cacheMetadata('sync', gid, false)
.then(function([res]){ .then(function(res){
res res
&& that.logger && that.logger
&& that.logger.emit('Cached metadata for', gid) }) }], && that.logger.emit('Cached metadata for', gid) }) }],

View File

@ -1110,9 +1110,9 @@
"integrity": "sha512-9kZM80Js9/eTwXN9VXwLDC1wDJ7gIAdYU9GIzb5KJmNcLAMaW+zhgFrwFFMrcSfggUuadgnqSrS41E4XLe8JZw==" "integrity": "sha512-9kZM80Js9/eTwXN9VXwLDC1wDJ7gIAdYU9GIzb5KJmNcLAMaW+zhgFrwFFMrcSfggUuadgnqSrS41E4XLe8JZw=="
}, },
"ig-types": { "ig-types": {
"version": "5.0.30", "version": "5.0.32",
"resolved": "https://registry.npmjs.org/ig-types/-/ig-types-5.0.30.tgz", "resolved": "https://registry.npmjs.org/ig-types/-/ig-types-5.0.32.tgz",
"integrity": "sha512-qiE99PB96iEYeygRQTaB1CnJR+1AAAQ/qMmXBKVsbIroAQPveRVXptJIEcPGu6t8AdshibcaLkmLvcEHjMbN0A==", "integrity": "sha512-AKfatN0z3hURn9J7JSCaImZnkpr42WMozVLBJeAeC0urkLEU3NUcMEmuDR57dsI5vu9A3d+tyIiRxd5If/3VaQ==",
"requires": { "requires": {
"ig-object": "^5.4.12", "ig-object": "^5.4.12",
"object-run": "^1.0.1" "object-run": "^1.0.1"

View File

@ -32,7 +32,7 @@
"ig-argv": "^2.15.0", "ig-argv": "^2.15.0",
"ig-features": "^3.4.2", "ig-features": "^3.4.2",
"ig-object": "^5.4.12", "ig-object": "^5.4.12",
"ig-types": "^5.0.30", "ig-types": "^5.0.32",
"moment": "^2.29.1", "moment": "^2.29.1",
"object-run": "^1.0.1", "object-run": "^1.0.1",
"requirejs": "^2.3.6", "requirejs": "^2.3.6",