diff --git a/api.lua b/api.lua index 91da80b..b956043 100644 --- a/api.lua +++ b/api.lua @@ -74,8 +74,8 @@ QlessAPI.heartbeat = function(now, jid, worker, data) return Qless.job(jid):heartbeat(now, worker, data) end -QlessAPI.workers = function(now, worker) - return cjson.encode(QlessWorker.counts(now, worker)) +QlessAPI.workers = function(now, ...) + return cjson.encode(QlessWorker.counts(now, unpack(arg))) end QlessAPI.track = function(now, command, jid) diff --git a/test/test_worker.py b/test/test_worker.py index 5624fc5..26ed815 100644 --- a/test/test_worker.py +++ b/test/test_worker.py @@ -10,6 +10,12 @@ def setUp(self): # No grace period self.lua('config.set', 0, 'grace-period', 0) + def test_malformed(self): + '''Enumerate all the ways in which the input can be malformed''' + self.assertMalformed(self.lua, [ + ('workers', 1, 0, 'arg2') # Count arg malformed + ]) + def test_basic(self): '''Basic worker-level information''' self.lua('put', 0, 'worker', 'queue', 'jid', 'klass', {}, 0) @@ -24,6 +30,31 @@ def test_basic(self): 'stalled': 0 }]) + def test_paginated(self): + '''Paginated worker-level information''' + + self.lua('put', 0, 'worker-1', 'queue', 'jid-1', 'klass', {}, 0) + self.lua('pop', 1, 'queue', 'worker-1', 10) + self.lua('put', 2, 'worker-2', 'queue', 'jid-2', 'klass', {}, 0) + self.lua('pop', 3, 'queue', 'worker-2', 10) + self.lua('put', 2, 'worker-3', 'queue', 'jid-3', 'klass', {}, 0) + self.lua('pop', 3, 'queue', 'worker-3', 10) + + worker_names = [w['name'] for w in self.lua('workers', 20)] + self.assertEqual(worker_names, ['worker-3', 'worker-2', 'worker-1']) + + worker_names = [w['name'] for w in self.lua('workers', 20, 0, 1)] + self.assertEqual(worker_names, ['worker-3']) + + worker_names = [w['name'] for w in self.lua('workers', 20, 1, 1)] + self.assertEqual(worker_names, ['worker-2']) + + worker_names = [w['name'] for w in self.lua('workers', 20, 2, 1)] + self.assertEqual(worker_names, ['worker-1']) + + worker_names = [w['name'] for w in self.lua('workers', 20, 1, 2)] + self.assertEqual(worker_names, ['worker-2', 'worker-1']) + def test_stalled(self): '''We should be able to detect stalled jobs''' self.lua('put', 0, 'worker', 'queue', 'jid', 'klass', {}, 0) diff --git a/worker.lua b/worker.lua index b3ef835..88e0f92 100644 --- a/worker.lua +++ b/worker.lua @@ -3,6 +3,8 @@ function QlessWorker.deregister(...) redis.call('zrem', 'ql:workers', unpack(arg)) end +-- Counts(now, [offset, [count]]) +-- Counts(now, worker) -- Provide data about all the workers, or if a specific worker is provided, -- then which jobs that worker is responsible for. If no worker is provided, -- expect a response of the form: @@ -31,7 +33,7 @@ end -- ] -- } -- -function QlessWorker.counts(now, worker) +function QlessWorker.counts(now, ...) -- Clean up all the workers' job lists if they're too old. This is -- determined by the `max-worker-age` configuration, defaulting to the -- last day. Seems like a 'reasonable' default @@ -45,14 +47,26 @@ function QlessWorker.counts(now, worker) -- And now remove them from the list of known workers redis.call('zremrangebyscore', 'ql:workers', 0, now - interval) + --- Preserve backwards compatibility for counts() which only + --- takes a worker and not offset/count + local worker + if not tonumber(arg[1]) then + worker = arg[1] + end + if worker then return { jobs = redis.call('zrevrangebyscore', 'ql:w:' .. worker .. ':jobs', now + 8640000, now), stalled = redis.call('zrevrangebyscore', 'ql:w:' .. worker .. ':jobs', now, 0) } else + local offset = assert(tonumber(arg[1] or 0), + 'Failed(): Arg "offset" is not a number: ' .. tostring(arg[1])) + local count = assert(tonumber(arg[2] or 0), + 'Failed(): Arg "count" is not a number: ' .. tostring(arg[2])) + local response = {} - local workers = redis.call('zrevrange', 'ql:workers', 0, -1) + local workers = redis.call('zrevrange', 'ql:workers', offset, offset + count - 1) for index, worker in ipairs(workers) do table.insert(response, { name = worker,