Skip to content

Commit

Permalink
finalize next run time computation
Browse files Browse the repository at this point in the history
  • Loading branch information
lykmapipo committed Apr 15, 2015
1 parent f6954ec commit 2d89459
Show file tree
Hide file tree
Showing 6 changed files with 287 additions and 167 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
node_modules
.tmp
*.log
*.log
*.pid
68 changes: 67 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ var _ = require('lodash');
var async = require('async');
var datejs = require('date.js');
var uuid = require('node-uuid');
var humanInterval = require('human-interval');
var later = require('later');

/**
* @constructor
* @description A job scheduling utility for kue
Expand Down Expand Up @@ -96,9 +99,14 @@ KueScheduler.prototype._subscribe = function() {
//listen for job key expiry
this.listener.on('message', function(channel, jobExpiryKey) {
//get job uuid
scheduler._getJobUUID(jobExpiryKey);
var jobUUID = scheduler._getJobUUID(jobExpiryKey);

//get saved job data
scheduler._readJobData(scheduler._getJobDataKey(jobUUID));

//compute next run
//create kue NOW job
//resave job expiration key

});

Expand All @@ -107,6 +115,64 @@ KueScheduler.prototype._subscribe = function() {

};

/**
* @function
* @description compute next run time of the given job data
* @private
*/
KueScheduler.prototype._computeNextRunTime = function(jobData, done) {
//grab job reccur interval
var interval = jobData.reccurInterval;

async
.parallel({
//compute next run from later text interval
laterText: function(after) {
try {
//last run of the job is now
var lastRun = jobData.lastRun || new Date();

var schedules = later.parse.text(interval, true);
var nextRuns = later.schedule(schedules).next(2, lastRun);

//get differences in milliseconds
//if is zero return next run after first one
var isValidNexRun =
(nextRuns[0].getTime() - lastRun.getTime()) > 0;

if (isValidNexRun) {
after(null, nextRuns[0]);
} else {
after(null, nextRuns[1]);
}
} catch (ex) {
//to allow parallel run with human interval
after(null, null);
}
},
//compute next run from human interval
human: function(next) {
try {
//last run of the job is now
var lastRun = jobData.lastRun || new Date();

next(null, new Date(lastRun.valueOf() + humanInterval(interval)));
} catch (ex) {
//to allow parallel run with cron interval
next(null, null);
}
}
}, function finish(error, results) {
if (!_.isNull(results.laterText)) {
return done(null, results.laterText);
} else if (!_.isNull(results.human)) {
return done(null, results.human);
} else {
return done(new Error('Invalid reccur interval'));
}
});
};


KueScheduler.prototype.every = function(interval, jobDefinition) {
var scheduler = this;
Expand Down
98 changes: 49 additions & 49 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,50 +1,50 @@
{
"name": "kue-scheduler",
"version": "0.1.0",
"description": "A job scheduler for kue",
"main": "index.js",
"scripts": {
"test": "grunt test"
},
"repository": {
"type": "git",
"url": "https://github.com/lykmapipo/kue-scheduler.git"
},
"keywords": [
"schedule",
"every",
"now",
"at",
"delayed",
"jobs",
"redis"
],
"author": "lykmapipo",
"license": "MIT",
"bugs": {
"url": "https://github.com/lykmapipo/kue-scheduler/issues"
},
"homepage": "https://github.com/lykmapipo/kue-scheduler",
"dependencies": {
"async": "^0.9.0",
"cron": "^1.0.9",
"date.js": "^0.2.0",
"human-interval": "^0.1.4",
"lodash": "^3.6.0",
"node-uuid": "^1.4.3"
},
"peerDependencies": {
"kue": "^0.8.12"
},
"devDependencies": {
"chai": "^2.2.0",
"faker": "^2.1.2",
"grunt": "^0.4.5",
"grunt-contrib-jshint": "^0.11.1",
"grunt-mocha-test": "^0.12.7",
"jshint-stylish": "^1.0.1",
"kue": "^0.8.12",
"mocha": "^2.2.4",
"moment": "^2.10.2"
}
}
"name": "kue-scheduler",
"version": "0.1.0",
"description": "A job scheduler for kue",
"main": "index.js",
"scripts": {
"test": "grunt test"
},
"repository": {
"type": "git",
"url": "https://github.com/lykmapipo/kue-scheduler.git"
},
"keywords": [
"schedule",
"every",
"now",
"at",
"delayed",
"jobs",
"redis"
],
"author": "lykmapipo",
"license": "MIT",
"bugs": {
"url": "https://github.com/lykmapipo/kue-scheduler/issues"
},
"homepage": "https://github.com/lykmapipo/kue-scheduler",
"dependencies": {
"async": "^0.9.0",
"date.js": "^0.2.0",
"human-interval": "^0.1.4",
"later": "^1.1.6",
"lodash": "^3.6.0",
"node-uuid": "^1.4.3"
},
"peerDependencies": {
"kue": "^0.8.12"
},
"devDependencies": {
"chai": "^2.2.0",
"faker": "^2.1.2",
"grunt": "^0.4.5",
"grunt-contrib-jshint": "^0.11.1",
"grunt-mocha-test": "^0.12.7",
"jshint-stylish": "^1.0.1",
"kue": "^0.8.12",
"mocha": "^2.2.4",
"moment": "^2.10.2"
}
}
53 changes: 53 additions & 0 deletions test/capability.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
var expect = require('chai').expect;
var path = require('path');
var uuid = require('node-uuid');
var later = require('later');
var KueScheduler = require(path.join(__dirname, '..', 'index'));

