Skip to content

Commit

Permalink
Refined Topic Alias support. (Implement mqttjs#1300)
Browse files Browse the repository at this point in the history
Add automatic topic alias management functionality.
- On PUBLISH sending, the client can automatic using/assin Topic
Alias (optional).
- On PUBLISH receiving, the topic parameter of on message handler is
automatically complemented but the packet.topic preserves the original
topic.

Fix invalid tests.
  • Loading branch information
redboltz committed Jul 15, 2021
1 parent d8be59e commit f8178aa
Show file tree
Hide file tree
Showing 7 changed files with 892 additions and 42 deletions.
38 changes: 37 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,41 @@ the final connection when it drops.
The default value is 1000 ms which means it will try to reconnect 1 second
after losing the connection.

<a name="topicalias"></a>
## About Topic Alias management

### Enabling automatic Topic Alias using
If the client sets the option `autoUseTopicAlias:true` then MQTT.js uses existing topic alias automatically.

example scenario:
```
1. PUBLISH topic:'t1', ta:1 (register)
2. PUBLISH topic:'t1' -> topic:'', ta:1 (auto use existing map entry)
3. PUBLISH topic:'t2', ta:1 (register overwrite)
4. PUBLISH topic:'t2' -> topic:'', ta:1 (auto use existing map entry based on the receent map)
5. PUBLISH topic:'t1' (t1 is no longer mapped to ta:1)
```

User doesn't need to manage which topic is mapped to which topic alias.
If the user want to register topic alias, then publish topic with topic alias.
If the user want to use topic alias, then publish topic without topic alias. If there is a mapped topic alias then added it as a property and update the topic to empty string.

### Enabling automatic Topic Alias assign

If the client sets the option `autoAssignTopicAlias:true` then MQTT.js uses existing topic alias automatically.
If no topic alias exists, then assign a new vacant topic alias automatically. If topic alias is fully used, then LRU(Least Recently Used) topic-alias entry is overwritten.

example scenario:
```
The broker returns CONNACK (TopicAliasMaximum:3)
1. PUBLISH topic:'t1' -> 't1', ta:1 (auto assign t1:1 and register)
2. PUBLISH topic:'t1' -> '' , ta:1 (auto use existing map entry)
3. PUBLISH topic:'t2' -> 't2', ta:2 (auto assign t1:2 and register. 2 was vacant)
4. PUBLISH topic:'t3' -> 't3', ta:3 (auto assign t1:3 and register. 3 was vacant)
5. PUBLISH topic:'t4' -> 't4', ta:1 (LRU entry is overwritten)
```

Also user can manually register topic-alias pair using PUBLISH topic:'some', ta:X. It works well with automatic topic alias assign.

<a name="api"></a>
## API
Expand Down Expand Up @@ -291,6 +325,8 @@ the `connect` event. Typically a `net.Socket`.
```js
customHandleAcks: function(topic, message, packet, done) {/*some logic wit colling done(error, reasonCode)*/}
```
* `autoUseTopicAlias`: enabling automatic Topic Alias using functionality
* `autoAssignTopicAlias`: enabling automatic Topic Alias assign functionality
* `properties`: properties MQTT 5.0.
`object` that supports the following properties:
* `sessionExpiryInterval`: representing the Session Expiry Interval in seconds `number`,
Expand Down Expand Up @@ -661,7 +697,7 @@ npm install browserify
npm install tinyify
cd node_modules/mqtt/
npm install .
npx browserify mqtt.js -s mqtt >browserMqtt.js // use script tag
npx browserify mqtt.js -s mqtt >browserMqtt.js // use script tag
# show size for compressed browser transfer
gzip <browserMqtt.js | wc -c
```
Expand Down
191 changes: 172 additions & 19 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
*/
var EventEmitter = require('events').EventEmitter
var Store = require('./store')
var TopicAliasRecv = require('./topic-alias-recv')
var TopicAliasSend = require('./topic-alias-send')
var mqttPacket = require('mqtt-packet')
var DefaultMessageIdProvider = require('./default-message-id-provider')
var Writable = require('readable-stream').Writable
var inherits = require('inherits')
var reInterval = require('reinterval')
var clone = require('rfdc/default')
var validations = require('./validations')
var xtend = require('xtend')
var debug = require('debug')('mqttjs:client')
Expand Down Expand Up @@ -88,9 +91,86 @@ function defaultId () {
return 'mqttjs_' + Math.random().toString(16).substr(2, 8)
}

function applyTopicAlias (client, packet) {
if (client.options.protocolVersion === 5) {
if (packet.cmd === 'publish') {
var alias
if (packet.properties) {
alias = packet.properties.topicAlias
}
var topic = packet.topic.toString()
if (client.topicAliasSend) {
if (alias) {
if (topic.length !== 0) {
// register topic alias
debug('applyTopicAlias :: register topic: %s - alias: %d', topic, alias)
if (!client.topicAliasSend.put(topic, alias)) {
debug('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, alias)
return new Error('Sending Topic Alias out of range')
}
}
} else {
if (topic.length !== 0) {
if (client.options.autoAssignTopicAlias) {
alias = client.topicAliasSend.getAliasByTopic(topic)
if (alias) {
packet.topic = ''
packet.properties = {...(packet.properties), topicAlias: alias}
debug('applyTopicAlias :: auto assign(use) topic: %s - alias: %d', topic, alias)
} else {
alias = client.topicAliasSend.getLruAlias()
client.topicAliasSend.put(topic, alias)
packet.properties = {...(packet.properties), topicAlias: alias}
debug('applyTopicAlias :: auto assign topic: %s - alias: %d', topic, alias)
}
} else if (client.options.autoUseTopicAlias) {
alias = client.topicAliasSend.getAliasByTopic(topic)
if (alias) {
packet.topic = ''
packet.properties = {...(packet.properties), topicAlias: alias}
debug('applyTopicAlias :: auto use topic: %s - alias: %d', topic, alias)
}
}
}
}
} else if (alias) {
debug('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, alias)
return new Error('Sending Topic Alias out of range')
}
}
}
}

function removeTopicAlias (client, packet) {
// remove topic alias because it shouldn't be used on re-sending
var alias
if (packet.properties) {
alias = packet.properties.topicAlias
}

var topic = packet.topic.toString()
if (topic.length === 0) {
// restore topic from alias
if (typeof alias === 'undefined') {
return new Error('Unregistered Topic Alias')
} else {
topic = client.topicAliasSend.getTopicByAlias(alias)
if (typeof topic === 'undefined') {
return new Error('Unregistered Topic Alias')
} else {
packet.topic = topic
}
}
}
if (alias) {
delete packet.properties.topicAlias
}
}

function sendPacket (client, packet, cb) {
debug('sendPacket :: packet: %O', packet)
debug('sendPacket :: emitting `packetsend`')

client.emit('packetsend', packet)

debug('sendPacket :: writing to stream')
Expand Down Expand Up @@ -131,7 +211,16 @@ function flushVolatile (queue) {

function storeAndSend (client, packet, cb, cbStorePut) {
debug('storeAndSend :: store packet with cmd %s to outgoingStore', packet.cmd)
client.outgoingStore.put(packet, function storedPacket (err) {
var storePacket = packet
var err
if (storePacket.cmd === 'publish') {
storePacket = clone(packet)
err = removeTopicAlias(client, storePacket)
if (err) {
return cb && cb(err)
}
}
client.outgoingStore.put(storePacket, function storedPacket (err) {
if (err) {
return cb && cb(err)
}
Expand Down Expand Up @@ -176,6 +265,7 @@ function MqttClient (streamBuilder, options) {
debug('MqttClient :: options.keepalive', options.keepalive)
debug('MqttClient :: options.reconnectPeriod', options.reconnectPeriod)
debug('MqttClient :: options.rejectUnauthorized', options.rejectUnauthorized)
debug('MqttClient :: options.topicAliasMaximum', options.topicAliasMaximum)

this.options.clientId = (typeof options.clientId === 'string') ? options.clientId : defaultId()

Expand Down Expand Up @@ -225,6 +315,14 @@ function MqttClient (streamBuilder, options) {
// True if connection is first time.
this._firstConnection = true

if (options.topicAliasMaximum > 0) {
if (options.topicAliasMaximum > 0xffff) {
debug('MqttClient :: options.topicAliasMaximum is out of range')
} else {
this.topicAliasRecv = new TopicAliasRecv(options.topicAliasMaximum)
}
}

// Send queued packets
this.on('connect', function () {
var queue = this.queue
Expand Down Expand Up @@ -282,6 +380,10 @@ function MqttClient (streamBuilder, options) {
that.pingTimer = null
}

if (this.topicAliasRecv) {
this.topicAliasRecv.clear()
}

debug('close :: calling _setupReconnect')
this._setupReconnect()
})
Expand Down Expand Up @@ -378,6 +480,14 @@ MqttClient.prototype._setupStream = function () {
debug('_setupStream: sending packet `connect`')
connectPacket = Object.create(this.options)
connectPacket.cmd = 'connect'
if (this.topicAliasRecv) {
if (!connectPacket.properties) {
connectPacket.properties = {}
}
if (this.topicAliasRecv) {
connectPacket.properties.topicAliasMaximum = this.topicAliasRecv.max
}
}
// avoid message queue
sendPacket(this, connectPacket)

Expand Down Expand Up @@ -526,17 +636,6 @@ MqttClient.prototype.publish = function (topic, message, opts, callback) {

if (options.protocolVersion === 5) {
packet.properties = opts.properties
if ((!options.properties && packet.properties && packet.properties.topicAlias) || ((opts.properties && options.properties) &&
((opts.properties.topicAlias && options.properties.topicAliasMaximum && opts.properties.topicAlias > options.properties.topicAliasMaximum) ||
(!options.properties.topicAliasMaximum && opts.properties.topicAlias)))) {
/*
if we are don`t setup topic alias or
topic alias maximum less than topic alias or
server don`t give topic alias maximum,
we are removing topic alias from packet
*/
delete packet.properties.topicAlias
}
}

debug('publish :: qos', opts.qos)
Expand Down Expand Up @@ -1102,6 +1201,13 @@ MqttClient.prototype._cleanUp = function (forced, done) {
MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut) {
debug('_sendPacket :: (%s) :: start', this.options.clientId)
cbStorePut = cbStorePut || nop
cb = cb || nop

var err = applyTopicAlias(this, packet)
if (err) {
cb(err)
return
}

if (!this.connected) {
debug('_sendPacket :: client not connected. Storing packet offline.')
Expand Down Expand Up @@ -1154,12 +1260,20 @@ MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) {
debug('_storePacket :: cb? %s', !!cb)
cbStorePut = cbStorePut || nop

var storePacket = packet
if (storePacket.cmd === 'publish') {
storePacket = clone(packet)
var err = removeTopicAlias(this, storePacket)
if (err) {
return cb && cb(err)
}
}
// check that the packet is not a qos of 0, or that the command is not a publish
if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') {
this.queue.push({ packet: packet, cb: cb })
} else if (packet.qos > 0) {
cb = this.outgoing[packet.messageId] ? this.outgoing[packet.messageId].cb : null
this.outgoingStore.put(packet, function (err) {
if (((storePacket.qos || 0) === 0 && this.queueQoSZero) || storePacket.cmd !== 'publish') {
this.queue.push({ packet: storePacket, cb: cb })
} else if (storePacket.qos > 0) {
cb = this.outgoing[storePacket.messageId] ? this.outgoing[storePacket.messageId].cb : null
this.outgoingStore.put(storePacket, function (err) {
if (err) {
return cb && cb(err)
}
Expand Down Expand Up @@ -1237,11 +1351,17 @@ MqttClient.prototype._handleConnack = function (packet) {
var rc = version === 5 ? packet.reasonCode : packet.returnCode

clearTimeout(this.connackTimer)
delete this.topicAliasSend

if (packet.properties) {
if (packet.properties.topicAliasMaximum) {
if (!options.properties) { options.properties = {} }
options.properties.topicAliasMaximum = packet.properties.topicAliasMaximum
if (packet.properties.topicAliasMaximum > 0xffff) {
this.emit('error', new Error('topicAliasMaximum from broker is out of range'))
return
}
if (packet.properties.topicAliasMaximum > 0) {
this.topicAliasSend = new TopicAliasSend(packet.properties.topicAliasMaximum)
}
}
if (packet.properties.serverKeepAlive && options.keepalive) {
options.keepalive = packet.properties.serverKeepAlive
Expand Down Expand Up @@ -1303,6 +1423,39 @@ MqttClient.prototype._handlePublish = function (packet, done) {
var that = this
var options = this.options
var validReasonCodes = [0, 16, 128, 131, 135, 144, 145, 151, 153]
if (this.options.protocolVersion === 5) {
var alias
if (packet.properties) {
alias = packet.properties.topicAlias
}
if (typeof alias !== 'undefined') {
if (topic.length === 0) {
if (alias > 0 && alias <= 0xffff) {
var gotTopic = this.topicAliasRecv.getTopicByAlias(alias)
if (gotTopic) {
topic = gotTopic
debug('_handlePublish :: topic complemented by alias. topic: %s - alias: %d', topic, alias)
} else {
debug('_handlePublish :: unregistered topic alias. alias: %d', alias)
this.emit('error', new Error('Received unregistered Topic Alias'))
return
}
} else {
debug('_handlePublish :: topic alias out of range. alias: %d', alias)
this.emit('error', new Error('Received Topic Alias is out of range'))
return
}
} else {
if (this.topicAliasRecv.put(topic, alias)) {
debug('_handlePublish :: registered topic: %s - alias: %d', topic, alias)
} else {
debug('_handlePublish :: topic alias out of range. alias: %d', alias)
this.emit('error', new Error('Received Topic Alias is out of range'))
return
}
}
}
}
debug('_handlePublish: qos %d', qos)
switch (qos) {
case 2: {
Expand Down
47 changes: 47 additions & 0 deletions lib/topic-alias-recv.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
'use strict'

/**
* Topic Alias receiving manager
* This holds alias to topic map
* @param {Number} [max] - topic alias maximum entries
*/
function TopicAliasRecv (max) {
if (!(this instanceof TopicAliasRecv)) {
return new TopicAliasRecv(max)
}
this.aliasToTopic = {}
this.max = max
}

/**
* Insert or update topic - alias entry.
* @param {String} [topic] - topic
* @param {Number} [alias] - topic alias
* @returns {Boolean} - if success return true otherwise false
*/
TopicAliasRecv.prototype.put = function (topic, alias) {
if (alias === 0 || alias > this.max) {
return false
}
this.aliasToTopic[alias] = topic
this.length = Object.keys(this.aliasToTopic).length
return true
}

/**
* Get topic by alias
* @param {String} [topic] - topic
* @returns {Number} - if mapped topic exists return topic alias, otherwise return undefined
*/
TopicAliasRecv.prototype.getTopicByAlias = function (alias) {
return this.aliasToTopic[alias]
}

/**
* Clear all entries
*/
TopicAliasRecv.prototype.clear = function () {
this.aliasToTopic = {}
}

module.exports = TopicAliasRecv
Loading

0 comments on commit f8178aa

Please sign in to comment.