From f1dbca65c49b76f6195d95258b0aec8221c7b21b Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Tue, 5 Dec 2023 18:57:52 +0530
Subject: [PATCH] refactor: use `StreamConfig` in `Serializer` (#296)

* refactor: use `StreamConfig` in `Serializer`

* test: fix stream config

* refactor: rm compression from `Stream`/`Buffer`

* style: `stream_name`/`stream_config`

* refactor: `new` replaces `with_config`
---
 uplink/src/base/bridge/mod.rs     |  22 ++--
 uplink/src/base/bridge/stream.rs  | 108 ++++++------------
 uplink/src/base/bridge/streams.rs |   7 +-
 uplink/src/base/mod.rs            |  19 +++-
 uplink/src/base/serializer/mod.rs | 167 +++++++++++++++++++-----------
 5 files changed, 168 insertions(+), 155 deletions(-)

diff --git a/uplink/src/base/bridge/mod.rs b/uplink/src/base/bridge/mod.rs
index b95b6de0..f96e217e 100644
--- a/uplink/src/base/bridge/mod.rs
+++ b/uplink/src/base/bridge/mod.rs
@@ -12,15 +12,14 @@ mod metrics;
 pub(crate) mod stream;
 mod streams;
 
-use crate::{Action, ActionResponse, ActionRoute, Config};
-
-use self::actions_lane::Error;
-pub use self::{
-    actions_lane::{ActionsBridge, ActionsBridgeTx},
-    data_lane::{DataBridge, DataBridgeTx},
-};
-
-use super::Compression;
+pub use actions_lane::ActionsBridgeTx;
+use actions_lane::{ActionsBridge, Error};
+use data_lane::DataBridge;
+pub use data_lane::DataBridgeTx;
+
+use super::StreamConfig;
+use crate::base::ActionRoute;
+use crate::{Action, ActionResponse, Config};
 pub use metrics::StreamMetrics;
 
 pub trait Point: Send + Debug + Serialize + 'static {
@@ -30,8 +29,8 @@
 }
 
 pub trait Package: Send + Debug {
-    fn topic(&self) -> Arc<String>;
-    fn stream(&self) -> Arc<String>;
+    fn stream_config(&self) -> Arc<StreamConfig>;
+    fn stream_name(&self) -> Arc<String>;
     // TODO: Implement a generic Return type that can wrap
     // around custom serialization error types.
     fn serialize(&self) -> serde_json::Result<Vec<u8>>;
@@ -41,7 +40,6 @@
     fn is_empty(&self) -> bool {
        self.len() == 0
     }
-    fn compression(&self) -> Compression;
 }
 
 // TODO Don't do any deserialization on payload. Read it a Vec<u8> which is in turn a json
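A note on the `Package` change above: the trait now exposes the whole `StreamConfig` instead of just a topic string, and compression travels with it. A minimal sketch of an implementor against the reshaped trait, using simplified stand-in types rather than uplink's real definitions (`SinglePoint` is hypothetical):

```rust
use std::fmt::Debug;
use std::sync::Arc;

// Simplified stand-in for uplink's StreamConfig; the real struct also
// carries buf_size, flush_period, compression and persistence settings.
#[derive(Debug, Default)]
pub struct StreamConfig {
    pub topic: String,
}

pub trait Package: Send + Debug {
    // One getter for the whole stream configuration replaces the old
    // `topic()` + `compression()` pair.
    fn stream_config(&self) -> Arc<StreamConfig>;
    fn stream_name(&self) -> Arc<String>;
    fn serialize(&self) -> serde_json::Result<Vec<u8>>;
}

// Hypothetical single-shot package, just to show the shape.
#[derive(Debug)]
struct SinglePoint {
    name: Arc<String>,
    config: Arc<StreamConfig>,
    point: serde_json::Value,
}

impl Package for SinglePoint {
    fn stream_config(&self) -> Arc<StreamConfig> {
        self.config.clone()
    }

    fn stream_name(&self) -> Arc<String> {
        self.name.clone()
    }

    fn serialize(&self) -> serde_json::Result<Vec<u8>> {
        serde_json::to_vec(&self.point)
    }
}
```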
diff --git a/uplink/src/base/bridge/stream.rs b/uplink/src/base/bridge/stream.rs
index d83c9354..17c099bb 100644
--- a/uplink/src/base/bridge/stream.rs
+++ b/uplink/src/base/bridge/stream.rs
@@ -4,7 +4,7 @@ use flume::{SendError, Sender};
 use log::{debug, trace};
 use serde::Serialize;
 
-use crate::base::{Compression, StreamConfig, DEFAULT_TIMEOUT};
+use crate::base::StreamConfig;
 
 use super::{Package, Point, StreamMetrics};
 
@@ -27,15 +27,12 @@ pub const MAX_BUFFER_SIZE: usize = 100;
 #[derive(Debug)]
 pub struct Stream<T> {
     pub name: Arc<String>,
-    pub max_buffer_size: usize,
-    pub flush_period: Duration,
-    topic: Arc<String>,
+    pub config: Arc<StreamConfig>,
     last_sequence: u32,
     last_timestamp: u64,
     buffer: Buffer<T>,
     tx: Sender<Box<dyn Package>>,
     pub metrics: StreamMetrics,
-    compression: Compression,
 }
 
 impl<T> Stream<T>
 where
     T: Point,
     Buffer<T>: Package,
 {
     pub fn new(
-        stream: impl Into<String>,
-        topic: impl Into<String>,
-        max_buffer_size: usize,
+        stream_name: impl Into<String>,
+        stream_config: StreamConfig,
         tx: Sender<Box<dyn Package>>,
-        compression: Compression,
     ) -> Stream<T> {
-        let name = Arc::new(stream.into());
-        let topic = Arc::new(topic.into());
-        let buffer = Buffer::new(name.clone(), topic.clone(), compression);
-        let flush_period = Duration::from_secs(DEFAULT_TIMEOUT);
-        let metrics = StreamMetrics::new(&name, max_buffer_size);
+        let name = Arc::new(stream_name.into());
+        let config = Arc::new(stream_config);
+        let buffer = Buffer::new(name.clone(), config.clone());
+        let metrics = StreamMetrics::new(&name, config.buf_size);
 
-        Stream {
-            name,
-            max_buffer_size,
-            flush_period,
-            topic,
-            last_sequence: 0,
-            last_timestamp: 0,
-            buffer,
-            tx,
-            metrics,
-            compression,
-        }
-    }
-
-    pub fn with_config(
-        name: &str,
-        config: &StreamConfig,
-        tx: Sender<Box<dyn Package>>,
-    ) -> Stream<T> {
-        let mut stream = Stream::new(name, &config.topic, config.buf_size, tx, config.compression);
-        stream.flush_period = config.flush_period;
-        stream
+        Stream { name, config, last_sequence: 0, last_timestamp: 0, buffer, tx, metrics }
     }
 
     pub fn dynamic(
-        stream: impl Into<String>,
+        stream_name: impl Into<String>,
         project_id: impl Into<String>,
         device_id: impl Into<String>,
-        max_buffer_size: usize,
         tx: Sender<Box<dyn Package>>,
     ) -> Stream<T> {
-        let stream = stream.into();
+        let stream_name = stream_name.into();
         let project_id = project_id.into();
         let device_id = device_id.into();
 
@@ -96,10 +68,11 @@ where
             + "/devices/"
             + &device_id
             + "/events/"
-            + &stream + "/jsonarray";
+            + &stream_name + "/jsonarray";
+        let config = StreamConfig { topic, ..Default::default() };
 
-        Stream::new(stream, topic, max_buffer_size, tx, Compression::Disabled)
+        Stream::new(stream_name, config, tx)
     }
 
     fn add(&mut self, data: T) -> Result<Option<Buffer<T>>, Error> {
@@ -127,7 +100,7 @@ where
         self.last_timestamp = current_timestamp;
 
         // if max_buffer_size is breached, flush
-        let buf = if self.buffer.buffer.len() >= self.max_buffer_size {
+        let buf = if self.buffer.buffer.len() >= self.config.buf_size {
             self.metrics.add_batch();
             Some(self.take_buffer())
         } else {
@@ -140,10 +113,10 @@ where
     // Returns buffer content, replacing with empty buffer in-place
     fn take_buffer(&mut self) -> Buffer<T> {
         let name = self.name.clone();
-        let topic = self.topic.clone();
-        trace!("Flushing stream name: {}, topic: {}", name, topic);
+        let config = self.config.clone();
+        trace!("Flushing stream name: {}, topic: {}", name, config.topic);
 
-        mem::replace(&mut self.buffer, Buffer::new(name, topic, self.compression))
+        mem::replace(&mut self.buffer, Buffer::new(name, config))
     }
 
     /// Triggers flush and async channel send if not empty
@@ -175,7 +148,7 @@ where
         }
 
         let status = match self.len() {
-            1 => StreamStatus::Init(self.flush_period),
+            1 => StreamStatus::Init(self.config.flush_period),
             len => StreamStatus::Partial(len),
         };
 
@@ -192,7 +165,7 @@ where
         }
 
         let status = match self.len() {
-            1 => StreamStatus::Init(self.flush_period),
+            1 => StreamStatus::Init(self.config.flush_period),
             len => StreamStatus::Partial(len),
         };
 
@@ -210,23 +183,21 @@ where
 /// Buffer doesn't put any restriction on type of `T`
 #[derive(Debug)]
 pub struct Buffer<T> {
-    pub stream: Arc<String>,
-    pub topic: Arc<String>,
+    pub stream_name: Arc<String>,
+    pub stream_config: Arc<StreamConfig>,
     pub buffer: Vec<T>,
     pub anomalies: String,
     pub anomaly_count: usize,
-    pub compression: Compression,
 }
 
 impl<T> Buffer<T> {
-    pub fn new(stream: Arc<String>, topic: Arc<String>, compression: Compression) -> Buffer<T> {
+    pub fn new(stream_name: Arc<String>, stream_config: Arc<StreamConfig>) -> Buffer<T> {
         Buffer {
-            stream,
-            topic,
-            buffer: vec![],
+            buffer: Vec::with_capacity(stream_config.buf_size),
+            stream_name,
+            stream_config,
             anomalies: String::with_capacity(100),
             anomaly_count: 0,
-            compression,
         }
     }
 
@@ -236,7 +207,7 @@ impl Buffer {
             return;
         }
 
-        let error = String::from(self.stream.as_ref())
+        let error = self.stream_name.to_string()
             + ".sequence: "
             + &last.to_string()
             + ", "
@@ -268,12 +239,12 @@ where
     T: Point,
     Vec<T>: Serialize,
 {
-    fn topic(&self) -> Arc<String> {
-        self.topic.clone()
+    fn stream_config(&self) -> Arc<StreamConfig> {
+        self.stream_config.clone()
     }
 
-    fn stream(&self) -> Arc<String> {
-        self.stream.clone()
+    fn stream_name(&self) -> Arc<String> {
+        self.stream_name.clone()
     }
 
     fn serialize(&self) -> serde_json::Result<Vec<u8>> {
@@ -291,29 +262,18 @@ where
     fn latency(&self) -> u64 {
         0
     }
-
-    fn compression(&self) -> Compression {
-        self.compression
-    }
 }
 
 impl<T> Clone for Stream<T> {
     fn clone(&self) -> Self {
         Stream {
             name: self.name.clone(),
-            flush_period: self.flush_period,
-            max_buffer_size: self.max_buffer_size,
-            topic: self.topic.clone(),
+            config: self.config.clone(),
             last_sequence: 0,
             last_timestamp: 0,
-            buffer: Buffer::new(
-                self.buffer.stream.clone(),
-                self.buffer.topic.clone(),
-                self.compression,
-            ),
-            metrics: StreamMetrics::new(&self.name, self.max_buffer_size),
+            buffer: Buffer::new(self.buffer.stream_name.clone(), self.buffer.stream_config.clone()),
+            metrics: StreamMetrics::new(&self.name, self.config.buf_size),
             tx: self.tx.clone(),
-            compression: self.compression,
         }
     }
 }
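With `with_config` folded into `new`, constructing a stream always goes through a `StreamConfig`. A hedged usage sketch (topic, buffer size, and the `Payload` point type are illustrative; inside uplink the config normally comes from the parsed `Config` rather than being built by hand):

```rust
// Illustrative only: the channel size and field values are made up.
let (data_tx, _data_rx) = flume::bounded(10);
let config = StreamConfig {
    topic: "/tenants/demo/devices/1/events/imu/jsonarray".to_string(),
    buf_size: 100,
    ..Default::default()
};
// `new` wraps the config in an Arc and sizes the buffer and metrics from
// it, which is what `with_config` used to bolt on after construction.
let stream: Stream<Payload> = Stream::new("imu", config, data_tx);
```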
diff --git a/uplink/src/base/bridge/streams.rs b/uplink/src/base/bridge/streams.rs
index 158f3870..7b72d0d4 100644
--- a/uplink/src/base/bridge/streams.rs
+++ b/uplink/src/base/bridge/streams.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;
 use flume::Sender;
 use log::{error, info, trace};
 
-use super::stream::{self, StreamStatus, MAX_BUFFER_SIZE};
+use super::stream::{self, StreamStatus};
 use super::{Point, StreamMetrics};
 use crate::base::StreamConfig;
 use crate::{Config, Package, Stream};
@@ -30,7 +30,7 @@ impl<T: Point> Streams<T> {
     pub fn config_streams(&mut self, streams_config: HashMap<String, StreamConfig>) {
         for (name, stream) in streams_config {
-            let stream = Stream::with_config(&name, &stream, self.data_tx.clone());
+            let stream = Stream::new(&name, stream, self.data_tx.clone());
             self.map.insert(name.to_owned(), stream);
         }
     }
@@ -50,7 +50,6 @@ impl<T: Point> Streams<T> {
                     &stream_name,
                     &self.config.project_id,
                     &self.config.device_id,
-                    MAX_BUFFER_SIZE,
                     self.data_tx.clone(),
                 );
 
@@ -58,7 +57,7 @@ impl<T: Point> Streams<T> {
             }
         };
 
-        let max_stream_size = stream.max_buffer_size;
+        let max_stream_size = stream.config.buf_size;
         let state = match stream.fill(data).await {
             Ok(s) => s,
             Err(e) => {
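`config_streams` now just moves each configured `StreamConfig` into `Stream::new`. A sketch of feeding it by hand (name, topic, and values are illustrative; in practice the map is deserialized from uplink's configuration file):

```rust
use std::collections::HashMap;

// Illustrative values only; uplink builds this map from its parsed Config.
let mut streams_config: HashMap<String, StreamConfig> = HashMap::new();
streams_config.insert(
    "gps".to_owned(),
    StreamConfig {
        topic: "/tenants/demo/devices/1/events/gps/jsonarray".to_owned(),
        buf_size: 10,
        ..Default::default()
    },
);
// Each entry lands in `Streams::map` as a ready-to-fill `Stream`.
streams.config_streams(streams_config);
```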
diff --git a/uplink/src/base/mod.rs b/uplink/src/base/mod.rs
index c78d1e22..1b012dc0 100644
--- a/uplink/src/base/mod.rs
+++ b/uplink/src/base/mod.rs
@@ -1,4 +1,5 @@
 use std::env::current_dir;
+use std::hash::Hash;
 use std::path::PathBuf;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use std::{collections::HashMap, fmt::Debug};
@@ -59,7 +60,7 @@ pub fn clock() -> u128 {
     SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis()
 }
 
-#[derive(Debug, Clone, Copy, Deserialize, Serialize, Default)]
+#[derive(Debug, Clone, Copy, Deserialize, Serialize, Default, Hash, PartialEq, Eq)]
 pub enum Compression {
     #[default]
     Disabled,
@@ -67,7 +68,7 @@
 }
 
 #[serde_as]
-#[derive(Debug, Clone, Deserialize)]
+#[derive(Debug, Clone, Deserialize, Eq)]
 pub struct StreamConfig {
     pub topic: String,
     #[serde(default = "max_buf_size")]
@@ -95,7 +96,19 @@ impl Default for StreamConfig {
     }
 }
 
-#[derive(Debug, Clone, Deserialize)]
+impl Hash for StreamConfig {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.topic.hash(state)
+    }
+}
+
+impl PartialEq for StreamConfig {
+    fn eq(&self, other: &Self) -> bool {
+        self.topic == other.topic
+    }
+}
+
+#[derive(Debug, Clone, Deserialize, Hash, PartialEq, Eq)]
 pub struct Persistence {
     #[serde(default = "default_file_size")]
     pub max_file_size: usize,
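The manual `Hash`/`PartialEq` impls make `topic` the entire identity of a `StreamConfig`, which is what allows the serializer below to key its storage map by config. A self-contained sketch of the consequence, with the struct trimmed down to two fields:

```rust
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

#[derive(Debug, Eq)]
struct StreamConfig {
    topic: String,
    buf_size: usize,
}

// Key identity is the topic alone: configs that differ only in buffering
// behave as the same stream for map purposes.
impl Hash for StreamConfig {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.topic.hash(state)
    }
}

impl PartialEq for StreamConfig {
    fn eq(&self, other: &Self) -> bool {
        self.topic == other.topic
    }
}

fn main() {
    let a = StreamConfig { topic: "/tenants/demo/events/gps".into(), buf_size: 10 };
    let b = StreamConfig { topic: "/tenants/demo/events/gps".into(), buf_size: 100 };

    let mut map = HashMap::new();
    map.insert(a, "first");
    map.insert(b, "second"); // same topic => same key, value is replaced
    assert_eq!(map.len(), 1);
}
```

Two configs that differ only in `buf_size` collapse into one entry; the map keeps the first key it saw.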
diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs
index aedb4ff2..fe489b81 100644
--- a/uplink/src/base/serializer/mod.rs
+++ b/uplink/src/base/serializer/mod.rs
@@ -17,22 +17,31 @@ use crate::base::Compression;
 use crate::{Config, Package};
 pub use metrics::SerializerMetrics;
 
-use super::default_file_size;
+use super::{default_file_size, StreamConfig};
 
 const METRICS_INTERVAL: Duration = Duration::from_secs(10);
 
 #[derive(thiserror::Error, Debug)]
 pub enum MqttError {
     #[error("SendError(..)")]
-    Send(Request),
+    Send(Arc<StreamConfig>, Request),
     #[error("TrySendError(..)")]
     TrySend(Request),
 }
 
+impl From<(Arc<StreamConfig>, ClientError)> for MqttError {
+    fn from((stream, e): (Arc<StreamConfig>, ClientError)) -> Self {
+        match e {
+            ClientError::Request(r) => MqttError::Send(stream, r),
+            ClientError::TryRequest(r) => MqttError::TrySend(r),
+        }
+    }
+}
+
 impl From<ClientError> for MqttError {
     fn from(e: ClientError) -> Self {
         match e {
-            ClientError::Request(r) => MqttError::Send(r),
+            ClientError::Request(r) => MqttError::Send(Arc::new(Default::default()), r),
             ClientError::TryRequest(r) => MqttError::TrySend(r),
         }
     }
@@ -63,24 +72,23 @@ pub enum Error {
 #[derive(Debug, PartialEq)]
 enum Status {
     Normal,
-    SlowEventloop(Publish),
+    SlowEventloop(Publish, Arc<StreamConfig>),
     EventLoopReady,
-    EventLoopCrash(Publish),
+    EventLoopCrash(Publish, Arc<StreamConfig>),
 }
 
 /// Description of an interface that the [`Serializer`] expects to be provided by the MQTT client to publish the serialized data with.
 #[async_trait::async_trait]
 pub trait MqttClient: Clone {
     /// Accept payload and resolve as an error only when the client has died(thread kill). Useful in Slow/Catchup mode.
-    async fn publish<S, V>(
+    async fn publish<V>(
         &self,
-        topic: S,
+        stream: Arc<StreamConfig>,
         qos: QoS,
         retain: bool,
         payload: V,
     ) -> Result<(), MqttError>
     where
-        S: Into<String> + Send,
         V: Into<Vec<u8>> + Send;
 
     /// Accept payload and resolve as an error if data can't be sent over network, immediately. Useful in Normal mode.
@@ -98,18 +106,17 @@ pub trait MqttClient: Clone {
 
 #[async_trait::async_trait]
 impl MqttClient for AsyncClient {
-    async fn publish<S, V>(
+    async fn publish<V>(
         &self,
-        topic: S,
+        stream: Arc<StreamConfig>,
         qos: QoS,
         retain: bool,
         payload: V,
     ) -> Result<(), MqttError>
     where
-        S: Into<String> + Send,
         V: Into<Vec<u8>> + Send,
     {
-        self.publish(topic, qos, retain, payload).await?;
+        self.publish(&stream.topic, qos, retain, payload).await.map_err(|e| (stream, e))?;
         Ok(())
     }
 
@@ -130,9 +137,9 @@ impl MqttClient for AsyncClient {
 }
 
 struct StorageHandler {
-    map: HashMap<String, Storage>,
+    map: HashMap<Arc<StreamConfig>, Storage>,
     // Stream being read from
-    read_stream: Option<String>,
+    read_stream: Option<Arc<StreamConfig>>,
 }
 
 impl StorageHandler {
@@ -155,26 +162,31 @@ impl StorageHandler {
                     path.display()
                 );
             }
-            map.insert(stream_config.topic.clone(), storage);
+            map.insert(Arc::new(stream_config.clone()), storage);
         }
 
         Ok(Self { map, read_stream: None })
     }
 
-    fn select(&mut self, topic: &str) -> &mut Storage {
-        self.map.entry(topic.to_owned()).or_insert_with(|| Storage::new(topic, default_file_size()))
+    fn select(&mut self, stream: &Arc<StreamConfig>) -> &mut Storage {
+        self.map
+            .entry(stream.to_owned())
+            .or_insert_with(|| Storage::new(&stream.topic, default_file_size()))
     }
 
-    fn next(&mut self, metrics: &mut SerializerMetrics) -> Option<&mut Storage> {
+    fn next(
+        &mut self,
+        metrics: &mut SerializerMetrics,
+    ) -> Option<(&Arc<StreamConfig>, &mut Storage)> {
         let storages = self.map.iter_mut();
 
-        for (stream_name, storage) in storages {
+        for (stream, storage) in storages {
             match (storage.reload_on_eof(), &mut self.read_stream) {
                 // Done reading all pending files for a persisted stream
                 (Ok(true), Some(curr_stream)) => {
-                    if curr_stream == stream_name {
+                    if curr_stream == stream {
                         self.read_stream.take();
-                        debug!("Completed reading from: {stream_name}");
+                        debug!("Completed reading from: {}", stream.topic);
                     }
 
                     continue;
                 }
@@ -183,12 +195,12 @@
                 (Ok(true), _) => continue,
                 // Reading from a newly loaded non-empty persisted stream
                 (Ok(false), None) => {
-                    debug!("Reading from: {stream_name}");
-                    self.read_stream = Some(stream_name.to_owned());
-                    return Some(storage);
+                    debug!("Reading from: {}", stream.topic);
+                    self.read_stream = Some(stream.to_owned());
+                    return Some((stream, storage));
                 }
                 // Continuing to read from persisted stream loaded earlier
-                (Ok(false), _) => return Some(storage),
+                (Ok(false), _) => return Some((stream, storage)),
                 // Reload again on encountering a corrupted file
                 (Err(e), _) => {
                     metrics.increment_errors();
@@ -277,8 +289,12 @@ impl Serializer {
     }
 
     /// Write all data received, from here-on, to disk only.
-    async fn crash(&mut self, publish: Publish) -> Result<Status, Error> {
-        let storage = self.storage_handler.select(&publish.topic);
+    async fn crash(
+        &mut self,
+        publish: Publish,
+        stream: Arc<StreamConfig>,
+    ) -> Result<Status, Error> {
+        let storage = self.storage_handler.select(&stream);
         // Write failed publish to disk first, metrics don't matter
         match write_to_disk(publish, storage) {
             Ok(Some(deleted)) => debug!("Lost segment = {deleted}"),
@@ -290,7 +306,7 @@
             // Collect next data packet and write to disk
             let data = self.collector_rx.recv_async().await?;
             let publish = construct_publish(data)?;
-            let storage = self.storage_handler.select(&publish.topic);
+            let storage = self.storage_handler.select(&stream);
             match write_to_disk(publish, storage) {
                 Ok(Some(deleted)) => debug!("Lost segment = {deleted}"),
                 Ok(_) => {}
                 Err(e) => error!("Crash loop: write error = {e}"),
             }
         }
     }
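`StorageHandler` can key its map by `Arc<StreamConfig>` because `Hash`, `Eq`, and `Borrow<T>` on `Arc<T>` all delegate to the inner value, so a cheap pointer clone serves as a shared key. A self-contained sketch with a stand-in config that has the same topic-only identity:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Simplified stand-in: with a single field, the derived impls give the
// same topic-only identity as the patch's manual Hash/PartialEq.
#[derive(Hash, PartialEq, Eq, Debug)]
struct StreamConfig {
    topic: String,
}

fn main() {
    let gps = Arc::new(StreamConfig { topic: "gps".into() });

    let mut map: HashMap<Arc<StreamConfig>, &str> = HashMap::new();
    // `Hash`/`Eq` for `Arc<T>` delegate to `T`, so cloning the Arc is a
    // cheap way to reuse one key across the serializer and its storage.
    map.insert(gps.clone(), "persisted");

    // `Borrow<T> for Arc<T>` also allows lookup by the inner value.
    let probe = StreamConfig { topic: "gps".into() };
    assert_eq!(map.get(&probe), Some(&"persisted"));
}
```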
 
     /// Write new data to disk until back pressure due to slow n/w is resolved
     // TODO: Handle errors. Don't return errors
-    async fn slow(&mut self, publish: Publish) -> Result<Status, Error> {
+    async fn slow(&mut self, publish: Publish, stream: Arc<StreamConfig>) -> Result<Status, Error> {
         let mut interval = interval(METRICS_INTERVAL);
         // Reactlabs setup processes logs generated by uplink
         info!("Switching to slow eventloop mode!!");
         // Note: self.client.publish() is executing code before await point
         // in publish method every time. Verify this behaviour later
         let payload = Bytes::copy_from_slice(&publish.payload[..]);
-        let publish = send_publish(self.client.clone(), publish.topic, payload);
+        let publish = send_publish(self.client.clone(), publish.topic, payload, stream);
         tokio::pin!(publish);
 
         let v: Result<Status, Error> = loop {
             select! {
                 data = self.collector_rx.recv_async() => {
                     let data = data?;
+                    let stream = data.stream_config();
                     let publish = construct_publish(data)?;
-                    let storage = self.storage_handler.select(&publish.topic);
+                    let storage = self.storage_handler.select(&stream);
                     match write_to_disk(publish, storage) {
                         Ok(Some(deleted)) => {
                             debug!("Lost segment = {deleted}");
@@ -338,8 +355,8 @@
                 Ok(_) => {
                     break Ok(Status::EventLoopReady)
                 }
-                Err(MqttError::Send(Request::Publish(publish))) => {
-                    break Ok(Status::EventLoopCrash(publish));
+                Err(MqttError::Send(stream, Request::Publish(publish))) => {
+                    break Ok(Status::EventLoopCrash(publish, stream));
                 },
                 Err(e) => {
                     unreachable!("Unexpected error: {}", e);
@@ -374,7 +391,7 @@
         let max_packet_size = self.config.mqtt.max_packet_size;
         let client = self.client.clone();
 
-        let storage = match self.storage_handler.next(&mut self.metrics) {
+        let (stream, storage) = match self.storage_handler.next(&mut self.metrics) {
             Some(s) => s,
             _ => return Ok(Status::Normal),
         };
@@ -397,15 +414,16 @@
         };
 
         let mut last_publish_payload_size = publish.payload.len();
-        let send = send_publish(client, publish.topic, publish.payload);
+        let send = send_publish(client, publish.topic, publish.payload, stream.clone());
         tokio::pin!(send);
 
         let v: Result<Status, Error> = loop {
             select! {
                 data = self.collector_rx.recv_async() => {
                     let data = data?;
+                    let stream = data.stream_config();
                     let publish = construct_publish(data)?;
-                    let storage = self.storage_handler.select(&publish.topic);
+                    let storage = self.storage_handler.select(&stream);
                     match write_to_disk(publish, storage) {
                         Ok(Some(deleted)) => {
                             debug!("Lost segment = {deleted}");
@@ -427,11 +445,11 @@
                     // indefinitely write to disk to not loose data
                     let client = match o {
                         Ok(c) => c,
-                        Err(MqttError::Send(Request::Publish(publish))) => break Ok(Status::EventLoopCrash(publish)),
+                        Err(MqttError::Send(stream, Request::Publish(publish))) => break Ok(Status::EventLoopCrash(publish, stream)),
                         Err(e) => unreachable!("Unexpected error: {}", e),
                     };
 
-                    let storage = match self.storage_handler.next(&mut self.metrics) {
+                    let (stream, storage) = match self.storage_handler.next(&mut self.metrics) {
                         Some(s) => s,
                         _ => return Ok(Status::Normal),
                     };
@@ -449,7 +467,7 @@
                     let payload = publish.payload;
                     last_publish_payload_size = payload.len();
-                    send.set(send_publish(client, publish.topic, payload));
+                    send.set(send_publish(client, publish.topic, payload, stream.clone()));
                 }
                 // On a regular interval, forwards metrics information to network
                 _ = interval.tick() => {
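One pattern worth calling out in `slow`/`catchup`: `send_publish` now carries the stream, so a failed publish comes back with enough context to pick the right on-disk queue. A self-contained sketch of the pairing idea (stand-in types; the real code pairs rumqttc's `ClientError`/`Request` with the `Arc<StreamConfig>` via the tuple `From` impl added above):

```rust
use std::sync::Arc;

// Stand-ins: the real code carries a rumqttc Request and StreamConfig.
#[derive(Debug)]
struct ClientError(&'static str);

#[derive(Debug)]
enum MqttError {
    Send(Arc<String>, ClientError),
}

// Pair the failure with the stream it belonged to, so `?` still works at
// the call site and the caller knows which storage to write to.
impl From<(Arc<String>, ClientError)> for MqttError {
    fn from((stream, e): (Arc<String>, ClientError)) -> Self {
        MqttError::Send(stream, e)
    }
}

fn publish(stream: Arc<String>, healthy: bool) -> Result<(), MqttError> {
    raw_send(healthy).map_err(|e| (stream, e))?; // the patch's map_err pattern
    Ok(())
}

fn raw_send(healthy: bool) -> Result<(), ClientError> {
    healthy.then_some(()).ok_or(ClientError("eventloop down"))
}

fn main() {
    let gps = Arc::new("gps/topic".to_string());
    if let Err(MqttError::Send(stream, e)) = publish(gps, false) {
        // The caller can now select the right on-disk queue for `stream`.
        println!("persist for {stream}: {e:?}");
    }
}
```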
@@ -477,16 +495,17 @@
         select! {
             data = self.collector_rx.recv_async() => {
                 let data = data?;
+                let stream = data.stream_config();
                 let publish = construct_publish(data)?;
                 let payload_size = publish.payload.len();
                 debug!("publishing on {} with size = {}", publish.topic, payload_size);
-                match self.client.try_publish(publish.topic, QoS::AtLeastOnce, false, publish.payload) {
+                match self.client.try_publish(&stream.topic, QoS::AtLeastOnce, false, publish.payload) {
                     Ok(_) => {
                         self.metrics.add_batch();
                         self.metrics.add_sent_size(payload_size);
                         continue;
                     }
-                    Err(MqttError::TrySend(Request::Publish(publish))) => return Ok(Status::SlowEventloop(publish)),
+                    Err(MqttError::TrySend(Request::Publish(publish))) => return Ok(Status::SlowEventloop(publish, stream)),
                     Err(e) => unreachable!("Unexpected error: {}", e),
                 }
 
@@ -511,9 +530,9 @@
         loop {
             let next_status = match status {
                 Status::Normal => self.normal().await?,
-                Status::SlowEventloop(publish) => self.slow(publish).await?,
+                Status::SlowEventloop(publish, stream) => self.slow(publish, stream).await?,
                 Status::EventLoopReady => self.catchup().await?,
-                Status::EventLoopCrash(publish) => self.crash(publish).await?,
+                Status::EventLoopCrash(publish, stream) => self.crash(publish, stream).await?,
             };
 
             status = next_status;
@@ -525,9 +544,10 @@
 async fn send_publish<C: MqttClient>(
     client: C,
     topic: String,
     payload: Bytes,
+    stream: Arc<StreamConfig>,
 ) -> Result<C, MqttError> {
     debug!("publishing on {topic} with size = {}", payload.len());
-    client.publish(topic, QoS::AtLeastOnce, false, payload).await?;
+    client.publish(stream, QoS::AtLeastOnce, false, payload).await?;
     Ok(client)
 }
 
@@ -541,15 +561,16 @@ fn lz4_compress(payload: &mut Vec<u8>) -> Result<(), Error> {
 }
 
 // Constructs a [Publish] packet given a [Package] element. Updates stream metrics as necessary.
 fn construct_publish(data: Box<dyn Package>) -> Result<Publish, Error> {
-    let stream = data.stream().as_ref().to_owned();
+    let stream_name = data.stream_name().as_ref().to_owned();
+    let stream_config = data.stream_config();
     let point_count = data.len();
     let batch_latency = data.latency();
-    trace!("Data received on stream: {stream}; message count = {point_count}; batching latency = {batch_latency}");
+    trace!("Data received on stream: {stream_name}; message count = {point_count}; batching latency = {batch_latency}");
 
-    let topic = data.topic().to_string();
+    let topic = stream_config.topic.clone();
     let mut payload = data.serialize()?;
 
-    if let Compression::Lz4 = data.compression() {
+    if let Compression::Lz4 = stream_config.compression {
         lz4_compress(&mut payload)?;
     }
 
@@ -718,21 +739,23 @@ mod test {
 
     #[async_trait::async_trait]
     impl MqttClient for MockClient {
-        async fn publish<S, V>(
+        async fn publish<V>(
             &self,
-            topic: S,
+            stream: Arc<StreamConfig>,
             qos: QoS,
             retain: bool,
             payload: V,
         ) -> Result<(), MqttError>
         where
-            S: Into<String> + Send,
             V: Into<Vec<u8>> + Send,
         {
-            let mut publish = Publish::new(topic, qos, payload);
+            let mut publish = Publish::new(&stream.topic, qos, payload);
             publish.retain = retain;
             let publish = Request::Publish(publish);
-            self.net_tx.send_async(publish).await.map_err(|e| MqttError::Send(e.into_inner()))?;
+            self.net_tx
+                .send_async(publish)
+                .await
+                .map_err(|e| MqttError::Send(stream, e.into_inner()))?;
             Ok(())
         }
 
@@ -805,7 +828,15 @@ impl MockCollector {
         fn new(data_tx: flume::Sender<Box<dyn Package>>) -> MockCollector {
             MockCollector {
-                stream: Stream::new("hello", "hello/world", 1, data_tx, Compression::Disabled),
+                stream: Stream::new(
+                    "hello",
+                    StreamConfig {
+                        topic: "hello/world".to_string(),
+                        buf_size: 1,
+                        ..Default::default()
+                    },
+                    data_tx,
+                ),
             }
         }
@@ -842,7 +873,7 @@ mod test {
         });
 
         match tokio::runtime::Runtime::new().unwrap().block_on(serializer.normal()).unwrap() {
-            Status::SlowEventloop(Publish { qos: QoS::AtLeastOnce, topic, payload, .. }) => {
+            Status::SlowEventloop(Publish { qos: QoS::AtLeastOnce, topic, payload, .. }, _) => {
                 assert_eq!(topic, "hello/world");
                 let recvd: Value = serde_json::from_slice(&payload).unwrap();
                 let obj = &recvd.as_array().unwrap()[0];
@@ -902,8 +933,10 @@ mod test {
             QoS::AtLeastOnce,
             "[{{\"sequence\":1,\"timestamp\":0,\"msg\":\"Hello, World!\"}}]".as_bytes(),
         );
-        let status =
-            tokio::runtime::Runtime::new().unwrap().block_on(serializer.slow(publish)).unwrap();
+        let status = tokio::runtime::Runtime::new()
+            .unwrap()
+            .block_on(serializer.slow(publish, Arc::new(Default::default())))
+            .unwrap();
 
         assert_eq!(status, Status::EventLoopReady);
     }
@@ -929,8 +962,15 @@ mod test {
             "[{\"sequence\":1,\"timestamp\":0,\"msg\":\"Hello, World!\"}]".as_bytes(),
         );
 
-        match tokio::runtime::Runtime::new().unwrap().block_on(serializer.slow(publish)).unwrap() {
-            Status::EventLoopCrash(Publish { qos: QoS::AtLeastOnce, topic, payload, .. }) => {
+        match tokio::runtime::Runtime::new()
+            .unwrap()
+            .block_on(serializer.slow(
+                publish,
+                Arc::new(StreamConfig { topic: "hello/world".to_string(), ..Default::default() }),
+            ))
+            .unwrap()
+        {
+            Status::EventLoopCrash(Publish { qos: QoS::AtLeastOnce, topic, payload, .. }, _) => {
                 assert_eq!(topic, "hello/world");
                 let recvd = std::str::from_utf8(&payload).unwrap();
                 assert_eq!(recvd, "[{\"sequence\":1,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");
            }
@@ -961,7 +1001,7 @@ mod test {
         let mut storage = serializer
             .storage_handler
             .map
-            .entry("hello/world".to_string())
+            .entry(Arc::new(Default::default()))
             .or_insert(Storage::new("hello/world", 1024));
 
         let mut collector = MockCollector::new(data_tx);
@@ -1013,7 +1053,10 @@ mod test {
         let mut storage = serializer
             .storage_handler
             .map
-            .entry("hello/world".to_string())
+            .entry(Arc::new(StreamConfig {
+                topic: "hello/world".to_string(),
+                ..Default::default()
+            }))
             .or_insert(Storage::new("hello/world", 1024));
 
         let mut collector = MockCollector::new(data_tx);
@@ -1034,7 +1077,7 @@ mod test {
         write_to_disk(publish.clone(), &mut storage).unwrap();
 
         match tokio::runtime::Runtime::new().unwrap().block_on(serializer.catchup()).unwrap() {
-            Status::EventLoopCrash(Publish { topic, payload, .. }) => {
+            Status::EventLoopCrash(Publish { topic, payload, .. }, _) => {
                 assert_eq!(topic, "hello/world");
                 let recvd = std::str::from_utf8(&payload).unwrap();
                 assert_eq!(recvd, "[{\"sequence\":1,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");