Skip to content

Commit

Permalink
style: batch_size clarification
Browse files Browse the repository at this point in the history
`buf_size` confused users, uses alias for backward compatability
  • Loading branch information
Devdutt Shenoi committed Feb 7, 2024
1 parent f601a9b commit 2fb197d
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 36 deletions.
10 changes: 5 additions & 5 deletions configs/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ blacklist = ["cancollector_metrics", "candump_metrics", "pinger"]
# be collected, batched and forwarded to serializer to then be published onto platform.
#
# Required Parameters
# - buf-size: Number of data points that shall be included in each Publish
# - batch-size: Number of data points that shall be included in each Publish
# - topic(optional): topic-filter to which data shall be published. If left
# unconfigured, stream will be created dynamically.
# - flush-period(optional): Duration in seconds after a data point enters the stream
Expand All @@ -79,7 +79,7 @@ blacklist = ["cancollector_metrics", "candump_metrics", "pinger"]
# - priority(optional, u8): Higher prioirity streams get to push their data
# onto the network first.
#
# In the following config for the device_shadow stream we set buf_size to 1 and mark
# In the following config for the device_shadow stream we set batch_size to 1 and mark
# it as non-persistent. streams are internally constructed as a map of Name -> Config
[streams.device_shadow]
topic = "/tenants/{tenant_id}/devices/{device_id}/events/device_shadow/jsonarray"
Expand All @@ -89,7 +89,7 @@ priority = 75
# Example using compression
[streams.imu]
topic = "/tenants/{tenant_id}/devices/{device_id}/events/imu/jsonarray/lz4"
buf_size = 100
batch_size = 100
compression = "Lz4"
priority = 50

Expand All @@ -107,15 +107,15 @@ priority = 50
# configuration, we use transient(in-memory) storage to handle network downtime only.
[streams.gps]
topic = "/tenants/{tenant_id}/devices/{device_id}/events/gps/jsonarray"
buf_size = 10
batch_size = 10
persistence = { max_file_size = 1048576, max_file_count = 10 }

# NOTE: While it is possible to configure persistence to be disabled, including only max_file_count
# without path causes non-persistence. Configuring only the persistence path is allowed and the other
# config values will be default. Configuring only max_file_size is also allowed, without persistance.
[streams.motor]
topic = "/tenants/{tenant_id}/devices/{device_id}/events/motor/jsonarray/lz4"
buf_size = 50
batch_size = 50
compression = "Lz4"
persistence = { max_file_count = 3 }

Expand Down
2 changes: 1 addition & 1 deletion examples/android/etc/uplink.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ path = "/data/local/tmp/uplink/downloader"

[streams.device_shadow]
topic = "/tenants/{tenant_id}/devices/{device_id}/events/device_shadow/jsonarray"
buf_size = 128
batch_size = 128
flush_period = 30

