From 0eb339653a2bce980dcd084c62a5147da0cb804e Mon Sep 17 00:00:00 2001 From: Takatoshi Kondo Date: Tue, 2 Feb 2021 22:12:13 +0900 Subject: [PATCH 1/3] Add non conflict unique messageId support. In order to avoid messageId allocating and registering conflict, during store processing, publish, subscribe, and unsubscribe functions are enqueued. The enqueued functions are invoked when the store processing will be finished. During the invocation, messageId is allocated. messageIds could be run out. In this case, stop invocation. The rest of functions in the queue are remained. When puback, pubcomp, suback, or unsuback is received, the messageId is deallocated and become reusable, so the queue invocation process is continued. --- README.md | 1 + lib/client.js | 381 +++++++++++------- lib/default-message-id-provider.js | 69 ++++ lib/unique-message-id-provider.js | 64 +++ mqtt.js | 4 + package.json | 1 + test/client.js | 20 - test/helpers/port_list.js | 6 +- test/message-id-provider.js | 91 +++++ .../broker-connect-subscribe-and-publish.ts | 5 +- test/unique_message_id_provider_client.js | 21 + types/index.d.ts | 2 + types/lib/client-options.d.ts | 4 +- types/lib/default-message-id-provider.d.ts | 49 +++ types/lib/message-id-provider.d.ts | 40 ++ types/lib/unique-message-id-provider.d.ts | 48 +++ 16 files changed, 642 insertions(+), 164 deletions(-) create mode 100644 lib/default-message-id-provider.js create mode 100644 lib/unique-message-id-provider.js create mode 100644 test/message-id-provider.js create mode 100644 test/unique_message_id_provider_client.js create mode 100644 types/lib/default-message-id-provider.d.ts create mode 100644 types/lib/message-id-provider.d.ts create mode 100644 types/lib/unique-message-id-provider.d.ts diff --git a/README.md b/README.md index 86a335eb3..db627fd0d 100644 --- a/README.md +++ b/README.md @@ -322,6 +322,7 @@ the `connect` event. Typically a `net.Socket`. urls which upon reconnect can have become expired. * `resubscribe` : if connection is broken and reconnects, subscribed topics are automatically subscribed again (default `true`) + * `messageIdProvider`: custom messageId provider. when `new UniqueMessageIdProvider()` is set, then non conflict messageId is provided. In case mqtts (mqtt over tls) is required, the `options` object is passed through to diff --git a/lib/client.js b/lib/client.js index 07e8de115..c481e7a04 100644 --- a/lib/client.js +++ b/lib/client.js @@ -6,6 +6,7 @@ var EventEmitter = require('events').EventEmitter var Store = require('./store') var mqttPacket = require('mqtt-packet') +var DefaultMessageIdProvider = require('./default-message-id-provider') var Writable = require('readable-stream').Writable var inherits = require('inherits') var reInterval = require('reinterval') @@ -184,6 +185,8 @@ function MqttClient (streamBuilder, options) { this.streamBuilder = streamBuilder + this.messageIdProvider = (typeof this.options.messageIdProvider === 'undefined') ? new DefaultMessageIdProvider() : this.options.messageIdProvider + // Inflight message storages this.outgoingStore = options.outgoingStore || new Store() this.incomingStore = options.incomingStore || new Store() @@ -213,11 +216,8 @@ function MqttClient (streamBuilder, options) { this._storeProcessing = false // Packet Ids are put into the store during store processing this._packetIdsDuringStoreProcessing = {} - /** - * MessageIDs starting with 1 - * ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810 - */ - this.nextId = Math.max(1, Math.floor(Math.random() * 65535)) + // Store processing queue + this._storeProcessingQueue = [] // Inflight callbacks this.outgoing = {} @@ -240,15 +240,29 @@ function MqttClient (streamBuilder, options) { packet = entry.packet debug('deliver :: call _sendPacket for %o', packet) - that._sendPacket( - packet, - function (err) { - if (entry.cb) { - entry.cb(err) + var send = true + if (packet.messageId && packet.messageId !== 0) { + if (!that.messageIdProvider.register(packet.messageId)) { + packet.messageeId = that.messageIdProvider.allocate() + if (packet.messageId === null) { + send = false } - deliver() } - ) + } + if (send) { + that._sendPacket( + packet, + function (err) { + if (entry.cb) { + entry.cb(err) + } + deliver() + } + ) + } else { + debug('messageId: %d has already used.', packet.messageId) + deliver() + } } debug('connect :: sending queued packets') @@ -490,60 +504,72 @@ MqttClient.prototype.publish = function (topic, message, opts, callback) { return this } - packet = { - cmd: 'publish', - topic: topic, - payload: message, - qos: opts.qos, - retain: opts.retain, - messageId: this._nextId(), - dup: opts.dup - } - - if (options.protocolVersion === 5) { - packet.properties = opts.properties - if ((!options.properties && packet.properties && packet.properties.topicAlias) || ((opts.properties && options.properties) && - ((opts.properties.topicAlias && options.properties.topicAliasMaximum && opts.properties.topicAlias > options.properties.topicAliasMaximum) || - (!options.properties.topicAliasMaximum && opts.properties.topicAlias)))) { - /* - if we are don`t setup topic alias or - topic alias maximum less than topic alias or - server don`t give topic alias maximum, - we are removing topic alias from packet - */ - delete packet.properties.topicAlias + var that = this + var publishProc = function () { + var messageId = 0 + if (opts.qos === 1 || opts.qos === 2) { + messageId = that._nextId() + if (messageId === null) { + debug('No messageId left') + return false + } + } + packet = { + cmd: 'publish', + topic: topic, + payload: message, + qos: opts.qos, + retain: opts.retain, + messageId: messageId, + dup: opts.dup } - } - debug('publish :: qos', opts.qos) - switch (opts.qos) { - case 1: - case 2: - // Add to callbacks - this.outgoing[packet.messageId] = { - volatile: false, - cb: callback || nop + if (options.protocolVersion === 5) { + packet.properties = opts.properties + if ((!options.properties && packet.properties && packet.properties.topicAlias) || ((opts.properties && options.properties) && + ((opts.properties.topicAlias && options.properties.topicAliasMaximum && opts.properties.topicAlias > options.properties.topicAliasMaximum) || + (!options.properties.topicAliasMaximum && opts.properties.topicAlias)))) { + /* + if we are don`t setup topic alias or + topic alias maximum less than topic alias or + server don`t give topic alias maximum, + we are removing topic alias from packet + */ + delete packet.properties.topicAlias } - if (this._storeProcessing) { - debug('_storeProcessing enabled') - this._packetIdsDuringStoreProcessing[packet.messageId] = false - this._storePacket(packet, undefined, opts.cbStorePut) - } else { + } + + debug('publish :: qos', opts.qos) + switch (opts.qos) { + case 1: + case 2: + // Add to callbacks + that.outgoing[packet.messageId] = { + volatile: false, + cb: callback || nop + } debug('MqttClient:publish: packet cmd: %s', packet.cmd) - this._sendPacket(packet, undefined, opts.cbStorePut) - } - break - default: - if (this._storeProcessing) { - debug('_storeProcessing enabled') - this._storePacket(packet, callback, opts.cbStorePut) - } else { + that._sendPacket(packet, undefined, opts.cbStorePut) + break + default: debug('MqttClient:publish: packet cmd: %s', packet.cmd) - this._sendPacket(packet, callback, opts.cbStorePut) - } - break + that._sendPacket(packet, callback, opts.cbStorePut) + break + } + return true } + if (this._storeProcessing || this._storeProcessingQueue.length > 0) { + this._storeProcessingQueue.push( + { + 'invoke': publishProc, + 'cbStorePut': opts.cbStorePut, + 'callback': callback + } + ) + } else { + publishProc() + } return this } @@ -564,7 +590,7 @@ MqttClient.prototype.publish = function (topic, message, opts, callback) { * @example client.subscribe('topic', console.log); */ MqttClient.prototype.subscribe = function () { - var packet + var that = this var args = new Array(arguments.length) for (var i = 0; i < arguments.length; i++) { args[i] = arguments[i] @@ -574,8 +600,6 @@ MqttClient.prototype.subscribe = function () { var resubscribe = obj.resubscribe var callback = args.pop() || nop var opts = args.pop() - var invalidTopic - var that = this var version = this.options.protocolVersion delete obj.resubscribe @@ -589,7 +613,7 @@ MqttClient.prototype.subscribe = function () { callback = nop } - invalidTopic = validations.validateTopics(obj) + var invalidTopic = validations.validateTopics(obj) if (invalidTopic !== null) { setImmediate(callback, new Error('Invalid topic ' + invalidTopic)) return this @@ -654,59 +678,79 @@ MqttClient.prototype.subscribe = function () { }) } - packet = { - cmd: 'subscribe', - subscriptions: subs, - qos: 1, - retain: false, - dup: false, - messageId: this._nextId() - } - - if (opts.properties) { - packet.properties = opts.properties - } - if (!subs.length) { callback(null, []) - return + return this } - // subscriptions to resubscribe to in case of disconnect - if (this.options.resubscribe) { - debug('subscribe :: resubscribe true') - var topics = [] - subs.forEach(function (sub) { - if (that.options.reconnectPeriod > 0) { - var topic = { qos: sub.qos } - if (version === 5) { - topic.nl = sub.nl || false - topic.rap = sub.rap || false - topic.rh = sub.rh || 0 - topic.properties = sub.properties + var subscribeProc = function () { + var messageId = that._nextId() + if (messageId === null) { + debug('No messageId left') + return false + } + + var packet = { + cmd: 'subscribe', + subscriptions: subs, + qos: 1, + retain: false, + dup: false, + messageId: messageId + } + + if (opts.properties) { + packet.properties = opts.properties + } + + // subscriptions to resubscribe to in case of disconnect + if (that.options.resubscribe) { + debug('subscribe :: resubscribe true') + var topics = [] + subs.forEach(function (sub) { + if (that.options.reconnectPeriod > 0) { + var topic = { qos: sub.qos } + if (version === 5) { + topic.nl = sub.nl || false + topic.rap = sub.rap || false + topic.rh = sub.rh || 0 + topic.properties = sub.properties + } + that._resubscribeTopics[sub.topic] = topic + topics.push(sub.topic) } - that._resubscribeTopics[sub.topic] = topic - topics.push(sub.topic) - } - }) - that.messageIdToTopic[packet.messageId] = topics - } + }) + that.messageIdToTopic[packet.messageId] = topics + } - this.outgoing[packet.messageId] = { - volatile: true, - cb: function (err, packet) { - if (!err) { - var granted = packet.granted - for (var i = 0; i < granted.length; i += 1) { - subs[i].qos = granted[i] + that.outgoing[packet.messageId] = { + volatile: true, + cb: function (err, packet) { + if (!err) { + var granted = packet.granted + for (var i = 0; i < granted.length; i += 1) { + subs[i].qos = granted[i] + } } - } - callback(err, subs) + callback(err, subs) + } } + debug('subscribe :: call _sendPacket') + that._sendPacket(packet) + return true + } + + if (this._storeProcessing || this._storeProcessingQueue.length > 0) { + this._storeProcessingQueue.push( + { + 'invoke': subscribeProc, + 'callback': callback + } + ) + } else { + subscribeProc() } - debug('subscribe :: call _sendPacket') - this._sendPacket(packet) return this } @@ -724,11 +768,6 @@ MqttClient.prototype.subscribe = function () { * @example client.unsubscribe('topic', console.log); */ MqttClient.prototype.unsubscribe = function () { - var packet = { - cmd: 'unsubscribe', - qos: 1, - messageId: this._nextId() - } var that = this var args = new Array(arguments.length) for (var i = 0; i < arguments.length; i++) { @@ -737,7 +776,6 @@ MqttClient.prototype.unsubscribe = function () { var topic = args.shift() var callback = args.pop() || nop var opts = args.pop() - if (typeof topic === 'string') { topic = [topic] } @@ -747,33 +785,65 @@ MqttClient.prototype.unsubscribe = function () { callback = nop } - if (this._checkDisconnecting(callback)) { + var invalidTopic = validations.validateTopics(topic) + if (invalidTopic !== null) { + setImmediate(callback, new Error('Invalid topic ' + invalidTopic)) return this } - if (typeof topic === 'string') { - packet.unsubscriptions = [topic] - } else if (Array.isArray(topic)) { - packet.unsubscriptions = topic + if (that._checkDisconnecting(callback)) { + return this } - if (this.options.resubscribe) { - packet.unsubscriptions.forEach(function (topic) { - delete that._resubscribeTopics[topic] - }) - } + var unsubscribeProc = function () { + var messageId = that._nextId() + if (messageId === null) { + debug('No messageId left') + return false + } + var packet = { + cmd: 'unsubscribe', + qos: 1, + messageId: messageId + } - if (typeof opts === 'object' && opts.properties) { - packet.properties = opts.properties - } + if (typeof topic === 'string') { + packet.unsubscriptions = [topic] + } else if (Array.isArray(topic)) { + packet.unsubscriptions = topic + } + + if (that.options.resubscribe) { + packet.unsubscriptions.forEach(function (topic) { + delete that._resubscribeTopics[topic] + }) + } - this.outgoing[packet.messageId] = { - volatile: true, - cb: callback + if (typeof opts === 'object' && opts.properties) { + packet.properties = opts.properties + } + + that.outgoing[packet.messageId] = { + volatile: true, + cb: callback + } + + debug('unsubscribe: call _sendPacket') + that._sendPacket(packet) + + return true } - debug('unsubscribe: call _sendPacket') - this._sendPacket(packet) + if (this._storeProcessing || this._storeProcessingQueue.length > 0) { + this._storeProcessingQueue.push( + { + 'invoke': unsubscribeProc, + 'callback': callback + } + ) + } else { + unsubscribeProc() + } return this } @@ -874,7 +944,7 @@ MqttClient.prototype.end = function (force, opts, cb) { * @returns {MqttClient} this - for chaining * @api public * - * @example client.removeOutgoingMessage(client.getLastMessageId()); + * @example client.removeOutgoingMessage(client.getLastAllocated()); */ MqttClient.prototype.removeOutgoingMessage = function (messageId) { var cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null @@ -1334,6 +1404,8 @@ MqttClient.prototype._handleAck = function (packet) { } delete this.outgoing[messageId] this.outgoingStore.del(packet, cb) + this.messageIdProvider.deallocate(messageId) + this._invokeStoreProcessingQueue() break case 'pubrec': response = { @@ -1353,6 +1425,7 @@ MqttClient.prototype._handleAck = function (packet) { break case 'suback': delete this.outgoing[messageId] + this.messageIdProvider.deallocate(messageId) for (var grantedI = 0; grantedI < packet.granted.length; grantedI++) { if ((packet.granted[grantedI] & 0x80) !== 0) { // suback with Failure status @@ -1364,10 +1437,13 @@ MqttClient.prototype._handleAck = function (packet) { } } } + this._invokeStoreProcessingQueue() cb(null, packet) break case 'unsuback': delete this.outgoing[messageId] + this.messageIdProvider.deallocate(messageId) + this._invokeStoreProcessingQueue() cb(null) break default: @@ -1425,13 +1501,7 @@ MqttClient.prototype._handleDisconnect = function (packet) { * @return unsigned int */ MqttClient.prototype._nextId = function () { - // id becomes current state of this.nextId and increments afterwards - var id = this.nextId++ - // Ensure 16 bit unsigned int (max 65535, nextId got one higher) - if (this.nextId === 65536) { - this.nextId = 1 - } - return id + return this.messageIdProvider.allocate() } /** @@ -1439,7 +1509,7 @@ MqttClient.prototype._nextId = function () { * @return unsigned int */ MqttClient.prototype.getLastMessageId = function () { - return (this.nextId === 1) ? 65535 : (this.nextId - 1) + return this.messageIdProvider.getLastAllocated() } /** @@ -1486,6 +1556,7 @@ MqttClient.prototype._onConnect = function (packet) { var that = this + this.messageIdProvider.clear() this._setupPingTimer() this._resubscribe(packet) @@ -1502,6 +1573,7 @@ MqttClient.prototype._onConnect = function (packet) { that.once('close', remove) outStore.on('error', function (err) { clearStoreProcessing() + that._flushStoreProcessingQueue() that.removeListener('close', remove) that.emit('error', err) }) @@ -1509,6 +1581,7 @@ MqttClient.prototype._onConnect = function (packet) { function remove () { outStore.destroy() outStore = null + that._flushStoreProcessingQueue() clearStoreProcessing() } @@ -1550,7 +1623,11 @@ MqttClient.prototype._onConnect = function (packet) { } } that._packetIdsDuringStoreProcessing[packet.messageId] = true - that._sendPacket(packet) + if (that.messageIdProvider.register(packet.messageId)) { + that._sendPacket(packet) + } else { + debug('messageId: %d has already used.', packet.messageId) + } } else if (outStore.destroy) { outStore.destroy() } @@ -1567,6 +1644,7 @@ MqttClient.prototype._onConnect = function (packet) { if (allProcessed) { clearStoreProcessing() that.removeListener('close', remove) + that._invokeAllStoreProcessingQueue() that.emit('connect', packet) } else { startStreamProcess() @@ -1578,4 +1656,27 @@ MqttClient.prototype._onConnect = function (packet) { startStreamProcess() } +MqttClient.prototype._invokeStoreProcessingQueue = function () { + if (this._storeProcessingQueue.length > 0) { + var f = this._storeProcessingQueue[0] + if (f && f.invoke()) { + this._storeProcessingQueue.shift() + return true + } + } + return false +} + +MqttClient.prototype._invokeAllStoreProcessingQueue = function () { + while (this._invokeStoreProcessingQueue()) {} +} + +MqttClient.prototype._flushStoreProcessingQueue = function () { + for (var f of this._storeProcessingQueue) { + if (f.cbStorePut) f.cbStorePut(new Error('Connection closed')) + if (f.callback) f.callback(new Error('Connection closed')) + } + this._storeProcessingQueue.splice(0) +} + module.exports = MqttClient diff --git a/lib/default-message-id-provider.js b/lib/default-message-id-provider.js new file mode 100644 index 000000000..c0a953f3f --- /dev/null +++ b/lib/default-message-id-provider.js @@ -0,0 +1,69 @@ +'use strict' + +/** + * DefaultMessageAllocator constructor + * @constructor + */ +function DefaultMessageIdProvider () { + if (!(this instanceof DefaultMessageIdProvider)) { + return new DefaultMessageIdProvider() + } + + /** + * MessageIDs starting with 1 + * ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810 + */ + this.nextId = Math.max(1, Math.floor(Math.random() * 65535)) +} + +/** + * allocate + * + * Get the next messageId. + * @return unsigned int + */ +DefaultMessageIdProvider.prototype.allocate = function () { + // id becomes current state of this.nextId and increments afterwards + var id = this.nextId++ + // Ensure 16 bit unsigned int (max 65535, nextId got one higher) + if (this.nextId === 65536) { + this.nextId = 1 + } + return id +} + +/** + * getLastAllocated + * Get the last allocated messageId. + * @return unsigned int + */ +DefaultMessageIdProvider.prototype.getLastAllocated = function () { + return (this.nextId === 1) ? 65535 : (this.nextId - 1) +} + +/** + * register + * Register messageId. If success return true, otherwise return false. + * @param { unsigned int } - messageId to register, + * @return boolean + */ +DefaultMessageIdProvider.prototype.register = function (messageId) { + return true +} + +/** + * deallocate + * Deallocate messageId. + * @param { unsigned int } - messageId to deallocate, + */ +DefaultMessageIdProvider.prototype.deallocate = function (messageId) { +} + +/** + * clear + * Deallocate all messageIds. + */ +DefaultMessageIdProvider.prototype.clear = function () { +} + +module.exports = DefaultMessageIdProvider diff --git a/lib/unique-message-id-provider.js b/lib/unique-message-id-provider.js new file mode 100644 index 000000000..5d2203f14 --- /dev/null +++ b/lib/unique-message-id-provider.js @@ -0,0 +1,64 @@ +'use strict' + +var NumberAllocator = require('number-allocator').NumberAllocator + +/** + * UniqueMessageAllocator constructor + * @constructor + */ +function UniqueMessageIdProvider () { + if (!(this instanceof UniqueMessageIdProvider)) { + return new UniqueMessageIdProvider() + } + + this.numberAllocator = new NumberAllocator(1, 65535) +} + +/** + * allocate + * + * Get the next messageId. + * @return unsigned int + */ +UniqueMessageIdProvider.prototype.allocate = function () { + this.lastId = this.numberAllocator.alloc() + return this.lastId +} + +/** + * getLastAllocated + * Get the last allocated messageId. + * @return unsigned int + */ +UniqueMessageIdProvider.prototype.getLastAllocated = function () { + return this.lastId +} + +/** + * register + * Register messageId. If success return true, otherwise return false. + * @param { unsigned int } - messageId to register, + * @return boolean + */ +UniqueMessageIdProvider.prototype.register = function (messageId) { + return this.numberAllocator.use(messageId) +} + +/** + * deallocate + * Deallocate messageId. + * @param { unsigned int } - messageId to deallocate, + */ +UniqueMessageIdProvider.prototype.deallocate = function (messageId) { + this.numberAllocator.free(messageId) +} + +/** + * clear + * Deallocate all messageIds. + */ +UniqueMessageIdProvider.prototype.clear = function () { + this.numberAllocator.clear() +} + +module.exports = UniqueMessageIdProvider diff --git a/mqtt.js b/mqtt.js index ab12375c8..c8b94fda1 100644 --- a/mqtt.js +++ b/mqtt.js @@ -8,6 +8,8 @@ var MqttClient = require('./lib/client') var connect = require('./lib/connect') var Store = require('./lib/store') +var DefaultMessageIdProvider = require('./lib/default-message-id-provider') +var UniqueMessageIdProvider = require('./lib/unique-message-id-provider') module.exports.connect = connect @@ -15,3 +17,5 @@ module.exports.connect = connect module.exports.MqttClient = MqttClient module.exports.Client = MqttClient module.exports.Store = Store +module.exports.DefaultMessageIdProvider = DefaultMessageIdProvider +module.exports.UniqueMessageIdProvider = UniqueMessageIdProvider diff --git a/package.json b/package.json index 0ce555a23..0dde135ea 100644 --- a/package.json +++ b/package.json @@ -70,6 +70,7 @@ "inherits": "^2.0.3", "minimist": "^1.2.5", "mqtt-packet": "^6.8.0", + "number-allocator": "^1.0.7", "pump": "^3.0.0", "readable-stream": "^3.6.0", "reinterval": "^1.1.0", diff --git a/test/client.js b/test/client.js index 084bfed95..4ea052ab8 100644 --- a/test/client.js +++ b/test/client.js @@ -54,26 +54,6 @@ describe('MqttClient', function () { client.end() }) - it('should return 1 once the internal counter reached limit', function () { - client = mqtt.connect(config) - client.nextId = 65535 - - assert.equal(client._nextId(), 65535) - assert.equal(client._nextId(), 1) - client.end() - }) - - it('should return 65535 for last message id once the internal counter reached limit', function () { - client = mqtt.connect(config) - client.nextId = 65535 - - assert.equal(client._nextId(), 65535) - assert.equal(client.getLastMessageId(), 65535) - assert.equal(client._nextId(), 1) - assert.equal(client.getLastMessageId(), 1) - client.end() - }) - it('should not throw an error if packet\'s messageId is not found when receiving a pubrel packet', function (done) { var server2 = new MqttServer(function (serverClient) { serverClient.on('connect', function (packet) { diff --git a/test/helpers/port_list.js b/test/helpers/port_list.js index 46253bf21..89648b3c0 100644 --- a/test/helpers/port_list.js +++ b/test/helpers/port_list.js @@ -1,4 +1,5 @@ var PORT = 9876 +var PORTAND40 = PORT + 40 var PORTAND41 = PORT + 41 var PORTAND42 = PORT + 42 var PORTAND43 = PORT + 43 @@ -19,9 +20,11 @@ var PORTAND119 = PORT + 119 var PORTAND316 = PORT + 316 var PORTAND326 = PORT + 326 var PORTAND327 = PORT + 327 +var PORTAND400 = PORT + 400 module.exports = { PORT, + PORTAND40, PORTAND41, PORTAND42, PORTAND43, @@ -41,5 +44,6 @@ module.exports = { PORTAND119, PORTAND316, PORTAND326, - PORTAND327 + PORTAND327, + PORTAND400 } diff --git a/test/message-id-provider.js b/test/message-id-provider.js new file mode 100644 index 000000000..2f84bdf35 --- /dev/null +++ b/test/message-id-provider.js @@ -0,0 +1,91 @@ +'use strict' +var assert = require('chai').assert +var DefaultMessageIdProvider = require('../lib/default-message-id-provider') +var UniqueMessageIdProvider = require('../lib/unique-message-id-provider') + +describe('message id provider', function () { + describe('default', function () { + it('should return 1 once the internal counter reached limit', function () { + var provider = new DefaultMessageIdProvider() + provider.nextId = 65535 + + assert.equal(provider.allocate(), 65535) + assert.equal(provider.allocate(), 1) + }) + + it('should return 65535 for last message id once the internal counter reached limit', function () { + var provider = new DefaultMessageIdProvider() + provider.nextId = 65535 + + assert.equal(provider.allocate(), 65535) + assert.equal(provider.getLastAllocated(), 65535) + assert.equal(provider.allocate(), 1) + assert.equal(provider.getLastAllocated(), 1) + }) + it('should return true when register with non allocated messageId', function () { + var provider = new DefaultMessageIdProvider() + assert.equal(provider.register(10), true) + }) + }) + describe('unique', function () { + it('should return 1, 2, 3.., when allocate', function () { + var provider = new UniqueMessageIdProvider() + assert.equal(provider.allocate(), 1) + assert.equal(provider.allocate(), 2) + assert.equal(provider.allocate(), 3) + }) + it('should skip registerd messageId', function () { + var provider = new UniqueMessageIdProvider() + assert.equal(provider.register(2), true) + assert.equal(provider.allocate(), 1) + assert.equal(provider.allocate(), 3) + }) + it('should return false register allocated messageId', function () { + var provider = new UniqueMessageIdProvider() + assert.equal(provider.allocate(), 1) + assert.equal(provider.register(1), false) + assert.equal(provider.register(5), true) + assert.equal(provider.register(5), false) + }) + it('should retrun correct last messageId', function () { + var provider = new UniqueMessageIdProvider() + assert.equal(provider.allocate(), 1) + assert.equal(provider.getLastAllocated(), 1) + assert.equal(provider.register(2), true) + assert.equal(provider.getLastAllocated(), 1) + assert.equal(provider.allocate(), 3) + assert.equal(provider.getLastAllocated(), 3) + }) + it('should be reusable deallocated messageId', function () { + var provider = new UniqueMessageIdProvider() + assert.equal(provider.allocate(), 1) + assert.equal(provider.allocate(), 2) + assert.equal(provider.allocate(), 3) + provider.deallocate(2) + assert.equal(provider.allocate(), 2) + }) + it('should allocate all messageId and then return null', function () { + var provider = new UniqueMessageIdProvider() + for (var i = 1; i <= 65535; i++) { + assert.equal(provider.allocate(), i) + } + assert.equal(provider.allocate(), null) + provider.deallocate(10000) + assert.equal(provider.allocate(), 10000) + assert.equal(provider.allocate(), null) + }) + it('should all messageId reallocatable after clear', function () { + var provider = new UniqueMessageIdProvider() + var i + for (i = 1; i <= 65535; i++) { + assert.equal(provider.allocate(), i) + } + assert.equal(provider.allocate(), null) + provider.clear() + for (i = 1; i <= 65535; i++) { + assert.equal(provider.allocate(), i) + } + assert.equal(provider.allocate(), null) + }) + }) +}) diff --git a/test/typescript/broker-connect-subscribe-and-publish.ts b/test/typescript/broker-connect-subscribe-and-publish.ts index ecdb363cc..359e752a7 100644 --- a/test/typescript/broker-connect-subscribe-and-publish.ts +++ b/test/typescript/broker-connect-subscribe-and-publish.ts @@ -1,12 +1,13 @@ // relative path uses package.json {"types":"types/index.d.ts", ...} -import {IClientOptions, Client, connect, IConnackPacket} from '../..' +import {IClientOptions, Client, connect, IConnackPacket, UniqueMessageIdProvider} from '../..' const BROKER = 'test.mosquitto.org' const PAYLOAD_WILL = Buffer.from('bye from TS') const PAYLOAD_QOS = Buffer.from('hello from TS (with qos=2)') const PAYLOAD_RETAIN = 'hello from TS (with retain=true)' const TOPIC = 'typescript-test-' + Math.random().toString(16).substr(2) -const opts: IClientOptions = {will: {topic: TOPIC, payload: PAYLOAD_WILL, qos: 0, retain: false}} +const opts: IClientOptions = {will: {topic: TOPIC, payload: PAYLOAD_WILL, qos: 0, retain: false}, + messageIdProvider: new UniqueMessageIdProvider()} console.log(`connect(${JSON.stringify(BROKER)})`) const client:Client = connect(`mqtt://${BROKER}`, opts) diff --git a/test/unique_message_id_provider_client.js b/test/unique_message_id_provider_client.js new file mode 100644 index 000000000..933d85b82 --- /dev/null +++ b/test/unique_message_id_provider_client.js @@ -0,0 +1,21 @@ +'use strict' + +var abstractClientTests = require('./abstract_client') +var serverBuilder = require('./server_helpers_for_client_tests').serverBuilder +var UniqueMessageIdProvider = require('../lib/unique-message-id-provider') +var ports = require('./helpers/port_list') + +describe('UniqueMessageIdProviderMqttClient', function () { + var server = serverBuilder('mqtt') + var config = {protocol: 'mqtt', port: ports.PORTAND400, messageIdProvider: new UniqueMessageIdProvider()} + server.listen(ports.PORTAND400) + + after(function () { + // clean up and make sure the server is no longer listening... + if (server.listening) { + server.close() + } + }) + + abstractClientTests(server, config) +}) diff --git a/types/index.d.ts b/types/index.d.ts index 9bca9c2ff..a3496b103 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -26,3 +26,5 @@ export { Packet, UserProperties } from 'mqtt-packet' +export { IMessageIdProvider } from './lib/message-id-provider' +export { UniqueMessageIdProvider } from './lib/unique-message-id-provider' diff --git a/types/lib/client-options.d.ts b/types/lib/client-options.d.ts index cbcfb5a2e..fb388304d 100644 --- a/types/lib/client-options.d.ts +++ b/types/lib/client-options.d.ts @@ -1,6 +1,7 @@ import { MqttClient } from './client' import { Store } from './store' import { QoS, UserProperties } from 'mqtt-packet' +import { IMessageIdProvider } from './message-id-provider' export declare type StorePutCallback = () => void @@ -113,7 +114,8 @@ export interface IClientOptions extends ISecureClientOptions { userProperties?: UserProperties, authenticationMethod?: string, authenticationData?: Buffer - } + }, + messageIdProvider?: IMessageIdProvider } export interface ISecureClientOptions { /** diff --git a/types/lib/default-message-id-provider.d.ts b/types/lib/default-message-id-provider.d.ts new file mode 100644 index 000000000..fafaa4c9b --- /dev/null +++ b/types/lib/default-message-id-provider.d.ts @@ -0,0 +1,49 @@ +import { IMessageIdProvider } from './message-id-provider' + +/** + * DefaultMessageIdProvider + * This is compatible behavior with the original MQTT.js internal messageId allocation. + */ +declare class DefaultMessageIdProvider implements IMessageIdProvider { + /** + * DefaultMessageIdProvider constructor. + * Randomize initial messageId + * @constructor + */ + constructor () + + /** + * Return the current messageId and increment the current messageId. + * @return {Number} - messageId + */ + public allocate (): Number | null + + /** + * Get the last allocated messageId. + * @return {Number} - messageId. + */ + public getLastAllocated (): Number | null + + /** + * Register the messageId. + * This function actually nothing and always return true. + * @param {Number} num - The messageId to request use. + * @return {Boolean} - If `num` was not occupied, then return true, otherwise return false. + */ + public register (num: Number): Boolean + + /** + * Deallocate the messageId. + * This function actually nothing. + * @param {Number} num - The messageId to deallocate. + */ + public deallocate (num: Number): void + + /** + * Clear all occupied messageIds. + * This function actually nothing. + */ + public clear (): void +} + +export { DefaultMessageIdProvider } diff --git a/types/lib/message-id-provider.d.ts b/types/lib/message-id-provider.d.ts new file mode 100644 index 000000000..9468cf3e2 --- /dev/null +++ b/types/lib/message-id-provider.d.ts @@ -0,0 +1,40 @@ +/** + * MessageIdProvider + */ +declare interface IMessageIdProvider { + /** + * Allocate the first vacant messageId. The messageId become occupied status. + * @return {Number} - The first vacant messageId. If all messageIds are occupied, return null. + */ + allocate (): Number | null + + /** + * Get the last allocated messageId. + * @return {Number} - messageId. + */ + getLastAllocated (): Number | null + + /** + * Register the messageId. The messageId become occupied status. + * If the messageId has already been occupied, then return false. + * @param {Number} num - The messageId to request use. + * @return {Boolean} - If `num` was not occupied, then return true, otherwise return false. + */ + register (num: Number): Boolean + + /** + * Deallocate the messageId. The messageId become vacant status. + * @param {Number} num - The messageId to deallocate. The messageId must be occupied status. + * In other words, the messageId must be allocated by allocate() or + * occupied by register(). + */ + deallocate (num: Number): void + + /** + * Clear all occupied messageIds. + * The all messageIds are set to vacant status. + */ + clear (): void +} + +export { IMessageIdProvider } diff --git a/types/lib/unique-message-id-provider.d.ts b/types/lib/unique-message-id-provider.d.ts new file mode 100644 index 000000000..0941b2865 --- /dev/null +++ b/types/lib/unique-message-id-provider.d.ts @@ -0,0 +1,48 @@ +import { IMessageIdProvider } from './message-id-provider' + +/** + * UniqueMessageIdProvider + */ +declare class UniqueMessageIdProvider implements IMessageIdProvider { + /** + * UniqueMessageIdProvider constructor. + * @constructor + */ + constructor () + + /** + * Allocate the first vacant messageId. The messageId become occupied status. + * @return {Number} - The first vacant messageId. If all messageIds are occupied, return null. + */ + public allocate (): Number | null + + /** + * Get the last allocated messageId. + * @return {Number} - messageId. + */ + public getLastAllocated (): Number | null + + /** + * Register the messageId. The messageId become occupied status. + * If the messageId has already been occupied, then return false. + * @param {Number} num - The messageId to request use. + * @return {Boolean} - If `num` was not occupied, then return true, otherwise return false. + */ + public register (num: Number): Boolean + + /** + * Deallocate the messageId. The messageId become vacant status. + * @param {Number} num - The messageId to deallocate. The messageId must be occupied status. + * In other words, the messageId must be allocated by allocate() or + * occupied by register(). + */ + public deallocate (num: Number): void + + /** + * Clear all occupied messageIds. + * The all messageIds are set to vacant status. + */ + public clear (): void +} + +export { UniqueMessageIdProvider } From 7c1368fee5bd78b3b5ceb8bc38edb855bfc9cdd8 Mon Sep 17 00:00:00 2001 From: Takatoshi Kondo Date: Thu, 4 Feb 2021 19:52:02 +0900 Subject: [PATCH 2/3] Fixed test server helper sometimes write after end. It caused Uncaught Error: write after end as follows. It had happened very subtle timing. ``` 1) Websocket Client auto reconnect should resubscribe when reconnecting: Uncaught Error: write after end at writeAfterEnd (node_modules/duplexify/node_modules/readable-stream/lib/_stream_writable.js:288:12) at Connection.Writable.write (node_modules/duplexify/node_modules/readable-stream/lib/_stream_writable.js:332:20) at Connection. [as pingresp] (node_modules/mqtt-connection/connection.js:95:10) at Connection. (test/server_helpers_for_client_tests.js:96:20) at Connection.emitPacket (node_modules/mqtt-connection/connection.js:10:8) at addChunk (node_modules/duplexify/node_modules/readable-stream/lib/_stream_readable.js:291:12) at readableAddChunk (node_modules/duplexify/node_modules/readable-stream/lib/_stream_readable.js:278:11) at Connection.Readable.push (node_modules/duplexify/node_modules/readable-stream/lib/_stream_readable.js:245:10) at Connection.Duplexify._forward (node_modules/duplexify/index.js:170:26) at DestroyableTransform.onreadable (node_modules/duplexify/index.js:134:10) at emitReadable_ (node_modules/through2/node_modules/readable-stream/lib/_stream_readable.js:504:10) at emitReadable (node_modules/through2/node_modules/readable-stream/lib/_stream_readable.js:498:62) at addChunk (node_modules/through2/node_modules/readable-stream/lib/_stream_readable.js:298:29) at readableAddChunk (node_modules/through2/node_modules/readable-stream/lib/_stream_readable.js:278:11) at DestroyableTransform.Readable.push (node_modules/through2/node_modules/readable-stream/lib/_stream_readable.js:245:10) at DestroyableTransform.Transform.push (node_modules/through2/node_modules/readable-stream/lib/_stream_transform.js:148:32) at Parser.push (node_modules/mqtt-connection/lib/parseStream.js:19:12) at Parser._newPacket (node_modules/mqtt-packet/parser.js:672:12) at Parser.parse (node_modules/mqtt-packet/parser.js:43:45) at DestroyableTransform.process [as _transform] (node_modules/mqtt-connection/lib/parseStream.js:14:17) at DestroyableTransform.Transform._read (node_modules/through2/node_modules/readable-stream/lib/_stream_transform.js:184:10) at DestroyableTransform.Transform._write (node_modules/through2/node_modules/readable-stream/lib/_stream_transform.js:172:83) at doWrite (node_modules/through2/node_modules/readable-stream/lib/_stream_writable.js:428:64) at writeOrBuffer (node_modules/through2/node_modules/readable-stream/lib/_stream_writable.js:417:5) at DestroyableTransform.Writable.write (node_modules/through2/node_modules/readable-stream/lib/_stream_writable.js:334:11) at Socket.ondata (internal/streams/readable.js:719:22) at addChunk (internal/streams/readable.js:309:12) at readableAddChunk (internal/streams/readable.js:284:9) at Socket.Readable.push (internal/streams/readable.js:223:10) at TCP.onStreamRead (internal/stream_base_commons.js:188:23) at TCP.callbackTrampoline (internal/async_hooks.js:131:14) ``` --- test/server_helpers_for_client_tests.js | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/test/server_helpers_for_client_tests.js b/test/server_helpers_for_client_tests.js index 375b96bb6..9527d47e2 100644 --- a/test/server_helpers_for_client_tests.js +++ b/test/server_helpers_for_client_tests.js @@ -22,12 +22,14 @@ var MQTTConnection = require('mqtt-connection') function serverBuilder (protocol, handler) { var defaultHandler = function (serverClient) { serverClient.on('auth', function (packet) { + if (serverClient.writable) return false var rc = 'reasonCode' var connack = {} connack[rc] = 0 serverClient.connack(connack) }) serverClient.on('connect', function (packet) { + if (!serverClient.writable) return false var rc = 'returnCode' var connack = {} if (serverClient.options && serverClient.options.protocolVersion === 5) { @@ -52,6 +54,7 @@ function serverBuilder (protocol, handler) { }) serverClient.on('publish', function (packet) { + if (!serverClient.writable) return false setImmediate(function () { switch (packet.qos) { case 0: @@ -67,10 +70,12 @@ function serverBuilder (protocol, handler) { }) serverClient.on('pubrel', function (packet) { + if (!serverClient.writable) return false serverClient.pubcomp(packet) }) serverClient.on('pubrec', function (packet) { + if (!serverClient.writable) return false serverClient.pubrel(packet) }) @@ -79,6 +84,7 @@ function serverBuilder (protocol, handler) { }) serverClient.on('subscribe', function (packet) { + if (!serverClient.writable) return false serverClient.suback({ messageId: packet.messageId, granted: packet.subscriptions.map(function (e) { @@ -88,11 +94,13 @@ function serverBuilder (protocol, handler) { }) serverClient.on('unsubscribe', function (packet) { + if (!serverClient.writable) return false packet.granted = packet.unsubscriptions.map(function () { return 0 }) serverClient.unsuback(packet) }) serverClient.on('pingreq', function () { + if (!serverClient.writable) return false serverClient.pingresp() }) From 3907b67fc63c30564e47db50f242ccf0a5aa5cbe Mon Sep 17 00:00:00 2001 From: Takatoshi Kondo Date: Tue, 22 Jun 2021 07:39:09 +0900 Subject: [PATCH 3/3] Fix the comment. --- lib/unique-message-id-provider.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/unique-message-id-provider.js b/lib/unique-message-id-provider.js index 5d2203f14..6ffd4bde6 100644 --- a/lib/unique-message-id-provider.js +++ b/lib/unique-message-id-provider.js @@ -18,7 +18,8 @@ function UniqueMessageIdProvider () { * allocate * * Get the next messageId. - * @return unsigned int + * @return if messageId is fully allocated then return null, + * otherwise return the smallest usable unsigned int messageId. */ UniqueMessageIdProvider.prototype.allocate = function () { this.lastId = this.numberAllocator.alloc()