diff --git a/lib/ThroatQueueFunction.js b/lib/ThroatQueueFunction.js index 09220b0..839111d 100644 --- a/lib/ThroatQueueFunction.js +++ b/lib/ThroatQueueFunction.js @@ -1,61 +1,64 @@ const Q = require ('@halleyassist/q-lite') const NextTickPromise = Q() +const EmptyArray = [] function ThroatQueueFunction(n = 5){ - const running = [] + const running = new Set() async function race(cancellationState, running){ const deferred = Q.defer() cancellationState.deferredWrap(deferred) - running = [...running, deferred.promise] + running = [deferred.promise, ...running] try { await Q.safeRace(running) } finally { deferred.resolve(null) } } + + async function rFn(cancellationState, what, idObj){ + await NextTickPromise + try { + if(typeof what === 'function'){ + what = what() + } + return await cancellationState.promiseWrap(what) + } finally { + running.delete(idObj) + } + } + let ret = async function(cancellationState, what){ if(what === null){ - if(running.length === 0) return running + if(running.size === 0) return EmptyArray await Promise.all(running) - return running + if(running.size === 0) return EmptyArray + + // while we waited new jobs were added + return [...running] } // This shouldn't happen if we correctly await on the throat - while(running.length >= n){ + while(running.size >= n){ await race(cancellationState, running) } - - const idObj = {} - - // call fn - const rFn = async ()=>{ - await NextTickPromise - try { - if(typeof what === 'function'){ - what = what() - } - return await cancellationState.promiseWrap(what) - } finally { - for(let i = 0 ; i < running.length; i++){ - if(running[i].id === idObj){ - running.splice(i, 1) - break - } - } - } - } - const r = rFn() - const d = Q.defer() + // will be used as an id const dTrack = Q.defer() - dTrack.promise.id = idObj dTrack.promise.fn = what dTrack.promise.cancel = ()=>{ cancellationState.cancel() } - running.push(dTrack.promise) + + // call the function on the next tick + const r = rFn(cancellationState, what, dTrack.promise) + + // we are now running + running.add(dTrack.promise) + + // wait for this to be done + const d = Q.defer() try { const fnResult = await r dTrack.resolve() @@ -65,7 +68,8 @@ function ThroatQueueFunction(n = 5){ d.reject(ex) } - while(running.length >= n){ + // if we are running too many, wait for one to finish + while(running.size >= n){ await race(cancellationState, running) } @@ -74,7 +78,13 @@ function ThroatQueueFunction(n = 5){ ret = Q.canceller(ret) - ret.running = running + Object.defineProperty(ret, 'running', { + get: function(){ + return [...running] + }, + enumerable: true, + configurable: true + }); return ret }