Skip to content

Commit

Permalink
regex subscribe, qos2
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelDvP committed Jan 26, 2024
1 parent 36f2a68 commit b174af6
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 28 deletions.
36 changes: 18 additions & 18 deletions lib/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,28 +42,28 @@ function convertID2topic(id, pattern, prefix, namespace, removePrefix) {
*/
function pattern2RegEx(pattern, adapter) {
pattern = convertTopic2id(pattern, true, adapter.config.prefix, adapter.namespace);
pattern = pattern.replace(/#/g, '*');
if (pattern === '#') {
return '.*';
}
if (pattern === '+') {
return '[^.]*';
}
pattern = pattern.replace(/\$/g, '\\$');
pattern = pattern.replace(/\^/g, '\\^');

if (pattern !== '*') {
if (pattern[0] === '*' && pattern[pattern.length - 1] !== '*') {
pattern += '$';
}
if (pattern[0] !== '*' && pattern[pattern.length - 1] === '*') {
pattern = `^${pattern}`;
}
if (pattern[0] === '+') {
pattern = `^[^.]*${pattern.substring(1)}`;
}
if (pattern[pattern.length - 1] === '+') {
pattern = `${pattern.substring(0, pattern.length - 1)}[^.]*$`;
}
} else {
return '.*';
if (pattern.length > 2 && (
(pattern[0] === '#' && pattern[1] !== '.') ||
(pattern[0] === '+' && pattern[1] !== '.') ||
(pattern[pattern.length - 1] === '#' && pattern[pattern.length - 2] !== '.') ||
(pattern[pattern.length - 1] === '+' && pattern[pattern.length - 2] !== '.'))) {
return '';
}
pattern = '^' + pattern + '$';
pattern = pattern.replace(/#/g, '*');
pattern = pattern.replace(/\./g, '\\.');
pattern = pattern.replace(/\\\.\*/g, '\\..*');
if (pattern[1] === '*') {
pattern = '^.' + pattern.substring(1);
}
pattern = pattern.replace(/\\\.\*/g, '(\\..*)?');
pattern = pattern.replace(/\+/g, '[^.]*');
return pattern;
}
Expand Down
28 changes: 18 additions & 10 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -879,19 +879,21 @@ function MQTTServer(adapter, states) {
client.puback({messageId: packet.messageId});
} else if (packet.qos === 2) {
const pack = client._messages && client._messages.find(e => e.messageId === packet.messageId);
if (pack) {
if (pack && packet.dup === false) {
// duplicate message => ignore
adapter.log.info(`Client [${client.id}] Ignored duplicate message with ID: ${packet.messageId}`);
return;
} else if (pack) {
client.pubrec({messageId: packet.messageId});
return;
} else {
packet.ts = Date.now();
packet.cmd = 'pubrel';
packet.cmd = 'pubrec';
packet.count = 0;
client._messages = client._messages || [];
client._messages.push(packet);

client.pubrec({messageId: packet.messageId});
return;
}
}

Expand All @@ -913,9 +915,18 @@ function MQTTServer(adapter, states) {
persistentSessions[client.id].lastSeen = Date.now();
}

let pos = -1;
// remove this message from queue
const frame = client._messages && client._messages.find(e => e.messageId === packet.messageId);
if (frame) {
if (client._messages) {
pos = client._messages.findIndex(e => e.messageId === packet.messageId);
}
if (pos !== -1) {
client._messages.splice(pos, 1);
packet.ts = Date.now();
packet.cmd = 'pubrel';
packet.count = 0;
client._messages = client._messages || [];
client._messages.push(packet);
client.pubrel({messageId: packet.messageId});
} else {
adapter.log.info(`Client [${client.id}] Received pubrec on ${client.id} for unknown messageId ${packet.messageId}`);
Expand Down Expand Up @@ -964,16 +975,14 @@ function MQTTServer(adapter, states) {
persistentSessions[client.id].lastSeen = Date.now();
}

// remove this message from queue
let pos = -1;
// remove this message from queue
if (client._messages) {
pos = client._messages.findIndex(e => e.messageId === packet.messageId);
}
if (pos !== -1) {
client.pubcomp({messageId: packet.messageId});
await receivedTopic(client._messages[pos], client);
client._messages.splice(pos, 1);
client.pubcomp({messageId: packet.messageId});
} else {
adapter.log.info(`Client [${client.id}] Received pubrel for unknown message ID: ${packet.messageId}`);
}
Expand All @@ -993,7 +1002,6 @@ function MQTTServer(adapter, states) {
persistentSessions[client.id].lastSeen = Date.now();
}

// remove this message from queue
let pos = -1;
// remove this message from queue
if (client._messages) {
Expand Down Expand Up @@ -1219,7 +1227,7 @@ function MQTTServer(adapter, states) {
message.dup = true;
clients[clientId].publish(message);
} else if (message.cmd === 'pubrel') {
clients[clientId].pubrec({
clients[clientId].pubrel({
messageId: message.messageId
});
}
Expand Down

0 comments on commit b174af6

Please sign in to comment.