From 962e01ac30a357cdcac2353d947be9de2065a090 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 10 Dec 2023 08:23:54 +0600 Subject: [PATCH] Fix KEEP-ALIVE timer handling (#163) --- CHANGES.md | 4 +++ Cargo.toml | 2 +- src/io.rs | 78 ++++++++++++++++++++++++++++++++++++---------- src/server.rs | 7 ----- src/v3/selector.rs | 7 ----- src/v3/server.rs | 7 ----- src/v5/selector.rs | 7 ----- src/v5/server.rs | 7 ----- 8 files changed, 67 insertions(+), 52 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 16bb818..0714471 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.12.15] - 2023-12-10 + +* Fix KEEP-ALIVE timer handling + ## [0.12.14] - 2023-12-03 * Optimize KEEP-ALIVE timer diff --git a/Cargo.toml b/Cargo.toml index 67e7499..3ca563a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-mqtt" -version = "0.12.14" +version = "0.12.15" authors = ["ntex contributors "] description = "Client and Server framework for MQTT v5 and v3.1.1 protocols" documentation = "https://docs.rs/ntex-mqtt" diff --git a/src/io.rs b/src/io.rs index 86fbaa5..3375f13 100644 --- a/src/io.rs +++ b/src/io.rs @@ -37,7 +37,7 @@ bitflags::bitflags! { const READY_ERR = 0b00001; const IO_ERR = 0b00010; const KA_ENABLED = 0b00100; - const NO_KA_TIMEOUT = 0b01000; + const KA_TIMEOUT = 0b01000; const READ_TIMEOUT = 0b10000; } } @@ -122,12 +122,6 @@ where })); let pool = io.memory_pool().pool(); - let flags = if config.keepalive_timeout_secs().is_zero() { - Flags::NO_KA_TIMEOUT - } else { - Flags::KA_ENABLED | Flags::NO_KA_TIMEOUT - }; - Dispatcher { codec, pool, @@ -137,7 +131,7 @@ where inner: DispatcherInner { io, state, - flags, + flags: Flags::empty(), config: config.clone(), st: IoDispatcherState::Processing, read_remains: 0, @@ -157,9 +151,8 @@ where self.inner.keepalive_timeout = timeout; if timeout.is_zero() { self.inner.flags.remove(Flags::KA_ENABLED); - self.inner.flags.insert(Flags::NO_KA_TIMEOUT); } else { - self.inner.flags.insert(Flags::KA_ENABLED | Flags::NO_KA_TIMEOUT); + self.inner.flags.insert(Flags::KA_ENABLED); } self } @@ -490,21 +483,22 @@ where // got parsed frame if decoded.item.is_some() { self.read_remains = 0; - self.flags.remove(Flags::READ_TIMEOUT); + self.flags.remove(Flags::KA_TIMEOUT | Flags::READ_TIMEOUT); } else if self.flags.contains(Flags::READ_TIMEOUT) { // received new data but not enough for parsing complete frame self.read_remains = decoded.remains as u32; } else if self.read_remains == 0 && decoded.remains == 0 { // no new data, start keep-alive timer - if self.flags.contains(Flags::NO_KA_TIMEOUT | Flags::KA_ENABLED) { + if self.flags.contains(Flags::KA_ENABLED) && !self.flags.contains(Flags::KA_TIMEOUT) + { log::debug!("Start keep-alive timer {:?}", self.keepalive_timeout); - self.flags.remove(Flags::NO_KA_TIMEOUT); + self.flags.insert(Flags::KA_TIMEOUT); self.io.start_timer_secs(self.keepalive_timeout); } } else if let Some((timeout, max, _)) = self.config.frame_read_rate_params() { // we got new data but not enough to parse single frame // start read timer - self.flags.insert(Flags::READ_TIMEOUT | Flags::NO_KA_TIMEOUT); + self.flags.insert(Flags::READ_TIMEOUT); self.read_remains = decoded.remains as u32; self.read_remains_prev = 0; @@ -550,7 +544,7 @@ where #[cfg(test)] mod tests { - use std::{cell::Cell, rc::Rc}; + use std::{cell::Cell, rc::Rc, sync::Arc, sync::Mutex}; use ntex::channel::condition::Condition; use ntex::time::{sleep, Millis}; @@ -595,7 +589,7 @@ mod tests { keepalive_timeout, io: IoBoxed::from(io), st: IoDispatcherState::Processing, - flags: Flags::KA_ENABLED | Flags::NO_KA_TIMEOUT, + flags: Flags::KA_ENABLED, read_remains: 0, read_remains_prev: 0, read_max_timeout: Seconds::ZERO, @@ -854,4 +848,56 @@ mod tests { client.close().await; let _ = rx.recv().await; } + + /// Update keep-alive timer after receiving frame + #[ntex::test] + async fn test_keepalive() { + let (client, server) = Io::create(); + client.remote_buffer_cap(1024); + + let data = Arc::new(Mutex::new(RefCell::new(Vec::new()))); + let data2 = data.clone(); + + let (disp, _) = Dispatcher::new_debug( + nio::Io::new(server), + BytesCodec, + ntex::service::fn_service(move |msg: DispatchItem| { + let data = data2.clone(); + async move { + match msg { + DispatchItem::Item(bytes) => { + data.lock().unwrap().borrow_mut().push(0); + return Ok::<_, ()>(Some(bytes.freeze())); + } + DispatchItem::KeepAliveTimeout => { + data.lock().unwrap().borrow_mut().push(1); + } + _ => (), + } + Ok(None) + } + }), + ); + ntex::rt::spawn(async move { + let _ = disp.keepalive_timeout(Seconds(2)).await; + }); + + client.write("1"); + let buf = client.read().await.unwrap(); + assert_eq!(buf, Bytes::from_static(b"1")); + sleep(Millis(750)).await; + + client.write("2"); + let buf = client.read().await.unwrap(); + assert_eq!(buf, Bytes::from_static(b"2")); + + sleep(Millis(750)).await; + client.write("3"); + let buf = client.read().await.unwrap(); + assert_eq!(buf, Bytes::from_static(b"3")); + + sleep(Millis(750)).await; + assert!(!client.is_closed()); + assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 0, 0]); + } } diff --git a/src/server.rs b/src/server.rs index fb971d8..b9f5b4b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -61,13 +61,6 @@ impl MqttServer { self.connect_timeout = timeout.into(); self } - - #[deprecated(since = "0.12.5")] - #[doc(hidden)] - pub fn conenct_timeout(mut self, timeout: Seconds) -> Self { - self.connect_timeout = timeout.into(); - self - } } impl MqttServer diff --git a/src/v3/selector.rs b/src/v3/selector.rs index d6178ee..20e4688 100644 --- a/src/v3/selector.rs +++ b/src/v3/selector.rs @@ -59,13 +59,6 @@ where self } - #[deprecated(since = "0.12.5")] - #[doc(hidden)] - pub fn conenct_timeout(mut self, timeout: Seconds) -> Self { - self.connect_timeout = timeout.into(); - self - } - /// Set max inbound frame size. /// /// If max size is set to `0`, size is unlimited. diff --git a/src/v3/server.rs b/src/v3/server.rs index aa7ed35..b7c28dd 100644 --- a/src/v3/server.rs +++ b/src/v3/server.rs @@ -108,13 +108,6 @@ where self } - #[deprecated(since = "0.12.5")] - #[doc(hidden)] - pub fn conenct_timeout(mut self, timeout: Seconds) -> Self { - self.connect_timeout = timeout; - self - } - /// Set server connection disconnect timeout. /// /// Defines a timeout for disconnect connection. If a disconnect procedure does not complete diff --git a/src/v5/selector.rs b/src/v5/selector.rs index f71b3c2..25d68b1 100644 --- a/src/v5/selector.rs +++ b/src/v5/selector.rs @@ -62,13 +62,6 @@ where self } - #[deprecated(since = "0.12.5")] - #[doc(hidden)] - pub fn conenct_timeout(mut self, timeout: Seconds) -> Self { - self.connect_timeout = timeout.into(); - self - } - /// Set max inbound frame size. /// /// If max size is set to `0`, size is unlimited. diff --git a/src/v5/server.rs b/src/v5/server.rs index 2cedbf3..4772a93 100644 --- a/src/v5/server.rs +++ b/src/v5/server.rs @@ -83,13 +83,6 @@ where self } - #[deprecated(since = "0.12.5")] - #[doc(hidden)] - pub fn conenct_timeout(mut self, timeout: Seconds) -> Self { - self.connect_timeout = timeout; - self - } - /// Set server connection disconnect timeout. /// /// Defines a timeout for disconnect connection. If a disconnect procedure does not complete