diff --git a/index.js b/index.js index c9efb96..ba16257 100644 --- a/index.js +++ b/index.js @@ -72,7 +72,7 @@ class ExecutorQueue extends Executor { !!err || (response.statusCode !== 201 && response.statusCode !== 200); this.requestRetryStrategyPostEvent = (err, response) => !!err || (response.statusCode !== 201 && response.statusCode !== 200 - && response.statusCode !== 404); // postEvent can return 404 if no job to start + && response.statusCode !== 404); // postEvent can return 404 if no job to start this.fuseBox = new FuseBox(); this.fuseBox.addFuse(this.queueBreaker); this.fuseBox.addFuse(this.redisBreaker); @@ -88,29 +88,33 @@ class ExecutorQueue extends Executor { }; // Jobs object to register the worker with const jobs = { - startDelayed: Object.assign({ perform: async (jobConfig) => { - try { - const fullConfig = await this.redisBreaker - .runCommand('hget', this.periodicBuildTable, jobConfig.jobId); - - return await this.startPeriodic( - Object.assign(JSON.parse(fullConfig), { triggerBuild: true })); - } catch (err) { - winston.error('err in startDelayed job: ', err); - throw err; + startDelayed: Object.assign({ + perform: async (jobConfig) => { + try { + const fullConfig = await this.redisBreaker + .runCommand('hget', this.periodicBuildTable, jobConfig.jobId); + + return await this.startPeriodic( + Object.assign(JSON.parse(fullConfig), { triggerBuild: true })); + } catch (err) { + winston.error('err in startDelayed job: ', err); + throw err; + } } - } }, retryOptions), - startFrozen: Object.assign({ perform: async (jobConfig) => { - try { - const fullConfig = await this.redisBreaker - .runCommand('hget', this.frozenBuildTable, jobConfig.jobId); - - return await this.startFrozen(JSON.parse(fullConfig)); - } catch (err) { - winston.error('err in startFrozen job: ', err); - throw err; + }, retryOptions), + startFrozen: Object.assign({ + perform: async (jobConfig) => { + try { + const fullConfig = await this.redisBreaker + .runCommand('hget', this.frozenBuildTable, jobConfig.jobId); + + return await this.startFrozen(JSON.parse(fullConfig)); + } catch (err) { + winston.error('err in startFrozen job: ', err); + throw err; + } } - } }, retryOptions) + }, retryOptions) }; // eslint-disable-next-line new-cap @@ -165,15 +169,19 @@ class ExecutorQueue extends Executor { this.multiWorker.start(); this.scheduler.connect().then(() => this.scheduler.start()); + } - process.on('SIGTERM', () => { - this.multiWorker.end().catch((err) => { - winston.error(`failed to end the worker: ${err}`); - }).then(() => this.scheduler.end()).catch((err) => { - winston.error(`failed to end the scheduler: ${err}`); - process.exit(128); - }); - }); + /** + * Cleanup any reladed processing + */ + async cleanUp() { + try { + await this.multiWorker.end(); + await this.scheduler.end(); + await this.queue.end(); + } catch (err) { + winston.error(`failed to end executor queue: ${err}`); + } } /** diff --git a/test/index.test.js b/test/index.test.js index dade331..4fcc748 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -18,7 +18,8 @@ const partialTestConfig = { blockedBy }; const partialTestConfigToString = Object.assign({}, partialTestConfig, { - blockedBy: blockedBy.toString() }); + blockedBy: blockedBy.toString() +}); const testAdmin = { username: 'admin' }; @@ -64,7 +65,7 @@ describe('index test', () => { tokenGen: userTokenGen }; multiWorker = function () { - this.start = () => {}; + this.start = () => { }; this.end = sinon.stub().resolves(); }; scheduler = function () { @@ -82,7 +83,8 @@ describe('index test', () => { delDelayed: sinon.stub().resolves(1), connection: { connected: false - } + }, + end: sinon.stub().resolves() }; resqueMock = { Queue: sinon.stub().returns(queueMock), @@ -581,7 +583,8 @@ describe('index test', () => { it('adds a stop event to the queue if it has no blocked job', () => { queueMock.del.resolves(0); const partialTestConfigUndefined = Object.assign({}, partialTestConfig, { - blockedBy: undefined }); + blockedBy: undefined + }); const stopConfig = Object.assign({ started: true }, partialTestConfigUndefined); return executor.stop(partialTestConfigUndefined).then(() => { @@ -594,15 +597,27 @@ describe('index test', () => { it('doesn\'t call connect if there\'s already a connection', () => { queueMock.connection.connected = true; - return executor.stop(Object.assign({}, partialTestConfig, { annotations: { - 'beta.screwdriver.cd/executor': 'screwdriver-executor-k8s' - } })).then(() => { + return executor.stop(Object.assign({}, partialTestConfig, { + annotations: { + 'beta.screwdriver.cd/executor': 'screwdriver-executor-k8s' + } + })).then(() => { assert.notCalled(queueMock.connect); assert.calledWith(queueMock.del, 'builds', 'start', [partialTestConfigToString]); }); }); }); + describe('cleanUp', () => { + it('worker.end() is called', () => { + executor.cleanUp().then(() => { + assert.calledWith(spyMultiWorker); + assert.calledWith(spyScheduler); + assert.calledWith(queueMock.end); + }); + }); + }); + describe('stats', () => { it('returns the correct stats', () => { assert.deepEqual(executor.stats(), {