From 5f4fdfdc424aa513591d5ab664ac826e29035342 Mon Sep 17 00:00:00 2001 From: Jon Lamb Date: Wed, 8 May 2024 09:01:13 -0700 Subject: [PATCH] Refactor to support ineraction modes --- README.md | 15 +- src/attr.rs | 63 +- src/client.rs | 80 ++- src/config.rs | 18 +- src/context_manager.rs | 703 +++++++++++++------- src/context_manager_new.rs | 85 --- src/error.rs | 8 +- src/lib.rs | 4 +- src/opts.rs | 33 + src/recorder_data.rs | 31 +- src/trc_reader.rs | 680 +++---------------- test_system/include/config/FreeRTOSConfig.h | 2 +- test_system/src/sensor.c | 15 +- 13 files changed, 683 insertions(+), 1054 deletions(-) delete mode 100644 src/context_manager_new.rs diff --git a/README.md b/README.md index 82c307f..910e617 100644 --- a/README.md +++ b/README.md @@ -120,6 +120,18 @@ Result 2: ║ timestamp = +2.684145s ``` +### Interaction Modes + +The plugins can be configured for different interaction modes via the `interaction-mode` field (or `--interaction-mode` at the CLI). + +Available modes: +* `fully-linearized`: An interaction is produced on every context switch event from the preceding task/ISR context to the + active task/ISR context. This effectively linearizes the system execution where everything is causally connected. +* `ipc`: An interaction is produced for each task/ISR IPC. + Currently this supports the following object kinds: + - queue + - task notification + ## Configuration All of the plugins can be configured through a TOML configuration file (from either the `--config` option or the `MODALITY_REFLECTOR_CONFIG` environment variable). @@ -141,6 +153,7 @@ These sections are the same for each of the plugins. * `[metadata]` — Plugin configuration table. - `run-id` — Use the provided UUID as the run ID instead of generating a random one. - `time-domain` — Use the provided UUID as the time domain ID instead of generating a random one. + - `interaction-mode` — Interaction mode to use (`fully-linearized` or `ipc`). The default value is `fully-linearized`. - `startup-task-name` — Use the provided initial startup task name instead of the default (`(startup)`). - `single-task-timeline` — Use a single timeline for all tasks instead of a timeline per task. ISRs can still be represented with their own timelines or not. - `disable-task-interactions` — Don't synthesize interactions between tasks and ISRs when a context switch occurs. @@ -241,7 +254,7 @@ reflector configuration file, e.g. `[plugins.ingest.collectors.trace-recorder-it These are used to start and stop tracing by writing control plane commands from the probe. - `command-len-addr` — Use the provided memory address for the ITM streaming port variable `tz_host_command_bytes_to_read`. These are used to start and stop tracing by writing control plane commands from the probe. - - `stimulus-port` — The ITM stimulus port used for trace recorder data.The default value is 1. + - `stimulus-port` — The ITM stimulus port used for trace recorder data. The default value is 1. - `probe-selector` — Select a specific probe instead of opening the first available one. - `chip` — The target chip to attach to (e.g. `STM32F407VE`). - `protocol` — Protocol used to connect to chip. Possible options: [`swd`, `jtag`]. The default value is `swd`. diff --git a/src/attr.rs b/src/attr.rs index 4fbad36..a110cef 100644 --- a/src/attr.rs +++ b/src/attr.rs @@ -1,57 +1,8 @@ -use auxon_sdk::{ - ingest_client::{BoundTimelineState, IngestClient, IngestError}, - ingest_protocol::InternedAttrKey, -}; +use auxon_sdk::ingest_protocol::InternedAttrKey; use derive_more::Display; -use std::{collections::HashMap, fmt, hash::Hash}; +use std::collections::HashMap; -pub trait AttrKeyIndex: Hash + Eq + Clone + fmt::Display {} - -#[derive(Clone, Debug)] -pub struct AttrKeys(HashMap); - -impl Default for AttrKeys { - fn default() -> Self { - Self(HashMap::new()) - } -} - -impl AttrKeys { - pub async fn get( - &mut self, - client: &mut IngestClient, - key: T, - ) -> Result { - if let Some(k) = self.0.get(&key) { - Ok(*k) - } else { - let interned_key = client.declare_attr_key(key.to_string()).await?; - self.0.insert(key, interned_key); - Ok(interned_key) - } - } - - pub(crate) fn remove_string_key_entry(&mut self, key: &str) -> Option<(T, InternedAttrKey)> { - self.0 - .keys() - .find_map(|k| { - let k_str = k.to_string(); - if k_str.as_str() == key { - Some(k.clone()) - } else { - None - } - }) - .and_then(|k| self.0.remove_entry(&k)) - } - - pub(crate) fn insert(&mut self, key: T, interned_key: InternedAttrKey) { - self.0.insert(key, interned_key); - } -} - -impl AttrKeyIndex for TimelineAttrKey {} -impl AttrKeyIndex for EventAttrKey {} +pub type AttrKeys = HashMap; #[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Display)] pub enum TimelineAttrKey { @@ -133,8 +84,12 @@ pub enum EventAttrKey { Timestamp, #[display(fmt = "event.interaction.remote_timeline_id")] RemoteTimelineId, - #[display(fmt = "event.interaction.remote_timestamp")] - RemoteTimestamp, + #[display(fmt = "event.interaction.remote_nonce")] + RemoteNonce, + #[display(fmt = "event.internal.trace_recorder.nonce")] + InternalNonce, + #[display(fmt = "event.nonce")] + Nonce, #[display(fmt = "event.mutator.id")] MutatorId, #[display(fmt = "event.internal.trace_recorder.mutator.id")] diff --git a/src/client.rs b/src/client.rs index dd05b99..33b6676 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,55 +1,67 @@ -use crate::attr::{AttrKeys, EventAttrKey, TimelineAttrKey}; +use crate::{ + attr::{AttrKeys, EventAttrKey, TimelineAttrKey}, + error::Error, +}; use auxon_sdk::{ - ingest_client::{BoundTimelineState, IngestClient, IngestError}, - ingest_protocol::InternedAttrKey, + api::{AttrVal, TimelineId}, + ingest_client::{dynamic::DynamicIngestClient, IngestClient, ReadyState}, }; pub struct Client { timeline_keys: AttrKeys, event_keys: AttrKeys, - inner: IngestClient, + pub(crate) inner: DynamicIngestClient, } impl Client { - pub fn new(client: IngestClient) -> Self { + pub fn new(client: IngestClient) -> Self { Self { timeline_keys: AttrKeys::default(), event_keys: AttrKeys::default(), - inner: client, + inner: client.into(), } } - pub async fn close(mut self) -> Result<(), IngestError> { - self.inner.flush().await?; - let _ = self.inner.close_timeline(); - Ok(()) - } - - pub async fn timeline_key( + pub async fn switch_timeline( &mut self, - key: TimelineAttrKey, - ) -> Result { - let k = self.timeline_keys.get(&mut self.inner, key).await?; - Ok(k) - } - - pub async fn event_key(&mut self, key: EventAttrKey) -> Result { - let k = self.event_keys.get(&mut self.inner, key).await?; - Ok(k) - } - - pub fn inner(&mut self) -> &mut IngestClient { - &mut self.inner + id: TimelineId, + new_timeline_attrs: Option>, + ) -> Result<(), Error> { + self.inner.open_timeline(id).await?; + if let Some(attrs) = new_timeline_attrs { + let mut interned_attrs = Vec::new(); + for (k, v) in attrs.into_iter() { + let int_key = if let Some(ik) = self.timeline_keys.get(k) { + *ik + } else { + let ik = self.inner.declare_attr_key(k.to_string()).await?; + self.timeline_keys.insert(k.clone(), ik); + ik + }; + interned_attrs.push((int_key, v.clone())); + } + self.inner.timeline_metadata(interned_attrs).await?; + } + Ok(()) } - pub(crate) fn remove_timeline_string_key( + pub async fn send_event( &mut self, - key: &str, - ) -> Option<(TimelineAttrKey, InternedAttrKey)> { - self.timeline_keys.remove_string_key_entry(key) - } - - pub(crate) fn add_timeline_key(&mut self, key: TimelineAttrKey, interned_key: InternedAttrKey) { - self.timeline_keys.insert(key, interned_key) + ordering: u128, + attrs: impl IntoIterator, + ) -> Result<(), Error> { + let mut interned_attrs = Vec::new(); + for (k, v) in attrs.into_iter() { + let int_key = if let Some(ik) = self.event_keys.get(k) { + *ik + } else { + let ik = self.inner.declare_attr_key(k.to_string()).await?; + self.event_keys.insert(k.clone(), ik); + ik + }; + interned_attrs.push((int_key, v.clone())); + } + self.inner.event(ordering, interned_attrs).await?; + Ok(()) } } diff --git a/src/config.rs b/src/config.rs index aa5f3d1..b7c8151 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,7 +1,7 @@ use crate::{ error::AuthTokenError, opts::{ - FormatArgAttributeKeysSet, IgnoredObjectClasses, ReflectorOpts, RenameMap, + FormatArgAttributeKeysSet, IgnoredObjectClasses, InteractionMode, ReflectorOpts, RenameMap, TraceRecorderOpts, }, }; @@ -50,6 +50,7 @@ pub struct PluginConfig { pub user_event_channel_rename_map: RenameMap, pub user_event_formatted_string_rename_map: RenameMap, pub user_event_fmt_arg_attr_keys: FormatArgAttributeKeysSet, + pub interaction_mode: InteractionMode, pub import: ImportConfig, pub tcp_collector: TcpCollectorConfig, @@ -309,6 +310,11 @@ impl TraceRecorderConfig { } else { cfg_plugin.user_event_fmt_arg_attr_keys }, + interaction_mode: if let Some(m) = tr_opts.interaction_mode { + m + } else { + cfg_plugin.interaction_mode + }, import: cfg_plugin.import, tcp_collector: cfg_plugin.tcp_collector, itm_collector: cfg_plugin.itm_collector, @@ -365,6 +371,7 @@ mod internal { #[serde(rename = "user-event-formatted-string-name")] pub user_event_formatted_string_rename_map: RenameMap, pub user_event_fmt_arg_attr_keys: FormatArgAttributeKeysSet, + pub interaction_mode: InteractionMode, } impl From for PluginConfig { @@ -385,6 +392,7 @@ mod internal { user_event_channel_rename_map: c.user_event_channel_rename_map, user_event_formatted_string_rename_map: c.user_event_formatted_string_rename_map, user_event_fmt_arg_attr_keys: c.user_event_fmt_arg_attr_keys, + interaction_mode: c.interaction_mode, import: Default::default(), tcp_collector: Default::default(), itm_collector: Default::default(), @@ -532,6 +540,7 @@ time-domain = 'a1a2a3a4b1b2c1c2d1d2d3d4d5d6d7d1' startup-task-name = 'm3' user-event-channel = true user-event-format-string = true +interaction-mode = "ipc" single-task-timeline = true flatten-isr-timelines = true disable-task-interactions = true @@ -569,6 +578,7 @@ user-event-channel = true user-event-format-string = true single-task-timeline = true flatten-isr-timelines = true +interaction-mode = "ipc" include-unknown-events = true disable-task-interactions = true disable-control-plane = true @@ -609,6 +619,7 @@ single-task-timeline = true flatten-isr-timelines = true disable-control-plane = true use-timeline-id-channel = true +interaction-mode = "fully-linearized" include-unknown-events = true disable-task-interactions = true ignored-object-classes = ['queue', 'Semaphore'] @@ -663,6 +674,7 @@ flatten-isr-timelines = true disable-control-plane = true use-timeline-id-channel = true disable-task-interactions = true +interaction-mode = "ipc" ignored-object-classes = ['queue', 'Semaphore'] attach-timeout = "100ms" restart = true @@ -790,6 +802,7 @@ metrics = true }] .into_iter() .collect(), + interaction_mode: InteractionMode::Ipc, import: ImportConfig { protocol: None, file: PathBuf::from("/path/to/memdump.bin").into(), @@ -886,6 +899,7 @@ metrics = true }] .into_iter() .collect(), + interaction_mode: InteractionMode::Ipc, import: Default::default(), tcp_collector: TcpCollectorConfig { disable_control_plane: true, @@ -986,6 +1000,7 @@ metrics = true }] .into_iter() .collect(), + interaction_mode: InteractionMode::FullyLinearized, import: Default::default(), tcp_collector: Default::default(), itm_collector: ItmCollectorConfig { @@ -1097,6 +1112,7 @@ metrics = true }] .into_iter() .collect(), + interaction_mode: InteractionMode::Ipc, import: Default::default(), tcp_collector: Default::default(), itm_collector: Default::default(), diff --git a/src/context_manager.rs b/src/context_manager.rs index bf32c2b..a982c73 100644 --- a/src/context_manager.rs +++ b/src/context_manager.rs @@ -1,341 +1,554 @@ use crate::{ attr::EventAttrKey, - client::Client, config::TraceRecorderConfig, + deviant_event_parser::DeviantEventParser, error::Error, - recorder_data::{ContextHandle, RecorderDataExt}, + opts::InteractionMode, + recorder_data::{ + ContextHandle, EventAttributes, NanosecondsExt, RecorderDataExt, TimelineAttributes, + }, }; -use auxon_sdk::{ - api::{AttrVal, TimelineId}, - ingest_client::{IngestClient, ReadyState}, - ingest_protocol::InternedAttrKey, -}; -use std::collections::{HashMap, HashSet}; -use std::hash::Hash; +use auxon_sdk::api::{AttrVal, BigInt, TimelineId}; +use std::collections::{HashMap, VecDeque}; use trace_recorder_parser::{ - time::Timestamp, - types::{Argument, FormatString, FormattedString, ObjectHandle, UserEventChannel}, + streaming::event::{Event, EventCode, EventType, TrackingEventCounter}, + streaming::RecorderData, + time::StreamingInstant, + types::ObjectHandle, }; -use tracing::warn; -use uuid::Uuid; +use tracing::{debug, trace, warn}; + +#[derive(Debug)] +pub struct ContextEvent { + pub context: ObjectHandle, + pub global_ordering: u128, + pub attributes: EventAttributes, + // In fully-linearized interaction mode, every event has a nonce. + // When true, this event contains an interaction from the previous + // event and the previous event's nonce should be visible in + // the conventional attribute key. + pub add_previous_event_nonce: bool, +} -#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)] -pub enum ContextSwitchOutcome { - /// Current task/ISR is already marked as active, we're on the same timeline - Same, - /// Switched to a new task/ISR, return the remote (previous) timeline ID and - /// the last event timestamped logged on the previous timeline - Different(TimelineId, Timestamp), +#[derive(Debug)] +pub struct TimelineMeta { + id: TimelineId, + attributes: TimelineAttributes, + /// The nonce recorded on the last event. + /// Effectively a timeline-local event counter so we can draw arbitrary interactions + nonce: InteractionNonce, } -const TIMELINE_ID_CHANNEL_NAME: &str = "modality_timeline_id"; -const TIMELINE_ID_FORMAT_STRING: &str = "name=%s,id=%s"; +#[derive(Debug)] +pub struct ContextManager { + cfg: TraceRecorderConfig, + common_timeline_attributes: TimelineAttributes, -pub(crate) struct ContextManager { - single_task_timeline: bool, - flatten_isr_timelines: bool, - use_timeline_id_channel: bool, - startup_task_name: Option, + global_ordering: u128, + time_rollover_tracker: StreamingInstant, + event_counter_tracker: TrackingEventCounter, - // Used to track interactions between tasks/ISRs + degraded: bool, + first_event_observed: bool, + deviant_event_parser: Option, + objects_to_timelines: HashMap, startup_task_handle: ObjectHandle, - handle_of_last_logged_context: ContextHandle, - timestamp_of_last_event: Timestamp, - object_to_timeline_ids: HashMap, - registered_object_handles: HashSet, - common_timeline_attr_kvs: HashMap, - client: Client, + // State for interaction modes + active_context: ContextHandle, + notif_pending_interactions: HashMap, + queue_pending_interactions: HashMap>, } impl ContextManager { - pub(crate) async fn begin( - client: IngestClient, - cfg: TraceRecorderConfig, - trd: &TR, - ) -> Result { + const UNKNOWN_CONTEXT: &'static str = "UNKNOWN_CONTEXT"; + + pub fn new(mut cfg: TraceRecorderConfig, trd: &RecorderData) -> Result { + if cfg.plugin.interaction_mode != InteractionMode::FullyLinearized { + if cfg.plugin.single_task_timeline { + warn!(interaction_mode = ?cfg.plugin.interaction_mode, + "Configuration 'single-task-timeline' is ignored by the current interaction mode"); + cfg.plugin.single_task_timeline = false; + } + if cfg.plugin.flatten_isr_timelines { + warn!(interaction_mode = ?cfg.plugin.interaction_mode, + "Configuration 'flatten-isr-timelines' is ignored by the current interaction mode"); + cfg.plugin.flatten_isr_timelines = false; + } + } + let startup_task_handle = trd.startup_task_handle()?; - // Setup the root startup task timeline - let startup_task_timeline_id = TimelineId::allocate(); - let mut object_to_timeline_ids = HashMap::new(); - object_to_timeline_ids.insert(startup_task_handle, startup_task_timeline_id); - let mut registered_object_handles = HashSet::new(); - registered_object_handles.insert(startup_task_handle); - let mut client = Client::new(client.open_timeline(startup_task_timeline_id).await?); - - //let common_timeline_attr_kvs = trd.setup_common_timeline_attrs(&cfg, &mut client).await?; - let common_timeline_attr_kvs = Default::default(); - - let mut importer = ContextManager { - single_task_timeline: cfg.plugin.single_task_timeline, - flatten_isr_timelines: cfg.plugin.flatten_isr_timelines, - use_timeline_id_channel: cfg.plugin.use_timeline_id_channel, - startup_task_name: cfg.plugin.startup_task_name, - startup_task_handle, - handle_of_last_logged_context: ContextHandle::Task(startup_task_handle), - timestamp_of_last_event: Timestamp::zero(), - object_to_timeline_ids, - registered_object_handles, - common_timeline_attr_kvs, - client, + let deviant_event_parser = if let Some(base_event_id) = cfg.plugin.deviant_event_id_base { + Some(DeviantEventParser::new( + trd.header.endianness, + base_event_id.into(), + )?) + } else { + None }; - // Add root startup task timeline metadata - importer - .add_timeline_metadata(importer.handle_of_last_logged_context, trd) - .await?; + let common_timeline_attributes = trd.common_timeline_attributes(&cfg); + Ok(Self { + cfg, + common_timeline_attributes, + global_ordering: 0, + time_rollover_tracker: StreamingInstant::zero(), + event_counter_tracker: TrackingEventCounter::zero(), + degraded: false, + first_event_observed: false, + deviant_event_parser, + objects_to_timelines: Default::default(), + startup_task_handle, + active_context: ContextHandle::Task(startup_task_handle), + notif_pending_interactions: Default::default(), + queue_pending_interactions: Default::default(), + }) + } - Ok(importer) + pub fn timeline_meta(&self, handle: ObjectHandle) -> Result<&TimelineMeta, Error> { + self.objects_to_timelines + .get(&handle) + .ok_or(Error::TaskTimelineLookup(handle)) } - pub(crate) async fn end(self) -> Result<(), Error> { - self.client.close().await?; - Ok(()) + pub fn observe_trace_restart(&mut self) { + self.first_event_observed = false; } - pub(crate) fn set_degraded_single_timeline_mode(&mut self) { - self.single_task_timeline = true; - self.flatten_isr_timelines = true; - self.handle_of_last_logged_context = ContextHandle::Task(self.startup_task_handle); + pub fn set_degraded(&mut self, reason: S) { + if !self.degraded { + warn!( + reason = reason.to_string(), + "Downgrading to single timeline mode" + ); + self.cfg.plugin.interaction_mode = InteractionMode::FullyLinearized; + self.cfg.plugin.single_task_timeline = true; + self.cfg.plugin.flatten_isr_timelines = true; + self.cfg.plugin.startup_task_name = Some(Self::UNKNOWN_CONTEXT.to_owned()); + self.active_context = ContextHandle::Task(self.startup_task_handle); + // TODO - see if this works + } } - pub(crate) fn handle_device_timeline_id_channel_event( + pub fn process_event( &mut self, - channel: &UserEventChannel, - format_string: &FormatString, - formatted_string: &FormattedString, - args: &[Argument], - trd: &TR, - ) { - if !self.use_timeline_id_channel || channel.as_str() != TIMELINE_ID_CHANNEL_NAME { - return; - } + event_code: EventCode, + event: &Event, + trd: &RecorderData, + ) -> Result, Error> { + // NOTE: We get events in logical (and tomporal) order, so only need + // a local counter for ordering + self.global_ordering = self.global_ordering.saturating_add(1); + + let event_type = event_code.event_type(); + + let dropped_events = if !self.first_event_observed { + if event_type != EventType::TraceStart { + self.set_degraded(format!( + "First event should be TRACE_START (got {})", + event_type + )); + } - if format_string.as_str() != TIMELINE_ID_FORMAT_STRING { - warn!( - "Invalid format string '{}' on the channel '{TIMELINE_ID_CHANNEL_NAME}'", - format_string - ); - return; + // Setup initial root/startup task + self.active_context = ContextHandle::Task(self.startup_task_handle); + self.alloc_context(self.active_context, trd)?; + + self.event_counter_tracker + .set_initial_count(event.event_count()); + self.first_event_observed = true; + None + } else { + self.event_counter_tracker.update(event.event_count()) + }; + + // Update timer/counter rollover trackers + let event_count_raw = u16::from(event.event_count()); + let event_count = self.event_counter_tracker.count(); + let timer_ticks = event.timestamp(); + let timestamp = self.time_rollover_tracker.elapsed(timer_ticks); + + let mut attrs: EventAttributes = Default::default(); + + // Skip events that are not relevant or are not expected + match event { + Event::ObjectName(_) | Event::TsConfig(_) => return Ok(None), + Event::Unknown(ev) => { + // Handle custom deviant events + let maybe_dev = if let Some(p) = self.deviant_event_parser.as_mut() { + p.parse(ev)? + } else { + None + }; + + if let Some(dev) = maybe_dev { + attrs.insert(EventAttrKey::Name, dev.kind.to_modality_name().into()); + attrs.insert(EventAttrKey::MutatorId, dev.mutator_id.1); + attrs.insert( + EventAttrKey::InternalMutatorId, + dev.mutator_id.0.to_string().into(), + ); + if let Some(m) = dev.mutation_id { + attrs.insert(EventAttrKey::MutationId, m.1); + attrs.insert(EventAttrKey::InternalMutationId, m.0.to_string().into()); + } + if let Some(s) = dev.mutation_success { + attrs.insert(EventAttrKey::MutationSuccess, s); + } + } else if !self.cfg.plugin.include_unknown_events { + debug!( + %event_type, + timestamp = %ev.timestamp, + id = %ev.code.event_id(), + event_count = %ev.event_count, "Skipping unknown"); + return Ok(None); + } + } + _ => (), } - if args.len() != 2 { + // Add core attr set + trd.add_core_event_attributes(&self.cfg.plugin, event_code, event, &mut attrs)?; + + // Add event counter attrs + attrs.insert(EventAttrKey::EventCountRaw, event_count_raw.into()); + attrs.insert(EventAttrKey::EventCount, event_count.into()); + if let Some(dropped_events) = dropped_events { warn!( - "Invalid argument length in '{}' on channel '{TIMELINE_ID_CHANNEL_NAME}'", - formatted_string + event_count = event_count_raw, + dropped_events, "Dropped events detected" ); - return; + + attrs.insert(EventAttrKey::DroppedEvents, dropped_events.into()); } - let obj_name = match &args[0] { - Argument::String(s) => s.as_str(), - _ => { - warn!( - "Invalid argument[0] type in '{}' on channel '{TIMELINE_ID_CHANNEL_NAME}'", - formatted_string - ); - return; + // Add timerstamp attrs + attrs.insert( + EventAttrKey::TimerTicks, + BigInt::new_attr_val(timer_ticks.ticks().into()), + ); + attrs.insert( + EventAttrKey::TimestampTicks, + BigInt::new_attr_val(timestamp.ticks().into()), + ); + attrs.insert( + EventAttrKey::Timestamp, + AttrVal::Timestamp( + trd.timestamp_info + .timer_frequency + .lossy_timestamp_ns(timestamp), + ), + ); + + // Handle interaction mode specifics + match self.cfg.plugin.interaction_mode { + InteractionMode::FullyLinearized => { + Ok(Some(self.process_fully_linearized_mode(attrs, event, trd)?)) } + InteractionMode::Ipc => Ok(Some(self.process_ipc_mode(attrs, event, trd)?)), + } + } + + fn process_fully_linearized_mode( + &mut self, + attrs: EventAttributes, + event: &Event, + trd: &RecorderData, + ) -> Result { + let ctx_switch_outcome = self.handle_context_switch(event, trd)?; + + let mut ctx_event = ContextEvent { + context: self.active_context.object_handle(), + global_ordering: self.global_ordering, + attributes: attrs, + add_previous_event_nonce: false, }; - let obj_handle = match trd.object_handle(obj_name) { - Some(h) => h, - None => { - warn!( - "Object with name '{obj_name}' has not be registered yet, ignoring timeline-id" - ); - return; + match ctx_switch_outcome { + ContextSwitchOutcome::Same => { + // Normal event on the active context } + ContextSwitchOutcome::Different(prev_context) => { + // Context switch event + ctx_event.add_previous_event_nonce = true; + let prev_timeline = self.timeline_mut(prev_context)?; + let interaction_src = prev_timeline.interaction_source(); + ctx_event.add_interaction(interaction_src); + } + } + + let timeline = self.timeline_mut(self.active_context)?; + timeline.increment_nonce(); + ctx_event.add_internal_nonce(timeline.nonce); + + Ok(ctx_event) + } + + fn process_ipc_mode( + &mut self, + attrs: EventAttributes, + event: &Event, + trd: &RecorderData, + ) -> Result { + let _ctx_switch_outcome = self.handle_context_switch(event, trd)?; + + let mut ctx_event = ContextEvent { + context: self.active_context.object_handle(), + global_ordering: self.global_ordering, + attributes: attrs, + add_previous_event_nonce: false, }; - if self.object_to_timeline_ids.contains_key(&obj_handle) { - warn!( - "Object with name '{obj_name}' already has a timeline-id, ignoring provided timeline-id" - ); - return; - } + let timeline = self.timeline_mut(self.active_context)?; + timeline.increment_nonce(); + ctx_event.add_internal_nonce(timeline.nonce); + + match event { + Event::TaskNotify(ev) | Event::TaskNotifyFromIsr(ev) => { + // Record the interaction source (sender task) + ctx_event.promote_internal_nonce(); + let handle_of_task_to_notify = ev.handle; + let interaction_src = timeline.interaction_source(); + // NOTE: for task notifications, only the immediately preceding sender (most recent) + // is captured + let _maybe_overwritten_pending_interaction = self + .notif_pending_interactions + .insert(handle_of_task_to_notify, interaction_src); + } + Event::TaskNotifyWait(ev) => { + // Add any pending remote interaction (recvr task) + if ev.handle != self.active_context.object_handle() { + warn!( + notif_wait_handle = %ev.handle, + active_context = %self.active_context.object_handle(), + "Inconsistent IPC interaction context"); + } + if let Some(pending_interaction) = + self.notif_pending_interactions.remove(&ev.handle) + { + ctx_event.add_interaction(pending_interaction); + } + } - let timeline_id = match &args[1] { - Argument::String(s) => match Uuid::parse_str(s.as_str()) { - Ok(id) => TimelineId::from(id), - Err(e) => { - warn!("The provided timeline-id '{s}' is invalid. {e}"); - return; + Event::QueueSend(ev) | Event::QueueSendFromIsr(ev) => { + // Record the interaction source (sender task), send to back + ctx_event.promote_internal_nonce(); + let interaction_src = timeline.interaction_source(); + let pending_interactions = self + .queue_pending_interactions + .entry(ev.handle) + .or_default(); + pending_interactions.push_back(interaction_src); + } + Event::QueueSendFront(ev) | Event::QueueSendFrontFromIsr(ev) => { + // Record the interaction source (sender task), send to front + ctx_event.promote_internal_nonce(); + let interaction_src = timeline.interaction_source(); + let pending_interactions = self + .queue_pending_interactions + .entry(ev.handle) + .or_default(); + pending_interactions.push_front(interaction_src); + } + Event::QueueReceive(ev) | Event::QueueReceiveFromIsr(ev) => { + // Add any pending remote interaction (recvr task) + let pending_interactions = self + .queue_pending_interactions + .entry(ev.handle) + .or_default(); + if let Some(pending_interaction) = pending_interactions.pop_front() { + ctx_event.add_interaction(pending_interaction); + } + } + Event::QueuePeek(ev) => { + // Add any pending remote interaction (recvr task), but don't remove it + let pending_interactions = self + .queue_pending_interactions + .entry(ev.handle) + .or_default(); + if let Some(pending_interaction) = pending_interactions.pop_front() { + pending_interactions.push_front(pending_interaction); + ctx_event.add_interaction(pending_interaction); } - }, - _ => { - warn!( - "Invalid argument[1] type in '{}' on channel '{TIMELINE_ID_CHANNEL_NAME}'", - formatted_string - ); - return; } - }; - self.object_to_timeline_ids.insert(obj_handle, timeline_id); - } + _ => (), + } - pub(crate) async fn event_key(&mut self, key: EventAttrKey) -> Result { - let k = self.client.event_key(key).await?; - Ok(k) + Ok(ctx_event) } - pub(crate) async fn event( + fn handle_context_switch( &mut self, - timestamp: Timestamp, - ordering: u128, - attrs: impl IntoIterator, - ) -> Result<(), Error> { - if timestamp < self.timestamp_of_last_event { - warn!( - "Time went backwards from {} to {}", - self.timestamp_of_last_event, timestamp - ); - } - - // Keep track of this for interaction remote timestamp on next task switch - self.timestamp_of_last_event = timestamp; - - self.client.inner().event(ordering, attrs).await?; + event: &Event, + trd: &RecorderData, + ) -> Result { + let maybe_contex_switch_handle: Option = match event { + Event::IsrBegin(ev) | Event::IsrResume(ev) => Some(ev.into()), + Event::TaskBegin(ev) | Event::TaskResume(ev) | Event::TaskActivate(ev) => { + Some(ev.into()) + } + _ => None, + }; - Ok(()) + match maybe_contex_switch_handle { + Some(context) => self.context_switch_in(context, trd), + None => Ok(ContextSwitchOutcome::Same), + } } /// Called on the task or ISR context switch-in events /// - /// EventType::TaskSwitchTaskBegin | EventType::TaskSwitchTaskResume - /// EventType::TaskSwitchIsrBegin | EventType::TaskSwitchIsrResume - pub(crate) async fn context_switch_in( + /// EventType: + /// * TaskSwitchTaskBegin + /// * TaskSwitchTaskResume + /// * TaskActivate + /// * TaskSwitchIsrBegin + /// * TaskSwitchIsrResume + fn context_switch_in( &mut self, - event_context: ContextHandle, - trd: &TR, + context: ContextHandle, + trd: &RecorderData, ) -> Result { // We've resumed from the same context we were already in and not doing - // any flattening, nothing to do - if !self.single_task_timeline - && !self.flatten_isr_timelines - && (event_context == self.handle_of_last_logged_context) + // any flattening, nothing left to do + if !self.cfg.plugin.single_task_timeline + && !self.cfg.plugin.flatten_isr_timelines + && (context == self.active_context) { return Ok(ContextSwitchOutcome::Same); } + // self.active_context: // * a task or ISR context in normal operation // * a task context (!single_task_timeline && flatten_isr_timelines) // * startup task context or ISR context (single_task_timeline && !flatten_isr_timelines) // * always startup task context (single_task_timeline && flatten_isr_timelines) - let (prev_handle, prev_timeline_id) = match self.handle_of_last_logged_context { - ContextHandle::Task(h) => ( - ContextHandle::Task(h), - *self - .object_to_timeline_ids - .get(&h) - .ok_or(Error::TaskTimelineLookup(h))?, - ), - ContextHandle::Isr(h) => ( - ContextHandle::Isr(h), - *self - .object_to_timeline_ids - .get(&h) - .ok_or(Error::IsrTimelineLookup(h))?, - ), - }; - let mut timeline_is_new = false; - let curr_timeline_id = match event_context { + let new_context = match context { // EventType::TaskSwitchTaskBegin | EventType::TaskSwitchTaskResume // Resume could be same task or from an ISR, when the task is already marked as active ContextHandle::Task(task_event_handle) => { - let handle = if self.single_task_timeline { + let handle = if self.cfg.plugin.single_task_timeline { self.startup_task_handle } else { task_event_handle }; - self.handle_of_last_logged_context = ContextHandle::Task(handle); - timeline_is_new = self.registered_object_handles.insert(handle); - *self - .object_to_timeline_ids - .entry(handle) - .or_insert_with(TimelineId::allocate) + ContextHandle::Task(handle) } // EventType::TaskSwitchIsrBegin | EventType::TaskSwitchIsrResume // Resume happens when we return to another ISR ContextHandle::Isr(isr_event_handle) => { - if self.flatten_isr_timelines { + if self.cfg.plugin.flatten_isr_timelines { // Flatten the ISR context into the parent task context - self.handle_of_last_logged_context = prev_handle; - prev_timeline_id + self.active_context } else { - self.handle_of_last_logged_context = ContextHandle::Isr(isr_event_handle); - timeline_is_new = self.registered_object_handles.insert(isr_event_handle); - *self - .object_to_timeline_ids - .entry(isr_event_handle) - .or_insert_with(TimelineId::allocate) + ContextHandle::Isr(isr_event_handle) } } }; - self.client.inner().open_timeline(curr_timeline_id).await?; - if timeline_is_new { - // Add timeline metadata in the newly updated context - self.add_timeline_metadata(self.handle_of_last_logged_context, trd) - .await?; - } - - if prev_timeline_id != curr_timeline_id { - Ok(ContextSwitchOutcome::Different( - prev_timeline_id, - self.timestamp_of_last_event, - )) + if new_context != self.active_context { + self.alloc_context(new_context, trd)?; + let prev_context = self.active_context; + self.active_context = new_context; + Ok(ContextSwitchOutcome::Different(prev_context)) } else { Ok(ContextSwitchOutcome::Same) } } - pub(crate) async fn add_timeline_metadata( - &mut self, - handle: ContextHandle, - trd: &TR, - ) -> Result<(), Error> { - /* - let tl_details = trd.timeline_details(handle, self.startup_task_name.as_deref())?; - - let mut attr_kvs = self.common_timeline_attr_kvs.clone(); - attr_kvs.insert( - self.client.timeline_key(tl_details.name_key).await?, - tl_details.name.into(), - ); - attr_kvs.insert( - self.client.timeline_key(tl_details.description_key).await?, - tl_details.description.into(), - ); - attr_kvs.insert( - self.client - .timeline_key(tl_details.object_handle_key) - .await?, - i64::from(u32::from(tl_details.object_handle)).into(), - ); - - self.client.inner().timeline_metadata(attr_kvs).await?; - */ + fn timeline_mut(&mut self, context: ContextHandle) -> Result<&mut TimelineMeta, Error> { + match context { + ContextHandle::Task(h) => self + .objects_to_timelines + .get_mut(&h) + .ok_or(Error::TaskTimelineLookup(h)), + ContextHandle::Isr(h) => self + .objects_to_timelines + .get_mut(&h) + .ok_or(Error::IsrTimelineLookup(h)), + } + } + fn alloc_context(&mut self, context: ContextHandle, trd: &RecorderData) -> Result<(), Error> { + let mut is_new = false; + let tlm = self + .objects_to_timelines + .entry(context.object_handle()) + .or_insert_with(|| { + is_new = true; + let attrs = self.common_timeline_attributes.clone(); + TimelineMeta::new(context.object_handle(), attrs) + }); + if is_new { + trd.add_core_timeline_attributes(&self.cfg.plugin, context, &mut tlm.attributes)?; + } Ok(()) } } -/* -pub(crate) fn arg_to_attr_val(arg: &Argument) -> AttrVal { - match arg { - Argument::I8(v) => AttrVal::Integer(i64::from(*v)), - Argument::U8(v) => AttrVal::Integer(i64::from(*v)), - Argument::I16(v) => AttrVal::Integer(i64::from(*v)), - Argument::U16(v) => AttrVal::Integer(i64::from(*v)), - Argument::I32(v) => AttrVal::Integer(i64::from(*v)), - Argument::U32(v) => AttrVal::Integer(i64::from(*v)), - Argument::F32(v) => AttrVal::from(f64::from(v.0)), - Argument::F64(v) => AttrVal::from(v.0), - Argument::String(v) => AttrVal::String(v.clone().into()), +#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)] +enum ContextSwitchOutcome { + /// Current task/ISR is already marked as active, we're on the same timeline + Same, + /// Switched to a new task/ISR, return the previous context + Different(ContextHandle), +} + +type QueueHandle = ObjectHandle; +type DestinationContextHandle = ObjectHandle; + +type RemoteTimelineId = TimelineId; +type RemoteInteractionNonce = i64; +type InteractionNonce = i64; +type Interaction = (RemoteTimelineId, RemoteInteractionNonce); + +impl ContextEvent { + pub fn promote_internal_nonce(&mut self) { + if let Some(nonce) = self.attributes.remove(&EventAttrKey::InternalNonce) { + self.attributes.insert(EventAttrKey::Nonce, nonce); + } + } + + fn add_internal_nonce(&mut self, nonce: InteractionNonce) { + self.attributes + .insert(EventAttrKey::InternalNonce, nonce.into()); + } + + fn add_interaction(&mut self, interaction: Interaction) { + self.attributes + .insert(EventAttrKey::RemoteTimelineId, interaction.0.into()); + self.attributes + .insert(EventAttrKey::RemoteNonce, interaction.1.into()); + } +} + +impl TimelineMeta { + fn new(context: ObjectHandle, attributes: TimelineAttributes) -> Self { + let id = TimelineId::allocate(); + trace!(%context, timeline_id = %id, "Creating timeline metadata"); + Self { + id, + attributes, + nonce: 0, + } + } + + fn increment_nonce(&mut self) { + self.nonce = self.nonce.wrapping_add(1); + } + + fn interaction_source(&self) -> Interaction { + (self.id, self.nonce) + } + + pub fn id(&self) -> TimelineId { + self.id + } + + pub fn attributes(&self) -> &TimelineAttributes { + &self.attributes } } -*/ diff --git a/src/context_manager_new.rs b/src/context_manager_new.rs deleted file mode 100644 index 4808141..0000000 --- a/src/context_manager_new.rs +++ /dev/null @@ -1,85 +0,0 @@ -use crate::{ - attr::{EventAttrKey, TimelineAttrKey}, - config::{PluginConfig, TraceRecorderConfig}, - error::Error, - recorder_data::{ContextHandle, EventAttributes, RecorderDataExt, TimelineAttributes}, -}; -use auxon_sdk::api::{AttrVal, TimelineId}; -use std::collections::{BTreeMap, HashMap, HashSet}; -use std::hash::Hash; -use trace_recorder_parser::{ - streaming::event::{Event, EventCode, EventType, TrackingEventCounter}, - time::{StreamingInstant, Timestamp}, - types::{Argument, FormatString, FormattedString, ObjectHandle, UserEventChannel}, -}; -use tracing::{debug, warn}; -use uuid::Uuid; - -#[derive(Debug)] -pub struct ContextEvent { - pub context: ObjectHandle, - pub global_ordering: u128, - pub attributes: EventAttributes, - // In fully-linearized interaction mode, every event has a nonce. - // When true, this event contains an interaction from the previous - // event and the previous event's nonce should be visible in - // the conventional attribute key. - pub add_previous_event_nonce: bool, -} - -type RemoteTimelineId = TimelineId; -type RemoteInteractionNonce = i64; -type InteractionNonce = i64; -type RemoteContext = ObjectHandle; -type ContextSwitchInteraction = (RemoteContext, RemoteTimelineId, RemoteInteractionNonce); - -#[derive(Debug)] -pub struct TimelineMeta { - id: TimelineId, - context: ObjectHandle, - attributes: TimelineAttributes, - /// The nonce recorded on the last event. - /// Effectively a timeline-local event counter so we can draw arbitrary interactions - nonce: InteractionNonce, -} - -#[derive(Debug)] -pub struct ContextManager { - cfg: TraceRecorderConfig, - common_timeline_attrs: TimelineAttributes, - - global_ordering: u128, - time_rollover_tracker: StreamingInstant, - event_counter_tracker: TrackingEventCounter, - - objects_to_timelines: HashMap, - // State for fully-linearized interaction mode - // TODO - // State for ipc interaction mode - // TODO -} - -impl ContextManager { - pub fn new(cfg: TraceRecorderConfig, trd: &TR) -> Result { - todo!() - } - - pub fn timeline_meta(&self, handle: ObjectHandle) -> Result<&TimelineMeta, Error> { - self.objects_to_timelines - .get(&handle) - .ok_or(Error::TaskTimelineLookup(handle)) - } - - pub fn process_event( - &mut self, - event_code: EventCode, - event: &Event, - trd: &TR, - ) -> Result { - // NOTE: We get events in logical (and tomporal) order, so only need - // a local counter for ordering - self.global_ordering = self.global_ordering.saturating_add(1); - - todo!() - } -} diff --git a/src/error.rs b/src/error.rs index aee23ec..db94464 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,4 +1,3 @@ -use auxon_sdk::ingest_client::{IngestClientInitializationError, IngestError}; use std::io; use trace_recorder_parser::types::{ KernelPortIdentity, ObjectHandle, UserEventArgRecordCount, STARTUP_TASK_NAME, @@ -40,10 +39,13 @@ pub enum Error { DeviantEvent(String), #[error("Encountered an ingest client initialization error. {0}")] - IngestClientInitialization(#[from] IngestClientInitializationError), + IngestClientInitialization(#[from] auxon_sdk::ingest_client::IngestClientInitializationError), #[error("Encountered an ingest client error. {0}")] - Ingest(#[from] IngestError), + Ingest(#[from] auxon_sdk::ingest_client::IngestError), + + #[error("Encountered an ingest client error. {0}")] + DynamicIngest(#[from] auxon_sdk::ingest_client::dynamic::DynamicIngestError), #[error(transparent)] Auth(#[from] AuthTokenError), diff --git a/src/lib.rs b/src/lib.rs index 4135f7a..7a770e2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,7 @@ -pub use attr::{AttrKeyIndex, AttrKeys, EventAttrKey, TimelineAttrKey}; +pub use attr::{AttrKeys, EventAttrKey, TimelineAttrKey}; pub use client::Client; pub use command::Command; pub use config::{TraceRecorderConfig, TraceRecorderConfigEntry}; -pub use context_manager::ContextSwitchOutcome; pub use error::Error; pub use interruptor::Interruptor; pub use opts::{ @@ -18,7 +17,6 @@ pub mod client; pub mod command; pub mod config; pub mod context_manager; -pub mod context_manager_new; pub mod deviant_event_parser; pub mod error; pub mod interruptor; diff --git a/src/opts.rs b/src/opts.rs index e4ec548..7fb587b 100644 --- a/src/opts.rs +++ b/src/opts.rs @@ -210,6 +210,18 @@ pub struct TraceRecorderOpts { help_heading = "TRACE RECORDER CONFIGURATION" )] pub include_unknown_events: bool, + + /// Interaction mode to use. + /// + /// * fully-linearized + /// * ipc + #[clap( + long, + verbatim_doc_comment, + name = "interaction-mode", + help_heading = "TRACE RECORDER CONFIGURATION" + )] + pub interaction_mode: Option, } /// A map of trace recorder USER_EVENT channels/format-strings to Modality event names @@ -368,6 +380,27 @@ impl IgnoredObjectClasses { } } +#[derive( + Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Default, serde_with::DeserializeFromStr, +)] +pub enum InteractionMode { + #[default] + FullyLinearized, + Ipc, +} + +impl FromStr for InteractionMode { + type Err = String; + + fn from_str(s: &str) -> Result { + Ok(match s.trim().to_lowercase().as_str() { + "fully-linearized" => InteractionMode::FullyLinearized, + "ipc" => InteractionMode::Ipc, + _ => return Err(format!("Invalid interaction mode '{}'", s)), + }) + } +} + #[cfg(test)] mod test { use super::*; diff --git a/src/recorder_data.rs b/src/recorder_data.rs index 904e9ac..b09d0d4 100644 --- a/src/recorder_data.rs +++ b/src/recorder_data.rs @@ -19,7 +19,7 @@ use trace_recorder_parser::{ time::{Frequency, Timestamp}, types::{Argument, ObjectClass, ObjectHandle, STARTUP_TASK_NAME, UNNAMED_OBJECT}, }; -use tracing::{debug, warn}; +use tracing::{trace, warn}; use uuid::Uuid; pub trait NanosecondsExt { @@ -62,14 +62,14 @@ impl ContextHandle { } } -impl From for ContextHandle { - fn from(event: TaskEvent) -> Self { +impl From<&TaskEvent> for ContextHandle { + fn from(event: &TaskEvent) -> Self { ContextHandle::Task(event.handle) } } -impl From for ContextHandle { - fn from(event: IsrEvent) -> Self { +impl From<&IsrEvent> for ContextHandle { + fn from(event: &IsrEvent) -> Self { ContextHandle::Isr(event.handle) } } @@ -98,10 +98,7 @@ pub trait RecorderDataExt { attrs: &mut TimelineAttributes, ) -> Result<(), Error>; - fn common_timeline_attributes( - &self, - cfg: &TraceRecorderConfig, - ) -> Result; + fn common_timeline_attributes(&self, cfg: &TraceRecorderConfig) -> TimelineAttributes; } #[async_trait] @@ -532,14 +529,11 @@ impl RecorderDataExt for RecorderData { Ok(()) } - fn common_timeline_attributes( - &self, - cfg: &TraceRecorderConfig, - ) -> Result { + fn common_timeline_attributes(&self, cfg: &TraceRecorderConfig) -> TimelineAttributes { let mut attrs = HashMap::new(); let run_id = cfg.plugin.run_id.unwrap_or_else(Uuid::new_v4); let time_domain = cfg.plugin.time_domain.unwrap_or_else(Uuid::new_v4); - debug!(run_id = %run_id, time_domain = %time_domain); + trace!(run_id = %run_id, time_domain = %time_domain); if let Some(r) = self.timestamp_info.timer_frequency.resolution_ns() { // Only have ns resolution if frequency is non-zero @@ -648,25 +642,24 @@ impl RecorderDataExt for RecorderData { .timeline_attributes .additional_timeline_attributes, &mut attrs, - )?; + ); merge_cfg_attributes( &cfg.ingest.timeline_attributes.override_timeline_attributes, &mut attrs, - )?; + ); - Ok(attrs) + attrs } } pub(crate) fn merge_cfg_attributes( attrs_to_merge: &[AttrKeyEqValuePair], attrs: &mut TimelineAttributes, -) -> Result<(), Error> { +) { for kv in attrs_to_merge.iter() { attrs.insert(TimelineAttrKey::Custom(kv.0.to_string()), kv.1.clone()); } - Ok(()) } fn object_name(name: String, class: MaybeUnknownObjectClass, handle: ObjectHandle) -> String { diff --git a/src/trc_reader.rs b/src/trc_reader.rs index 66b3577..6d7d8d5 100644 --- a/src/trc_reader.rs +++ b/src/trc_reader.rs @@ -1,18 +1,14 @@ use crate::{ - attr::EventAttrKey, config::TraceRecorderConfig, deviant_event_parser::DeviantEventParser, - error::Error, interruptor::Interruptor, recorder_data::NanosecondsExt, -}; -use auxon_sdk::{ - api::{AttrVal, BigInt}, - ingest_client::IngestClient, -}; -use std::{collections::HashMap, io::Read}; -use trace_recorder_parser::{ - streaming::event::{Event, EventType, TrackingEventCounter}, - streaming::RecorderData, - time::StreamingInstant, - types::{KernelPortIdentity, ObjectClass}, + client::Client, + config::TraceRecorderConfig, + context_manager::{ContextEvent, ContextManager}, + error::Error, + interruptor::Interruptor, + recorder_data::TimelineAttributes, }; +use auxon_sdk::ingest_client::IngestClient; +use std::{collections::HashSet, io::Read}; +use trace_recorder_parser::{streaming::RecorderData, types::KernelPortIdentity}; use tracing::{debug, warn}; pub async fn run( @@ -31,27 +27,15 @@ pub async fn run( warn!("Frequency is zero, time domain will be in unit ticks"); } - let mut deviant_event_parser = if let Some(base_event_id) = cfg.plugin.deviant_event_id_base { - Some(DeviantEventParser::new( - trd.header.endianness, - base_event_id.into(), - )?) - } else { - None - }; - /* - let client = IngestClient::connect(&cfg.protocol_parent_url()?, cfg.ingest.allow_insecure_tls).await?; - let client = client.authenticate(cfg.resolve_auth()?.into()).await?; - - let mut ordering = 0; - let mut time_rollover_tracker = StreamingInstant::zero(); - let mut event_counter_tracker = TrackingEventCounter::zero(); - let mut first_event_observed = false; - let mut ctx_mngr = ContextManager::begin(client, cfg.clone(), &trd).await?; + let mut client = Client::new(client.authenticate(cfg.resolve_auth()?.into()).await?); + let mut ctx_mngr = ContextManager::new(cfg, &trd)?; + let mut observed_timelines = HashSet::new(); + let mut buffered_event: Option = None; + let mut maybe_result: Option> = None; - loop { + while !intr.is_set() { let (event_code, event) = match trd.read_event(&mut r) { Ok(Some((ec, ev))) => (ec, ev), Ok(None) => break, @@ -61,608 +45,104 @@ pub async fn run( TrcError::ObjectLookup(_) | TrcError::InvalidObjectHandle(_) | TrcError::FormattedString(_) => { - warn!(err=%e, "Downgrading to single timeline mode due to an error in the data"); - ctx_mngr.set_degraded_single_timeline_mode(); + ctx_mngr.set_degraded(format!("Data error {e}")); continue; } TrcError::TraceRestarted(psf_start_word_endianness) => { warn!("Detected a restarted trace stream"); trd = RecorderData::read_with_endianness(psf_start_word_endianness, &mut r)?; - first_event_observed = false; + ctx_mngr.observe_trace_restart(); continue; } _ => { - let _ = ctx_mngr.end().await; - return Err(e.into()); + // Store the result so we can pass it along after flushing buffered events + maybe_result = Some(Err(e.into())); + break; } } } }; - let mut attrs = HashMap::new(); - let event_type = event_code.event_type(); - let event_id = event_code.event_id(); - let parameter_count = event_code.parameter_count(); + debug!(%event_code, %event, "Received event"); - let dropped_events = if !first_event_observed { - if event_type != EventType::TraceStart { - warn!(event_type = %event_type, "First event should be TRACE_START"); + let ctx_event = match ctx_mngr.process_event(event_code, &event, &trd) { + Ok(Some(ev)) => ev, + Ok(None) => continue, + Err(e) => { + // Store the result so we can pass it along after flushing buffered events + maybe_result = Some(Err(e)); + break; } - event_counter_tracker.set_initial_count(event.event_count()); - first_event_observed = true; - None - } else { - event_counter_tracker.update(event.event_count()) }; - let event_count_raw = u16::from(event.event_count()); - let event_count = event_counter_tracker.count(); - let timer_ticks = event.timestamp(); - let timestamp = time_rollover_tracker.elapsed(timer_ticks); - - attrs.insert( - ctx_mngr.event_key(EventAttrKey::EventCountRaw).await?, - event_count_raw.into(), - ); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::EventCount).await?, - event_count.into(), - ); - if let Some(dropped_events) = dropped_events { - warn!( - event_count = event_count_raw, - dropped_events, "Dropped events detected" - ); - - attrs.insert( - ctx_mngr.event_key(EventAttrKey::DroppedEvents).await?, - dropped_events.into(), - ); - } - - match event { - Event::TraceStart(ev) => { - // TODO - check if we need to switch to a non-startup task/timeline - attrs.insert( - ctx_mngr.event_key(EventAttrKey::ObjectHandle).await?, - AttrVal::Integer(u32::from(ev.current_task_handle).into()), - ); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::TaskName).await?, - ev.current_task.to_string().into(), - ); - } - - Event::IsrBegin(ev) | Event::IsrResume(ev) | Event::IsrDefine(ev) => { - attrs.insert( - ctx_mngr.event_key(EventAttrKey::IsrName).await?, - ev.name.to_string().into(), - ); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::IsrPriority).await?, - AttrVal::Integer(u32::from(ev.priority).into()), - ); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::ObjectHandle).await?, - AttrVal::Integer(u32::from(ev.handle).into()), - ); - - if matches!( - event_type, - EventType::TaskSwitchIsrBegin | EventType::TaskSwitchIsrResume - ) { - match ctx_mngr.context_switch_in(ev.into(), &trd).await? { - ContextSwitchOutcome::Same => (), - ContextSwitchOutcome::Different(remote_timeline_id, remote_timestamp) => { - if !cfg.plugin.disable_task_interactions { - attrs.insert( - ctx_mngr.event_key(EventAttrKey::RemoteTimelineId).await?, - AttrVal::TimelineId(Box::new(remote_timeline_id)), - ); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::RemoteTimestamp).await?, - AttrVal::Timestamp( - frequency.lossy_timestamp_ns(remote_timestamp), - ), - ); - } - } - } + // Maintain a 1-element buffer so we can ensure the interaction nonce attr key + // is present on the previous event when we encounter a context switch + // on the current event + match buffered_event.take() { + Some(mut prev_event) => { + if ctx_event.add_previous_event_nonce { + prev_event.promote_internal_nonce(); } - } - - Event::TaskBegin(ev) - | Event::TaskReady(ev) - | Event::TaskResume(ev) - | Event::TaskCreate(ev) - | Event::TaskActivate(ev) - | Event::TaskPriority(ev) - | Event::TaskPriorityInherit(ev) - | Event::TaskPriorityDisinherit(ev) => { - attrs.insert( - ctx_mngr.event_key(EventAttrKey::TaskName).await?, - ev.name.to_string().into(), - ); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::TaskPriority).await?, - AttrVal::Integer(u32::from(ev.priority).into()), - ); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::ObjectHandle).await?, - AttrVal::Integer(u32::from(ev.handle).into()), - ); - if matches!( - event_type, - EventType::TaskSwitchTaskBegin - | EventType::TaskSwitchTaskResume - | EventType::TaskActivate - ) { - match ctx_mngr.context_switch_in(ev.into(), &trd).await? { - ContextSwitchOutcome::Same => (), - ContextSwitchOutcome::Different(remote_timeline_id, remote_timestamp) => { - if !cfg.plugin.disable_task_interactions { - attrs.insert( - ctx_mngr.event_key(EventAttrKey::RemoteTimelineId).await?, - AttrVal::TimelineId(Box::new(remote_timeline_id)), - ); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::RemoteTimestamp).await?, - AttrVal::Timestamp( - frequency.lossy_timestamp_ns(remote_timestamp), - ), - ); - } - } - } - } - } - - Event::TaskNotify(ev) - | Event::TaskNotifyFromIsr(ev) - | Event::TaskNotifyWait(ev) - | Event::TaskNotifyWaitBlock(ev) => { - if cfg - .plugin - .ignored_object_classes - .contains(ObjectClass::Task) - { - continue; - } - if let Some(name) = ev.task_name { - attrs.insert( - ctx_mngr.event_key(EventAttrKey::TaskName).await?, - AttrVal::String(name.to_string().into()), - ); - } - attrs.insert( - ctx_mngr.event_key(EventAttrKey::ObjectHandle).await?, - AttrVal::Integer(u32::from(ev.handle).into()), - ); - if let Some(ticks_to_wait) = ev.ticks_to_wait { - attrs.insert( - ctx_mngr.event_key(EventAttrKey::TicksToWait).await?, - AttrVal::Integer(u32::from(ticks_to_wait).into()), - ); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::NanosToWait).await?, - AttrVal::Timestamp(frequency.lossy_timestamp_ns(ticks_to_wait)), - ); - } - } - - Event::MemoryAlloc(ev) | Event::MemoryFree(ev) => { - attrs.insert( - ctx_mngr.event_key(EventAttrKey::MemoryAddress).await?, - AttrVal::Integer(ev.address.into()), - ); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::MemorySize).await?, - AttrVal::Integer(ev.size.into()), - ); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::MemoryHeapCurrent).await?, - AttrVal::Integer(ev.heap.current.into()), - ); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::MemoryHeapHighMark).await?, - AttrVal::Integer(ev.heap.high_water_mark.into()), - ); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::MemoryHeapMax).await?, - AttrVal::Integer(ev.heap.max.into()), - ); - } - - Event::UnusedStack(ev) => { - attrs.insert( - ctx_mngr.event_key(EventAttrKey::TaskName).await?, - ev.task.to_string().into(), - ); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::ObjectHandle).await?, - AttrVal::Integer(u32::from(ev.handle).into()), - ); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::StackLowMark).await?, - AttrVal::Integer(ev.low_mark.into()), - ); - } - - Event::QueueCreate(ev) => { - if cfg - .plugin - .ignored_object_classes - .contains(ObjectClass::Queue) - { - continue; - } - if let Some(name) = ev.name { - attrs.insert( - ctx_mngr.event_key(EventAttrKey::QueueName).await?, - AttrVal::String(name.to_string().into()), - ); - } - attrs.insert( - ctx_mngr.event_key(EventAttrKey::ObjectHandle).await?, - AttrVal::Integer(u32::from(ev.handle).into()), - ); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::QueueLength).await?, - AttrVal::Integer(ev.queue_length.into()), - ); - } - - Event::QueueSend(ev) - | Event::QueueSendBlock(ev) - | Event::QueueSendFromIsr(ev) - | Event::QueueReceive(ev) - | Event::QueueReceiveBlock(ev) - | Event::QueueReceiveFromIsr(ev) - | Event::QueuePeek(ev) - | Event::QueuePeekBlock(ev) - | Event::QueueSendFront(ev) - | Event::QueueSendFrontBlock(ev) - | Event::QueueSendFrontFromIsr(ev) => { - if cfg - .plugin - .ignored_object_classes - .contains(ObjectClass::Queue) - { - continue; - } - if let Some(name) = ev.name { - attrs.insert( - ctx_mngr.event_key(EventAttrKey::QueueName).await?, - AttrVal::String(name.to_string().into()), - ); - } - attrs.insert( - ctx_mngr.event_key(EventAttrKey::ObjectHandle).await?, - AttrVal::Integer(u32::from(ev.handle).into()), - ); - attrs.insert( - ctx_mngr - .event_key(EventAttrKey::QueueMessagesWaiting) - .await?, - AttrVal::Integer(ev.messages_waiting.into()), - ); - if let Some(ticks_to_wait) = ev.ticks_to_wait { - attrs.insert( - ctx_mngr.event_key(EventAttrKey::TicksToWait).await?, - AttrVal::Integer(u32::from(ticks_to_wait).into()), - ); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::NanosToWait).await?, - AttrVal::Timestamp(frequency.lossy_timestamp_ns(ticks_to_wait)), - ); - } - } - - Event::MutexCreate(ev) => { - if cfg - .plugin - .ignored_object_classes - .contains(ObjectClass::Mutex) - { - continue; - } - if let Some(name) = ev.name { - attrs.insert( - ctx_mngr.event_key(EventAttrKey::MutexName).await?, - AttrVal::String(name.to_string().into()), - ); - } - attrs.insert( - ctx_mngr.event_key(EventAttrKey::ObjectHandle).await?, - AttrVal::Integer(u32::from(ev.handle).into()), - ); - } + // Buffer the current event + buffered_event = Some(ctx_event); - Event::MutexGive(ev) - | Event::MutexGiveBlock(ev) - | Event::MutexGiveRecursive(ev) - | Event::MutexTake(ev) - | Event::MutexTakeBlock(ev) - | Event::MutexTakeRecursive(ev) - | Event::MutexTakeRecursiveBlock(ev) => { - if cfg - .plugin - .ignored_object_classes - .contains(ObjectClass::Mutex) - { - continue; - } - if let Some(name) = ev.name { - attrs.insert( - ctx_mngr.event_key(EventAttrKey::MutexName).await?, - AttrVal::String(name.to_string().into()), - ); + // Send the previous event + let timeline = ctx_mngr.timeline_meta(prev_event.context)?; + let mut new_timeline_attrs: Option<&TimelineAttributes> = None; + if observed_timelines.insert(timeline.id()) { + new_timeline_attrs = Some(timeline.attributes()); } - attrs.insert( - ctx_mngr.event_key(EventAttrKey::ObjectHandle).await?, - AttrVal::Integer(u32::from(ev.handle).into()), - ); - if let Some(ticks_to_wait) = ev.ticks_to_wait { - attrs.insert( - ctx_mngr.event_key(EventAttrKey::TicksToWait).await?, - AttrVal::Integer(u32::from(ticks_to_wait).into()), - ); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::NanosToWait).await?, - AttrVal::Timestamp(frequency.lossy_timestamp_ns(ticks_to_wait)), - ); - } - } - Event::SemaphoreBinaryCreate(ev) | Event::SemaphoreCountingCreate(ev) => { - if cfg - .plugin - .ignored_object_classes - .contains(ObjectClass::Semaphore) - { - continue; - } - if let Some(name) = ev.name { - attrs.insert( - ctx_mngr.event_key(EventAttrKey::SemaphoreName).await?, - AttrVal::String(name.to_string().into()), - ); - } - attrs.insert( - ctx_mngr.event_key(EventAttrKey::ObjectHandle).await?, - AttrVal::Integer(u32::from(ev.handle).into()), - ); - if let Some(count) = ev.count { - attrs.insert( - ctx_mngr.event_key(EventAttrKey::SemaphoreCount).await?, - AttrVal::Integer(count.into()), - ); - } - } + client + .switch_timeline(timeline.id(), new_timeline_attrs) + .await?; - Event::SemaphoreGive(ev) - | Event::SemaphoreGiveBlock(ev) - | Event::SemaphoreGiveFromIsr(ev) - | Event::SemaphoreTake(ev) - | Event::SemaphoreTakeBlock(ev) - | Event::SemaphoreTakeFromIsr(ev) - | Event::SemaphorePeek(ev) - | Event::SemaphorePeekBlock(ev) => { - if cfg - .plugin - .ignored_object_classes - .contains(ObjectClass::Semaphore) - { - continue; - } - if let Some(name) = ev.name { - attrs.insert( - ctx_mngr.event_key(EventAttrKey::SemaphoreName).await?, - AttrVal::String(name.to_string().into()), - ); - } - attrs.insert( - ctx_mngr.event_key(EventAttrKey::ObjectHandle).await?, - AttrVal::Integer(u32::from(ev.handle).into()), - ); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::SemaphoreCount).await?, - AttrVal::Integer(ev.count.into()), - ); - if let Some(ticks_to_wait) = ev.ticks_to_wait { - attrs.insert( - ctx_mngr.event_key(EventAttrKey::TicksToWait).await?, - AttrVal::Integer(u32::from(ticks_to_wait).into()), - ); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::NanosToWait).await?, - AttrVal::Timestamp(frequency.lossy_timestamp_ns(ticks_to_wait)), - ); - } + client + .send_event(prev_event.global_ordering, prev_event.attributes.iter()) + .await?; } - Event::User(ev) => { - ctx_mngr.handle_device_timeline_id_channel_event( - &ev.channel, - &ev.format_string, - &ev.formatted_string, - &ev.args, - &trd, - ); - - if cfg.plugin.user_event_channel { - // Use the channel as the event name - attrs.insert( - ctx_mngr.event_key(EventAttrKey::Name).await?, - ev.channel.to_string().into(), - ); - } else if cfg.plugin.user_event_format_string { - // Use the formatted string as the event name - attrs.insert( - ctx_mngr.event_key(EventAttrKey::Name).await?, - ev.formatted_string.to_string().into(), - ); - } - - // Handle channel event name mappings - if let Some(name) = cfg - .plugin - .user_event_channel_rename_map - .get(ev.channel.as_str()) - { - attrs.insert( - ctx_mngr.event_key(EventAttrKey::Name).await?, - name.to_string().into(), - ); - } else if ev.channel.as_str() == "#WFR" { - warn!( - msg = %ev.formatted_string, - "Target produced a warning on the '#WFR' channel" - ); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::Name).await?, - "WARNING_FROM_RECORDER".into(), - ); - } - - // Handle format string event name mappings - if let Some(name) = cfg - .plugin - .user_event_formatted_string_rename_map - .get(ev.formatted_string.as_str()) - { - attrs.insert( - ctx_mngr.event_key(EventAttrKey::Name).await?, - name.to_string().into(), - ); - } - - attrs.insert( - ctx_mngr.event_key(EventAttrKey::UserChannel).await?, - ev.channel.to_string().into(), - ); - attrs.insert( - ctx_mngr - .event_key(EventAttrKey::UserFormattedString) - .await?, - ev.formatted_string.to_string().into(), - ); - - let custom_arg_keys = cfg - .plugin - .user_event_fmt_arg_attr_keys - .arg_attr_keys(ev.channel.as_str(), &ev.format_string); - if let Some(custom_arg_keys) = custom_arg_keys { - if custom_arg_keys.len() != ev.args.len() { - return Err(Error::FmtArgAttrKeysCountMismatch( - ev.format_string.into(), - custom_arg_keys.to_vec(), - )); - } - } - - for (idx, arg) in ev.args.iter().enumerate() { - let key = if let Some(custom_arg_keys) = custom_arg_keys { - // SAFETY: len checked above - ctx_mngr - .event_key(EventAttrKey::CustomUserArg(custom_arg_keys[idx].clone())) - .await? - } else { - match idx { - 0 => ctx_mngr.event_key(EventAttrKey::UserArg0).await?, - 1 => ctx_mngr.event_key(EventAttrKey::UserArg1).await?, - 2 => ctx_mngr.event_key(EventAttrKey::UserArg2).await?, - 3 => ctx_mngr.event_key(EventAttrKey::UserArg3).await?, - 4 => ctx_mngr.event_key(EventAttrKey::UserArg4).await?, - 5 => ctx_mngr.event_key(EventAttrKey::UserArg5).await?, - 6 => ctx_mngr.event_key(EventAttrKey::UserArg6).await?, - 7 => ctx_mngr.event_key(EventAttrKey::UserArg7).await?, - 8 => ctx_mngr.event_key(EventAttrKey::UserArg8).await?, - 9 => ctx_mngr.event_key(EventAttrKey::UserArg9).await?, - 10 => ctx_mngr.event_key(EventAttrKey::UserArg10).await?, - 11 => ctx_mngr.event_key(EventAttrKey::UserArg11).await?, - 12 => ctx_mngr.event_key(EventAttrKey::UserArg12).await?, - 13 => ctx_mngr.event_key(EventAttrKey::UserArg13).await?, - 14 => ctx_mngr.event_key(EventAttrKey::UserArg14).await?, - _ => return Err(Error::ExceededMaxUserEventArgs), - } - }; - attrs.insert(key, arg_to_attr_val(arg)); - } + // First iter of the loop + None => { + buffered_event = Some(ctx_event); } + } + } - // Skip These - Event::ObjectName(_) | Event::TsConfig(_) => continue, - - Event::Unknown(ev) => { - let maybe_dev = if let Some(p) = deviant_event_parser.as_mut() { - p.parse(&ev)? - } else { - None - }; - - if let Some(dev) = maybe_dev { - attrs.insert( - ctx_mngr.event_key(EventAttrKey::Name).await?, - dev.kind.to_modality_name().into(), - ); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::MutatorId).await?, - dev.mutator_id.1, - ); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::InternalMutatorId).await?, - dev.mutator_id.0.to_string().into(), - ); - if let Some(m) = dev.mutation_id { - attrs.insert(ctx_mngr.event_key(EventAttrKey::MutationId).await?, m.1); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::InternalMutationId).await?, - m.0.to_string().into(), - ); - } - if let Some(s) = dev.mutation_success { - attrs.insert(ctx_mngr.event_key(EventAttrKey::MutationSuccess).await?, s); - } - } else if !cfg.plugin.include_unknown_events { - debug!( - %event_type, - timestamp = %ev.timestamp, - id = %ev.code.event_id(), - event_count = %ev.event_count, "Skipping unknown"); - continue; - } - } + // Flush the last event + if let Some(last_event) = buffered_event.take() { + debug!("Flushing buffered events"); + let timeline = ctx_mngr.timeline_meta(last_event.context)?; + let mut new_timeline_attrs: Option<&TimelineAttributes> = None; + if observed_timelines.insert(timeline.id()) { + new_timeline_attrs = Some(timeline.attributes()); } - attrs.insert( - ctx_mngr.event_key(EventAttrKey::TimerTicks).await?, - BigInt::new_attr_val(timer_ticks.ticks().into()), - ); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::TimestampTicks).await?, - BigInt::new_attr_val(timestamp.ticks().into()), - ); - attrs.insert( - ctx_mngr.event_key(EventAttrKey::Timestamp).await?, - AttrVal::Timestamp(frequency.lossy_timestamp_ns(timestamp)), - ); + client + .switch_timeline(timeline.id(), new_timeline_attrs) + .await?; - ctx_mngr.event(timestamp, ordering, attrs).await?; + client + .send_event(last_event.global_ordering, last_event.attributes.iter()) + .await?; + } - // We get events in logical (and tomporal) order, so only need - // a local counter for ordering - ordering += 1; + client.inner.flush().await?; - if intr.is_set() { - break; - } + if let Ok(status) = client.inner.status().await { + debug!( + events_received = status.events_received, + events_written = status.events_written, + events_pending = status.events_pending, + "Ingest status" + ); } - ctx_mngr.end().await?; - */ - - Ok(()) + if let Some(res) = maybe_result { + res + } else { + Ok(()) + } } diff --git a/test_system/include/config/FreeRTOSConfig.h b/test_system/include/config/FreeRTOSConfig.h index dccdc09..0065684 100644 --- a/test_system/include/config/FreeRTOSConfig.h +++ b/test_system/include/config/FreeRTOSConfig.h @@ -78,7 +78,7 @@ void vAssertCalled(const char* file, int line); #define INCLUDE_uxTaskPriorityGet 1 #define INCLUDE_vTaskDelete 0 #define INCLUDE_vTaskCleanUpResources 0 -#define INCLUDE_vTaskSuspend 0 +#define INCLUDE_vTaskSuspend 1 #define INCLUDE_vTaskDelay 1 #define INCLUDE_xTaskDelayUntil 1 #define INCLUDE_uxTaskGetStackHighWaterMark 0 diff --git a/test_system/src/sensor.c b/test_system/src/sensor.c index b5b1759..be95df4 100644 --- a/test_system/src/sensor.c +++ b/test_system/src/sensor.c @@ -34,14 +34,6 @@ static void sensor_task(void* params) next_wake = xTaskGetTickCount(); while(1) { - /* Intentionally trip an assert */ - if(i == 256) - { - WARN("Going to assert"); - vTaskDelay(UPDATE_PERIOD_MS*10); - } - configASSERT(i < 256); - const BaseType_t was_delayed = xTaskDelayUntil(&next_wake, UPDATE_PERIOD_MS); if(was_delayed == pdFALSE) { @@ -51,6 +43,13 @@ static void sensor_task(void* params) vTracePrintF(ch, "%d", adc_value); actuator_send_adc_data(adc_value); i += 1; + + /* Shutdown the task */ + if(i == 256) + { + WARN("Called vTaskSuspend"); + vTaskSuspend(NULL); + } } }