From 4f5e388367f161d26f48f008e395d3d87b6495ea Mon Sep 17 00:00:00 2001
From: Manuel Astudillo
Date: Fri, 30 Oct 2020 17:23:01 +0100
Subject: [PATCH] fix: faster delay handling. fixes #1893

---
 lib/commands/moveToActive-8.lua   |  25 ++--
 lib/commands/moveToFinished-7.lua | 118 -----------------
 lib/commands/moveToFinished-8.lua | 206 ++++++++++++++++++++++++++++++
 lib/queue.js                      |  74 +++++------
 lib/scripts.js                    |  12 +-
 test/test_job.js                  |  16 ++-
 6 files changed, 274 insertions(+), 177 deletions(-)
 delete mode 100644 lib/commands/moveToFinished-7.lua
 create mode 100644 lib/commands/moveToFinished-8.lua

diff --git a/lib/commands/moveToActive-8.lua b/lib/commands/moveToActive-8.lua
index 1b3518662..f46d76b9f 100644
--- a/lib/commands/moveToActive-8.lua
+++ b/lib/commands/moveToActive-8.lua
@@ -10,7 +10,7 @@
   KEYS[1] wait key
   KEYS[2] active key
   KEYS[3] priority key
-  KEYS[4] active event key
+  KEYS[4] active key
   KEYS[5] stalled key
 
   -- Rate limiting
@@ -34,12 +34,11 @@
 local rcall = redis.call
 
-local rateLimit = function(jobId, maxJobs)
-  local rateLimiterKey = KEYS[6];
+local rateLimit = function(activeKey, rateLimiterKey, delayedKey, jobId, timestamp, maxJobs, timeUnit, bounceBack, groupLimit)
   local limiterIndexTable = rateLimiterKey .. ":index"
 
   -- Rate limit by group?
-  if(ARGV[9]) then
+  if(groupLimit) then
     local group = string.match(jobId, "[^:]+$")
     if group ~= nil then
       rateLimiterKey = rateLimiterKey .. ":" .. group
     end
@@ -68,7 +67,7 @@
 
     if numLimitedJobs > 0 then
       -- Note, add some slack to compensate for drift.
-      delay = ((numLimitedJobs * ARGV[7] * 1.1) / maxJobs) + tonumber(rcall("PTTL", rateLimiterKey))
+      delay = ((numLimitedJobs * timeUnit * 1.1) / maxJobs) + tonumber(rcall("PTTL", rateLimiterKey))
     end
   end
 
@@ -80,31 +79,29 @@
   if (delay == 0) and (jobCounter >= maxJobs) then
     -- Seems like there are no current rated limited jobs, but the jobCounter has exceeded the number of jobs for this unit of time so we need to rate limit this job.
     local exceedingJobs = jobCounter - maxJobs
-    delay = tonumber(rcall("PTTL", rateLimiterKey)) + ((exceedingJobs) * ARGV[7]) / maxJobs
+    delay = tonumber(rcall("PTTL", rateLimiterKey)) + ((exceedingJobs) * timeUnit) / maxJobs
   end
 
   if delay > 0 then
-    local bounceBack = ARGV[8]
     if bounceBack == 'false' then
-      local timestamp = delay + tonumber(ARGV[4])
+      local timestamp = delay + tonumber(timestamp)
       -- put job into delayed queue
-      rcall("ZADD", KEYS[7], timestamp * 0x1000 + bit.band(jobCounter, 0xfff), jobId)
-      rcall("PUBLISH", KEYS[7], timestamp)
+      rcall("ZADD", delayedKey, timestamp * 0x1000 + bit.band(jobCounter, 0xfff), jobId)
+      rcall("PUBLISH", delayedKey, timestamp)
       rcall("SADD", limitedSetKey, jobId)
 
       -- store index so that we can delete rate limited data
      rcall("HSET", limiterIndexTable, jobId, limitedSetKey)
-
    end
     -- remove from active queue
-    rcall("LREM", KEYS[2], 1, jobId)
+    rcall("LREM", activeKey, 1, jobId)
     return true
   else
     -- false indicates not rate limited
     -- increment jobCounter only when a job is not rate limited
     if (jobCounter == 0) then
-      rcall("PSETEX", rateLimiterKey, ARGV[7], 1)
+      rcall("PSETEX", rateLimiterKey, timeUnit, 1)
     else
       rcall("INCR", rateLimiterKey)
     end
 
@@ -127,7 +124,7 @@
 if jobId then
   local maxJobs = tonumber(ARGV[6])
   if maxJobs then
-    if rateLimit(jobId, maxJobs) then
+    if rateLimit(KEYS[2], KEYS[6], KEYS[7], jobId, ARGV[4], maxJobs, ARGV[7], ARGV[8], ARGV[9]) then
       return
     end
   end
diff --git a/lib/commands/moveToFinished-7.lua b/lib/commands/moveToFinished-7.lua
deleted file mode 100644
index 04a1f34c3..000000000
--- a/lib/commands/moveToFinished-7.lua
+++ /dev/null
@@ -1,118 +0,0 @@
---[[
-  Move job from active to a finished status (completed o failed)
-  A job can only be moved to completed if it was active.
-  The job must be locked before it can be moved to a finished status,
-  and the lock must be released in this script.
-
-  Input:
-    KEYS[1] active key
-    KEYS[2] completed/failed key
-    KEYS[3] jobId key
-
-    KEYS[4] wait key
-    KEYS[5] priority key
-    KEYS[6] active event key
-
-    KEYS[7] delayed key
-
-    ARGV[1] jobId
-    ARGV[2] timestamp
-    ARGV[3] msg property
-    ARGV[4] return value / failed reason
-    ARGV[5] token
-    ARGV[6] shouldRemove
-    ARGV[7] event data (? maybe just send jobid).
-    ARGV[8] should fetch next job
-    ARGV[9] base key
-
-  Output:
-    0 OK
-    -1 Missing key.
-    -2 Missing lock.
-
-  Events:
-    'completed/failed'
-]]
-local rcall = redis.call
-
-if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists
-  if ARGV[5] ~= "0" then
-    local lockKey = KEYS[3] .. ':lock'
-    if rcall("GET", lockKey) == ARGV[5] then
-      rcall("DEL", lockKey)
-    else
-      return -2
-    end
-  end
-
-  -- Remove from active list
-  rcall("LREM", KEYS[1], -1, ARGV[1])
-
-  -- Remove job?
-  local removeJobs = tonumber(ARGV[6])
-  if removeJobs ~= 1 then
-    -- Add to complete/failed set
-    rcall("ZADD", KEYS[2], ARGV[2], ARGV[1])
-    rcall("HMSET", KEYS[3], ARGV[3], ARGV[4], "finishedOn", ARGV[2]) -- "returnvalue" / "failedReason" and "finishedOn"
-
-    -- Remove old jobs?
-    if removeJobs and removeJobs > 1 then
-      local start = removeJobs - 1
-      local jobIds = rcall("ZREVRANGE", KEYS[2], start, -1)
-      for i, jobId in ipairs(jobIds) do
-        local jobKey = ARGV[9] .. jobId
-        local jobLogKey = jobKey .. ':logs'
-        rcall("DEL", jobKey, jobLogKey)
-      end
-      rcall("ZREMRANGEBYRANK", KEYS[2], 0, -removeJobs);
-    end
-  else
-    local jobLogKey = KEYS[3] .. ':logs'
-    rcall("DEL", KEYS[3], jobLogKey)
-  end
-
-  rcall("PUBLISH", KEYS[2], ARGV[7])
-
-  -- -- Check if we should get from the delayed set instead of the waiting list
-  -- local delayedJobId = rcall("ZRANGEBYSCORE", KEYS[7], 0, tonumber(ARGV[2]) * 0x1000, "LIMIT", 0, 1)[1]
-  -- if delayedJobId ~= nil then
-  --   local jobId = delayedJobId
-  --   if jobId then
-  --     local jobKey = ARGV[9] .. jobId
-  --     local lockKey = jobKey .. ':lock'
-
-  --     -- get a lock
-  --     rcall("SET", lockKey, ARGV[11], "PX", ARGV[10])
-
-  --     rcall("ZREM", KEYS[5], jobId) -- remove from priority
-  --     rcall("PUBLISH", KEYS[6], jobId)
-  --     rcall("HSET", jobKey, "processedOn", ARGV[2])
-
-  --     return {rcall("HGETALL", jobKey), jobId} -- get job data
-  --   end
-  -- end
-
-  -- Try to get next job to avoid an extra roundtrip if the queue is not closing,
-  -- and not rate limited.
-  if(ARGV[8] == "1") then
-    -- move from wait to active
-    local jobId = rcall("RPOPLPUSH", KEYS[4], KEYS[1])
-    if jobId then
-      local jobKey = ARGV[9] .. jobId
-      local lockKey = jobKey .. ':lock'
-
-      -- get a lock
-      rcall("SET", lockKey, ARGV[11], "PX", ARGV[10])
-
-      rcall("ZREM", KEYS[5], jobId) -- remove from priority
-      rcall("PUBLISH", KEYS[6], jobId)
-      rcall("HSET", jobKey, "processedOn", ARGV[2])
-
-      return {rcall("HGETALL", jobKey), jobId} -- get job data
-    end
-  end
-
-  return 0
-else
-  return -1
-end
diff --git a/lib/commands/moveToFinished-8.lua b/lib/commands/moveToFinished-8.lua
new file mode 100644
index 000000000..7d6d6f414
--- /dev/null
+++ b/lib/commands/moveToFinished-8.lua
@@ -0,0 +1,206 @@
+--[[
+  Move job from active to a finished status (completed or failed)
+  A job can only be moved to completed if it was active.
+  The job must be locked before it can be moved to a finished status,
+  and the lock must be released in this script.
+
+  Input:
+    KEYS[1] active key
+    KEYS[2] completed/failed key
+    KEYS[3] jobId key
+    KEYS[4] wait key
+    KEYS[5] priority key
+    KEYS[6] active event key
+    KEYS[7] delayed key
+    KEYS[8] rateLimiterKey
+
+    ARGV[1] jobId
+    ARGV[2] timestamp
+    ARGV[3] msg property
+    ARGV[4] return value / failed reason
+    ARGV[5] token
+    ARGV[6] shouldRemove
+    ARGV[7] event data (? maybe just send jobid).
+    ARGV[8] should fetch next job
+    ARGV[9] base key
+
+    ARGV[10] lock duration
+    ARGV[11] token
+
+    ARGV[12] optional jobs per time unit (rate limiter)
+    ARGV[13] optional time unit (rate limiter)
+    ARGV[14] optional do not do anything with job if rate limit hit
+    ARGV[15] optional rate limit by key
+
+  Output:
+    0 OK
+    -1 Missing key.
+    -2 Missing lock.
+
+  Events:
+    'completed/failed'
+]]
+local rcall = redis.call
+
+-- This is copy/paste from moveToActive since we cannot call functions from other scripts.
+local rateLimit = function(activeKey, rateLimiterKey, delayedKey, jobId, timestamp, maxJobs, timeUnit, bounceBack, groupLimit)
+  local limiterIndexTable = rateLimiterKey .. ":index"
+
+  -- Rate limit by group?
+  if(groupLimit) then
+    local group = string.match(jobId, "[^:]+$")
+    if group ~= nil then
+      rateLimiterKey = rateLimiterKey .. ":" .. group
+    end
+  end
+
+  -- key for storing rate limited jobs
+  -- When a job has been previously rate limited it should be part of this set
+  -- if the job is back here it means that the delay time for this job has passed and now we should
+  -- be able to process it again.
+  local limitedSetKey = rateLimiterKey .. ":limited"
":limited" + local delay = 0 + + -- -- Check if job was already limited + local isLimited = rcall("SISMEMBER", limitedSetKey, jobId); + + if isLimited == 1 then + -- Remove from limited zset since we are going to try to process it + rcall("SREM", limitedSetKey, jobId) + rcall("HDEL", limiterIndexTable, jobId) + else + -- If not, check if there are any limited jobs + -- If the job has not been rate limited, we should check if there are any other rate limited jobs, because if that + -- is the case we do not want to process this job, just calculate a delay for it and put it to "sleep". + local numLimitedJobs = rcall("SCARD", limitedSetKey) + + if numLimitedJobs > 0 then + -- Note, add some slack to compensate for drift. + delay = ((numLimitedJobs * timeUnit * 1.1) / maxJobs) + tonumber(rcall("PTTL", rateLimiterKey)) + end + end + + local jobCounter = tonumber(rcall("GET", rateLimiterKey)) + if(jobCounter == nil) then + jobCounter = 0 + end + -- check if rate limit hit + if (delay == 0) and (jobCounter >= maxJobs) then + -- Seems like there are no current rated limited jobs, but the jobCounter has exceeded the number of jobs for + -- this unit of time so we need to rate limit this job. + local exceedingJobs = jobCounter - maxJobs + delay = tonumber(rcall("PTTL", rateLimiterKey)) + ((exceedingJobs) * timeUnit) / maxJobs + end + + if delay > 0 then + if bounceBack == 'false' then + local timestamp = delay + tonumber(timestamp) + -- put job into delayed queue + rcall("ZADD", delayedKey, timestamp * 0x1000 + bit.band(jobCounter, 0xfff), jobId) + rcall("PUBLISH", delayedKey, timestamp) + rcall("SADD", limitedSetKey, jobId) + + -- store index so that we can delete rate limited data + rcall("HSET", limiterIndexTable, jobId, limitedSetKey) + end + + -- remove from active queue + rcall("LREM", activeKey, 1, jobId) + return true + else + -- false indicates not rate limited + -- increment jobCounter only when a job is not rate limited + if (jobCounter == 0) then + rcall("PSETEX", rateLimiterKey, timeUnit, 1) + else + rcall("INCR", rateLimiterKey) + end + return false + end +end + + +if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists + if ARGV[5] ~= "0" then + local lockKey = KEYS[3] .. ':lock' + if rcall("GET", lockKey) == ARGV[5] then + rcall("DEL", lockKey) + else + return -2 + end + end + + -- Remove from active list + rcall("LREM", KEYS[1], -1, ARGV[1]) + + -- Remove job? + local removeJobs = tonumber(ARGV[6]) + if removeJobs ~= 1 then + -- Add to complete/failed set + rcall("ZADD", KEYS[2], ARGV[2], ARGV[1]) + rcall("HMSET", KEYS[3], ARGV[3], ARGV[4], "finishedOn", ARGV[2]) -- "returnvalue" / "failedReason" and "finishedOn" + + -- Remove old jobs? + if removeJobs and removeJobs > 1 then + local start = removeJobs - 1 + local jobIds = rcall("ZREVRANGE", KEYS[2], start, -1) + for i, jobId in ipairs(jobIds) do + local jobKey = ARGV[9] .. jobId + local jobLogKey = jobKey .. ':logs' + rcall("DEL", jobKey, jobLogKey) + end + rcall("ZREMRANGEBYRANK", KEYS[2], 0, -removeJobs); + end + else + local jobLogKey = KEYS[3] .. ':logs' + rcall("DEL", KEYS[3], jobLogKey) + end + + rcall("PUBLISH", KEYS[2], ARGV[7]) + + -- Try to get next job to avoid an extra roundtrip if the queue is not closing, + -- and not rate limited. + if(ARGV[8] == "1") then + local jobId + -- Check if there are delayed jobs that can be picked up next. 
+    jobId = rcall("ZRANGEBYSCORE", KEYS[7], 0, tonumber(ARGV[2]) * 0x1000, "LIMIT", 0, 1)[1]
+    if jobId then
+      -- move from delayed to active
+      rcall("ZREM", KEYS[7], jobId)
+      rcall("LPUSH", KEYS[1], jobId)
+    else
+      -- move from wait to active
+      jobId = rcall("RPOPLPUSH", KEYS[4], KEYS[1])
+    end
+
+    if jobId then
+
+      -- Check if we need to perform rate limiting.
+      -- local maxJobs = tonumber(ARGV[12])
+
+      -- if maxJobs then
+      --   local rateLimit = function(activeKey, rateLimiterKey, delayedKey, jobId, timestamp, maxJobs, timeUnit, bounceBack, groupLimit)
+
+      --   if rateLimit(KEYS[1], KEYS[8], KEYS[7], jobId, ARGV[2], maxJobs, ARGV[13], ARGV[14], ARGV[15]) then
+      --     return 0
+      --   end
+      -- end
+
+      local jobKey = ARGV[9] .. jobId
+      local lockKey = jobKey .. ':lock'
+
+      -- get a lock
+      rcall("SET", lockKey, ARGV[11], "PX", ARGV[10])
+
+      rcall("ZREM", KEYS[5], jobId) -- remove from priority
+      rcall("PUBLISH", KEYS[6], jobId)
+      rcall("HSET", jobKey, "processedOn", ARGV[2])
+
+      return {rcall("HGETALL", jobKey), jobId} -- get job data
+    end
+  end
+
+  return 0
+else
+  return -1
+end
diff --git a/lib/queue.js b/lib/queue.js
index d568a4d60..2e2f08390 100755
--- a/lib/queue.js
+++ b/lib/queue.js
@@ -554,10 +554,7 @@ Queue.prototype.close = function(doNotWaitJobs) {
       }
       return cleanPromise;
     })
-    .then(
-      async () => this.disconnect(),
-      err => console.error(err)
-    )
+    .then(async () => this.disconnect(), err => console.error(err))
     .finally(() => {
       this.closed = true;
       this.emit('close');
@@ -888,42 +885,44 @@
   at the next known delayed job.
 */
 Queue.prototype.updateDelayTimer = function() {
-  return scripts
-    .updateDelaySet(this, Date.now())
-    .then(nextTimestamp => {
-      this.delayedTimestamp = nextTimestamp
-        ? nextTimestamp / 4096
-        : Number.MAX_VALUE;
-
-      // Clear any existing update delay timer
-      if (this.delayTimer) {
-        clearTimeout(this.delayTimer);
-      }
+  if (this.drained) {
+    return scripts
+      .updateDelaySet(this, Date.now())
+      .then(nextTimestamp => {
+        this.delayedTimestamp = nextTimestamp
+          ? nextTimestamp / 4096
+          : Number.MAX_VALUE;
+
+        // Clear any existing update delay timer
+        if (this.delayTimer) {
+          clearTimeout(this.delayTimer);
+        }
 
-      // Delay for the next update of delay set
-      const delay = _.min([
-        this.delayedTimestamp - Date.now(),
-        this.settings.guardInterval
-      ]);
+        // Delay for the next update of delay set
+        const delay = _.min([
+          this.delayedTimestamp - Date.now(),
+          this.settings.guardInterval
+        ]);
 
-      // Schedule next processing of the delayed jobs
-      if (delay <= 0) {
-        // Next set of jobs are due right now, process them also
-        this.updateDelayTimer();
-      } else {
-        // Update the delay set when the next job is due
-        // or the next guard time
-        this.delayTimer = setTimeout(() => this.updateDelayTimer(), delay);
-      }
+        // Schedule next processing of the delayed jobs
+        if (delay <= 0) {
+          // Next set of jobs are due right now, process them also
+          this.updateDelayTimer();
+        } else {
+          // Update the delay set when the next job is due
+          // or the next guard time
+          this.delayTimer = setTimeout(() => this.updateDelayTimer(), delay);
+        }
 
-      // Silence warnings about promise created but not returned.
-      // This isn't an issue since we emit errors.
-      // See http://bluebirdjs.com/docs/warning-explanations.html#warning-a-promise-was-created-in-a-handler-but-was-not-returned-from-it
-      return null;
-    })
-    .catch(err => {
-      this.emit('error', err, 'Error updating the delay timer');
-    });
+        // Silence warnings about promise created but not returned.
+        // This isn't an issue since we emit errors.
+        // See http://bluebirdjs.com/docs/warning-explanations.html#warning-a-promise-was-created-in-a-handler-but-was-not-returned-from-it
+        return null;
+      })
+      .catch(err => {
+        this.emit('error', err, 'Error updating the delay timer');
+      });
+  }
 };
 
 /**
@@ -1157,6 +1156,7 @@ Queue.prototype.nextJobFromJobData = function(jobData, jobId) {
   } else {
     this.drained = true;
     this.emit('drained');
+    this.updateDelayTimer();
     return null;
   }
 };
diff --git a/lib/scripts.js b/lib/scripts.js
index 9ddfc59db..428f0eaa9 100644
--- a/lib/scripts.js
+++ b/lib/scripts.js
@@ -122,7 +122,8 @@ const scripts = {
       queueKeys.wait,
      queueKeys.priority,
      queueKeys.active + '@' + queue.token,
-      queueKeys.delayed
+      queueKeys.delayed,
+      queueKeys.limiter
     ];
 
     if (typeof shouldRemove === 'boolean') {
@@ -145,6 +146,15 @@
       queue.token
     ];
 
+    if (queue.limiter) {
+      args.push(
+        queue.limiter.max,
+        queue.limiter.duration,
+        !!queue.limiter.bounceBack
+      );
+      queue.limiter.groupKey && args.push(true);
+    }
+
     return keys.concat(args);
   },
diff --git a/test/test_job.js b/test/test_job.js
index aa3f27894..cb07a69ec 100644
--- a/test/test_job.js
+++ b/test/test_job.js
@@ -695,7 +695,7 @@ describe('Job', () => {
     it('should process a promoted job according to its priority', done => {
       queue.process(() => {
-        return delay(100);
+        return delay(10);
       });
 
       const completed = [];
@@ -703,8 +703,12 @@
       queue.on('completed', job => {
         completed.push(job.id);
         if (completed.length > 3) {
-          expect(completed).to.be.eql(['1', '2', '3', '4']);
-          done();
+          try {
+            expect(completed).to.be.eql(['1', '2', '3', '4']);
+            done();
+          } catch (err) {
+            done(err);
+          }
         }
       });
       const processStarted = new Promise(resolve =>
@@ -718,10 +722,8 @@
         .then(() => add('2', 1))
         .then(() => processStarted)
         .then(() => add('3', 5000))
-        .then(job => {
-          job.promote();
-        })
-        .then(() => add('4', 1));
+        .then(job => job.promote())
+        .then(() => add('4', 50));
     });
 
     it('should not promote a job that is not delayed', () => {
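
Note on the new fast path (illustration, not part of the patch): every time a job
finishes, moveToFinished-8.lua now peeks at the delayed zset with
ZRANGEBYSCORE ... 0, tonumber(ARGV[2]) * 0x1000 and, if a job is already due,
moves it straight into the active list. The guardInterval-driven timer in
Queue.prototype.updateDelayTimer only runs while the queue is drained (it is
re-armed from nextJobFromJobData), so busy queues promote delayed jobs from the
completion path instead of waiting for a timer tick. Below is a minimal sketch
of the scenario this speeds up, assuming only Bull's public API; the queue name
and the two helper functions are hypothetical:

    const Queue = require('bull');

    // Hypothetical queue name; any Bull queue exercises the new code path.
    const queue = new Queue('delay-demo');

    // The delayed zset stores score = dueTimestamp * 0x1000 + (seq & 0xfff):
    // the high bits carry the due time, the low 12 bits keep concurrent
    // entries unique. This mirrors the ZADD calls in the Lua scripts and the
    // `nextTimestamp / 4096` (4096 == 0x1000) in updateDelayTimer.
    const encodeDelayedScore = (due, seq) => due * 0x1000 + (seq & 0xfff);
    const decodeDelayedDueTime = score => Math.floor(score / 0x1000);

    queue.process(async job => {
      // When this handler resolves, moveToFinished-8.lua checks the delayed
      // zset and promotes any due job directly to active instead of leaving
      // it for the next delay-timer tick.
      return job.data;
    });

    // Before this patch a short delay like this one could sit until the guard
    // interval fired; now it is picked up as soon as any job finishes after
    // its 500 ms due time.
    queue.add({ step: 1 }, { delay: 500 });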