-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
26 lines (22 loc) · 1002 Bytes
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
'use strict';
const defaultLogger = console;
module.exports = function({queue, logger = defaultLogger}) {
let consumeLoop = function(callback) {
logger.info(`Consuming messages from ${queue.queueUrl}`);
const processMessage = (message) => {
return Promise.resolve()
.then(() => callback(message.Body, message.MessageAttributes, message.MessageId))
.then(() => queue.deleteMessage(message))
.catch((err) => {
logger.error(`Error processing message ${message.MessageId}`, err, err.stack);
});
};
queue.getNextNonEmptyBatch() // eslint-disable-line promise/catch-or-return
.then((messages) => Promise.all(messages.map(m => processMessage(m, callback))))
.catch(err => {
logger.error('Error processing messages', err, err.stack);
})
.finally(() => consumeLoop(callback));
};
return consumeLoop;
};