Skip to content

Commit

Permalink
feat: force serializer into crash mode on shutdown (#309)
Browse files Browse the repository at this point in the history
* feat: force serializer into crash mode on shutdown

* log: writing incoming data into persistence

* fix: pull pending requests from channel
  • Loading branch information
Devdutt Shenoi committed Dec 1, 2023
1 parent 50d6766 commit 3cc82ff
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion uplink/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ edition = "2021"
[dependencies]
bytes = "1"
flume = "0.10"
rumqttc = { git = "https://github.com/bytebeamio/rumqtt" }
rumqttc = { git = "https://github.com/bytebeamio/rumqtt", branch = "channel-pending" }
serde = { version = "1", features = ["derive"] }
serde_json = "1.0"
serde_with = "3.3.0"
Expand Down
20 changes: 16 additions & 4 deletions uplink/src/base/bridge/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use flume::{Receiver, Sender};
use log::error;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::join;
Expand All @@ -20,7 +21,7 @@ pub use self::{
data_lane::{DataBridge, DataBridgeTx},
};

use super::Compression;
use super::{serializer::SerializerShutdown, Compression};
pub use metrics::StreamMetrics;

pub trait Point: Send + Debug + Serialize + 'static {
Expand Down Expand Up @@ -79,6 +80,7 @@ pub(crate) struct DataBridgeShutdown;
/// Groups the data lane and the actions lane together with the sender used
/// to notify the serializer that uplink is shutting down.
///
/// Handles for talking to the bridge are obtained through `tx()`, which
/// returns a `BridgeTx` built from these three members.
pub struct Bridge {
// Data lane, constructed by `DataBridge::new` from the config, package and metrics senders.
pub(crate) data: DataBridge,
// Actions lane, constructed by `ActionsBridge::new`; it also receives the `shutdown_handle`.
pub(crate) actions: ActionsBridge,
// Sending half of the serializer shutdown channel; cloned into every `BridgeTx`.
pub(crate) serializer_shutdown: Sender<SerializerShutdown>,
}

impl Bridge {
Expand All @@ -88,15 +90,20 @@ impl Bridge {
metrics_tx: Sender<StreamMetrics>,
actions_rx: Receiver<Action>,
shutdown_handle: Sender<()>,
serializer_shutdown: Sender<SerializerShutdown>,
) -> Self {
let data = DataBridge::new(config.clone(), package_tx.clone(), metrics_tx.clone());
let actions =
ActionsBridge::new(config, package_tx, actions_rx, shutdown_handle, metrics_tx);
Self { data, actions }
Self { data, actions, serializer_shutdown }
}

pub fn tx(&self) -> BridgeTx {
BridgeTx { data: self.data.tx(), actions: self.actions.tx() }
BridgeTx {
data: self.data.tx(),
actions: self.actions.tx(),
serializer_shutdown: self.serializer_shutdown.clone(),
}
}

pub fn register_action_route(
Expand All @@ -120,6 +127,7 @@ impl Bridge {
/// Handle to a `Bridge`, carrying the transmit side of both lanes plus the
/// serializer shutdown sender.
///
/// `trigger_shutdown` uses all three members: it triggers shutdown on the
/// actions and data handles and sends a `SerializerShutdown` message.
pub struct BridgeTx {
// Handle to the data lane, obtained from `DataBridge::tx()`.
pub data: DataBridgeTx,
// Handle to the actions lane, obtained from `ActionsBridge::tx()`.
pub actions: ActionsBridgeTx,
// Used to send `SerializerShutdown`; a send failure is logged by `trigger_shutdown`, not propagated.
pub serializer_shutdown: Sender<SerializerShutdown>,
}

impl BridgeTx {
Expand All @@ -136,6 +144,10 @@ impl BridgeTx {
}

pub async fn trigger_shutdown(&self) {
join!(self.actions.trigger_shutdown(), self.data.trigger_shutdown());
join!(self.actions.trigger_shutdown(), self.data.trigger_shutdown(), async {
if let Err(e) = self.serializer_shutdown.send_async(SerializerShutdown).await {
error!("Failed to trigger serializer shutdown. Error = {e}")
}
});
}
}
53 changes: 40 additions & 13 deletions uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ enum Status {
Normal,
SlowEventloop(Publish),
EventLoopReady,
EventLoopCrash(Publish),
// Transitions into Crash mode need not
// be triggered by `EventLoop` dying out
EventLoopCrash(Option<Publish>),
}

/// Description of an interface that the [`Serializer`] expects to be provided by the MQTT client to publish the serialized data with.
Expand Down Expand Up @@ -203,6 +205,8 @@ impl StorageHandler {
}
}

/// Control message telling the [`Serializer`] that uplink is shutting down.
///
/// On receiving it over `ctrl_rx`, the serializer transitions to
/// `Status::EventLoopCrash(None)`, i.e. crash mode without a failed publish,
/// where all incoming data is written to persistence.
pub struct SerializerShutdown;

/// The uplink Serializer is the component that deals with serializing, compressing and writing data onto disk or Network.
/// In case of network issues, the Serializer enters various states depending on the severity, managed by [`start()`].
///
Expand Down Expand Up @@ -252,6 +256,7 @@ pub struct Serializer<C: MqttClient> {
metrics: SerializerMetrics,
metrics_tx: Sender<SerializerMetrics>,
pending_metrics: VecDeque<SerializerMetrics>,
ctrl_rx: Receiver<SerializerShutdown>,
}

impl<C: MqttClient> Serializer<C> {
Expand All @@ -262,6 +267,7 @@ impl<C: MqttClient> Serializer<C> {
collector_rx: Receiver<Box<dyn Package>>,
client: C,
metrics_tx: Sender<SerializerMetrics>,
ctrl_rx: Receiver<SerializerShutdown>,
) -> Result<Serializer<C>, Error> {
let storage_handler = StorageHandler::new(config.clone())?;

Expand All @@ -273,17 +279,22 @@ impl<C: MqttClient> Serializer<C> {
metrics: SerializerMetrics::new("catchup"),
metrics_tx,
pending_metrics: VecDeque::with_capacity(3),
ctrl_rx,
})
}

/// Write all data received, from here-on, to disk only.
async fn crash(&mut self, publish: Publish) -> Result<Status, Error> {
let storage = self.storage_handler.select(&publish.topic);
// Write failed publish to disk first, metrics don't matter
match write_to_disk(publish, storage) {
Ok(Some(deleted)) => debug!("Lost segment = {deleted}"),
Ok(_) => {}
Err(e) => error!("Crash loop: write error = {:?}", e),
async fn crash(&mut self, publish: Option<Publish>) -> Result<Status, Error> {
if let Some(publish) = publish {
let storage = self.storage_handler.select(&publish.topic);
// Write failed publish to disk first, metrics don't matter
match write_to_disk(publish, storage) {
Ok(Some(deleted)) => debug!("Lost segment = {deleted}"),
Ok(_) => {}
Err(e) => error!("Crash loop: write error = {:?}", e),
}
} else {
debug!("Forced into crash mode, writing all incoming data to persistence.");
}

loop {
Expand Down Expand Up @@ -339,7 +350,7 @@ impl<C: MqttClient> Serializer<C> {
break Ok(Status::EventLoopReady)
}
Err(MqttError::Send(Request::Publish(publish))) => {
break Ok(Status::EventLoopCrash(publish));
break Ok(Status::EventLoopCrash(Some(publish)));
},
Err(e) => {
unreachable!("Unexpected error: {}", e);
Expand All @@ -348,6 +359,10 @@ impl<C: MqttClient> Serializer<C> {
_ = interval.tick() => {
check_metrics(&mut self.metrics, &self.storage_handler);
}
// Transition into crash mode when uplink is shutting down
_ = self.ctrl_rx.recv_async() => {
break Ok(Status::EventLoopCrash(None))
}
}
};

Expand Down Expand Up @@ -427,7 +442,7 @@ impl<C: MqttClient> Serializer<C> {
// indefinitely write to disk to not lose data
let client = match o {
Ok(c) => c,
Err(MqttError::Send(Request::Publish(publish))) => break Ok(Status::EventLoopCrash(publish)),
Err(MqttError::Send(Request::Publish(publish))) => break Ok(Status::EventLoopCrash(Some(publish))),
Err(e) => unreachable!("Unexpected error: {}", e),
};

Expand Down Expand Up @@ -455,6 +470,10 @@ impl<C: MqttClient> Serializer<C> {
_ = interval.tick() => {
let _ = check_and_flush_metrics(&mut self.pending_metrics, &mut self.metrics, &self.metrics_tx, &self.storage_handler);
}
// Transition into crash mode when uplink is shutting down
_ = self.ctrl_rx.recv_async() => {
return Ok(Status::EventLoopCrash(None))
}
}
};

Expand Down Expand Up @@ -500,6 +519,10 @@ impl<C: MqttClient> Serializer<C> {
debug!("Failed to flush serializer metrics (normal). Error = {}", e);
}
}
// Transition into crash mode when uplink is shutting down
_ = self.ctrl_rx.recv_async() => {
return Ok(Status::EventLoopCrash(None))
}
}
}
}
Expand Down Expand Up @@ -689,6 +712,7 @@ fn check_and_flush_metrics(

#[cfg(test)]
mod test {
use flume::bounded;
use serde_json::Value;

use std::collections::HashMap;
Expand Down Expand Up @@ -774,8 +798,9 @@ mod test {
let (net_tx, net_rx) = flume::bounded(1);
let (metrics_tx, _metrics_rx) = flume::bounded(1);
let client = MockClient { net_tx };
let (_, ctrl_rx) = bounded(1);

(Serializer::new(config, data_rx, client, metrics_tx).unwrap(), data_tx, net_rx)
(Serializer::new(config, data_rx, client, metrics_tx, ctrl_rx).unwrap(), data_tx, net_rx)
}

#[derive(Error, Debug)]
Expand Down Expand Up @@ -918,7 +943,9 @@ mod test {
);

match tokio::runtime::Runtime::new().unwrap().block_on(serializer.slow(publish)).unwrap() {
Status::EventLoopCrash(Publish { qos: QoS::AtLeastOnce, topic, payload, .. }) => {
Status::EventLoopCrash(Some(Publish {
qos: QoS::AtLeastOnce, topic, payload, ..
})) => {
assert_eq!(topic, "hello/world");
let recvd = std::str::from_utf8(&payload).unwrap();
assert_eq!(recvd, "[{\"sequence\":1,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");
Expand Down Expand Up @@ -1022,7 +1049,7 @@ mod test {
write_to_disk(publish.clone(), &mut storage).unwrap();

match tokio::runtime::Runtime::new().unwrap().block_on(serializer.catchup()).unwrap() {
Status::EventLoopCrash(Publish { topic, payload, .. }) => {
Status::EventLoopCrash(Some(Publish { topic, payload, .. })) => {
assert_eq!(topic, "hello/world");
let recvd = std::str::from_utf8(&payload).unwrap();
assert_eq!(recvd, "[{\"sequence\":1,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");
Expand Down
3 changes: 2 additions & 1 deletion uplink/src/collector/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,9 @@ mod test {
let data = DataBridgeTx { data_tx, shutdown_handle };
let (shutdown_handle, _) = bounded(1);
let actions = ActionsBridgeTx { status_tx, shutdown_handle };
let (serializer_shutdown, _) = bounded(0);

(BridgeTx { data, actions }, status_rx)
(BridgeTx { data, actions, serializer_shutdown }, status_rx)
}

#[test]
Expand Down
3 changes: 2 additions & 1 deletion uplink/src/collector/script_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,9 @@ mod tests {
let data = DataBridgeTx { data_tx, shutdown_handle };
let (shutdown_handle, _) = bounded(1);
let actions = ActionsBridgeTx { status_tx, shutdown_handle };
let (serializer_shutdown, _) = bounded(0);

(BridgeTx { data, actions }, status_rx)
(BridgeTx { data, actions, serializer_shutdown }, status_rx)
}

#[test]
Expand Down
11 changes: 9 additions & 2 deletions uplink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ pub mod config {
pub use base::actions::{Action, ActionResponse};
use base::bridge::{Bridge, Package, Payload, Point, StreamMetrics};
use base::mqtt::Mqtt;
use base::serializer::{Serializer, SerializerMetrics};
use base::serializer::{Serializer, SerializerMetrics, SerializerShutdown};
pub use base::{ActionRoute, Config};
pub use collector::{simulator, tcpjson::TcpJson};
pub use storage::Storage;
Expand All @@ -287,6 +287,8 @@ pub struct Uplink {
serializer_metrics_rx: Receiver<SerializerMetrics>,
shutdown_tx: Sender<()>,
shutdown_rx: Receiver<()>,
serializer_shutdown_tx: Sender<SerializerShutdown>,
serializer_shutdown_rx: Receiver<SerializerShutdown>,
}

impl Uplink {
Expand All @@ -296,6 +298,7 @@ impl Uplink {
let (stream_metrics_tx, stream_metrics_rx) = bounded(10);
let (serializer_metrics_tx, serializer_metrics_rx) = bounded(10);
let (shutdown_tx, shutdown_rx) = bounded(1);
let (serializer_shutdown_tx, serializer_shutdown_rx) = bounded(1);

Ok(Uplink {
config,
Expand All @@ -309,6 +312,8 @@ impl Uplink {
serializer_metrics_rx,
shutdown_tx,
shutdown_rx,
serializer_shutdown_tx,
serializer_shutdown_rx,
})
}

Expand All @@ -319,6 +324,7 @@ impl Uplink {
self.stream_metrics_tx(),
self.action_rx.clone(),
self.shutdown_tx.clone(),
self.serializer_shutdown_tx.clone(),
)
}

Expand All @@ -333,6 +339,7 @@ impl Uplink {
self.data_rx.clone(),
mqtt_client.clone(),
self.serializer_metrics_tx(),
self.serializer_shutdown_rx.clone(),
)?;

// Serializer thread to handle network conditions state machine
Expand Down Expand Up @@ -379,7 +386,7 @@ impl Uplink {
})
});

let Bridge { data: mut data_lane, actions: mut actions_lane } = bridge;
let Bridge { data: mut data_lane, actions: mut actions_lane, .. } = bridge;

// Bridge thread to direct actions
spawn_named_thread("Bridge actions_lane", || {
Expand Down

0 comments on commit 3cc82ff

Please sign in to comment.