diff --git a/.jshintrc b/.jshintrc index de6d547..c1f47b8 100644 --- a/.jshintrc +++ b/.jshintrc @@ -10,11 +10,11 @@ "newcap": true, "noarg": true, "node": true, - "mocha":true, + "mocha": true, "quotmark": "single", "strict": true, "undef": true, "unused": true, "expr": true, - "ignore":true + "ignore": true } \ No newline at end of file diff --git a/index.js b/index.js index 6420987..257e2bb 100644 --- a/index.js +++ b/index.js @@ -1,7 +1,14 @@ 'use strict'; +/** + * @module + * @description A job scheduling utility for kue + * @public + */ + //dependencies var kue = require('kue'); +var Queue = kue; var redis = kue.redis; var _ = require('lodash'); var async = require('async'); @@ -11,139 +18,133 @@ var humanInterval = require('human-interval'); var CronTime = require('cron').CronTime; var noop = function() {}; -/** - * @constructor - * @description A job scheduling utility for kue - * @param {Object} options configuration options, similar to kue configuration - * options - * @public - */ -function KueScheduler(options) { - //extend default configurations - //with custom provided configurations - //and reference them for later use - this.options = _.merge({ - prefix: 'p', - redis: { - port: 6379, - host: '127.0.0.1' - } - }, options || {}); - - //start kue queue for scheduler - //which will also do all plumbing work - //on setup job redis client - this.queue = kue.createQueue(this.options); - - //a redis client for scheduling key expiry - this.scheduler = redis.createClientFactory(this.options); - - //a redis client to listen for key expiry - this.listener = redis.createClientFactory(this.options); - - //listen for job key expiry - //and schedule kue jobs to run - this._subscribe(); -} - /** * @function * @description generate an expiration key that is used to track job scheduling * @private */ -KueScheduler.prototype._getJobExpiryKey = function(uuid) { - return this.options.prefix + ':scheduler:' + uuid; +Queue.prototype._getJobExpiryKey = function(uuid) { + //this refer to kue Queue instance context + return this.client.prefix + ':scheduler:' + uuid; }; + /** * @function * @description generate job uuid from job expiry key * @private */ -KueScheduler.prototype._getJobUUID = function(jobExpiryKey) { +Queue.prototype._getJobUUID = function(jobExpiryKey) { + //this refer to kue Queue instance context return jobExpiryKey.split(':')[2]; }; + /** * @function * @description generate a storage key for the scheduled job data * @private */ -KueScheduler.prototype._getJobDataKey = function(uuid) { - return this.options.prefix + ':scheduler:data:' + uuid; +Queue.prototype._getJobDataKey = function(uuid) { + //this refer to kue Queue instance context + return this.client.prefix + ':scheduler:data:' + uuid; }; + /** * @function * @description save job data into redis backend * @private */ -KueScheduler.prototype._saveJobData = function(jobDataKey, jobData, done) { - this.scheduler.set(jobDataKey, JSON.stringify(jobData), function(error, response) { - done(error, jobData, response); - }); +Queue.prototype._saveJobData = function(jobDataKey, jobData, done) { + //this refer to kue Queue instance context + + this + .scheduler + .set( + jobDataKey, + JSON.stringify(jobData), + function(error, response) { + done(error, jobData, response); + }); }; + /** * @function * @description retrieved saved job data from redis backend * @private */ -KueScheduler.prototype._readJobData = function(jobDataKey, done) { - this.scheduler.get(jobDataKey, function(error, data) { - done(error, JSON.parse(data)); - }); +Queue.prototype._readJobData = function(jobDataKey, done) { + //this refer to kue Queue instance context + + this + .scheduler + .get(jobDataKey, function(error, data) { + done(error, JSON.parse(data)); + }); }; -KueScheduler.prototype._subscribe = function() { +/** + * @function + * @description subscribe to key expiry events + * @private + */ +Queue.prototype._subscribe = function() { + //this refer to kue Queue instance context var self = this; //listen for job key expiry - this.listener.on('message', function(channel, jobExpiryKey) { - var jobDefinition; - - async - .waterfall( - [ - //get job data - function(next) { - //get job uuid - var jobUUID = self._getJobUUID(jobExpiryKey); - - //get saved job data - self._readJobData(self._getJobDataKey(jobUUID), next); - }, - //compute next run time - function(jobData, next) { - jobDefinition = jobData; - self._computeNextRunTime(jobData, next); - }, - //resave the key to rerun this job again - function(nextRunTime, next) { - var now = new Date(); - var delay = nextRunTime.getTime() - now.getTime(); - - self.scheduler.set(jobExpiryKey, '', 'PX', delay, next); - }, - //create kue NOW job - function(response, next) { - self.now(jobDefinition, next); - }, - //TODO use event emitter to emit any error - ], noop); - }); + this + .listener + .on('message', function(channel, jobExpiryKey) { + var jobDefinition; + + async + .waterfall( + [ + //get job data + function(next) { + //get job uuid + var jobUUID = self._getJobUUID(jobExpiryKey); + + //get saved job data + self._readJobData(self._getJobDataKey(jobUUID), next); + }, + //compute next run time + function(jobData, next) { + jobDefinition = jobData; + self._computeNextRunTime(jobData, next); + }, + //resave the key to rerun this job again + function(nextRunTime, next) { + var now = new Date(); + var delay = nextRunTime.getTime() - now.getTime(); + + self.scheduler.set(jobExpiryKey, '', 'PX', delay, next); + }, + //create kue NOW job + function(response, next) { + self.now(jobDefinition, next); + }, + //TODO use event emitter to emit any error + ], noop); + }); //subscribe to key expiration events this.listener.subscribe('__keyevent@0__:expired'); }; + /** * @function * @description compute next run time of the given job data * @private */ -KueScheduler.prototype._computeNextRunTime = function(jobData, done) { +Queue.prototype._computeNextRunTime = function(jobData, done) { + //this refer to kue Queue instance context + //grab job reccur interval var interval = jobData.reccurInterval; @@ -209,8 +210,20 @@ KueScheduler.prototype._computeNextRunTime = function(jobData, done) { }); }; - -KueScheduler.prototype.every = function(interval, jobDefinition) { +/** + * @function + * @description schedule a job to run every after a specified interval + * @param {String} interval scheduled interval in or human interval or + * cron format + * @param {Object} jobDefinition valid kue job instance properties in hash form + * @private + */ +Queue.prototype.every = function(interval, jobDefinition) { + //this refer to kue Queue instance context + // + if (arguments.length < 2) { + throw new Error('Invalid number of parameters. See API doc.'); + } var self = this; @@ -259,7 +272,9 @@ KueScheduler.prototype.every = function(interval, jobDefinition) { * @param {Function} done a callback to invoke on error or success * @public */ -KueScheduler.prototype.schedule = function(when, jobDefinition, done) { +Queue.prototype.schedule = function(when, jobDefinition, done) { + //this refer to kue Queue instance context + // if (arguments.length < 3) { done(new Error('Invalid number of parameters. See API doc.')); } @@ -305,6 +320,7 @@ KueScheduler.prototype.schedule = function(when, jobDefinition, done) { }); }; + /** * @function * @description schedule a job to be executed immediatelly after being saved @@ -312,7 +328,13 @@ KueScheduler.prototype.schedule = function(when, jobDefinition, done) { * @param {Function} done a callback to invoke lon success or error * @public */ -KueScheduler.prototype.now = function(jobDefinition, done) { +Queue.prototype.now = function(jobDefinition, done) { + //this refer to kue Queue instance context + // + if (arguments.length < 2) { + done(new Error('Invalid number of parameters. See API doc.')); + } + var self = this; async @@ -336,15 +358,18 @@ KueScheduler.prototype.now = function(jobDefinition, done) { }); }; + /** * @function - * @description build a kue job from a job definition + * @description build a kue job from a job definition hash * @param {Object} jobDefinition valid kue job attributes * @param {Function} done a callback to invoke on error or success * @private */ -KueScheduler.prototype._buildJob = function(jobDefinition, done) { +Queue.prototype._buildJob = function(jobDefinition, done) { + //this refer to kue Queue instance context var self = this; + async .parallel({ isDefined: function(next) { @@ -390,7 +415,7 @@ KueScheduler.prototype._buildJob = function(jobDefinition, done) { //instantiate kue job var job = - self.queue.createJob( + self.createJob( jobDefinition.type, jobDefinition.data ); @@ -411,6 +436,7 @@ KueScheduler.prototype._buildJob = function(jobDefinition, done) { }); }; + /** * @function * @description parse date.js valid string and return a date object @@ -419,7 +445,7 @@ KueScheduler.prototype._buildJob = function(jobDefinition, done) { * @param {Function} done a callback to invoke on error or success * @private */ -KueScheduler.prototype._parse = function(str, done) { +Queue.prototype._parse = function(str, done) { try { var date = datejs(str); return done(null, date); @@ -428,8 +454,37 @@ KueScheduler.prototype._parse = function(str, done) { } }; +//patch kue createQueue to allow options +//to be stored in the kue +//and setup of scheduler +var createQueue = kue.createQueue; +kue.createQueue = function(options) { + options = _.merge({ + prefix: 'q', + redis: { + port: 6379, + host: '127.0.0.1' + } + }, options || {}); + + var queue = createQueue.call(kue, options); + + //a redis client for scheduling key expiry + queue.scheduler = redis.createClientFactory(options); + + //a redis client to listen for key expiry + queue.listener = redis.createClientFactory(options); + + //listen for job key expiry + //and schedule kue jobs to run + queue._subscribe(); + + return queue; +}; + + /** - * @description export kue scheduler + * @description export kue with scheduler attached * @type {Function} */ -module.exports = KueScheduler; \ No newline at end of file +module.exports = kue; \ No newline at end of file diff --git a/package.json b/package.json index 766d29e..c60ae77 100644 --- a/package.json +++ b/package.json @@ -26,17 +26,18 @@ }, "homepage": "https://github.com/lykmapipo/kue-scheduler", "dependencies": { - "async": "^0.9.0", "cron": "^1.0.9", "date.js": "^0.2.0", "human-interval": "^0.1.4", - "lodash": "^3.6.0", "node-uuid": "^1.4.3" }, "peerDependencies": { - "kue": "^0.8.12" + "async": "^0.9.0", + "kue": "^0.8.12", + "lodash": "^3.6.0" }, "devDependencies": { + "async": "^0.9.0", "chai": "^2.2.0", "faker": "^2.1.2", "grunt": "^0.4.5", @@ -44,6 +45,7 @@ "grunt-mocha-test": "^0.12.7", "jshint-stylish": "^1.0.1", "kue": "^0.8.12", + "lodash": "^3.6.0", "mocha": "^2.2.4", "moment": "^2.10.2" } diff --git a/test/bootstrap.spec.js b/test/bootstrap.spec.js index 5da2f74..653db32 100644 --- a/test/bootstrap.spec.js +++ b/test/bootstrap.spec.js @@ -2,36 +2,43 @@ //dependencies var kue = require('kue'); -var Job = kue.Job; var async = require('async'); -var _ = require('lodash'); -//setup test environment +//redis client for database cleanups +var redis = kue.redis.createClientFactory({ + redis: {} +}); + +/** + * @description clean up a database + */ +function cleanup(callback) { + redis + .keys('q*', function(error, rows) { + if (error) { + callback(error); + } else { + async + .each( + rows, + function(row, next) { + redis.del(row, next); + }, + callback); + } + }); +} + + before(function(done) { - //initializing kue default queue - //for cleaning existing jobs - kue.createQueue(); + //clean any previous data + //if any + cleanup(done); +}); - //clean existing jobs - async - .waterfall( - [ - function findJobs(next) { - //TODO find a proper from..to for cleaning - //existing jobs - Job.range(0, 1000000000, 'desc', next); - }, - function prepareCleaningWork(jobs, next) { - next(null, _.map(jobs, function(job) { - return job.remove.bind(job); - })); - }, - function cleanJobs(cleaningWork, next) { - async.parallel(cleaningWork, next); - } - ], - function(error, results) { - done(error, results); - }); +after(function(done) { + //clean all data + //introduced with these specs + cleanup(done); }); \ No newline at end of file diff --git a/test/capability.spec.js b/test/capability.spec.js index 03f0ba8..f2aae28 100644 --- a/test/capability.spec.js +++ b/test/capability.spec.js @@ -4,44 +4,38 @@ var expect = require('chai').expect; var path = require('path'); var uuid = require('node-uuid'); -// var later = require('later'); -var KueScheduler = require(path.join(__dirname, '..', 'index')); +var kue = require(path.join(__dirname, '..', 'index')); +var Queue; +var options = { + prefix: 'q' +}; -describe('KueScheduler#Capability', function() { - var kueScheduler; - var options = { - prefix: 'p' - }; +describe('Queue Scheduling Capabilities', function() { before(function(done) { - kueScheduler = new KueScheduler(options); - done(); - }); - - it('should be a functional constructor', function(done) { - expect(KueScheduler).to.be.a('function'); + Queue = kue.createQueue(); done(); }); it('should be able to shedule job in later time', function(done) { - expect(kueScheduler).to.respondTo('schedule'); + expect(Queue).to.respondTo('schedule'); done(); }); it('should be able to execute jobs every after specific time interval', function(done) { - expect(kueScheduler).to.respondTo('every'); + expect(Queue).to.respondTo('every'); done(); }); it('should be able to execute a job now', function(done) { - expect(kueScheduler).to.respondTo('now'); + expect(Queue).to.respondTo('now'); done(); }); it('should be able to generate job expriration key', function(done) { var jobuuid = uuid.v1(); - expect(kueScheduler._getJobExpiryKey(jobuuid)) + expect(Queue._getJobExpiryKey(jobuuid)) .to.be.equal(options.prefix + ':scheduler:' + jobuuid); done(); @@ -50,7 +44,7 @@ describe('KueScheduler#Capability', function() { it('should be able to generate job data storage key', function(done) { var jobuuid = uuid.v1(); - expect(kueScheduler._getJobDataKey(jobuuid)) + expect(Queue._getJobDataKey(jobuuid)) .to.be.equal(options.prefix + ':scheduler:data:' + jobuuid); done(); @@ -58,22 +52,22 @@ describe('KueScheduler#Capability', function() { it('should be able to generate job uuid from job expriration key', function(done) { var jobuuid = uuid.v1(); - var jobEpiryKey = kueScheduler._getJobExpiryKey(jobuuid); + var jobEpiryKey = Queue._getJobExpiryKey(jobuuid); - expect(kueScheduler._getJobUUID(jobEpiryKey)) + expect(Queue._getJobUUID(jobEpiryKey)) .to.be.equal(jobuuid); done(); }); - describe('KueScheduler#Capability#CRUD', function() { + describe('Queue CRUD Capabilities', function() { var jobuuid; var jobDataKey; var jobData; before(function(done) { jobuuid = uuid.v1(); - jobDataKey = kueScheduler._getJobDataKey(jobuuid); + jobDataKey = Queue._getJobDataKey(jobuuid); jobData = { uuid: jobuuid }; @@ -82,7 +76,7 @@ describe('KueScheduler#Capability', function() { }); it('should be able to save job data', function(done) { - kueScheduler + Queue ._saveJobData(jobDataKey, jobData, function(error, _jobData) { expect(_jobData.uuid).to.equal(jobData.uuid); done(error, _jobData); @@ -90,7 +84,7 @@ describe('KueScheduler#Capability', function() { }); it('should be able to read job data', function(done) { - kueScheduler + Queue ._readJobData(jobDataKey, function(error, _jobData) { expect(_jobData.uuid).to.equal(jobData.uuid); done(error, _jobData); @@ -99,7 +93,7 @@ describe('KueScheduler#Capability', function() { }); - describe('KueScheduler#Capability#nextRun', function() { + describe('Queue `nextRun` Capabilities', function() { var lastRun = new Date(); lastRun.setSeconds(0); @@ -107,7 +101,7 @@ describe('KueScheduler#Capability', function() { var expectedNextRunTime = new Date(lastRun.valueOf()); expectedNextRunTime.setMinutes(expectedNextRunTime.getMinutes() + 5); - kueScheduler._computeNextRunTime({ + Queue._computeNextRunTime({ reccurInterval: '5 minutes', lastRun: lastRun }, function(error, nextRun) { @@ -126,7 +120,7 @@ describe('KueScheduler#Capability', function() { var lastRun = new Date(); lastRun.setSeconds(0); - kueScheduler._computeNextRunTime({ + Queue._computeNextRunTime({ reccurInterval: '* * * * * *', lastRun: lastRun }, function(error, nextRun) { @@ -141,7 +135,7 @@ describe('KueScheduler#Capability', function() { }); it('should throw `Invalid reccur interval` if interval is not human interval or cron interval', function(done) { - kueScheduler._computeNextRunTime({ + Queue._computeNextRunTime({ reccurInterval: 'abcd' }, function(error, nextRun) { expect(error.message).to.equal('Invalid reccur interval'); diff --git a/test/instatiation.spec.js b/test/instatiation.spec.js index 57ddc28..c818869 100644 --- a/test/instatiation.spec.js +++ b/test/instatiation.spec.js @@ -3,35 +3,24 @@ //dependencies var expect = require('chai').expect; var path = require('path'); -var KueScheduler = require(path.join(__dirname, '..', 'index')); +var kue = require(path.join(__dirname, '..', 'index')); +var Queue; -describe('KueScheduler#Instatiation', function() { - var kueScheduler; +describe('Queue Job Scheduler & Listener', function() { before(function(done) { - kueScheduler = new KueScheduler(); - done(); - }); - - it('should be able to set default redis options', function(done) { - expect(kueScheduler.options.redis.port).to.be.equal(6379); - expect(kueScheduler.options.redis.host).to.be.equal('127.0.0.1'); - done(); - }); - - it('should be able to instantiate internal kue queue', function(done) { - expect(kueScheduler.queue).to.exist; + Queue = kue.createQueue(); done(); }); it('should be able to instantiate scheduler redis client', function(done) { - expect(kueScheduler.scheduler).to.exist; + expect(Queue.scheduler).to.exist; done(); }); it('should be able to instantiate expiry key listener', function(done) { - expect(kueScheduler.listener).to.exist; + expect(Queue.listener).to.exist; done(); }); }); \ No newline at end of file diff --git a/test/job_builder.spec.js b/test/job_builder.spec.js index da42617..745fb9e 100644 --- a/test/job_builder.spec.js +++ b/test/job_builder.spec.js @@ -3,23 +3,24 @@ //dependencies var expect = require('chai').expect; var path = require('path'); -var KueScheduler = require(path.join(__dirname, '..', 'index')); var faker = require('faker'); +var kue = require(path.join(__dirname, '..', 'index')); +var Queue; + +describe('Queue JobBuilder', function() { -describe('KueScheduler#JobBuilder', function() { - var kueScheduler; before(function(done) { - kueScheduler = new KueScheduler(); + Queue = kue.createQueue(); done(); }); it('should be a function', function(done) { - expect(kueScheduler._buildJob).to.be.a('function'); + expect(Queue._buildJob).to.be.a('function'); done(); }); it('should throw `Invalid job definition` if no job definiton provided', function(done) { - kueScheduler + Queue ._buildJob('a', function(error, job) { expect(error.message).to.equal('Invalid job definition'); done(null, job); @@ -27,7 +28,7 @@ describe('KueScheduler#JobBuilder', function() { }); it('should throw `Missing job type or data` if no job type provided', function(done) { - kueScheduler + Queue ._buildJob({ data: { to: faker.internet.email() @@ -39,7 +40,7 @@ describe('KueScheduler#JobBuilder', function() { }); it('should throw `Missing job type or data` if no job data provided', function(done) { - kueScheduler + Queue ._buildJob({ type: 'mail' }, function(error, job) { @@ -58,7 +59,7 @@ describe('KueScheduler#JobBuilder', function() { type: 'fixed' }; - kueScheduler + Queue ._buildJob({ type: 'email', priority: 'normal', diff --git a/test/schedule/every.spec.js b/test/schedule/every.spec.js index c02e95f..20c952b 100644 --- a/test/schedule/every.spec.js +++ b/test/schedule/every.spec.js @@ -3,26 +3,23 @@ //dependencies var expect = require('chai').expect; var path = require('path'); -var kue = require('kue'); -var KueScheduler = require(path.join(__dirname, '..', '..', 'index')); +var kue = require(path.join(__dirname, '..', '..', 'index')); var faker = require('faker'); +var Queue; -describe('KueScheduler#every', function() { - var kueScheduler; - var everyQueue; +describe('Queue#every', function() { before(function(done) { - kueScheduler = new KueScheduler(); - everyQueue = kue.createQueue(); + Queue = kue.createQueue(); done(); }); after(function(done) { - everyQueue.shutdown(done); + Queue.shutdown(done); }); it('should be a function', function(done) { - expect(kueScheduler.every).to.be.a('function'); + expect(Queue.every).to.be.a('function'); done(); }); @@ -36,7 +33,7 @@ describe('KueScheduler#every', function() { type: 'fixed' }; - everyQueue.process('every', function(job, finalize) { + Queue.process('every', function(job, finalize) { /*jshint camelcase:false */ expect(job.id).to.exist; expect(job.type).to.equal('every'); @@ -51,7 +48,7 @@ describe('KueScheduler#every', function() { finalize(); }); - kueScheduler + Queue .every('4 seconds', { type: 'every', priority: 'normal', diff --git a/test/schedule/now.spec.js b/test/schedule/now.spec.js index dc97e2b..99e77cd 100644 --- a/test/schedule/now.spec.js +++ b/test/schedule/now.spec.js @@ -3,26 +3,23 @@ //dependencies var expect = require('chai').expect; var path = require('path'); -var kue = require('kue'); -var KueScheduler = require(path.join(__dirname, '..', '..', 'index')); +var kue = require(path.join(__dirname, '..', '..', 'index')); var faker = require('faker'); +var Queue; -describe('KueScheduler#now', function() { - var kueScheduler; - var nowQueue; +describe('Queue#now', function() { before(function(done) { - kueScheduler = new KueScheduler(); - nowQueue = kue.createQueue(); + Queue = kue.createQueue(); done(); }); after(function(done) { - nowQueue.shutdown(done); + Queue.shutdown(done); }); it('should be a function', function(done) { - expect(kueScheduler.now).to.be.a('function'); + expect(Queue.now).to.be.a('function'); done(); }); @@ -36,7 +33,7 @@ describe('KueScheduler#now', function() { type: 'fixed' }; - nowQueue.process('now', function(job, finalize) { + Queue.process('now', function(job, finalize) { /*jshint camelcase:false */ expect(job.id).to.exist; expect(job.type).to.equal('now'); @@ -54,7 +51,7 @@ describe('KueScheduler#now', function() { }); - kueScheduler.now({ + Queue.now({ type: 'now', priority: 'normal', attempts: 3, diff --git a/test/schedule/schedule.spec.js b/test/schedule/schedule.spec.js index f24abd8..dccb9a1 100644 --- a/test/schedule/schedule.spec.js +++ b/test/schedule/schedule.spec.js @@ -5,31 +5,29 @@ var expect = require('chai').expect; var path = require('path'); var kue = require('kue'); var moment = require('moment'); -var KueScheduler = require(path.join(__dirname, '..', '..', 'index')); +var kue = require(path.join(__dirname, '..', '..', 'index')); var faker = require('faker'); +var Queue; -describe('KueScheduler#schedule', function() { - var kueScheduler; - var scheduleQueue; +describe('Queue#schedule', function() { before(function(done) { - kueScheduler = new KueScheduler(); - scheduleQueue = kue.createQueue(); + Queue = kue.createQueue(); done(); }); after(function(done) { - scheduleQueue.shutdown(done); + Queue.shutdown(done); }); it('should be a function', function(done) { - expect(kueScheduler.schedule).to.be.a('function'); + expect(Queue.schedule).to.be.a('function'); done(); }); it('should be able to parse date.js valid string', function(done) { var tenMinutesFromNow = moment().add(10, 'minutes').toDate(); - kueScheduler + Queue ._parse('10 minutes from now', function(error, date) { expect(date.getMinutes()).to.eql(tenMinutesFromNow.getMinutes()); done(error, date); @@ -46,7 +44,7 @@ describe('KueScheduler#schedule', function() { type: 'fixed' }; - scheduleQueue.process('schedule', function(job, finalize) { + Queue.process('schedule', function(job, finalize) { /*jshint camelcase:false */ expect(job.id).to.exist; expect(job.type).to.equal('schedule'); @@ -61,9 +59,9 @@ describe('KueScheduler#schedule', function() { finalize(); }); - scheduleQueue.promote(3000); + Queue.promote(3000); - kueScheduler.schedule( + Queue.schedule( '2 seconds from now', { type: 'schedule', priority: 'normal',