Skip to content

Commit

Permalink
feat(688): Add support for periodic builds (#14)
Browse files Browse the repository at this point in the history
* feat(688): Add lib function to transform cron expression

* startPeriodic() and stopPeriodic()

* Tests

* Changes to fix review comments

* Fix tests

* Add check to see if annotation exists

* Fix cron module to be more consistent

* Create new multiworker and scheduler
  • Loading branch information
Pranav Ravichandran authored May 2, 2018
1 parent 2e30acd commit dbcdff0
Show file tree
Hide file tree
Showing 7 changed files with 525 additions and 2 deletions.
212 changes: 212 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ const Executor = require('screwdriver-executor-base');
const Redis = require('ioredis');
const Resque = require('node-resque');
const fuses = require('circuit-fuses');
const req = require('request');
const hoek = require('hoek');
const winston = require('winston');
const cron = require('./lib/cron');
const Breaker = fuses.breaker;
const FuseBox = fuses.box;

Expand All @@ -27,7 +31,10 @@ class ExecutorQueue extends Executor {

this.prefix = config.prefix || '';
this.buildQueue = `${this.prefix}builds`;
this.periodicBuildQueue = `${this.prefix}periodicBuilds`;
this.buildConfigTable = `${this.prefix}buildConfigs`;
this.periodicBuildTable = `${this.prefix}periodicBuildConfigs`;
this.tokenGen = null;

const redisConnection = Object.assign({}, config.redisConnection, { pkg: 'ioredis' });

Expand All @@ -48,6 +55,211 @@ class ExecutorQueue extends Executor {
this.fuseBox = new FuseBox();
this.fuseBox.addFuse(this.queueBreaker);
this.fuseBox.addFuse(this.redisBreaker);

const RETRY_LIMIT = 3;
const RETRY_DELAY = 5;
const retryOptions = {
plugins: ['retry'],
pluginOptions: {
retry: {
retryLimit: RETRY_LIMIT,
retryDelay: RETRY_DELAY
}
}
};
// Jobs object to register the worker with
const jobs = {
startDelayed: Object.assign({ perform: (jobConfig, callback) =>
this.redisBreaker.runCommand('hget', this.periodicBuildTable,
jobConfig.jobId)
.then(fullConfig => this.startPeriodic(Object.assign(JSON.parse(fullConfig)),
{ triggerBuild: true }))
.then(result => callback(null, result), (err) => {
winston.error('err in startDelayed job: ', err);
callback(err);
})
}, retryOptions)
};

// eslint-disable-next-line new-cap
this.multiWorker = new Resque.multiWorker({
connection: redisConnection,
queues: [this.periodicBuildQueue],
minTaskProcessors: 1,
maxTaskProcessors: 10,
checkTimeout: 1000,
maxEventLoopDelay: 10,
toDisconnectProcessors: true
}, jobs);
// eslint-disable-next-line new-cap
this.scheduler = new Resque.scheduler({ connection: redisConnection });

this.multiWorker.on('start', workerId =>
winston.info(`worker[${workerId}] started`));
this.multiWorker.on('end', workerId =>
winston.info(`worker[${workerId}] ended`));
this.multiWorker.on('cleaning_worker', (workerId, worker, pid) =>
winston.info(`cleaning old worker ${worker} pid ${pid}`));
this.multiWorker.on('job', (workerId, queue, job) =>
winston.info(`worker[${workerId}] working job ${queue} ${JSON.stringify(job)}`));
this.multiWorker.on('reEnqueue', (workerId, queue, job, plugin) =>
// eslint-disable-next-line max-len
winston.info(`worker[${workerId}] reEnqueue job (${plugin}) ${queue} ${JSON.stringify(job)}`));
this.multiWorker.on('success', (workerId, queue, job, result) =>
// eslint-disable-next-line max-len
winston.info(`worker[${workerId}] ${job} success ${queue} ${JSON.stringify(job)} >> ${result}`));
this.multiWorker.on('failure', (workerId, queue, job, failure) =>
// eslint-disable-next-line max-len
winston.info(`worker[${workerId}] ${job} failure ${queue} ${JSON.stringify(job)} >> ${failure}`));
this.multiWorker.on('error', (workerId, queue, job, error) =>
winston.error(`worker[${workerId}] error ${queue} ${JSON.stringify(job)} >> ${error}`));
this.multiWorker.on('pause', workerId =>
winston.info(`worker[${workerId}] paused`));

// multiWorker emitters
this.multiWorker.on('internalError', error =>
winston.error(error));
this.multiWorker.on('multiWorkerAction', (verb, delay) =>
winston.info(`*** checked for worker status: ${verb} (event loop delay: ${delay}ms)`));

this.scheduler.on('start', () =>
winston.info('scheduler started'));
this.scheduler.on('end', () =>
winston.info('scheduler ended'));
this.scheduler.on('master', state =>
winston.info(`scheduler became master ${state}`));
this.scheduler.on('error', error =>
winston.info(`scheduler error >> ${error}`));
this.scheduler.on('working_timestamp', timestamp =>
winston.info(`scheduler working timestamp ${timestamp}`));
this.scheduler.on('transferred_job', (timestamp, job) =>
winston.info(`scheduler enqueuing job timestamp >> ${JSON.stringify(job)}`));

this.multiWorker.start();
this.scheduler.connect(() => {
this.scheduler.start();
});

process.on('SIGTERM', () => {
this.multiWorker.end((error) => {
if (error) {
winston.error(`failed to end the worker: ${error}`);
}

this.scheduler.end((err) => {
if (err) {
winston.error(`failed to end the scheduler: ${err}`);
process.exit(128);
}
process.exit(0);
});
});
});
}

/**
* Posts a new build event to the API
* @method postBuildEvent
* @param {Object} config Configuration
* @param {Object} config.pipeline Pipeline of the job
* @param {Object} config.job Job object to create periodic builds for
* @return {Promise}
*/
async postBuildEvent({ pipeline, job }) {
const jwt = this.tokenGen(pipeline.admins[0], {}, pipeline.scmContext);

const options = {
url: '/events',
method: 'POST',
headers: {
Authorization: `Bearer ${jwt}`,
'Content-Type': 'application/json'
},
body: {
pipelineId: pipeline.id,
startFrom: job.name
}
};

return req(options, (err, response) => {
if (!err && response.statusCode === 201) {
return Promise.resolve(response);
}

return Promise.reject(err);
});
}

/**
* Starts a new periodic build in an executor
* @method _startPeriodic
* @param {Object} config Configuration
* @param {Object} config.pipeline Pipeline of the job
* @param {Object} config.job Job object to create periodic builds for
* @param {Function} config.tokenGen Function to generate JWT from username, scope and scmContext
* @param {Boolean} config.isUpdate Boolean to determine if updating existing periodic build
* @param {Boolean} config.triggerBuild Flag to post new build event
* @return {Promise}
*/
async _startPeriodic(config) {
// eslint-disable-next-line max-len
const buildCron = hoek.reach(config.job, 'permutations>0>annotations>screwdriver.cd/buildPeriodically',
{ separator: '>' });

// Save tokenGen to current executor object so we can access it in postBuildEvent
if (!this.tokenGen) {
this.tokenGen = config.tokenGen;
}

if (config.isUpdate) {
// eslint-disable-next-line no-underscore-dangle
await this._stopPeriodic({
jobId: config.job.id
});
}

if (config.triggerBuild) {
await this.postBuildEvent(config);
}

if (buildCron) {
await this.connect();

const next = cron.next(cron.transform(buildCron, config.job.id));

// Store the config in redis
await this.redisBreaker.runCommand('hset', this.periodicBuildTable,
config.job.id, JSON.stringify(Object.assign(config, {
isUpdate: false,
triggerBuild: false
})));

// Note: arguments to enqueueAt are [timestamp, queue name, job name, array of args]
return this.queueBreaker.runCommand('enqueueAt', next,
this.periodicBuildQueue, 'startDelayed', [{
jobId: config.job.id
}])
.catch(() => Promise.resolve());
}

return Promise.resolve();
}

/**
* Stops a previously scheduled periodic build in an executor
* @async _stopPeriodic
* @param {Object} config Configuration
* @param {Integer} config.jobId ID of the job with periodic builds
* @return {Promise}
*/
async _stopPeriodic(config) {
await this.connect();

await this.queueBreaker.runCommand('delDelayed', this.periodicBuildQueue, 'startDelayed', [{
jobId: config.jobId
}]);

return this.redisBreaker.runCommand('hdel', this.periodicBuildTable, config.jobId);
}

/**
Expand Down
116 changes: 116 additions & 0 deletions lib/cron.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
'use strict';

const parser = require('cron-parser');
const stringHash = require('string-hash');

/**
* Evaluate a numeric hash to a number within the range of min and max
*
* @method evaluateHash
* @param {Number} hash Hash to evaluate
* @param {Number} min Minimum evaluated value
* @param {String} max Maximum evaluated value
* @return {Number} Evaluated hash
*/
const evaluateHash = (hash, min, max) => (hash % ((max + 1) - min)) + min;

/**
* Transform a cron value containing a valid 'H' symbol into a valid cron value
* @method transformValue
* @param {String} cronValue Value to transform
* @param {Number} min Minimum acceptable value
* @param {Number} max Maximum acceptable value
* @param {Number} hashValue Numeric hash to determine new value
* @return {String} Transformed cron value
*/
const transformValue = (cronValue, min, max, hashValue) => {
const values = cronValue.split(',');

// Transform each ',' seperated value
// Ignore values that do not have a valid 'H' symbol
values.forEach((value, i) => {
// 'H' should evaluate to some value within the range (e.g. [0-59])
if (value === 'H') {
values[i] = evaluateHash(hashValue, min, max);

return;
}

// e.g. H/5 -> #/5
if (value.match(/H\/\d+/)) {
values[i] = value.replace('H', evaluateHash(hashValue, min, max));

return;
}

// e.g. H(0-5) -> #
if (value.match(/H\(\d+-\d+\)/)) {
const newMin = Number(value.substring(2, value.lastIndexOf('-')));
const newMax = Number(value.substring(value.lastIndexOf('-') + 1,
value.lastIndexOf(')')));

// Range is invalid, throw an error
if (newMin < min || newMax > max || newMin > newMax) {
throw new Error(`${value} has an invalid range, expected range ${min}-${max}`);
}

values[i] = evaluateHash(hashValue, newMin, newMax);
}
});

return values.join(',');
};

/**
* Transform a cron expression containing valid 'H' symbol(s) into a valid cron expression
* @method transformCron
* @param {String} cronExp Cron expression to transform
* @param {Number} jobId Job ID
* @return {String} Transformed cron expression
*/
const transformCron = (cronExp, jobId) => {
const fields = cronExp.trim().split(/\s+/);

// The seconds field is not allowed (e.g. '* * * * * *')
if (fields.length !== 5) {
throw new Error(`${cronExp} does not have exactly 5 fields`);
}

const jobIdHash = stringHash(jobId.toString());

// Minutes [0-59]
// Always treat the minutes value as 'H'
fields[0] = transformValue('H', 0, 59, jobIdHash);
// Hours [0-23]
fields[1] = transformValue(fields[1], 0, 23, jobIdHash);
// Day of month [1-31]
fields[2] = transformValue(fields[2], 1, 31, jobIdHash);
// Months [1-12]
fields[3] = transformValue(fields[3], 1, 12, jobIdHash);
// Day of week [0-6]
fields[4] = transformValue(fields[4], 0, 6, jobIdHash);

const newCronExp = fields.join(' ');

// Perform final validation before returning
parser.parseExpression(newCronExp);

return newCronExp;
};

/**
* Get the next time of execution based on the cron expression
* @method nextExecution
* @param {String} cronExp Cron expression to calculate next execution time
* @return {Number} Epoch timestamp (time of next execution).
*/
const nextExecution = (cronExp) => {
const interval = parser.parseExpression(cronExp);

return interval.next().getTime();
};

module.exports = {
transform: transformCron,
next: nextExecution
};
7 changes: 6 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,14 @@
},
"dependencies": {
"circuit-fuses": "^3.0.0",
"cron-parser": "^2.5.0",
"hoek": "^5.0.3",
"ioredis": "^3.1.2",
"node-resque": "^4.0.7",
"screwdriver-executor-base": "^5.2.2"
"screwdriver-executor-base": "^6.1.0",
"request": "^2.85.0",
"string-hash": "^1.1.3",
"winston": "^2.3.1"
},
"release": {
"debug": false,
Expand Down
13 changes: 13 additions & 0 deletions test/data/testJob.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"id": 1234,
"pipelineId": 123,
"name": "deploy",
"state": "ENABLED",
"permutations": [
{
"annotations": {
"screwdriver.cd/buildPeriodically": "* * * * *"
}
}
]
}
9 changes: 9 additions & 0 deletions test/data/testPipeline.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"id": 123,
"scmUri": "github.com:12345:branchName",
"scmContext": "github:github.com",
"createTime": "2038-01-19T03:14:08.131Z",
"admins": {
"stjohn": true
}
}
Loading

0 comments on commit dbcdff0

Please sign in to comment.