From bd1d65aa16638700b239f14a577da8764ed34363 Mon Sep 17 00:00:00 2001 From: Paul Lysak Date: Thu, 10 Sep 2020 15:28:34 +0300 Subject: [PATCH] Prepare for MQTT5 changes --- src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java | 6 ++++-- src/main/java/io/vertx/mqtt/impl/MqttEndpointImpl.java | 4 +++- .../io/vertx/mqtt/test/server/MqttServerSubscribeTest.java | 4 ++-- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java b/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java index d6a5d3a5..38aef6e9 100644 --- a/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java +++ b/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java @@ -29,8 +29,10 @@ import io.netty.handler.codec.mqtt.MqttEncoder; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessageFactory; +import io.netty.handler.codec.mqtt.MqttMessageIdAndPropertiesVariableHeader; import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttSubscribePayload; @@ -434,7 +436,7 @@ public Future subscribe(Map topics) { false, 0); - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(nextMessageId()); + MqttMessageIdVariableHeader variableHeader = new MqttMessageIdAndPropertiesVariableHeader(nextMessageId(), MqttProperties.NO_PROPERTIES); List subscriptions = topics.entrySet() .stream() .map(e -> new MqttTopicSubscription(e.getKey(), valueOf(e.getValue()))) @@ -505,7 +507,7 @@ public Future unsubscribe(String topic) { false, 0); - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(nextMessageId()); + MqttMessageIdVariableHeader variableHeader = new MqttMessageIdAndPropertiesVariableHeader(nextMessageId(), MqttProperties.NO_PROPERTIES); MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Stream.of(topic).collect(Collectors.toList())); diff --git a/src/main/java/io/vertx/mqtt/impl/MqttEndpointImpl.java b/src/main/java/io/vertx/mqtt/impl/MqttEndpointImpl.java index c49f4563..7ccf0bde 100644 --- a/src/main/java/io/vertx/mqtt/impl/MqttEndpointImpl.java +++ b/src/main/java/io/vertx/mqtt/impl/MqttEndpointImpl.java @@ -27,6 +27,7 @@ import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttSubAckPayload; +import io.netty.handler.codec.mqtt.MqttUnsubAckPayload; import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; @@ -367,8 +368,9 @@ public MqttEndpointImpl unsubscribeAcknowledge(int unsubscribeMessageId) { new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(unsubscribeMessageId); + MqttUnsubAckPayload payload = new MqttUnsubAckPayload(); - io.netty.handler.codec.mqtt.MqttMessage unsuback = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null); + io.netty.handler.codec.mqtt.MqttMessage unsuback = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload); this.write(unsuback); diff --git a/src/test/java/io/vertx/mqtt/test/server/MqttServerSubscribeTest.java b/src/test/java/io/vertx/mqtt/test/server/MqttServerSubscribeTest.java index ba0ec36f..46288d9c 100644 --- a/src/test/java/io/vertx/mqtt/test/server/MqttServerSubscribeTest.java +++ b/src/test/java/io/vertx/mqtt/test/server/MqttServerSubscribeTest.java @@ -111,7 +111,7 @@ private void subscribe(TestContext context, String topic, int expectedQos) { } @Test - public void subscribeUnsupportedQos(TestContext context) { + public void subscribeUnsupportedMqttVersion(TestContext context) { Async async = context.async(); @@ -122,7 +122,7 @@ public void subscribeUnsupportedQos(TestContext context) { 0x11, // MSG LEN 0x00, 0x04, // PROTOCOL NAME LENGTH 0x4D, 0x51, 0x54, 0x54, // MQTT - 0x05, // VERSION + 0x06, // VERSION 0x02, // QOS 0x00, 0x3C, // KEEP ALIVE 0x00, 0x05, // CLIENT ID LENGTH