Skip to content

Commit

Permalink
introduce timers that are blocked event loop / high load safe
Browse files Browse the repository at this point in the history
  • Loading branch information
splitice committed Sep 8, 2023
1 parent 93814d2 commit c2deab7
Showing 1 changed file with 138 additions and 29 deletions.
167 changes: 138 additions & 29 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -215,47 +215,154 @@ Q.nfcall = function (fn, ...args) {
})
}

function timeoutHandle(e, deferred, overloadSafe) {
e.code = 'ETIMEDOUT'
if (overloadSafe) setImmediate(deferred.reject, e)
else deferred.reject(e)
}

Q.timeout = function (promise, ms, message = undefined, overloadSafe = true) {
Q.timeoutExact = function (promise, ms, message = undefined, overloadSafe = true) {
const deferred = Q.defer()

const stackTraceLimit = Error.stackTraceLimit
Error.stackTraceLimit = 10
let e = new Error(message ? message : `Timed out after ${ms} ms`)
Error.stackTraceLimit = stackTraceLimit

const e = new Error(message ? message : `Timed out after ${ms} ms`)
let timeout

promise.then(deferred.resolve, deferred.reject).then(() => {
clearTimeout(timeout)
})

deferred.promise.cancel = () => {
try {
if (promise.cancel) promise.cancel()
} finally {
e = null
deferred.reject(new CancellationError())
clearTimeout(timeout)
}
if (promise.cancel) promise.cancel()
deferred.reject(new CancellationError())
}

deferred.promise.extend = (ms)=>{
if(timeout){
clearTimeout(timeout)
}
timeout = setTimeout(timeoutHandle, ms, e, deferred, overloadSafe)
timeout = setTimeout(() => {
e.code = 'ETIMEDOUT'
if (overloadSafe) setImmediate(deferred.reject, e)
else deferred.reject(e)
}, ms)
}
deferred.promise.extend(ms)

return deferred.promise
}

const EmptyTimer = function(){}
EmptyTimer.time = Number.POSITIVE_INFINITY
let nextTimer = EmptyTimer
const timers = new Set()
let nextTickTimer = null

function addTimer(fn, ms){
const now = Date.now()
const timeToRun = now + ms
if(nextTimer.time > timeToRun){
nextTimer = fn
nextTimer.time = timeToRun

if(nextTickTimer === null) {
nextTickTimer = setTimeout(executeTimerTick, 25, now + 25)
}
}

}

function adjustTimer(fn, ms){
const now = Date.now()
fn.time = now + ms
if(nextTickTimer === fn){
for(const t in timers){
if(t.time < fn.time) {
nextTimer = t
clearTimeout(nextTickTimer)
timers.add(fn)
return
}
}
}
}

function clearTimer(fn){
if(nextTimer === fn){
nextTimer = EmptyTimer
for(const t of timers){
// potentially the next tiemr
if(t.time < nextTimer.time){
nextTimer = t
}
}
} else {
timers.delete(fn)
}
}

function executeTimerTick(scheduled){
const now = Date.now()
const workingTime = (now + scheduled) / 2

if(nextTimer.time > workingTime){
if(nextTickTimer !== EmptyTimer){
nextTickTimer = setTimeout(executeTimerTick, 25, Math.min(now + 25, workingTime + 50))
} else {
nextTickTimer = null
}
return
}

// execute the next timer
setImmediate(nextTimer)
nextTimer = EmptyTimer

// find the next timer, execute any due timers
for(const t of timers){
// due for execution
if(t.time <= workingTime) {
setImmediate(t)
timers.delete(t)
continue
}

// potentially the next tiemr
if(t.time < nextTimer.time){
nextTimer = t
}
}

if(nextTimer !== EmptyTimer){
timers.delete(nextTimer)

// schedule the next timer
nextTickTimer = setTimeout(executeTimerTick, 25, Math.min(now + 25, workingTime + 50))
} else {
nextTickTimer = null
}
}

Q.timeout = function (promise, ms, message = undefined) {
const deferred = Q.defer()

const e = new Error(message ? message : `Timed out after ${ms} ms`)
let timeout = () => {
e.code = 'ETIMEDOUT'
deferred.reject(e)
}

promise.then(deferred.resolve, deferred.reject).then(() => {
clearTimer(timeout)
})

deferred.promise.cancel = () => {
if (promise.cancel) promise.cancel()
deferred.reject(new CancellationError())
}

deferred.promise.extend = (ms)=>{
adjustTimer(timeout, ms)
}

addTimer(timeout, ms)

return deferred.promise
}

Q.timewarn = async function (promise, ms, fn, message = undefined) {
let ex = new Error(message ? message : `Timed out after ${ms} ms`)
async function doCall() {
Expand All @@ -266,12 +373,12 @@ Q.timewarn = async function (promise, ms, fn, message = undefined) {
const requeue = await fn(ex)
if(requeue){
if(!ex) return
timeout = setTimeout(doCall, Number.isInteger(requeue) ? requeue : ms)
addTimer(doCall, Number.isInteger(requeue) ? requeue : ms)
}
}
let timeout = setTimeout(doCall, ms)
addTimer(doCall, ms)
function doClear(v){
clearTimeout(timeout)
clearTimer(doCall)
ex = null
return v
}
Expand All @@ -283,17 +390,19 @@ Q.timewarn = async function (promise, ms, fn, message = undefined) {
return await promise.then(doClear, doClearEx)
}

Q.deferredTimeout = function (deferred, ms, symbol = undefined, overloadSafe = true) {
Q.deferredTimeout = function (deferred, ms, symbol = undefined) {
if (!symbol) {
symbol = new Error(`Timed out after ${ms} ms`)
}
const timer = setTimeout(() => {
if (overloadSafe) setImmediate(deferred.reject, symbol)
else deferred.reject(symbol)
}, ms)

const fn = () => {
deferred.reject(symbol)
}

addTimer(fn, ms)

deferred.promise.catch(() => { }).then(() => {
clearTimeout(timer)
clearTimer(fn)
})

return deferred.promise
Expand Down

0 comments on commit c2deab7

Please sign in to comment.