diff --git a/Cargo.lock b/Cargo.lock index 998c5689..576421b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2071,7 +2071,7 @@ dependencies = [ [[package]] name = "rumqttc" version = "0.23.0" -source = "git+https://github.com/bytebeamio/rumqtt#22d7fae16e3be9cc4f65b49eb31b2fe425aa9e3f" +source = "git+https://github.com/bytebeamio/rumqtt?branch=channel-pending#164a6baea584457d7e2c8cfbc18234ff69ef52ae" dependencies = [ "bytes 1.5.0", "flume 0.11.0", diff --git a/uplink/Cargo.toml b/uplink/Cargo.toml index 3c10e340..d1b92539 100644 --- a/uplink/Cargo.toml +++ b/uplink/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] bytes = "1" flume = "0.10" -rumqttc = { git = "https://github.com/bytebeamio/rumqtt" } +rumqttc = { git = "https://github.com/bytebeamio/rumqtt", branch = "channel-pending" } serde = { version = "1", features = ["derive"] } serde_json = "1.0" serde_with = "3.3.0" diff --git a/uplink/src/base/bridge/mod.rs b/uplink/src/base/bridge/mod.rs index b95b6de0..19a7c97e 100644 --- a/uplink/src/base/bridge/mod.rs +++ b/uplink/src/base/bridge/mod.rs @@ -1,4 +1,5 @@ use flume::{Receiver, Sender}; +use log::error; use serde::{Deserialize, Serialize}; use serde_json::Value; use tokio::join; @@ -20,7 +21,7 @@ pub use self::{ data_lane::{DataBridge, DataBridgeTx}, }; -use super::Compression; +use super::{serializer::SerializerShutdown, Compression}; pub use metrics::StreamMetrics; pub trait Point: Send + Debug + Serialize + 'static { @@ -79,6 +80,7 @@ pub(crate) struct DataBridgeShutdown; pub struct Bridge { pub(crate) data: DataBridge, pub(crate) actions: ActionsBridge, + pub(crate) serializer_shutdown: Sender, } impl Bridge { @@ -88,15 +90,20 @@ impl Bridge { metrics_tx: Sender, actions_rx: Receiver, shutdown_handle: Sender<()>, + serializer_shutdown: Sender, ) -> Self { let data = DataBridge::new(config.clone(), package_tx.clone(), metrics_tx.clone()); let actions = ActionsBridge::new(config, package_tx, actions_rx, shutdown_handle, metrics_tx); - Self { data, actions } + Self { data, actions, serializer_shutdown } } pub fn tx(&self) -> BridgeTx { - BridgeTx { data: self.data.tx(), actions: self.actions.tx() } + BridgeTx { + data: self.data.tx(), + actions: self.actions.tx(), + serializer_shutdown: self.serializer_shutdown.clone(), + } } pub fn register_action_route( @@ -120,6 +127,7 @@ impl Bridge { pub struct BridgeTx { pub data: DataBridgeTx, pub actions: ActionsBridgeTx, + pub serializer_shutdown: Sender, } impl BridgeTx { @@ -136,6 +144,10 @@ impl BridgeTx { } pub async fn trigger_shutdown(&self) { - join!(self.actions.trigger_shutdown(), self.data.trigger_shutdown()); + join!(self.actions.trigger_shutdown(), self.data.trigger_shutdown(), async { + if let Err(e) = self.serializer_shutdown.send_async(SerializerShutdown).await { + error!("Failed to trigger serializer shutdown. Error = {e}") + } + }); } } diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index 60330834..bb1d800b 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -65,7 +65,9 @@ enum Status { Normal, SlowEventloop(Publish), EventLoopReady, - EventLoopCrash(Publish), + // Transitions into Crash mode need not + // be triggered by `EventLoop` dying out + EventLoopCrash(Option), } /// Description of an interface that the [`Serializer`] expects to be provided by the MQTT client to publish the serialized data with. @@ -203,6 +205,8 @@ impl StorageHandler { } } +pub struct SerializerShutdown; + /// The uplink Serializer is the component that deals with serializing, compressing and writing data onto disk or Network. /// In case of network issues, the Serializer enters various states depending on the severeness, managed by [`start()`]. /// @@ -252,6 +256,7 @@ pub struct Serializer { metrics: SerializerMetrics, metrics_tx: Sender, pending_metrics: VecDeque, + ctrl_rx: Receiver, } impl Serializer { @@ -262,6 +267,7 @@ impl Serializer { collector_rx: Receiver>, client: C, metrics_tx: Sender, + ctrl_rx: Receiver, ) -> Result, Error> { let storage_handler = StorageHandler::new(config.clone())?; @@ -273,17 +279,22 @@ impl Serializer { metrics: SerializerMetrics::new("catchup"), metrics_tx, pending_metrics: VecDeque::with_capacity(3), + ctrl_rx, }) } /// Write all data received, from here-on, to disk only. - async fn crash(&mut self, publish: Publish) -> Result { - let storage = self.storage_handler.select(&publish.topic); - // Write failed publish to disk first, metrics don't matter - match write_to_disk(publish, storage) { - Ok(Some(deleted)) => debug!("Lost segment = {deleted}"), - Ok(_) => {} - Err(e) => error!("Crash loop: write error = {:?}", e), + async fn crash(&mut self, publish: Option) -> Result { + if let Some(publish) = publish { + let storage = self.storage_handler.select(&publish.topic); + // Write failed publish to disk first, metrics don't matter + match write_to_disk(publish, storage) { + Ok(Some(deleted)) => debug!("Lost segment = {deleted}"), + Ok(_) => {} + Err(e) => error!("Crash loop: write error = {:?}", e), + } + } else { + debug!("Forced into crash mode, writing all incoming data to persistence."); } loop { @@ -339,7 +350,7 @@ impl Serializer { break Ok(Status::EventLoopReady) } Err(MqttError::Send(Request::Publish(publish))) => { - break Ok(Status::EventLoopCrash(publish)); + break Ok(Status::EventLoopCrash(Some(publish))); }, Err(e) => { unreachable!("Unexpected error: {}", e); @@ -348,6 +359,10 @@ impl Serializer { _ = interval.tick() => { check_metrics(&mut self.metrics, &self.storage_handler); } + // Transition into crash mode when uplink is shutting down + _ = self.ctrl_rx.recv_async() => { + break Ok(Status::EventLoopCrash(None)) + } } }; @@ -427,7 +442,7 @@ impl Serializer { // indefinitely write to disk to not loose data let client = match o { Ok(c) => c, - Err(MqttError::Send(Request::Publish(publish))) => break Ok(Status::EventLoopCrash(publish)), + Err(MqttError::Send(Request::Publish(publish))) => break Ok(Status::EventLoopCrash(Some(publish))), Err(e) => unreachable!("Unexpected error: {}", e), }; @@ -455,6 +470,10 @@ impl Serializer { _ = interval.tick() => { let _ = check_and_flush_metrics(&mut self.pending_metrics, &mut self.metrics, &self.metrics_tx, &self.storage_handler); } + // Transition into crash mode when uplink is shutting down + _ = self.ctrl_rx.recv_async() => { + return Ok(Status::EventLoopCrash(None)) + } } }; @@ -500,6 +519,10 @@ impl Serializer { debug!("Failed to flush serializer metrics (normal). Error = {}", e); } } + // Transition into crash mode when uplink is shutting down + _ = self.ctrl_rx.recv_async() => { + return Ok(Status::EventLoopCrash(None)) + } } } } @@ -689,6 +712,7 @@ fn check_and_flush_metrics( #[cfg(test)] mod test { + use flume::bounded; use serde_json::Value; use std::collections::HashMap; @@ -774,8 +798,9 @@ mod test { let (net_tx, net_rx) = flume::bounded(1); let (metrics_tx, _metrics_rx) = flume::bounded(1); let client = MockClient { net_tx }; + let (_, ctrl_rx) = bounded(1); - (Serializer::new(config, data_rx, client, metrics_tx).unwrap(), data_tx, net_rx) + (Serializer::new(config, data_rx, client, metrics_tx, ctrl_rx).unwrap(), data_tx, net_rx) } #[derive(Error, Debug)] @@ -918,7 +943,9 @@ mod test { ); match tokio::runtime::Runtime::new().unwrap().block_on(serializer.slow(publish)).unwrap() { - Status::EventLoopCrash(Publish { qos: QoS::AtLeastOnce, topic, payload, .. }) => { + Status::EventLoopCrash(Some(Publish { + qos: QoS::AtLeastOnce, topic, payload, .. + })) => { assert_eq!(topic, "hello/world"); let recvd = std::str::from_utf8(&payload).unwrap(); assert_eq!(recvd, "[{\"sequence\":1,\"timestamp\":0,\"msg\":\"Hello, World!\"}]"); @@ -1022,7 +1049,7 @@ mod test { write_to_disk(publish.clone(), &mut storage).unwrap(); match tokio::runtime::Runtime::new().unwrap().block_on(serializer.catchup()).unwrap() { - Status::EventLoopCrash(Publish { topic, payload, .. }) => { + Status::EventLoopCrash(Some(Publish { topic, payload, .. })) => { assert_eq!(topic, "hello/world"); let recvd = std::str::from_utf8(&payload).unwrap(); assert_eq!(recvd, "[{\"sequence\":1,\"timestamp\":0,\"msg\":\"Hello, World!\"}]"); diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index 95b2c78d..5b9b5c38 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -439,8 +439,9 @@ mod test { let data = DataBridgeTx { data_tx, shutdown_handle }; let (shutdown_handle, _) = bounded(1); let actions = ActionsBridgeTx { status_tx, shutdown_handle }; + let (serializer_shutdown, _) = bounded(0); - (BridgeTx { data, actions }, status_rx) + (BridgeTx { data, actions, serializer_shutdown }, status_rx) } #[test] diff --git a/uplink/src/collector/script_runner.rs b/uplink/src/collector/script_runner.rs index dd921f04..06e6f2cd 100644 --- a/uplink/src/collector/script_runner.rs +++ b/uplink/src/collector/script_runner.rs @@ -163,8 +163,9 @@ mod tests { let data = DataBridgeTx { data_tx, shutdown_handle }; let (shutdown_handle, _) = bounded(1); let actions = ActionsBridgeTx { status_tx, shutdown_handle }; + let (serializer_shutdown, _) = bounded(0); - (BridgeTx { data, actions }, status_rx) + (BridgeTx { data, actions, serializer_shutdown }, status_rx) } #[test] diff --git a/uplink/src/lib.rs b/uplink/src/lib.rs index 52f4acd0..cd206e17 100644 --- a/uplink/src/lib.rs +++ b/uplink/src/lib.rs @@ -262,7 +262,7 @@ pub mod config { pub use base::actions::{Action, ActionResponse}; use base::bridge::{Bridge, Package, Payload, Point, StreamMetrics}; use base::mqtt::Mqtt; -use base::serializer::{Serializer, SerializerMetrics}; +use base::serializer::{Serializer, SerializerMetrics, SerializerShutdown}; pub use base::{ActionRoute, Config}; pub use collector::{simulator, tcpjson::TcpJson}; pub use storage::Storage; @@ -287,6 +287,8 @@ pub struct Uplink { serializer_metrics_rx: Receiver, shutdown_tx: Sender<()>, shutdown_rx: Receiver<()>, + serializer_shutdown_tx: Sender, + serializer_shutdown_rx: Receiver, } impl Uplink { @@ -296,6 +298,7 @@ impl Uplink { let (stream_metrics_tx, stream_metrics_rx) = bounded(10); let (serializer_metrics_tx, serializer_metrics_rx) = bounded(10); let (shutdown_tx, shutdown_rx) = bounded(1); + let (serializer_shutdown_tx, serializer_shutdown_rx) = bounded(1); Ok(Uplink { config, @@ -309,6 +312,8 @@ impl Uplink { serializer_metrics_rx, shutdown_tx, shutdown_rx, + serializer_shutdown_tx, + serializer_shutdown_rx, }) } @@ -319,6 +324,7 @@ impl Uplink { self.stream_metrics_tx(), self.action_rx.clone(), self.shutdown_tx.clone(), + self.serializer_shutdown_tx.clone(), ) } @@ -333,6 +339,7 @@ impl Uplink { self.data_rx.clone(), mqtt_client.clone(), self.serializer_metrics_tx(), + self.serializer_shutdown_rx.clone(), )?; // Serializer thread to handle network conditions state machine @@ -379,7 +386,7 @@ impl Uplink { }) }); - let Bridge { data: mut data_lane, actions: mut actions_lane } = bridge; + let Bridge { data: mut data_lane, actions: mut actions_lane, .. } = bridge; // Bridge thread to direct actions spawn_named_thread("Bridge actions_lane", || {