From faead2413b514e6b047223578df8ab93e4f22521 Mon Sep 17 00:00:00 2001 From: Jon Lamb Date: Tue, 28 May 2024 04:37:55 -0700 Subject: [PATCH 1/2] Move RTT reader to a separate thread --- Cargo.lock | 1 + Cargo.toml | 19 +-- src/bin/rtt_collector.rs | 303 ++++++++++++++++++++++++--------------- 3 files changed, 197 insertions(+), 126 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 336d5db..6f0d6bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1518,6 +1518,7 @@ dependencies = [ "async-trait", "auxon-sdk", "byteordered", + "bytes", "clap", "ctrlc", "derive_more", diff --git a/Cargo.toml b/Cargo.toml index 560b691..2067924 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,17 +13,17 @@ exclude = ["test_system/"] name = "modality_trace_recorder_plugin" path = "src/lib.rs" -[[bin]] -name = "modality-trace-recorder-importer" -path = "src/bin/importer.rs" +#[[bin]] +#name = "modality-trace-recorder-importer" +#path = "src/bin/importer.rs" -[[bin]] -name = "modality-trace-recorder-tcp-collector" -path = "src/bin/tcp_collector.rs" +#[[bin]] +#name = "modality-trace-recorder-tcp-collector" +#path = "src/bin/tcp_collector.rs" -[[bin]] -name = "modality-trace-recorder-itm-collector" -path = "src/bin/itm_collector.rs" +#[[bin]] +#name = "modality-trace-recorder-itm-collector" +#path = "src/bin/itm_collector.rs" [[bin]] name = "modality-trace-recorder-rtt-collector" @@ -51,6 +51,7 @@ clap = { version = "4.5", features = ["derive", "env", "color"] } ctrlc = { version = "3.4", features=["termination"] } exitcode = "1" ratelimit = "0.9" +bytes = "1" human_bytes = "0.4" simple_moving_average = "1.0" auxon-sdk = { version = "1.3", features = ["modality"] } diff --git a/src/bin/rtt_collector.rs b/src/bin/rtt_collector.rs index 527c641..59ea380 100644 --- a/src/bin/rtt_collector.rs +++ b/src/bin/rtt_collector.rs @@ -1,3 +1,4 @@ +use bytes::{Bytes, BytesMut}; use clap::Parser; use human_bytes::human_bytes; use modality_trace_recorder_plugin::{ @@ -12,9 +13,10 @@ use probe_rs::{ }; use ratelimit::Ratelimiter; use simple_moving_average::{NoSumSMA, SMA}; +use std::sync::mpsc; use std::{ - fs, - io::{self, BufReader}, + collections::VecDeque, + fs, io, path::PathBuf, sync::{Arc, Mutex}, time::{Duration, Instant}, @@ -298,18 +300,11 @@ async fn do_main() -> Result<(), Box> { trc_cfg.plugin.rtt_collector.metrics = true; } - if let Some(poll_interval) = trc_cfg.plugin.rtt_collector.rtt_poll_interval { - if poll_interval.0.is_zero() { - warn!("Poll interval cannot be zero, using default 1ns"); - trc_cfg.plugin.rtt_collector.rtt_poll_interval = Some(Duration::from_nanos(1).into()); - } - } - if trc_cfg .plugin .rtt_collector .rtt_read_buffer_size - .unwrap_or(TrcRttReader::DEFAULT_RTT_BUFFER_SIZE) + .unwrap_or(RttReader::DEFAULT_RTT_BUFFER_SIZE) < 8 { return Err(format!( @@ -484,8 +479,7 @@ async fn do_main() -> Result<(), Box> { std::thread::sleep(Duration::from_millis(100)); } - debug!("Run core after breakpoint setup"); - core.run()?; + // The core is run in the reader thread } if !trc_cfg.plugin.rtt_collector.disable_control_plane { @@ -515,53 +509,91 @@ async fn do_main() -> Result<(), Box> { // Only hold onto the Core when we need to lock the debug probe driver (before each read/write) std::mem::drop(core); + let (bytes_tx, bytes_rx) = mpsc::sync_channel(RttReader::DEFAULT_CHANNEL_SIZE); + + let trc_cfg_clone = trc_cfg.clone(); + let intr_clone = intr.clone(); + let mut trc_join_handle: tokio::task::JoinHandle> = + tokio::spawn(async move { + let host_buffer_size = trc_cfg_clone + .plugin + .rtt_collector + .rtt_read_buffer_size + .unwrap_or(RttReader::DEFAULT_RTT_BUFFER_SIZE); + let mut reader = RttReceiver { + bytes_rx, + interruptor: intr_clone.clone(), + buffer: VecDeque::with_capacity(host_buffer_size), + }; + trc_reader::run(&mut reader, trc_cfg_clone, intr_clone).await?; + Ok(()) + }); + let session = Arc::new(Mutex::new(session)); let session_clone = session.clone(); let trc_cfg_clone = trc_cfg.clone(); - let mut join_handle: tokio::task::JoinHandle> = tokio::spawn(async move { - let poll_interval = trc_cfg_clone - .plugin - .rtt_collector - .rtt_poll_interval - .map(|d| d.0.into()) - .unwrap_or(TrcRttReader::DEFAULT_POLL_INTERVAL); - let buffer_size = trc_cfg_clone - .plugin - .rtt_collector - .rtt_read_buffer_size - .unwrap_or(TrcRttReader::DEFAULT_RTT_BUFFER_SIZE); - let metrics = if trc_cfg_clone.plugin.rtt_collector.metrics { - Some(Metrics::new(buffer_size)) - } else { - None - }; - let stream = TrcRttReader::new( - intr.clone(), - session_clone, - up_channel, - trc_cfg_clone.plugin.rtt_collector.core, - poll_interval, - buffer_size, - metrics, - )?; - let mut reader = BufReader::with_capacity(buffer_size, stream); - trc_reader::run(&mut reader, trc_cfg_clone, intr).await?; - Ok(()) - }); + let intr_clone = intr.clone(); + let mut rtt_join_handle: tokio::task::JoinHandle> = + tokio::task::spawn_blocking(move || { + let poll_interval = trc_cfg_clone + .plugin + .rtt_collector + .rtt_poll_interval + .map(|d| d.0.into()) + .unwrap_or(RttReader::DEFAULT_POLL_INTERVAL); + let host_buffer_size = trc_cfg_clone + .plugin + .rtt_collector + .rtt_read_buffer_size + .unwrap_or(RttReader::DEFAULT_RTT_BUFFER_SIZE); + let target_buffer_size = up_channel.buffer_size(); + let run_core_on_start = trc_cfg_clone.plugin.rtt_collector.breakpoint.is_some(); + let metrics = if trc_cfg_clone.plugin.rtt_collector.metrics { + Some(Metrics::new(host_buffer_size)) + } else { + None + }; + let reader = RttReader::new( + intr_clone, + RttReaderProbeState { + session: session_clone, + ch: up_channel, + core_index: trc_cfg_clone.plugin.rtt_collector.core, + start_core: run_core_on_start, + }, + poll_interval, + host_buffer_size, + target_buffer_size, + metrics, + )?; + rtt_reader_thread(reader, bytes_tx)?; + Ok(()) + }); tokio::select! { _ = tokio::signal::ctrl_c() => { debug!("User signaled shutdown"); // Wait for any on-going transfer to complete + intr.set(); let _session = session.lock().unwrap(); std::thread::sleep(Duration::from_millis(100)); - join_handle.abort(); + rtt_join_handle.abort(); + trc_join_handle.abort(); } - res = &mut join_handle => { + res = &mut trc_join_handle => { match res? { Ok(_) => {}, Err(e) => { - error!(error = %e, "Encountered and error during streaming"); + error!(error = %e, "Encountered an error in TRC processing"); + return Err(e.into()) + } + } + } + res = &mut rtt_join_handle => { + match res? { + Ok(_) => {}, + Err(e) => { + error!(error = %e, "Encountered an error in RTT streaming"); return Err(e.into()) } } @@ -676,116 +708,153 @@ enum Error { TraceRecorder(#[from] modality_trace_recorder_plugin::Error), } -struct TrcRttReader { +struct RttReader { interruptor: Interruptor, session: Arc>, ch: UpChannel, core_index: usize, - last_poll_had_data: bool, - poll_interval: Duration, - ratelimiter: Ratelimiter, + start_core: bool, + rtt_buffer_size: usize, + ratelimiter: Option, metrics: Option, } -impl TrcRttReader { +struct RttReaderProbeState { + session: Arc>, + ch: UpChannel, + core_index: usize, + start_core: bool, +} + +impl RttReader { const DEFAULT_POLL_INTERVAL: Duration = Duration::from_millis(1); - const NO_DATA_POLL_INTERVAL: Duration = Duration::from_millis(100); const DEFAULT_RTT_BUFFER_SIZE: usize = 1024; + const DEFAULT_CHANNEL_SIZE: usize = 32; pub fn new( interruptor: Interruptor, - session: Arc>, - ch: UpChannel, - core_index: usize, + probe_state: RttReaderProbeState, poll_interval: Duration, - rtt_buffer_size: usize, + host_rtt_buffer_size: usize, + target_rtt_buffer_size: usize, metrics: Option, ) -> Result { - debug!(rtt_buffer_size, data_poll_interval = ?poll_interval, no_data_poll_interval = ?Self::NO_DATA_POLL_INTERVAL, - metrics = metrics.is_some(), "Setup RTT reader"); - let ratelimiter = Ratelimiter::builder(1, poll_interval) - .initial_available(1) - .build()?; - // Make sure we can safely unwrap on set_refill_interval in the Read impl - ratelimiter.set_refill_interval(Self::NO_DATA_POLL_INTERVAL)?; - ratelimiter.set_refill_interval(poll_interval)?; + let RttReaderProbeState { + session, + ch, + core_index, + start_core, + } = probe_state; + + debug!(host_rtt_buffer_size, data_poll_interval = ?poll_interval, metrics = metrics.is_some(), "Setup RTT reader"); + + let ratelimiter = if !poll_interval.is_zero() { + // Setup initial tokens to fulfil an entire target buffer before we + // start ratelimiting + let initial_cnt = if target_rtt_buffer_size >= host_rtt_buffer_size { + std::cmp::max(2, 1 + (target_rtt_buffer_size / host_rtt_buffer_size)) + } else { + // Host-side is bigger, do at least 2 reads + 2 + }; + + let ratelimiter = Ratelimiter::builder(1, poll_interval) + .initial_available(initial_cnt as _) + .build()?; + Some(ratelimiter) + } else { + None + }; + Ok(Self { interruptor, session, ch, core_index, - last_poll_had_data: true, - poll_interval, + start_core, + rtt_buffer_size: host_rtt_buffer_size, ratelimiter, metrics, }) } } -impl io::Read for TrcRttReader { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - if self.interruptor.is_set() { - return Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - "RTT reader shutdown", - )); +fn rtt_reader_thread( + mut reader: RttReader, + bytes_tx: mpsc::SyncSender, +) -> Result<(), Error> { + let mut buf = BytesMut::zeroed(reader.rtt_buffer_size); + + let mut session = reader.session.lock().unwrap(); + let mut core = session.core(reader.core_index)?; + + if reader.start_core { + debug!("Run core after breakpoint setup"); + core.run()?; + } + + while !reader.interruptor.is_set() { + let rtt_bytes_read = reader.ch.read(&mut core, &mut buf[..])?; + trace!(rtt_bytes_read); + + if let Some(ratelimiter) = reader.ratelimiter.as_mut() { + if let Err(delay) = ratelimiter.try_wait() { + std::thread::sleep(delay); + } + } + + if rtt_bytes_read != 0 { + buf.truncate(rtt_bytes_read); + let frozen_buf = buf.freeze(); + if bytes_tx.send(frozen_buf).is_err() { + debug!("RTT reader channel closed"); + break; + } + buf = BytesMut::zeroed(reader.rtt_buffer_size); + } + + if let Some(metrics) = reader.metrics.as_mut() { + metrics.update(rtt_bytes_read); } + } + + debug!("RTT reader shutdown"); + + Ok(()) +} - let mut session = self.session.lock().unwrap(); - let mut core = session - .core(self.core_index) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; +struct RttReceiver { + interruptor: Interruptor, + bytes_rx: mpsc::Receiver, + buffer: VecDeque, +} - let mut bytes_fulfilled = 0; - while bytes_fulfilled == 0 { +impl io::Read for RttReceiver { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + while self.buffer.len() < buf.len() { if self.interruptor.is_set() { return Err(io::Error::new( io::ErrorKind::UnexpectedEof, - "RTT reader shutdown", + "RTT receiver shutdown", )); } - let rtt_bytes_read = self - .ch - .read(&mut core, &mut buf[bytes_fulfilled..]) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - trace!(rtt_bytes_read); - bytes_fulfilled += rtt_bytes_read; - - // NOTE: this is what probe-rs does - // - // Poll RTT with a frequency of 10 Hz if we do not receive any new data. - // Once we receive new data, we bump the frequency to 1kHz (default). - // - // If the polling frequency is too high, the USB connection to the probe - // can become unstable. Hence we only pull as little as necessary. - // - // SAFETY: we check that both intervals are valid in the constructor - match ((rtt_bytes_read != 0), self.last_poll_had_data) { - (true, false) => { - self.ratelimiter - .set_refill_interval(self.poll_interval) - .unwrap(); + match self.bytes_rx.recv() { + Ok(bytes) => { + self.buffer.extend(bytes.iter()); + self.buffer.make_contiguous(); } - (false, true) => { - self.ratelimiter - .set_refill_interval(Self::NO_DATA_POLL_INTERVAL) - .unwrap(); + Err(_) => { + debug!("RTT receiver channel closed"); + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "RTT receiver shutdown", + )); } - _ => (), - } - self.last_poll_had_data = rtt_bytes_read != 0; - - if let Err(delay) = self.ratelimiter.try_wait() { - std::thread::sleep(delay); - } - - if let Some(metrics) = self.metrics.as_mut() { - metrics.update(rtt_bytes_read); } } - Ok(bytes_fulfilled) + self.buffer.read(buf) } } @@ -802,9 +871,9 @@ struct Metrics { impl Metrics { const WINDOW_DURATION: Duration = Duration::from_secs(2); - fn new(rtt_buffer_size: usize) -> Self { + fn new(host_rtt_buffer_size: usize) -> Self { Self { - rtt_buffer_size: rtt_buffer_size as u64, + rtt_buffer_size: host_rtt_buffer_size as u64, window_start: Instant::now(), read_cnt: 0, bytes_read: 0, From 88bdda4552a5c0a4d8553310a5f2ecef046fcc6e Mon Sep 17 00:00:00 2001 From: Jon Lamb Date: Tue, 28 May 2024 05:34:49 -0700 Subject: [PATCH 2/2] Wrap TCP stream in BufReader --- Cargo.lock | 2 +- Cargo.toml | 20 ++++++++++---------- src/bin/tcp_collector.rs | 10 ++++++---- test_system/specs/device.speqtr | 5 ++++- 4 files changed, 21 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6f0d6bd..6a24b9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1513,7 +1513,7 @@ dependencies = [ [[package]] name = "modality-trace-recorder-plugin" -version = "0.25.0" +version = "0.26.0" dependencies = [ "async-trait", "auxon-sdk", diff --git a/Cargo.toml b/Cargo.toml index 2067924..3621634 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "modality-trace-recorder-plugin" -version = "0.25.0" +version = "0.26.0" edition = "2021" authors = ["Jon Lamb "] description = "A Modality reflector plugin suite and ingest adapter library for Percepio's TraceRecorder data" @@ -13,17 +13,17 @@ exclude = ["test_system/"] name = "modality_trace_recorder_plugin" path = "src/lib.rs" -#[[bin]] -#name = "modality-trace-recorder-importer" -#path = "src/bin/importer.rs" +[[bin]] +name = "modality-trace-recorder-importer" +path = "src/bin/importer.rs" -#[[bin]] -#name = "modality-trace-recorder-tcp-collector" -#path = "src/bin/tcp_collector.rs" +[[bin]] +name = "modality-trace-recorder-tcp-collector" +path = "src/bin/tcp_collector.rs" -#[[bin]] -#name = "modality-trace-recorder-itm-collector" -#path = "src/bin/itm_collector.rs" +[[bin]] +name = "modality-trace-recorder-itm-collector" +path = "src/bin/itm_collector.rs" [[bin]] name = "modality-trace-recorder-rtt-collector" diff --git a/src/bin/tcp_collector.rs b/src/bin/tcp_collector.rs index 4ea6522..bc7eda4 100644 --- a/src/bin/tcp_collector.rs +++ b/src/bin/tcp_collector.rs @@ -4,7 +4,7 @@ use modality_trace_recorder_plugin::{ tracing::try_init_tracing_subscriber, trc_reader, Command, Interruptor, ReflectorOpts, TimelineAttrKey, TraceRecorderConfig, TraceRecorderConfigEntry, TraceRecorderOpts, }; -use std::io::Write; +use std::io::{BufReader, Write}; use std::net::{SocketAddr, TcpStream}; use std::time::{Duration, Instant}; use tracing::debug; @@ -147,9 +147,11 @@ async fn do_main() -> Result<(), Box> { } let disable_control_plane = cfg.plugin.tcp_collector.disable_control_plane; - let mut stream_clone = stream.try_clone()?; - let mut join_handle = - tokio::spawn(async move { trc_reader::run(&mut stream_clone, cfg, intr).await }); + let stream_clone = stream.try_clone()?; + let mut join_handle = tokio::spawn(async move { + let mut reader = BufReader::new(stream_clone); + trc_reader::run(&mut reader, cfg, intr).await + }); tokio::select! { _ = tokio::signal::ctrl_c() => { diff --git a/test_system/specs/device.speqtr b/test_system/specs/device.speqtr index 4557140..aa168e5 100644 --- a/test_system/specs/device.speqtr +++ b/test_system/specs/device.speqtr @@ -44,11 +44,14 @@ behavior "Heap Usage" end # @type = "error logs" -behavior "No Errors or dropped events" +behavior "No Errors" prohibited case "no error messages exist" * @ * (_.channel = "error" OR _.channel = "#WFR") end +end +# @type = "missing data" +behavior "No dropped events" prohibited case "no dropped events" * @ * (exists(_.trace_recorder.dropped_preceding_events)) end