From 1eb9426f3c5fa722f6fcfc47e91eb56d7dabf96f Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 5 Apr 2024 20:54:22 +0530 Subject: [PATCH 1/7] doc: `write_to_storage` doesn't always write to disk --- uplink/src/base/serializer/mod.rs | 37 +++++++++++++++---------------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index 3a8c5f4b6..f856fdda5 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -324,7 +324,7 @@ impl Serializer { let stream_config = data.stream_config(); let publish = construct_publish(data, &mut self.stream_metrics)?; let storage = self.storage_handler.select(&stream_config); - match write_to_disk(publish, storage) { + match write_to_storage(publish, storage) { Ok(Some(deleted)) => debug!("Lost segment = {deleted}"), Ok(_) => {} Err(e) => error!("Shutdown: write error = {:?}", e), @@ -340,7 +340,7 @@ impl Serializer { ) -> Result { let storage = self.storage_handler.select(&stream); // Write failed publish to disk first, metrics don't matter - match write_to_disk(publish, storage) { + match write_to_storage(publish, storage) { Ok(Some(deleted)) => debug!("Lost segment = {deleted}"), Ok(_) => {} Err(e) => error!("Crash loop: write error = {:?}", e), @@ -351,7 +351,7 @@ impl Serializer { let data = self.collector_rx.recv_async().await?; let publish = construct_publish(data, &mut self.stream_metrics)?; let storage = self.storage_handler.select(&stream); - match write_to_disk(publish, storage) { + match write_to_storage(publish, storage) { Ok(Some(deleted)) => debug!("Lost segment = {deleted}"), Ok(_) => {} Err(e) => error!("Crash loop: write error = {:?}", e), @@ -380,7 +380,7 @@ impl Serializer { let stream = data.stream_config(); let publish = construct_publish(data, &mut self.stream_metrics)?; let storage = self.storage_handler.select(&stream); - match write_to_disk(publish, storage) { + match write_to_storage(publish, storage) { Ok(Some(deleted)) => { debug!("Lost segment = {deleted}"); self.metrics.increment_lost_segments(); @@ -475,7 +475,7 @@ impl Serializer { let stream = data.stream_config(); let publish = construct_publish(data, &mut self.stream_metrics)?; let storage = self.storage_handler.select(&stream); - match write_to_disk(publish, storage) { + match write_to_storage(publish, storage) { Ok(Some(deleted)) => { debug!("Lost segment = {deleted}"); self.metrics.increment_lost_segments(); @@ -665,10 +665,9 @@ fn construct_publish( Ok(Publish::new(topic, QoS::AtLeastOnce, payload)) } -// Writes the provided publish packet to disk with [Storage], after setting its pkid to 1. -// Updates serializer metrics with appropriate values on success, if asked to do so. -// Returns size in memory, size in disk, number of files in disk, -fn write_to_disk( +// Writes the provided publish packet to [Storage], after setting its pkid to 1. +// If the write buffer is full, it is flushed/written onto disk based on config. +fn write_to_storage( mut publish: Publish, storage: &mut Storage, ) -> Result, storage::Error> { @@ -1037,7 +1036,7 @@ mod test { QoS::AtLeastOnce, "[{\"sequence\":2,\"timestamp\":0,\"msg\":\"Hello, World!\"}]".as_bytes(), ); - write_to_disk(publish.clone(), &mut storage).unwrap(); + write_to_storage(publish.clone(), &mut storage).unwrap(); let stored_publish = read_from_storage(&mut storage, serializer.config.mqtt.max_packet_size); @@ -1189,7 +1188,7 @@ mod test { QoS::AtLeastOnce, "[{\"sequence\":1,\"timestamp\":0,\"msg\":\"Hello, World!\"}]".as_bytes(), ); - write_to_disk(publish.clone(), &mut storage).unwrap(); + write_to_storage(publish.clone(), &mut storage).unwrap(); let status = tokio::runtime::Runtime::new().unwrap().block_on(serializer.catchup()).unwrap(); @@ -1231,7 +1230,7 @@ mod test { QoS::AtLeastOnce, "[{\"sequence\":1,\"timestamp\":0,\"msg\":\"Hello, World!\"}]".as_bytes(), ); - write_to_disk(publish.clone(), &mut storage).unwrap(); + write_to_storage(publish.clone(), &mut storage).unwrap(); match tokio::runtime::Runtime::new().unwrap().block_on(serializer.catchup()).unwrap() { Status::EventLoopCrash(Publish { topic, payload, .. }, _) => { @@ -1288,8 +1287,8 @@ mod test { ..Default::default() })) .or_insert_with(|| unreachable!()); - write_to_disk(publish("topic/one".to_string(), 1), &mut one).unwrap(); - write_to_disk(publish("topic/one".to_string(), 10), &mut one).unwrap(); + write_to_storage(publish("topic/one".to_string(), 1), &mut one).unwrap(); + write_to_storage(publish("topic/one".to_string(), 10), &mut one).unwrap(); let top = serializer .storage_handler @@ -1300,8 +1299,8 @@ mod test { ..Default::default() })) .or_insert_with(|| unreachable!()); - write_to_disk(publish("topic/top".to_string(), 100), top).unwrap(); - write_to_disk(publish("topic/top".to_string(), 1000), top).unwrap(); + write_to_storage(publish("topic/top".to_string(), 100), top).unwrap(); + write_to_storage(publish("topic/top".to_string(), 1000), top).unwrap(); let two = serializer .storage_handler @@ -1312,7 +1311,7 @@ mod test { ..Default::default() })) .or_insert_with(|| unreachable!()); - write_to_disk(publish("topic/two".to_string(), 3), two).unwrap(); + write_to_storage(publish("topic/two".to_string(), 3), two).unwrap(); let mut default = serializer .storage_handler @@ -1323,8 +1322,8 @@ mod test { ..Default::default() })) .or_insert(Storage::new("topic/default", 1024)); - write_to_disk(publish("topic/default".to_string(), 0), &mut default).unwrap(); - write_to_disk(publish("topic/default".to_string(), 2), &mut default).unwrap(); + write_to_storage(publish("topic/default".to_string(), 0), &mut default).unwrap(); + write_to_storage(publish("topic/default".to_string(), 2), &mut default).unwrap(); // run serializer in the background spawn(async { serializer.start().await.unwrap() }); From 7bb3bd7163eb1c7255345da9d8e386b21ae02051 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 5 Apr 2024 21:42:37 +0530 Subject: [PATCH 2/7] doc: use better name for variant --- uplink/src/base/serializer/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index f856fdda5..ec4e1c399 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -45,8 +45,8 @@ pub enum Error { Serde(#[from] serde_json::Error), #[error("Io error {0}")] Io(#[from] io::Error), - #[error("Disk error {0}")] - Disk(#[from] storage::Error), + #[error("Storage error {0}")] + Storage(#[from] storage::Error), #[error("Mqtt client error {0}")] Client(#[from] MqttError), #[error("Storage is disabled/missing")] From 500a578e78ae06d54828082de1862604f6b8f3a3 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 5 Apr 2024 21:42:59 +0530 Subject: [PATCH 3/7] doc: update lib.rs --- uplink/src/lib.rs | 57 +++++++++++++++++++++++------------------------ 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/uplink/src/lib.rs b/uplink/src/lib.rs index c536eb4a9..ff5ba87c7 100644 --- a/uplink/src/lib.rs +++ b/uplink/src/lib.rs @@ -4,40 +4,39 @@ //! by [`Mqtt`] and [`Serializer`] respectively. [`Action`]s are received and forwarded by [`Mqtt`] to the [`Bridge`] module, where it is handled //! depending on the [`name`], with [`Bridge`] forwarding it to one of many **Action Handlers**, configured with an [`ActionRoute`]. //! -//! Some of the action handlers are [`TcpJson`], [`ProcessHandler`], [`FileDownloader`] and [`TunshellSession`]. [`TcpJson`] forwards Actions received +//! Some of the action handlers are [`TcpJson`], [`ProcessHandler`], [`FileDownloader`] and [`TunshellClient`]. [`TcpJson`] forwards Actions received //! from the platform to the application connected to it through the [`port`] and collects response data from these devices, to forward to the platform. //! Response data can be of multiple types, of interest to us are [`ActionResponse`]s and data [`Payload`]s, which are forwarded to [`Bridge`] and from //! there to the [`Serializer`], where depending on the network, it may be persisted in-memory or on-disk with [`Storage`]. //! //!```text -//! ┌───────────┐ -//! │MQTT broker│ -//! └────┐▲─────┘ -//! ││ -//! Action ││ ActionResponse -//! ││ / Data -//! ┌─▼└─┐ -//! ┌──────────┤Mqtt◄─────────┐ -//! Action │ └────┘ │ ActionResponse -//! │ │ / Data -//! │ │ -//! ┌──▼───┐ ActionResponse ┌────┴─────┐ Publish ┌───────┐ -//! ┌───────────────────────►Bridge├────────────────►Serializer◄─────────────►Storage| -//! │ └┬─┬┬─┬┘ / Data └──────────┘ Packets └───────┘ -//! │ │ ││ │ -//! │ │ ││ | Action (BridgeTx) -//! │ ┌───────────────┘ ││ └────────────────────┐ -//! │ │ ┌─────┘└───────┐ │ -//! │ ------│-----------│--------------│--------------│------ -//! │ ' │ │ Applications │ │ ' -//! │ '┌────▼───┐ ┌───▼───┐ ┌──────▼───────┐ ┌───▼───┐ ' Action ┌───────────┐ -//! │ '│Tunshell│ │Process│ │FileDownloader│ │TcpJson◄───────────────────►Application│ -//! │ '└────┬───┘ └───┬───┘ └──────┬───────┘ └───┬───┘ ' ActionResponse │ / Device │ -//! │ ' │ │ │ │ ' / Data └───────────┘ -//! │ ------│-----------│--------------│--------------│------ -//! │ │ │ │ │ -//! └────────┴───────────┴──────────────┴──────────────┘ -//! ActionResponse / Data +//! ┌───────────┐ +//! │MQTT broker│ +//! └────┐▲─────┘ +//! Action ││ ActionResponse +//! ││ / Data +//! Action ┌─▼└─┐ +//! ┌────────────┤Mqtt◄──────────┐ ActionResponse +//! │ └────┘ │ / Data +//! │ ActionResponse ┌────┴─────┐ Publish ┌───────┐ +//! │ ┌───────┬───────►Serializer◄─────────────►Storage│ +//! │ │ │ Data └──────────┘ Packets └───────┘ +//! ┌------│-------│-------│-----┐ +//! '┌─────▼─────┐ │ ┌────┴────┐' +//! ┌────────────►Action Lane├─┘ │Data Lane◄──────┐ +//! │ '└──┬─┬─┬─┬──┘ └─────────┘' │ +//! │ ' │ │ │ │ Bridge ' │ +//! │ └---│-│-│-│------------------┘ │ +//! │ │ │ │ └─────────────────────┐ │ +//! │ ┌────────┘ │ └──────────┐ │ │ +//! │┌-----│----------│------------│------------│--│--┐ +//! │' │ │Applications│ │ │ ' +//! │'┌────▼───┐ ┌────▼──┐ ┌──────▼───────┐ ┌──▼──┴─┐' Action ┌───────────┐ +//! │'│Tunshell│ │Process│ │FileDownloader│ │TcpJson◄─────────────────►Application│ +//! │'└────┬───┘ └───┬───┘ └──────┬───────┘ └───┬───┘' ActionResponse │ / Device │ +//! │└-----│---------│-------------│-------------│----┘ / Data └───────────┘ +//! └──────┴─────────┴─────────────┴─────────────┘ +//! ActionResponse //!``` //! [`port`]: base::AppConfig#structfield.port //! [`name`]: Action#structfield.name From 88805cfbcaa9ab64164ad328515ad4eb00faef5c Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 16 Apr 2024 23:20:07 +0530 Subject: [PATCH 4/7] fix: delete downloaded file on action timeout (#340) * fix: delete partially downloaded file when handling a timeout * fix: error out instead of success response --- uplink/src/collector/downloader.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index 893e94ec9..ce5bbe38d 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -93,6 +93,8 @@ pub enum Error { BadSave, #[error("Save file doesn't exist")] NoSave, + #[error("Download timedout")] + Timeout, } /// This struct contains the necessary components to download and store file as notified by a download file @@ -221,7 +223,13 @@ impl FileDownloader { // NOTE: if download has timedout don't do anything, else ensure errors are forwarded after three retries o = timeout_at(deadline, self.continuous_retry(state)) => match o { Ok(r) => r?, - Err(_) => error!("Last download has timedout"), + Err(_) => { + // unwrap is safe because download_path is expected to be Some + _ = remove_file(state.current.meta.download_path.as_ref().unwrap()); + error!("Last download has timedout; file deleted"); + + return Err(Error::Timeout); + }, } } From 69f4e1cf160bbb9e16de32e29bced30f7fa996dd Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 16 Apr 2024 23:21:35 +0530 Subject: [PATCH 5/7] chore: v2.12.1 release --- Cargo.lock | 2 +- uplink/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e14923d4f..a35d51342 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3368,7 +3368,7 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "uplink" -version = "2.12.0" +version = "2.12.1" dependencies = [ "anyhow", "async-trait", diff --git a/uplink/Cargo.toml b/uplink/Cargo.toml index 75627977d..92cda3da9 100644 --- a/uplink/Cargo.toml +++ b/uplink/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "uplink" -version = "2.12.0" +version = "2.12.1" authors = ["tekjar "] edition = "2021" From bca386db6c9b636883ffa8e46e3ee13cb08e2c01 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 30 Apr 2024 16:22:31 +0530 Subject: [PATCH 6/7] chore: update deps --- Cargo.lock | 119 ++++++++++++++++++++++++------ Cargo.toml | 2 +- uplink/Cargo.toml | 2 +- uplink/src/base/mqtt/mod.rs | 6 +- uplink/src/base/serializer/mod.rs | 6 +- 5 files changed, 106 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a35d51342..89dd031c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -193,6 +193,12 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9475866fec1451be56a3c2400fd081ff546538961565ccb5b7142cbd22bc7a51" + [[package]] name = "base64ct" version = "1.6.0" @@ -1477,6 +1483,16 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi 0.3.9", +] + [[package]] name = "num-bigint-dig" version = "0.8.4" @@ -1616,6 +1632,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking_lot" version = "0.12.1" @@ -2123,7 +2145,7 @@ dependencies = [ "percent-encoding", "pin-project-lite 0.2.13", "rustls 0.21.10", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", @@ -2194,19 +2216,21 @@ dependencies = [ [[package]] name = "rumqttc" -version = "0.23.0" -source = "git+https://github.com/bytebeamio/rumqtt#09a6153d2cd3bb9136414752f67b9614bd4c97b4" +version = "0.24.0" +source = "git+https://github.com/bytebeamio/rumqtt#c5bfdd74a535cadd518c12f7667e64f1ec68858c" dependencies = [ "bytes 1.5.0", "flume 0.11.0", "futures-util", "log", "rustls-native-certs", - "rustls-pemfile", - "rustls-webpki", + "rustls-pemfile 2.1.2", + "rustls-webpki 0.102.3", "thiserror", "tokio 1.36.0", - "tokio-rustls 0.24.1", + "tokio-rustls 0.25.0", + "tokio-stream", + "tokio-util 0.7.10", ] [[package]] @@ -2267,18 +2291,33 @@ checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" dependencies = [ "log", "ring 0.17.7", - "rustls-webpki", + "rustls-webpki 0.101.7", "sct 0.7.1", ] +[[package]] +name = "rustls" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" +dependencies = [ + "log", + "ring 0.17.7", + "rustls-pki-types", + "rustls-webpki 0.102.3", + "subtle", + "zeroize", +] + [[package]] name = "rustls-native-certs" -version = "0.6.3" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792" dependencies = [ "openssl-probe", - "rustls-pemfile", + "rustls-pemfile 2.1.2", + "rustls-pki-types", "schannel", "security-framework", ] @@ -2292,6 +2331,22 @@ dependencies = [ "base64 0.21.7", ] +[[package]] +name = "rustls-pemfile" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d" +dependencies = [ + "base64 0.22.0", + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "beb461507cee2c2ff151784c52762cf4d9ff6a61f3e80968600ed24fa837fa54" + [[package]] name = "rustls-webpki" version = "0.101.7" @@ -2302,6 +2357,17 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "rustls-webpki" +version = "0.102.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3bce581c0dd41bce533ce695a1437fa16a7ab5ac3ccfa99fe1a620a7885eabf" +dependencies = [ + "ring 0.17.7", + "rustls-pki-types", + "untrusted 0.9.0", +] + [[package]] name = "rustversion" version = "1.0.14" @@ -2889,18 +2955,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.56" +version = "1.0.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad" +checksum = "f0126ad08bff79f29fc3ae6a55cc72352056dfff61e3ff8bb7129476d44b23aa" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.56" +version = "1.0.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" +checksum = "d1cd413b5d558b4c5bf3680e324a6fa5014e7b7c067a51e69dbdf47eb7148b66" dependencies = [ "proc-macro2", "quote", @@ -3072,11 +3138,22 @@ dependencies = [ "tokio 1.36.0", ] +[[package]] +name = "tokio-rustls" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +dependencies = [ + "rustls 0.22.4", + "rustls-pki-types", + "tokio 1.36.0", +] + [[package]] name = "tokio-stream" -version = "0.1.14" +version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" dependencies = [ "futures-core", "pin-project-lite 0.2.13", @@ -3185,9 +3262,9 @@ dependencies = [ [[package]] name = "tracing-log" -version = "0.1.4" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f751112709b4e791d8ce53e32c4ed2d353565a795ce84da2285393f41557bdf2" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" dependencies = [ "log", "once_cell", @@ -3196,12 +3273,12 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.14" +version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a713421342a5a666b7577783721d3117f1b69a393df803ee17bb73b1e122a59" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ - "ansi_term", "matchers", + "nu-ansi-term", "once_cell", "regex", "sharded-slab", diff --git a/Cargo.toml b/Cargo.toml index c2b1f2edd..cc644fba4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ serde_json = "1.0" tempdir = "0.3" thiserror = "1" tokio = { version = "1", features = ["rt-multi-thread", "process"] } -tokio-stream = "0.1" +tokio-stream = "0.1.15" tokio-util = { version = "0.7", features = ["codec", "time"] } [profile.dev] diff --git a/uplink/Cargo.toml b/uplink/Cargo.toml index 92cda3da9..848d777bf 100644 --- a/uplink/Cargo.toml +++ b/uplink/Cargo.toml @@ -32,7 +32,7 @@ storage = { path = "../storage" } log = { workspace = true } regex = "1.7.1" tracing = { version="0.1", features=["log"] } -tracing-subscriber = { version="=0.3.14", features=["env-filter"] } +tracing-subscriber = { version="0.3.18", features=["env-filter"] } # built-in collectors # tunshell diff --git a/uplink/src/base/mqtt/mod.rs b/uplink/src/base/mqtt/mod.rs index 35ecf6225..12f4f3d83 100644 --- a/uplink/src/base/mqtt/mod.rs +++ b/uplink/src/base/mqtt/mod.rs @@ -12,8 +12,8 @@ use std::path::Path; use crate::{Action, Config}; use rumqttc::{ - read, AsyncClient, ConnectionError, Event, EventLoop, Incoming, MqttOptions, Packet, Publish, - QoS, Request, TlsConfiguration, Transport, + AsyncClient, ConnectionError, Event, EventLoop, Incoming, MqttOptions, Packet, Publish, QoS, + Request, TlsConfiguration, Transport, }; use std::sync::Arc; @@ -138,7 +138,7 @@ impl Mqtt { let max_packet_size = self.config.mqtt.max_packet_size; loop { // NOTE: This can fail when packet sizes > max_payload_size in config are written to disk. - match read(&mut buf, max_packet_size) { + match Packet::read(&mut buf, max_packet_size) { Ok(Packet::Publish(publish)) => { self.eventloop.pending.push_back(Request::Publish(publish)) } diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index ec4e1c399..e536ad24f 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -447,7 +447,7 @@ impl Serializer { // TODO(RT): This can fail when packet sizes > max_payload_size in config are written to disk. // This leads to force switching to normal mode. Increasing max_payload_size to bypass this - let publish = match read(storage.reader(), max_packet_size) { + let publish = match Packet::read(storage.reader(), max_packet_size) { Ok(Packet::Publish(publish)) => publish, Ok(packet) => unreachable!("Unexpected packet: {:?}", packet), Err(e) => { @@ -505,7 +505,7 @@ impl Serializer { _ => return Ok(Status::Normal), }; - let publish = match read(storage.reader(), max_packet_size) { + let publish = match Packet::read(storage.reader(), max_packet_size) { Ok(Packet::Publish(publish)) => publish, Ok(packet) => unreachable!("Unexpected packet: {:?}", packet), Err(e) => { @@ -925,7 +925,7 @@ mod test { panic!("No publishes found in storage"); } - match read(storage.reader(), max_packet_size) { + match Packet::read(storage.reader(), max_packet_size) { Ok(Packet::Publish(publish)) => return publish, v => { panic!("Failed to read publish from storage. read: {:?}", v); From 513f4c2def843f3c27bf0a7dc993667010944535 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 30 Apr 2024 16:23:57 +0530 Subject: [PATCH 7/7] test: rumqttc update --- storage/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/src/lib.rs b/storage/src/lib.rs index 217b30e42..ea8522ee0 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -431,7 +431,7 @@ mod test { break; } - match read(storage.reader(), 1048).unwrap() { + match Packet::read(storage.reader(), 1048).unwrap() { Packet::Publish(p) => publishes.push(p), packet => unreachable!("{:?}", packet), }