Skip to content

Commit

Permalink
fix: use async/await (#68)
Browse files Browse the repository at this point in the history
* fix: use async/await

Co-authored-by: catto <[email protected]>

* fix

* fix try place

* fix scheduler and multi worker

* fix not able to use async/await in constructor

* refact code

* fix

* fix

* fix test
  • Loading branch information
yuichi10 authored and jithine committed Jul 16, 2019
1 parent 4a4104c commit ea92074
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 41 deletions.
61 changes: 24 additions & 37 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,23 +162,14 @@ class ExecutorQueue extends Executor {
winston.info(`scheduler enqueuing job timestamp >> ${JSON.stringify(job)}`));

this.multiWorker.start();
this.scheduler.connect(() => {
this.scheduler.start();
});
this.scheduler.connect().then(() => this.scheduler.start());

process.on('SIGTERM', () => {
this.multiWorker.end((error) => {
if (error) {
winston.error(`failed to end the worker: ${error}`);
}

this.scheduler.end((err) => {
if (err) {
winston.error(`failed to end the scheduler: ${err}`);
process.exit(128);
}
process.exit(0);
});
this.multiWorker.end().catch((err) => {
winston.error(`failed to end the worker: ${err}`);
}).then(() => this.scheduler.end()).catch((err) => {
winston.error(`failed to end the scheduler: ${err}`);
process.exit(128);
});
});
}
Expand Down Expand Up @@ -326,31 +317,27 @@ class ExecutorQueue extends Executor {
})));

// Note: arguments to enqueueAt are [timestamp, queue name, job name, array of args]
return new Promise((resolve) => {
let shouldRetry = false;

this.queue.enqueueAt(next,
this.periodicBuildQueue, 'startDelayed', [{ jobId: job.id }], (err) => {
// Error thrown by node-resque if there is duplicate: https://github.com/taskrabbit/node-resque/blob/master/lib/queue.js#L65
// eslint-disable-next-line max-len
if (err && err.message !== 'Job already enqueued at this time with same arguments') {
shouldRetry = true;
}
});

return resolve(shouldRetry);
}).then((shouldRetry) => {
if (!shouldRetry) {
return Promise.resolve();
}
let shouldRetry = false;

return this.queueBreaker.runCommand('enqueueAt', next,
try {
await this.queue.enqueueAt(next, this.periodicBuildQueue,
'startDelayed', [{ jobId: job.id }]);
} catch (err) {
// Error thrown by node-resque if there is duplicate: https://github.com/taskrabbit/node-resque/blob/master/lib/queue.js#L65
// eslint-disable-next-line max-len
if (err && err.message !== 'Job already enqueued at this time with same arguments') {
shouldRetry = true;
}
}
if (!shouldRetry) {
return Promise.resolve();
}
try {
await this.queueBreaker.runCommand('enqueueAt', next,
this.periodicBuildQueue, 'startDelayed', [{ jobId: job.id }]);
}).catch((err) => {
} catch (err) {
winston.error(`failed to add to delayed queue for job ${job.id}: ${err}`);

return Promise.resolve();
});
}
}

return Promise.resolve();
Expand Down
8 changes: 4 additions & 4 deletions test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ describe('index test', () => {
};
multiWorker = function () {
this.start = () => {};
this.end = sinon.stub();
this.end = sinon.stub().resolves();
};
scheduler = function () {
this.start = () => {};
this.connect = () => {};
this.end = sinon.stub();
this.start = sinon.stub().resolves();
this.connect = sinon.stub().resolves();
this.end = sinon.stub().resolves();
};
util.inherits(multiWorker, EventEmitter);
util.inherits(scheduler, EventEmitter);
Expand Down

0 comments on commit ea92074

Please sign in to comment.