Skip to content

Commit

Permalink
feat(653): pass blocked_by to worker (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
d2lam authored May 21, 2018
1 parent 79685e9 commit 643cc56
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 59 deletions.
22 changes: 17 additions & 5 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -267,47 +267,59 @@ class ExecutorQueue extends Executor {
* @async _start
* @param {Object} config Configuration
* @param {Object} [config.annotations] Optional key/value object
* @param {Array} config.blockedBy Array of job IDs that this job is blocked by. Always blockedby itself
* @param {String} config.apiUri Screwdriver's API
* @param {String} config.jobId JobID that this build belongs to
* @param {String} config.buildId Unique ID for a build
* @param {String} config.container Container for the build to run in
* @param {String} config.token JWT to act on behalf of the build
* @return {Promise}
*/
async _start(config) {
await this.connect();
const { buildId, jobId, blockedBy } = config;

// Store the config in redis
await this.redisBreaker.runCommand('hset', this.buildConfigTable,
config.buildId, JSON.stringify(config));
buildId, JSON.stringify(config));

// Note: arguments to enqueue are [queue name, job name, array of args]
return this.queueBreaker.runCommand('enqueue', this.buildQueue, 'start', [{
buildId: config.buildId
buildId,
jobId,
blockedBy
}]);
}

/**
* Stop a running or finished build
* @async _stop
* @param {Object} config Configuration
* @param {Array} config.blockedBy Array of job IDs that this job is blocked by. Always blockedby itself
* @param {String} config.buildId Unique ID for a build
* @param {String} config.jobId JobID that this build belongs to
* @return {Promise}
*/
async _stop(config) {
await this.connect();

const { buildId, jobId, blockedBy } = config; // in case config contains something else
const numDeleted = await this.queueBreaker.runCommand('del', this.buildQueue, 'start', [{
buildId: config.buildId
buildId,
jobId,
blockedBy
}]);

if (numDeleted !== 0) {
// Build hadn't been started, "start" event was removed from queue
return this.redisBreaker.runCommand('hdel', this.buildConfigTable, config.buildId);
return this.redisBreaker.runCommand('hdel', this.buildConfigTable, buildId);
}

// "start" event has been processed, need worker to stop the executor
return this.queueBreaker.runCommand('enqueue', this.buildQueue, 'stop', [{
buildId: config.buildId
buildId,
jobId,
blockedBy
}]);
}

Expand Down
11 changes: 6 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,21 @@
],
"devDependencies": {
"chai": "^3.5.0",
"eslint": "^4.3.0",
"eslint": "^4.19.1",
"eslint-config-screwdriver": "^3.0.0",
"jenkins-mocha": "^4.0.0",
"mockery": "^2.0.0",
"sinon": "^2.3.4"
},
"dependencies": {
"circuit-fuses": "^3.0.0",
"circuit-fuses": "^3.3.0",
"cron-parser": "^2.5.0",
"hoek": "^5.0.3",
"ioredis": "^3.1.2",
"node-resque": "^4.0.7",
"screwdriver-executor-base": "^6.1.0",
"ioredis": "^3.2.2",
"node-resque": "^4.0.9",
"request": "^2.85.0",
"screwdriver-data-schema": "^18.19.0",
"screwdriver-executor-base": "^6.1.0",
"string-hash": "^1.1.3",
"winston": "^2.3.1"
},
Expand Down
4 changes: 3 additions & 1 deletion test/data/fullConfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
"annotations": {
"beta.screwdriver.cd/executor": "screwdriver-executor-k8s"
},
"jobId": 777,
"buildId": 8609,
"container": "node:4",
"apiUri": "http://api.com",
"token": "asdf"
"token": "asdf",
"blockedBy": [777]
}
66 changes: 18 additions & 48 deletions test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ const testConfig = require('./data/fullConfig.json');
const testPipeline = require('./data/testPipeline.json');
const testJob = require('./data/testJob.json');
const partialTestConfig = {
buildId: testConfig.buildId
buildId: testConfig.buildId,
jobId: testConfig.jobId,
blockedBy: testConfig.blockedBy
};
const tokenGen = sinon.stub().returns('123456abc');
const testDelayedConfig = {
Expand Down Expand Up @@ -289,30 +291,14 @@ describe('index test', () => {
it('rejects if it can\'t establish a connection', function () {
queueMock.connect.yieldsAsync(new Error('couldn\'t connect'));

return executor.start({
annotations: {
'beta.screwdriver.cd/executor': 'screwdriver-executor-k8s'
},
buildId: 8609,
container: 'node:4',
apiUri: 'http://api.com',
token: 'asdf'
}).then(() => {
return executor.start(testConfig).then(() => {
assert.fail('Should not get here');
}, (err) => {
assert.instanceOf(err, Error);
});
});

it('enqueues a build and caches the config', () => executor.start({
annotations: {
'beta.screwdriver.cd/executor': 'screwdriver-executor-k8s'
},
buildId: 8609,
container: 'node:4',
apiUri: 'http://api.com',
token: 'asdf'
}).then(() => {
it('enqueues a build and caches the config', () => executor.start(testConfig).then(() => {
assert.calledOnce(queueMock.connect);
assert.calledWith(redisMock.hset, 'buildConfigs', testConfig.buildId,
JSON.stringify(testConfig));
Expand All @@ -322,15 +308,7 @@ describe('index test', () => {
it('doesn\'t call connect if there\'s already a connection', () => {
queueMock.connection.connected = true;

return executor.start({
annotations: {
'beta.screwdriver.cd/executor': 'screwdriver-executor-k8s'
},
buildId: 8609,
container: 'node:4',
apiUri: 'http://api.com',
token: 'asdf'
}).then(() => {
return executor.start(testConfig).then(() => {
assert.notCalled(queueMock.connect);
assert.calledWith(queueMock.enqueue, 'builds', 'start', [partialTestConfig]);
});
Expand All @@ -341,30 +319,25 @@ describe('index test', () => {
it('rejects if it can\'t establish a connection', function () {
queueMock.connect.yieldsAsync(new Error('couldn\'t connect'));

return executor.stop({
buildId: 8609
}).then(() => {
return executor.stop(partialTestConfig).then(() => {
assert.fail('Should not get here');
}, (err) => {
assert.instanceOf(err, Error);
});
});

it('removes a start event from the queue and the cached buildconfig', () => executor.stop({
buildId: 8609
}).then(() => {
assert.calledOnce(queueMock.connect);
assert.calledWith(queueMock.del, 'builds', 'start', [partialTestConfig]);
assert.calledWith(redisMock.hdel, 'buildConfigs', 8609);
assert.notCalled(queueMock.enqueue);
}));
it('removes a start event from the queue and the cached buildconfig',
() => executor.stop(partialTestConfig).then(() => {
assert.calledOnce(queueMock.connect);
assert.calledWith(queueMock.del, 'builds', 'start', [partialTestConfig]);
assert.calledWith(redisMock.hdel, 'buildConfigs', 8609);
assert.notCalled(queueMock.enqueue);
}));

it('adds a stop event to the queue if no start events were removed', () => {
queueMock.del.yieldsAsync(null, 0);

return executor.stop({
buildId: 8609
}).then(() => {
return executor.stop(partialTestConfig).then(() => {
assert.calledOnce(queueMock.connect);
assert.calledWith(queueMock.del, 'builds', 'start', [partialTestConfig]);
assert.calledWith(queueMock.enqueue, 'builds', 'stop', [partialTestConfig]);
Expand All @@ -374,12 +347,9 @@ describe('index test', () => {
it('doesn\'t call connect if there\'s already a connection', () => {
queueMock.connection.connected = true;

return executor.stop({
annotations: {
'beta.screwdriver.cd/executor': 'screwdriver-executor-k8s'
},
buildId: 8609
}).then(() => {
return executor.stop(Object.assign({}, partialTestConfig, { annotations: {
'beta.screwdriver.cd/executor': 'screwdriver-executor-k8s'
} })).then(() => {
assert.notCalled(queueMock.connect);
assert.calledWith(queueMock.del, 'builds', 'start', [partialTestConfig]);
});
Expand Down

0 comments on commit 643cc56

Please sign in to comment.