Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throttling jobs via arbitrary resource - Issue #27 #44

Closed
wants to merge 53 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
a3728f0
simple throttle for qless
Mar 10, 2014
44df91c
finished renaming resources to throttle
Mar 10, 2014
14d0fe3
throttle tests wip
Mar 10, 2014
674353a
test fixes
Mar 10, 2014
32c795c
wip
Mar 10, 2014
4de319b
general working
Mar 10, 2014
493d157
throttle api
Mar 10, 2014
f6b9a38
throttle changes
Mar 10, 2014
36fd0fb
Add Throttle locks and pending member functions
Mar 10, 2014
3599db3
wip
Mar 10, 2014
9f3bdb9
wip
Mar 10, 2014
623ebd9
Fix tests
Mar 11, 2014
b21e519
Switch to sorted set
Mar 11, 2014
69bd3e7
Fix throttle locks and pending member functions
Mar 11, 2014
d3fe766
lock fixes
Mar 11, 2014
1e003e5
finished basic tests
Mar 11, 2014
4df4123
switched to rank instead of score
Mar 11, 2014
7fff784
Acquire throttle on pop
Mar 11, 2014
fe776ef
test fixes
Mar 11, 2014
4fc8e3c
test fixes
Mar 11, 2014
27aebd4
Add tests for dependent throttling
Mar 11, 2014
a3244fe
Use throttles to handle max-queue-concurrency
Mar 11, 2014
81e2103
removed commented code
Mar 11, 2014
b4e7765
support multiple resources
Mar 11, 2014
308a77e
implemented multiple throttles per job
Mar 11, 2014
5d86598
documentation
Mar 12, 2014
5141cbf
Add job tests for multiple throttles
Mar 12, 2014
a27ac2e
implemeted queue throttled
Mar 12, 2014
d230b54
Merge branch 'throttle' of github.com:backupify/qless-core into throttle
Mar 12, 2014
b159933
small changes
Mar 12, 2014
6522fe7
Add tests for dynamically changing throttle concurrency level
Mar 12, 2014
eaf6be1
Add tests to verify queue throttled
Mar 12, 2014
32a1408
Minor optimizations and test fixes
Mar 12, 2014
a70362b
Add test for queue throttled count
Mar 12, 2014
fc332c9
Add api methods for setting queue throttle max
Mar 12, 2014
b54298f
throttles refactor/cleanup
Mar 12, 2014
71ac5e0
Update queue.lua
Mar 12, 2014
aadd902
fixed syntax error
Mar 13, 2014
9595d5e
Simplify job#acquire_throttles
Mar 13, 2014
769d4f3
Remove commented code
Mar 13, 2014
f7a3b6f
misc fixes
Mar 13, 2014
730c668
Merge branch 'throttle' of github.com:backupify/qless-core into throttle
Mar 13, 2014
7d85dc8
attempt to throttle jobs immediately
Mar 13, 2014
2d9810c
added optional expiration to a throttle
Mar 13, 2014
4728f1d
removed printlines
Mar 13, 2014
85350ea
Add API to retrieve throttle ttl
Mar 13, 2014
5668c32
Set queue throttle expiration to 0
Mar 13, 2014
0d8290e
Update test_job.py
Mar 13, 2014
9a37225
Merge pull request #1 from backupify/throttle
Mar 13, 2014
70a2779
Catch when job has no throttles
Mar 13, 2014
dd2d931
test exposing cancel bug
wr0ngway Mar 14, 2014
8f004dc
fix for cancelled
Mar 31, 2014
1ab0100
Merge pull request #1 from backupify/fix_cancel
Mar 31, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
all: qless.lua qless-lib.lua

