Skip to content

Commit

Permalink
use cancelled race and ensure correct order of operations
Browse files Browse the repository at this point in the history
  • Loading branch information
splitice committed Sep 14, 2023
1 parent 7c22a1a commit 391ed2b
Showing 1 changed file with 15 additions and 10 deletions.
25 changes: 15 additions & 10 deletions lib/ThroatQueueFunction.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,29 @@ function ThroatQueueFunction(n = 5){
cancellationState.deferredWrap(deferred)
running[0] = deferred.promise
try {
await Q.safeRace(running)
await Q.cancelledRace(running, false)
} finally {
deferred.resolve(null)
}
}

async function rFn(cancellationState, what, idObj){
await NextTickPromise
let ret
try {
if(typeof what === 'function'){
what = what()
}
return await cancellationState.promiseWrap(what)
} finally {
ret = await cancellationState.promiseWrap(what)
} catch(ex){
running.delete(idObj)
executing.delete(idObj)
throw ex
}

running.delete(idObj)
executing.delete(idObj)
return ret
}

let ret = async function(cancellationState, what, backpressure = false){
Expand Down Expand Up @@ -72,16 +78,15 @@ function ThroatQueueFunction(n = 5){
// call the function on the next tick
const r = rFn(cancellationState, what, dTrack.promise)

// wait for this to be done
const d = Q.defer()
// wait for this to be done]
let fnResult
try {
const fnResult = await r
dTrack.resolve()
d.resolve(fnResult)
fnResult = await r
} catch(ex) {
dTrack.resolve()
d.reject(ex)
throw ex
}
dTrack.resolve()

// if we are running too many, wait for one to finish
if(backpressure) {
Expand All @@ -97,7 +102,7 @@ function ThroatQueueFunction(n = 5){
}
}

return await d.promise
return fnResult
}

ret = Q.canceller(ret)
Expand Down

0 comments on commit 391ed2b

Please sign in to comment.