diff --git a/README.md b/README.md index b4497dc97..478a7a722 100644 --- a/README.md +++ b/README.md @@ -206,7 +206,41 @@ the final connection when it drops. The default value is 1000 ms which means it will try to reconnect 1 second after losing the connection. + +## About Topic Alias management +### Enabling automatic Topic Alias using +If the client sets the option `autoUseTopicAlias:true` then MQTT.js uses existing topic alias automatically. + +example scenario: +``` +1. PUBLISH topic:'t1', ta:1 (register) +2. PUBLISH topic:'t1' -> topic:'', ta:1 (auto use existing map entry) +3. PUBLISH topic:'t2', ta:1 (register overwrite) +4. PUBLISH topic:'t2' -> topic:'', ta:1 (auto use existing map entry based on the receent map) +5. PUBLISH topic:'t1' (t1 is no longer mapped to ta:1) +``` + +User doesn't need to manage which topic is mapped to which topic alias. +If the user want to register topic alias, then publish topic with topic alias. +If the user want to use topic alias, then publish topic without topic alias. If there is a mapped topic alias then added it as a property and update the topic to empty string. + +### Enabling automatic Topic Alias assign + +If the client sets the option `autoAssignTopicAlias:true` then MQTT.js uses existing topic alias automatically. +If no topic alias exists, then assign a new vacant topic alias automatically. If topic alias is fully used, then LRU(Least Recently Used) topic-alias entry is overwritten. + +example scenario: +``` +The broker returns CONNACK (TopicAliasMaximum:3) +1. PUBLISH topic:'t1' -> 't1', ta:1 (auto assign t1:1 and register) +2. PUBLISH topic:'t1' -> '' , ta:1 (auto use existing map entry) +3. PUBLISH topic:'t2' -> 't2', ta:2 (auto assign t1:2 and register. 2 was vacant) +4. PUBLISH topic:'t3' -> 't3', ta:3 (auto assign t1:3 and register. 3 was vacant) +5. PUBLISH topic:'t4' -> 't4', ta:1 (LRU entry is overwritten) +``` + +Also user can manually register topic-alias pair using PUBLISH topic:'some', ta:X. It works well with automatic topic alias assign. ## API @@ -291,6 +325,8 @@ the `connect` event. Typically a `net.Socket`. ```js customHandleAcks: function(topic, message, packet, done) {/*some logic wit colling done(error, reasonCode)*/} ``` + * `autoUseTopicAlias`: enabling automatic Topic Alias using functionality + * `autoAssignTopicAlias`: enabling automatic Topic Alias assign functionality * `properties`: properties MQTT 5.0. `object` that supports the following properties: * `sessionExpiryInterval`: representing the Session Expiry Interval in seconds `number`, @@ -661,7 +697,7 @@ npm install browserify npm install tinyify cd node_modules/mqtt/ npm install . -npx browserify mqtt.js -s mqtt >browserMqtt.js // use script tag +npx browserify mqtt.js -s mqtt >browserMqtt.js // use script tag # show size for compressed browser transfer gzip 0) { + if (options.topicAliasMaximum > 0xffff) { + debug('MqttClient :: options.topicAliasMaximum is out of range') + } else { + this.topicAliasRecv = new TopicAliasRecv(options.topicAliasMaximum) + } + } + // Send queued packets this.on('connect', function () { var queue = this.queue @@ -282,6 +380,10 @@ function MqttClient (streamBuilder, options) { that.pingTimer = null } + if (this.topicAliasRecv) { + this.topicAliasRecv.clear() + } + debug('close :: calling _setupReconnect') this._setupReconnect() }) @@ -378,6 +480,14 @@ MqttClient.prototype._setupStream = function () { debug('_setupStream: sending packet `connect`') connectPacket = Object.create(this.options) connectPacket.cmd = 'connect' + if (this.topicAliasRecv) { + if (!connectPacket.properties) { + connectPacket.properties = {} + } + if (this.topicAliasRecv) { + connectPacket.properties.topicAliasMaximum = this.topicAliasRecv.max + } + } // avoid message queue sendPacket(this, connectPacket) @@ -526,17 +636,6 @@ MqttClient.prototype.publish = function (topic, message, opts, callback) { if (options.protocolVersion === 5) { packet.properties = opts.properties - if ((!options.properties && packet.properties && packet.properties.topicAlias) || ((opts.properties && options.properties) && - ((opts.properties.topicAlias && options.properties.topicAliasMaximum && opts.properties.topicAlias > options.properties.topicAliasMaximum) || - (!options.properties.topicAliasMaximum && opts.properties.topicAlias)))) { - /* - if we are don`t setup topic alias or - topic alias maximum less than topic alias or - server don`t give topic alias maximum, - we are removing topic alias from packet - */ - delete packet.properties.topicAlias - } } debug('publish :: qos', opts.qos) @@ -1102,6 +1201,13 @@ MqttClient.prototype._cleanUp = function (forced, done) { MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut) { debug('_sendPacket :: (%s) :: start', this.options.clientId) cbStorePut = cbStorePut || nop + cb = cb || nop + + var err = applyTopicAlias(this, packet) + if (err) { + cb(err) + return + } if (!this.connected) { debug('_sendPacket :: client not connected. Storing packet offline.') @@ -1154,12 +1260,20 @@ MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) { debug('_storePacket :: cb? %s', !!cb) cbStorePut = cbStorePut || nop + var storePacket = packet + if (storePacket.cmd === 'publish') { + storePacket = clone(packet) + var err = removeTopicAlias(this, storePacket) + if (err) { + return cb && cb(err) + } + } // check that the packet is not a qos of 0, or that the command is not a publish - if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') { - this.queue.push({ packet: packet, cb: cb }) - } else if (packet.qos > 0) { - cb = this.outgoing[packet.messageId] ? this.outgoing[packet.messageId].cb : null - this.outgoingStore.put(packet, function (err) { + if (((storePacket.qos || 0) === 0 && this.queueQoSZero) || storePacket.cmd !== 'publish') { + this.queue.push({ packet: storePacket, cb: cb }) + } else if (storePacket.qos > 0) { + cb = this.outgoing[storePacket.messageId] ? this.outgoing[storePacket.messageId].cb : null + this.outgoingStore.put(storePacket, function (err) { if (err) { return cb && cb(err) } @@ -1237,11 +1351,17 @@ MqttClient.prototype._handleConnack = function (packet) { var rc = version === 5 ? packet.reasonCode : packet.returnCode clearTimeout(this.connackTimer) + delete this.topicAliasSend if (packet.properties) { if (packet.properties.topicAliasMaximum) { - if (!options.properties) { options.properties = {} } - options.properties.topicAliasMaximum = packet.properties.topicAliasMaximum + if (packet.properties.topicAliasMaximum > 0xffff) { + this.emit('error', new Error('topicAliasMaximum from broker is out of range')) + return + } + if (packet.properties.topicAliasMaximum > 0) { + this.topicAliasSend = new TopicAliasSend(packet.properties.topicAliasMaximum) + } } if (packet.properties.serverKeepAlive && options.keepalive) { options.keepalive = packet.properties.serverKeepAlive @@ -1303,6 +1423,39 @@ MqttClient.prototype._handlePublish = function (packet, done) { var that = this var options = this.options var validReasonCodes = [0, 16, 128, 131, 135, 144, 145, 151, 153] + if (this.options.protocolVersion === 5) { + var alias + if (packet.properties) { + alias = packet.properties.topicAlias + } + if (typeof alias !== 'undefined') { + if (topic.length === 0) { + if (alias > 0 && alias <= 0xffff) { + var gotTopic = this.topicAliasRecv.getTopicByAlias(alias) + if (gotTopic) { + topic = gotTopic + debug('_handlePublish :: topic complemented by alias. topic: %s - alias: %d', topic, alias) + } else { + debug('_handlePublish :: unregistered topic alias. alias: %d', alias) + this.emit('error', new Error('Received unregistered Topic Alias')) + return + } + } else { + debug('_handlePublish :: topic alias out of range. alias: %d', alias) + this.emit('error', new Error('Received Topic Alias is out of range')) + return + } + } else { + if (this.topicAliasRecv.put(topic, alias)) { + debug('_handlePublish :: registered topic: %s - alias: %d', topic, alias) + } else { + debug('_handlePublish :: topic alias out of range. alias: %d', alias) + this.emit('error', new Error('Received Topic Alias is out of range')) + return + } + } + } + } debug('_handlePublish: qos %d', qos) switch (qos) { case 2: { diff --git a/lib/topic-alias-recv.js b/lib/topic-alias-recv.js new file mode 100644 index 000000000..553341100 --- /dev/null +++ b/lib/topic-alias-recv.js @@ -0,0 +1,47 @@ +'use strict' + +/** + * Topic Alias receiving manager + * This holds alias to topic map + * @param {Number} [max] - topic alias maximum entries + */ +function TopicAliasRecv (max) { + if (!(this instanceof TopicAliasRecv)) { + return new TopicAliasRecv(max) + } + this.aliasToTopic = {} + this.max = max +} + +/** + * Insert or update topic - alias entry. + * @param {String} [topic] - topic + * @param {Number} [alias] - topic alias + * @returns {Boolean} - if success return true otherwise false + */ +TopicAliasRecv.prototype.put = function (topic, alias) { + if (alias === 0 || alias > this.max) { + return false + } + this.aliasToTopic[alias] = topic + this.length = Object.keys(this.aliasToTopic).length + return true +} + +/** + * Get topic by alias + * @param {String} [topic] - topic + * @returns {Number} - if mapped topic exists return topic alias, otherwise return undefined + */ +TopicAliasRecv.prototype.getTopicByAlias = function (alias) { + return this.aliasToTopic[alias] +} + +/** + * Clear all entries + */ +TopicAliasRecv.prototype.clear = function () { + this.aliasToTopic = {} +} + +module.exports = TopicAliasRecv diff --git a/lib/topic-alias-send.js b/lib/topic-alias-send.js new file mode 100644 index 000000000..34aef3504 --- /dev/null +++ b/lib/topic-alias-send.js @@ -0,0 +1,93 @@ +'use strict' + +/** + * Module dependencies + */ +var LruMap = require('collections/lru-map') +var NumberAllocator = require('number-allocator').NumberAllocator + +/** + * Topic Alias sending manager + * This holds both topic to alias and alias to topic map + * @param {Number} [max] - topic alias maximum entries + */ +function TopicAliasSend (max) { + if (!(this instanceof TopicAliasSend)) { + return new TopicAliasSend(max) + } + + if (max > 0) { + this.aliasToTopic = new LruMap() + this.topicToAlias = {} + this.numberAllocator = new NumberAllocator(1, max) + this.max = max + this.length = 0 + } +} + +/** + * Insert or update topic - alias entry. + * @param {String} [topic] - topic + * @param {Number} [alias] - topic alias + * @returns {Boolean} - if success return true otherwise false + */ +TopicAliasSend.prototype.put = function (topic, alias) { + if (alias === 0 || alias > this.max) { + return false + } + const entry = this.aliasToTopic.get(alias) + if (entry) { + delete this.topicToAlias[entry.topic] + } + this.aliasToTopic.set(alias, {'topic': topic, 'alias': alias}) + this.topicToAlias[topic] = alias + this.numberAllocator.use(alias) + this.length = this.aliasToTopic.length + return true +} + +/** + * Get topic by alias + * @param {Number} [alias] - topic alias + * @returns {String} - if mapped topic exists return topic, otherwise return undefined + */ +TopicAliasSend.prototype.getTopicByAlias = function (alias) { + const entry = this.aliasToTopic.get(alias) + if (typeof entry === 'undefined') return entry + return entry.topic +} + +/** + * Get topic by alias + * @param {String} [topic] - topic + * @returns {Number} - if mapped topic exists return topic alias, otherwise return undefined + */ +TopicAliasSend.prototype.getAliasByTopic = function (topic) { + const alias = this.topicToAlias[topic] + if (typeof alias !== 'undefined') { + this.aliasToTopic.get(alias) // LRU update + } + return alias +} + +/** + * Clear all entries + */ +TopicAliasSend.prototype.clear = function () { + this.aliasToTopic.clear() + this.topicToAlias = {} + this.numberAllocator.clear() + this.length = 0 +} + +/** + * Get LRU topic alias + * @returns {Number} - if vacant alias exists then return it, otherwise then return LRU alias + */ +TopicAliasSend.prototype.getLruAlias = function () { + const alias = this.numberAllocator.firstVacant() + if (alias) return alias + return this.aliasToTopic.min().alias +} + +module.exports = TopicAliasSend diff --git a/package.json b/package.json index 3be7ba77e..0f9261059 100644 --- a/package.json +++ b/package.json @@ -62,6 +62,7 @@ "net": false }, "dependencies": { + "collections": "^5.1.12", "commist": "^1.0.0", "concat-stream": "^2.0.0", "debug": "^4.1.1", @@ -73,6 +74,7 @@ "number-allocator": "^1.0.7", "pump": "^3.0.0", "readable-stream": "^3.6.0", + "rfdc": "^1.3.0", "reinterval": "^1.1.0", "split2": "^3.1.0", "ws": "^7.5.0", diff --git a/test/client_mqtt5.js b/test/client_mqtt5.js index 48e1bcb6a..b446628de 100644 --- a/test/client_mqtt5.js +++ b/test/client_mqtt5.js @@ -13,29 +13,548 @@ describe('MQTT 5.0', function () { abstractClientTests(server, config) - // var server = serverBuilder().listen(ports.PORTAND115) - - var topicAliasTests = [ - {properties: {}, name: 'should allow any topicAlias when no topicAliasMaximum provided in settings'}, - {properties: { topicAliasMaximum: 15 }, name: 'should not allow topicAlias > topicAliasMaximum when topicAliasMaximum provided in settings'} - ] - - topicAliasTests.forEach(function (test) { - it(test.name, function (done) { - this.timeout(15000) - server.once('client', function (serverClient) { - serverClient.on('publish', function (packet) { - if (packet.properties && packet.properties.topicAlias) { - done(new Error('Packet should not have topicAlias')) - return false - } else { - serverClient.end(done) + it('topic should be complemented on receive', function (done) { + this.timeout(15000) + + var opts = { + host: 'localhost', + port: ports.PORTAND103, + protocolVersion: 5, + topicAliasMaximum: 3 + } + var client = mqtt.connect(opts) + var publishCount = 0 + var server103 = new MqttServer(function (serverClient) { + serverClient.on('connect', function (packet) { + assert.strictEqual(packet.properties.topicAliasMaximum, 3) + serverClient.connack({ + reasonCode: 0 + }) + // register topicAlias + serverClient.publish({ + messageId: 0, + topic: 'test1', + payload: 'Message', + qos: 0, + properties: { topicAlias: 1 } + }) + // use topicAlias + serverClient.publish({ + messageId: 0, + topic: '', + payload: 'Message', + qos: 0, + properties: { topicAlias: 1 } + }) + // overwrite register topicAlias + serverClient.publish({ + messageId: 0, + topic: 'test2', + payload: 'Message', + qos: 0, + properties: { topicAlias: 1 } + }) + // use topicAlias + serverClient.publish({ + messageId: 0, + topic: '', + payload: 'Message', + qos: 0, + properties: { topicAlias: 1 } + }) + }) + }).listen(ports.PORTAND103) + + client.on('message', function (topic, messagee, packet) { + switch (publishCount++) { + case 0: + assert.strictEqual(topic, 'test1') + assert.strictEqual(packet.topic, 'test1') + assert.strictEqual(packet.properties.topicAlias, 1) + break + case 1: + assert.strictEqual(topic, 'test1') + assert.strictEqual(packet.topic, '') + assert.strictEqual(packet.properties.topicAlias, 1) + break + case 2: + assert.strictEqual(topic, 'test2') + assert.strictEqual(packet.topic, 'test2') + assert.strictEqual(packet.properties.topicAlias, 1) + break + case 3: + assert.strictEqual(topic, 'test2') + assert.strictEqual(packet.topic, '') + assert.strictEqual(packet.properties.topicAlias, 1) + server103.close() + client.end(true, done) + break + } + }) + }) + + it('registered topic alias should automatically used if autoUseTopicAlias is true', function (done) { + this.timeout(15000) + + var opts = { + host: 'localhost', + port: ports.PORTAND103, + protocolVersion: 5, + autoUseTopicAlias: true + } + var client = mqtt.connect(opts) + + var publishCount = 0 + var server103 = new MqttServer(function (serverClient) { + serverClient.on('connect', function (packet) { + serverClient.connack({ + reasonCode: 0, + properties: { + topicAliasMaximum: 3 + } + }) + }) + serverClient.on('publish', function (packet) { + switch (publishCount++) { + case 0: + assert.strictEqual(packet.topic, 'test1') + assert.strictEqual(packet.properties.topicAlias, 1) + break + case 1: + assert.strictEqual(packet.topic, '') + assert.strictEqual(packet.properties.topicAlias, 1) + break + case 2: + assert.strictEqual(packet.topic, '') + assert.strictEqual(packet.properties.topicAlias, 1) + server103.close() + client.end(true, done) + break + } + }) + }).listen(ports.PORTAND103) + + client.on('connect', function () { + // register topicAlias + client.publish('test1', 'Message', { properties: { topicAlias: 1 } }) + // use topicAlias + client.publish('', 'Message', { properties: { topicAlias: 1 } }) + // use topicAlias by autoApplyTopicAlias + client.publish('test1', 'Message') + }) + }) + + it('topicAlias is automatically used if autoAssignTopicAlias is true', function (done) { + this.timeout(15000) + + var opts = { + host: 'localhost', + port: ports.PORTAND103, + protocolVersion: 5, + autoAssignTopicAlias: true + } + var client = mqtt.connect(opts) + + var publishCount = 0 + var server103 = new MqttServer(function (serverClient) { + serverClient.on('connect', function (packet) { + serverClient.connack({ + reasonCode: 0, + properties: { + topicAliasMaximum: 3 + } + }) + }) + serverClient.on('publish', function (packet) { + switch (publishCount++) { + case 0: + assert.strictEqual(packet.topic, 'test1') + assert.strictEqual(packet.properties.topicAlias, 1) + break + case 1: + assert.strictEqual(packet.topic, 'test2') + assert.strictEqual(packet.properties.topicAlias, 2) + break + case 2: + assert.strictEqual(packet.topic, 'test3') + assert.strictEqual(packet.properties.topicAlias, 3) + break + case 3: + assert.strictEqual(packet.topic, '') + assert.strictEqual(packet.properties.topicAlias, 1) + break + case 4: + assert.strictEqual(packet.topic, '') + assert.strictEqual(packet.properties.topicAlias, 3) + break + case 5: + assert.strictEqual(packet.topic, 'test4') + assert.strictEqual(packet.properties.topicAlias, 2) + server103.close() + client.end(true, done) + break + } + }) + }).listen(ports.PORTAND103) + + client.on('connect', function () { + // register topicAlias + client.publish('test1', 'Message') + client.publish('test2', 'Message') + client.publish('test3', 'Message') + + // use topicAlias + client.publish('test1', 'Message') + client.publish('test3', 'Message') + + // renew LRU topicAlias + client.publish('test4', 'Message') + }) + }) + + it('topicAlias should be removed and topic restored on resend', function (done) { + this.timeout(15000) + + var incomingStore = new mqtt.Store({ clean: false }) + var outgoingStore = new mqtt.Store({ clean: false }) + var opts = { + host: 'localhost', + port: ports.PORTAND103, + protocolVersion: 5, + clientId: 'cid1', + incomingStore: incomingStore, + outgoingStore: outgoingStore, + clean: false, + reconnectPeriod: 100 + } + var client = mqtt.connect(opts) + + var connectCount = 0 + var publishCount = 0 + var server103 = new MqttServer(function (serverClient) { + serverClient.on('connect', function (packet) { + switch (connectCount++) { + case 0: + serverClient.connack({ + reasonCode: 0, + sessionPresent: false, + properties: { + topicAliasMaximum: 3 + } + }) + break + case 1: + serverClient.connack({ + reasonCode: 0, + sessionPresent: true, + properties: { + topicAliasMaximum: 3 + } + }) + break + } + }) + serverClient.on('publish', function (packet) { + switch (publishCount++) { + case 0: + assert.strictEqual(packet.topic, 'test1') + assert.strictEqual(packet.properties.topicAlias, 1) + break + case 1: + assert.strictEqual(packet.topic, '') + assert.strictEqual(packet.properties.topicAlias, 1) + setImmediate(function () { + serverClient.stream.destroy() + }) + break + case 2: + assert.strictEqual(packet.topic, 'test1') + var alias1 + if (packet.properties) { + alias1 = packet.properties.topicAlias + } + assert.strictEqual(alias1, undefined) + serverClient.puback({messageId: packet.messageId}) + break + case 3: + assert.strictEqual(packet.topic, 'test1') + var alias2 + if (packet.properties) { + alias2 = packet.properties.topicAlias + } + assert.strictEqual(alias2, undefined) + serverClient.puback({messageId: packet.messageId}) + server103.close() + client.end(true, done) + break + } + }) + }).listen(ports.PORTAND103) + + client.once('connect', function () { + // register topicAlias + client.publish('test1', 'Message', { qos: 1, properties: { topicAlias: 1 } }) + // use topicAlias + client.publish('', 'Message', { qos: 1, properties: { topicAlias: 1 } }) + }) + }) + + it('topicAlias should be removed and topic restored on offline publish', function (done) { + this.timeout(15000) + + var incomingStore = new mqtt.Store({ clean: false }) + var outgoingStore = new mqtt.Store({ clean: false }) + var opts = { + host: 'localhost', + port: ports.PORTAND103, + protocolVersion: 5, + clientId: 'cid1', + incomingStore: incomingStore, + outgoingStore: outgoingStore, + clean: false, + reconnectPeriod: 100 + } + var client = mqtt.connect(opts) + + var connectCount = 0 + var publishCount = 0 + var server103 = new MqttServer(function (serverClient) { + serverClient.on('connect', function (packet) { + switch (connectCount++) { + case 0: + serverClient.connack({ + reasonCode: 0, + sessionPresent: false, + properties: { + topicAliasMaximum: 3 + } + }) + setImmediate(function () { + serverClient.stream.destroy() + }) + break + case 1: + serverClient.connack({ + reasonCode: 0, + sessionPresent: true, + properties: { + topicAliasMaximum: 3 + } + }) + break + } + }) + serverClient.on('publish', function (packet) { + switch (publishCount++) { + case 0: + assert.strictEqual(packet.topic, 'test1') + var alias1 + if (packet.properties) { + alias1 = packet.properties.topicAlias + } + assert.strictEqual(alias1, undefined) + assert.strictEqual(packet.qos, 1) + serverClient.puback({messageId: packet.messageId}) + break + case 1: + assert.strictEqual(packet.topic, 'test1') + var alias2 + if (packet.properties) { + alias2 = packet.properties.topicAlias + } + assert.strictEqual(alias2, undefined) + assert.strictEqual(packet.qos, 0) + break + case 2: + assert.strictEqual(packet.topic, 'test1') + var alias3 + if (packet.properties) { + alias3 = packet.properties.topicAlias + } + assert.strictEqual(alias3, undefined) + assert.strictEqual(packet.qos, 0) + server103.close() + client.end(true, done) + break + } + }) + }).listen(ports.PORTAND103) + + client.once('close', function () { + // register topicAlias + client.publish('test1', 'Message', { qos: 0, properties: { topicAlias: 1 } }) + // use topicAlias + client.publish('', 'Message', { qos: 0, properties: { topicAlias: 1 } }) + client.publish('', 'Message', { qos: 1, properties: { topicAlias: 1 } }) + }) + }) + + it('should error cb call if PUBLISH out of range topicAlias', function (done) { + this.timeout(15000) + + var opts = { + host: 'localhost', + port: ports.PORTAND103, + protocolVersion: 5 + } + var client = mqtt.connect(opts) + var server103 = new MqttServer(function (serverClient) { + serverClient.on('connect', function (packet) { + serverClient.connack({ + reasonCode: 0, + sessionPresent: false, + properties: { + topicAliasMaximum: 3 } }) }) - var opts = {host: 'localhost', port: ports.PORTAND115, protocolVersion: 5, properties: test.properties} - var client = mqtt.connect(opts) - client.publish('t/h', 'Message', { properties: { topicAlias: 22 } }) + }).listen(ports.PORTAND103) + + client.on('connect', function () { + // register topicAlias + client.publish( + 'test1', + 'Message', + { properties: { topicAlias: 4 } }, + function (error) { + assert.strictEqual(error.message, 'Sending Topic Alias out of range') + server103.close() + client.end(true, done) + }) + }) + }) + + it('should error cb call if PUBLISH out of range topicAlias on topicAlias disabled by broker', function (done) { + this.timeout(15000) + + var opts = { + host: 'localhost', + port: ports.PORTAND103, + protocolVersion: 5 + } + var client = mqtt.connect(opts) + var server103 = new MqttServer(function (serverClient) { + serverClient.on('connect', function (packet) { + serverClient.connack({ + reasonCode: 0, + sessionPresent: false + }) + }) + }).listen(ports.PORTAND103) + + client.on('connect', function () { + // register topicAlias + client.publish( + 'test1', + 'Message', + { properties: { topicAlias: 1 } }, + function (error) { + assert.strictEqual(error.message, 'Sending Topic Alias out of range') + server103.close() + client.end(true, done) + }) + }) + }) + + it('should throw an error if broker PUBLISH out of range topicAlias', function (done) { + this.timeout(15000) + + var opts = { + host: 'localhost', + port: ports.PORTAND103, + protocolVersion: 5, + topicAliasMaximum: 3 + } + var client = mqtt.connect(opts) + var server103 = new MqttServer(function (serverClient) { + serverClient.on('connect', function (packet) { + serverClient.connack({ + reasonCode: 0, + sessionPresent: false + }) + // register out of range topicAlias + serverClient.publish({ + messageId: 0, + topic: 'test1', + payload: 'Message', + qos: 0, + properties: { topicAlias: 4 } + }) + }) + }).listen(ports.PORTAND103) + + client.on('error', function (error) { + assert.strictEqual(error.message, 'Received Topic Alias is out of range') + server103.close() + client.end(true, done) + }) + }) + + it('should throw an error if broker PUBLISH topicAlias:0', function (done) { + this.timeout(15000) + + var opts = { + host: 'localhost', + port: ports.PORTAND103, + protocolVersion: 5, + topicAliasMaximum: 3 + } + var client = mqtt.connect(opts) + var server103 = new MqttServer(function (serverClient) { + serverClient.on('connect', function (packet) { + serverClient.connack({ + reasonCode: 0, + sessionPresent: false + }) + // register out of range topicAlias + serverClient.publish({ + messageId: 0, + topic: 'test1', + payload: 'Message', + qos: 0, + properties: { topicAlias: 0 } + }) + }) + }).listen(ports.PORTAND103) + + client.on('error', function (error) { + assert.strictEqual(error.message, 'Received Topic Alias is out of range') + server103.close() + client.end(true, done) + }) + }) + + it('should throw an error if broker PUBLISH unregistered topicAlias', function (done) { + this.timeout(15000) + + var opts = { + host: 'localhost', + port: ports.PORTAND103, + protocolVersion: 5, + topicAliasMaximum: 3 + } + var client = mqtt.connect(opts) + var server103 = new MqttServer(function (serverClient) { + serverClient.on('connect', function (packet) { + serverClient.connack({ + reasonCode: 0, + sessionPresent: false + }) + // register out of range topicAlias + serverClient.publish({ + messageId: 0, + topic: '', // use topic alias + payload: 'Message', + qos: 0, + properties: { topicAlias: 1 } // in range topic alias + }) + }) + }).listen(ports.PORTAND103) + + client.on('error', function (error) { + assert.strictEqual(error.message, 'Received unregistered Topic Alias') + server103.close() + client.end(true, done) }) }) @@ -85,7 +604,6 @@ describe('MQTT 5.0', function () { serverClient.connack({ reasonCode: 0, properties: { - topicAliasMaximum: 15, serverKeepAlive: 16, maximumPacketSize: 95 } @@ -105,7 +623,6 @@ describe('MQTT 5.0', function () { var client = mqtt.connect(opts) client.on('connect', function () { assert.strictEqual(client.options.keepalive, 16) - assert.strictEqual(client.options.properties.topicAliasMaximum, 15) assert.strictEqual(client.options.properties.maximumPacketSize, 95) server116.close() client.end(true, done) diff --git a/test/helpers/port_list.js b/test/helpers/port_list.js index 89648b3c0..dc77ef07a 100644 --- a/test/helpers/port_list.js +++ b/test/helpers/port_list.js @@ -11,6 +11,7 @@ var PORTAND48 = PORT + 48 var PORTAND49 = PORT + 49 var PORTAND50 = PORT + 50 var PORTAND72 = PORT + 72 +var PORTAND103 = PORT + 103 var PORTAND114 = PORT + 114 var PORTAND115 = PORT + 115 var PORTAND116 = PORT + 116 @@ -36,6 +37,7 @@ module.exports = { PORTAND49, PORTAND50, PORTAND72, + PORTAND103, PORTAND114, PORTAND115, PORTAND116,