Skip to content

Commit

Permalink
Merge pull request #18 from signalive/deniz
Browse files Browse the repository at this point in the history
Deniz
  • Loading branch information
mertdogar authored Jun 29, 2016
2 parents 6cbb74c + dabfcf1 commit deb5cdc
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 20 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "microservice-kit",
"version": "0.3.10",
"version": "0.3.11",
"description": "",
"main": "src/index.js",
"scripts": {
Expand Down
15 changes: 11 additions & 4 deletions src/lib/exchange.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,11 @@ class Exchange {
const content = new Buffer(JSON.stringify(message.toJSON() || {}));

if (!this.rpc_ || options.dontExpectRpc) {
this.log_('info', 'Publishing ' + eventName + ' event to exchange ' + this.key +
(routingKey ? ' for ' + routingKey : '') + ' without rpc');
this.log_('info', 'Publishing event', {
eventName,
routingKey,
exchange: this.key
});

return Promise.resolve(this.channel.publish(this.name, routingKey, content, options));
}
Expand All @@ -67,8 +70,12 @@ class Exchange {
options.replyTo = this.rpc_.getUniqueQueueName();

const rv = new Promise((resolve, reject) => {
this.log_('info', 'Sending ' + eventName + ' event to queue ' + this.key +
(routingKey ? ' for ' + routingKey : '') + ' with correlation id ' + options.correlationId);
this.log_('info', 'Publishing event', {
eventName,
routingKey,
correlationId: options.correlationId,
exchange: this.key
});

this.channel.publish(this.name, routingKey, content, options);
this.rpc_.registerCallback(options.correlationId, {reject, resolve}, options.timeout);
Expand Down
39 changes: 27 additions & 12 deletions src/lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,18 @@ class Queue {
const message = Message.parse(data);
const recievedAt = new Date();

if (msg.properties.correlationId)
this.log_('info', 'Recieved ' + message.eventName + ' event with correlation id ' + msg.properties.correlationId);
else
this.log_('info', 'Recieved ' + message.eventName + ' event without correlation');
this.log_('info', 'Recieved event', {
correlationId: msg.properties.correlationId,
eventName: message.eventName
});

const done = (err, data) => {
const duration = new Date() - recievedAt;
const logPayload = {
duration,
eventName: message.eventName
};

if (msg.properties.replyTo && msg.properties.correlationId) {
const response = new Response(err, data, true);
this.channel.sendToQueue(
Expand All @@ -73,11 +79,10 @@ class Queue {
{correlationId: msg.properties.correlationId}
);

this.log_('info', 'Consumed ' + message.eventName + ' event with correlation id ' +
msg.properties.correlationId + ' in ' + (new Date() - recievedAt) + ' ms');
} else
this.log_('info', 'Consumed ' + message.eventName + ' event without correlation ' +
'in ' + (new Date() - recievedAt) + ' ms');
logPayload.correlationId = msg.properties.correlationId;
}

this.log_('info', `Consumed event`, logPayload);

if (!options.noAck)
this.channel.ack(msg);
Expand All @@ -98,7 +103,7 @@ class Queue {

this.consumer_ && this.consumer_(data, done, progress, routingKey);
} catch(err) {
this.log_('error', 'Error while consuming message:' + msg.content, err);
this.log_('error', 'Error while consuming message', {err, content: msg.content});

if (!options.noAck) {
this.log_('info', 'Negative acknowledging...');
Expand Down Expand Up @@ -174,15 +179,25 @@ class Queue {
const content = new Buffer(JSON.stringify(message.toJSON() || {}));

if (!this.rpc_ || options.dontExpectRpc) {
this.log_('info', 'Sending ' + eventName + ' event to queue ' + (this.name || this.getUniqueName()) + ' without rpc');
this.log_('info', 'Sending event to queue', {
eventName,
target: this.name || this.getUniqueName()
});

return Promise.resolve(this.channel.sendToQueue(queue, content, options));
}

options.correlationId = uuid.v4();
options.replyTo = this.rpc_.getUniqueQueueName();

const rv = new Promise((resolve, reject) => {
this.log_('info', 'Sending ' + eventName + ' event to queue ' + (this.name || this.getUniqueName()) + ' with correlation id ' + options.correlationId);
this.log_('info', 'Sending event to queue', {
eventName,
correlationId: options.correlationId,
target: this.name || this.getUniqueName(),

});

this.channel.sendToQueue(queue, content, options);
this.rpc_.registerCallback(options.correlationId, {resolve, reject}, options.timeout);
});
Expand Down
7 changes: 4 additions & 3 deletions src/lib/rpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ class RPC {
}

if (this.registerDates_[correlationId]) {
this.log_('info', 'Got response for correlation id ' + correlationId + ' after ' + (new Date() - this.registerDates_[correlationId]) + ' ms');
const duration = new Date() - this.registerDates_[correlationId];
this.log_('info', 'Got response', {correlationId, duration});
delete this.registerDates_[correlationId];
}

Expand All @@ -90,7 +91,7 @@ class RPC {

delete this.callbacks_[correlationId];
} catch(err) {
this.log_('error', 'Cannot consume rpc message, probably json parse error.', msg, err);
this.log_('error', 'Cannot consume rpc message, probably json parse error.', {msg, err});
}
}

Expand All @@ -106,7 +107,7 @@ class RPC {
this.timeouts_[key] = setTimeout(() => {
const callbacks = this.callbacks_[key];
callbacks && callbacks.reject && callbacks.reject(new Error('Timeout exceed.'));
this.log_('info', 'Timeout exceed for correlation id ' + key);
this.log_('info', 'Timeout exceed', {correlationId: key});
delete this.callbacks_[key];
delete this.timeouts_[key];
delete this.registerDates_[key];
Expand Down

0 comments on commit deb5cdc

Please sign in to comment.