From ae325b59c2ea2f007247e1eeae34de91168c1cd5 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Mon, 14 Oct 2024 16:26:33 +0530
Subject: [PATCH] feat: `Serializer` pushes latest data first if configured to do so (#365)

* feat: Serializer pushes latest data first if configured

* doc: example config explaining use case

* feat: allow option per-stream

* style: don't warn, rm unnecessary handling
---
 configs/config.toml               | 13 ++++++-
 uplink/src/base/serializer/mod.rs | 55 ++++++++++++++++++++++------
 uplink/src/config.rs              |  5 +++
 uplink/tests/serializer.rs        | 60 ++++++++++++++++++++++++++++---
 4 files changed, 118 insertions(+), 15 deletions(-)

diff --git a/configs/config.toml b/configs/config.toml
index 1e328f759..70d75393f 100644
--- a/configs/config.toml
+++ b/configs/config.toml
@@ -20,6 +20,11 @@ default_buf_size = 1024 # 1KB
 # Maximum number of data streams that can be accepted by uplink
 max_stream_count = 10
 
+# All streams will first push the latest packet before pushing historical data in
+# FIFO order; defaults to false. This solves the problem of bad networks leading to
+# data being pushed so slowly that it is practically impossible to track the device.
+default_live_data_first = true
+
 # MQTT client configuration
 #
 # Required Parameters
@@ -84,13 +89,19 @@ blacklist = ["cancollector_metrics", "candump_metrics", "pinger"]
 #   used when there is a network/system failure.
 # - priority(optional, u8): Higher priority streams get to push their data
 #   onto the network first.
+# - live_data_first(optional, bool): The stream will first push its latest packet
+#   before pushing historical data in FIFO order; defaults to false. This solves the
+#   problem of bad networks leading to data being pushed so slowly that it is
+#   practically impossible to track the device.
 #
 # In the following config for the device_shadow stream we set batch_size to 1 and mark
-# it as non-persistent. streams are internally constructed as a map of Name -> Config
+# it as non-persistent; we also enable live_data_first for quick delivery of stats.
+# Streams are internally constructed as a map of Name -> Config
 [streams.device_shadow]
 topic = "/tenants/{tenant_id}/devices/{device_id}/events/device_shadow/jsonarray"
 flush_period = 5
 priority = 75
+live_data_first = true
 
 # Example using compression
 [streams.imu]
diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs
index 798bd862c..c4acaa4cf 100644
--- a/uplink/src/base/serializer/mod.rs
+++ b/uplink/src/base/serializer/mod.rs
@@ -8,7 +8,7 @@ use std::{sync::Arc, time::Duration};
 use bytes::Bytes;
 use flume::{bounded, Receiver, RecvError, Sender, TrySendError};
-use log::{debug, error, info, trace};
+use log::{debug, error, info, trace, warn};
 use lz4_flex::frame::FrameEncoder;
 use rumqttc::*;
 use thiserror::Error;
@@ -133,11 +133,17 @@ impl MqttClient for AsyncClient {
 
 pub struct Storage {
     inner: storage::Storage,
+    live_data_first: bool,
+    latest_data: Option<Publish>,
 }
 
 impl Storage {
-    pub fn new(name: impl Into<String>, max_file_size: usize) -> Self {
-        Self { inner: storage::Storage::new(name, max_file_size) }
+    pub fn new(name: impl Into<String>, max_file_size: usize, live_data_first: bool) -> Self {
+        Self {
+            inner: storage::Storage::new(name, max_file_size),
+            live_data_first,
+            latest_data: None,
+        }
     }
 
     pub fn set_persistence(
@@ -151,6 +157,13 @@ impl Storage {
     // Stores the provided publish packet by serializing it into storage, after setting its pkid to 1.
     // If the write buffer is full, it is flushed/written onto disk based on config.
     pub fn write(&mut self, mut publish: Publish) -> Result<Option<u64>, storage::Error> {
+        if self.live_data_first {
+            let Some(previous) = self.latest_data.replace(publish) else { return Ok(None) };
+            publish = previous;
+        } else if self.latest_data.is_some() {
+            warn!("Latest data should be unoccupied if not using the live data first scheme");
+        }
+
         publish.pkid = 1;
         if let Err(e) = publish.write(self.inner.writer()) {
             error!("Failed to fill disk buffer. Error = {e}");
@@ -170,6 +183,10 @@
     // ## Panic
     // When any packet other than a publish is deserialized.
     pub fn read(&mut self, max_packet_size: usize) -> Option<Publish> {
+        if let Some(publish) = self.latest_data.take() {
+            return Some(publish);
+        }
+
         // TODO(RT): This can fail when packet sizes > max_payload_size in config are written to disk.
         // This leads to force switching to normal mode. Increasing max_payload_size to bypass this
         match Packet::read(self.inner.reader(), max_packet_size) {
@@ -184,6 +201,15 @@
 
     // Ensures all data is written into persistence, when configured.
     pub fn flush(&mut self) -> Result<Option<u64>, storage::Error> {
+        // Write live cache to disk when flushing
+        if let Some(mut publish) = self.latest_data.take() {
+            publish.pkid = 1;
+            if let Err(e) = publish.write(self.inner.writer()) {
+                error!("Failed to fill disk buffer. Error = {e}");
+                return Ok(None);
+            }
+        }
+
         self.inner.flush()
     }
 }
@@ -202,8 +228,11 @@ impl StorageHandler {
         // NOTE: persist action_status if not configured otherwise
         streams.insert("action_status".into(), config.action_status.clone());
         for (stream_name, stream_config) in streams {
-            let mut storage =
-                Storage::new(&stream_config.topic, stream_config.persistence.max_file_size);
+            let mut storage = Storage::new(
+                &stream_config.topic,
+                stream_config.persistence.max_file_size,
+                stream_config.live_data_first,
+            );
             if stream_config.persistence.max_file_count > 0 {
                 let mut path = config.persistence_path.clone();
                 path.push(&stream_name);
@@ -229,7 +258,13 @@
         match self
             .map
             .entry(stream.to_owned())
-            .or_insert_with(|| Storage::new(&stream.topic, self.config.default_buf_size))
+            .or_insert_with(|| {
+                Storage::new(
+                    &stream.topic,
+                    self.config.default_buf_size,
+                    self.config.default_live_data_first,
+                )
+            })
             .write(publish)
         {
             Ok(Some(deleted)) => {
@@ -907,7 +942,7 @@ pub mod tests {
     fn read_write_storage() {
         let config = Arc::new(default_config());
 
-        let mut storage = Storage::new("hello/world", 1024);
+        let mut storage = Storage::new("hello/world", 1024, false);
         let mut publish = Publish::new(
             "hello/world",
             QoS::AtLeastOnce,
@@ -1024,7 +1059,7 @@
             .storage_handler
             .map
             .entry(Arc::new(Default::default()))
-            .or_insert(Storage::new("hello/world", 1024));
+            .or_insert(Storage::new("hello/world", 1024, false));
 
         let (stream_name, stream_config) = (
             "hello",
@@ -1082,7 +1117,7 @@
                 topic: "hello/world".to_string(),
                 ..Default::default()
             }))
-            .or_insert(Storage::new("hello/world", 1024));
+            .or_insert(Storage::new("hello/world", 1024, false));
 
         let (stream_name, stream_config) = (
             "hello",
@@ -1194,7 +1229,7 @@
                 priority: 0,
                 ..Default::default()
             }))
-            .or_insert(Storage::new("topic/default", 1024));
+            .or_insert(Storage::new("topic/default", 1024, false));
 
         default.write(publish("topic/default".to_string(), 0)).unwrap();
         default.write(publish("topic/default".to_string(), 2)).unwrap();
diff --git a/uplink/src/config.rs b/uplink/src/config.rs
index b7d1aff68..0c1921b3b 100644
--- a/uplink/src/config.rs
+++ b/uplink/src/config.rs
@@ -93,6 +93,8 @@ pub struct StreamConfig {
     pub persistence: Persistence,
     #[serde(default)]
     pub priority: u8,
+    #[serde(default)]
+    pub live_data_first: bool,
 }
 
 impl Default for StreamConfig {
@@ -104,6 +106,7 @@
             compression: Compression::Disabled,
             persistence: Persistence::default(),
             priority: 0,
+            live_data_first: false,
         }
     }
 }
@@ -536,6 +539,8 @@ pub struct Config {
     pub logging: Option,
     pub precondition_checks: Option,
     pub bus: Option,
+    #[serde(default)]
+    pub default_live_data_first: bool,
 }
 
 impl Config {
diff --git a/uplink/tests/serializer.rs b/uplink/tests/serializer.rs
index 0a171110d..b45999b79 100644
--- a/uplink/tests/serializer.rs
+++ b/uplink/tests/serializer.rs
@@ -4,7 +4,7 @@ use bytes::Bytes;
 use flume::bounded;
 use rumqttc::{Publish, QoS, Request};
 use tempdir::TempDir;
-use tokio::{runtime::Runtime, spawn};
+use tokio::{runtime::Runtime, spawn, time::sleep};
 
 use uplink::{
     base::{bridge::Payload, serializer::Serializer},
@@ -83,18 +83,18 @@
     };
 
     // write packets for one, two and top onto disk
-    let mut one = Storage::new("topic/one", 1024 * 1024);
+    let mut one = Storage::new("topic/one", 1024 * 1024, false);
     one.set_persistence(persistence_path(&config.persistence_path, "one"), 1).unwrap();
     one.write(publish("topic/one".to_string(), 4)).unwrap();
     one.write(publish("topic/one".to_string(), 5)).unwrap();
     one.flush().unwrap();
 
-    let mut two = Storage::new("topic/two", 1024 * 1024);
+    let mut two = Storage::new("topic/two", 1024 * 1024, false);
     two.set_persistence(persistence_path(&config.persistence_path, "two"), 1).unwrap();
     two.write(publish("topic/two".to_string(), 3)).unwrap();
     two.flush().unwrap();
 
-    let mut top = Storage::new("topic/top", 1024 * 1024);
+    let mut top = Storage::new("topic/top", 1024 * 1024, false);
     top.set_persistence(persistence_path(&config.persistence_path, "top"), 1).unwrap();
     top.write(publish("topic/top".to_string(), 1)).unwrap();
     top.write(publish("topic/top".to_string(), 2)).unwrap();
@@ -216,3 +216,55 @@ async fn fifo_data_push() {
     assert_eq!(topic, "topic/default");
     assert_eq!(payload, "[{\"sequence\":2,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");
 }
+
+#[tokio::test]
+// Ensures that live data is pushed first if configured to do so
+async fn prefer_live_data() {
+    let mut config = Config::default();
+    config.default_buf_size = 1024 * 1024;
+    config.mqtt.max_packet_size = 1024 * 1024;
+    config.default_live_data_first = true;
+    let config = Arc::new(config);
+    let (data_tx, data_rx) = bounded(0);
+    let (net_tx, req_rx) = bounded(0);
+    let (metrics_tx, _metrics_rx) = bounded(1);
+    let client = MockClient { net_tx };
+    let serializer = Serializer::new(config, data_rx, client, metrics_tx).unwrap();
+
+    // start serializer in the background
+    thread::spawn(|| _ = Runtime::new().unwrap().block_on(serializer.start()));
+
+    spawn(async {
+        let mut default = MockCollector::new(
+            "default",
+            StreamConfig { topic: "topic/default".to_owned(), batch_size: 1, ..Default::default() },
+            data_tx,
+        );
+        for i in 0.. {
+            default.send(i).await.unwrap();
+            sleep(Duration::from_millis(250)).await;
+        }
+    });
+
+    sleep(Duration::from_millis(750)).await;
+    let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
+    else {
+        unreachable!()
+    };
+    assert_eq!(topic, "topic/default");
+    assert_eq!(payload, "[{\"sequence\":0,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");
+
+    let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
+    else {
+        unreachable!()
+    };
+    assert_eq!(topic, "topic/default");
+    assert_eq!(payload, "[{\"sequence\":2,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");
+
+    let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
+    else {
+        unreachable!()
+    };
+    assert_eq!(topic, "topic/default");
+    assert_eq!(payload, "[{\"sequence\":1,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");
+}
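
To make the new delivery order concrete, here is a minimal standalone sketch. It is not part of the patch: `LiveFirstBuffer` is a hypothetical stand-in for the patched `Storage`, with a `VecDeque` playing the role of the persistence layer. It reproduces the 0, 2, 1 ordering asserted in the `prefer_live_data` test above.

// Hypothetical sketch, not part of the patch: models the live-data-first
// ordering, with a VecDeque standing in for on-disk persistence.
use std::collections::VecDeque;

struct LiveFirstBuffer<T> {
    latest: Option<T>,  // cache holding only the most recent packet
    disk: VecDeque<T>,  // stand-in for the persistence layer (FIFO)
}

impl<T> LiveFirstBuffer<T> {
    fn new() -> Self {
        Self { latest: None, disk: VecDeque::new() }
    }

    // Mirrors the patched Storage::write: cache the incoming packet as the
    // latest, and spill whatever packet it displaced to disk in FIFO order.
    fn write(&mut self, packet: T) {
        if let Some(previous) = self.latest.replace(packet) {
            self.disk.push_back(previous);
        }
    }

    // Mirrors the patched Storage::read: drain the live cache first, then
    // fall back to historical data in FIFO order.
    fn read(&mut self) -> Option<T> {
        if let Some(latest) = self.latest.take() {
            return Some(latest);
        }
        self.disk.pop_front()
    }
}

fn main() {
    let mut buf = LiveFirstBuffer::new();
    buf.write(0);                    // 0 cached as latest, nothing on disk
    assert_eq!(buf.read(), Some(0)); // 0 delivered immediately
    buf.write(1);                    // 1 cached as latest
    buf.write(2);                    // 1 displaced onto disk, 2 cached
    assert_eq!(buf.read(), Some(2)); // latest data first...
    assert_eq!(buf.read(), Some(1)); // ...then history in FIFO order
    assert_eq!(buf.read(), None);
}

The replace-and-spill pattern in `write` means at most one packet, the cached latest, ever jumps the queue; every packet it displaces still lands in persistence and is read back in FIFO order, which is why the test observes 0, then 2, then 1.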