diff --git a/README.md b/README.md index b64e50b61..b4497dc97 100644 --- a/README.md +++ b/README.md @@ -322,6 +322,7 @@ the `connect` event. Typically a `net.Socket`. urls which upon reconnect can have become expired. * `resubscribe` : if connection is broken and reconnects, subscribed topics are automatically subscribed again (default `true`) + * `messageIdProvider`: custom messageId provider. when `new UniqueMessageIdProvider()` is set, then non conflict messageId is provided. In case mqtts (mqtt over tls) is required, the `options` object is passed through to diff --git a/lib/client.js b/lib/client.js index 07e8de115..c481e7a04 100644 --- a/lib/client.js +++ b/lib/client.js @@ -6,6 +6,7 @@ var EventEmitter = require('events').EventEmitter var Store = require('./store') var mqttPacket = require('mqtt-packet') +var DefaultMessageIdProvider = require('./default-message-id-provider') var Writable = require('readable-stream').Writable var inherits = require('inherits') var reInterval = require('reinterval') @@ -184,6 +185,8 @@ function MqttClient (streamBuilder, options) { this.streamBuilder = streamBuilder + this.messageIdProvider = (typeof this.options.messageIdProvider === 'undefined') ? new DefaultMessageIdProvider() : this.options.messageIdProvider + // Inflight message storages this.outgoingStore = options.outgoingStore || new Store() this.incomingStore = options.incomingStore || new Store() @@ -213,11 +216,8 @@ function MqttClient (streamBuilder, options) { this._storeProcessing = false // Packet Ids are put into the store during store processing this._packetIdsDuringStoreProcessing = {} - /** - * MessageIDs starting with 1 - * ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810 - */ - this.nextId = Math.max(1, Math.floor(Math.random() * 65535)) + // Store processing queue + this._storeProcessingQueue = [] // Inflight callbacks this.outgoing = {} @@ -240,15 +240,29 @@ function MqttClient (streamBuilder, options) { packet = entry.packet debug('deliver :: call _sendPacket for %o', packet) - that._sendPacket( - packet, - function (err) { - if (entry.cb) { - entry.cb(err) + var send = true + if (packet.messageId && packet.messageId !== 0) { + if (!that.messageIdProvider.register(packet.messageId)) { + packet.messageeId = that.messageIdProvider.allocate() + if (packet.messageId === null) { + send = false } - deliver() } - ) + } + if (send) { + that._sendPacket( + packet, + function (err) { + if (entry.cb) { + entry.cb(err) + } + deliver() + } + ) + } else { + debug('messageId: %d has already used.', packet.messageId) + deliver() + } } debug('connect :: sending queued packets') @@ -490,60 +504,72 @@ MqttClient.prototype.publish = function (topic, message, opts, callback) { return this } - packet = { - cmd: 'publish', - topic: topic, - payload: message, - qos: opts.qos, - retain: opts.retain, - messageId: this._nextId(), - dup: opts.dup - } - - if (options.protocolVersion === 5) { - packet.properties = opts.properties - if ((!options.properties && packet.properties && packet.properties.topicAlias) || ((opts.properties && options.properties) && - ((opts.properties.topicAlias && options.properties.topicAliasMaximum && opts.properties.topicAlias > options.properties.topicAliasMaximum) || - (!options.properties.topicAliasMaximum && opts.properties.topicAlias)))) { - /* - if we are don`t setup topic alias or - topic alias maximum less than topic alias or - server don`t give topic alias maximum, - we are removing topic alias from packet - */ - delete packet.properties.topicAlias + var that = this + var publishProc = function () { + var messageId = 0 + if (opts.qos === 1 || opts.qos === 2) { + messageId = that._nextId() + if (messageId === null) { + debug('No messageId left') + return false + } + } + packet = { + cmd: 'publish', + topic: topic, + payload: message, + qos: opts.qos, + retain: opts.retain, + messageId: messageId, + dup: opts.dup } - } - debug('publish :: qos', opts.qos) - switch (opts.qos) { - case 1: - case 2: - // Add to callbacks - this.outgoing[packet.messageId] = { - volatile: false, - cb: callback || nop + if (options.protocolVersion === 5) { + packet.properties = opts.properties + if ((!options.properties && packet.properties && packet.properties.topicAlias) || ((opts.properties && options.properties) && + ((opts.properties.topicAlias && options.properties.topicAliasMaximum && opts.properties.topicAlias > options.properties.topicAliasMaximum) || + (!options.properties.topicAliasMaximum && opts.properties.topicAlias)))) { + /* + if we are don`t setup topic alias or + topic alias maximum less than topic alias or + server don`t give topic alias maximum, + we are removing topic alias from packet + */ + delete packet.properties.topicAlias } - if (this._storeProcessing) { - debug('_storeProcessing enabled') - this._packetIdsDuringStoreProcessing[packet.messageId] = false - this._storePacket(packet, undefined, opts.cbStorePut) - } else { + } + + debug('publish :: qos', opts.qos) + switch (opts.qos) { + case 1: + case 2: + // Add to callbacks + that.outgoing[packet.messageId] = { + volatile: false, + cb: callback || nop + } debug('MqttClient:publish: packet cmd: %s', packet.cmd) - this._sendPacket(packet, undefined, opts.cbStorePut) - } - break - default: - if (this._storeProcessing) { - debug('_storeProcessing enabled') - this._storePacket(packet, callback, opts.cbStorePut) - } else { + that._sendPacket(packet, undefined, opts.cbStorePut) + break + default: debug('MqttClient:publish: packet cmd: %s', packet.cmd) - this._sendPacket(packet, callback, opts.cbStorePut) - } - break + that._sendPacket(packet, callback, opts.cbStorePut) + break + } + return true } + if (this._storeProcessing || this._storeProcessingQueue.length > 0) { + this._storeProcessingQueue.push( + { + 'invoke': publishProc, + 'cbStorePut': opts.cbStorePut, + 'callback': callback + } + ) + } else { + publishProc() + } return this } @@ -564,7 +590,7 @@ MqttClient.prototype.publish = function (topic, message, opts, callback) { * @example client.subscribe('topic', console.log); */ MqttClient.prototype.subscribe = function () { - var packet + var that = this var args = new Array(arguments.length) for (var i = 0; i < arguments.length; i++) { args[i] = arguments[i] @@ -574,8 +600,6 @@ MqttClient.prototype.subscribe = function () { var resubscribe = obj.resubscribe var callback = args.pop() || nop var opts = args.pop() - var invalidTopic - var that = this var version = this.options.protocolVersion delete obj.resubscribe @@ -589,7 +613,7 @@ MqttClient.prototype.subscribe = function () { callback = nop } - invalidTopic = validations.validateTopics(obj) + var invalidTopic = validations.validateTopics(obj) if (invalidTopic !== null) { setImmediate(callback, new Error('Invalid topic ' + invalidTopic)) return this @@ -654,59 +678,79 @@ MqttClient.prototype.subscribe = function () { }) } - packet = { - cmd: 'subscribe', - subscriptions: subs, - qos: 1, - retain: false, - dup: false, - messageId: this._nextId() - } - - if (opts.properties) { - packet.properties = opts.properties - } - if (!subs.length) { callback(null, []) - return + return this } - // subscriptions to resubscribe to in case of disconnect - if (this.options.resubscribe) { - debug('subscribe :: resubscribe true') - var topics = [] - subs.forEach(function (sub) { - if (that.options.reconnectPeriod > 0) { - var topic = { qos: sub.qos } - if (version === 5) { - topic.nl = sub.nl || false - topic.rap = sub.rap || false - topic.rh = sub.rh || 0 - topic.properties = sub.properties + var subscribeProc = function () { + var messageId = that._nextId() + if (messageId === null) { + debug('No messageId left') + return false + } + + var packet = { + cmd: 'subscribe', + subscriptions: subs, + qos: 1, + retain: false, + dup: false, + messageId: messageId + } + + if (opts.properties) { + packet.properties = opts.properties + } + + // subscriptions to resubscribe to in case of disconnect + if (that.options.resubscribe) { + debug('subscribe :: resubscribe true') + var topics = [] + subs.forEach(function (sub) { + if (that.options.reconnectPeriod > 0) { + var topic = { qos: sub.qos } + if (version === 5) { + topic.nl = sub.nl || false + topic.rap = sub.rap || false + topic.rh = sub.rh || 0 + topic.properties = sub.properties + } + that._resubscribeTopics[sub.topic] = topic + topics.push(sub.topic) } - that._resubscribeTopics[sub.topic] = topic - topics.push(sub.topic) - } - }) - that.messageIdToTopic[packet.messageId] = topics - } + }) + that.messageIdToTopic[packet.messageId] = topics + } - this.outgoing[packet.messageId] = { - volatile: true, - cb: function (err, packet) { - if (!err) { - var granted = packet.granted - for (var i = 0; i < granted.length; i += 1) { - subs[i].qos = granted[i] + that.outgoing[packet.messageId] = { + volatile: true, + cb: function (err, packet) { + if (!err) { + var granted = packet.granted + for (var i = 0; i < granted.length; i += 1) { + subs[i].qos = granted[i] + } } - } - callback(err, subs) + callback(err, subs) + } } + debug('subscribe :: call _sendPacket') + that._sendPacket(packet) + return true + } + + if (this._storeProcessing || this._storeProcessingQueue.length > 0) { + this._storeProcessingQueue.push( + { + 'invoke': subscribeProc, + 'callback': callback + } + ) + } else { + subscribeProc() } - debug('subscribe :: call _sendPacket') - this._sendPacket(packet) return this } @@ -724,11 +768,6 @@ MqttClient.prototype.subscribe = function () { * @example client.unsubscribe('topic', console.log); */ MqttClient.prototype.unsubscribe = function () { - var packet = { - cmd: 'unsubscribe', - qos: 1, - messageId: this._nextId() - } var that = this var args = new Array(arguments.length) for (var i = 0; i < arguments.length; i++) { @@ -737,7 +776,6 @@ MqttClient.prototype.unsubscribe = function () { var topic = args.shift() var callback = args.pop() || nop var opts = args.pop() - if (typeof topic === 'string') { topic = [topic] } @@ -747,33 +785,65 @@ MqttClient.prototype.unsubscribe = function () { callback = nop } - if (this._checkDisconnecting(callback)) { + var invalidTopic = validations.validateTopics(topic) + if (invalidTopic !== null) { + setImmediate(callback, new Error('Invalid topic ' + invalidTopic)) return this } - if (typeof topic === 'string') { - packet.unsubscriptions = [topic] - } else if (Array.isArray(topic)) { - packet.unsubscriptions = topic + if (that._checkDisconnecting(callback)) { + return this } - if (this.options.resubscribe) { - packet.unsubscriptions.forEach(function (topic) { - delete that._resubscribeTopics[topic] - }) - } + var unsubscribeProc = function () { + var messageId = that._nextId() + if (messageId === null) { + debug('No messageId left') + return false + } + var packet = { + cmd: 'unsubscribe', + qos: 1, + messageId: messageId + } - if (typeof opts === 'object' && opts.properties) { - packet.properties = opts.properties - } + if (typeof topic === 'string') { + packet.unsubscriptions = [topic] + } else if (Array.isArray(topic)) { + packet.unsubscriptions = topic + } + + if (that.options.resubscribe) { + packet.unsubscriptions.forEach(function (topic) { + delete that._resubscribeTopics[topic] + }) + } - this.outgoing[packet.messageId] = { - volatile: true, - cb: callback + if (typeof opts === 'object' && opts.properties) { + packet.properties = opts.properties + } + + that.outgoing[packet.messageId] = { + volatile: true, + cb: callback + } + + debug('unsubscribe: call _sendPacket') + that._sendPacket(packet) + + return true } - debug('unsubscribe: call _sendPacket') - this._sendPacket(packet) + if (this._storeProcessing || this._storeProcessingQueue.length > 0) { + this._storeProcessingQueue.push( + { + 'invoke': unsubscribeProc, + 'callback': callback + } + ) + } else { + unsubscribeProc() + } return this } @@ -874,7 +944,7 @@ MqttClient.prototype.end = function (force, opts, cb) { * @returns {MqttClient} this - for chaining * @api public * - * @example client.removeOutgoingMessage(client.getLastMessageId()); + * @example client.removeOutgoingMessage(client.getLastAllocated()); */ MqttClient.prototype.removeOutgoingMessage = function (messageId) { var cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null @@ -1334,6 +1404,8 @@ MqttClient.prototype._handleAck = function (packet) { } delete this.outgoing[messageId] this.outgoingStore.del(packet, cb) + this.messageIdProvider.deallocate(messageId) + this._invokeStoreProcessingQueue() break case 'pubrec': response = { @@ -1353,6 +1425,7 @@ MqttClient.prototype._handleAck = function (packet) { break case 'suback': delete this.outgoing[messageId] + this.messageIdProvider.deallocate(messageId) for (var grantedI = 0; grantedI < packet.granted.length; grantedI++) { if ((packet.granted[grantedI] & 0x80) !== 0) { // suback with Failure status @@ -1364,10 +1437,13 @@ MqttClient.prototype._handleAck = function (packet) { } } } + this._invokeStoreProcessingQueue() cb(null, packet) break case 'unsuback': delete this.outgoing[messageId] + this.messageIdProvider.deallocate(messageId) + this._invokeStoreProcessingQueue() cb(null) break default: @@ -1425,13 +1501,7 @@ MqttClient.prototype._handleDisconnect = function (packet) { * @return unsigned int */ MqttClient.prototype._nextId = function () { - // id becomes current state of this.nextId and increments afterwards - var id = this.nextId++ - // Ensure 16 bit unsigned int (max 65535, nextId got one higher) - if (this.nextId === 65536) { - this.nextId = 1 - } - return id + return this.messageIdProvider.allocate() } /** @@ -1439,7 +1509,7 @@ MqttClient.prototype._nextId = function () { * @return unsigned int */ MqttClient.prototype.getLastMessageId = function () { - return (this.nextId === 1) ? 65535 : (this.nextId - 1) + return this.messageIdProvider.getLastAllocated() } /** @@ -1486,6 +1556,7 @@ MqttClient.prototype._onConnect = function (packet) { var that = this + this.messageIdProvider.clear() this._setupPingTimer() this._resubscribe(packet) @@ -1502,6 +1573,7 @@ MqttClient.prototype._onConnect = function (packet) { that.once('close', remove) outStore.on('error', function (err) { clearStoreProcessing() + that._flushStoreProcessingQueue() that.removeListener('close', remove) that.emit('error', err) }) @@ -1509,6 +1581,7 @@ MqttClient.prototype._onConnect = function (packet) { function remove () { outStore.destroy() outStore = null + that._flushStoreProcessingQueue() clearStoreProcessing() } @@ -1550,7 +1623,11 @@ MqttClient.prototype._onConnect = function (packet) { } } that._packetIdsDuringStoreProcessing[packet.messageId] = true - that._sendPacket(packet) + if (that.messageIdProvider.register(packet.messageId)) { + that._sendPacket(packet) + } else { + debug('messageId: %d has already used.', packet.messageId) + } } else if (outStore.destroy) { outStore.destroy() } @@ -1567,6 +1644,7 @@ MqttClient.prototype._onConnect = function (packet) { if (allProcessed) { clearStoreProcessing() that.removeListener('close', remove) + that._invokeAllStoreProcessingQueue() that.emit('connect', packet) } else { startStreamProcess() @@ -1578,4 +1656,27 @@ MqttClient.prototype._onConnect = function (packet) { startStreamProcess() } +MqttClient.prototype._invokeStoreProcessingQueue = function () { + if (this._storeProcessingQueue.length > 0) { + var f = this._storeProcessingQueue[0] + if (f && f.invoke()) { + this._storeProcessingQueue.shift() + return true + } + } + return false +} + +MqttClient.prototype._invokeAllStoreProcessingQueue = function () { + while (this._invokeStoreProcessingQueue()) {} +} + +MqttClient.prototype._flushStoreProcessingQueue = function () { + for (var f of this._storeProcessingQueue) { + if (f.cbStorePut) f.cbStorePut(new Error('Connection closed')) + if (f.callback) f.callback(new Error('Connection closed')) + } + this._storeProcessingQueue.splice(0) +} + module.exports = MqttClient diff --git a/lib/default-message-id-provider.js b/lib/default-message-id-provider.js new file mode 100644 index 000000000..c0a953f3f --- /dev/null +++ b/lib/default-message-id-provider.js @@ -0,0 +1,69 @@ +'use strict' + +/** + * DefaultMessageAllocator constructor + * @constructor + */ +function DefaultMessageIdProvider () { + if (!(this instanceof DefaultMessageIdProvider)) { + return new DefaultMessageIdProvider() + } + + /** + * MessageIDs starting with 1 + * ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810 + */ + this.nextId = Math.max(1, Math.floor(Math.random() * 65535)) +} + +/** + * allocate + * + * Get the next messageId. + * @return unsigned int + */ +DefaultMessageIdProvider.prototype.allocate = function () { + // id becomes current state of this.nextId and increments afterwards + var id = this.nextId++ + // Ensure 16 bit unsigned int (max 65535, nextId got one higher) + if (this.nextId === 65536) { + this.nextId = 1 + } + return id +} + +/** + * getLastAllocated + * Get the last allocated messageId. + * @return unsigned int + */ +DefaultMessageIdProvider.prototype.getLastAllocated = function () { + return (this.nextId === 1) ? 65535 : (this.nextId - 1) +} + +/** + * register + * Register messageId. If success return true, otherwise return false. + * @param { unsigned int } - messageId to register, + * @return boolean + */ +DefaultMessageIdProvider.prototype.register = function (messageId) { + return true +} + +/** + * deallocate + * Deallocate messageId. + * @param { unsigned int } - messageId to deallocate, + */ +DefaultMessageIdProvider.prototype.deallocate = function (messageId) { +} + +/** + * clear + * Deallocate all messageIds. + */ +DefaultMessageIdProvider.prototype.clear = function () { +} + +module.exports = DefaultMessageIdProvider diff --git a/lib/unique-message-id-provider.js b/lib/unique-message-id-provider.js new file mode 100644 index 000000000..6ffd4bde6 --- /dev/null +++ b/lib/unique-message-id-provider.js @@ -0,0 +1,65 @@ +'use strict' + +var NumberAllocator = require('number-allocator').NumberAllocator + +/** + * UniqueMessageAllocator constructor + * @constructor + */ +function UniqueMessageIdProvider () { + if (!(this instanceof UniqueMessageIdProvider)) { + return new UniqueMessageIdProvider() + } + + this.numberAllocator = new NumberAllocator(1, 65535) +} + +/** + * allocate + * + * Get the next messageId. + * @return if messageId is fully allocated then return null, + * otherwise return the smallest usable unsigned int messageId. + */ +UniqueMessageIdProvider.prototype.allocate = function () { + this.lastId = this.numberAllocator.alloc() + return this.lastId +} + +/** + * getLastAllocated + * Get the last allocated messageId. + * @return unsigned int + */ +UniqueMessageIdProvider.prototype.getLastAllocated = function () { + return this.lastId +} + +/** + * register + * Register messageId. If success return true, otherwise return false. + * @param { unsigned int } - messageId to register, + * @return boolean + */ +UniqueMessageIdProvider.prototype.register = function (messageId) { + return this.numberAllocator.use(messageId) +} + +/** + * deallocate + * Deallocate messageId. + * @param { unsigned int } - messageId to deallocate, + */ +UniqueMessageIdProvider.prototype.deallocate = function (messageId) { + this.numberAllocator.free(messageId) +} + +/** + * clear + * Deallocate all messageIds. + */ +UniqueMessageIdProvider.prototype.clear = function () { + this.numberAllocator.clear() +} + +module.exports = UniqueMessageIdProvider diff --git a/mqtt.js b/mqtt.js index ab12375c8..c8b94fda1 100644 --- a/mqtt.js +++ b/mqtt.js @@ -8,6 +8,8 @@ var MqttClient = require('./lib/client') var connect = require('./lib/connect') var Store = require('./lib/store') +var DefaultMessageIdProvider = require('./lib/default-message-id-provider') +var UniqueMessageIdProvider = require('./lib/unique-message-id-provider') module.exports.connect = connect @@ -15,3 +17,5 @@ module.exports.connect = connect module.exports.MqttClient = MqttClient module.exports.Client = MqttClient module.exports.Store = Store +module.exports.DefaultMessageIdProvider = DefaultMessageIdProvider +module.exports.UniqueMessageIdProvider = UniqueMessageIdProvider diff --git a/package.json b/package.json index 7f94be668..3be7ba77e 100644 --- a/package.json +++ b/package.json @@ -70,6 +70,7 @@ "inherits": "^2.0.3", "minimist": "^1.2.5", "mqtt-packet": "^6.8.0", + "number-allocator": "^1.0.7", "pump": "^3.0.0", "readable-stream": "^3.6.0", "reinterval": "^1.1.0", diff --git a/test/client.js b/test/client.js index 084bfed95..4ea052ab8 100644 --- a/test/client.js +++ b/test/client.js @@ -54,26 +54,6 @@ describe('MqttClient', function () { client.end() }) - it('should return 1 once the internal counter reached limit', function () { - client = mqtt.connect(config) - client.nextId = 65535 - - assert.equal(client._nextId(), 65535) - assert.equal(client._nextId(), 1) - client.end() - }) - - it('should return 65535 for last message id once the internal counter reached limit', function () { - client = mqtt.connect(config) - client.nextId = 65535 - - assert.equal(client._nextId(), 65535) - assert.equal(client.getLastMessageId(), 65535) - assert.equal(client._nextId(), 1) - assert.equal(client.getLastMessageId(), 1) - client.end() - }) - it('should not throw an error if packet\'s messageId is not found when receiving a pubrel packet', function (done) { var server2 = new MqttServer(function (serverClient) { serverClient.on('connect', function (packet) { diff --git a/test/helpers/port_list.js b/test/helpers/port_list.js index 46253bf21..89648b3c0 100644 --- a/test/helpers/port_list.js +++ b/test/helpers/port_list.js @@ -1,4 +1,5 @@ var PORT = 9876 +var PORTAND40 = PORT + 40 var PORTAND41 = PORT + 41 var PORTAND42 = PORT + 42 var PORTAND43 = PORT + 43 @@ -19,9 +20,11 @@ var PORTAND119 = PORT + 119 var PORTAND316 = PORT + 316 var PORTAND326 = PORT + 326 var PORTAND327 = PORT + 327 +var PORTAND400 = PORT + 400 module.exports = { PORT, + PORTAND40, PORTAND41, PORTAND42, PORTAND43, @@ -41,5 +44,6 @@ module.exports = { PORTAND119, PORTAND316, PORTAND326, - PORTAND327 + PORTAND327, + PORTAND400 } diff --git a/test/message-id-provider.js b/test/message-id-provider.js new file mode 100644 index 000000000..2f84bdf35 --- /dev/null +++ b/test/message-id-provider.js @@ -0,0 +1,91 @@ +'use strict' +var assert = require('chai').assert +var DefaultMessageIdProvider = require('../lib/default-message-id-provider') +var UniqueMessageIdProvider = require('../lib/unique-message-id-provider') + +describe('message id provider', function () { + describe('default', function () { + it('should return 1 once the internal counter reached limit', function () { + var provider = new DefaultMessageIdProvider() + provider.nextId = 65535 + + assert.equal(provider.allocate(), 65535) + assert.equal(provider.allocate(), 1) + }) + + it('should return 65535 for last message id once the internal counter reached limit', function () { + var provider = new DefaultMessageIdProvider() + provider.nextId = 65535 + + assert.equal(provider.allocate(), 65535) + assert.equal(provider.getLastAllocated(), 65535) + assert.equal(provider.allocate(), 1) + assert.equal(provider.getLastAllocated(), 1) + }) + it('should return true when register with non allocated messageId', function () { + var provider = new DefaultMessageIdProvider() + assert.equal(provider.register(10), true) + }) + }) + describe('unique', function () { + it('should return 1, 2, 3.., when allocate', function () { + var provider = new UniqueMessageIdProvider() + assert.equal(provider.allocate(), 1) + assert.equal(provider.allocate(), 2) + assert.equal(provider.allocate(), 3) + }) + it('should skip registerd messageId', function () { + var provider = new UniqueMessageIdProvider() + assert.equal(provider.register(2), true) + assert.equal(provider.allocate(), 1) + assert.equal(provider.allocate(), 3) + }) + it('should return false register allocated messageId', function () { + var provider = new UniqueMessageIdProvider() + assert.equal(provider.allocate(), 1) + assert.equal(provider.register(1), false) + assert.equal(provider.register(5), true) + assert.equal(provider.register(5), false) + }) + it('should retrun correct last messageId', function () { + var provider = new UniqueMessageIdProvider() + assert.equal(provider.allocate(), 1) + assert.equal(provider.getLastAllocated(), 1) + assert.equal(provider.register(2), true) + assert.equal(provider.getLastAllocated(), 1) + assert.equal(provider.allocate(), 3) + assert.equal(provider.getLastAllocated(), 3) + }) + it('should be reusable deallocated messageId', function () { + var provider = new UniqueMessageIdProvider() + assert.equal(provider.allocate(), 1) + assert.equal(provider.allocate(), 2) + assert.equal(provider.allocate(), 3) + provider.deallocate(2) + assert.equal(provider.allocate(), 2) + }) + it('should allocate all messageId and then return null', function () { + var provider = new UniqueMessageIdProvider() + for (var i = 1; i <= 65535; i++) { + assert.equal(provider.allocate(), i) + } + assert.equal(provider.allocate(), null) + provider.deallocate(10000) + assert.equal(provider.allocate(), 10000) + assert.equal(provider.allocate(), null) + }) + it('should all messageId reallocatable after clear', function () { + var provider = new UniqueMessageIdProvider() + var i + for (i = 1; i <= 65535; i++) { + assert.equal(provider.allocate(), i) + } + assert.equal(provider.allocate(), null) + provider.clear() + for (i = 1; i <= 65535; i++) { + assert.equal(provider.allocate(), i) + } + assert.equal(provider.allocate(), null) + }) + }) +}) diff --git a/test/server_helpers_for_client_tests.js b/test/server_helpers_for_client_tests.js index 375b96bb6..9527d47e2 100644 --- a/test/server_helpers_for_client_tests.js +++ b/test/server_helpers_for_client_tests.js @@ -22,12 +22,14 @@ var MQTTConnection = require('mqtt-connection') function serverBuilder (protocol, handler) { var defaultHandler = function (serverClient) { serverClient.on('auth', function (packet) { + if (serverClient.writable) return false var rc = 'reasonCode' var connack = {} connack[rc] = 0 serverClient.connack(connack) }) serverClient.on('connect', function (packet) { + if (!serverClient.writable) return false var rc = 'returnCode' var connack = {} if (serverClient.options && serverClient.options.protocolVersion === 5) { @@ -52,6 +54,7 @@ function serverBuilder (protocol, handler) { }) serverClient.on('publish', function (packet) { + if (!serverClient.writable) return false setImmediate(function () { switch (packet.qos) { case 0: @@ -67,10 +70,12 @@ function serverBuilder (protocol, handler) { }) serverClient.on('pubrel', function (packet) { + if (!serverClient.writable) return false serverClient.pubcomp(packet) }) serverClient.on('pubrec', function (packet) { + if (!serverClient.writable) return false serverClient.pubrel(packet) }) @@ -79,6 +84,7 @@ function serverBuilder (protocol, handler) { }) serverClient.on('subscribe', function (packet) { + if (!serverClient.writable) return false serverClient.suback({ messageId: packet.messageId, granted: packet.subscriptions.map(function (e) { @@ -88,11 +94,13 @@ function serverBuilder (protocol, handler) { }) serverClient.on('unsubscribe', function (packet) { + if (!serverClient.writable) return false packet.granted = packet.unsubscriptions.map(function () { return 0 }) serverClient.unsuback(packet) }) serverClient.on('pingreq', function () { + if (!serverClient.writable) return false serverClient.pingresp() }) diff --git a/test/typescript/broker-connect-subscribe-and-publish.ts b/test/typescript/broker-connect-subscribe-and-publish.ts index ecdb363cc..359e752a7 100644 --- a/test/typescript/broker-connect-subscribe-and-publish.ts +++ b/test/typescript/broker-connect-subscribe-and-publish.ts @@ -1,12 +1,13 @@ // relative path uses package.json {"types":"types/index.d.ts", ...} -import {IClientOptions, Client, connect, IConnackPacket} from '../..' +import {IClientOptions, Client, connect, IConnackPacket, UniqueMessageIdProvider} from '../..' const BROKER = 'test.mosquitto.org' const PAYLOAD_WILL = Buffer.from('bye from TS') const PAYLOAD_QOS = Buffer.from('hello from TS (with qos=2)') const PAYLOAD_RETAIN = 'hello from TS (with retain=true)' const TOPIC = 'typescript-test-' + Math.random().toString(16).substr(2) -const opts: IClientOptions = {will: {topic: TOPIC, payload: PAYLOAD_WILL, qos: 0, retain: false}} +const opts: IClientOptions = {will: {topic: TOPIC, payload: PAYLOAD_WILL, qos: 0, retain: false}, + messageIdProvider: new UniqueMessageIdProvider()} console.log(`connect(${JSON.stringify(BROKER)})`) const client:Client = connect(`mqtt://${BROKER}`, opts) diff --git a/test/unique_message_id_provider_client.js b/test/unique_message_id_provider_client.js new file mode 100644 index 000000000..933d85b82 --- /dev/null +++ b/test/unique_message_id_provider_client.js @@ -0,0 +1,21 @@ +'use strict' + +var abstractClientTests = require('./abstract_client') +var serverBuilder = require('./server_helpers_for_client_tests').serverBuilder +var UniqueMessageIdProvider = require('../lib/unique-message-id-provider') +var ports = require('./helpers/port_list') + +describe('UniqueMessageIdProviderMqttClient', function () { + var server = serverBuilder('mqtt') + var config = {protocol: 'mqtt', port: ports.PORTAND400, messageIdProvider: new UniqueMessageIdProvider()} + server.listen(ports.PORTAND400) + + after(function () { + // clean up and make sure the server is no longer listening... + if (server.listening) { + server.close() + } + }) + + abstractClientTests(server, config) +}) diff --git a/types/index.d.ts b/types/index.d.ts index 9bca9c2ff..a3496b103 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -26,3 +26,5 @@ export { Packet, UserProperties } from 'mqtt-packet' +export { IMessageIdProvider } from './lib/message-id-provider' +export { UniqueMessageIdProvider } from './lib/unique-message-id-provider' diff --git a/types/lib/client-options.d.ts b/types/lib/client-options.d.ts index c9c667d85..a8cf962d6 100644 --- a/types/lib/client-options.d.ts +++ b/types/lib/client-options.d.ts @@ -3,6 +3,7 @@ import { Store } from './store' import { ClientOptions } from 'ws' import { ClientRequestArgs } from 'http' import { QoS, UserProperties } from 'mqtt-packet' +import { IMessageIdProvider } from './message-id-provider' export declare type StorePutCallback = () => void @@ -113,7 +114,8 @@ export interface IClientOptions extends ISecureClientOptions { userProperties?: UserProperties, authenticationMethod?: string, authenticationData?: Buffer - } + }, + messageIdProvider?: IMessageIdProvider } export interface ISecureClientOptions { /** diff --git a/types/lib/default-message-id-provider.d.ts b/types/lib/default-message-id-provider.d.ts new file mode 100644 index 000000000..fafaa4c9b --- /dev/null +++ b/types/lib/default-message-id-provider.d.ts @@ -0,0 +1,49 @@ +import { IMessageIdProvider } from './message-id-provider' + +/** + * DefaultMessageIdProvider + * This is compatible behavior with the original MQTT.js internal messageId allocation. + */ +declare class DefaultMessageIdProvider implements IMessageIdProvider { + /** + * DefaultMessageIdProvider constructor. + * Randomize initial messageId + * @constructor + */ + constructor () + + /** + * Return the current messageId and increment the current messageId. + * @return {Number} - messageId + */ + public allocate (): Number | null + + /** + * Get the last allocated messageId. + * @return {Number} - messageId. + */ + public getLastAllocated (): Number | null + + /** + * Register the messageId. + * This function actually nothing and always return true. + * @param {Number} num - The messageId to request use. + * @return {Boolean} - If `num` was not occupied, then return true, otherwise return false. + */ + public register (num: Number): Boolean + + /** + * Deallocate the messageId. + * This function actually nothing. + * @param {Number} num - The messageId to deallocate. + */ + public deallocate (num: Number): void + + /** + * Clear all occupied messageIds. + * This function actually nothing. + */ + public clear (): void +} + +export { DefaultMessageIdProvider } diff --git a/types/lib/message-id-provider.d.ts b/types/lib/message-id-provider.d.ts new file mode 100644 index 000000000..9468cf3e2 --- /dev/null +++ b/types/lib/message-id-provider.d.ts @@ -0,0 +1,40 @@ +/** + * MessageIdProvider + */ +declare interface IMessageIdProvider { + /** + * Allocate the first vacant messageId. The messageId become occupied status. + * @return {Number} - The first vacant messageId. If all messageIds are occupied, return null. + */ + allocate (): Number | null + + /** + * Get the last allocated messageId. + * @return {Number} - messageId. + */ + getLastAllocated (): Number | null + + /** + * Register the messageId. The messageId become occupied status. + * If the messageId has already been occupied, then return false. + * @param {Number} num - The messageId to request use. + * @return {Boolean} - If `num` was not occupied, then return true, otherwise return false. + */ + register (num: Number): Boolean + + /** + * Deallocate the messageId. The messageId become vacant status. + * @param {Number} num - The messageId to deallocate. The messageId must be occupied status. + * In other words, the messageId must be allocated by allocate() or + * occupied by register(). + */ + deallocate (num: Number): void + + /** + * Clear all occupied messageIds. + * The all messageIds are set to vacant status. + */ + clear (): void +} + +export { IMessageIdProvider } diff --git a/types/lib/unique-message-id-provider.d.ts b/types/lib/unique-message-id-provider.d.ts new file mode 100644 index 000000000..0941b2865 --- /dev/null +++ b/types/lib/unique-message-id-provider.d.ts @@ -0,0 +1,48 @@ +import { IMessageIdProvider } from './message-id-provider' + +/** + * UniqueMessageIdProvider + */ +declare class UniqueMessageIdProvider implements IMessageIdProvider { + /** + * UniqueMessageIdProvider constructor. + * @constructor + */ + constructor () + + /** + * Allocate the first vacant messageId. The messageId become occupied status. + * @return {Number} - The first vacant messageId. If all messageIds are occupied, return null. + */ + public allocate (): Number | null + + /** + * Get the last allocated messageId. + * @return {Number} - messageId. + */ + public getLastAllocated (): Number | null + + /** + * Register the messageId. The messageId become occupied status. + * If the messageId has already been occupied, then return false. + * @param {Number} num - The messageId to request use. + * @return {Boolean} - If `num` was not occupied, then return true, otherwise return false. + */ + public register (num: Number): Boolean + + /** + * Deallocate the messageId. The messageId become vacant status. + * @param {Number} num - The messageId to deallocate. The messageId must be occupied status. + * In other words, the messageId must be allocated by allocate() or + * occupied by register(). + */ + public deallocate (num: Number): void + + /** + * Clear all occupied messageIds. + * The all messageIds are set to vacant status. + */ + public clear (): void +} + +export { UniqueMessageIdProvider }