Skip to content

Commit

Permalink
Merge pull request #7 from screwdriver-cd/cache-stuff
Browse files Browse the repository at this point in the history
feat(queue): cache buildconfig in redis
  • Loading branch information
ian-fox authored Aug 21, 2017
2 parents c3e66e0 + ae1f94c commit c3a5429
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 33 deletions.
40 changes: 30 additions & 10 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

const Executor = require('screwdriver-executor-base');
const Resque = require('node-resque');
const Breaker = require('circuit-fuses').breaker;
const fuses = require('circuit-fuses');
const Breaker = fuses.breaker;
const FuseBox = fuses.box;

class ExecutorQueue extends Executor {
/**
Expand All @@ -24,12 +26,21 @@ class ExecutorQueue extends Executor {

this.prefix = config.prefix || '';
this.buildQueue = `${this.prefix}builds`;
this.buildConfigTable = `${this.prefix}buildConfigs`;

const redisConnection = Object.assign({}, config.redisConnection, { pkg: 'ioredis' });

// eslint-disable-next-line new-cap
this.queue = new Resque.queue({ connection: redisConnection });
this.breaker = new Breaker((funcName, ...args) => this.queue[funcName](...args), breaker);
this.queueBreaker = new Breaker((funcName, ...args) =>
this.queue[funcName](...args), breaker);
this.redisBreaker = new Breaker((funcName, ...args) =>
// Use the queue's built-in connection to send redis commands instead of instantiating a new one
this.queue.connection.redis[funcName](...args), breaker);

this.fuseBox = new FuseBox();
this.fuseBox.addFuse(this.queueBreaker);
this.fuseBox.addFuse(this.redisBreaker);
}

/**
Expand All @@ -45,29 +56,38 @@ class ExecutorQueue extends Executor {
*/
_start(config) {
return this.connect()
// Note: arguments to enqueue are [queue name, job type, array of args]
.then(() => this.breaker.runCommand('enqueue', this.buildQueue, 'start', [config]));
// Store the config in redis
.then(() => this.redisBreaker.runCommand('hset', this.buildConfigTable,
config.buildId, JSON.stringify(config)))
// Note: arguments to enqueue are [queue name, job name, array of args]
.then(() => this.queueBreaker.runCommand('enqueue', this.buildQueue, 'start', [{
buildId: config.buildId
}]));
}

/**
* Stop a running or finished build
* @method _stop
* @param {Object} config Configuration
* @param {Object} [config.annotations] Optional key/value object
* @param {String} config.buildId Unique ID for a build
* @return {Promise}
*/
_stop(config) {
return this.connect()
.then(() => this.breaker.runCommand('del', this.buildQueue, 'start', [config]))
.then(() => this.queueBreaker.runCommand('del', this.buildQueue, 'start', [{
buildId: config.buildId
}]))
.then((numDeleted) => {
if (numDeleted !== 0) {
// Build hadn't been started, "start" event was removed from queue
return null;
return this.redisBreaker.runCommand('hdel', this.buildConfigTable,
config.buildId);
}

// "start" event has been processed, need worker to stop the executor
return this.breaker.runCommand('enqueue', this.buildQueue, 'stop', [config]);
return this.queueBreaker.runCommand('enqueue', this.buildQueue, 'stop', [{
buildId: config.buildId
}]);
});
}

Expand All @@ -81,7 +101,7 @@ class ExecutorQueue extends Executor {
return Promise.resolve();
}

return this.breaker.runCommand('connect');
return this.queueBreaker.runCommand('connect');
}

/**
Expand All @@ -90,7 +110,7 @@ class ExecutorQueue extends Executor {
* @param {Response} Object Object containing stats for the executor
*/
stats() {
return this.breaker.stats();
return this.queueBreaker.stats();
}
}

Expand Down
File renamed without changes.
6 changes: 0 additions & 6 deletions test/data/stop.json

This file was deleted.

