Skip to content

Commit

Permalink
feat: Ctrl handles for Serializer
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Dec 11, 2023
1 parent c369d13 commit 1ef6ee9
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 22 deletions.
6 changes: 1 addition & 5 deletions uplink/src/base/bridge/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use flume::{Receiver, Sender};
use log::error;
use serde::{Deserialize, Serialize};
use serde_json::Value;

Expand All @@ -19,7 +18,6 @@ pub use data_lane::{CtrlTx as DataLaneCtrlTx, DataTx};

use super::StreamConfig;
use crate::base::ActionRoute;
use crate::base::serializer::SerializerShutdown;
use crate::{Action, ActionResponse, Config};
pub use metrics::StreamMetrics;

Expand Down Expand Up @@ -78,7 +76,6 @@ pub(crate) struct DataBridgeShutdown;
pub struct Bridge {
pub(crate) data: DataBridge,
pub(crate) actions: ActionsBridge,
pub(crate) serializer_shutdown: Sender<SerializerShutdown>,
}

impl Bridge {
Expand All @@ -88,12 +85,11 @@ impl Bridge {
metrics_tx: Sender<StreamMetrics>,
actions_rx: Receiver<Action>,
shutdown_handle: Sender<()>,
serializer_shutdown: Sender<SerializerShutdown>,
) -> Self {
let data = DataBridge::new(config.clone(), package_tx.clone(), metrics_tx.clone());
let actions =
ActionsBridge::new(config, package_tx, actions_rx, shutdown_handle, metrics_tx);
Self { data, actions, serializer_shutdown }
Self { data, actions }
}

/// Handle to send data/action status messages
Expand Down
8 changes: 7 additions & 1 deletion uplink/src/base/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::collector::logcat::LogcatConfig;

use self::bridge::stream::MAX_BUFFER_SIZE;
use self::bridge::{ActionsLaneCtrlTx, DataLaneCtrlTx};
use self::serializer::CtrlTx as SerializerCtrlTx;

pub mod actions;
pub mod bridge;
Expand Down Expand Up @@ -289,10 +290,15 @@ pub struct Config {
pub struct CtrlTx {
pub actions_lane: ActionsLaneCtrlTx,
pub data_lane: DataLaneCtrlTx,
pub serializer: SerializerCtrlTx,
}

impl CtrlTx {
pub async fn trigger_shutdown(&self) {
join!(self.actions_lane.trigger_shutdown(), self.data_lane.trigger_shutdown());
join!(
self.actions_lane.trigger_shutdown(),
self.data_lane.trigger_shutdown(),
self.serializer.trigger_shutdown()
);
}
}
33 changes: 26 additions & 7 deletions uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::time::Instant;
use std::{sync::Arc, time::Duration};

use bytes::Bytes;
use flume::{Receiver, RecvError, Sender};
use flume::{bounded, Receiver, RecvError, Sender};
use log::{debug, error, info, trace};
use lz4_flex::frame::FrameEncoder;
use rumqttc::*;
Expand Down Expand Up @@ -225,8 +225,6 @@ impl StorageHandler {
}
}

pub struct SerializerShutdown;

/// The uplink Serializer is the component that deals with serializing, compressing and writing data onto disk or Network.
/// In case of network issues, the Serializer enters various states depending on the severeness, managed by [`start()`].
///
Expand Down Expand Up @@ -279,7 +277,9 @@ pub struct Serializer<C: MqttClient> {
metrics: SerializerMetrics,
metrics_tx: Sender<SerializerMetrics>,
pending_metrics: VecDeque<SerializerMetrics>,
/// Control handles
ctrl_rx: Receiver<SerializerShutdown>,
ctrl_tx: Sender<SerializerShutdown>,
}

impl<C: MqttClient> Serializer<C> {
Expand All @@ -290,9 +290,9 @@ impl<C: MqttClient> Serializer<C> {
collector_rx: Receiver<Box<dyn Package>>,
client: C,
metrics_tx: Sender<SerializerMetrics>,
ctrl_rx: Receiver<SerializerShutdown>,
) -> Result<Serializer<C>, Error> {
let storage_handler = StorageHandler::new(config.clone())?;
let (ctrl_tx, ctrl_rx) = bounded(1);

Ok(Serializer {
config,
Expand All @@ -302,10 +302,15 @@ impl<C: MqttClient> Serializer<C> {
metrics: SerializerMetrics::new("catchup"),
metrics_tx,
pending_metrics: VecDeque::with_capacity(3),
ctrl_tx,
ctrl_rx,
})
}

pub fn ctrl_tx(&self) -> CtrlTx {
CtrlTx { inner: self.ctrl_tx.clone() }
}

/// Write all data received, from here-on, to disk only, shutdown serializer
/// after handling all data payloads.
async fn shutdown(&mut self) -> Result<Status, Error> {
Expand Down Expand Up @@ -772,12 +777,27 @@ fn check_and_flush_metrics(
Ok(())
}

/// Command to remotely trigger `Serializer` shutdown
pub(crate) struct SerializerShutdown;

/// Handle to send control messages to `Serializer`
#[derive(Debug, Clone)]
pub struct CtrlTx {
pub(crate) inner: Sender<SerializerShutdown>,
}

impl CtrlTx {
/// Triggers shutdown of `Serializer`
pub async fn trigger_shutdown(&self) {
self.inner.send_async(SerializerShutdown).await.unwrap()
}
}

// TODO(RT): Test cases
// - Restart with no internet but files on disk

#[cfg(test)]
mod test {
use flume::bounded;
use serde_json::Value;

use std::collections::HashMap;
Expand Down Expand Up @@ -863,9 +883,8 @@ mod test {
let (net_tx, net_rx) = flume::bounded(1);
let (metrics_tx, _metrics_rx) = flume::bounded(1);
let client = MockClient { net_tx };
let (_, ctrl_rx) = bounded(1);

(Serializer::new(config, data_rx, client, metrics_tx, ctrl_rx).unwrap(), data_tx, net_rx)
(Serializer::new(config, data_rx, client, metrics_tx).unwrap(), data_tx, net_rx)
}

#[derive(Error, Debug)]
Expand Down
16 changes: 7 additions & 9 deletions uplink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ pub mod config {
pub use base::actions::{Action, ActionResponse};
use base::bridge::{Bridge, Package, Payload, Point, StreamMetrics};
use base::mqtt::Mqtt;
use base::serializer::{Serializer, SerializerMetrics, SerializerShutdown};
use base::serializer::{Serializer, SerializerMetrics};
pub use base::{ActionRoute, Config};
pub use collector::{simulator, tcpjson::TcpJson};
pub use storage::Storage;
Expand All @@ -288,8 +288,6 @@ pub struct Uplink {
serializer_metrics_rx: Receiver<SerializerMetrics>,
shutdown_tx: Sender<()>,
shutdown_rx: Receiver<()>,
serializer_shutdown_tx: Sender<SerializerShutdown>,
serializer_shutdown_rx: Receiver<SerializerShutdown>,
}

impl Uplink {
Expand All @@ -299,7 +297,6 @@ impl Uplink {
let (stream_metrics_tx, stream_metrics_rx) = bounded(10);
let (serializer_metrics_tx, serializer_metrics_rx) = bounded(10);
let (shutdown_tx, shutdown_rx) = bounded(1);
let (serializer_shutdown_tx, serializer_shutdown_rx) = bounded(1);

Ok(Uplink {
config,
Expand All @@ -313,8 +310,6 @@ impl Uplink {
serializer_metrics_rx,
shutdown_tx,
shutdown_rx,
serializer_shutdown_tx,
serializer_shutdown_rx,
})
}

Expand All @@ -325,7 +320,6 @@ impl Uplink {
self.stream_metrics_tx(),
self.action_rx.clone(),
self.shutdown_tx.clone(),
self.serializer_shutdown_tx.clone(),
)
}

Expand All @@ -341,8 +335,8 @@ impl Uplink {
self.data_rx.clone(),
mqtt_client.clone(),
self.serializer_metrics_tx(),
self.serializer_shutdown_rx.clone(),
)?;
let ctrl_serializer = serializer.ctrl_tx();

// Serializer thread to handle network conditions state machine
// and send data to mqtt thread
Expand Down Expand Up @@ -412,7 +406,11 @@ impl Uplink {
})
});

Ok(CtrlTx { actions_lane: ctrl_actions_lane, data_lane: ctrl_data_lane })
Ok(CtrlTx {
actions_lane: ctrl_actions_lane,
data_lane: ctrl_data_lane,
serializer: ctrl_serializer,
})
}

pub fn spawn_builtins(&mut self, bridge: &mut Bridge) -> Result<(), Error> {
Expand Down

0 comments on commit 1ef6ee9

Please sign in to comment.