diff --git a/index.js b/index.js index 558650f..a98ff89 100644 --- a/index.js +++ b/index.js @@ -2,7 +2,9 @@ const Executor = require('screwdriver-executor-base'); const Resque = require('node-resque'); -const Breaker = require('circuit-fuses').breaker; +const fuses = require('circuit-fuses'); +const Breaker = fuses.breaker; +const FuseBox = fuses.box; class ExecutorQueue extends Executor { /** @@ -24,12 +26,21 @@ class ExecutorQueue extends Executor { this.prefix = config.prefix || ''; this.buildQueue = `${this.prefix}builds`; + this.buildConfigTable = `${this.prefix}buildConfigs`; const redisConnection = Object.assign({}, config.redisConnection, { pkg: 'ioredis' }); // eslint-disable-next-line new-cap this.queue = new Resque.queue({ connection: redisConnection }); - this.breaker = new Breaker((funcName, ...args) => this.queue[funcName](...args), breaker); + this.queueBreaker = new Breaker((funcName, ...args) => + this.queue[funcName](...args), breaker); + this.redisBreaker = new Breaker((funcName, ...args) => + // Use the queue's built-in connection to send redis commands instead of instantiating a new one + this.queue.connection.redis[funcName](...args), breaker); + + this.fuseBox = new FuseBox(); + this.fuseBox.addFuse(this.queueBreaker); + this.fuseBox.addFuse(this.redisBreaker); } /** @@ -45,29 +56,38 @@ class ExecutorQueue extends Executor { */ _start(config) { return this.connect() - // Note: arguments to enqueue are [queue name, job type, array of args] - .then(() => this.breaker.runCommand('enqueue', this.buildQueue, 'start', [config])); + // Store the config in redis + .then(() => this.redisBreaker.runCommand('hset', this.buildConfigTable, + config.buildId, JSON.stringify(config))) + // Note: arguments to enqueue are [queue name, job name, array of args] + .then(() => this.queueBreaker.runCommand('enqueue', this.buildQueue, 'start', [{ + buildId: config.buildId + }])); } /** * Stop a running or finished build * @method _stop * @param {Object} config Configuration - * @param {Object} [config.annotations] Optional key/value object * @param {String} config.buildId Unique ID for a build * @return {Promise} */ _stop(config) { return this.connect() - .then(() => this.breaker.runCommand('del', this.buildQueue, 'start', [config])) + .then(() => this.queueBreaker.runCommand('del', this.buildQueue, 'start', [{ + buildId: config.buildId + }])) .then((numDeleted) => { if (numDeleted !== 0) { // Build hadn't been started, "start" event was removed from queue - return null; + return this.redisBreaker.runCommand('hdel', this.buildConfigTable, + config.buildId); } // "start" event has been processed, need worker to stop the executor - return this.breaker.runCommand('enqueue', this.buildQueue, 'stop', [config]); + return this.queueBreaker.runCommand('enqueue', this.buildQueue, 'stop', [{ + buildId: config.buildId + }]); }); } @@ -81,7 +101,7 @@ class ExecutorQueue extends Executor { return Promise.resolve(); } - return this.breaker.runCommand('connect'); + return this.queueBreaker.runCommand('connect'); } /** @@ -90,7 +110,7 @@ class ExecutorQueue extends Executor { * @param {Response} Object Object containing stats for the executor */ stats() { - return this.breaker.stats(); + return this.queueBreaker.stats(); } } diff --git a/test/data/start.json b/test/data/fullConfig.json similarity index 100% rename from test/data/start.json rename to test/data/fullConfig.json diff --git a/test/data/stop.json b/test/data/stop.json deleted file mode 100644 index fcc3daa..0000000 --- a/test/data/stop.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "annotations": { - "beta.screwdriver.cd/executor": "screwdriver-executor-k8s" - }, - "buildId": 8609 -} diff --git a/test/index.test.js b/test/index.test.js index bde9274..1d159d0 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -7,8 +7,10 @@ const assert = chai.assert; const mockery = require('mockery'); const sinon = require('sinon'); const testConnection = require('./data/testConnection.json'); -const testStartConfig = require('./data/start.json'); -const testStopConfig = require('./data/stop.json'); +const testConfig = require('./data/fullConfig.json'); +const partialTestConfig = { + buildId: testConfig.buildId +}; sinon.assert.expose(chai.assert, { prefix: '' }); @@ -31,7 +33,11 @@ describe('index test', () => { enqueue: sinon.stub().yieldsAsync(), del: sinon.stub().yieldsAsync(null, 1), connection: { - connected: false + connected: false, + redis: { + hdel: sinon.stub().yieldsAsync(), + hset: sinon.stub().yieldsAsync() + } } }; resqueMock = { @@ -84,6 +90,8 @@ describe('index test', () => { assert.instanceOf(executor, Executor); assert.strictEqual(executor.prefix, 'beta_'); + assert.strictEqual(executor.buildQueue, 'beta_builds'); + assert.strictEqual(executor.buildConfigTable, 'beta_buildConfigs'); }); it('throws when not given a redis connection', () => { @@ -110,7 +118,7 @@ describe('index test', () => { }); }); - it('enqueues a build', () => executor.start({ + it('enqueues a build and caches the config', () => executor.start({ annotations: { 'beta.screwdriver.cd/executor': 'screwdriver-executor-k8s' }, @@ -120,7 +128,9 @@ describe('index test', () => { token: 'asdf' }).then(() => { assert.calledOnce(queueMock.connect); - assert.calledWith(queueMock.enqueue, 'builds', 'start', [testStartConfig]); + assert.calledWith(queueMock.connection.redis.hset, 'buildConfigs', testConfig.buildId, + JSON.stringify(testConfig)); + assert.calledWith(queueMock.enqueue, 'builds', 'start', [partialTestConfig]); })); it('doesn\'t call connect if there\'s already a connection', () => { @@ -136,7 +146,7 @@ describe('index test', () => { token: 'asdf' }).then(() => { assert.notCalled(queueMock.connect); - assert.calledWith(queueMock.enqueue, 'builds', 'start', [testStartConfig]); + assert.calledWith(queueMock.enqueue, 'builds', 'start', [partialTestConfig]); }); }); }); @@ -154,14 +164,12 @@ describe('index test', () => { }); }); - it('removes a start event from the queue', () => executor.stop({ - annotations: { - 'beta.screwdriver.cd/executor': 'screwdriver-executor-k8s' - }, + it('removes a start event from the queue and the cached buildconfig', () => executor.stop({ buildId: 8609 }).then(() => { assert.calledOnce(queueMock.connect); - assert.calledWith(queueMock.del, 'builds', 'start', [testStopConfig]); + assert.calledWith(queueMock.del, 'builds', 'start', [partialTestConfig]); + assert.calledWith(queueMock.connection.redis.hdel, 'buildConfigs', 8609); assert.notCalled(queueMock.enqueue); })); @@ -169,14 +177,11 @@ describe('index test', () => { queueMock.del.yieldsAsync(null, 0); return executor.stop({ - annotations: { - 'beta.screwdriver.cd/executor': 'screwdriver-executor-k8s' - }, buildId: 8609 }).then(() => { assert.calledOnce(queueMock.connect); - assert.calledWith(queueMock.del, 'builds', 'start', [testStopConfig]); - assert.calledWith(queueMock.enqueue, 'builds', 'stop', [testStopConfig]); + assert.calledWith(queueMock.del, 'builds', 'start', [partialTestConfig]); + assert.calledWith(queueMock.enqueue, 'builds', 'stop', [partialTestConfig]); }); }); @@ -190,7 +195,7 @@ describe('index test', () => { buildId: 8609 }).then(() => { assert.notCalled(queueMock.connect); - assert.calledWith(queueMock.del, 'builds', 'start', [testStopConfig]); + assert.calledWith(queueMock.del, 'builds', 'start', [partialTestConfig]); }); }); });