forked from albrow/jobs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscripts.go
375 lines (345 loc) · 13.6 KB
/
scripts.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
// Copyright 2015 Alex Browne. All rights reserved.
// Use of this source code is governed by the MIT
// license, which can be found in the LICENSE file.
// File scripts.go contains the lua scripts from the scripts directory,
// each one wrapped with redis.NewScript.
// This file has been automatically generated by go generate,
// which calls scripts/generate.go. Do not edit it directly!
package jobs
import (
"github.com/garyburd/redigo/redis"
)
var (
// addJobToSetScript wraps the add_job_to_set lua script. The script reads all
// of its input from ARGV:
//   ARGV[1] - the id of the job
//   ARGV[2] - the name of the sorted set
//   ARGV[3] - the score the job should have in that sorted set
// It runs EXISTS on the hash 'jobs:<id>' and returns without touching anything
// unless the reply is 1; otherwise it issues ZADD on the given set. Neither
// path returns an explicit value.
addJobToSetScript = redis.NewScript(0, `-- Copyright 2015 Alex Browne. All rights reserved.
-- Use of this source code is governed by the MIT
-- license, which can be found in the LICENSE file.
-- add_job_to_set represents a lua script that takes the following arguments:
-- 1) The id of the job
-- 2) The name of a sorted set
-- 3) The score the inserted job should have in the sorted set
-- It first checks if the job exists in the database (has not been destroyed)
-- and then adds it to the sorted set with the given score.
-- IMPORTANT: If you edit this file, you must run go generate . to rewrite ../scripts.go
local jobId = ARGV[1]
local setName = ARGV[2]
local score = ARGV[3]
local jobKey = 'jobs:' .. jobId
-- Make sure the job hasn't already been destroyed
local exists = redis.call('EXISTS', jobKey)
if exists ~= 1 then
return
end
redis.call('ZADD', setName, score, jobId)`)
// destroyJobScript wraps the destroy_job lua script. It reads one argument,
// ARGV[1], the id of the job to destroy, and then:
//   1. Looks up the status field of 'jobs:<id>' and, when one is stored,
//      removes the id from the matching 'jobs:<status>' sorted set.
//   2. Removes the id from 'jobs:time'.
//   3. Deletes the hash 'jobs:<id>'.
// Inside a Redis lua script HGET yields false when the hash or the field is
// missing (for example when the job was already destroyed). The status is
// therefore tested for truthiness before it is concatenated into a key name;
// a comparison against '' alone lets false through and aborts the script with
// a concatenation error before steps 2 and 3 run.
// NOTE: this file is generated. The same change has to be made in the lua
// source that go generate reads, otherwise it is lost on the next run.
destroyJobScript = redis.NewScript(0, `-- Copyright 2015 Alex Browne. All rights reserved.
-- Use of this source code is governed by the MIT
-- license, which can be found in the LICENSE file.

-- destroy_job is a lua script that takes the following arguments:
-- 1) The id of the job to destroy
-- It then removes all traces of the job in the database by doing the following:
-- 1) Removes the job from the status set (which it determines with an HGET call)
-- 2) Removes the job from the time index
-- 3) Removes the main hash for the job

-- IMPORTANT: If you edit this file, you must run go generate . to rewrite ../scripts.go

-- Assign args to variables for easy reference
local jobId = ARGV[1]
local jobKey = 'jobs:' .. jobId

-- Remove the job from the status set.
-- HGET returns false (not '') when the hash or the field does not exist,
-- so rule that out before building the name of the status set.
local status = redis.call('HGET', jobKey, 'status')
if status and status ~= '' then
	local statusSet = 'jobs:' .. status
	redis.call('ZREM', statusSet, jobId)
end

-- Remove the job from the time index
redis.call('ZREM', 'jobs:time', jobId)

-- Remove the main hash for the job
redis.call('DEL', jobKey)`)
// getJobsByIdsScript wraps the get_jobs_by_ids lua script. It reads a single
// argument, ARGV[1], the key of a sorted set of job ids.
// The script fetches every member of that set with ZREVRANGE 0 -1, runs
// HGETALL on 'jobs:<id>' for each member, appends the pair 'id', <id> to the
// resulting field list, and returns one such list per job in ZREVRANGE order.
// NOTE(review): there is no EXISTS check in this script, so an id whose hash
// is missing presumably produces an entry holding only the 'id' pair -
// confirm that callers tolerate that.
getJobsByIdsScript = redis.NewScript(0, `-- Copyright 2015 Alex Browne. All rights reserved.
-- Use of this source code is governed by the MIT
-- license, which can be found in the LICENSE file.
-- get_jobs_by_ids is a lua script that takes the following arguments:
-- 1) The key of a sorted set of some job ids
-- The script then gets all the data for those job ids from their respective
-- hashes in the database. It returns an array of arrays where each element
-- contains the fields for a particular job, and the jobs are sorted by
-- priority.
-- Here's an example response:
-- [
-- [
-- "id", "afj9afjpa30",
-- "data", [34, 67, 34, 23, 56, 67, 78, 79],
-- "type", "emailJob",
-- "time", 1234567,
-- "freq", 0,
-- "priority", 100,
-- "retries", 0,
-- "status", "executing",
-- "started", 0,
-- "finished", 0,
-- ],
-- [
-- "id", "E8v2ovkdaIw",
-- "data", [46, 43, 12, 08, 34, 45, 57, 43],
-- "type", "emailJob",
-- "time", 1234568,
-- "freq", 0,
-- "priority", 95,
-- "retries", 0,
-- "status", "executing",
-- "started", 0,
-- "finished", 0,
-- ]
-- ]
-- IMPORTANT: If you edit this file, you must run go generate . to rewrite ../scripts.go
-- Assign keys to variables for easy access
local setKey = ARGV[1]
-- Get all the ids from the set name
local jobIds = redis.call('ZREVRANGE', setKey, 0, -1)
local allJobs = {}
if #jobIds > 0 then
-- Iterate over the ids and find each job
for i, jobId in ipairs(jobIds) do
local jobKey = 'jobs:' .. jobId
local jobFields = redis.call('HGETALL', jobKey)
-- Add the id itself to the fields
jobFields[#jobFields+1] = 'id'
jobFields[#jobFields+1] = jobId
-- Add the field values to allJobs
allJobs[#allJobs+1] = jobFields
end
end
return allJobs`)
// popNextJobsScript wraps the pop_next_jobs lua script. The script reads three
// ARGV values, although the header comment embedded in the script lists only
// the first two:
//   ARGV[1] - n, the maximum number of jobs to pop and return
//   ARGV[2] - currentTime, compared against the scores stored in 'jobs:time'
//   ARGV[3] - poolId, written to the poolId field of every popped job
// Steps, in order:
//   1. Copies 'jobs:time' into the scratch key 'jobs:temp' (ZUNIONSTORE).
//   2. Removes from 'jobs:temp' every member scored from currentTime to +inf.
//   3. Intersects 'jobs:queued' with 'jobs:temp' using WEIGHTS 1 0, so the
//      stored scores are taken from 'jobs:queued' only.
//   4. Trims 'jobs:temp' with ZREMRANGEBYRANK 0 (-n - 1), leaving at most the
//      n highest-scored members.
//   5. If any ids remain, unions 'jobs:temp' into 'jobs:executing'; then for
//      each id it removes the id from 'jobs:queued', sets the poolId field and
//      the status field ('executing') on the hash, and collects the HGETALL
//      reply plus the pair 'id', <id>.
//   6. Deletes 'jobs:temp' and returns the collected field lists.
// NOTE(review): n arrives as a string and is used in arithmetic (-n - 1),
// which relies on Lua's string-to-number coercion - confirm that callers
// always pass a numeric value.
popNextJobsScript = redis.NewScript(0, `-- Copyright 2015 Alex Browne. All rights reserved.
-- Use of this source code is governed by the MIT
-- license, which can be found in the LICENSE file.
-- pop_next_jobs is a lua script that takes the following arguments:
-- 1) The maximum number of jobs to pop and return
-- 2) The current unix time UTC with nanosecond precision
-- The script gets the next available jobs from the queued set which are
-- ready based on their time parameter. Then it adds those jobs to the
-- executing set, sets their status to executing, and removes them from the
-- queued set. It returns an array of arrays where each element contains the
-- fields for a particular job, and the jobs are sorted by priority.
-- Here's an example response:
-- [
-- [
-- "id", "afj9afjpa30",
-- "data", [34, 67, 34, 23, 56, 67, 78, 79],
-- "type", "emailJob",
-- "time", 1234567,
-- "freq", 0,
-- "priority", 100,
-- "retries", 0,
-- "status", "executing",
-- "started", 0,
-- "finished", 0,
-- ],
-- [
-- "id", "E8v2ovkdaIw",
-- "data", [46, 43, 12, 08, 34, 45, 57, 43],
-- "type", "emailJob",
-- "time", 1234568,
-- "freq", 0,
-- "priority", 95,
-- "retries", 0,
-- "status", "executing",
-- "started", 0,
-- "finished", 0,
-- ]
-- ]
-- IMPORTANT: If you edit this file, you must run go generate . to rewrite ../scripts.go
-- Assign args to variables for easy reference
local n = ARGV[1]
local currentTime = ARGV[2]
local poolId = ARGV[3]
-- Copy the time index set to a new temporary set
redis.call('ZUNIONSTORE', 'jobs:temp', 1, 'jobs:time')
-- Trim the new temporary set we just created to leave only the jobs which have a time
-- parameter in the past
redis.call('ZREMRANGEBYSCORE', 'jobs:temp', currentTime, '+inf')
-- Intersect the jobs which are ready based on their time with those in the
-- queued set. Use the weights parameter to set the scores entirely based on the
-- queued set, effectively sorting the jobs by priority. Store the results in the
-- temporary set.
redis.call('ZINTERSTORE', 'jobs:temp', 2, 'jobs:queued', 'jobs:temp', 'WEIGHTS', 1, 0)
-- Trim the temp set, so it contains only the first n jobs ordered by
-- priority
redis.call('ZREMRANGEBYRANK', 'jobs:temp', 0, -n - 1)
-- Get all job ids from the temp set
local jobIds = redis.call('ZREVRANGE', 'jobs:temp', 0, -1)
local allJobs = {}
if #jobIds > 0 then
-- Add job ids to the executing set
redis.call('ZUNIONSTORE', 'jobs:executing', 2, 'jobs:executing', 'jobs:temp')
-- Now we are ready to construct our response.
for i, jobId in ipairs(jobIds) do
local jobKey = 'jobs:' .. jobId
-- Remove the job from the queued set
redis.call('ZREM', 'jobs:queued', jobId)
-- Set the poolId field for the job
redis.call('HSET', jobKey, 'poolId', poolId)
-- Set the job status to executing
redis.call('HSET', jobKey, 'status', 'executing')
-- Get the fields from its main hash
local jobFields = redis.call('HGETALL', jobKey)
-- Add the id itself to the fields
jobFields[#jobFields+1] = 'id'
jobFields[#jobFields+1] = jobId
-- Add the field values to allJobs
allJobs[#allJobs+1] = jobFields
end
end
-- Delete the temporary set
redis.call('DEL', 'jobs:temp')
-- Return all the fields for all the jobs
return allJobs`)
// purgeStalePoolScript wraps the purge_stale_pool lua script. It reads one
// argument, ARGV[1], the id of the stale pool.
// When that id is a member of the set 'pools:active' the script removes it
// from the set, then walks every id in 'jobs:executing' and, for each job
// whose poolId field equals the stale id, re-adds the job to 'jobs:queued'
// scored by its priority field, removes it from 'jobs:executing', and sets
// status to 'queued' and poolId to ''. When the id is not a member the script
// does nothing.
// SISMEMBER replies with the integer 0 or 1, and 0 is truthy in Lua, so the
// membership reply is compared against 1 explicitly; a bare truthiness test
// is always satisfied.
// NOTE(review): with the explicit comparison, jobs still tagged with a pool
// id that is no longer in 'pools:active' are not re-queued by this script -
// confirm nothing relies on purging a pool that was already removed.
// NOTE: this file is generated. The same change has to be made in the lua
// source that go generate reads, otherwise it is lost on the next run.
purgeStalePoolScript = redis.NewScript(0, `-- Copyright 2015 Alex Browne. All rights reserved.
-- Use of this source code is governed by the MIT
-- license, which can be found in the LICENSE file.

-- purge_stale_pool is a lua script which takes the following arguments:
-- 1) The id of the stale pool to purge
-- It then does the following:
-- 1) Removes the pool id from the set of active pools
-- 2) Iterates through each job in the executing set and finds any jobs which
-- have a poolId field equal to the id of the stale pool
-- 3) If it finds any such jobs, it removes them from the executing set and
-- adds them to the queued so that they will be retried

-- IMPORTANT: If you edit this file, you must run go generate . to rewrite ../scripts.go

-- Assign args to variables for easy reference
local stalePoolId = ARGV[1]

-- Check if the stale pool is in the set of active pools first.
-- SISMEMBER returns 0 or 1; both are truthy in lua, so compare with 1.
local isActive = redis.call('SISMEMBER', 'pools:active', stalePoolId)
if isActive == 1 then
	-- Remove the stale pool from the set of active pools
	redis.call('SREM', 'pools:active', stalePoolId)
	-- Get all the jobs in the executing set
	local jobIds = redis.call('ZRANGE', 'jobs:executing', 0, -1)
	for i, jobId in ipairs(jobIds) do
		local jobKey = 'jobs:' .. jobId
		-- Check the poolId field
		-- If the poolId is equal to the stale id, then this job is stuck
		-- in the executing set even though no worker is actually executing it
		local poolId = redis.call('HGET', jobKey, 'poolId')
		if poolId == stalePoolId then
			local jobPriority = redis.call('HGET', jobKey, 'priority')
			-- Move the job into the queued set
			redis.call('ZADD', 'jobs:queued', jobPriority, jobId)
			-- Remove the job from the executing set
			redis.call('ZREM', 'jobs:executing', jobId)
			-- Set the job status to queued and the pool id to blank
			redis.call('HMSET', jobKey, 'status', 'queued', 'poolId', '')
		end
	end
end
`)
// retryOrFailJobScript wraps the retry_or_fail_job lua script. It reads one
// argument, ARGV[1], the id of the job.
// The script returns 0 straight away when the hash 'jobs:<id>' does not exist.
// Otherwise it reads the retries field: when that is the string '0' the new
// status is 'failed'; in every other case retries is decremented with HINCRBY
// and the new status is 'queued'. The job is then added to 'jobs:<newStatus>'
// scored by its priority field, removed from the set of its previous status
// (when one is stored and differs from the new one), and the status field is
// updated. The return value is 0 when the job failed and 1 when it was queued
// for retry.
// HGET yields false inside a Redis lua script when the field is missing, so
// the previous status is tested for truthiness before it is concatenated into
// a key name. Without that test a hash lacking a status field aborts the
// script after ZADD has already been applied.
// NOTE: this file is generated. The same change has to be made in the lua
// source that go generate reads, otherwise it is lost on the next run.
retryOrFailJobScript = redis.NewScript(0, `-- Copyright 2015 Alex Browne. All rights reserved.
-- Use of this source code is governed by the MIT
-- license, which can be found in the LICENSE file.

-- retry_or_fail_job represents a lua script that takes the following arguments:
-- 1) The id of the job to either retry or fail
-- It first checks if the job has any retries remaining. If it does,
-- then it:
-- 1) Decrements the number of retries for the given job
-- 2) Adds the job to the queued set
-- 3) Removes the job from the executing set
-- 4) Returns true
-- If the job has no retries remaining then it:
-- 1) Adds the job to the failed set
-- 2) Removes the job from the executing set
-- 3) Returns false

-- IMPORTANT: If you edit this file, you must run go generate . to rewrite ../scripts.go

-- Assign args to variables for easy reference
local jobId = ARGV[1]
local jobKey = 'jobs:' .. jobId

-- Make sure the job hasn't already been destroyed
local exists = redis.call('EXISTS', jobKey)
if exists ~= 1 then
	return 0
end

-- Check how many retries remain
local retries = redis.call('HGET', jobKey, 'retries')
local newStatus = ''
if retries == '0' then
	-- newStatus should be failed because there are no retries left
	newStatus = 'failed'
else
	-- subtract 1 from the remaining retries
	redis.call('HINCRBY', jobKey, 'retries', -1)
	-- newStatus should be queued, so the job will be retried
	newStatus = 'queued'
end

-- Get the job priority (used as score)
local jobPriority = redis.call('HGET', jobKey, 'priority')

-- Add the job to the appropriate new set
local newStatusSet = 'jobs:' .. newStatus
redis.call('ZADD', newStatusSet, jobPriority, jobId)

-- Remove the job from the old status set.
-- HGET returns false (not '') when the field does not exist, so rule that
-- out before building the name of the old status set.
local oldStatus = redis.call('HGET', jobKey, 'status')
if oldStatus and oldStatus ~= '' and oldStatus ~= newStatus then
	local oldStatusSet = 'jobs:' .. oldStatus
	redis.call('ZREM', oldStatusSet, jobId)
end

-- Set the job status in the hash
redis.call('HSET', jobKey, 'status', newStatus)

if retries == '0' then
	-- Return false to indicate the job has not been queued for retry
	-- NOTE: 0 is used to represent false because apparently
	-- false gets converted to nil
	return 0
else
	-- Return true to indicate the job has been queued for retry
	-- NOTE: 1 is used to represent true (for consistency)
	return 1
end`)
// setJobFieldScript wraps the set_job_field lua script. The script reads all
// of its input from ARGV:
//   ARGV[1] - the id of the job
//   ARGV[2] - the name of the field
//   ARGV[3] - the value to store in that field
// It runs EXISTS on the hash 'jobs:<id>' and returns without touching anything
// unless the reply is 1; otherwise it issues HSET for the field on that hash.
// Neither path returns an explicit value.
setJobFieldScript = redis.NewScript(0, `-- Copyright 2015 Alex Browne. All rights reserved.
-- Use of this source code is governed by the MIT
-- license, which can be found in the LICENSE file.
-- set_job_field represents a lua script that takes the following arguments:
-- 1) The id of the job
-- 2) The name of the field
-- 3) The value to set the field to
-- It first checks if the job exists in the database (has not been destroyed)
-- and then sets the given field to the given value.
-- IMPORTANT: If you edit this file, you must run go generate . to rewrite ../scripts.go
local jobId = ARGV[1]
local fieldName = ARGV[2]
local fieldVal = ARGV[3]
local jobKey = 'jobs:' .. jobId
-- Make sure the job hasn't already been destroyed
local exists = redis.call('EXISTS', jobKey)
if exists ~= 1 then
return
end
redis.call('HSET', jobKey, fieldName, fieldVal)`)
// setJobStatusScript wraps the set_job_status lua script. The script reads its
// input from ARGV:
//   ARGV[1] - the id of the job
//   ARGV[2] - the new status (e.g. "queued")
// It returns without touching anything unless EXISTS on 'jobs:<id>' replies 1.
// Otherwise it adds the job to 'jobs:<newStatus>' scored by its priority
// field, removes it from the set of its previous status (when one is stored
// and differs from the new one), and writes the new status to the status
// field. No path returns an explicit value.
// HGET yields false inside a Redis lua script when the field is missing, so
// the previous status is tested for truthiness before it is concatenated into
// a key name. Without that test a hash lacking a status field aborts the
// script after ZADD has already been applied.
// NOTE: this file is generated. The same change has to be made in the lua
// source that go generate reads, otherwise it is lost on the next run.
setJobStatusScript = redis.NewScript(0, `-- Copyright 2015 Alex Browne. All rights reserved.
-- Use of this source code is governed by the MIT
-- license, which can be found in the LICENSE file.

-- set_job_status is a lua script that takes the following arguments:
-- 1) The id of the job
-- 2) The new status (e.g. "queued")
-- It then does the following:
-- 1) Adds the job to the new status set
-- 2) Removes the job from the old status set (which it gets with an HGET call)
-- 3) Sets the 'status' field in the main hash for the job

-- IMPORTANT: If you edit this file, you must run go generate . to rewrite ../scripts.go

-- Assign args to variables for easy reference
local jobId = ARGV[1]
local newStatus = ARGV[2]
local jobKey = 'jobs:' .. jobId

-- Make sure the job hasn't already been destroyed
local exists = redis.call('EXISTS', jobKey)
if exists ~= 1 then
	return
end

local newStatusSet = 'jobs:' .. newStatus

-- Add the job to the new status set
local jobPriority = redis.call('HGET', jobKey, 'priority')
redis.call('ZADD', newStatusSet, jobPriority, jobId)

-- Remove the job from the old status set.
-- HGET returns false (not '') when the field does not exist, so rule that
-- out before building the name of the old status set.
local oldStatus = redis.call('HGET', jobKey, 'status')
if oldStatus and oldStatus ~= '' and oldStatus ~= newStatus then
	local oldStatusSet = 'jobs:' .. oldStatus
	redis.call('ZREM', oldStatusSet, jobId)
end

-- Set the status field
redis.call('HSET', jobKey, 'status', newStatus)`)
)