diff --git a/lib/ThroatQueueFunction.js b/lib/ThroatQueueFunction.js index c4fe662..1641dcd 100644 --- a/lib/ThroatQueueFunction.js +++ b/lib/ThroatQueueFunction.js @@ -4,6 +4,12 @@ const NextTickPromise = Q() function ThroatQueueFunction(n = 5){ const running = [] + async function race(cancellationState, running){ + const deferred = Q.defer() + cancellationState.deferredWrap(deferred) + running = [...running, deferred.promise] + await Q.safeRace(running) + } let ret = async function(cancellationState, what){ if(what === null){ if(running.length === 0) return running @@ -13,8 +19,10 @@ function ThroatQueueFunction(n = 5){ // This shouldn't happen if we correctly await on the throat while(running.length >= n){ - await cancellationState.promiseWrap(Q.cancelledRace(running)) + await race(cancellationState, running) } + + const idObj = {} // call fn const rFn = async ()=>{ @@ -34,18 +42,23 @@ function ThroatQueueFunction(n = 5){ } } - const idObj = {} const r = rFn() - r.id = idObj - r.fn = what - r.cancel = ()=>{ + const d = Q.defer() + const rr = d.promise + rr.id = idObj + rr.fn = what + rr.cancel = ()=>{ cancellationState.cancel() } - running.push(r) - await r - + running.push(rr) + try { + await r + } finally { + r.then(d.resolve, d.reject) + } + while(running.length >= n){ - await cancellationState.promiseWrap(Q.cancelledRace(running)) + await race(cancellationState, running) } }