diff --git a/README.md b/README.md index 57a7206..f3c2be1 100644 --- a/README.md +++ b/README.md @@ -345,6 +345,17 @@ queue.get((err, msg) => { }) ``` +You can also reset the job tries, effectively creating an atomic ack + add for the +same job using `resetTries`: + +```js +queue.get((err, msg) => { + queue.ping(msg.ack, { resetTries: true }, (err, id) => { + // This message now has 0 tries + }) +}) +``` + ### .total() ### Returns the total number of messages that has ever been in the queue, including diff --git a/mongodb-queue.js b/mongodb-queue.js index a712f1c..20d3833 100644 --- a/mongodb-queue.js +++ b/mongodb-queue.js @@ -175,6 +175,11 @@ Queue.prototype.ping = function(ack, opts, callback) { visible : nowPlusSecs(visibility) } } + + if (opts.resetTries) { + update.$set.tries = 0 + } + self.col.findOneAndUpdate(query, update, { returnOriginal : false }, function(err, msg, blah) { if (err) return callback(err) if ( !msg.value ) { diff --git a/test/ping.js b/test/ping.js index 298786a..5c61593 100644 --- a/test/ping.js +++ b/test/ping.js @@ -174,6 +174,61 @@ test("ping: check visibility option overrides the queue visibility", function(t) ) }) + test("ping: reset tries", function(t) { + var queue = mongoDbQueue(db, 'ping', { visibility: 3 }) + var msg + + async.series( + [ + function(next) { + queue.add('Hello, World!', function(err, id) { + t.ok(!err, 'There is no error when adding a message.') + t.ok(id, 'There is an id returned when adding a message.') + next() + }) + }, + function(next) { + queue.get(function(err, thisMsg) { + msg = thisMsg + // message should reset in three seconds + t.ok(msg.id, 'Got a msg.id (sanity check)') + setTimeout(next, 2 * 1000) + }) + }, + function(next) { + queue.ping(msg.ack, { resetTries: true }, function(err, id) { + t.ok(!err, 'No error when pinging a message') + t.ok(id, 'Received an id when acking this message') + // wait until the msg has returned to the queue + setTimeout(next, 6 * 1000) + }) + }, + function(next) { + queue.get(function(err, msg) { + t.equal(msg.tries, 1, 'Tries were reset') + queue.ack(msg.ack, function(err) { + t.ok(!err, 'No error when acking the message') + next() + }) + }) + }, + function(next) { + queue.get(function(err, msg) { + // no more messages + t.ok(!err, 'No error when getting no messages') + t.ok(!msg, 'No msg received') + next() + }) + } + ], + function(err) { + if (err) t.fail(err) + t.pass('Finished test ok') + t.end() + } + ) + }) + test('client.close()', function(t) { t.pass('client.close()') client.close()