qless-lib.lua: base.lua config.lua job.lua queue.lua recurring.lua worker.lua
qless-lib.lua: base.lua config.lua job.lua queue.lua recurring.lua worker.lua throttle.lua
echo "-- Current SHA: `git rev-parse HEAD`" > qless-lib.lua
echo "-- This is a generated file" >> qless-lib.lua
cat base.lua config.lua job.lua queue.lua recurring.lua worker.lua >> qless-lib.lua
cat base.lua config.lua job.lua queue.lua recurring.lua worker.lua throttle.lua >> qless-lib.lua

qless.lua: qless-lib.lua api.lua
# Cat these files out, but remove all the comments from the source
Expand All @@ -18,4 +18,4 @@ clean:

.PHONY: test
test: qless.lua *.lua
nosetests --exe -v
nosetests --exe -v $(TEST)
38 changes: 37 additions & 1 deletion api.lua
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ QlessAPI.unpause = function(now, ...)
end

QlessAPI.cancel = function(now, ...)
return Qless.cancel(unpack(arg))
return Qless.cancel(now, unpack(arg))
end

QlessAPI.timeout = function(now, ...)
Expand Down Expand Up @@ -193,6 +193,42 @@ QlessAPI['queue.forget'] = function(now, ...)
QlessQueue.deregister(unpack(arg))
end

QlessAPI['queue.throttle.get'] = function(now, queue)
local data = Qless.throttle(QlessQueue.ns .. queue):data()
if not data then
return nil
end
return cjson.encode(data)
end

QlessAPI['queue.throttle.set'] = function(now, queue, max)
Qless.throttle(QlessQueue.ns .. queue):set({maximum = max}, 0)
end

-- Throttle apis
QlessAPI['throttle.set'] = function(now, tid, max, ...)
local expiration = unpack(arg)
local data = {
maximum = max
}
Qless.throttle(tid):set(data, tonumber(expiration or 0))
end

QlessAPI['throttle.get'] = function(now, tid)
return cjson.encode(Qless.throttle(tid):data())
end

QlessAPI['throttle.delete'] = function(now, tid)
return Qless.throttle(tid):unset()
end

QlessAPI['throttle.locks'] = function(now, tid)
return Qless.throttle(tid).locks.members()
end

QlessAPI['throttle.ttl'] = function(now, tid)
return Qless.throttle(tid):ttl()
end
-------------------------------------------------------------------------------
-- Function lookup
-------------------------------------------------------------------------------
Expand Down
74 changes: 58 additions & 16 deletions base.lua
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ local QlessJob = {
}
QlessJob.__index = QlessJob

-- throttle forward declaration
local QlessThrottle = {
ns = Qless.ns .. 'th:'
}
QlessThrottle.__index = QlessThrottle

-- RecurringJob forward declaration
local QlessRecurringJob = {}
QlessRecurringJob.__index = QlessRecurringJob
Expand Down Expand Up @@ -61,19 +67,54 @@ function Qless.recurring(jid)
return job
end

-- Return a throttle object
-- throttle objects are used for arbitrary throttling of jobs.
function Qless.throttle(tid)
assert(tid, 'Throttle(): no tid provided')
local throttle = QlessThrottle.data({id = tid})
if not throttle then
throttle = {
id = tid,
maximum = 0
}
end
setmetatable(throttle, QlessThrottle)

-- set of jids which have acquired a lock on this throttle.
throttle.locks = {
length = function()
return (redis.call('zcard', QlessThrottle.ns .. tid .. '-locks') or 0)
end, members = function()
return redis.call('zrange', QlessThrottle.ns .. tid .. '-locks', 0, -1)
end, add = function(...)
if #arg > 0 then
redis.call('zadd', QlessThrottle.ns .. tid .. '-locks', unpack(arg))
end
end, remove = function(...)
if #arg > 0 then
return redis.call('zrem', QlessThrottle.ns .. tid .. '-locks', unpack(arg))
end
end, pop = function(min, max)
return redis.call('zremrangebyrank', QlessThrottle.ns .. tid .. '-locks', min, max)
end
}

return throttle
end

