Skip to content

Commit

Permalink
add ThroatQueueFunction
Browse files Browse the repository at this point in the history
  • Loading branch information
splitice committed Nov 29, 2022
1 parent 109397d commit 2bbf5ad
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 2 deletions.
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ await sync(async()=>{

## Usage as throat

A throat is an asyncronous function that executes work in the background without awaiting on the result. A throat guaruntees no more than `n` functions will be executed at a time.

```
const sync = ThroatFunction(2)
await sync(async()=>{
Expand All @@ -32,4 +34,10 @@ await sync(async()=>{
// wait until done
await sync(null)
```
```


## Usage in a throat queue


A throat is an asyncronous function that executes work in the background with each function call returning an awaitable promise for the underlying call. A throat guaruntees no more than `n` functions will be executed at a time.
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ module.exports = require('./lib/SyncFunction')
module.exports.SyncFunction = require('./lib/SyncFunction')
module.exports.SyncQueue = require('./lib/SyncQueue')
module.exports.ThroatFunction = require('./lib/ThroatFunction')
module.exports.ThroatQueueFunction = require('./lib/ThroatQueueFunction')
module.exports.TimerFunction = require('./lib/TimerFunction')
53 changes: 53 additions & 0 deletions lib/ThroatQueueFunction.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
const Q = require ('@halleyassist/q-lite')

const NextTickPromise = Q()

function ThroatQueueFunction(n = 5){
const running = []
const ret = async function(what){
if(what === null){
if(running.length === 0) return
await Promise.all(running)
return
}

// This shouldn't happen if we correctly await on the throat
while(running.length >= n){
await Q.safeRace(running)
}

// call fn
const rFn = async ()=>{
await NextTickPromise
try {
if(typeof what === 'function'){
what = what()
}
return await what
} finally {
for(let i = 0 ; i < running.length; i++){
if(running[i].id === idObj){
running.splice(i, 1)
break
}
}
}
}

const idObj = {}
const r = rFn()
r.id = idObj
running.push(r)
await r

while(running.length >= n){
await Q.safeRace(running)
}
}

ret.running = running

return ret
}

module.exports = ThroatQueueFunction
53 changes: 52 additions & 1 deletion test/throat_test.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const {ThroatFunction} = require('../index'),
const {ThroatFunction, ThroatQueueFunction} = require('../index'),
{expect} = require('chai'),
Q = require('@halleyassist/q-lite')

Expand Down Expand Up @@ -53,4 +53,55 @@ describe('ThroatFunction', function(){
expect(count).to.be.eql(10)

})
})
describe('ThroatQueueFunction', function(){
it('should run 5 at a time without backpressure', async() => {
const tf = ThroatQueueFunction(5)

let count = 0
const deferred = Q.defer()
for(let i = 0; i < 10; i++){
tf(()=>{
count++
return deferred.promise
})
}

await Q.delay(10)


expect(count).to.be.eql(5)
deferred.resolve()


await Q.delay(10)
await tf(null)
expect(count).to.be.eql(10)

})
it('should capture stack trace', async() => {
async function testFn(){
const errors = []
const tf = ThroatQueueFunction(5)

for(let i = 0; i < 10; i++){
try {
await tf(()=>{
throw new Error('test')
})
} catch(ex) {
errors.push(ex)
}
}

return errors
}

const errors = await testFn()
expect(errors.length).to.be.eql(10)
for(const e of errors){
expect(e.stack).to.be.a('string')
expect(e.stack).to.contain('testFn')
}
})
})

0 comments on commit 2bbf5ad

Please sign in to comment.