From ea92074b347699908fd7fafa5190fa313b3da595 Mon Sep 17 00:00:00 2001 From: Yuichi Sawada <4645011+yuichi10@users.noreply.github.com> Date: Tue, 16 Jul 2019 12:11:51 +0900 Subject: [PATCH] fix: use async/await (#68) * fix: use async/await Co-authored-by: catto * fix * fix try place * fix scheduler and multi worker * fix not able to use async/await in constructor * refact code * fix * fix * fix test --- index.js | 61 ++++++++++++++++++---------------------------- test/index.test.js | 8 +++--- 2 files changed, 28 insertions(+), 41 deletions(-) diff --git a/index.js b/index.js index 7e2cc63..49aff1a 100644 --- a/index.js +++ b/index.js @@ -162,23 +162,14 @@ class ExecutorQueue extends Executor { winston.info(`scheduler enqueuing job timestamp >> ${JSON.stringify(job)}`)); this.multiWorker.start(); - this.scheduler.connect(() => { - this.scheduler.start(); - }); + this.scheduler.connect().then(() => this.scheduler.start()); process.on('SIGTERM', () => { - this.multiWorker.end((error) => { - if (error) { - winston.error(`failed to end the worker: ${error}`); - } - - this.scheduler.end((err) => { - if (err) { - winston.error(`failed to end the scheduler: ${err}`); - process.exit(128); - } - process.exit(0); - }); + this.multiWorker.end().catch((err) => { + winston.error(`failed to end the worker: ${err}`); + }).then(() => this.scheduler.end()).catch((err) => { + winston.error(`failed to end the scheduler: ${err}`); + process.exit(128); }); }); } @@ -326,31 +317,27 @@ class ExecutorQueue extends Executor { }))); // Note: arguments to enqueueAt are [timestamp, queue name, job name, array of args] - return new Promise((resolve) => { - let shouldRetry = false; - - this.queue.enqueueAt(next, - this.periodicBuildQueue, 'startDelayed', [{ jobId: job.id }], (err) => { - // Error thrown by node-resque if there is duplicate: https://github.com/taskrabbit/node-resque/blob/master/lib/queue.js#L65 - // eslint-disable-next-line max-len - if (err && err.message !== 'Job already enqueued at this time with same arguments') { - shouldRetry = true; - } - }); - - return resolve(shouldRetry); - }).then((shouldRetry) => { - if (!shouldRetry) { - return Promise.resolve(); - } + let shouldRetry = false; - return this.queueBreaker.runCommand('enqueueAt', next, + try { + await this.queue.enqueueAt(next, this.periodicBuildQueue, + 'startDelayed', [{ jobId: job.id }]); + } catch (err) { + // Error thrown by node-resque if there is duplicate: https://github.com/taskrabbit/node-resque/blob/master/lib/queue.js#L65 + // eslint-disable-next-line max-len + if (err && err.message !== 'Job already enqueued at this time with same arguments') { + shouldRetry = true; + } + } + if (!shouldRetry) { + return Promise.resolve(); + } + try { + await this.queueBreaker.runCommand('enqueueAt', next, this.periodicBuildQueue, 'startDelayed', [{ jobId: job.id }]); - }).catch((err) => { + } catch (err) { winston.error(`failed to add to delayed queue for job ${job.id}: ${err}`); - - return Promise.resolve(); - }); + } } return Promise.resolve(); diff --git a/test/index.test.js b/test/index.test.js index a04fc5d..4bb6159 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -66,12 +66,12 @@ describe('index test', () => { }; multiWorker = function () { this.start = () => {}; - this.end = sinon.stub(); + this.end = sinon.stub().resolves(); }; scheduler = function () { - this.start = () => {}; - this.connect = () => {}; - this.end = sinon.stub(); + this.start = sinon.stub().resolves(); + this.connect = sinon.stub().resolves(); + this.end = sinon.stub().resolves(); }; util.inherits(multiWorker, EventEmitter); util.inherits(scheduler, EventEmitter);