diff --git a/README.md b/README.md index 4b79b11..c48c4e1 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,8 @@ await sync(async()=>{ ## Usage as throat +A throat is an asyncronous function that executes work in the background without awaiting on the result. A throat guaruntees no more than `n` functions will be executed at a time. + ``` const sync = ThroatFunction(2) await sync(async()=>{ @@ -32,4 +34,10 @@ await sync(async()=>{ // wait until done await sync(null) -``` \ No newline at end of file +``` + + +## Usage in a throat queue + + +A throat is an asyncronous function that executes work in the background with each function call returning an awaitable promise for the underlying call. A throat guaruntees no more than `n` functions will be executed at a time. \ No newline at end of file diff --git a/index.js b/index.js index 55bb959..9e204af 100644 --- a/index.js +++ b/index.js @@ -2,4 +2,5 @@ module.exports = require('./lib/SyncFunction') module.exports.SyncFunction = require('./lib/SyncFunction') module.exports.SyncQueue = require('./lib/SyncQueue') module.exports.ThroatFunction = require('./lib/ThroatFunction') +module.exports.ThroatQueueFunction = require('./lib/ThroatQueueFunction') module.exports.TimerFunction = require('./lib/TimerFunction') \ No newline at end of file diff --git a/lib/ThroatQueueFunction.js b/lib/ThroatQueueFunction.js new file mode 100644 index 0000000..f8f7c19 --- /dev/null +++ b/lib/ThroatQueueFunction.js @@ -0,0 +1,53 @@ +const Q = require ('@halleyassist/q-lite') + +const NextTickPromise = Q() + +function ThroatQueueFunction(n = 5){ + const running = [] + const ret = async function(what){ + if(what === null){ + if(running.length === 0) return + await Promise.all(running) + return + } + + // This shouldn't happen if we correctly await on the throat + while(running.length >= n){ + await Q.safeRace(running) + } + + // call fn + const rFn = async ()=>{ + await NextTickPromise + try { + if(typeof what === 'function'){ + what = what() + } + return await what + } finally { + for(let i = 0 ; i < running.length; i++){ + if(running[i].id === idObj){ + running.splice(i, 1) + break + } + } + } + } + + const idObj = {} + const r = rFn() + r.id = idObj + running.push(r) + await r + + while(running.length >= n){ + await Q.safeRace(running) + } + } + + ret.running = running + + return ret +} + +module.exports = ThroatQueueFunction \ No newline at end of file diff --git a/test/throat_test.js b/test/throat_test.js index a3f777e..c2c4e9d 100644 --- a/test/throat_test.js +++ b/test/throat_test.js @@ -1,4 +1,4 @@ -const {ThroatFunction} = require('../index'), +const {ThroatFunction, ThroatQueueFunction} = require('../index'), {expect} = require('chai'), Q = require('@halleyassist/q-lite') @@ -53,4 +53,55 @@ describe('ThroatFunction', function(){ expect(count).to.be.eql(10) }) +}) +describe('ThroatQueueFunction', function(){ + it('should run 5 at a time without backpressure', async() => { + const tf = ThroatQueueFunction(5) + + let count = 0 + const deferred = Q.defer() + for(let i = 0; i < 10; i++){ + tf(()=>{ + count++ + return deferred.promise + }) + } + + await Q.delay(10) + + + expect(count).to.be.eql(5) + deferred.resolve() + + + await Q.delay(10) + await tf(null) + expect(count).to.be.eql(10) + + }) + it('should capture stack trace', async() => { + async function testFn(){ + const errors = [] + const tf = ThroatQueueFunction(5) + + for(let i = 0; i < 10; i++){ + try { + await tf(()=>{ + throw new Error('test') + }) + } catch(ex) { + errors.push(ex) + } + } + + return errors + } + + const errors = await testFn() + expect(errors.length).to.be.eql(10) + for(const e of errors){ + expect(e.stack).to.be.a('string') + expect(e.stack).to.contain('testFn') + } + }) }) \ No newline at end of file