diff --git a/lib/commands/cleanJobsInSet-3.lua b/lib/commands/cleanJobsInSet-3.lua index 7de75c3a7..98a98d906 100644 --- a/lib/commands/cleanJobsInSet-3.lua +++ b/lib/commands/cleanJobsInSet-3.lua @@ -11,52 +11,94 @@ ARGV[3] limit the number of jobs to be removed. 0 is unlimited ARGV[4] set name, can be any of 'wait', 'active', 'paused', 'delayed', 'completed', or 'failed' ]] + +local setKey = KEYS[1] +local priorityKey = KEYS[2] +local rateLimiterKey = KEYS[3] + +local jobKeyPrefix = ARGV[1] +local maxTimestamp = ARGV[2] +local limitStr = ARGV[3] +local setName = ARGV[4] + local command = "ZRANGE" local isList = false local rcall = redis.call -if ARGV[4] == "wait" or ARGV[4] == "active" or ARGV[4] == "paused" then +if setName == "wait" or setName == "active" or setName == "paused" then command = "LRANGE" isList = true end -local jobIds = rcall(command, KEYS[1], 0, -1) +local limit = tonumber(limitStr) +local rangeStart = 0 +local rangeEnd = -1 + +-- If we're only deleting _n_ items, avoid retrieving all items +-- for faster performance +-- +-- Start from the tail of the list, since that's where oldest elements +-- are generally added for FIFO lists +if limit > 0 then + rangeStart = -1 - limit + 1 + rangeEnd = -1 +end + +local jobIds = rcall(command, setKey, rangeStart, rangeEnd) local deleted = {} local deletedCount = 0 -local limit = tonumber(ARGV[3]) local jobTS -for _, jobId in ipairs(jobIds) do - if limit > 0 and deletedCount >= limit then - break - end - local jobKey = ARGV[1] .. jobId - if (rcall("EXISTS", jobKey .. ":lock") == 0) then - jobTS = rcall("HGET", jobKey, "timestamp") - if (not jobTS or jobTS < ARGV[2]) then - if isList then - rcall("LREM", KEYS[1], 0, jobId) - else - rcall("ZREM", KEYS[1], jobId) - end - rcall("ZREM", KEYS[2], jobId) - rcall("DEL", jobKey) - rcall("DEL", jobKey .. ":logs") - - -- delete keys related to rate limiter - -- NOTE: this code is unncessary for other sets than wait, paused and delayed. - local limiterIndexTable = KEYS[3] .. ":index" - local limitedSetKey = rcall("HGET", limiterIndexTable, jobId) - - if limitedSetKey then - rcall("SREM", limitedSetKey, jobId) - rcall("HDEL", limiterIndexTable, jobId) - end +-- Run this loop: +-- - Once, if limit is -1 or 0 +-- - As many times as needed if limit is positive +while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do + for _, jobId in ipairs(jobIds) do + if limit > 0 and deletedCount >= limit then + break + end + + local jobKey = jobKeyPrefix .. jobId + if (rcall("EXISTS", jobKey .. ":lock") == 0) then + jobTS = rcall("HGET", jobKey, "timestamp") + if (not jobTS or jobTS < maxTimestamp) then + if isList then + rcall("LREM", setKey, 0, jobId) + else + rcall("ZREM", setKey, jobId) + end + rcall("ZREM", priorityKey, jobId) + rcall("DEL", jobKey) + rcall("DEL", jobKey .. ":logs") + + -- delete keys related to rate limiter + -- NOTE: this code is unncessary for other sets than wait, paused and delayed. + local limiterIndexTable = rateLimiterKey .. ":index" + local limitedSetKey = rcall("HGET", limiterIndexTable, jobId) - deletedCount = deletedCount + 1 - table.insert(deleted, jobId) + if limitedSetKey then + rcall("SREM", limitedSetKey, jobId) + rcall("HDEL", limiterIndexTable, jobId) + end + + deletedCount = deletedCount + 1 + table.insert(deleted, jobId) + end end end + + -- If we didn't have a limit, return immediately. We should have deleted + -- all the jobs we can + if limit <= 0 then + break + end + + if deletedCount < limit then + -- We didn't delete enough. Look for more to delete + rangeStart = rangeStart - limit + rangeEnd = rangeEnd - limit + jobIds = rcall(command, setKey, rangeStart, rangeEnd) + end end return deleted diff --git a/test/test_queue.js b/test/test_queue.js index f211b0f43..e703d600b 100644 --- a/test/test_queue.js +++ b/test/test_queue.js @@ -2837,6 +2837,34 @@ describe('Queue', () => { }); }); + it('should clean the number of jobs requested even if first jobs timestamp doesn\'t match', async () => { + // This job shouldn't get deleted due to the 5000 grace + await queue.add({ some: 'data' }); + // This job should get cleaned since 10000 > 5000 grace + const jobToClean = await queue.add({ some: 'data' }, { timestamp: Date.now() - 10000 }); + // This job shouldn't get deleted due to the 5000 grace + await queue.add({ some: 'data' }); + + const cleaned = await queue.clean(5000, 'wait', 1); + expect(cleaned.length).to.be.eql(1); + expect(cleaned[0]).to.eql(jobToClean.id); + + const len = await queue.count(); + expect(len).to.be.eql(2); + }); + + it('shouldn\'t clean anything if all jobs are in grace period', async () => { + await queue.add({ some: 'data' }); + await queue.add({ some: 'data' }); + + const cleaned = await queue.clean(5000, 'wait', 1); + expect(cleaned.length).to.be.eql(0); + + const cleaned2 = await queue.clean(5000, 'wait'); + expect(cleaned2.length).to.be.eql(0); + expect(await queue.count()).to.be.eql(2); + }); + it('should properly clean jobs from the priority set', done => { const client = new redis(6379, '127.0.0.1', {}); queue.add({ some: 'data' }, { priority: 5 });