diff --git a/configs/config.toml b/configs/config.toml index 6df7206a..d97ff2a4 100644 --- a/configs/config.toml +++ b/configs/config.toml @@ -66,7 +66,7 @@ blacklist = ["cancollector_metrics", "candump_metrics", "pinger"] # be collected, batched and forwarded to serializer to then be published onto platform. # # Required Parameters -# - buf-size: Number of data points that shall be included in each Publish +# - batch-size: Number of data points that shall be included in each Publish # - topic(optional): topic-filter to which data shall be published. If left # unconfigured, stream will be created dynamically. # - flush-period(optional): Duration in seconds after a data point enters the stream @@ -79,7 +79,7 @@ blacklist = ["cancollector_metrics", "candump_metrics", "pinger"] # - priority(optional, u8): Higher prioirity streams get to push their data # onto the network first. # -# In the following config for the device_shadow stream we set buf_size to 1 and mark +# In the following config for the device_shadow stream we set batch_size to 1 and mark # it as non-persistent. streams are internally constructed as a map of Name -> Config [streams.device_shadow] topic = "/tenants/{tenant_id}/devices/{device_id}/events/device_shadow/jsonarray" @@ -89,7 +89,7 @@ priority = 75 # Example using compression [streams.imu] topic = "/tenants/{tenant_id}/devices/{device_id}/events/imu/jsonarray/lz4" -buf_size = 100 +batch_size = 100 compression = "Lz4" priority = 50 @@ -107,7 +107,7 @@ priority = 50 # configuration, we use transient(in-memory) storage to handle network downtime only. [streams.gps] topic = "/tenants/{tenant_id}/devices/{device_id}/events/gps/jsonarray" -buf_size = 10 +batch_size = 10 persistence = { max_file_size = 1048576, max_file_count = 10 } # NOTE: While it is possible to configure persistence to be disabled, including only max_file_count @@ -115,7 +115,7 @@ persistence = { max_file_size = 1048576, max_file_count = 10 } # config values will be default. 
Configuring only max_file_size is also allowed, without persistance. [streams.motor] topic = "/tenants/{tenant_id}/devices/{device_id}/events/motor/jsonarray/lz4" -buf_size = 50 +batch_size = 50 compression = "Lz4" persistence = { max_file_count = 3 } diff --git a/examples/android/etc/uplink.config.toml b/examples/android/etc/uplink.config.toml index e2715a1e..c57e8557 100644 --- a/examples/android/etc/uplink.config.toml +++ b/examples/android/etc/uplink.config.toml @@ -27,7 +27,7 @@ path = "/data/local/tmp/uplink/downloader" [streams.device_shadow] topic = "/tenants/{tenant_id}/devices/{device_id}/events/device_shadow/jsonarray" -buf_size = 128 +batch_size = 128 flush_period = 30 [logging] diff --git a/qa-scripts/streams.sh b/qa-scripts/streams.sh index 51f3e9dd..5606c267 100755 --- a/qa-scripts/streams.sh +++ b/qa-scripts/streams.sh @@ -15,7 +15,7 @@ flush_period = 2 [streams.imu] topic = "/tenants/{tenant_id}/devices/{device_id}/events/imu/jsonarray" -buf_size = 10 +batch_size = 10 EOF )" > devices/streams.toml docker cp devices/streams.toml simulator:/usr/share/bytebeam/uplink/devices/streams.toml diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index 7f2a753d..bf096b4c 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -77,10 +77,10 @@ impl ActionsBridge { let mut streams_config = HashMap::new(); let mut action_status = config.action_status.clone(); - if action_status.buf_size > 1 { + if action_status.batch_size > 1 { warn!("Buffer size of `action_status` stream restricted to 1") } - action_status.buf_size = 1; + action_status.batch_size = 1; streams_config.insert("action_status".to_owned(), action_status); let mut streams = Streams::new(config.clone(), package_tx, metrics_tx); streams.config_streams(streams_config); diff --git a/uplink/src/base/bridge/stream.rs b/uplink/src/base/bridge/stream.rs index e0aa1e37..64c14da9 100644 --- a/uplink/src/base/bridge/stream.rs 
+++ b/uplink/src/base/bridge/stream.rs @@ -22,7 +22,7 @@ pub enum Error { Send(#[from] SendError>), } -pub const MAX_BUFFER_SIZE: usize = 100; +pub const MAX_BATCH_SIZE: usize = 100; #[derive(Debug)] pub struct Stream { @@ -48,7 +48,7 @@ where let name = Arc::new(stream_name.into()); let config = Arc::new(stream_config); let buffer = Buffer::new(name.clone(), config.clone()); - let metrics = StreamMetrics::new(&name, config.buf_size); + let metrics = StreamMetrics::new(&name, config.batch_size); Stream { name, config, last_sequence: 0, last_timestamp: 0, buffer, tx, metrics } } @@ -94,8 +94,8 @@ where self.last_sequence = current_sequence; self.last_timestamp = current_timestamp; - // if max_buffer_size is breached, flush - let buf = if self.buffer.buffer.len() >= self.config.buf_size { + // if max_batch_size is breached, flush + let buf = if self.buffer.buffer.len() >= self.config.batch_size { self.metrics.add_batch(); Some(self.take_buffer()) } else { @@ -134,7 +134,7 @@ where self.len() == 0 } - /// Fill buffer with data and trigger async channel send on breaching max_buf_size. + /// Fill buffer with data and trigger async channel send on breaching max_batch_size. /// Returns [`StreamStatus`]. pub async fn fill(&mut self, data: T) -> Result { if let Some(buf) = self.add(data)? { @@ -151,7 +151,7 @@ } #[cfg(test)] - /// Push data into buffer and trigger sync channel send on max_buf_size. + /// Push data into buffer and trigger sync channel send on max_batch_size. /// Returns [`StreamStatus`]. pub fn push(&mut self, data: T) -> Result { if let Some(buf) = self.add(data)?
{ @@ -188,7 +188,7 @@ pub struct Buffer { impl Buffer { pub fn new(stream_name: Arc, stream_config: Arc) -> Buffer { Buffer { - buffer: Vec::with_capacity(stream_config.buf_size), + buffer: Vec::with_capacity(stream_config.batch_size), stream_name, stream_config, anomalies: String::with_capacity(100), @@ -263,7 +263,7 @@ impl Clone for Stream { last_sequence: 0, last_timestamp: 0, buffer: Buffer::new(self.buffer.stream_name.clone(), self.buffer.stream_config.clone()), - metrics: StreamMetrics::new(&self.name, self.config.buf_size), + metrics: StreamMetrics::new(&self.name, self.config.batch_size), tx: self.tx.clone(), } } diff --git a/uplink/src/base/bridge/streams.rs b/uplink/src/base/bridge/streams.rs index 7b72d0d4..fc1b92c8 100644 --- a/uplink/src/base/bridge/streams.rs +++ b/uplink/src/base/bridge/streams.rs @@ -57,7 +57,7 @@ impl Streams { } }; - let max_stream_size = stream.config.buf_size; + let max_stream_size = stream.config.batch_size; let state = match stream.fill(data).await { Ok(s) => s, Err(e) => { diff --git a/uplink/src/base/mod.rs b/uplink/src/base/mod.rs index 73b9422f..7f27f725 100644 --- a/uplink/src/base/mod.rs +++ b/uplink/src/base/mod.rs @@ -13,7 +13,7 @@ use crate::collector::journalctl::JournalCtlConfig; #[cfg(target_os = "android")] use crate::collector::logcat::LogcatConfig; -use self::bridge::stream::MAX_BUFFER_SIZE; +use self::bridge::stream::MAX_BATCH_SIZE; use self::bridge::{ActionsLaneCtrlTx, DataLaneCtrlTx}; use self::mqtt::CtrlTx as MqttCtrlTx; use self::serializer::CtrlTx as SerializerCtrlTx; @@ -32,8 +32,8 @@ fn default_timeout() -> Duration { } #[inline] -fn max_buf_size() -> usize { - MAX_BUFFER_SIZE +fn max_batch_size() -> usize { + MAX_BATCH_SIZE } fn default_file_size() -> usize { @@ -75,8 +75,8 @@ pub enum Compression { #[derive(Debug, Clone, Deserialize, PartialEq, Eq)] pub struct StreamConfig { pub topic: String, - #[serde(default = "max_buf_size")] - pub buf_size: usize, + #[serde(default = "max_batch_size", alias = 
"buf_size")] + pub batch_size: usize, #[serde(default = "default_timeout")] #[serde_as(as = "DurationSeconds")] /// Duration(in seconds) that bridge collector waits from @@ -94,7 +94,7 @@ impl Default for StreamConfig { fn default() -> Self { Self { topic: "".to_string(), - buf_size: MAX_BUFFER_SIZE, + batch_size: MAX_BATCH_SIZE, flush_period: default_timeout(), compression: Compression::Disabled, persistence: Persistence::default(), diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index 5ed189df..7c246429 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -1007,7 +1007,7 @@ mod test { let (stream_name, stream_config) = ( "hello", - StreamConfig { topic: "hello/world".to_string(), buf_size: 1, ..Default::default() }, + StreamConfig { topic: "hello/world".to_string(), batch_size: 1, ..Default::default() }, ); let mut collector = MockCollector::new(stream_name, stream_config, data_tx); std::thread::spawn(move || { @@ -1065,7 +1065,7 @@ mod test { let (stream_name, stream_config) = ( "hello", - StreamConfig { topic: "hello/world".to_string(), buf_size: 1, ..Default::default() }, + StreamConfig { topic: "hello/world".to_string(), batch_size: 1, ..Default::default() }, ); let mut collector = MockCollector::new(stream_name, stream_config, data_tx); // Faster collector, send data every 5s @@ -1097,7 +1097,7 @@ mod test { let (stream_name, stream_config) = ( "hello", - StreamConfig { topic: "hello/world".to_string(), buf_size: 1, ..Default::default() }, + StreamConfig { topic: "hello/world".to_string(), batch_size: 1, ..Default::default() }, ); let mut collector = MockCollector::new(stream_name, stream_config, data_tx); // Faster collector, send data every 5s @@ -1158,7 +1158,7 @@ mod test { let (stream_name, stream_config) = ( "hello", - StreamConfig { topic: "hello/world".to_string(), buf_size: 1, ..Default::default() }, + StreamConfig { topic: "hello/world".to_string(), batch_size: 1, 
..Default::default() }, ); let mut collector = MockCollector::new(stream_name, stream_config, data_tx); // Run a collector practically once @@ -1217,7 +1217,7 @@ mod test { let (stream_name, stream_config) = ( "hello", - StreamConfig { topic: "hello/world".to_string(), buf_size: 1, ..Default::default() }, + StreamConfig { topic: "hello/world".to_string(), batch_size: 1, ..Default::default() }, ); let mut collector = MockCollector::new(stream_name, stream_config, data_tx); // Run a collector diff --git a/uplink/src/lib.rs b/uplink/src/lib.rs index 6bfff3f5..a570dd84 100644 --- a/uplink/src/lib.rs +++ b/uplink/src/lib.rs @@ -68,7 +68,7 @@ pub mod base; pub mod collector; pub mod config { - use crate::base::{bridge::stream::MAX_BUFFER_SIZE, StreamConfig}; + use crate::base::{bridge::stream::MAX_BATCH_SIZE, StreamConfig}; pub use crate::base::{Config, Persistence, Stats}; use config::{Environment, File, FileFormat}; use std::fs; @@ -128,7 +128,7 @@ pub mod config { [action_status] topic = "/tenants/{tenant_id}/devices/{device_id}/action/status" - buf_size = 1 + batch_size = 1 flush_period = 2 priority = 255 # highest priority for quick delivery of action status info to platform @@ -138,7 +138,7 @@ pub mod config { [streams.logs] topic = "/tenants/{tenant_id}/devices/{device_id}/events/logs/jsonarray" - buf_size = 32 + batch_size = 32 [system_stats] enabled = true @@ -205,7 +205,7 @@ pub mod config { topic: format!( "/tenants/{tenant_id}/devices/{device_id}/events/{stream_name}/jsonarray" ), - buf_size: config.system_stats.stream_size.unwrap_or(MAX_BUFFER_SIZE), + batch_size: config.system_stats.stream_size.unwrap_or(MAX_BATCH_SIZE), ..Default::default() }; config.streams.insert(stream_name.to_owned(), stream_config); @@ -213,16 +213,16 @@ pub mod config { } #[cfg(any(target_os = "linux", target_os = "android"))] - if let Some(buf_size) = config.logging.as_ref().and_then(|c| c.stream_size) { + if let Some(batch_size) = config.logging.as_ref().and_then(|c| c.stream_size) 
{ let stream_config = config.streams.entry("logs".to_string()).or_insert_with(|| StreamConfig { topic: format!( "/tenants/{tenant_id}/devices/{device_id}/events/logs/jsonarray" ), - buf_size: 32, + batch_size: 32, ..Default::default() }); - stream_config.buf_size = buf_size; + stream_config.batch_size = batch_size; } let action_topic_template = "/tenants/{tenant_id}/devices/{device_id}/actions";