Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Save job data as-is without re-encoding #83

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@
.vagrant
qless-lib.lua
qless.lua
/env
/.idea

21 changes: 10 additions & 11 deletions job.lua
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,12 @@ end
-- ('depends', : Json of jobs it depends on in the new queue
-- '["jid1", "jid2", ...]')
---
function QlessJob:complete(now, worker, queue, raw_data, ...)
function QlessJob:complete(now, worker, queue, data, ...)
assert(worker, 'Complete(): Arg "worker" missing')
assert(queue , 'Complete(): Arg "queue" missing')
local data = assert(cjson.decode(raw_data),
'Complete(): Arg "data" missing or not JSON: ' .. tostring(raw_data))
if data then
assert(cjson.decode(data), 'Complete(): Arg "data" not JSON: ' .. tostring(data))
end

-- Read in all the optional parameters
local options = {}
Expand Down Expand Up @@ -122,8 +123,8 @@ function QlessJob:complete(now, worker, queue, raw_data, ...)
-- update history
self:history(now, 'done')

if raw_data then
redis.call('hset', QlessJob.ns .. self.jid, 'data', raw_data)
if data then
redis.call('hset', QlessJob.ns .. self.jid, 'data', data)
end

-- Remove the job from the previous queue
Expand Down Expand Up @@ -334,7 +335,7 @@ function QlessJob:fail(now, worker, group, message, data)
local bin = now - (now % 86400)

if data then
data = cjson.decode(data)
assert(cjson.decode(data), 'Fail(): Arg "data" not JSON: ' .. tostring(data))
end

-- First things first, we should get the history
Expand Down Expand Up @@ -386,7 +387,7 @@ function QlessJob:fail(now, worker, group, message, data)
-- The reason that this appears here is that the above will fail if the
-- job doesn't exist
if data then
redis.call('hset', QlessJob.ns .. self.jid, 'data', cjson.encode(data))
redis.call('hset', QlessJob.ns .. self.jid, 'data', data)
end

redis.call('hmset', QlessJob.ns .. self.jid,
Expand Down Expand Up @@ -619,7 +620,7 @@ function QlessJob:heartbeat(now, worker, data)
Qless.config.get('heartbeat', 60))

if data then
data = cjson.decode(data)
assert(cjson.decode(data), 'Heartbeat(): Arg "data" not JSON: ' .. tostring(data))
end

-- First, let's see if the worker still owns this job, and there is a
Expand All @@ -639,10 +640,8 @@ function QlessJob:heartbeat(now, worker, data)
else
-- Otherwise, optionally update the user data, and the heartbeat
if data then
-- I don't know if this is wise, but I'm decoding and encoding
-- the user data to hopefully ensure its sanity
redis.call('hmset', QlessJob.ns .. self.jid, 'expires',
expires, 'worker', worker, 'data', cjson.encode(data))
expires, 'worker', worker, 'data', data)
else
redis.call('hmset', QlessJob.ns .. self.jid,
'expires', expires, 'worker', worker)
Expand Down
14 changes: 6 additions & 8 deletions queue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -412,11 +412,10 @@ end
-- -----------------------
-- Insert a job into the queue with the given priority, tags, delay, klass and
-- data.
function QlessQueue:put(now, worker, jid, klass, raw_data, delay, ...)
function QlessQueue:put(now, worker, jid, klass, data, delay, ...)
assert(jid , 'Put(): Arg "jid" missing')
assert(klass, 'Put(): Arg "klass" missing')
local data = assert(cjson.decode(raw_data),
'Put(): Arg "data" missing or not JSON: ' .. tostring(raw_data))
assert(cjson.decode(data), 'Put(): Arg "data" missing or not JSON: ' .. tostring(data))
delay = assert(tonumber(delay),
'Put(): Arg "delay" not a number: ' .. tostring(delay))

Expand Down Expand Up @@ -538,7 +537,7 @@ function QlessQueue:put(now, worker, jid, klass, raw_data, delay, ...)
redis.call('hmset', QlessJob.ns .. jid,
'jid' , jid,
'klass' , klass,
'data' , raw_data,
'data' , data,
'priority' , priority,
'tags' , cjson.encode(tags),
'state' , ((delay > 0) and 'scheduled') or 'waiting',
Expand Down Expand Up @@ -630,12 +629,11 @@ function QlessQueue:unfail(now, group, count)
end

-- Recur a job of type klass in this queue
function QlessQueue:recur(now, jid, klass, raw_data, spec, ...)
function QlessQueue:recur(now, jid, klass, data, spec, ...)
assert(jid , 'RecurringJob On(): Arg "jid" missing')
assert(klass, 'RecurringJob On(): Arg "klass" missing')
assert(spec , 'RecurringJob On(): Arg "spec" missing')
local data = assert(cjson.decode(raw_data),
'RecurringJob On(): Arg "data" not JSON: ' .. tostring(raw_data))
assert(cjson.decode(data), 'RecurringJob On(): Arg "data" missing or not JSON: ' .. tostring(data))

-- At some point in the future, we may have different types of recurring
-- jobs, but for the time being, we only have 'interval'-type jobs
Expand Down Expand Up @@ -683,7 +681,7 @@ function QlessQueue:recur(now, jid, klass, raw_data, spec, ...)
redis.call('hmset', 'ql:r:' .. jid,
'jid' , jid,
'klass' , klass,
'data' , raw_data,
'data' , data,
'priority', options.priority,
'tags' , cjson.encode(options.tags or {}),
'state' , 'recur',
Expand Down