[logging]
Expand Down
2 changes: 1 addition & 1 deletion qa-scripts/streams.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ flush_period = 2
[streams.imu]
topic = "/tenants/{tenant_id}/devices/{device_id}/events/imu/jsonarray"
buf_size = 10
batch_size = 10
EOF
)" > devices/streams.toml
docker cp devices/streams.toml simulator:/usr/share/bytebeam/uplink/devices/streams.toml
Expand Down
4 changes: 2 additions & 2 deletions uplink/src/base/bridge/actions_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ impl ActionsBridge {

let mut streams_config = HashMap::new();
let mut action_status = config.action_status.clone();
if action_status.buf_size > 1 {
if action_status.batch_size > 1 {
warn!("Buffer size of `action_status` stream restricted to 1")
}
action_status.buf_size = 1;
action_status.batch_size = 1;
streams_config.insert("action_status".to_owned(), action_status);
let mut streams = Streams::new(config.clone(), package_tx, metrics_tx);
streams.config_streams(streams_config);
Expand Down
16 changes: 8 additions & 8 deletions uplink/src/base/bridge/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub enum Error {
Send(#[from] SendError<Box<dyn Package>>),
}

pub const MAX_BUFFER_SIZE: usize = 100;
pub const MAX_BATCH_SIZE: usize = 100;

#[derive(Debug)]
pub struct Stream<T> {
Expand All @@ -48,7 +48,7 @@ where
let name = Arc::new(stream_name.into());
let config = Arc::new(stream_config);
let buffer = Buffer::new(name.clone(), config.clone());
let metrics = StreamMetrics::new(&name, config.buf_size);
let metrics = StreamMetrics::new(&name, config.batch_size);

Stream { name, config, last_sequence: 0, last_timestamp: 0, buffer, tx, metrics }
}
Expand Down Expand Up @@ -94,8 +94,8 @@ where
self.last_sequence = current_sequence;
self.last_timestamp = current_timestamp;

// if max_buffer_size is breached, flush
let buf = if self.buffer.buffer.len() >= self.config.buf_size {
// if max_bATCH_size is breached, flush
let buf = if self.buffer.buffer.len() >= self.config.batch_size {
self.metrics.add_batch();
Some(self.take_buffer())
} else {
Expand Down Expand Up @@ -134,7 +134,7 @@ where
self.len() == 0
}

/// Fill buffer with data and trigger async channel send on breaching max_buf_size.
/// Fill buffer with data and trigger async channel send on breaching max_batch_size.
/// Returns [`StreamStatus`].
pub async fn fill(&mut self, data: T) -> Result<StreamStatus, Error> {
if let Some(buf) = self.add(data)? {
Expand All @@ -151,7 +151,7 @@ where
}

#[cfg(test)]
/// Push data into buffer and trigger sync channel send on max_buf_size.
/// Push data into buffer and trigger sync channel send on max_batch_size.
/// Returns [`StreamStatus`].
pub fn push(&mut self, data: T) -> Result<StreamStatus, Error> {
if let Some(buf) = self.add(data)? {
Expand Down Expand Up @@ -188,7 +188,7 @@ pub struct Buffer<T> {
impl<T> Buffer<T> {
pub fn new(stream_name: Arc<String>, stream_config: Arc<StreamConfig>) -> Buffer<T> {
Buffer {
buffer: Vec::with_capacity(stream_config.buf_size),
buffer: Vec::with_capacity(stream_config.batch_size),
stream_name,
stream_config,
anomalies: String::with_capacity(100),
Expand Down Expand Up @@ -263,7 +263,7 @@ impl<T> Clone for Stream<T> {
last_sequence: 0,
last_timestamp: 0,
buffer: Buffer::new(self.buffer.stream_name.clone(), self.buffer.stream_config.clone()),
metrics: StreamMetrics::new(&self.name, self.config.buf_size),
metrics: StreamMetrics::new(&self.name, self.config.batch_size),
tx: self.tx.clone(),
}
}
Expand Down
2 changes: 1 addition & 1 deletion uplink/src/base/bridge/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl<T: Point> Streams<T> {
}
};

let max_stream_size = stream.config.buf_size;
let max_stream_size = stream.config.batch_size;
let state = match stream.fill(data).await {
Ok(s) => s,
Err(e) => {
Expand Down
12 changes: 6 additions & 6 deletions uplink/src/base/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::collector::journalctl::JournalCtlConfig;
#[cfg(target_os = "android")]
use crate::collector::logcat::LogcatConfig;

use self::bridge::stream::MAX_BUFFER_SIZE;
use self::bridge::stream::MAX_BATCH_SIZE;
use self::bridge::{ActionsLaneCtrlTx, DataLaneCtrlTx};
use self::mqtt::CtrlTx as MqttCtrlTx;
use self::serializer::CtrlTx as SerializerCtrlTx;
Expand All @@ -32,8 +32,8 @@ fn default_timeout() -> Duration {
}

#[inline]
fn max_buf_size() -> usize {
MAX_BUFFER_SIZE
fn max_batch_size() -> usize {
MAX_BATCH_SIZE
}

fn default_file_size() -> usize {
Expand Down Expand Up @@ -75,8 +75,8 @@ pub enum Compression {
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct StreamConfig {
pub topic: String,
#[serde(default = "max_buf_size")]
pub buf_size: usize,
#[serde(default = "max_batch_size", alias = "buf_size")]
pub batch_size: usize,
#[serde(default = "default_timeout")]
#[serde_as(as = "DurationSeconds<u64>")]
/// Duration(in seconds) that bridge collector waits from
Expand All @@ -94,7 +94,7 @@ impl Default for StreamConfig {
fn default() -> Self {
Self {
topic: "".to_string(),
buf_size: MAX_BUFFER_SIZE,
batch_size: MAX_BATCH_SIZE,
flush_period: default_timeout(),
compression: Compression::Disabled,
persistence: Persistence::default(),
Expand Down
10 changes: 5 additions & 5 deletions uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,7 @@ mod test {

let (stream_name, stream_config) = (
"hello",
StreamConfig { topic: "hello/world".to_string(), buf_size: 1, ..Default::default() },
StreamConfig { topic: "hello/world".to_string(), batch_size: 1, ..Default::default() },
);
let mut collector = MockCollector::new(stream_name, stream_config, data_tx);
std::thread::spawn(move || {
Expand Down Expand Up @@ -1065,7 +1065,7 @@ mod test {

let (stream_name, stream_config) = (
"hello",
StreamConfig { topic: "hello/world".to_string(), buf_size: 1, ..Default::default() },
StreamConfig { topic: "hello/world".to_string(), batch_size: 1, ..Default::default() },
);
let mut collector = MockCollector::new(stream_name, stream_config, data_tx);
// Faster collector, send data every 5s
Expand Down Expand Up @@ -1097,7 +1097,7 @@ mod test {

let (stream_name, stream_config) = (
"hello",
StreamConfig { topic: "hello/world".to_string(), buf_size: 1, ..Default::default() },
StreamConfig { topic: "hello/world".to_string(), batch_size: 1, ..Default::default() },
);
let mut collector = MockCollector::new(stream_name, stream_config, data_tx);
// Faster collector, send data every 5s
Expand Down Expand Up @@ -1158,7 +1158,7 @@ mod test {

let (stream_name, stream_config) = (
"hello",
StreamConfig { topic: "hello/world".to_string(), buf_size: 1, ..Default::default() },
StreamConfig { topic: "hello/world".to_string(), batch_size: 1, ..Default::default() },
);
let mut collector = MockCollector::new(stream_name, stream_config, data_tx);
// Run a collector practically once
Expand Down Expand Up @@ -1217,7 +1217,7 @@ mod test {

let (stream_name, stream_config) = (
"hello",
StreamConfig { topic: "hello/world".to_string(), buf_size: 1, ..Default::default() },
StreamConfig { topic: "hello/world".to_string(), batch_size: 1, ..Default::default() },
);
let mut collector = MockCollector::new(stream_name, stream_config, data_tx);
// Run a collector
Expand Down
14 changes: 7 additions & 7 deletions uplink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub mod base;
pub mod collector;

pub mod config {
use crate::base::{bridge::stream::MAX_BUFFER_SIZE, StreamConfig};
use crate::base::{bridge::stream::MAX_BATCH_SIZE, StreamConfig};
pub use crate::base::{Config, Persistence, Stats};
use config::{Environment, File, FileFormat};
use std::fs;
Expand Down Expand Up @@ -128,7 +128,7 @@ pub mod config {
[action_status]
topic = "/tenants/{tenant_id}/devices/{device_id}/action/status"
buf_size = 1
batch_size = 1
flush_period = 2
priority = 255 # highest priority for quick delivery of action status info to platform
Expand All @@ -138,7 +138,7 @@ pub mod config {
[streams.logs]
topic = "/tenants/{tenant_id}/devices/{device_id}/events/logs/jsonarray"
buf_size = 32
batch_size = 32
[system_stats]
enabled = true
Expand Down Expand Up @@ -205,24 +205,24 @@ pub mod config {
topic: format!(
"/tenants/{tenant_id}/devices/{device_id}/events/{stream_name}/jsonarray"
),
buf_size: config.system_stats.stream_size.unwrap_or(MAX_BUFFER_SIZE),
batch_size: config.system_stats.stream_size.unwrap_or(MAX_BATCH_SIZE),
..Default::default()
};
config.streams.insert(stream_name.to_owned(), stream_config);
}
}

#[cfg(any(target_os = "linux", target_os = "android"))]
if let Some(buf_size) = config.logging.as_ref().and_then(|c| c.stream_size) {
if let Some(batch_size) = config.logging.as_ref().and_then(|c| c.stream_size) {
let stream_config =
config.streams.entry("logs".to_string()).or_insert_with(|| StreamConfig {
topic: format!(
"/tenants/{tenant_id}/devices/{device_id}/events/logs/jsonarray"
),
buf_size: 32,
batch_size: 32,
..Default::default()
});
stream_config.buf_size = buf_size;
stream_config.batch_size = batch_size;
}

let action_topic_template = "/tenants/{tenant_id}/devices/{device_id}/actions";
Expand Down

0 comments on commit 2fb197d

Please sign in to comment.