Skip to content

Commit

Permalink
Merge pull request #61 from screwdriver-cd/fix
Browse files Browse the repository at this point in the history
fix: check against enqueueAt err msg for duplicate
  • Loading branch information
minzcmu authored May 17, 2019
2 parents c3783e7 + d8e7010 commit 5b8514d
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 28 deletions.
41 changes: 22 additions & 19 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -316,29 +316,32 @@ class ExecutorQueue extends Executor {
triggerBuild: false
})));

// If the job already exists, do not re-enqueue
// Node-resque store timestamp only to seconds: https://github.com/taskrabbit/node-resque/blob/master/lib/queue.js#L61
const jobs = await this.redisBreaker.runCommand('lrange',
`resque:delayed:${next / 1000}`, 0, -1);

// example job: "{\"class\":\"startDelayed\",\"queue\":\"periodicBuilds\",\"args\":[{\"jobId\":212502}]}"
if (jobs && jobs.length > 0) {
const parsedJobs = jobs.map(j => JSON.parse(j));

if (parsedJobs.find(j => j.class === 'startDelayed'
&& j.args[0].jobId === job.id)) {
// Note: arguments to enqueueAt are [timestamp, queue name, job name, array of args]
return new Promise((resolve) => {
let shouldRetry = false;

this.queue.enqueueAt(next,
this.periodicBuildQueue, 'startDelayed', [{ jobId: job.id }], (err) => {
// Error thrown by node-resque if there is duplicate: https://github.com/taskrabbit/node-resque/blob/master/lib/queue.js#L65
// eslint-disable-next-line max-len
if (err && err.message !== 'Job already enqueued at this time with same arguments') {
shouldRetry = true;
}
});

return resolve(shouldRetry);
}).then((shouldRetry) => {
if (!shouldRetry) {
return Promise.resolve();
}
}

// Note: arguments to enqueueAt are [timestamp, queue name, job name, array of args]
return this.queueBreaker.runCommand('enqueueAt', next,
this.periodicBuildQueue, 'startDelayed', [{ jobId: job.id }])
.catch((err) => {
winston.error(`failed to add to delayed queue for job ${job.id}: ${err}`);
return this.queueBreaker.runCommand('enqueueAt', next,
this.periodicBuildQueue, 'startDelayed', [{ jobId: job.id }]);
}).catch((err) => {
winston.error(`failed to add to delayed queue for job ${job.id}: ${err}`);

return Promise.resolve();
});
return Promise.resolve();
});
}

return Promise.resolve();
Expand Down
13 changes: 4 additions & 9 deletions test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ describe('index test', () => {
hdel: sinon.stub().yieldsAsync(),
hset: sinon.stub().yieldsAsync(),
set: sinon.stub().yieldsAsync(),
expire: sinon.stub().yieldsAsync(),
lrange: sinon.stub().yieldsAsync()
expire: sinon.stub().yieldsAsync()
};
redisConstructorMock = sinon.stub().returns(redisMock);
cronMock = {
Expand Down Expand Up @@ -257,17 +256,13 @@ describe('index test', () => {
}));

it('do not enqueue the same delayed job in the queue', () => {
const job = {
class: 'startDelayed',
queue: 'periodicBuilds',
args: [{ jobId: testJob.id }]
};
const err = new Error('Job already enqueued at this time with same arguments');

redisMock.lrange = sinon.stub().yieldsAsync(null, [JSON.stringify(job)]);
queueMock.enqueueAt = sinon.stub().yieldsAsync(err);

return executor.startPeriodic(testDelayedConfig).then(() => {
assert.calledWith(cronMock.next, 'H H H H H');
assert.notCalled(queueMock.enqueueAt);
assert.calledOnce(queueMock.enqueueAt);
});
});

Expand Down

0 comments on commit 5b8514d

Please sign in to comment.