Skip to content

Commit

Permalink
ThroatQueueFunction becomes cancellable
Browse files Browse the repository at this point in the history
  • Loading branch information
splitice committed Nov 29, 2022
1 parent 2bd71b3 commit 8a4bce1
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 4 deletions.
13 changes: 9 additions & 4 deletions lib/ThroatQueueFunction.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const NextTickPromise = Q()

function ThroatQueueFunction(n = 5){
const running = []
const ret = async function(what){
let ret = async function(cancellationState, what){
if(what === null){
if(running.length === 0) return running
await Promise.all(running)
Expand All @@ -13,7 +13,7 @@ function ThroatQueueFunction(n = 5){

// This shouldn't happen if we correctly await on the throat
while(running.length >= n){
await Q.safeRace(running)
await cancellationState.promiseWrap(Q.cancelledRace(running))
}

// call fn
Expand All @@ -23,7 +23,7 @@ function ThroatQueueFunction(n = 5){
if(typeof what === 'function'){
what = what()
}
return await what
return await cancellationState.promiseWrap(what)
} finally {
for(let i = 0 ; i < running.length; i++){
if(running[i].id === idObj){
Expand All @@ -38,14 +38,19 @@ function ThroatQueueFunction(n = 5){
const r = rFn()
r.id = idObj
r.fn = what
r.cancel = ()=>{
cancellationState.cancel()
}
running.push(r)
await r

while(running.length >= n){
await Q.safeRace(running)
await cancellationState.promiseWrap(Q.cancelledRace(running))
}
}

ret = Q.canceller(ret)

ret.running = running

return ret
Expand Down
59 changes: 59 additions & 0 deletions test/throat_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,65 @@ describe('ThroatQueueFunction', function(){
await tf(null)
expect(count).to.be.eql(10)

})
it('should run 5 times only if cancelled from fn', async() => {
const tf = ThroatQueueFunction(5)

let count = 0
const deferred = Q.defer()
const p = []
for(let i = 0; i < 10; i++){
p.push(tf(()=>{
count++
return deferred.promise
}))
}

await Q.delay(10)


expect(count).to.be.eql(5)

for(const pp of p){
pp.cancel()
}

deferred.resolve(true)

await tf(null)
expect(count).to.be.eql(5)

})
it('should run 5 times only if cancelled from running', async() => {
const tf = ThroatQueueFunction(5)

let count = 0
const deferred = Q.defer()
const p = []
for(let i = 0; i < 10; i++){
p.push(tf(Q.canceller(async(cancellationState)=>{
await cancellationState.promiseWrap(deferred.promise)
count++
})))
}

await Q.delay(10)


expect(tf.running.length).to.be.eql(5)

for(const pp of tf.running){
pp.cancel()
}

deferred.resolve(true)


expect(tf.running.length).to.be.eql(5)

await tf(null)
expect(count).to.be.eql(5)

})
it('should capture stack trace', async() => {
async function testFn(){
Expand Down

0 comments on commit 8a4bce1

Please sign in to comment.