Delayed Tasks in nodejs. A very opinionated but compatible API with resque and resque scheduler
I learn best by examples:
/////////////////////////
// REQUIRE THE PACKAGE //
/////////////////////////
var NR = require("node-resque");
///////////////////////////
// SET UP THE CONNECTION //
///////////////////////////
var connectionDetails = {
host: "127.0.0.1",
password: "",
port: 6379,
database: 0,
}
//////////////////////////////
// DEFINE YOUR WORKER TASKS //
//////////////////////////////
var jobs = {
"add": {
perform: function(a,b,callback){
var answer = a + b;
callback(null, answer);
},
},
"subtract": {
perform: function(a,b,callback){
var answer = a - b;
callback(null, answer);
},
},
};
////////////////////
// START A WORKER //
////////////////////
var worker = new NR.worker({connection: connectionDetails, queues: ['math']}, jobs, function(){
worker.workerCleanup(); // optional: cleanup any previous improperly shutdown workers
worker.start();
});
///////////////////////
// START A SCHEDULER //
///////////////////////
var scheduler = new NR.scheduler({connection: connectionDetails}, function(){
scheduler.start();
});
/////////////////////////
// REGESTER FOR EVENTS //
/////////////////////////
worker.on('start', function(){ console.log("worker started"); })
worker.on('end', function(){ console.log("worker ended"); })
worker.on('cleaning_worker', function(worker, pid){ console.log("cleaning old worker " + worker); })
worker.on('poll', function(queue){ console.log("worker polling " + queue); })
worker.on('job', function(queue, job){ console.log("working job " + queue + " " + JSON.stringify(job)); })
worker.on('reEnqueue', function(queue, job, plugin){ console.log("reEnqueue job (" + plugin + ") " + queue + " " + JSON.stringify(job)); })
worker.on('success', function(queue, job, result){ console.log("job success " + queue + " " + JSON.stringify(job) + " >> " + result); })
worker.on('error', function(queue, job, error){ console.log("job failed " + queue + " " + JSON.stringify(job) + " >> " + error); })
worker.on('pause', function(){ console.log("worker paused"); })
scheduler.on('start', function(){ console.log("scheduler started"); })
scheduler.on('end', function(){ console.log("scheduler ended"); })
scheduler.on('error', function(error){ console.log("scheduler error >> " + error); })
scheduler.on('poll', function(){ console.log("scheduler polling"); })
scheduler.on('working_timestamp', function(timestamp){ console.log("scheduler working timestamp " + timestamp); })
scheduler.on('transferred_job', function(timestamp, job){ console.log("scheduler enquing job " + timestamp + " >> " + JSON.stringify(job)); })
////////////////////////
// CONNECT TO A QUEUE //
////////////////////////
var queue = new NR.queue({connection: connectionDetails}, jobs, function(){
queue.enqueue('math', "add", [1,2]);
queue.enqueue('math', "add", [2,3]);
queue.enqueueIn(3000, 'math', "subtract", [2,1]);
});
new queue
requires only the "queue" variable to be set. You can also pass the jobs
hash to it.
new worker
has some additonal options:
options = {
looping: true,
timeout: 5000,
queues: "*",
name: os.hostname() + ":" + process.pid
}
The configuration hash passed to new worker
, new scheduler
or new queue
can also take a connection
option.
var connectionDetails = {
host: "127.0.0.1",
password: "",
port: 6379,
database: 0,
namespace: "resque",
}
var worker = new NR.worker({connection: connectionDetails, queues: 'math'}, jobs, function(){
worker.start();
});
You can also pass redis client directly.
// assume you already initialize redis client before
var connectionDetails = { redis: redisClient }
var worker = new NR.worker({connection: connectionDetails, queues: 'math'}, jobs, function(){
worker.start();
});
- Be sure to call
worker.end()
before shutting down your application if you want to properly clear your worker status from resque - When ending your application, be sure to allow your workers time to finsih what they are working on
worker.workerCleanup()
only works for *nix operating systems (osx, unix, solaris, etc)- If you are using any plugins which effect
beforeEnqueue
orafterEnqueue
, be sure to pass thejobs
argument to thenew Queue
constructor - If you plan to run more than one worker per nodejs process, be sure to name them something distinct. Names must follow the patern
hostname:pid+unique_id
. For example:
var name = os.hostname() + ":" + process.pid() + "+" + counter;
var worker = new NR.worker({connection: connectionDetails, queues: 'math', 'name' : name}, jobs);
Additonal methods provided on the queue
object:
- queue.prototype.queues = function(callback)
- callback(error, array_of_queues)
- queue.prototype.length = function(q, callback)
- callback(error, number_of_elements_in_queue)
- queue.prototype.del = function(q, func, args, count, callback)
- callback(error, number_of_items_deleted)
- queue.prototype.delDelayed = function(q, func, args, callback)
- callback(error, timestamps_the_job_was_removed_from)
- queue.prototype.scheduledAt = function(q, func, args, callback)
- callback(error, timestamps_the_job_is_scheduled_for)
Just like ruby's resque, you can write worker plugins. They look look like this. The 4 hooks you have are before_enqueue
, after_enqueue
, before_perform
, and after_perform
var myPlugin = function(worker, func, queue, job, args, options){
var self = this;
self.name = 'myPlugin';
self.worker = worker;
self.queue = queue;
self.func = func;
self.job = job;
self.args = args;
self.options = options;
}
////////////////////
// PLUGIN METHODS //
////////////////////
myPlugin.prototype.before_enqueue = function(callback){
// console.log("** before_enqueue")
callback(null, true);
}
myPlugin.prototype.after_enqueue = function(callback){
// console.log("** after_enqueue")
callback(null, true);
}
myPlugin.prototype.before_perform = function(callback){
// console.log("** before_perform")
callback(null, true);
}
myPlugin.prototype.after_perform = function(callback){
// console.log("** after_perform")
callback(null, true);
}
And then your plugin can be invoked within a job like this:
var jobs = {
"add": {
plugins: [ 'myPlugin' ],
pluginOptions: {
myPlugin: { thing: 'stuff' },
},
perform: function(a,b,callback){
var answer = a + b;
callback(null, answer);
},
},
}
notes
-
All plugins which return
(error, toRun)
. iftoRun = false
onbeforeEnqueue
, the job beign inqueued will be thrown away, and iftoRun = false
onbeforePerfporm
, the job will be reEnqued and not run at this time. However, it doesn't really matter whattoRun
returns on theafter
hooks. -
There are a few included plugins, all in the lib/plugins/* directory. You can rewrite you own and include it like this:
var jobs = {
"add": {
plugins: [ require('myplugin') ],
pluginOptions: {
myPlugin: { thing: 'stuff' },
},
perform: function(a,b,callback){
var answer = a + b;
callback(null, answer);
},
},
}
Most of this code was inspired by / stolen from coffee-resque and coffee-resque-scheduler. Thanks!