diff --git a/.gitignore b/.gitignore index 380be18..d86a33c 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,6 @@ .vagrant qless-lib.lua qless.lua +/env +/.idea + diff --git a/job.lua b/job.lua index 251a5d4..0948b4b 100644 --- a/job.lua +++ b/job.lua @@ -66,11 +66,12 @@ end -- ('depends', : Json of jobs it depends on in the new queue -- '["jid1", "jid2", ...]') --- -function QlessJob:complete(now, worker, queue, raw_data, ...) +function QlessJob:complete(now, worker, queue, data, ...) assert(worker, 'Complete(): Arg "worker" missing') assert(queue , 'Complete(): Arg "queue" missing') - local data = assert(cjson.decode(raw_data), - 'Complete(): Arg "data" missing or not JSON: ' .. tostring(raw_data)) + if data then + assert(cjson.decode(data), 'Complete(): Arg "data" not JSON: ' .. tostring(data)) + end -- Read in all the optional parameters local options = {} @@ -122,8 +123,8 @@ function QlessJob:complete(now, worker, queue, raw_data, ...) -- update history self:history(now, 'done') - if raw_data then - redis.call('hset', QlessJob.ns .. self.jid, 'data', raw_data) + if data then + redis.call('hset', QlessJob.ns .. self.jid, 'data', data) end -- Remove the job from the previous queue @@ -334,7 +335,7 @@ function QlessJob:fail(now, worker, group, message, data) local bin = now - (now % 86400) if data then - data = cjson.decode(data) + assert(cjson.decode(data), 'Fail(): Arg "data" not JSON: ' .. tostring(data)) end -- First things first, we should get the history @@ -386,7 +387,7 @@ function QlessJob:fail(now, worker, group, message, data) -- The reason that this appears here is that the above will fail if the -- job doesn't exist if data then - redis.call('hset', QlessJob.ns .. self.jid, 'data', cjson.encode(data)) + redis.call('hset', QlessJob.ns .. self.jid, 'data', data) end redis.call('hmset', QlessJob.ns .. self.jid, @@ -619,7 +620,7 @@ function QlessJob:heartbeat(now, worker, data) Qless.config.get('heartbeat', 60)) if data then - data = cjson.decode(data) + assert(cjson.decode(data), 'Heartbeat(): Arg "data" not JSON: ' .. tostring(data)) end -- First, let's see if the worker still owns this job, and there is a @@ -639,10 +640,8 @@ function QlessJob:heartbeat(now, worker, data) else -- Otherwise, optionally update the user data, and the heartbeat if data then - -- I don't know if this is wise, but I'm decoding and encoding - -- the user data to hopefully ensure its sanity redis.call('hmset', QlessJob.ns .. self.jid, 'expires', - expires, 'worker', worker, 'data', cjson.encode(data)) + expires, 'worker', worker, 'data', data) else redis.call('hmset', QlessJob.ns .. self.jid, 'expires', expires, 'worker', worker) diff --git a/queue.lua b/queue.lua index 04e8497..47e9207 100644 --- a/queue.lua +++ b/queue.lua @@ -412,11 +412,10 @@ end -- ----------------------- -- Insert a job into the queue with the given priority, tags, delay, klass and -- data. -function QlessQueue:put(now, worker, jid, klass, raw_data, delay, ...) +function QlessQueue:put(now, worker, jid, klass, data, delay, ...) assert(jid , 'Put(): Arg "jid" missing') assert(klass, 'Put(): Arg "klass" missing') - local data = assert(cjson.decode(raw_data), - 'Put(): Arg "data" missing or not JSON: ' .. tostring(raw_data)) + assert(cjson.decode(data), 'Put(): Arg "data" missing or not JSON: ' .. tostring(data)) delay = assert(tonumber(delay), 'Put(): Arg "delay" not a number: ' .. tostring(delay)) @@ -538,7 +537,7 @@ function QlessQueue:put(now, worker, jid, klass, raw_data, delay, ...) redis.call('hmset', QlessJob.ns .. jid, 'jid' , jid, 'klass' , klass, - 'data' , raw_data, + 'data' , data, 'priority' , priority, 'tags' , cjson.encode(tags), 'state' , ((delay > 0) and 'scheduled') or 'waiting', @@ -630,12 +629,11 @@ function QlessQueue:unfail(now, group, count) end -- Recur a job of type klass in this queue -function QlessQueue:recur(now, jid, klass, raw_data, spec, ...) +function QlessQueue:recur(now, jid, klass, data, spec, ...) assert(jid , 'RecurringJob On(): Arg "jid" missing') assert(klass, 'RecurringJob On(): Arg "klass" missing') assert(spec , 'RecurringJob On(): Arg "spec" missing') - local data = assert(cjson.decode(raw_data), - 'RecurringJob On(): Arg "data" not JSON: ' .. tostring(raw_data)) + assert(cjson.decode(data), 'RecurringJob On(): Arg "data" missing or not JSON: ' .. tostring(data)) -- At some point in the future, we may have different types of recurring -- jobs, but for the time being, we only have 'interval'-type jobs @@ -683,7 +681,7 @@ function QlessQueue:recur(now, jid, klass, raw_data, spec, ...) redis.call('hmset', 'ql:r:' .. jid, 'jid' , jid, 'klass' , klass, - 'data' , raw_data, + 'data' , data, 'priority', options.priority, 'tags' , cjson.encode(options.tags or {}), 'state' , 'recur',