Skip to content
This repository has been archived by the owner on Oct 24, 2024. It is now read-only.

Option to reset tries in ping() #40

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,17 @@ queue.get((err, msg) => {
})
```

You can also reset the job tries, effectively creating an atomic ack + add for the
same job using `resetTries`:

```js
queue.get((err, msg) => {
queue.ping(msg.ack, { resetTries: true }, (err, id) => {
// This message now has 0 tries
})
})
```

### .total() ###

Returns the total number of messages that has ever been in the queue, including
Expand Down
5 changes: 5 additions & 0 deletions mongodb-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ Queue.prototype.ping = function(ack, opts, callback) {
visible : nowPlusSecs(visibility)
}
}

if (opts.resetTries) {
update.$set.tries = 0
}

self.col.findOneAndUpdate(query, update, { returnOriginal : false }, function(err, msg, blah) {
if (err) return callback(err)
if ( !msg.value ) {
Expand Down
55 changes: 55 additions & 0 deletions test/ping.js
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,61 @@ test("ping: check visibility option overrides the queue visibility", function(t)
)
})

test("ping: reset tries", function(t) {
var queue = mongoDbQueue(db, 'ping', { visibility: 3 })
var msg

async.series(
[
function(next) {
queue.add('Hello, World!', function(err, id) {
t.ok(!err, 'There is no error when adding a message.')
t.ok(id, 'There is an id returned when adding a message.')
next()
})
},
function(next) {
queue.get(function(err, thisMsg) {
msg = thisMsg
// message should reset in three seconds
t.ok(msg.id, 'Got a msg.id (sanity check)')
setTimeout(next, 2 * 1000)
})
},
function(next) {
queue.ping(msg.ack, { resetTries: true }, function(err, id) {
t.ok(!err, 'No error when pinging a message')
t.ok(id, 'Received an id when acking this message')
// wait until the msg has returned to the queue
setTimeout(next, 6 * 1000)
})
},
function(next) {
queue.get(function(err, msg) {
t.equal(msg.tries, 1, 'Tries were reset')
queue.ack(msg.ack, function(err) {
t.ok(!err, 'No error when acking the message')
next()
})
})
},
function(next) {
queue.get(function(err, msg) {
// no more messages
t.ok(!err, 'No error when getting no messages')
t.ok(!msg, 'No msg received')
next()
})
}
],
function(err) {
if (err) t.fail(err)
t.pass('Finished test ok')
t.end()
}
)
})

test('client.close()', function(t) {
t.pass('client.close()')
client.close()
Expand Down