diff --git a/CHANGES.md b/CHANGES.md index 09e4f64b..48ea1dd9 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,8 +1,10 @@ # Changes -## [0.3.12] - 2020-xx-xx +## [0.3.12] - 2020-10-05 -* Better name SubscribeIter::confirm() +* v5: Add helper method Connect::fail_with() + +* v5: Better name SubscribeIter::confirm() ## [0.3.11] - 2020-09-29 diff --git a/Cargo.toml b/Cargo.toml index e759255e..fc0bb1d9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-mqtt" -version = "0.3.11" +version = "0.3.12" authors = ["ntex contributors "] description = "MQTT Client/Server framework for v5 and v3.1.1 protocols" documentation = "https://docs.rs/ntex-mqtt" @@ -20,9 +20,9 @@ derive_more = "0.99.5" either = "1.5.3" futures = "0.3.5" fxhash = "0.2.1" -pin-project = "0.4.23" -pin-project-lite = "0.1.7" -pin-project-internal = "0.4.23" +pin-project = "0.4.24" +pin-project-lite = "0.1.9" +pin-project-internal = "0.4.24" log = "0.4" bytestring = "0.1.5" serde = "1.0" diff --git a/src/lib.rs b/src/lib.rs index 8f9a46dc..3e7b3b46 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,7 +23,7 @@ mod version; pub use self::error::MqttError; pub use self::server::MqttServer; pub use self::session::Session; -pub use self::topic::{Topic, Level as TopicLevel}; +pub use self::topic::{Level as TopicLevel, Topic}; // http://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.xhtml pub const TCP_PORT: u16 = 1883; diff --git a/src/topic.rs b/src/topic.rs index 01a37dce..48db2932 100644 --- a/src/topic.rs +++ b/src/topic.rs @@ -59,20 +59,12 @@ impl Level { #[inline] pub fn is_normal(&self) -> bool { - if let Level::Normal(_) = *self { - true - } else { - false - } + matches!(*self, Level::Normal(_)) } #[inline] pub fn is_metadata(&self) -> bool { - if let Level::Metadata(_) = *self { - true - } else { - false - } + matches!(*self, Level::Metadata(_)) } #[inline] diff --git a/src/v3/codec/decode.rs b/src/v3/codec/decode.rs index 810217fb..81a1f37b 100644 --- a/src/v3/codec/decode.rs +++ b/src/v3/codec/decode.rs @@ -58,8 +58,8 @@ fn decode_connect_packet(src: &mut Bytes) -> Result { let level = src.get_u8(); ensure!(level == MQTT_LEVEL_3, DecodeError::UnsupportedProtocolLevel); - let flags = ConnectFlags::from_bits(src.get_u8()) - .ok_or_else(|| DecodeError::ConnectReservedFlagSet)?; + let flags = + ConnectFlags::from_bits(src.get_u8()).ok_or(DecodeError::ConnectReservedFlagSet)?; let keep_alive = u16::decode(src)?; let client_id = ByteString::decode(src)?; @@ -100,8 +100,8 @@ fn decode_connect_packet(src: &mut Bytes) -> Result { fn decode_connect_ack_packet(src: &mut Bytes) -> Result { ensure!(src.remaining() >= 2, DecodeError::InvalidLength); - let flags = ConnectAckFlags::from_bits(src.get_u8()) - .ok_or_else(|| DecodeError::ConnAckReservedFlagSet)?; + let flags = + ConnectAckFlags::from_bits(src.get_u8()).ok_or(DecodeError::ConnAckReservedFlagSet)?; let return_code = src.get_u8().try_into()?; Ok(Packet::ConnectAck { diff --git a/src/v3/sink.rs b/src/v3/sink.rs index e35e2bd8..6a90723e 100644 --- a/src/v3/sink.rs +++ b/src/v3/sink.rs @@ -79,14 +79,12 @@ impl MqttSink { let mut inner = self.0.borrow_mut(); if inner.is_closed() { Either::Left(err(())) + } else if inner.queue.len() >= inner.cap { + let (tx, rx) = inner.pool.waiters.channel(); + inner.waiters.push_back(tx); + Either::Right(rx.map_err(|_| ())) } else { - if inner.queue.len() >= inner.cap { - let (tx, rx) = inner.pool.waiters.channel(); - inner.waiters.push_back(tx); - Either::Right(rx.map_err(|_| ())) - } else { - Either::Left(ok(())) - } + Either::Left(ok(())) } } diff --git a/src/v5/codec/packet/connack.rs b/src/v5/codec/packet/connack.rs index d1907f32..f29984ee 100644 --- a/src/v5/codec/packet/connack.rs +++ b/src/v5/codec/packet/connack.rs @@ -106,7 +106,7 @@ impl ConnectAck { pub(crate) fn decode(src: &mut Bytes) -> Result { ensure!(src.remaining() >= 2, DecodeError::InvalidLength); let flags = ConnectAckFlags::from_bits(src.get_u8()) - .ok_or_else(|| DecodeError::ConnAckReservedFlagSet)?; + .ok_or(DecodeError::ConnAckReservedFlagSet)?; let reason_code = src.get_u8().try_into()?; diff --git a/src/v5/codec/packet/connect.rs b/src/v5/codec/packet/connect.rs index ee65c476..6aa56d86 100644 --- a/src/v5/codec/packet/connect.rs +++ b/src/v5/codec/packet/connect.rs @@ -114,9 +114,8 @@ impl Connect { let level = src.get_u8(); ensure!(level == MQTT_LEVEL_5, DecodeError::UnsupportedProtocolLevel); - let flags = ConnectFlags::from_bits(src.get_u8()) - .ok_or_else(|| DecodeError::ConnectReservedFlagSet)?; - + let flags = + ConnectFlags::from_bits(src.get_u8()).ok_or(DecodeError::ConnectReservedFlagSet)?; let keep_alive = src.get_u16(); // reading properties diff --git a/src/v5/connect.rs b/src/v5/connect.rs index 649cad32..67444a28 100644 --- a/src/v5/connect.rs +++ b/src/v5/connect.rs @@ -70,6 +70,7 @@ impl Connect { ConnectAck { io: self.io, sink: self.sink, session: None, packet } } + /// Create connect ack object with provided ConnectAck packet pub fn fail_with(self, ack: codec::ConnectAck) -> ConnectAck { ConnectAck { io: self.io, sink: self.sink, session: None, packet: ack } } diff --git a/src/v5/sink.rs b/src/v5/sink.rs index a42287ea..b9455a20 100644 --- a/src/v5/sink.rs +++ b/src/v5/sink.rs @@ -91,14 +91,12 @@ impl MqttSink { let mut inner = self.0.borrow_mut(); if inner.is_closed() { Either::Left(err(())) + } else if inner.queue.len() >= inner.cap { + let (tx, rx) = inner.pool.waiters.channel(); + inner.waiters.push_back(tx); + Either::Right(rx.map_err(|_| ())) } else { - if inner.queue.len() >= inner.cap { - let (tx, rx) = inner.pool.waiters.channel(); - inner.waiters.push_back(tx); - Either::Right(rx.map_err(|_| ())) - } else { - Either::Left(ok(())) - } + Either::Left(ok(())) } }