Skip to content

Commit

Permalink
Optimize KEEP-ALIVE timer
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Dec 2, 2023
1 parent 0c7e8c8 commit 02025eb
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 23 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.12.14] - 2023-12-03

* Optimize KEEP-ALIVE timer

## [0.12.13] - 2023-11-29

* Refactor io timers
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-mqtt"
version = "0.12.13"
version = "0.12.14"
authors = ["ntex contributors <[email protected]>"]
description = "Client and Server framework for MQTT v5 and v3.1.1 protocols"
documentation = "https://docs.rs/ntex-mqtt"
Expand Down
46 changes: 24 additions & 22 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ pin_project_lite::pin_project! {
bitflags::bitflags! {
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
struct Flags: u8 {
const READY_ERR = 0b0001;
const IO_ERR = 0b0010;
const KA_TIMEOUT = 0b0100;
const READ_TIMEOUT = 0b1000;
const READY_ERR = 0b00001;
const IO_ERR = 0b00010;
const KA_ENABLED = 0b00100;
const NO_KA_TIMEOUT = 0b01000;
const READ_TIMEOUT = 0b10000;
}
}

Expand Down Expand Up @@ -121,6 +122,12 @@ where
}));
let pool = io.memory_pool().pool();

let flags = if config.keepalive_timeout_secs().is_zero() {
Flags::NO_KA_TIMEOUT
} else {
Flags::KA_ENABLED | Flags::NO_KA_TIMEOUT
};

Dispatcher {
codec,
pool,
Expand All @@ -130,13 +137,13 @@ where
inner: DispatcherInner {
io,
state,
flags,
config: config.clone(),
flags: Flags::empty(),
st: IoDispatcherState::Processing,
read_remains: 0,
read_remains_prev: 0,
read_max_timeout: Seconds::ZERO,
keepalive_timeout: Seconds(30),
keepalive_timeout: config.keepalive_timeout_secs(),
},
}
}
Expand All @@ -148,6 +155,12 @@ where
/// By default keep-alive timeout is set to 30 seconds.
pub(crate) fn keepalive_timeout(mut self, timeout: Seconds) -> Self {
self.inner.keepalive_timeout = timeout;
if timeout.is_zero() {
self.inner.flags.remove(Flags::KA_ENABLED);
self.inner.flags.insert(Flags::NO_KA_TIMEOUT);
} else {
self.inner.flags.insert(Flags::KA_ENABLED | Flags::NO_KA_TIMEOUT);
}
self
}
}
Expand Down Expand Up @@ -474,33 +487,24 @@ where
}

fn update_timer(&mut self, decoded: &Decoded<<U as Decoder>::Item>) {
log::debug!(
"Update timer, item: {:?}, remains: {:?}, consumed: {:?}, flags: {:?}",
decoded.item.is_some(),
decoded.remains,
decoded.consumed,
self.flags
);

// got parsed frame
if decoded.item.is_some() {
self.read_remains = 0;
self.flags.remove(Flags::KA_TIMEOUT | Flags::READ_TIMEOUT);
self.flags.remove(Flags::READ_TIMEOUT);
} else if self.flags.contains(Flags::READ_TIMEOUT) {
// received new data but not enough for parsing complete frame
self.read_remains = decoded.remains as u32;
} else if self.read_remains == 0 && decoded.remains == 0 {
// no new data, start keep-alive timer
if !self.flags.contains(Flags::KA_TIMEOUT) {
if self.flags.contains(Flags::NO_KA_TIMEOUT | Flags::KA_ENABLED) {
log::debug!("Start keep-alive timer {:?}", self.keepalive_timeout);
self.flags.insert(Flags::KA_TIMEOUT);
self.flags.remove(Flags::NO_KA_TIMEOUT);
self.io.start_timer_secs(self.keepalive_timeout);
}
} else if let Some((timeout, max, _)) = self.config.frame_read_rate_params() {
// we got new data but not enough to parse single frame
// start read timer
self.flags.remove(Flags::KA_TIMEOUT);
self.flags.insert(Flags::READ_TIMEOUT);
self.flags.insert(Flags::READ_TIMEOUT | Flags::NO_KA_TIMEOUT);

self.read_remains = decoded.remains as u32;
self.read_remains_prev = 0;
Expand All @@ -512,8 +516,6 @@ where
}

fn handle_timeout(&mut self) -> Result<(), DispatchItem<U>> {
log::debug!("Handle timeout, flags: {:?}", self.flags);

// check read timer
if self.flags.contains(Flags::READ_TIMEOUT) {
if let Some((timeout, max, rate)) = self.config.frame_read_rate_params() {
Expand Down Expand Up @@ -593,7 +595,7 @@ mod tests {
keepalive_timeout,
io: IoBoxed::from(io),
st: IoDispatcherState::Processing,
flags: Flags::empty(),
flags: Flags::KA_ENABLED | Flags::NO_KA_TIMEOUT,
read_remains: 0,
read_remains_prev: 0,
read_max_timeout: Seconds::ZERO,
Expand Down

0 comments on commit 02025eb

Please sign in to comment.