Skip to content

Commit

Permalink
migrating scheduler to extend kue Queue internals
Browse files Browse the repository at this point in the history
  • Loading branch information
lykmapipo committed May 8, 2015
1 parent 5ca772e commit fad11cd
Show file tree
Hide file tree
Showing 10 changed files with 253 additions and 213 deletions.
4 changes: 2 additions & 2 deletions .jshintrc
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
"newcap": true,
"noarg": true,
"node": true,
"mocha":true,
"mocha": true,
"quotmark": "single",
"strict": true,
"undef": true,
"unused": true,
"expr": true,
"ignore":true
"ignore": true
}
241 changes: 148 additions & 93 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
'use strict';

/**
* @module
* @description A job scheduling utility for kue
* @public
*/

//dependencies
var kue = require('kue');
var Queue = kue;
var redis = kue.redis;
var _ = require('lodash');
var async = require('async');
Expand All @@ -11,139 +18,133 @@ var humanInterval = require('human-interval');
var CronTime = require('cron').CronTime;
var noop = function() {};

/**
* @constructor
* @description A job scheduling utility for kue
* @param {Object} options configuration options, similar to kue configuration
* options
* @public
*/
function KueScheduler(options) {
//extend default configurations
//with custom provided configurations
//and reference them for later use
this.options = _.merge({
prefix: 'p',
redis: {
port: 6379,
host: '127.0.0.1'
}
}, options || {});

//start kue queue for scheduler
//which will also do all plumbing work
//on setup job redis client
this.queue = kue.createQueue(this.options);

//a redis client for scheduling key expiry
this.scheduler = redis.createClientFactory(this.options);

//a redis client to listen for key expiry
this.listener = redis.createClientFactory(this.options);

//listen for job key expiry
//and schedule kue jobs to run
this._subscribe();
}

