diff --git a/.gitignore b/.gitignore index e498e16..c110680 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ node_modules .tmp -*.log \ No newline at end of file +*.log +*.pid \ No newline at end of file diff --git a/index.js b/index.js index b4085f7..9042a19 100644 --- a/index.js +++ b/index.js @@ -7,6 +7,9 @@ var _ = require('lodash'); var async = require('async'); var datejs = require('date.js'); var uuid = require('node-uuid'); +var humanInterval = require('human-interval'); +var later = require('later'); + /** * @constructor * @description A job scheduling utility for kue @@ -96,9 +99,14 @@ KueScheduler.prototype._subscribe = function() { //listen for job key expiry this.listener.on('message', function(channel, jobExpiryKey) { //get job uuid - scheduler._getJobUUID(jobExpiryKey); + var jobUUID = scheduler._getJobUUID(jobExpiryKey); //get saved job data + scheduler._readJobData(scheduler._getJobDataKey(jobUUID)); + + //compute next run + //create kue NOW job + //resave job expiration key }); @@ -107,6 +115,64 @@ KueScheduler.prototype._subscribe = function() { }; +/** + * @function + * @description compute next run time of the given job data + * @private + */ +KueScheduler.prototype._computeNextRunTime = function(jobData, done) { + //grab job reccur interval + var interval = jobData.reccurInterval; + + async + .parallel({ + //compute next run from later text interval + laterText: function(after) { + try { + //last run of the job is now + var lastRun = jobData.lastRun || new Date(); + + var schedules = later.parse.text(interval, true); + var nextRuns = later.schedule(schedules).next(2, lastRun); + + //get differences in milliseconds + //if is zero return next run after first one + var isValidNexRun = + (nextRuns[0].getTime() - lastRun.getTime()) > 0; + + if (isValidNexRun) { + after(null, nextRuns[0]); + } else { + after(null, nextRuns[1]); + } + } catch (ex) { + //to allow parallel run with human interval + after(null, null); + } + }, + //compute next run from human interval + human: function(next) { + try { + //last run of the job is now + var lastRun = jobData.lastRun || new Date(); + + next(null, new Date(lastRun.valueOf() + humanInterval(interval))); + } catch (ex) { + //to allow parallel run with cron interval + next(null, null); + } + } + }, function finish(error, results) { + if (!_.isNull(results.laterText)) { + return done(null, results.laterText); + } else if (!_.isNull(results.human)) { + return done(null, results.human); + } else { + return done(new Error('Invalid reccur interval')); + } + }); +}; + KueScheduler.prototype.every = function(interval, jobDefinition) { var scheduler = this; diff --git a/package.json b/package.json index 766d29e..7169365 100644 --- a/package.json +++ b/package.json @@ -1,50 +1,50 @@ { - "name": "kue-scheduler", - "version": "0.1.0", - "description": "A job scheduler for kue", - "main": "index.js", - "scripts": { - "test": "grunt test" - }, - "repository": { - "type": "git", - "url": "https://github.com/lykmapipo/kue-scheduler.git" - }, - "keywords": [ - "schedule", - "every", - "now", - "at", - "delayed", - "jobs", - "redis" - ], - "author": "lykmapipo", - "license": "MIT", - "bugs": { - "url": "https://github.com/lykmapipo/kue-scheduler/issues" - }, - "homepage": "https://github.com/lykmapipo/kue-scheduler", - "dependencies": { - "async": "^0.9.0", - "cron": "^1.0.9", - "date.js": "^0.2.0", - "human-interval": "^0.1.4", - "lodash": "^3.6.0", - "node-uuid": "^1.4.3" - }, - "peerDependencies": { - "kue": "^0.8.12" - }, - "devDependencies": { - "chai": "^2.2.0", - "faker": "^2.1.2", - "grunt": "^0.4.5", - "grunt-contrib-jshint": "^0.11.1", - "grunt-mocha-test": "^0.12.7", - "jshint-stylish": "^1.0.1", - "kue": "^0.8.12", - "mocha": "^2.2.4", - "moment": "^2.10.2" - } -} \ No newline at end of file + "name": "kue-scheduler", + "version": "0.1.0", + "description": "A job scheduler for kue", + "main": "index.js", + "scripts": { + "test": "grunt test" + }, + "repository": { + "type": "git", + "url": "https://github.com/lykmapipo/kue-scheduler.git" + }, + "keywords": [ + "schedule", + "every", + "now", + "at", + "delayed", + "jobs", + "redis" + ], + "author": "lykmapipo", + "license": "MIT", + "bugs": { + "url": "https://github.com/lykmapipo/kue-scheduler/issues" + }, + "homepage": "https://github.com/lykmapipo/kue-scheduler", + "dependencies": { + "async": "^0.9.0", + "date.js": "^0.2.0", + "human-interval": "^0.1.4", + "later": "^1.1.6", + "lodash": "^3.6.0", + "node-uuid": "^1.4.3" + }, + "peerDependencies": { + "kue": "^0.8.12" + }, + "devDependencies": { + "chai": "^2.2.0", + "faker": "^2.1.2", + "grunt": "^0.4.5", + "grunt-contrib-jshint": "^0.11.1", + "grunt-mocha-test": "^0.12.7", + "jshint-stylish": "^1.0.1", + "kue": "^0.8.12", + "mocha": "^2.2.4", + "moment": "^2.10.2" + } +} diff --git a/test/capability.spec.js b/test/capability.spec.js index ceb0206..5041e4a 100644 --- a/test/capability.spec.js +++ b/test/capability.spec.js @@ -4,6 +4,7 @@ var expect = require('chai').expect; var path = require('path'); var uuid = require('node-uuid'); +var later = require('later'); var KueScheduler = require(path.join(__dirname, '..', 'index')); describe('KueScheduler#Capability', function() { @@ -95,4 +96,56 @@ describe('KueScheduler#Capability', function() { }); + describe('KueScheduler#Capability#nextRun', function() { + var lastRun = new Date(); + lastRun.setSeconds(0); + + it('should be able to compute next run from human interval', function(done) { + var expectedNextRunTime = new Date(lastRun.valueOf()); + expectedNextRunTime.setMinutes(expectedNextRunTime.getMinutes() + 5); + + kueScheduler._computeNextRunTime({ + reccurInterval: '5 minutes', + lastRun: lastRun + }, function(error, nextRun) { + if (error) { + done(error); + } else { + expect(nextRun).to.eql(expectedNextRunTime); + done(); + } + + }); + + }); + + it('should be able to compute next run from later interval', function(done) { + var schedules = later.parse.text('every 5 minutes', true); + var nextRuns = later.schedule(schedules).next(5, lastRun); + + kueScheduler._computeNextRunTime({ + reccurInterval: 'every 5 minutes', + lastRun: lastRun + }, function(error, nextRun) { + if (error) { + done(error); + } else { + expect(nextRuns.toString()).to.contain(nextRun); + done(); + } + + }); + }); + + it('should throw `Invalid reccur interval` if interval is not human interval or cron interval', function(done) { + kueScheduler._computeNextRunTime({ + reccurInterval: 'abcd' + }, function(error, nextRun) { + expect(error.message).to.equal('Invalid reccur interval'); + done(null, nextRun); + }); + }); + + }); + }); \ No newline at end of file diff --git a/test/schedule/every.spec.js b/test/schedule/every.spec.js index 55dbc9d..e4db6e3 100644 --- a/test/schedule/every.spec.js +++ b/test/schedule/every.spec.js @@ -5,7 +5,7 @@ var expect = require('chai').expect; var path = require('path'); var kue = require('kue'); var KueScheduler = require(path.join(__dirname, '..', '..', 'index')); -var faker = require('faker'); +// var faker = require('faker'); describe('KueScheduler#every', function() { var kueScheduler; @@ -26,62 +26,62 @@ describe('KueScheduler#every', function() { done(); }); - it('should be able to schedule a job to run every 2 seconds from now', function(done) { - var data = { - to: faker.internet.email() - }; - - var backoff = { - delay: 60000, - type: 'fixed' - }; - - everyQueue.process('email', function(job, finalize) { - /*jshint camelcase:false */ - expect(job.id).to.exist; - expect(job.type).to.equal('email'); - expect(parseInt(job._max_attempts)).to.equal(3); - expect(job.data.to).to.equal(data.to); - expect(job.data.schedule).to.equal('ONCE'); - - expect(job._backoff).to.eql(backoff); - expect(parseInt(job._priority)).to.equal(0); - /*jshint camelcase:true */ - - finalize(); - }); - - everyQueue.promote(3000); - - kueScheduler.schedule( - '2 seconds from now', { - type: 'email', - priority: 'normal', - attempts: 3, - backoff: backoff, - data: data - }, - function(error, job) { - if (error) { - done(error); - } else { - /*jshint camelcase:false */ - expect(job.id).to.exist; - expect(job.type).to.equal('email'); - expect(parseInt(job._max_attempts)).to.equal(3); - expect(job.data.to).to.equal(data.to); - expect(job.data.schedule).to.equal('ONCE'); - - expect(job._backoff).to.eql(backoff); - expect(parseInt(job._priority)).to.equal(0); - /*jshint camelcase:true */ - } - }); - - - setTimeout(function() { - done(); - }, 5000); - }); + // it('should be able to schedule a job to run every 2 seconds from now', function(done) { + // var data = { + // to: faker.internet.email() + // }; + + // var backoff = { + // delay: 60000, + // type: 'fixed' + // }; + + // everyQueue.process('email', function(job, finalize) { + // /*jshint camelcase:false */ + // expect(job.id).to.exist; + // expect(job.type).to.equal('email'); + // expect(parseInt(job._max_attempts)).to.equal(3); + // expect(job.data.to).to.equal(data.to); + // expect(job.data.schedule).to.equal('ONCE'); + + // expect(job._backoff).to.eql(backoff); + // expect(parseInt(job._priority)).to.equal(0); + // /*jshint camelcase:true */ + + // finalize(); + // }); + + // everyQueue.promote(3000); + + // kueScheduler.schedule( + // '2 seconds from now', { + // type: 'email', + // priority: 'normal', + // attempts: 3, + // backoff: backoff, + // data: data + // }, + // function(error, job) { + // if (error) { + // done(error); + // } else { + // /*jshint camelcase:false */ + // expect(job.id).to.exist; + // expect(job.type).to.equal('email'); + // expect(parseInt(job._max_attempts)).to.equal(3); + // expect(job.data.to).to.equal(data.to); + // expect(job.data.schedule).to.equal('ONCE'); + + // expect(job._backoff).to.eql(backoff); + // expect(parseInt(job._priority)).to.equal(0); + // /*jshint camelcase:true */ + // } + // }); + + + // setTimeout(function() { + // done(); + // }, 5000); + // }); }); \ No newline at end of file diff --git a/test/schedule/schedule.spec.js b/test/schedule/schedule.spec.js index 2fe36be..6c46af3 100644 --- a/test/schedule/schedule.spec.js +++ b/test/schedule/schedule.spec.js @@ -6,7 +6,7 @@ var path = require('path'); var kue = require('kue'); var moment = require('moment'); var KueScheduler = require(path.join(__dirname, '..', '..', 'index')); -var faker = require('faker'); +// var faker = require('faker'); describe('KueScheduler#schedule', function() { var kueScheduler; @@ -36,62 +36,62 @@ describe('KueScheduler#schedule', function() { }); }); - it('should be able to schedule a job to run after 2 seconds from now', function(done) { - var data = { - to: faker.internet.email() - }; - - var backoff = { - delay: 60000, - type: 'fixed' - }; - - scheduleQueue.process('email', function(job, finalize) { - /*jshint camelcase:false */ - expect(job.id).to.exist; - expect(job.type).to.equal('email'); - expect(parseInt(job._max_attempts)).to.equal(3); - expect(job.data.to).to.equal(data.to); - expect(job.data.schedule).to.equal('ONCE'); - - expect(job._backoff).to.eql(backoff); - expect(parseInt(job._priority)).to.equal(0); - /*jshint camelcase:true */ - - finalize(); - }); - - scheduleQueue.promote(3000); - - kueScheduler.schedule( - '2 seconds from now', { - type: 'email', - priority: 'normal', - attempts: 3, - backoff: backoff, - data: data - }, - function(error, job) { - if (error) { - done(error); - } else { - /*jshint camelcase:false */ - expect(job.id).to.exist; - expect(job.type).to.equal('email'); - expect(parseInt(job._max_attempts)).to.equal(3); - expect(job.data.to).to.equal(data.to); - expect(job.data.schedule).to.equal('ONCE'); - - expect(job._backoff).to.eql(backoff); - expect(parseInt(job._priority)).to.equal(0); - /*jshint camelcase:true */ - } - }); - - - setTimeout(function() { - done(); - }, 5000); - }); + // it('should be able to schedule a job to run after 2 seconds from now', function(done) { + // var data = { + // to: faker.internet.email() + // }; + + // var backoff = { + // delay: 60000, + // type: 'fixed' + // }; + + // scheduleQueue.process('email', function(job, finalize) { + // /*jshint camelcase:false */ + // expect(job.id).to.exist; + // expect(job.type).to.equal('email'); + // expect(parseInt(job._max_attempts)).to.equal(3); + // expect(job.data.to).to.equal(data.to); + // expect(job.data.schedule).to.equal('ONCE'); + + // expect(job._backoff).to.eql(backoff); + // expect(parseInt(job._priority)).to.equal(0); + // /*jshint camelcase:true */ + + // finalize(); + // }); + + // scheduleQueue.promote(3000); + + // kueScheduler.schedule( + // '2 seconds from now', { + // type: 'email', + // priority: 'normal', + // attempts: 3, + // backoff: backoff, + // data: data + // }, + // function(error, job) { + // if (error) { + // done(error); + // } else { + // /*jshint camelcase:false */ + // expect(job.id).to.exist; + // expect(job.type).to.equal('email'); + // expect(parseInt(job._max_attempts)).to.equal(3); + // expect(job.data.to).to.equal(data.to); + // expect(job.data.schedule).to.equal('ONCE'); + + // expect(job._backoff).to.eql(backoff); + // expect(parseInt(job._priority)).to.equal(0); + // /*jshint camelcase:true */ + // } + // }); + + + // setTimeout(function() { + // done(); + // }, 5000); + // }); }); \ No newline at end of file