diff --git a/uplink/src/base/mod.rs b/uplink/src/base/mod.rs index 527763a0..5508478c 100644 --- a/uplink/src/base/mod.rs +++ b/uplink/src/base/mod.rs @@ -173,7 +173,8 @@ pub struct InstallerConfig { #[derive(Debug, Clone, Deserialize, Serialize, Default)] pub struct StreamMetricsConfig { pub enabled: bool, - pub topic: String, + pub bridge_topic: String, + pub serializer_topic: String, pub blacklist: Vec, #[serde_as(as = "DurationSeconds")] pub timeout: Duration, diff --git a/uplink/src/base/monitor/mod.rs b/uplink/src/base/monitor/mod.rs index 662402a9..d3bf7531 100644 --- a/uplink/src/base/monitor/mod.rs +++ b/uplink/src/base/monitor/mod.rs @@ -5,9 +5,9 @@ use flume::{Receiver, RecvError}; use rumqttc::{AsyncClient, ClientError, QoS, Request}; use tokio::select; -use crate::base::bridge::StreamMetrics; use crate::Config; +use super::bridge::StreamMetrics; use super::mqtt::MqttMetrics; use super::serializer::SerializerMetrics; @@ -38,8 +38,10 @@ impl Monitor { pub async fn start(&self) -> Result<(), Error> { let stream_metrics_config = self.config.stream_metrics.clone(); - let stream_metrics_topic = stream_metrics_config.topic; - let mut stream_metrics = Vec::with_capacity(10); + let bridge_stream_metrics_topic = stream_metrics_config.bridge_topic; + let mut bridge_stream_metrics = Vec::with_capacity(10); + let serializer_stream_metrics_topic = stream_metrics_config.serializer_topic; + let mut serializer_stream_metrics = Vec::with_capacity(10); let serializer_metrics_config = self.config.serializer_metrics.clone(); let serializer_metrics_topic = serializer_metrics_config.topic; @@ -58,18 +60,31 @@ impl Monitor { continue; } - stream_metrics.push(o); - let v = serde_json::to_string(&stream_metrics).unwrap(); + bridge_stream_metrics.push(o); + let v = serde_json::to_string(&bridge_stream_metrics).unwrap(); - stream_metrics.clear(); - self.client.publish(&stream_metrics_topic, QoS::AtLeastOnce, false, v).await.unwrap(); + bridge_stream_metrics.clear(); + self.client.publish(&bridge_stream_metrics_topic, QoS::AtLeastOnce, false, v).await.unwrap(); } o = self.serializer_metrics_rx.recv_async() => { let o = o?; - serializer_metrics.push(o); - let v = serde_json::to_string(&serializer_metrics).unwrap(); - serializer_metrics.clear(); - self.client.publish(&serializer_metrics_topic, QoS::AtLeastOnce, false, v).await.unwrap(); + match o { + SerializerMetrics::Main(o) => { + serializer_metrics.push(o); + let v = serde_json::to_string(&serializer_metrics).unwrap(); + serializer_metrics.clear(); + self.client.publish(&serializer_metrics_topic, QoS::AtLeastOnce, false, v).await.unwrap(); + } + SerializerMetrics::Stream(o) => { + if stream_metrics_config.blacklist.contains(&o.stream) { + continue; + } + serializer_stream_metrics.push(o); + let v = serde_json::to_string(&serializer_stream_metrics).unwrap(); + serializer_stream_metrics.clear(); + self.client.publish(&serializer_stream_metrics_topic, QoS::AtLeastOnce, false, v).await.unwrap(); + } + } } o = self.mqtt_metrics_rx.recv_async() => { let o = o?; diff --git a/uplink/src/base/serializer/metrics.rs b/uplink/src/base/serializer/metrics.rs index 68e1fc43..a0380230 100644 --- a/uplink/src/base/serializer/metrics.rs +++ b/uplink/src/base/serializer/metrics.rs @@ -1,10 +1,13 @@ +use std::time::Duration; + use serde::Serialize; +use serde_with::{serde_as, DurationSeconds}; use crate::base::clock; /// Metrics information relating to the operation of the `Serializer`, all values are reset on metrics flush #[derive(Debug, Serialize, Clone)] -pub struct SerializerMetrics { +pub struct Metrics { timestamp: u128, sequence: u32, /// One of **Catchup**, **Normal**, **Slow** or **Crash** @@ -27,9 +30,9 @@ pub struct SerializerMetrics { pub sent_size: usize, } -impl SerializerMetrics { +impl Metrics { pub fn new(mode: &str) -> Self { - SerializerMetrics { + Metrics { timestamp: clock(), sequence: 1, mode: mode.to_owned(), @@ -99,3 +102,81 @@ impl SerializerMetrics { self.errors = 0; } } + +#[serde_as] +#[derive(Debug, Serialize, Clone)] +pub struct StreamMetrics { + pub timestamp: u128, + pub sequence: u32, + pub stream: String, + pub serialized_data_size: usize, + pub compressed_data_size: usize, + #[serde(skip)] + pub serializations: u32, + #[serde_as(as = "DurationSeconds")] + pub total_serialization_time: Duration, + #[serde_as(as = "DurationSeconds")] + pub avg_serialization_time: Duration, + #[serde(skip)] + pub compressions: u32, + #[serde_as(as = "DurationSeconds")] + pub total_compression_time: Duration, + #[serde_as(as = "DurationSeconds")] + pub avg_compression_time: Duration, +} + +impl StreamMetrics { + pub fn new(name: &str) -> Self { + StreamMetrics { + stream: name.to_owned(), + timestamp: clock(), + sequence: 1, + serialized_data_size: 0, + compressed_data_size: 0, + serializations: 0, + total_serialization_time: Duration::ZERO, + avg_serialization_time: Duration::ZERO, + compressions: 0, + total_compression_time: Duration::ZERO, + avg_compression_time: Duration::ZERO, + } + } + + pub fn add_serialized_sizes(&mut self, data_size: usize, compressed_data_size: Option) { + self.serialized_data_size += data_size; + self.compressed_data_size += compressed_data_size.unwrap_or(data_size); + } + + pub fn add_serialization_time(&mut self, serialization_time: Duration) { + self.serializations += 1; + self.total_serialization_time += serialization_time; + } + + pub fn add_compression_time(&mut self, compression_time: Duration) { + self.compressions += 1; + self.total_compression_time += compression_time; + } + + // Should be called before serializing metrics to ensure averages are computed. + // Averages aren't calculated for ever `add_*` call to save on costs. + pub fn prepare_snapshot(&mut self) { + self.avg_serialization_time = self + .total_serialization_time + .checked_div(self.serializations) + .unwrap_or(Duration::ZERO); + self.avg_compression_time = + self.total_compression_time.checked_div(self.compressions).unwrap_or(Duration::ZERO); + } + + pub fn prepare_next(&mut self) { + self.timestamp = clock(); + self.sequence += 1; + self.serialized_data_size = 0; + self.compressed_data_size = 0; + } +} + +pub enum SerializerMetrics { + Main(Box), + Stream(Box), +} diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index 18145dc1..35ac7c8f 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -16,7 +16,7 @@ use tokio::{select, time::interval}; use crate::base::Compression; use crate::{Config, Package}; -pub use metrics::SerializerMetrics; +pub use metrics::{Metrics, SerializerMetrics, StreamMetrics}; use super::{default_file_size, StreamConfig}; @@ -171,10 +171,7 @@ impl StorageHandler { .or_insert_with(|| Storage::new(&stream.topic, default_file_size())) } - fn next( - &mut self, - metrics: &mut SerializerMetrics, - ) -> Option<(&Arc, &mut Storage)> { + fn next(&mut self, metrics: &mut Metrics) -> Option<(&Arc, &mut Storage)> { let storages = self.map.iter_mut(); for (stream, storage) in storages { @@ -274,9 +271,10 @@ pub struct Serializer { collector_rx: Receiver>, client: C, storage_handler: StorageHandler, - metrics: SerializerMetrics, + metrics: Metrics, metrics_tx: Sender, pending_metrics: VecDeque, + stream_metrics: HashMap, /// Control handles ctrl_rx: Receiver, ctrl_tx: Sender, @@ -299,7 +297,8 @@ impl Serializer { collector_rx, client, storage_handler, - metrics: SerializerMetrics::new("catchup"), + metrics: Metrics::new("catchup"), + stream_metrics: HashMap::new(), metrics_tx, pending_metrics: VecDeque::with_capacity(3), ctrl_tx, @@ -325,7 +324,7 @@ impl Serializer { return Ok(()); }; let stream_config = data.stream_config(); - let publish = construct_publish(data)?; + let publish = construct_publish(data, &mut self.stream_metrics)?; let storage = self.storage_handler.select(&stream_config); match write_to_disk(publish, storage) { Ok(Some(deleted)) => debug!("Lost segment = {deleted}"), @@ -352,7 +351,7 @@ impl Serializer { loop { // Collect next data packet and write to disk let data = self.collector_rx.recv_async().await?; - let publish = construct_publish(data)?; + let publish = construct_publish(data, &mut self.stream_metrics)?; let storage = self.storage_handler.select(&stream); match write_to_disk(publish, storage) { Ok(Some(deleted)) => debug!("Lost segment = {deleted}"), @@ -381,7 +380,7 @@ impl Serializer { data = self.collector_rx.recv_async() => { let data = data?; let stream = data.stream_config(); - let publish = construct_publish(data)?; + let publish = construct_publish(data, &mut self.stream_metrics)?; let storage = self.storage_handler.select(&stream); match write_to_disk(publish, storage) { Ok(Some(deleted)) => { @@ -410,7 +409,7 @@ impl Serializer { } }, _ = interval.tick() => { - check_metrics(&mut self.metrics, &self.storage_handler); + check_metrics(&mut self.metrics, &mut self.stream_metrics, &self.storage_handler); } // Transition into crash mode when uplink is shutting down Ok(SerializerShutdown) = self.ctrl_rx.recv_async() => { @@ -422,6 +421,7 @@ impl Serializer { save_and_prepare_next_metrics( &mut self.pending_metrics, &mut self.metrics, + &mut self.stream_metrics, &self.storage_handler, ); let v = v?; @@ -458,6 +458,7 @@ impl Serializer { save_and_prepare_next_metrics( &mut self.pending_metrics, &mut self.metrics, + &mut self.stream_metrics, &self.storage_handler, ); return Ok(Status::Normal); @@ -474,7 +475,7 @@ impl Serializer { data = self.collector_rx.recv_async() => { let data = data?; let stream = data.stream_config(); - let publish = construct_publish(data)?; + let publish = construct_publish(data, &mut self.stream_metrics)?; let storage = self.storage_handler.select(&stream); match write_to_disk(publish, storage) { Ok(Some(deleted)) => { @@ -536,6 +537,7 @@ impl Serializer { save_and_prepare_next_metrics( &mut self.pending_metrics, &mut self.metrics, + &mut self.stream_metrics, &self.storage_handler, ); @@ -553,7 +555,7 @@ impl Serializer { data = self.collector_rx.recv_async() => { let data = data?; let stream = data.stream_config(); - let publish = construct_publish(data)?; + let publish = construct_publish(data, &mut self.stream_metrics)?; let payload_size = publish.payload.len(); debug!("publishing on {} with size = {}", publish.topic, payload_size); match self.client.try_publish(&stream.topic, QoS::AtLeastOnce, false, publish.payload) { @@ -601,7 +603,7 @@ impl Serializer { } self.shutdown()?; - + info!("Serializer has handled all pending packets, shutting down"); Ok(()) @@ -627,7 +629,10 @@ fn lz4_compress(payload: &mut Vec) -> Result<(), Error> { } // Constructs a [Publish] packet given a [Package] element. Updates stream metrics as necessary. -fn construct_publish(data: Box) -> Result { +fn construct_publish( + data: Box, + stream_metrics: &mut HashMap, +) -> Result { let stream_name = data.stream_name().as_ref().to_owned(); let stream_config = data.stream_config(); let point_count = data.len(); @@ -635,12 +640,30 @@ fn construct_publish(data: Box) -> Result { trace!("Data received on stream: {stream_name}; message count = {point_count}; batching latency = {batch_latency}"); let topic = stream_config.topic.clone(); + + let metrics = stream_metrics + .entry(stream_name.clone()) + .or_insert_with(|| StreamMetrics::new(&stream_name)); + + let serialization_start = Instant::now(); let mut payload = data.serialize()?; + let serialization_time = serialization_start.elapsed(); + metrics.add_serialization_time(serialization_time); + + let data_size = payload.len(); + let mut compressed_data_size = None; if let Compression::Lz4 = stream_config.compression { + let compression_start = Instant::now(); lz4_compress(&mut payload)?; + let compression_time = compression_start.elapsed(); + metrics.add_compression_time(compression_time); + + compressed_data_size = Some(payload.len()); } + metrics.add_serialized_sizes(data_size, compressed_data_size); + Ok(Publish::new(topic, QoS::AtLeastOnce, payload)) } @@ -661,7 +684,11 @@ fn write_to_disk( Ok(deleted) } -fn check_metrics(metrics: &mut SerializerMetrics, storage_handler: &StorageHandler) { +fn check_metrics( + metrics: &mut Metrics, + stream_metrics: &mut HashMap, + storage_handler: &StorageHandler, +) { use pretty_bytes::converter::convert; let mut inmemory_write_size = 0; let mut inmemory_read_size = 0; @@ -691,11 +718,24 @@ fn check_metrics(metrics: &mut SerializerMetrics, storage_handler: &StorageHandl convert(metrics.write_memory as f64), convert(metrics.read_memory as f64), ); + + for metrics in stream_metrics.values_mut() { + metrics.prepare_snapshot(); + info!( + "{:>17}: serialized_data_size = {} compressed_data_size = {} avg_serialization_time = {}us avg_compression_time = {}us", + metrics.stream, + convert(metrics.serialized_data_size as f64), + convert(metrics.compressed_data_size as f64), + metrics.avg_serialization_time.as_micros(), + metrics.avg_compression_time.as_micros() + ); + } } fn save_and_prepare_next_metrics( pending: &mut VecDeque, - metrics: &mut SerializerMetrics, + metrics: &mut Metrics, + stream_metrics: &mut HashMap, storage_handler: &StorageHandler, ) { let mut inmemory_write_size = 0; @@ -715,15 +755,22 @@ fn save_and_prepare_next_metrics( metrics.set_disk_files(file_count); metrics.set_disk_utilized(disk_utilized); - let m = metrics.clone(); - pending.push_back(m); + let m = Box::new(metrics.clone()); + pending.push_back(SerializerMetrics::Main(m)); metrics.prepare_next(); + + for metrics in stream_metrics.values_mut() { + metrics.prepare_snapshot(); + let m = Box::new(metrics.clone()); + pending.push_back(SerializerMetrics::Stream(m)); + metrics.prepare_next(); + } } // // Enable actual metrics timers when there is data. This method is called every minute by the bridge fn check_and_flush_metrics( pending: &mut VecDeque, - metrics: &mut SerializerMetrics, + metrics: &mut Metrics, metrics_tx: &Sender, storage_handler: &StorageHandler, ) -> Result<(), flume::TrySendError> { @@ -748,20 +795,37 @@ fn check_and_flush_metrics( // Send pending metrics. This signifies state change while let Some(metrics) = pending.get(0) { - // Always send pending metrics. They represent state changes - info!( - "{:>17}: batches = {:<3} errors = {} lost = {} disk_files = {:<3} disk_utilized = {} write_memory = {} read_memory = {}", - metrics.mode, - metrics.batches, - metrics.errors, - metrics.lost_segments, - metrics.disk_files, - convert(metrics.disk_utilized as f64), - convert(metrics.write_memory as f64), - convert(metrics.read_memory as f64), - ); - metrics_tx.try_send(metrics.clone())?; - pending.pop_front(); + match metrics { + SerializerMetrics::Main(metrics) => { + // Always send pending metrics. They represent state changes + info!( + "{:>17}: batches = {:<3} errors = {} lost = {} disk_files = {:<3} disk_utilized = {} write_memory = {} read_memory = {}", + metrics.mode, + metrics.batches, + metrics.errors, + metrics.lost_segments, + metrics.disk_files, + convert(metrics.disk_utilized as f64), + convert(metrics.write_memory as f64), + convert(metrics.read_memory as f64), + ); + metrics_tx.try_send(SerializerMetrics::Main(metrics.clone()))?; + pending.pop_front(); + } + SerializerMetrics::Stream(metrics) => { + // Always send pending metrics. They represent state changes + info!( + "{:>17}: serialized_data_size = {} compressed_data_size = {} avg_serialization_time = {}us avg_compression_time = {}us", + metrics.stream, + convert(metrics.serialized_data_size as f64), + convert(metrics.compressed_data_size as f64), + metrics.avg_serialization_time.as_micros(), + metrics.avg_compression_time.as_micros() + ); + metrics_tx.try_send(SerializerMetrics::Stream(metrics.clone()))?; + pending.pop_front(); + } + } } if metrics.batches() > 0 { @@ -777,7 +841,7 @@ fn check_and_flush_metrics( convert(metrics.read_memory as f64), ); - metrics_tx.try_send(metrics.clone())?; + metrics_tx.try_send(SerializerMetrics::Main(Box::new(metrics.clone())))?; metrics.prepare_next(); } diff --git a/uplink/src/lib.rs b/uplink/src/lib.rs index d16da57b..f87cbe76 100644 --- a/uplink/src/lib.rs +++ b/uplink/src/lib.rs @@ -112,7 +112,8 @@ pub mod config { [stream_metrics] enabled = false - topic = "/tenants/{tenant_id}/devices/{device_id}/events/uplink_stream_metrics/jsonarray" + bridge_topic = "/tenants/{tenant_id}/devices/{device_id}/events/uplink_stream_metrics/jsonarray" + serializer_topic = "/tenants/{tenant_id}/devices/{device_id}/events/uplink_serializer_stream_metrics/jsonarray" blacklist = [] timeout = 10 @@ -172,7 +173,12 @@ pub mod config { } replace_topic_placeholders(&mut config.action_status.topic, tenant_id, device_id); - replace_topic_placeholders(&mut config.stream_metrics.topic, tenant_id, device_id); + replace_topic_placeholders(&mut config.stream_metrics.bridge_topic, tenant_id, device_id); + replace_topic_placeholders( + &mut config.stream_metrics.serializer_topic, + tenant_id, + device_id, + ); replace_topic_placeholders(&mut config.serializer_metrics.topic, tenant_id, device_id); replace_topic_placeholders(&mut config.mqtt_metrics.topic, tenant_id, device_id);