-
Notifications
You must be signed in to change notification settings - Fork 34
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Throttling on arbitrary resources. #46
Changes from 73 commits
a3728f0
44df91c
14d0fe3
674353a
32c795c
4de319b
493d157
f6b9a38
36fd0fb
3599db3
9f3bdb9
623ebd9
b21e519
69bd3e7
d3fe766
1e003e5
4df4123
7fff784
fe776ef
4fc8e3c
27aebd4
a3244fe
81e2103
b4e7765
308a77e
5d86598
5141cbf
a27ac2e
d230b54
b159933
6522fe7
eaf6be1
32a1408
a70362b
fc332c9
b54298f
71ac5e0
aadd902
9595d5e
769d4f3
f7a3b6f
730c668
7d85dc8
2d9810c
4728f1d
85350ea
5668c32
0d8290e
9a37225
70a2779
dd2d931
8f004dc
a9c3b98
d1b52be
d7372ab
f1bf59f
3b80d6c
3108245
d058374
ee6f039
42aa9be
3bffc21
d9903a5
56df3fc
bd089c0
ed4a0f8
9a766e6
3550b9b
d821344
66c1ebd
3fe1fcd
6451b7c
65dac2d
cda5ed8
b6fd1aa
38e2493
5dbc192
21097aa
7932ac9
8fda126
98b71d6
4718824
8baa512
9e5e716
e8ed50c
230a777
e3fdb7e
6dbf028
20dc687
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -135,7 +135,7 @@ QlessAPI.unpause = function(now, ...) | |
end | ||
|
||
QlessAPI.cancel = function(now, ...) | ||
return Qless.cancel(unpack(arg)) | ||
return Qless.cancel(now, unpack(arg)) | ||
end | ||
|
||
QlessAPI.timeout = function(now, ...) | ||
|
@@ -199,6 +199,46 @@ QlessAPI['queue.forget'] = function(now, ...) | |
QlessQueue.deregister(unpack(arg)) | ||
end | ||
|
||
QlessAPI['queue.throttle.get'] = function(now, queue) | ||
local data = Qless.throttle(QlessQueue.ns .. queue):data() | ||
if not data then | ||
return nil | ||
end | ||
return cjson.encode(data) | ||
end | ||
|
||
QlessAPI['queue.throttle.set'] = function(now, queue, max) | ||
Qless.throttle(QlessQueue.ns .. queue):set({maximum = max}, 0) | ||
end | ||
|
||
-- Throttle apis | ||
QlessAPI['throttle.set'] = function(now, tid, max, ...) | ||
local expiration = unpack(arg) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why deal with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it was to make it an optional argument, not terrible strong in lua. |
||
local data = { | ||
maximum = max | ||
} | ||
Qless.throttle(tid):set(data, tonumber(expiration or 0)) | ||
end | ||
|
||
QlessAPI['throttle.get'] = function(now, tid) | ||
return cjson.encode(Qless.throttle(tid):data()) | ||
end | ||
|
||
QlessAPI['throttle.delete'] = function(now, tid) | ||
return Qless.throttle(tid):unset() | ||
end | ||
|
||
QlessAPI['throttle.locks'] = function(now, tid) | ||
return Qless.throttle(tid).locks.members() | ||
end | ||
|
||
QlessAPI['throttle.pending'] = function(now, tid) | ||
return Qless.throttle(tid).pending.members() | ||
end | ||
|
||
QlessAPI['throttle.ttl'] = function(now, tid) | ||
return Qless.throttle(tid):ttl() | ||
end | ||
------------------------------------------------------------------------------- | ||
-- Function lookup | ||
------------------------------------------------------------------------------- | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,12 @@ local QlessJob = { | |
} | ||
QlessJob.__index = QlessJob | ||
|
||
-- throttle forward declaration | ||
local QlessThrottle = { | ||
ns = Qless.ns .. 'th:' | ||
} | ||
QlessThrottle.__index = QlessThrottle | ||
|
||
-- RecurringJob forward declaration | ||
local QlessRecurringJob = {} | ||
QlessRecurringJob.__index = QlessRecurringJob | ||
|
@@ -61,19 +67,69 @@ function Qless.recurring(jid) | |
return job | ||
end | ||
|
||
-- Return a throttle object | ||
-- throttle objects are used for arbitrary throttling of jobs. | ||
function Qless.throttle(tid) | ||
assert(tid, 'Throttle(): no tid provided') | ||
local throttle = QlessThrottle.data({id = tid}) | ||
setmetatable(throttle, QlessThrottle) | ||
|
||
-- set of jids which have acquired a lock on this throttle. | ||
throttle.locks = { | ||
length = function() | ||
return (redis.call('zcard', QlessThrottle.ns .. tid .. '-locks') or 0) | ||
end, members = function() | ||
return redis.call('zrange', QlessThrottle.ns .. tid .. '-locks', 0, -1) | ||
end, add = function(...) | ||
if #arg > 0 then | ||
redis.call('zadd', QlessThrottle.ns .. tid .. '-locks', unpack(arg)) | ||
end | ||
end, remove = function(...) | ||
if #arg > 0 then | ||
return redis.call('zrem', QlessThrottle.ns .. tid .. '-locks', unpack(arg)) | ||
end | ||
end, pop = function(min, max) | ||
return redis.call('zremrangebyrank', QlessThrottle.ns .. tid .. '-locks', min, max) | ||
end, peek = function(min, max) | ||
return redis.call('zrange', QlessThrottle.ns .. tid .. '-locks', min, max) | ||
end | ||
} | ||
|
||
-- set of jids which are waiting for the throttle to become available. | ||
throttle.pending = { | ||
length = function() | ||
return (redis.call('zcard', QlessThrottle.ns .. tid .. '-pending') or 0) | ||
end, members = function() | ||
return redis.call('zrange', QlessThrottle.ns .. tid .. '-pending', 0, -1) | ||
end, add = function(now, jid) | ||
redis.call('zadd', QlessThrottle.ns .. tid .. '-pending', now, jid) | ||
end, remove = function(...) | ||
if #arg > 0 then | ||
return redis.call('zrem', QlessThrottle.ns .. tid .. '-pending', unpack(arg)) | ||
end | ||
end, pop = function(min, max) | ||
return redis.call('zremrangebyrank', QlessThrottle.ns .. tid .. '-pending', min, max) | ||
end, peek = function(min, max) | ||
return redis.call('zrange', QlessThrottle.ns .. tid .. '-pending', min, max) | ||
end | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Given that there is duplication in there definition, and presumably we'll want to keep them in sync, I think it might be worth extracting some kind of factory method that takes the suffix and constructs the table. Thoughts? On a side note, I don't really get the use of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there is also a large duplication between these and the queues themselves. factory for constructing these would be great. I thought about it while I was writing the code but didn't have time to include it. pop & peek you are correct (I'm fairly sure, its been a long time since I've looked at the code), pretty sure they were used for debugging at some point. |
||
|
||
return throttle | ||
end | ||
|
||
-- Failed([group, [start, [limit]]]) | ||
-- ------------------------------------ | ||
-- If no group is provided, this returns a JSON blob of the counts of the | ||
-- various groups of failures known. If a group is provided, it will report up | ||
-- to `limit` from `start` of the jobs affected by that issue. | ||
-- | ||
-- | ||
-- # If no group, then... | ||
-- { | ||
-- 'group1': 1, | ||
-- 'group2': 5, | ||
-- ... | ||
-- } | ||
-- | ||
-- | ||
-- # If a group is provided, then... | ||
-- { | ||
-- 'total': 20, | ||
|
@@ -119,9 +175,9 @@ end | |
------------------------------------------------------------------------------- | ||
-- Return all the job ids currently considered to be in the provided state | ||
-- in a particular queue. The response is a list of job ids: | ||
-- | ||
-- | ||
-- [ | ||
-- jid1, | ||
-- jid1, | ||
-- jid2, | ||
-- ... | ||
-- ] | ||
|
@@ -146,6 +202,8 @@ function Qless.jobs(now, state, ...) | |
return queue.locks.peek(now, offset, count) | ||
elseif state == 'stalled' then | ||
return queue.locks.expired(now, offset, count) | ||
elseif state == 'throttled' then | ||
return queue.throttled.peek(now, offset, count) | ||
elseif state == 'scheduled' then | ||
queue:check_scheduled(now, queue.scheduled.length()) | ||
return queue.scheduled.peek(now, offset, count) | ||
|
@@ -167,7 +225,7 @@ end | |
-- associated with that id, and 'untrack' stops tracking it. In this context, | ||
-- tracking is nothing more than saving the job to a list of jobs that are | ||
-- considered special. | ||
-- | ||
-- | ||
-- { | ||
-- 'jobs': [ | ||
-- { | ||
|
@@ -252,18 +310,17 @@ function Qless.tag(now, command, ...) | |
tags = cjson.decode(tags) | ||
local _tags = {} | ||
for i,v in ipairs(tags) do _tags[v] = true end | ||
|
||
-- Otherwise, add the job to the sorted set with that tags | ||
for i=2,#arg do | ||
local tag = arg[i] | ||
if _tags[tag] == nil then | ||
_tags[tag] = true | ||
table.insert(tags, tag) | ||
end | ||
redis.call('zadd', 'ql:t:' .. tag, now, jid) | ||
redis.call('zincrby', 'ql:tags', 1, tag) | ||
Qless.job(jid):insert_tag(now, tag) | ||
end | ||
|
||
redis.call('hset', QlessJob.ns .. jid, 'tags', cjson.encode(tags)) | ||
return tags | ||
else | ||
|
@@ -278,18 +335,17 @@ function Qless.tag(now, command, ...) | |
tags = cjson.decode(tags) | ||
local _tags = {} | ||
for i,v in ipairs(tags) do _tags[v] = true end | ||
-- Otherwise, add the job to the sorted set with that tags | ||
|
||
-- Otherwise, remove the job from the sorted set with that tags | ||
for i=2,#arg do | ||
local tag = arg[i] | ||
_tags[tag] = nil | ||
redis.call('zrem', 'ql:t:' .. tag, jid) | ||
redis.call('zincrby', 'ql:tags', -1, tag) | ||
Qless.job(jid):remove_tag(tag) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's nice to see some better abstractions being added, rather than re-implementing low-level details in multiple places, so thanks for adding |
||
end | ||
|
||
local results = {} | ||
for i,tag in ipairs(tags) do if _tags[tag] then table.insert(results, tag) end end | ||
|
||
redis.call('hset', QlessJob.ns .. jid, 'tags', cjson.encode(results)) | ||
return results | ||
else | ||
|
@@ -319,7 +375,7 @@ end | |
-- Cancel a job from taking place. It will be deleted from the system, and any | ||
-- attempts to renew a heartbeat will fail, and any attempts to complete it | ||
-- will fail. If you try to get the data on the object, you will get nothing. | ||
function Qless.cancel(...) | ||
function Qless.cancel(now, ...) | ||
-- Dependents is a mapping of a job to its dependent jids | ||
local dependents = {} | ||
for _, jid in ipairs(arg) do | ||
|
@@ -365,22 +421,20 @@ function Qless.cancel(...) | |
-- Remove it from that queue | ||
if queue then | ||
local queue = Qless.queue(queue) | ||
queue.work.remove(jid) | ||
queue.locks.remove(jid) | ||
queue.scheduled.remove(jid) | ||
queue.depends.remove(jid) | ||
queue:remove_job(jid) | ||
end | ||
|
||
local job = Qless.job(jid) | ||
|
||
job:throttles_release(now) | ||
|
||
-- We should probably go through all our dependencies and remove | ||
-- ourselves from the list of dependents | ||
for i, j in ipairs(redis.call( | ||
'smembers', QlessJob.ns .. jid .. '-dependencies')) do | ||
redis.call('srem', QlessJob.ns .. j .. '-dependents', jid) | ||
end | ||
|
||
-- Delete any notion of dependencies it has | ||
redis.call('del', QlessJob.ns .. jid .. '-dependencies') | ||
|
||
-- If we're in the failed state, remove all of our data | ||
if state == 'failed' then | ||
failure = cjson.decode(failure) | ||
|
@@ -398,25 +452,15 @@ function Qless.cancel(...) | |
'ql:s:stats:' .. bin .. ':' .. queue, 'failed', failed - 1) | ||
end | ||
|
||
-- Remove it as a job that's tagged with this particular tag | ||
local tags = cjson.decode( | ||
redis.call('hget', QlessJob.ns .. jid, 'tags') or '{}') | ||
for i, tag in ipairs(tags) do | ||
redis.call('zrem', 'ql:t:' .. tag, jid) | ||
redis.call('zincrby', 'ql:tags', -1, tag) | ||
end | ||
job:delete() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 Again, it's good to see this extracted into a helper function. |
||
|
||
-- If the job was being tracked, we should notify | ||
if redis.call('zscore', 'ql:tracked', jid) ~= false then | ||
Qless.publish('canceled', jid) | ||
end | ||
|
||
-- Just go ahead and delete our data | ||
redis.call('del', QlessJob.ns .. jid) | ||
redis.call('del', QlessJob.ns .. jid .. '-history') | ||
end | ||
end | ||
|
||
return arg | ||
end | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are there separate
queue.throttle.*
APIs when there are alreadythrottle.*
APIs? As far as I can tell, they aren't used by your qless gem PR...Anyhow, if we do still need them for some reason, I'd expect them to be implemented in terms of the
throttle.*
APIs.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hold over from an original implementation. unnecessary =)