39 changes: 22 additions & 17 deletions test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ const assert = chai.assert;
const mockery = require('mockery');
const sinon = require('sinon');
const testConnection = require('./data/testConnection.json');
const testStartConfig = require('./data/start.json');
const testStopConfig = require('./data/stop.json');
const testConfig = require('./data/fullConfig.json');
const partialTestConfig = {
buildId: testConfig.buildId
};

sinon.assert.expose(chai.assert, { prefix: '' });

Expand All @@ -31,7 +33,11 @@ describe('index test', () => {
enqueue: sinon.stub().yieldsAsync(),
del: sinon.stub().yieldsAsync(null, 1),
connection: {
connected: false
connected: false,
redis: {
hdel: sinon.stub().yieldsAsync(),
hset: sinon.stub().yieldsAsync()
}
}
};
resqueMock = {
Expand Down Expand Up @@ -84,6 +90,8 @@ describe('index test', () => {

assert.instanceOf(executor, Executor);
assert.strictEqual(executor.prefix, 'beta_');
assert.strictEqual(executor.buildQueue, 'beta_builds');
assert.strictEqual(executor.buildConfigTable, 'beta_buildConfigs');
});

it('throws when not given a redis connection', () => {
Expand All @@ -110,7 +118,7 @@ describe('index test', () => {
});
});

it('enqueues a build', () => executor.start({
it('enqueues a build and caches the config', () => executor.start({
annotations: {
'beta.screwdriver.cd/executor': 'screwdriver-executor-k8s'
},
Expand All @@ -120,7 +128,9 @@ describe('index test', () => {
token: 'asdf'
}).then(() => {
assert.calledOnce(queueMock.connect);
assert.calledWith(queueMock.enqueue, 'builds', 'start', [testStartConfig]);
assert.calledWith(queueMock.connection.redis.hset, 'buildConfigs', testConfig.buildId,
JSON.stringify(testConfig));
assert.calledWith(queueMock.enqueue, 'builds', 'start', [partialTestConfig]);
}));

it('doesn\'t call connect if there\'s already a connection', () => {
Expand All @@ -136,7 +146,7 @@ describe('index test', () => {
token: 'asdf'
}).then(() => {
assert.notCalled(queueMock.connect);
assert.calledWith(queueMock.enqueue, 'builds', 'start', [testStartConfig]);
assert.calledWith(queueMock.enqueue, 'builds', 'start', [partialTestConfig]);
});
});
});
Expand All @@ -154,29 +164,24 @@ describe('index test', () => {
});
});

it('removes a start event from the queue', () => executor.stop({
annotations: {
'beta.screwdriver.cd/executor': 'screwdriver-executor-k8s'
},
it('removes a start event from the queue and the cached buildconfig', () => executor.stop({
buildId: 8609
}).then(() => {
assert.calledOnce(queueMock.connect);
assert.calledWith(queueMock.del, 'builds', 'start', [testStopConfig]);
assert.calledWith(queueMock.del, 'builds', 'start', [partialTestConfig]);
assert.calledWith(queueMock.connection.redis.hdel, 'buildConfigs', 8609);
assert.notCalled(queueMock.enqueue);
}));

it('adds a stop event to the queue if no start events were removed', () => {
queueMock.del.yieldsAsync(null, 0);

return executor.stop({
annotations: {
'beta.screwdriver.cd/executor': 'screwdriver-executor-k8s'
},
buildId: 8609
}).then(() => {
assert.calledOnce(queueMock.connect);
assert.calledWith(queueMock.del, 'builds', 'start', [testStopConfig]);
assert.calledWith(queueMock.enqueue, 'builds', 'stop', [testStopConfig]);
assert.calledWith(queueMock.del, 'builds', 'start', [partialTestConfig]);
assert.calledWith(queueMock.enqueue, 'builds', 'stop', [partialTestConfig]);
});
});

Expand All @@ -190,7 +195,7 @@ describe('index test', () => {
buildId: 8609
}).then(() => {
assert.notCalled(queueMock.connect);
assert.calledWith(queueMock.del, 'builds', 'start', [testStopConfig]);
assert.calledWith(queueMock.del, 'builds', 'start', [partialTestConfig]);
});
});
});
Expand Down

0 comments on commit c3a5429

Please sign in to comment.