-- Failed([group, [start, [limit]]])
-- ------------------------------------
-- If no group is provided, this returns a JSON blob of the counts of the
-- various groups of failures known. If a group is provided, it will report up
-- to `limit` from `start` of the jobs affected by that issue.
--
--
-- # If no group, then...
-- {
-- 'group1': 1,
-- 'group2': 5,
-- ...
-- }
--
--
-- # If a group is provided, then...
-- {
-- 'total': 20,
Expand Down Expand Up @@ -119,9 +160,9 @@ end
-------------------------------------------------------------------------------
-- Return all the job ids currently considered to be in the provided state
-- in a particular queue. The response is a list of job ids:
--
--
-- [
-- jid1,
-- jid1,
-- jid2,
-- ...
-- ]
Expand All @@ -146,6 +187,8 @@ function Qless.jobs(now, state, ...)
return queue.locks.peek(now, offset, count)
elseif state == 'stalled' then
return queue.locks.expired(now, offset, count)
elseif state == 'throttled' then
return queue.throttled.peek(now, offset, count)
elseif state == 'scheduled' then
queue:check_scheduled(now, queue.scheduled.length())
return queue.scheduled.peek(now, offset, count)
Expand All @@ -167,7 +210,7 @@ end
-- associated with that id, and 'untrack' stops tracking it. In this context,
-- tracking is nothing more than saving the job to a list of jobs that are
-- considered special.
--
--
-- {
-- 'jobs': [
-- {
Expand Down Expand Up @@ -252,7 +295,7 @@ function Qless.tag(now, command, ...)
tags = cjson.decode(tags)
local _tags = {}
for i,v in ipairs(tags) do _tags[v] = true end

-- Otherwise, add the job to the sorted set with that tags
for i=2,#arg do
local tag = arg[i]
Expand All @@ -263,7 +306,7 @@ function Qless.tag(now, command, ...)
redis.call('zadd', 'ql:t:' .. tag, now, jid)
redis.call('zincrby', 'ql:tags', 1, tag)
end

tags = cjson.encode(tags)
redis.call('hset', QlessJob.ns .. jid, 'tags', tags)
return tags
Expand All @@ -279,18 +322,18 @@ function Qless.tag(now, command, ...)
tags = cjson.decode(tags)
local _tags = {}
for i,v in ipairs(tags) do _tags[v] = true end

-- Otherwise, add the job to the sorted set with that tags
for i=2,#arg do
local tag = arg[i]
_tags[tag] = nil
redis.call('zrem', 'ql:t:' .. tag, jid)
redis.call('zincrby', 'ql:tags', -1, tag)
end

local results = {}
for i,tag in ipairs(tags) do if _tags[tag] then table.insert(results, tag) end end

tags = cjson.encode(results)
redis.call('hset', QlessJob.ns .. jid, 'tags', tags)
return results
Expand Down Expand Up @@ -321,7 +364,7 @@ end
-- Cancel a job from taking place. It will be deleted from the system, and any
-- attempts to renew a heartbeat will fail, and any attempts to complete it
-- will fail. If you try to get the data on the object, you will get nothing.
function Qless.cancel(...)
function Qless.cancel(now, ...)
-- Dependents is a mapping of a job to its dependent jids
local dependents = {}
for _, jid in ipairs(arg) do
Expand Down Expand Up @@ -367,12 +410,11 @@ function Qless.cancel(...)
-- Remove it from that queue
if queue then
local queue = Qless.queue(queue)
queue.work.remove(jid)
queue.locks.remove(jid)
queue.scheduled.remove(jid)
queue.depends.remove(jid)
queue:remove_job(jid)
end

Qless.job(jid):throttles_release(now)

-- We should probably go through all our dependencies and remove
-- ourselves from the list of dependents
for i, j in ipairs(redis.call(
Expand Down Expand Up @@ -418,7 +460,7 @@ function Qless.cancel(...)
redis.call('del', QlessJob.ns .. jid .. '-history')
end
end

return arg
end

Loading