Skip to content

Commit

Permalink
more gc optimised throat function
Browse files Browse the repository at this point in the history
  • Loading branch information
splitice committed Sep 5, 2023
1 parent 5d58704 commit b044f52
Showing 1 changed file with 41 additions and 31 deletions.
72 changes: 41 additions & 31 deletions lib/ThroatQueueFunction.js
Original file line number Diff line number Diff line change
@@ -1,61 +1,64 @@
const Q = require ('@halleyassist/q-lite')

const NextTickPromise = Q()
const EmptyArray = []

function ThroatQueueFunction(n = 5){
const running = []
const running = new Set()

async function race(cancellationState, running){
const deferred = Q.defer()
cancellationState.deferredWrap(deferred)
running = [...running, deferred.promise]
running = [deferred.promise, ...running]
try {
await Q.safeRace(running)
} finally {
deferred.resolve(null)
}
}

async function rFn(cancellationState, what, idObj){
await NextTickPromise
try {
if(typeof what === 'function'){
what = what()
}
return await cancellationState.promiseWrap(what)
} finally {
running.delete(idObj)
}
}

let ret = async function(cancellationState, what){
if(what === null){
if(running.length === 0) return running
if(running.size === 0) return EmptyArray
await Promise.all(running)
return running
if(running.size === 0) return EmptyArray

// while we waited new jobs were added
return [...running]
}

// This shouldn't happen if we correctly await on the throat
while(running.length >= n){
while(running.size >= n){
await race(cancellationState, running)
}

const idObj = {}

// call fn
const rFn = async ()=>{
await NextTickPromise
try {
if(typeof what === 'function'){
what = what()
}
return await cancellationState.promiseWrap(what)
} finally {
for(let i = 0 ; i < running.length; i++){
if(running[i].id === idObj){
running.splice(i, 1)
break
}
}
}
}

const r = rFn()
const d = Q.defer()
// will be used as an id
const dTrack = Q.defer()
dTrack.promise.id = idObj
dTrack.promise.fn = what
dTrack.promise.cancel = ()=>{
cancellationState.cancel()
}
running.push(dTrack.promise)

// call the function on the next tick
const r = rFn(cancellationState, what, dTrack.promise)

// we are now running
running.add(dTrack.promise)

// wait for this to be done
const d = Q.defer()
try {
const fnResult = await r
dTrack.resolve()
Expand All @@ -65,7 +68,8 @@ function ThroatQueueFunction(n = 5){
d.reject(ex)
}

while(running.length >= n){
// if we are running too many, wait for one to finish
while(running.size >= n){
await race(cancellationState, running)
}

Expand All @@ -74,7 +78,13 @@ function ThroatQueueFunction(n = 5){

ret = Q.canceller(ret)

ret.running = running
Object.defineProperty(ret, 'running', {
get: function(){
return [...running]
},
enumerable: true,
configurable: true
});

return ret
}
Expand Down

0 comments on commit b044f52

Please sign in to comment.