From dc285afe7c5f5721060f4e67a5b6a1015216afc4 Mon Sep 17 00:00:00 2001 From: Alec Gibson <12036746+alecgibson@users.noreply.github.com> Date: Fri, 25 Jun 2021 16:01:15 +0100 Subject: [PATCH] Option to reset tries in `ping()` The `ping()` method can be useful for setting up recurring jobs if we deliberately avoid acking the job. For example: 1. Submit job 2. Pull job from queue 3. Process job 4. `ping()` 5. Go to Step 2 A real-world example of this might be notifying for recurring appointments, or setting up long-running, cross-process, periodic jobs. The main advantage this has over using `ack()` and `add()` is that it effectively requeues a job in a single, atomic commit. If we tried the above with `ack()` and `add()`: 1. Submit job 2. Pull job from queue 3. Process job 4. `ack()` 5. `add()` 6. Go to Step 2 In this version, the process could crash or quit between Steps 4 & 5, and our recurring job would be lost. We could also try inverting Steps 4 & 5, but then we get the opposite issue: if the process crashes or quits, then we might accidentally duplicate our recurring job. It also prevents us from setting up any unique indexes on our `payload`. Using `ping()` perfectly solves this problem: there's only ever one version of the job, and it's never dropped (because it's never acked). If the process crashes before we `ping()`, we'll retry it, as with any other normal job. The one issue with this approach is that `tries` will steadily increase, and - if you have `maxRetries` set up - the job will eventually be moved to the dead queue, which isn't what we want. This change adds an option to the `ping()` method: `resetTries`, which will reset `tries` to zero, so that the job is treated like a "new" job when it's pinged, and is only moved to the dead queue if it's genuinely retried. --- README.md | 11 ++++++++++ mongodb-queue.js | 5 +++++ test/ping.js | 55 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 71 insertions(+) diff --git a/README.md b/README.md index 57a7206..f3c2be1 100644 --- a/README.md +++ b/README.md @@ -345,6 +345,17 @@ queue.get((err, msg) => { }) ``` +You can also reset the job tries, effectively creating an atomic ack + add for the +same job using `resetTries`: + +```js +queue.get((err, msg) => { + queue.ping(msg.ack, { resetTries: true }, (err, id) => { + // This message now has 0 tries + }) +}) +``` + ### .total() ### Returns the total number of messages that has ever been in the queue, including diff --git a/mongodb-queue.js b/mongodb-queue.js index a712f1c..20d3833 100644 --- a/mongodb-queue.js +++ b/mongodb-queue.js @@ -175,6 +175,11 @@ Queue.prototype.ping = function(ack, opts, callback) { visible : nowPlusSecs(visibility) } } + + if (opts.resetTries) { + update.$set.tries = 0 + } + self.col.findOneAndUpdate(query, update, { returnOriginal : false }, function(err, msg, blah) { if (err) return callback(err) if ( !msg.value ) { diff --git a/test/ping.js b/test/ping.js index 298786a..5c61593 100644 --- a/test/ping.js +++ b/test/ping.js @@ -174,6 +174,61 @@ test("ping: check visibility option overrides the queue visibility", function(t) ) }) + test("ping: reset tries", function(t) { + var queue = mongoDbQueue(db, 'ping', { visibility: 3 }) + var msg + + async.series( + [ + function(next) { + queue.add('Hello, World!', function(err, id) { + t.ok(!err, 'There is no error when adding a message.') + t.ok(id, 'There is an id returned when adding a message.') + next() + }) + }, + function(next) { + queue.get(function(err, thisMsg) { + msg = thisMsg + // message should reset in three seconds + t.ok(msg.id, 'Got a msg.id (sanity check)') + setTimeout(next, 2 * 1000) + }) + }, + function(next) { + queue.ping(msg.ack, { resetTries: true }, function(err, id) { + t.ok(!err, 'No error when pinging a message') + t.ok(id, 'Received an id when acking this message') + // wait until the msg has returned to the queue + setTimeout(next, 6 * 1000) + }) + }, + function(next) { + queue.get(function(err, msg) { + t.equal(msg.tries, 1, 'Tries were reset') + queue.ack(msg.ack, function(err) { + t.ok(!err, 'No error when acking the message') + next() + }) + }) + }, + function(next) { + queue.get(function(err, msg) { + // no more messages + t.ok(!err, 'No error when getting no messages') + t.ok(!msg, 'No msg received') + next() + }) + } + ], + function(err) { + if (err) t.fail(err) + t.pass('Finished test ok') + t.end() + } + ) + }) + test('client.close()', function(t) { t.pass('client.close()') client.close()