Skip to content

Commit

Permalink
Merge pull request #6 from civicteam/feature/log_improvement
Browse files Browse the repository at this point in the history
Dependency upgrades
  • Loading branch information
dankelleher authored Jun 22, 2020
2 parents 810ac2a + ebdf173 commit fcf9d4b
Show file tree
Hide file tree
Showing 7 changed files with 2,639 additions and 1,774 deletions.
6 changes: 3 additions & 3 deletions .eslintrc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ rules:
- printWidth: 120
singleQuote: true
overrides:
files: "*.test.js"
rules:
no-unused-expressions: "off"
- files: "*.test.js"
rules:
no-unused-expressions: "off"
1 change: 1 addition & 0 deletions .nvmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
v12.13
21 changes: 12 additions & 9 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ let connection;
const disconnectTasks = [];
let clientClosed = false;

const sleepSeconds = seconds => new Promise(resolve => setTimeout(resolve, seconds * 1000));
const sleepSeconds = (seconds) => new Promise((resolve) => setTimeout(resolve, seconds * 1000));

async function bindStream(fn, config) {
const { maxRetries = Infinity, server = 'amqp://localhost', reconnectDelay = 5 } = config;
Expand All @@ -14,7 +14,10 @@ async function bindStream(fn, config) {
function consume(message) {
const content = JSON.parse(message.content);

fn(content.data);
// for backwards-compatibility. sometimes the message content is double-strinigified
const payload = content.data || JSON.parse(content).data

fn(payload);
}

async function initialize() {
Expand All @@ -26,12 +29,12 @@ async function bindStream(fn, config) {
channel = await connection.createChannel();

// If some error causes the channel or the connection to go down, attempt to reconnect
channel.on('error', e => {
channel.on('error', (e) => {
console.error('Feathers-AMQP-Client: Channel closed with error', { reason: e });
attemptReconnect();
});

connection.on('error', e => {
connection.on('error', (e) => {
console.error('Feathers-AMQP-Client: Connection closed with error', { reason: e });
attemptReconnect();
});
Expand All @@ -46,7 +49,7 @@ async function bindStream(fn, config) {
});

channel.assertExchange(config.exchange.name, config.exchange.type || 'fanout', {
durable: config.durable || false
durable: config.durable || false,
});

const queue = await channel.assertQueue(config.queue.name, { exclusive: config.queue.exclusive || false });
Expand All @@ -58,7 +61,7 @@ async function bindStream(fn, config) {
disconnectTasks.push(close);

console.log('Feathers-AMQP-Client: Waiting for messages in %s.', config.queue.name);
return channel.consume(queue.queue, message => consume(message), { noAck: true });
return channel.consume(queue.queue, (message) => consume(message), { noAck: true });
}

async function close() {
Expand Down Expand Up @@ -89,7 +92,7 @@ async function bindStream(fn, config) {
);
sleepSeconds(reconnectDelay)
.then(() => initialize(reconnectRetries - 1))
.catch(error => {
.catch((error) => {
// reconnection failed - try again (decrementing retries)
console.error('Feathers-AMQP-Client: Reconnection failed', { reason: error });
return attemptReconnect(reconnectRetries - 1);
Expand All @@ -103,10 +106,10 @@ async function bindStream(fn, config) {
}

function disconnect() {
return Promise.all(disconnectTasks.map(fn => fn()));
return Promise.all(disconnectTasks.map((fn) => fn()));
}

module.exports = {
bindStream,
disconnect
disconnect,
};
Loading

0 comments on commit fcf9d4b

Please sign in to comment.