diff --git a/lib/ThroatQueueFunction.js b/lib/ThroatQueueFunction.js index 8ea9166..0974877 100644 --- a/lib/ThroatQueueFunction.js +++ b/lib/ThroatQueueFunction.js @@ -12,7 +12,7 @@ function ThroatQueueFunction(n = 5){ cancellationState.deferredWrap(deferred) running[0] = deferred.promise try { - await Q.safeRace(running) + await Q.cancelledRace(running, false) } finally { deferred.resolve(null) } @@ -20,15 +20,21 @@ function ThroatQueueFunction(n = 5){ async function rFn(cancellationState, what, idObj){ await NextTickPromise + let ret try { if(typeof what === 'function'){ what = what() } - return await cancellationState.promiseWrap(what) - } finally { + ret = await cancellationState.promiseWrap(what) + } catch(ex){ running.delete(idObj) executing.delete(idObj) + throw ex } + + running.delete(idObj) + executing.delete(idObj) + return ret } let ret = async function(cancellationState, what, backpressure = false){ @@ -72,16 +78,15 @@ function ThroatQueueFunction(n = 5){ // call the function on the next tick const r = rFn(cancellationState, what, dTrack.promise) - // wait for this to be done - const d = Q.defer() + // wait for this to be done] + let fnResult try { - const fnResult = await r - dTrack.resolve() - d.resolve(fnResult) + fnResult = await r } catch(ex) { dTrack.resolve() - d.reject(ex) + throw ex } + dTrack.resolve() // if we are running too many, wait for one to finish if(backpressure) { @@ -97,7 +102,7 @@ function ThroatQueueFunction(n = 5){ } } - return await d.promise + return fnResult } ret = Q.canceller(ret)