Skip to content

Commit

Permalink
Add disconnect timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Apr 7, 2020
1 parent 5408d5f commit 94a9259
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 8 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.1.1] - 2020-04-07

* Add disconnect timeout

## [0.1.0] - 2020-04-01

* For to ntex namespace
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-mqtt"
version = "0.1.0"
version = "0.1.1"
authors = ["Nikolay Kim <[email protected]>"]
description = "MQTT v3.1.1 Client/Server framework"
documentation = "https://docs.rs/ntex-mqtt"
Expand All @@ -12,8 +12,8 @@ exclude = [".gitignore", ".travis.yml", ".cargo/config"]
edition = "2018"

[dependencies]
ntex = "0.1.1"
ntex-codec = "0.1.0"
ntex = "0.1.5"
ntex-codec = "0.1.1"
bitflags = "1.2"
bytes = "0.5.4"
derive_more = "0.99.5"
Expand Down
25 changes: 21 additions & 4 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ pub struct MqttServer<Io, St, C: ServiceFactory> {
disconnect: Option<Rc<dyn Fn(&Session<St>, bool)>>,
max_size: usize,
inflight: usize,
handshake_timeout: u64,
handshake_timeout: usize,
disconnect_timeout: usize,
_t: PhantomData<(Io, St)>,
}

Expand Down Expand Up @@ -74,6 +75,7 @@ where
inflight: 15,
disconnect: None,
handshake_timeout: 0,
disconnect_timeout: 3000,
_t: PhantomData,
}
}
Expand All @@ -90,11 +92,24 @@ where
///
/// Handshake includes `connect` packet and response `connect-ack`.
/// By default handshake timeuot is disabled.
pub fn handshake_timeout(mut self, timeout: u64) -> Self {
pub fn handshake_timeout(mut self, timeout: usize) -> Self {
self.handshake_timeout = timeout;
self
}

/// Set server connection disconnect timeout in milliseconds.
///
/// Defines a timeout for disconnect connection. If a disconnect procedure does not complete
/// within this time, the connection get dropped.
///
/// To disable timeout set value to 0.
///
/// By default disconnect timeout is set to 3 seconds.
pub fn disconnect_timeout(mut self, val: usize) -> Self {
self.disconnect_timeout = val;
self
}

/// Set max inbound frame size.
///
/// If max size is set to `0`, size is unlimited.
Expand Down Expand Up @@ -179,6 +194,7 @@ where
let max_size = self.max_size;
let handshake_timeout = self.handshake_timeout;
let disconnect = self.disconnect;
let disconnect_timeout = self.disconnect_timeout;
let publish = publish
.into_factory()
.map_err(|e| MqttError::Service(e.into()))
Expand All @@ -191,6 +207,7 @@ where
self.inflight,
handshake_timeout,
))
.disconnect_timeout(disconnect_timeout)
.build(factory(
publish,
self.subscribe,
Expand All @@ -210,7 +227,7 @@ fn connect_service_factory<Io, St, C>(
factory: C,
max_size: usize,
inflight: usize,
handshake_timeout: u64,
handshake_timeout: usize,
) -> impl ServiceFactory<
Config = (),
Request = framed::Connect<Io, mqtt::Codec>,
Expand All @@ -228,7 +245,7 @@ where
C::Error: fmt::Debug,
{
apply(
Timeout::new(Duration::from_millis(handshake_timeout)),
Timeout::new(Duration::from_millis(handshake_timeout as u64)),
fn_factory(move || {
let fut = factory.new_service(());

Expand Down
7 changes: 6 additions & 1 deletion src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ impl MqttSink {
}

/// Send publish packet
pub fn publish_qos1(&self, topic: ByteString, payload: Bytes, dup: bool) -> impl Future<Output = Result<(), ()>> {
pub fn publish_qos1(
&self,
topic: ByteString,
payload: Bytes,
dup: bool,
) -> impl Future<Output = Result<(), ()>> {
let mut inner = self.0.borrow_mut();

if inner.sink.is_some() {
Expand Down

0 comments on commit 94a9259

Please sign in to comment.