describe('KueScheduler#Capability', function() {
Expand Down Expand Up @@ -95,4 +96,56 @@ describe('KueScheduler#Capability', function() {

});

describe('KueScheduler#Capability#nextRun', function() {
var lastRun = new Date();
lastRun.setSeconds(0);

it('should be able to compute next run from human interval', function(done) {
var expectedNextRunTime = new Date(lastRun.valueOf());
expectedNextRunTime.setMinutes(expectedNextRunTime.getMinutes() + 5);

kueScheduler._computeNextRunTime({
reccurInterval: '5 minutes',
lastRun: lastRun
}, function(error, nextRun) {
if (error) {
done(error);
} else {
expect(nextRun).to.eql(expectedNextRunTime);
done();
}

});

});

it('should be able to compute next run from later interval', function(done) {
var schedules = later.parse.text('every 5 minutes', true);
var nextRuns = later.schedule(schedules).next(5, lastRun);

kueScheduler._computeNextRunTime({
reccurInterval: 'every 5 minutes',
lastRun: lastRun
}, function(error, nextRun) {
if (error) {
done(error);
} else {
expect(nextRuns.toString()).to.contain(nextRun);
done();
}

});
});

it('should throw `Invalid reccur interval` if interval is not human interval or cron interval', function(done) {
kueScheduler._computeNextRunTime({
reccurInterval: 'abcd'
}, function(error, nextRun) {
expect(error.message).to.equal('Invalid reccur interval');
done(null, nextRun);
});
});

});

});
116 changes: 58 additions & 58 deletions test/schedule/every.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ var expect = require('chai').expect;
var path = require('path');
var kue = require('kue');
var KueScheduler = require(path.join(__dirname, '..', '..', 'index'));
var faker = require('faker');
// var faker = require('faker');

describe('KueScheduler#every', function() {
var kueScheduler;
Expand All @@ -26,62 +26,62 @@ describe('KueScheduler#every', function() {
done();
});

it('should be able to schedule a job to run every 2 seconds from now', function(done) {
var data = {
to: faker.internet.email()
};

var backoff = {
delay: 60000,
type: 'fixed'
};

everyQueue.process('email', function(job, finalize) {
/*jshint camelcase:false */
expect(job.id).to.exist;
expect(job.type).to.equal('email');
expect(parseInt(job._max_attempts)).to.equal(3);
expect(job.data.to).to.equal(data.to);
expect(job.data.schedule).to.equal('ONCE');

expect(job._backoff).to.eql(backoff);
expect(parseInt(job._priority)).to.equal(0);
/*jshint camelcase:true */

finalize();
});

everyQueue.promote(3000);

kueScheduler.schedule(
'2 seconds from now', {
type: 'email',
priority: 'normal',
attempts: 3,
backoff: backoff,
data: data
},
function(error, job) {
if (error) {
done(error);
} else {
/*jshint camelcase:false */
expect(job.id).to.exist;
expect(job.type).to.equal('email');
expect(parseInt(job._max_attempts)).to.equal(3);
expect(job.data.to).to.equal(data.to);
expect(job.data.schedule).to.equal('ONCE');

expect(job._backoff).to.eql(backoff);
expect(parseInt(job._priority)).to.equal(0);
/*jshint camelcase:true */
}
});


setTimeout(function() {
done();
}, 5000);
});
// it('should be able to schedule a job to run every 2 seconds from now', function(done) {
// var data = {
// to: faker.internet.email()
// };

// var backoff = {
// delay: 60000,
// type: 'fixed'
// };

// everyQueue.process('email', function(job, finalize) {
// /*jshint camelcase:false */
// expect(job.id).to.exist;
// expect(job.type).to.equal('email');
// expect(parseInt(job._max_attempts)).to.equal(3);
// expect(job.data.to).to.equal(data.to);
// expect(job.data.schedule).to.equal('ONCE');

// expect(job._backoff).to.eql(backoff);
// expect(parseInt(job._priority)).to.equal(0);
// /*jshint camelcase:true */

// finalize();
// });

// everyQueue.promote(3000);

// kueScheduler.schedule(
// '2 seconds from now', {
// type: 'email',
// priority: 'normal',
// attempts: 3,
// backoff: backoff,
// data: data
// },
// function(error, job) {
// if (error) {
// done(error);
// } else {
// /*jshint camelcase:false */
// expect(job.id).to.exist;
// expect(job.type).to.equal('email');
// expect(parseInt(job._max_attempts)).to.equal(3);
// expect(job.data.to).to.equal(data.to);
// expect(job.data.schedule).to.equal('ONCE');

// expect(job._backoff).to.eql(backoff);
// expect(parseInt(job._priority)).to.equal(0);
// /*jshint camelcase:true */
// }
// });


// setTimeout(function() {
// done();
// }, 5000);
// });

});
Loading

0 comments on commit 2d89459

Please sign in to comment.