Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throttling on arbitrary resources. #46

Closed
wants to merge 89 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
89 commits
Select commit Hold shift + click to select a range
a3728f0
simple throttle for qless
Mar 10, 2014
44df91c
finished renaming resources to throttle
Mar 10, 2014
14d0fe3
throttle tests wip
Mar 10, 2014
674353a
test fixes
Mar 10, 2014
32c795c
wip
Mar 10, 2014
4de319b
general working
Mar 10, 2014
493d157
throttle api
Mar 10, 2014
f6b9a38
throttle changes
Mar 10, 2014
36fd0fb
Add Throttle locks and pending member functions
Mar 10, 2014
3599db3
wip
Mar 10, 2014
9f3bdb9
wip
Mar 10, 2014
623ebd9
Fix tests
Mar 11, 2014
b21e519
Switch to sorted set
Mar 11, 2014
69bd3e7
Fix throttle locks and pending member functions
Mar 11, 2014
d3fe766
lock fixes
Mar 11, 2014
1e003e5
finished basic tests
Mar 11, 2014
4df4123
switched to rank instead of score
Mar 11, 2014
7fff784
Acquire throttle on pop
Mar 11, 2014
fe776ef
test fixes
Mar 11, 2014
4fc8e3c
test fixes
Mar 11, 2014
27aebd4
Add tests for dependent throttling
Mar 11, 2014
a3244fe
Use throttles to handle max-queue-concurrency
Mar 11, 2014
81e2103
removed commented code
Mar 11, 2014
b4e7765
support multiple resources
Mar 11, 2014
308a77e
implemented multiple throttles per job
Mar 11, 2014
5d86598
documentation
Mar 12, 2014
5141cbf
Add job tests for multiple throttles
Mar 12, 2014
a27ac2e
implemeted queue throttled
Mar 12, 2014
d230b54
Merge branch 'throttle' of github.com:backupify/qless-core into throttle
Mar 12, 2014
b159933
small changes
Mar 12, 2014
6522fe7
Add tests for dynamically changing throttle concurrency level
Mar 12, 2014
eaf6be1
Add tests to verify queue throttled
Mar 12, 2014
32a1408
Minor optimizations and test fixes
Mar 12, 2014
a70362b
Add test for queue throttled count
Mar 12, 2014
fc332c9
Add api methods for setting queue throttle max
Mar 12, 2014
b54298f
throttles refactor/cleanup
Mar 12, 2014
71ac5e0
Update queue.lua
Mar 12, 2014
aadd902
fixed syntax error
Mar 13, 2014
9595d5e
Simplify job#acquire_throttles
Mar 13, 2014
769d4f3
Remove commented code
Mar 13, 2014
f7a3b6f
misc fixes
Mar 13, 2014
730c668
Merge branch 'throttle' of github.com:backupify/qless-core into throttle
Mar 13, 2014
7d85dc8
attempt to throttle jobs immediately
Mar 13, 2014
2d9810c
added optional expiration to a throttle
Mar 13, 2014
4728f1d
removed printlines
Mar 13, 2014
85350ea
Add API to retrieve throttle ttl
Mar 13, 2014
5668c32
Set queue throttle expiration to 0
Mar 13, 2014
0d8290e
Update test_job.py
Mar 13, 2014
9a37225
Merge pull request #1 from backupify/throttle
Mar 13, 2014
70a2779
Catch when job has no throttles
Mar 13, 2014
dd2d931
test exposing cancel bug
wr0ngway Mar 14, 2014
8f004dc
fix for cancelled
Mar 31, 2014
a9c3b98
wip
Apr 1, 2014
d1b52be
Merge pull request #2 from backupify/fix_cancel
Apr 1, 2014
d7372ab
removed printline statements
Apr 2, 2014
f1bf59f
removed debugging code
Apr 2, 2014
3b80d6c
Merge pull request #3 from backupify/throttle
Apr 11, 2014
3108245
fixes throttle release to properly remove a job from the throttled set
May 12, 2014
d058374
small optimization
May 12, 2014
ee6f039
test name change
May 12, 2014
42aa9be
whitespace fix
May 12, 2014
3bffc21
Merge pull request #4 from backupify/cancel-fix
May 12, 2014
d9903a5
fix tags work
Jun 3, 2014
56df3fc
test fix
Jun 3, 2014
bd089c0
removed unnecessary code
Jun 3, 2014
ed4a0f8
removed unnecessary change
Jun 3, 2014
9a766e6
Merge pull request #5 from backupify/fix-tags
Jun 3, 2014
3550b9b
patches remove_tag method to be more reliable
Jun 4, 2014
d821344
Merge pull request #6 from backupify/patch-remove-tags
Jun 4, 2014
66c1ebd
Merge branch 'upstream-master' into merge-upstream
Aug 8, 2014
3fe1fcd
merged master
Aug 8, 2014
6451b7c
Merge remote-tracking branch 'upstream/master' into merge-upstream
Aug 8, 2014
65dac2d
Merge pull request #7 from backupify/merge-upstream
james-lawrence Aug 18, 2014
cda5ed8
Update throttle.lua
james-lawrence Nov 26, 2014
b6fd1aa
test fixes
May 4, 2015
38e2493
Merge branch 'upstream' into merge-upstream
May 4, 2015
5dbc192
Merge pull request #8 from backupify/merge-upstream
james-lawrence May 4, 2015
21097aa
Merge branch 'upstream-master'
May 4, 2015
7932ac9
Merge branch 'master' of github.com:backupify/qless-core
May 4, 2015
8fda126
retry pop up to config limit when pop quantity would be unfulfilled d…
wr0ngway Jul 23, 2015
98b71d6
short circuit retry if nothing in work queue
wr0ngway Jul 23, 2015
4718824
verify contents of waiting jids in tests for max-pop-retry
wr0ngway Jul 31, 2015
8baa512
Merge pull request #9 from backupify/pop_retry
Jul 31, 2015
9e5e716
Update api.lua
james-lawrence Sep 10, 2015
e8ed50c
Update test_throttle.py
james-lawrence Sep 10, 2015
230a777
tests
Sep 10, 2015
e3fdb7e
Merge pull request #10 from backupify/throttle-release-job-api
james-lawrence Sep 10, 2015
6dbf028
Ignore ghost jids that have no real job
Jul 5, 2016
20dc687
Remove old queue throttle from reput job
Jul 8, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
all: qless.lua qless-lib.lua