/**
* @function
* @description generate an expiration key that is used to track job scheduling
* @private
*/
KueScheduler.prototype._getJobExpiryKey = function(uuid) {
return this.options.prefix + ':scheduler:' + uuid;
Queue.prototype._getJobExpiryKey = function(uuid) {
//this refer to kue Queue instance context
return this.client.prefix + ':scheduler:' + uuid;
};


/**
* @function
* @description generate job uuid from job expiry key
* @private
*/
KueScheduler.prototype._getJobUUID = function(jobExpiryKey) {
Queue.prototype._getJobUUID = function(jobExpiryKey) {
//this refer to kue Queue instance context
return jobExpiryKey.split(':')[2];
};


/**
* @function
* @description generate a storage key for the scheduled job data
* @private
*/
KueScheduler.prototype._getJobDataKey = function(uuid) {
return this.options.prefix + ':scheduler:data:' + uuid;
Queue.prototype._getJobDataKey = function(uuid) {
//this refer to kue Queue instance context
return this.client.prefix + ':scheduler:data:' + uuid;
};


/**
* @function
* @description save job data into redis backend
* @private
*/
KueScheduler.prototype._saveJobData = function(jobDataKey, jobData, done) {
this.scheduler.set(jobDataKey, JSON.stringify(jobData), function(error, response) {
done(error, jobData, response);
});
Queue.prototype._saveJobData = function(jobDataKey, jobData, done) {
//this refer to kue Queue instance context

this
.scheduler
.set(
jobDataKey,
JSON.stringify(jobData),
function(error, response) {
done(error, jobData, response);
});
};


/**
* @function
* @description retrieved saved job data from redis backend
* @private
*/
KueScheduler.prototype._readJobData = function(jobDataKey, done) {
this.scheduler.get(jobDataKey, function(error, data) {
done(error, JSON.parse(data));
});
Queue.prototype._readJobData = function(jobDataKey, done) {
//this refer to kue Queue instance context

this
.scheduler
.get(jobDataKey, function(error, data) {
done(error, JSON.parse(data));
});
};

KueScheduler.prototype._subscribe = function() {
/**
* @function
* @description subscribe to key expiry events
* @private
*/
Queue.prototype._subscribe = function() {
//this refer to kue Queue instance context
var self = this;

//listen for job key expiry
this.listener.on('message', function(channel, jobExpiryKey) {
var jobDefinition;

async
.waterfall(
[
//get job data
function(next) {
//get job uuid
var jobUUID = self._getJobUUID(jobExpiryKey);

//get saved job data
self._readJobData(self._getJobDataKey(jobUUID), next);
},
//compute next run time
function(jobData, next) {
jobDefinition = jobData;
self._computeNextRunTime(jobData, next);
},
//resave the key to rerun this job again
function(nextRunTime, next) {
var now = new Date();
var delay = nextRunTime.getTime() - now.getTime();

self.scheduler.set(jobExpiryKey, '', 'PX', delay, next);
},
//create kue NOW job
function(response, next) {
self.now(jobDefinition, next);
},
//TODO use event emitter to emit any error
], noop);
});
this
.listener
.on('message', function(channel, jobExpiryKey) {
var jobDefinition;

async
.waterfall(
[
//get job data
function(next) {
//get job uuid
var jobUUID = self._getJobUUID(jobExpiryKey);

//get saved job data
self._readJobData(self._getJobDataKey(jobUUID), next);
},
//compute next run time
function(jobData, next) {
jobDefinition = jobData;
self._computeNextRunTime(jobData, next);
},
//resave the key to rerun this job again
function(nextRunTime, next) {
var now = new Date();
var delay = nextRunTime.getTime() - now.getTime();

self.scheduler.set(jobExpiryKey, '', 'PX', delay, next);
},
//create kue NOW job
function(response, next) {
self.now(jobDefinition, next);
},
//TODO use event emitter to emit any error
], noop);
});

//subscribe to key expiration events
this.listener.subscribe('__keyevent@0__:expired');

};


/**
* @function
* @description compute next run time of the given job data
* @private
*/
KueScheduler.prototype._computeNextRunTime = function(jobData, done) {
Queue.prototype._computeNextRunTime = function(jobData, done) {
//this refer to kue Queue instance context

//grab job reccur interval
var interval = jobData.reccurInterval;

Expand Down Expand Up @@ -209,8 +210,20 @@ KueScheduler.prototype._computeNextRunTime = function(jobData, done) {
});
};


KueScheduler.prototype.every = function(interval, jobDefinition) {
/**
* @function
* @description schedule a job to run every after a specified interval
* @param {String} interval scheduled interval in or human interval or
* cron format
* @param {Object} jobDefinition valid kue job instance properties in hash form
* @private
*/
Queue.prototype.every = function(interval, jobDefinition) {
//this refer to kue Queue instance context
//
if (arguments.length < 2) {
throw new Error('Invalid number of parameters. See API doc.');
}

var self = this;

Expand Down Expand Up @@ -259,7 +272,9 @@ KueScheduler.prototype.every = function(interval, jobDefinition) {
* @param {Function} done a callback to invoke on error or success
* @public
*/
KueScheduler.prototype.schedule = function(when, jobDefinition, done) {
Queue.prototype.schedule = function(when, jobDefinition, done) {
//this refer to kue Queue instance context
//
if (arguments.length < 3) {
done(new Error('Invalid number of parameters. See API doc.'));
}
Expand Down Expand Up @@ -305,14 +320,21 @@ KueScheduler.prototype.schedule = function(when, jobDefinition, done) {
});
};


/**
* @function
* @description schedule a job to be executed immediatelly after being saved
* @param {Object} jobDefinition a valid kue job definition
* @param {Function} done a callback to invoke lon success or error
* @public
*/
KueScheduler.prototype.now = function(jobDefinition, done) {
Queue.prototype.now = function(jobDefinition, done) {
//this refer to kue Queue instance context
//
if (arguments.length < 2) {
done(new Error('Invalid number of parameters. See API doc.'));
}

var self = this;

async
Expand All @@ -336,15 +358,18 @@ KueScheduler.prototype.now = function(jobDefinition, done) {
});
};


/**
* @function
* @description build a kue job from a job definition
* @description build a kue job from a job definition hash
* @param {Object} jobDefinition valid kue job attributes
* @param {Function} done a callback to invoke on error or success
* @private
*/
KueScheduler.prototype._buildJob = function(jobDefinition, done) {
Queue.prototype._buildJob = function(jobDefinition, done) {
//this refer to kue Queue instance context
var self = this;

async
.parallel({
isDefined: function(next) {
Expand Down Expand Up @@ -390,7 +415,7 @@ KueScheduler.prototype._buildJob = function(jobDefinition, done) {

//instantiate kue job
var job =
self.queue.createJob(
self.createJob(
jobDefinition.type,
jobDefinition.data
);
Expand All @@ -411,6 +436,7 @@ KueScheduler.prototype._buildJob = function(jobDefinition, done) {
});
};


/**
* @function
* @description parse date.js valid string and return a date object
Expand All @@ -419,7 +445,7 @@ KueScheduler.prototype._buildJob = function(jobDefinition, done) {
* @param {Function} done a callback to invoke on error or success
* @private
*/
KueScheduler.prototype._parse = function(str, done) {
Queue.prototype._parse = function(str, done) {
try {
var date = datejs(str);
return done(null, date);
Expand All @@ -428,8 +454,37 @@ KueScheduler.prototype._parse = function(str, done) {
}
};

//patch kue createQueue to allow options
//to be stored in the kue
//and setup of scheduler
var createQueue = kue.createQueue;
kue.createQueue = function(options) {
options = _.merge({
prefix: 'q',
redis: {
port: 6379,
host: '127.0.0.1'
}
}, options || {});

var queue = createQueue.call(kue, options);

//a redis client for scheduling key expiry
queue.scheduler = redis.createClientFactory(options);

//a redis client to listen for key expiry
queue.listener = redis.createClientFactory(options);

//listen for job key expiry
//and schedule kue jobs to run
queue._subscribe();

return queue;
};


/**
* @description export kue scheduler
* @description export kue with scheduler attached
* @type {Function}
*/
module.exports = KueScheduler;
module.exports = kue;
Loading

0 comments on commit fad11cd

Please sign in to comment.