Skip to content

Commit

Permalink
Merge fork
Browse files Browse the repository at this point in the history
  • Loading branch information
anonrig committed Aug 29, 2017
2 parents d65a6ad + e70a2e1 commit bd45537
Show file tree
Hide file tree
Showing 11 changed files with 325 additions and 177 deletions.
30 changes: 15 additions & 15 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{
"name": "microservice-kit",
"version": "0.3.14",
"description": "",
"version": "0.4.1",
"description": "Utility belt for building microservices",
"main": "src/index.js",
"scripts": {
"test": "./node_modules/mocha/bin/mocha"
"test": "./node_modules/mocha/bin/mocha --all"
},
"repository": {
"type": "git",
Expand All @@ -14,19 +14,19 @@
"license": "ISC",
"dependencies": {
"amqplib": "0.5.1",
"async": "^1.5.0",
"async-q": "^0.2.2",
"boom": "^5.1.0",
"chance": "^0.8.0",
"debug": "^2.2.0",
"lodash": "^3.10.1",
"uuid": "^3.1.0"
"async": "2.5.0",
"async-q": "0.3.1",
"boom": "^5.2.0",
"chance": "1.0.10",
"debug": "3.0.0",
"lodash": "4.17.4",
"uuid": "3.1.0"
},
"devDependencies": {
"chai": "^3.4.1",
"chai-as-promised": "^5.2.0",
"mocha": "^2.3.4",
"sinon": "^1.17.3",
"sinon-chai": "^2.8.0"
"chai": "4.1.1",
"chai-as-promised": "7.1.1",
"mocha": "3.5.0",
"sinon": "3.1.0",
"sinon-chai": "2.12.0"
}
}
28 changes: 20 additions & 8 deletions src/amqpkit.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
const async = require('async-q');
const _ = require('lodash');
const amqp = require('amqplib');
const uuid = require('uuid');
const uuid = require('uuid/v4');
const debug = require('debug')('microservice-kit:amqpkit');
const url = require('url');

const Message = require('./lib/message');
const Response = require('./lib/response');
Expand Down Expand Up @@ -44,6 +45,12 @@ class AmqpKit {
throw new Error('MicroserviceKit init failed. ' +
'options.queues must be an array.');

if (this.options_.url) {
this.options_.connectionOptions = _.assign(this.options_.connectionOptions, {
servername: url.parse(this.options_.url).hostname
});
}

return amqp
.connect(this.options_.url, this.options_.connectionOptions)
.then((connection) => {
Expand Down Expand Up @@ -105,12 +112,17 @@ class AmqpKit {

ShutdownKit.addJob((done) => {
debug('Closing connection...');
this.connection
.close()
.then(() => {
done();
})
.catch(done);
try {
this.connection
.close()
.then(() => {
done();
})
.catch(done);
} catch (err) {
debug('Could not close connection', err);
done();
}
});
}

Expand Down Expand Up @@ -157,7 +169,7 @@ class AmqpKit {
return Promise.reject(new Error('You cannot create queue with same key more than once.'));

if (!name && opt_options && opt_options.exclusive)
name = this.options_.id + '-' + 'excl' + '-' + uuid.v4().split('-')[0];
name = this.options_.id + '-' + 'excl' + '-' + uuid().split('-')[0];

const queue = new Queue({
channel: this.channel,
Expand Down
8 changes: 6 additions & 2 deletions src/lib/exchange.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const debug = require('debug')('microservice-kit:lib:exchange');
const async = require('async-q');
const _ = require('lodash');
const uuid = require('uuid');
const uuid = require('uuid/v4');
const Message = require('./message');
const Response = require('./response');

Expand Down Expand Up @@ -66,9 +66,13 @@ class Exchange {
return Promise.resolve(this.channel.publish(this.name, routingKey, content, options));
}

options.correlationId = uuid.v4();
options.correlationId = uuid();
options.replyTo = this.rpc_.getUniqueQueueName();

if (_.isNumber(options.timeout) && options.timeout > 0) {
options.expiration = options.timeout.toString();
}

const rv = new Promise((resolve, reject) => {
this.log_('info', 'Publishing event', {
eventName,
Expand Down
8 changes: 6 additions & 2 deletions src/lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const debug = require('debug')('microservice-kit:lib:queue');
const async = require('async-q');
const uuid = require('uuid');
const uuid = require('uuid/v4');
const _ = require('lodash');
const Message = require('./message');
const Exchange = require('./exchange');
Expand Down Expand Up @@ -187,9 +187,13 @@ class Queue {
return Promise.resolve(this.channel.sendToQueue(queue, content, options));
}

options.correlationId = uuid.v4();
options.correlationId = uuid();
options.replyTo = this.rpc_.getUniqueQueueName();

if (_.isNumber(options.timeout) && options.timeout > 0) {
options.expiration = options.timeout.toString();
}

const rv = new Promise((resolve, reject) => {
this.log_('info', 'Sending event to queue', {
eventName,
Expand Down
1 change: 0 additions & 1 deletion src/lib/rpc.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"use strict";

const debug = require('debug')('microservice-kit:lib:rpc');
const uuid = require('uuid');
const _ = require('lodash');
const Response = require('./response');
const Queue = require('./queue');
Expand Down
4 changes: 2 additions & 2 deletions src/microservicekit.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const _ = require('lodash');
const fs = require('fs');
const uuid = require('uuid');
const uuid = require('uuid/v4');
const debug = require('debug')('microservice-kit:microservicekit');
const Chance = require('chance');

Expand All @@ -13,7 +13,7 @@ const ShutdownKit = require('./shutdownkit');
class MicroserviceKit {
constructor(opt_options) {
this.options_ = _.assign({}, this.defaults, opt_options || {});
this.id = new Chance().first().toLowerCase() + '-' + uuid.v4().split('-')[0];
this.id = new Chance().first().toLowerCase() + '-' + uuid().split('-')[0];
this.amqpKit = null;
this.shutdownKit = ShutdownKit;

Expand Down
3 changes: 3 additions & 0 deletions src/shutdownkit.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class ShutdownKit {
this.jobs_ = [];
this.bindEvents_();
this.logger_ = null;
this.isShuttingDown = false;
}


Expand Down Expand Up @@ -67,6 +68,8 @@ class ShutdownKit {
*/
gracefulShutdown() {
// TODO: Add a timeout maybe?
if (this.isShuttingDown) return;
this.isShuttingDown = true;
this.log_('info', 'Trying to shutdown gracefully...');
async.series(this.jobs_.reverse(), (err) => {
if (err) {
Expand Down
8 changes: 4 additions & 4 deletions test/exchange-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ describe('Exchange', function() {
})

it('publishEvent fails without eventName 1', function () {
return this.exchange.publishEvent().should.eventually.fails;
return this.exchange.publishEvent().should.eventually.rejected;
});

it('publishEvent fails without eventName 2', function () {
return this.exchange.publishEvent('routing-key', null, {foo: 'bar'}).should.eventually.fails;
return this.exchange.publishEvent('routing-key', null, {foo: 'bar'}).should.eventually.rejected;
});

it('publishEvent does not fails without routing key', function () {
Expand All @@ -119,11 +119,11 @@ describe('Exchange', function() {
})

it('publishEvent fails without eventName 1', function () {
return this.exchange.publishEvent().should.eventually.fails;
return this.exchange.publishEvent().should.eventually.rejected;
});

it('publishEvent fails without eventName 2', function () {
return this.exchange.publishEvent(null, {foo: 'bar'}).should.eventually.fails;
return this.exchange.publishEvent(null, {foo: 'bar'}).should.eventually.rejected;
});

it('rpc.getUniqueQueueName() should be called', function () {
Expand Down
10 changes: 5 additions & 5 deletions test/queue-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ describe('Queue', function() {


it('sendEvent fails without eventName 2', function () {
return this.queue.sendEvent(null, {foo: 'bar'}).should.eventually.fails;
return this.queue.sendEvent(null, {foo: 'bar'}).should.eventually.rejected;
});
})
})
Expand All @@ -183,11 +183,11 @@ describe('Queue', function() {
})

it('sendEvent fails without eventName 1', function () {
return this.queue.sendEvent().should.eventually.fails;
return this.queue.sendEvent().should.eventually.rejected;
});

it('sendEvent fails without eventName 2', function () {
return this.queue.sendEvent(null, {foo: 'bar'}).should.eventually.fails;
return this.queue.sendEvent(null, {foo: 'bar'}).should.eventually.rejected;
});
})

Expand All @@ -208,11 +208,11 @@ describe('Queue', function() {
})

it('sendEvent fails without eventName 1', function () {
return this.queue.sendEvent().should.eventually.fails;
return this.queue.sendEvent().should.eventually.rejected;
});

it('sendEvent fails without eventName 2', function () {
return this.queue.sendEvent(null, {foo: 'bar'}).should.eventually.fails;
return this.queue.sendEvent(null, {foo: 'bar'}).should.eventually.rejected;
});

it('rpc.getUniqueQueueName() should be called', function () {
Expand Down
5 changes: 3 additions & 2 deletions test/rpc-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,16 @@ describe('RPC', function() {

it('should reject if there is an error', function() {
const msg = Message.mock();
const errorObject = new ErrorTypes.InternalError('Something wrong');
const errMessage = 'Something wrong';
const errorObject = new ErrorTypes.InternalError(errMessage);
msg.properties.correlationId = 'id1';
msg.content.toString = () => {
return JSON.stringify({
err: errorObject
})
};
this.rpc.consumer(msg);
this.callbacks.reject.should.calledWith(errorObject);
this.callbacks.reject.should.have.been.calledWithMatch(sinon.match({message: errMessage, name: 'InternalError'}));
this.callbacks.resolve.should.not.called;
})

Expand Down
Loading

0 comments on commit bd45537

Please sign in to comment.