diff --git a/README.md b/README.md
index b4497dc97..478a7a722 100644
--- a/README.md
+++ b/README.md
@@ -206,7 +206,41 @@ the final connection when it drops.
The default value is 1000 ms which means it will try to reconnect 1 second
after losing the connection.
+
+## About Topic Alias management
+### Enabling automatic Topic Alias using
+If the client sets the option `autoUseTopicAlias:true` then MQTT.js uses existing topic alias automatically.
+
+example scenario:
+```
+1. PUBLISH topic:'t1', ta:1 (register)
+2. PUBLISH topic:'t1' -> topic:'', ta:1 (auto use existing map entry)
+3. PUBLISH topic:'t2', ta:1 (register overwrite)
+4. PUBLISH topic:'t2' -> topic:'', ta:1 (auto use existing map entry based on the receent map)
+5. PUBLISH topic:'t1' (t1 is no longer mapped to ta:1)
+```
+
+User doesn't need to manage which topic is mapped to which topic alias.
+If the user want to register topic alias, then publish topic with topic alias.
+If the user want to use topic alias, then publish topic without topic alias. If there is a mapped topic alias then added it as a property and update the topic to empty string.
+
+### Enabling automatic Topic Alias assign
+
+If the client sets the option `autoAssignTopicAlias:true` then MQTT.js uses existing topic alias automatically.
+If no topic alias exists, then assign a new vacant topic alias automatically. If topic alias is fully used, then LRU(Least Recently Used) topic-alias entry is overwritten.
+
+example scenario:
+```
+The broker returns CONNACK (TopicAliasMaximum:3)
+1. PUBLISH topic:'t1' -> 't1', ta:1 (auto assign t1:1 and register)
+2. PUBLISH topic:'t1' -> '' , ta:1 (auto use existing map entry)
+3. PUBLISH topic:'t2' -> 't2', ta:2 (auto assign t1:2 and register. 2 was vacant)
+4. PUBLISH topic:'t3' -> 't3', ta:3 (auto assign t1:3 and register. 3 was vacant)
+5. PUBLISH topic:'t4' -> 't4', ta:1 (LRU entry is overwritten)
+```
+
+Also user can manually register topic-alias pair using PUBLISH topic:'some', ta:X. It works well with automatic topic alias assign.
## API
@@ -291,6 +325,8 @@ the `connect` event. Typically a `net.Socket`.
```js
customHandleAcks: function(topic, message, packet, done) {/*some logic wit colling done(error, reasonCode)*/}
```
+ * `autoUseTopicAlias`: enabling automatic Topic Alias using functionality
+ * `autoAssignTopicAlias`: enabling automatic Topic Alias assign functionality
* `properties`: properties MQTT 5.0.
`object` that supports the following properties:
* `sessionExpiryInterval`: representing the Session Expiry Interval in seconds `number`,
@@ -661,7 +697,7 @@ npm install browserify
npm install tinyify
cd node_modules/mqtt/
npm install .
-npx browserify mqtt.js -s mqtt >browserMqtt.js // use script tag
+npx browserify mqtt.js -s mqtt >browserMqtt.js // use script tag
# show size for compressed browser transfer
gzip 0) {
+ if (options.topicAliasMaximum > 0xffff) {
+ debug('MqttClient :: options.topicAliasMaximum is out of range')
+ } else {
+ this.topicAliasRecv = new TopicAliasRecv(options.topicAliasMaximum)
+ }
+ }
+
// Send queued packets
this.on('connect', function () {
var queue = this.queue
@@ -282,6 +380,10 @@ function MqttClient (streamBuilder, options) {
that.pingTimer = null
}
+ if (this.topicAliasRecv) {
+ this.topicAliasRecv.clear()
+ }
+
debug('close :: calling _setupReconnect')
this._setupReconnect()
})
@@ -378,6 +480,14 @@ MqttClient.prototype._setupStream = function () {
debug('_setupStream: sending packet `connect`')
connectPacket = Object.create(this.options)
connectPacket.cmd = 'connect'
+ if (this.topicAliasRecv) {
+ if (!connectPacket.properties) {
+ connectPacket.properties = {}
+ }
+ if (this.topicAliasRecv) {
+ connectPacket.properties.topicAliasMaximum = this.topicAliasRecv.max
+ }
+ }
// avoid message queue
sendPacket(this, connectPacket)
@@ -526,17 +636,6 @@ MqttClient.prototype.publish = function (topic, message, opts, callback) {
if (options.protocolVersion === 5) {
packet.properties = opts.properties
- if ((!options.properties && packet.properties && packet.properties.topicAlias) || ((opts.properties && options.properties) &&
- ((opts.properties.topicAlias && options.properties.topicAliasMaximum && opts.properties.topicAlias > options.properties.topicAliasMaximum) ||
- (!options.properties.topicAliasMaximum && opts.properties.topicAlias)))) {
- /*
- if we are don`t setup topic alias or
- topic alias maximum less than topic alias or
- server don`t give topic alias maximum,
- we are removing topic alias from packet
- */
- delete packet.properties.topicAlias
- }
}
debug('publish :: qos', opts.qos)
@@ -1102,6 +1201,13 @@ MqttClient.prototype._cleanUp = function (forced, done) {
MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut) {
debug('_sendPacket :: (%s) :: start', this.options.clientId)
cbStorePut = cbStorePut || nop
+ cb = cb || nop
+
+ var err = applyTopicAlias(this, packet)
+ if (err) {
+ cb(err)
+ return
+ }
if (!this.connected) {
debug('_sendPacket :: client not connected. Storing packet offline.')
@@ -1154,12 +1260,20 @@ MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) {
debug('_storePacket :: cb? %s', !!cb)
cbStorePut = cbStorePut || nop
+ var storePacket = packet
+ if (storePacket.cmd === 'publish') {
+ storePacket = clone(packet)
+ var err = removeTopicAlias(this, storePacket)
+ if (err) {
+ return cb && cb(err)
+ }
+ }
// check that the packet is not a qos of 0, or that the command is not a publish
- if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') {
- this.queue.push({ packet: packet, cb: cb })
- } else if (packet.qos > 0) {
- cb = this.outgoing[packet.messageId] ? this.outgoing[packet.messageId].cb : null
- this.outgoingStore.put(packet, function (err) {
+ if (((storePacket.qos || 0) === 0 && this.queueQoSZero) || storePacket.cmd !== 'publish') {
+ this.queue.push({ packet: storePacket, cb: cb })
+ } else if (storePacket.qos > 0) {
+ cb = this.outgoing[storePacket.messageId] ? this.outgoing[storePacket.messageId].cb : null
+ this.outgoingStore.put(storePacket, function (err) {
if (err) {
return cb && cb(err)
}
@@ -1237,11 +1351,17 @@ MqttClient.prototype._handleConnack = function (packet) {
var rc = version === 5 ? packet.reasonCode : packet.returnCode
clearTimeout(this.connackTimer)
+ delete this.topicAliasSend
if (packet.properties) {
if (packet.properties.topicAliasMaximum) {
- if (!options.properties) { options.properties = {} }
- options.properties.topicAliasMaximum = packet.properties.topicAliasMaximum
+ if (packet.properties.topicAliasMaximum > 0xffff) {
+ this.emit('error', new Error('topicAliasMaximum from broker is out of range'))
+ return
+ }
+ if (packet.properties.topicAliasMaximum > 0) {
+ this.topicAliasSend = new TopicAliasSend(packet.properties.topicAliasMaximum)
+ }
}
if (packet.properties.serverKeepAlive && options.keepalive) {
options.keepalive = packet.properties.serverKeepAlive
@@ -1303,6 +1423,39 @@ MqttClient.prototype._handlePublish = function (packet, done) {
var that = this
var options = this.options
var validReasonCodes = [0, 16, 128, 131, 135, 144, 145, 151, 153]
+ if (this.options.protocolVersion === 5) {
+ var alias
+ if (packet.properties) {
+ alias = packet.properties.topicAlias
+ }
+ if (typeof alias !== 'undefined') {
+ if (topic.length === 0) {
+ if (alias > 0 && alias <= 0xffff) {
+ var gotTopic = this.topicAliasRecv.getTopicByAlias(alias)
+ if (gotTopic) {
+ topic = gotTopic
+ debug('_handlePublish :: topic complemented by alias. topic: %s - alias: %d', topic, alias)
+ } else {
+ debug('_handlePublish :: unregistered topic alias. alias: %d', alias)
+ this.emit('error', new Error('Received unregistered Topic Alias'))
+ return
+ }
+ } else {
+ debug('_handlePublish :: topic alias out of range. alias: %d', alias)
+ this.emit('error', new Error('Received Topic Alias is out of range'))
+ return
+ }
+ } else {
+ if (this.topicAliasRecv.put(topic, alias)) {
+ debug('_handlePublish :: registered topic: %s - alias: %d', topic, alias)
+ } else {
+ debug('_handlePublish :: topic alias out of range. alias: %d', alias)
+ this.emit('error', new Error('Received Topic Alias is out of range'))
+ return
+ }
+ }
+ }
+ }
debug('_handlePublish: qos %d', qos)
switch (qos) {
case 2: {
diff --git a/lib/topic-alias-recv.js b/lib/topic-alias-recv.js
new file mode 100644
index 000000000..553341100
--- /dev/null
+++ b/lib/topic-alias-recv.js
@@ -0,0 +1,47 @@
+'use strict'
+
+/**
+ * Topic Alias receiving manager
+ * This holds alias to topic map
+ * @param {Number} [max] - topic alias maximum entries
+ */
+function TopicAliasRecv (max) {
+ if (!(this instanceof TopicAliasRecv)) {
+ return new TopicAliasRecv(max)
+ }
+ this.aliasToTopic = {}
+ this.max = max
+}
+
+/**
+ * Insert or update topic - alias entry.
+ * @param {String} [topic] - topic
+ * @param {Number} [alias] - topic alias
+ * @returns {Boolean} - if success return true otherwise false
+ */
+TopicAliasRecv.prototype.put = function (topic, alias) {
+ if (alias === 0 || alias > this.max) {
+ return false
+ }
+ this.aliasToTopic[alias] = topic
+ this.length = Object.keys(this.aliasToTopic).length
+ return true
+}
+
+/**
+ * Get topic by alias
+ * @param {String} [topic] - topic
+ * @returns {Number} - if mapped topic exists return topic alias, otherwise return undefined
+ */
+TopicAliasRecv.prototype.getTopicByAlias = function (alias) {
+ return this.aliasToTopic[alias]
+}
+
+/**
+ * Clear all entries
+ */
+TopicAliasRecv.prototype.clear = function () {
+ this.aliasToTopic = {}
+}
+
+module.exports = TopicAliasRecv
diff --git a/lib/topic-alias-send.js b/lib/topic-alias-send.js
new file mode 100644
index 000000000..34aef3504
--- /dev/null
+++ b/lib/topic-alias-send.js
@@ -0,0 +1,93 @@
+'use strict'
+
+/**
+ * Module dependencies
+ */
+var LruMap = require('collections/lru-map')
+var NumberAllocator = require('number-allocator').NumberAllocator
+
+/**
+ * Topic Alias sending manager
+ * This holds both topic to alias and alias to topic map
+ * @param {Number} [max] - topic alias maximum entries
+ */
+function TopicAliasSend (max) {
+ if (!(this instanceof TopicAliasSend)) {
+ return new TopicAliasSend(max)
+ }
+
+ if (max > 0) {
+ this.aliasToTopic = new LruMap()
+ this.topicToAlias = {}
+ this.numberAllocator = new NumberAllocator(1, max)
+ this.max = max
+ this.length = 0
+ }
+}
+
+/**
+ * Insert or update topic - alias entry.
+ * @param {String} [topic] - topic
+ * @param {Number} [alias] - topic alias
+ * @returns {Boolean} - if success return true otherwise false
+ */
+TopicAliasSend.prototype.put = function (topic, alias) {
+ if (alias === 0 || alias > this.max) {
+ return false
+ }
+ const entry = this.aliasToTopic.get(alias)
+ if (entry) {
+ delete this.topicToAlias[entry.topic]
+ }
+ this.aliasToTopic.set(alias, {'topic': topic, 'alias': alias})
+ this.topicToAlias[topic] = alias
+ this.numberAllocator.use(alias)
+ this.length = this.aliasToTopic.length
+ return true
+}
+
+/**
+ * Get topic by alias
+ * @param {Number} [alias] - topic alias
+ * @returns {String} - if mapped topic exists return topic, otherwise return undefined
+ */
+TopicAliasSend.prototype.getTopicByAlias = function (alias) {
+ const entry = this.aliasToTopic.get(alias)
+ if (typeof entry === 'undefined') return entry
+ return entry.topic
+}
+
+/**
+ * Get topic by alias
+ * @param {String} [topic] - topic
+ * @returns {Number} - if mapped topic exists return topic alias, otherwise return undefined
+ */
+TopicAliasSend.prototype.getAliasByTopic = function (topic) {
+ const alias = this.topicToAlias[topic]
+ if (typeof alias !== 'undefined') {
+ this.aliasToTopic.get(alias) // LRU update
+ }
+ return alias
+}
+
+/**
+ * Clear all entries
+ */
+TopicAliasSend.prototype.clear = function () {
+ this.aliasToTopic.clear()
+ this.topicToAlias = {}
+ this.numberAllocator.clear()
+ this.length = 0
+}
+
+/**
+ * Get LRU topic alias
+ * @returns {Number} - if vacant alias exists then return it, otherwise then return LRU alias
+ */
+TopicAliasSend.prototype.getLruAlias = function () {
+ const alias = this.numberAllocator.firstVacant()
+ if (alias) return alias
+ return this.aliasToTopic.min().alias
+}
+
+module.exports = TopicAliasSend
diff --git a/package.json b/package.json
index 3be7ba77e..0f9261059 100644
--- a/package.json
+++ b/package.json
@@ -62,6 +62,7 @@
"net": false
},
"dependencies": {
+ "collections": "^5.1.12",
"commist": "^1.0.0",
"concat-stream": "^2.0.0",
"debug": "^4.1.1",
@@ -73,6 +74,7 @@
"number-allocator": "^1.0.7",
"pump": "^3.0.0",
"readable-stream": "^3.6.0",
+ "rfdc": "^1.3.0",
"reinterval": "^1.1.0",
"split2": "^3.1.0",
"ws": "^7.5.0",
diff --git a/test/client_mqtt5.js b/test/client_mqtt5.js
index 48e1bcb6a..b446628de 100644
--- a/test/client_mqtt5.js
+++ b/test/client_mqtt5.js
@@ -13,29 +13,548 @@ describe('MQTT 5.0', function () {
abstractClientTests(server, config)
- // var server = serverBuilder().listen(ports.PORTAND115)
-
- var topicAliasTests = [
- {properties: {}, name: 'should allow any topicAlias when no topicAliasMaximum provided in settings'},
- {properties: { topicAliasMaximum: 15 }, name: 'should not allow topicAlias > topicAliasMaximum when topicAliasMaximum provided in settings'}
- ]
-
- topicAliasTests.forEach(function (test) {
- it(test.name, function (done) {
- this.timeout(15000)
- server.once('client', function (serverClient) {
- serverClient.on('publish', function (packet) {
- if (packet.properties && packet.properties.topicAlias) {
- done(new Error('Packet should not have topicAlias'))
- return false
- } else {
- serverClient.end(done)
+ it('topic should be complemented on receive', function (done) {
+ this.timeout(15000)
+
+ var opts = {
+ host: 'localhost',
+ port: ports.PORTAND103,
+ protocolVersion: 5,
+ topicAliasMaximum: 3
+ }
+ var client = mqtt.connect(opts)
+ var publishCount = 0
+ var server103 = new MqttServer(function (serverClient) {
+ serverClient.on('connect', function (packet) {
+ assert.strictEqual(packet.properties.topicAliasMaximum, 3)
+ serverClient.connack({
+ reasonCode: 0
+ })
+ // register topicAlias
+ serverClient.publish({
+ messageId: 0,
+ topic: 'test1',
+ payload: 'Message',
+ qos: 0,
+ properties: { topicAlias: 1 }
+ })
+ // use topicAlias
+ serverClient.publish({
+ messageId: 0,
+ topic: '',
+ payload: 'Message',
+ qos: 0,
+ properties: { topicAlias: 1 }
+ })
+ // overwrite register topicAlias
+ serverClient.publish({
+ messageId: 0,
+ topic: 'test2',
+ payload: 'Message',
+ qos: 0,
+ properties: { topicAlias: 1 }
+ })
+ // use topicAlias
+ serverClient.publish({
+ messageId: 0,
+ topic: '',
+ payload: 'Message',
+ qos: 0,
+ properties: { topicAlias: 1 }
+ })
+ })
+ }).listen(ports.PORTAND103)
+
+ client.on('message', function (topic, messagee, packet) {
+ switch (publishCount++) {
+ case 0:
+ assert.strictEqual(topic, 'test1')
+ assert.strictEqual(packet.topic, 'test1')
+ assert.strictEqual(packet.properties.topicAlias, 1)
+ break
+ case 1:
+ assert.strictEqual(topic, 'test1')
+ assert.strictEqual(packet.topic, '')
+ assert.strictEqual(packet.properties.topicAlias, 1)
+ break
+ case 2:
+ assert.strictEqual(topic, 'test2')
+ assert.strictEqual(packet.topic, 'test2')
+ assert.strictEqual(packet.properties.topicAlias, 1)
+ break
+ case 3:
+ assert.strictEqual(topic, 'test2')
+ assert.strictEqual(packet.topic, '')
+ assert.strictEqual(packet.properties.topicAlias, 1)
+ server103.close()
+ client.end(true, done)
+ break
+ }
+ })
+ })
+
+ it('registered topic alias should automatically used if autoUseTopicAlias is true', function (done) {
+ this.timeout(15000)
+
+ var opts = {
+ host: 'localhost',
+ port: ports.PORTAND103,
+ protocolVersion: 5,
+ autoUseTopicAlias: true
+ }
+ var client = mqtt.connect(opts)
+
+ var publishCount = 0
+ var server103 = new MqttServer(function (serverClient) {
+ serverClient.on('connect', function (packet) {
+ serverClient.connack({
+ reasonCode: 0,
+ properties: {
+ topicAliasMaximum: 3
+ }
+ })
+ })
+ serverClient.on('publish', function (packet) {
+ switch (publishCount++) {
+ case 0:
+ assert.strictEqual(packet.topic, 'test1')
+ assert.strictEqual(packet.properties.topicAlias, 1)
+ break
+ case 1:
+ assert.strictEqual(packet.topic, '')
+ assert.strictEqual(packet.properties.topicAlias, 1)
+ break
+ case 2:
+ assert.strictEqual(packet.topic, '')
+ assert.strictEqual(packet.properties.topicAlias, 1)
+ server103.close()
+ client.end(true, done)
+ break
+ }
+ })
+ }).listen(ports.PORTAND103)
+
+ client.on('connect', function () {
+ // register topicAlias
+ client.publish('test1', 'Message', { properties: { topicAlias: 1 } })
+ // use topicAlias
+ client.publish('', 'Message', { properties: { topicAlias: 1 } })
+ // use topicAlias by autoApplyTopicAlias
+ client.publish('test1', 'Message')
+ })
+ })
+
+ it('topicAlias is automatically used if autoAssignTopicAlias is true', function (done) {
+ this.timeout(15000)
+
+ var opts = {
+ host: 'localhost',
+ port: ports.PORTAND103,
+ protocolVersion: 5,
+ autoAssignTopicAlias: true
+ }
+ var client = mqtt.connect(opts)
+
+ var publishCount = 0
+ var server103 = new MqttServer(function (serverClient) {
+ serverClient.on('connect', function (packet) {
+ serverClient.connack({
+ reasonCode: 0,
+ properties: {
+ topicAliasMaximum: 3
+ }
+ })
+ })
+ serverClient.on('publish', function (packet) {
+ switch (publishCount++) {
+ case 0:
+ assert.strictEqual(packet.topic, 'test1')
+ assert.strictEqual(packet.properties.topicAlias, 1)
+ break
+ case 1:
+ assert.strictEqual(packet.topic, 'test2')
+ assert.strictEqual(packet.properties.topicAlias, 2)
+ break
+ case 2:
+ assert.strictEqual(packet.topic, 'test3')
+ assert.strictEqual(packet.properties.topicAlias, 3)
+ break
+ case 3:
+ assert.strictEqual(packet.topic, '')
+ assert.strictEqual(packet.properties.topicAlias, 1)
+ break
+ case 4:
+ assert.strictEqual(packet.topic, '')
+ assert.strictEqual(packet.properties.topicAlias, 3)
+ break
+ case 5:
+ assert.strictEqual(packet.topic, 'test4')
+ assert.strictEqual(packet.properties.topicAlias, 2)
+ server103.close()
+ client.end(true, done)
+ break
+ }
+ })
+ }).listen(ports.PORTAND103)
+
+ client.on('connect', function () {
+ // register topicAlias
+ client.publish('test1', 'Message')
+ client.publish('test2', 'Message')
+ client.publish('test3', 'Message')
+
+ // use topicAlias
+ client.publish('test1', 'Message')
+ client.publish('test3', 'Message')
+
+ // renew LRU topicAlias
+ client.publish('test4', 'Message')
+ })
+ })
+
+ it('topicAlias should be removed and topic restored on resend', function (done) {
+ this.timeout(15000)
+
+ var incomingStore = new mqtt.Store({ clean: false })
+ var outgoingStore = new mqtt.Store({ clean: false })
+ var opts = {
+ host: 'localhost',
+ port: ports.PORTAND103,
+ protocolVersion: 5,
+ clientId: 'cid1',
+ incomingStore: incomingStore,
+ outgoingStore: outgoingStore,
+ clean: false,
+ reconnectPeriod: 100
+ }
+ var client = mqtt.connect(opts)
+
+ var connectCount = 0
+ var publishCount = 0
+ var server103 = new MqttServer(function (serverClient) {
+ serverClient.on('connect', function (packet) {
+ switch (connectCount++) {
+ case 0:
+ serverClient.connack({
+ reasonCode: 0,
+ sessionPresent: false,
+ properties: {
+ topicAliasMaximum: 3
+ }
+ })
+ break
+ case 1:
+ serverClient.connack({
+ reasonCode: 0,
+ sessionPresent: true,
+ properties: {
+ topicAliasMaximum: 3
+ }
+ })
+ break
+ }
+ })
+ serverClient.on('publish', function (packet) {
+ switch (publishCount++) {
+ case 0:
+ assert.strictEqual(packet.topic, 'test1')
+ assert.strictEqual(packet.properties.topicAlias, 1)
+ break
+ case 1:
+ assert.strictEqual(packet.topic, '')
+ assert.strictEqual(packet.properties.topicAlias, 1)
+ setImmediate(function () {
+ serverClient.stream.destroy()
+ })
+ break
+ case 2:
+ assert.strictEqual(packet.topic, 'test1')
+ var alias1
+ if (packet.properties) {
+ alias1 = packet.properties.topicAlias
+ }
+ assert.strictEqual(alias1, undefined)
+ serverClient.puback({messageId: packet.messageId})
+ break
+ case 3:
+ assert.strictEqual(packet.topic, 'test1')
+ var alias2
+ if (packet.properties) {
+ alias2 = packet.properties.topicAlias
+ }
+ assert.strictEqual(alias2, undefined)
+ serverClient.puback({messageId: packet.messageId})
+ server103.close()
+ client.end(true, done)
+ break
+ }
+ })
+ }).listen(ports.PORTAND103)
+
+ client.once('connect', function () {
+ // register topicAlias
+ client.publish('test1', 'Message', { qos: 1, properties: { topicAlias: 1 } })
+ // use topicAlias
+ client.publish('', 'Message', { qos: 1, properties: { topicAlias: 1 } })
+ })
+ })
+
+ it('topicAlias should be removed and topic restored on offline publish', function (done) {
+ this.timeout(15000)
+
+ var incomingStore = new mqtt.Store({ clean: false })
+ var outgoingStore = new mqtt.Store({ clean: false })
+ var opts = {
+ host: 'localhost',
+ port: ports.PORTAND103,
+ protocolVersion: 5,
+ clientId: 'cid1',
+ incomingStore: incomingStore,
+ outgoingStore: outgoingStore,
+ clean: false,
+ reconnectPeriod: 100
+ }
+ var client = mqtt.connect(opts)
+
+ var connectCount = 0
+ var publishCount = 0
+ var server103 = new MqttServer(function (serverClient) {
+ serverClient.on('connect', function (packet) {
+ switch (connectCount++) {
+ case 0:
+ serverClient.connack({
+ reasonCode: 0,
+ sessionPresent: false,
+ properties: {
+ topicAliasMaximum: 3
+ }
+ })
+ setImmediate(function () {
+ serverClient.stream.destroy()
+ })
+ break
+ case 1:
+ serverClient.connack({
+ reasonCode: 0,
+ sessionPresent: true,
+ properties: {
+ topicAliasMaximum: 3
+ }
+ })
+ break
+ }
+ })
+ serverClient.on('publish', function (packet) {
+ switch (publishCount++) {
+ case 0:
+ assert.strictEqual(packet.topic, 'test1')
+ var alias1
+ if (packet.properties) {
+ alias1 = packet.properties.topicAlias
+ }
+ assert.strictEqual(alias1, undefined)
+ assert.strictEqual(packet.qos, 1)
+ serverClient.puback({messageId: packet.messageId})
+ break
+ case 1:
+ assert.strictEqual(packet.topic, 'test1')
+ var alias2
+ if (packet.properties) {
+ alias2 = packet.properties.topicAlias
+ }
+ assert.strictEqual(alias2, undefined)
+ assert.strictEqual(packet.qos, 0)
+ break
+ case 2:
+ assert.strictEqual(packet.topic, 'test1')
+ var alias3
+ if (packet.properties) {
+ alias3 = packet.properties.topicAlias
+ }
+ assert.strictEqual(alias3, undefined)
+ assert.strictEqual(packet.qos, 0)
+ server103.close()
+ client.end(true, done)
+ break
+ }
+ })
+ }).listen(ports.PORTAND103)
+
+ client.once('close', function () {
+ // register topicAlias
+ client.publish('test1', 'Message', { qos: 0, properties: { topicAlias: 1 } })
+ // use topicAlias
+ client.publish('', 'Message', { qos: 0, properties: { topicAlias: 1 } })
+ client.publish('', 'Message', { qos: 1, properties: { topicAlias: 1 } })
+ })
+ })
+
+ it('should error cb call if PUBLISH out of range topicAlias', function (done) {
+ this.timeout(15000)
+
+ var opts = {
+ host: 'localhost',
+ port: ports.PORTAND103,
+ protocolVersion: 5
+ }
+ var client = mqtt.connect(opts)
+ var server103 = new MqttServer(function (serverClient) {
+ serverClient.on('connect', function (packet) {
+ serverClient.connack({
+ reasonCode: 0,
+ sessionPresent: false,
+ properties: {
+ topicAliasMaximum: 3
}
})
})
- var opts = {host: 'localhost', port: ports.PORTAND115, protocolVersion: 5, properties: test.properties}
- var client = mqtt.connect(opts)
- client.publish('t/h', 'Message', { properties: { topicAlias: 22 } })
+ }).listen(ports.PORTAND103)
+
+ client.on('connect', function () {
+ // register topicAlias
+ client.publish(
+ 'test1',
+ 'Message',
+ { properties: { topicAlias: 4 } },
+ function (error) {
+ assert.strictEqual(error.message, 'Sending Topic Alias out of range')
+ server103.close()
+ client.end(true, done)
+ })
+ })
+ })
+
+ it('should error cb call if PUBLISH out of range topicAlias on topicAlias disabled by broker', function (done) {
+ this.timeout(15000)
+
+ var opts = {
+ host: 'localhost',
+ port: ports.PORTAND103,
+ protocolVersion: 5
+ }
+ var client = mqtt.connect(opts)
+ var server103 = new MqttServer(function (serverClient) {
+ serverClient.on('connect', function (packet) {
+ serverClient.connack({
+ reasonCode: 0,
+ sessionPresent: false
+ })
+ })
+ }).listen(ports.PORTAND103)
+
+ client.on('connect', function () {
+ // register topicAlias
+ client.publish(
+ 'test1',
+ 'Message',
+ { properties: { topicAlias: 1 } },
+ function (error) {
+ assert.strictEqual(error.message, 'Sending Topic Alias out of range')
+ server103.close()
+ client.end(true, done)
+ })
+ })
+ })
+
+ it('should throw an error if broker PUBLISH out of range topicAlias', function (done) {
+ this.timeout(15000)
+
+ var opts = {
+ host: 'localhost',
+ port: ports.PORTAND103,
+ protocolVersion: 5,
+ topicAliasMaximum: 3
+ }
+ var client = mqtt.connect(opts)
+ var server103 = new MqttServer(function (serverClient) {
+ serverClient.on('connect', function (packet) {
+ serverClient.connack({
+ reasonCode: 0,
+ sessionPresent: false
+ })
+ // register out of range topicAlias
+ serverClient.publish({
+ messageId: 0,
+ topic: 'test1',
+ payload: 'Message',
+ qos: 0,
+ properties: { topicAlias: 4 }
+ })
+ })
+ }).listen(ports.PORTAND103)
+
+ client.on('error', function (error) {
+ assert.strictEqual(error.message, 'Received Topic Alias is out of range')
+ server103.close()
+ client.end(true, done)
+ })
+ })
+
+ it('should throw an error if broker PUBLISH topicAlias:0', function (done) {
+ this.timeout(15000)
+
+ var opts = {
+ host: 'localhost',
+ port: ports.PORTAND103,
+ protocolVersion: 5,
+ topicAliasMaximum: 3
+ }
+ var client = mqtt.connect(opts)
+ var server103 = new MqttServer(function (serverClient) {
+ serverClient.on('connect', function (packet) {
+ serverClient.connack({
+ reasonCode: 0,
+ sessionPresent: false
+ })
+ // register out of range topicAlias
+ serverClient.publish({
+ messageId: 0,
+ topic: 'test1',
+ payload: 'Message',
+ qos: 0,
+ properties: { topicAlias: 0 }
+ })
+ })
+ }).listen(ports.PORTAND103)
+
+ client.on('error', function (error) {
+ assert.strictEqual(error.message, 'Received Topic Alias is out of range')
+ server103.close()
+ client.end(true, done)
+ })
+ })
+
+ it('should throw an error if broker PUBLISH unregistered topicAlias', function (done) {
+ this.timeout(15000)
+
+ var opts = {
+ host: 'localhost',
+ port: ports.PORTAND103,
+ protocolVersion: 5,
+ topicAliasMaximum: 3
+ }
+ var client = mqtt.connect(opts)
+ var server103 = new MqttServer(function (serverClient) {
+ serverClient.on('connect', function (packet) {
+ serverClient.connack({
+ reasonCode: 0,
+ sessionPresent: false
+ })
+ // register out of range topicAlias
+ serverClient.publish({
+ messageId: 0,
+ topic: '', // use topic alias
+ payload: 'Message',
+ qos: 0,
+ properties: { topicAlias: 1 } // in range topic alias
+ })
+ })
+ }).listen(ports.PORTAND103)
+
+ client.on('error', function (error) {
+ assert.strictEqual(error.message, 'Received unregistered Topic Alias')
+ server103.close()
+ client.end(true, done)
})
})
@@ -85,7 +604,6 @@ describe('MQTT 5.0', function () {
serverClient.connack({
reasonCode: 0,
properties: {
- topicAliasMaximum: 15,
serverKeepAlive: 16,
maximumPacketSize: 95
}
@@ -105,7 +623,6 @@ describe('MQTT 5.0', function () {
var client = mqtt.connect(opts)
client.on('connect', function () {
assert.strictEqual(client.options.keepalive, 16)
- assert.strictEqual(client.options.properties.topicAliasMaximum, 15)
assert.strictEqual(client.options.properties.maximumPacketSize, 95)
server116.close()
client.end(true, done)
diff --git a/test/helpers/port_list.js b/test/helpers/port_list.js
index 89648b3c0..dc77ef07a 100644
--- a/test/helpers/port_list.js
+++ b/test/helpers/port_list.js
@@ -11,6 +11,7 @@ var PORTAND48 = PORT + 48
var PORTAND49 = PORT + 49
var PORTAND50 = PORT + 50
var PORTAND72 = PORT + 72
+var PORTAND103 = PORT + 103
var PORTAND114 = PORT + 114
var PORTAND115 = PORT + 115
var PORTAND116 = PORT + 116
@@ -36,6 +37,7 @@ module.exports = {
PORTAND49,
PORTAND50,
PORTAND72,
+ PORTAND103,
PORTAND114,
PORTAND115,
PORTAND116,