Skip to content

Commit

Permalink
feat(1610): Add cleanup method
Browse files Browse the repository at this point in the history
  • Loading branch information
pritamstyz4ever committed Nov 1, 2019
1 parent f8e8fec commit d8abcf7
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 37 deletions.
68 changes: 38 additions & 30 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class ExecutorQueue extends Executor {
!!err || (response.statusCode !== 201 && response.statusCode !== 200);
this.requestRetryStrategyPostEvent = (err, response) =>
!!err || (response.statusCode !== 201 && response.statusCode !== 200
&& response.statusCode !== 404); // postEvent can return 404 if no job to start
&& response.statusCode !== 404); // postEvent can return 404 if no job to start
this.fuseBox = new FuseBox();
this.fuseBox.addFuse(this.queueBreaker);
this.fuseBox.addFuse(this.redisBreaker);
Expand All @@ -88,29 +88,33 @@ class ExecutorQueue extends Executor {
};
// Jobs object to register the worker with
const jobs = {
startDelayed: Object.assign({ perform: async (jobConfig) => {
try {
const fullConfig = await this.redisBreaker
.runCommand('hget', this.periodicBuildTable, jobConfig.jobId);

return await this.startPeriodic(
Object.assign(JSON.parse(fullConfig), { triggerBuild: true }));
} catch (err) {
winston.error('err in startDelayed job: ', err);
throw err;
startDelayed: Object.assign({
perform: async (jobConfig) => {
try {
const fullConfig = await this.redisBreaker
.runCommand('hget', this.periodicBuildTable, jobConfig.jobId);

return await this.startPeriodic(
Object.assign(JSON.parse(fullConfig), { triggerBuild: true }));
} catch (err) {
winston.error('err in startDelayed job: ', err);
throw err;
}
}
} }, retryOptions),
startFrozen: Object.assign({ perform: async (jobConfig) => {
try {
const fullConfig = await this.redisBreaker
.runCommand('hget', this.frozenBuildTable, jobConfig.jobId);

return await this.startFrozen(JSON.parse(fullConfig));
} catch (err) {
winston.error('err in startFrozen job: ', err);
throw err;
}, retryOptions),
startFrozen: Object.assign({
perform: async (jobConfig) => {
try {
const fullConfig = await this.redisBreaker
.runCommand('hget', this.frozenBuildTable, jobConfig.jobId);

return await this.startFrozen(JSON.parse(fullConfig));
} catch (err) {
winston.error('err in startFrozen job: ', err);
throw err;
}
}
} }, retryOptions)
}, retryOptions)
};

// eslint-disable-next-line new-cap
Expand Down Expand Up @@ -165,15 +169,19 @@ class ExecutorQueue extends Executor {

this.multiWorker.start();
this.scheduler.connect().then(() => this.scheduler.start());
}

process.on('SIGTERM', () => {
this.multiWorker.end().catch((err) => {
winston.error(`failed to end the worker: ${err}`);
}).then(() => this.scheduler.end()).catch((err) => {
winston.error(`failed to end the scheduler: ${err}`);
process.exit(128);
});
});
/**
* Cleanup any reladed processing
*/
async cleanUp() {
try {
await this.multiWorker.end();
await this.scheduler.end();
await this.queue.end();
} catch (err) {
winston.error(`failed to end executor queue: ${err}`);
}
}

/**
Expand Down
29 changes: 22 additions & 7 deletions test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ const partialTestConfig = {
blockedBy
};
const partialTestConfigToString = Object.assign({}, partialTestConfig, {
blockedBy: blockedBy.toString() });
blockedBy: blockedBy.toString()
});
const testAdmin = {
username: 'admin'
};
Expand Down Expand Up @@ -64,7 +65,7 @@ describe('index test', () => {
tokenGen: userTokenGen
};
multiWorker = function () {
this.start = () => {};
this.start = () => { };
this.end = sinon.stub().resolves();
};
scheduler = function () {
Expand All @@ -82,7 +83,8 @@ describe('index test', () => {
delDelayed: sinon.stub().resolves(1),
connection: {
connected: false
}
},
end: sinon.stub().resolves()
};
resqueMock = {
Queue: sinon.stub().returns(queueMock),
Expand Down Expand Up @@ -581,7 +583,8 @@ describe('index test', () => {
it('adds a stop event to the queue if it has no blocked job', () => {
queueMock.del.resolves(0);
const partialTestConfigUndefined = Object.assign({}, partialTestConfig, {
blockedBy: undefined });
blockedBy: undefined
});
const stopConfig = Object.assign({ started: true }, partialTestConfigUndefined);

return executor.stop(partialTestConfigUndefined).then(() => {
Expand All @@ -594,15 +597,27 @@ describe('index test', () => {
it('doesn\'t call connect if there\'s already a connection', () => {
queueMock.connection.connected = true;

return executor.stop(Object.assign({}, partialTestConfig, { annotations: {
'beta.screwdriver.cd/executor': 'screwdriver-executor-k8s'
} })).then(() => {
return executor.stop(Object.assign({}, partialTestConfig, {
annotations: {
'beta.screwdriver.cd/executor': 'screwdriver-executor-k8s'
}
})).then(() => {
assert.notCalled(queueMock.connect);
assert.calledWith(queueMock.del, 'builds', 'start', [partialTestConfigToString]);
});
});
});

describe('cleanUp', () => {
it('worker.end() is called', () => {
executor.cleanUp().then(() => {
assert.calledWith(spyMultiWorker);
assert.calledWith(spyScheduler);
assert.calledWith(queueMock.end);
});
});
});

describe('stats', () => {
it('returns the correct stats', () => {
assert.deepEqual(executor.stats(), {
Expand Down

0 comments on commit d8abcf7

Please sign in to comment.