qless-lib.lua: base.lua config.lua job.lua queue.lua recurring.lua worker.lua
qless-lib.lua: base.lua config.lua job.lua queue.lua recurring.lua worker.lua throttle.lua
echo "-- Current SHA: `git rev-parse HEAD`" > qless-lib.lua
echo "-- This is a generated file" >> qless-lib.lua
cat base.lua config.lua job.lua queue.lua recurring.lua worker.lua >> qless-lib.lua
cat base.lua config.lua job.lua queue.lua recurring.lua worker.lua throttle.lua >> qless-lib.lua

qless.lua: qless-lib.lua api.lua
# Cat these files out, but remove all the comments from the source
Expand All @@ -18,4 +18,4 @@ clean:

.PHONY: test
test: qless.lua *.lua
nosetests --exe -v
nosetests --exe -v $(TEST)
54 changes: 33 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,24 @@ installed:
pip install redis nose
```

To run the tests, there is a directive included in the makefile:
To run all the tests, there is a directive included in the makefile:

```bash
make test
```

You can run specific test files by passing the TEST environment variable to make:

```bash
make test TEST=test/test_worker.py
```

or for a single test case in that file:

```bash
make test TEST=test/test_worker.py:TestWorker.test_basic
```

If you have Redis running somewhere other than `localhost:6379`, you can supply
the `REDIS_URL` environment variable:

Expand Down Expand Up @@ -107,7 +119,7 @@ the heartbeat should return `false` and the worker should yield.

When a node attempts to heartbeat, the lua script should check to see if the
node attempting to renew the lock is the same node that currently owns the
lock. If so, then the lock's expiration should be pushed back accordingly,
lock. If so, then the lock's expiration should be pushed back accordingly,
and the updated expiration returned. If not, an exception is raised.

Stats
Expand All @@ -117,7 +129,7 @@ and job completion time (time completed - time popped). By 'statistics',
I mean average, variange, count and a histogram. Stats for the number of
failures and retries for a given queue are also available.

Stats are grouped by day. In the case of job wait time, its stats are
Stats are grouped by day. In the case of job wait time, its stats are
aggregated on the day when the job was popped. In the case of completion time,
they are grouped by the day it was completed.

Expand All @@ -141,7 +153,7 @@ might have:
=============
upload error
widget failure

ql:f:upload error
==================
deadbeef
Expand Down Expand Up @@ -211,45 +223,45 @@ job is stored in `ql:j:<jid>-dependents`. For example, `ql:j:<jid>`:
# This is the same id as identifies it in the key. It should be
# a hex value of a uuid
'jid' : 'deadbeef...',

# This is a 'type' identifier. Clients may choose to ignore it,
# or use it as a language-specific identifier for determining
# what code to run. For instance, it might be 'foo.bar.FooJob'
'type' : '...',

# This is the priority of the job -- lower means more priority.
# The default is 0
'priority' : 0,

# This is the user data associated with the job. (JSON blob)
'data' : '{"hello": "how are you"}',

# A JSON array of tags associated with this job
'tags' : '["testing", "experimental"]',

# The worker ID of the worker that owns it. Currently the worker
# id is <hostname>-<pid>
'worker' : 'ec2-...-4925',

# This is the time when it must next check in
'expires' : 1352375209,

# The current state of the job: 'waiting', 'pending', 'complete'
'state' : 'waiting',

# The queue that it's associated with. 'null' if complete
'queue' : 'example',

# The maximum number of retries this job is allowed per queue
'retries' : 3,
# The number of retries remaining
'remaining' : 3,

# The jids that depend on this job's completion
'dependents' : [...],
# The jids that this job is dependent upon
'dependencies': [...],

# A list of all the things that have happened to a job. Each entry has
# the keys 'what' and 'when', but it may also have arbitrary keys
# associated with it.
Expand All @@ -274,11 +286,11 @@ A queue is a priority queue and consists of three parts:
1. `ql:q:<name>-depends` -- sorted set of jobs in a queue, but waiting on
other jobs

When looking for a unit of work, the client should first choose from the
When looking for a unit of work, the client should first choose from the
next expired lock. If none are expired, then we should next make sure that
any jobs that should now be considered eligible (the scheduled time is in
the past) are then inserted into the work queue. A sorted set of all the
known queues is maintained at `ql:queues`. Currently we're keeping it
the past) are then inserted into the work queue. A sorted set of all the
known queues is maintained at `ql:queues`. Currently we're keeping it
sorted based on the time when we first saw the queue, but that's a little
bit at odd with only keeping queues around while they're being used.

Expand All @@ -293,7 +305,7 @@ an integer timestamp of midnight for that day:

<day> = time - (time % (24 * 60 * 60))

Stats are stored under two hashes: `ql:s:wait:<day>:<queue>` and
Stats are stored under two hashes: `ql:s:wait:<day>:<queue>` and
`ql:s:run:<day>:<queue>` respectively. Each has the keys:

- `total` -- The total number of data points contained
Expand Down Expand Up @@ -363,8 +375,8 @@ something to be aware of when writing language bindings.

Filesystem Access
-----------------
It's intended to be a common usecase that bindings provide a worker script or
binary that runs several worker subprocesses. These should run with their
It's intended to be a common usecase that bindings provide a worker script or
binary that runs several worker subprocesses. These should run with their
working directory as a sandbox.

Forking Model
Expand Down
51 changes: 50 additions & 1 deletion api.lua
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ QlessAPI.unpause = function(now, ...)
end

QlessAPI.cancel = function(now, ...)
return Qless.cancel(unpack(arg))
return Qless.cancel(now, unpack(arg))
end

QlessAPI.timeout = function(now, ...)
Expand Down Expand Up @@ -199,6 +199,55 @@ QlessAPI['queue.forget'] = function(now, ...)
QlessQueue.deregister(unpack(arg))
end

QlessAPI['queue.throttle.get'] = function(now, queue)
local data = Qless.throttle(QlessQueue.ns .. queue):data()
if not data then
return nil
end
return cjson.encode(data)
end

QlessAPI['queue.throttle.set'] = function(now, queue, max)
Qless.throttle(QlessQueue.ns .. queue):set({maximum = max}, 0)
end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are there separate queue.throttle.* APIs when there are already throttle.* APIs? As far as I can tell, they aren't used by your qless gem PR...

Anyhow, if we do still need them for some reason, I'd expect them to be implemented in terms of the throttle.* APIs.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hold over from an original implementation. unnecessary =)


-- Throttle apis
QlessAPI['throttle.set'] = function(now, tid, max, ...)
local expiration = unpack(arg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why deal with ... and unpack(arg), rather than declaring expiration as an argument?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it was to make it an optional argument, not terrible strong in lua.

local data = {
maximum = max
}
Qless.throttle(tid):set(data, tonumber(expiration or 0))
end

QlessAPI['throttle.get'] = function(now, tid)
return cjson.encode(Qless.throttle(tid):data())
end

QlessAPI['throttle.delete'] = function(now, tid)
return Qless.throttle(tid):unset()
end

QlessAPI['throttle.locks'] = function(now, tid)
return Qless.throttle(tid).locks.members()
end

QlessAPI['throttle.pending'] = function(now, tid)
return Qless.throttle(tid).pending.members()
end

QlessAPI['throttle.ttl'] = function(now, tid)
return Qless.throttle(tid):ttl()
end

-- releases the set of jids from the specified throttle.
QlessAPI['throttle.release'] = function(now, tid, ...)
local throttle = Qless.throttle(tid)

for _, jid in ipairs(arg) do
throttle:release(now, jid)
end
end
-------------------------------------------------------------------------------
-- Function lookup
-------------------------------------------------------------------------------
Expand Down
Loading