From 3c8f82acff8416ab62073ab2465a4c16472ddc57 Mon Sep 17 00:00:00 2001 From: David Venhoek Date: Thu, 30 Nov 2023 11:01:22 +0100 Subject: [PATCH] Actually do something with delay asymmetry. --- statime/src/port/mod.rs | 12 ++-- statime/src/port/state/mod.rs | 19 +++-- statime/src/port/state/slave.rs | 124 +++++++++++++++++++++++++++----- 3 files changed, 130 insertions(+), 25 deletions(-) diff --git a/statime/src/port/mod.rs b/statime/src/port/mod.rs index c0c151fb8..9a1ba0ab5 100644 --- a/statime/src/port/mod.rs +++ b/statime/src/port/mod.rs @@ -399,6 +399,7 @@ impl<'a, A: AcceptableMasterList, C: Clock, F: Filter, R: Rng> Port, timestamp: Time, ) -> PortActionIterator<'_> { let actions = self.port_state.handle_timestamp( + self.config.delay_asymmetry, context, timestamp, self.port_identity, @@ -505,6 +506,7 @@ impl<'a, A: AcceptableMasterList, C: Clock, F: Filter, R: Rng> Port, if message.is_event() { self.port_state.handle_event_receive( + self.config.delay_asymmetry, message, timestamp, self.config.min_delay_req_interval(), @@ -551,10 +553,12 @@ impl<'a, A: AcceptableMasterList, C: Clock, F: Filter, R: Rng> Port, actions![] } } - _ => { - self.port_state - .handle_general_receive(message, self.port_identity, &mut self.clock) - } + _ => self.port_state.handle_general_receive( + self.config.delay_asymmetry, + message, + self.port_identity, + &mut self.clock, + ), } } } diff --git a/statime/src/port/state/mod.rs b/statime/src/port/state/mod.rs index f4902b6e2..8a65cc1c7 100644 --- a/statime/src/port/state/mod.rs +++ b/statime/src/port/state/mod.rs @@ -8,7 +8,7 @@ use crate::{ datastructures::{common::PortIdentity, datasets::DefaultDS, messages::Message}, filters::Filter, ptp_instance::PtpInstanceState, - time::{Interval, Time}, + time::{Duration, Interval, Time}, Clock, }; @@ -28,8 +28,10 @@ pub(crate) enum PortState { } impl PortState { + #[allow(clippy::too_many_arguments)] pub(crate) fn handle_timestamp<'a, C: Clock>( &mut self, + delay_asymmetry: Duration, context: TimestampContext, timestamp: Time, port_identity: PortIdentity, @@ -38,7 +40,9 @@ impl PortState { buffer: &'a mut [u8], ) -> PortActionIterator<'a> { match self { - PortState::Slave(slave) => slave.handle_timestamp(context, timestamp, clock), + PortState::Slave(slave) => { + slave.handle_timestamp(delay_asymmetry, context, timestamp, clock) + } PortState::Master(master) => { master.handle_timestamp(context, timestamp, port_identity, default_ds, buffer) } @@ -46,8 +50,10 @@ impl PortState { } } + #[allow(clippy::too_many_arguments)] pub(crate) fn handle_event_receive<'a, C: Clock>( &mut self, + delay_asymmetry: Duration, message: Message, timestamp: Time, min_delay_req_interval: Interval, @@ -63,13 +69,16 @@ impl PortState { port_identity, buffer, ), - PortState::Slave(slave) => slave.handle_event_receive(message, timestamp, clock), + PortState::Slave(slave) => { + slave.handle_event_receive(delay_asymmetry, message, timestamp, clock) + } PortState::Listening | PortState::Passive => actions![], } } pub(crate) fn handle_general_receive( &mut self, + delay_asymmetry: Duration, message: Message, port_identity: PortIdentity, clock: &mut C, @@ -81,7 +90,9 @@ impl PortState { } actions![] } - PortState::Slave(slave) => slave.handle_general_receive(message, port_identity, clock), + PortState::Slave(slave) => { + slave.handle_general_receive(delay_asymmetry, message, port_identity, clock) + } PortState::Listening | PortState::Passive => { actions![] } diff --git a/statime/src/port/state/slave.rs b/statime/src/port/state/slave.rs index 063e02964..2a07a5a0e 100644 --- a/statime/src/port/state/slave.rs +++ b/statime/src/port/state/slave.rs @@ -70,8 +70,12 @@ impl SlaveState { } } - fn handle_time_measurement<'a, C: Clock>(&mut self, clock: &mut C) -> PortActionIterator<'a> { - if let Some(measurement) = self.extract_measurement() { + fn handle_time_measurement<'a, C: Clock>( + &mut self, + delay_asymmetry: Duration, + clock: &mut C, + ) -> PortActionIterator<'a> { + if let Some(measurement) = self.extract_measurement(delay_asymmetry) { // If the received message allowed the (slave) state to calculate its offset // from the master, update the local clock let filter_updates = self.filter.measurement(measurement, clock); @@ -86,6 +90,7 @@ impl SlaveState { pub(crate) fn handle_timestamp<'a, C: Clock>( &mut self, + delay_asymmetry: Duration, context: TimestampContext, timestamp: Time, clock: &mut C, @@ -93,7 +98,7 @@ impl SlaveState { match context.inner { crate::port::TimestampContextInner::DelayReq { id } => { // handle our send timestamp on a delay request message - self.handle_delay_timestamp(id, timestamp, clock) + self.handle_delay_timestamp(delay_asymmetry, id, timestamp, clock) } _ => { log::error!("Unexpected timestamp"); @@ -104,6 +109,7 @@ impl SlaveState { fn handle_delay_timestamp<'a, C: Clock>( &mut self, + delay_asymmetry: Duration, timestamp_id: u16, timestamp: Time, clock: &mut C, @@ -123,7 +129,7 @@ impl SlaveState { .. } if id == timestamp_id => { *send_time = Some(timestamp); - self.handle_time_measurement(clock) + self.handle_time_measurement(delay_asymmetry, clock) } _ => { log::warn!("Late timestamp for delay request ignored"); @@ -134,6 +140,7 @@ impl SlaveState { pub(crate) fn handle_event_receive<'a, C: Clock>( &mut self, + delay_asymmetry: Duration, message: Message, timestamp: Time, clock: &mut C, @@ -146,7 +153,9 @@ impl SlaveState { } match message.body { - MessageBody::Sync(sync) => self.handle_sync(header, sync, timestamp, clock), + MessageBody::Sync(sync) => { + self.handle_sync(delay_asymmetry, header, sync, timestamp, clock) + } _ => { log::warn!("Unexpected message {:?}", message); actions![] @@ -156,6 +165,7 @@ impl SlaveState { pub(crate) fn handle_general_receive( &mut self, + delay_asymmetry: Duration, message: Message, port_identity: PortIdentity, clock: &mut C, @@ -168,9 +178,11 @@ impl SlaveState { } match message.body { - MessageBody::FollowUp(message) => self.handle_follow_up(header, message, clock), + MessageBody::FollowUp(message) => { + self.handle_follow_up(delay_asymmetry, header, message, clock) + } MessageBody::DelayResp(message) => { - self.handle_delay_resp(header, message, port_identity, clock) + self.handle_delay_resp(delay_asymmetry, header, message, port_identity, clock) } _ => { log::warn!("Unexpected message {:?}", message); @@ -192,6 +204,7 @@ impl SlaveState { fn handle_sync<'a, C: Clock>( &mut self, + delay_asymmetry: Duration, header: &Header, message: SyncMessage, recv_time: Time, @@ -220,7 +233,7 @@ impl SlaveState { .. } if id == header.sequence_id => { *recv_time = Some(corrected_recv_time); - self.handle_time_measurement(clock) + self.handle_time_measurement(delay_asymmetry, clock) } _ => { self.sync_state = SyncState::Measuring { @@ -244,7 +257,7 @@ impl SlaveState { send_time: Some(Time::from(message.origin_timestamp)), recv_time: Some(corrected_recv_time), }; - self.handle_time_measurement(clock) + self.handle_time_measurement(delay_asymmetry, clock) } } } @@ -252,6 +265,7 @@ impl SlaveState { fn handle_follow_up( &mut self, + delay_asymmetry: Duration, header: &Header, message: FollowUpMessage, clock: &mut C, @@ -277,7 +291,7 @@ impl SlaveState { .. } if id == header.sequence_id => { *send_time = Some(packet_send_time); - self.handle_time_measurement(clock) + self.handle_time_measurement(delay_asymmetry, clock) } _ => { self.sync_state = SyncState::Measuring { @@ -285,13 +299,14 @@ impl SlaveState { send_time: Some(packet_send_time), recv_time: None, }; - self.handle_time_measurement(clock) + self.handle_time_measurement(delay_asymmetry, clock) } } } fn handle_delay_resp( &mut self, + delay_asymmetry: Duration, header: &Header, message: DelayRespMessage, port_identity: PortIdentity, @@ -320,7 +335,7 @@ impl SlaveState { *recv_time = Some( Time::from(message.receive_timestamp) - Duration::from(header.correction_field), ); - self.handle_time_measurement(clock) + self.handle_time_measurement(delay_asymmetry, clock) } _ => { log::warn!("Unexpected DelayResp message"); @@ -380,7 +395,7 @@ impl SlaveState { ] } - fn extract_measurement(&mut self) -> Option { + fn extract_measurement(&mut self, delay_asymmetry: Duration) -> Option { let mut result = Measurement::default(); if let SyncState::Measuring { @@ -389,7 +404,7 @@ impl SlaveState { .. } = self.sync_state { - let raw_sync_offset = recv_time - send_time; + let raw_sync_offset = recv_time - send_time - delay_asymmetry; result.event_time = recv_time; result.raw_sync_offset = Some(raw_sync_offset); @@ -513,6 +528,7 @@ mod tests { }); let mut action = state.handle_event_receive( + Duration::ZERO, Message { header, body, @@ -543,6 +559,7 @@ mod tests { }; let mut action = state.handle_event_receive( + Duration::ZERO, Message { header, body: MessageBody::Sync(SyncMessage { @@ -564,6 +581,7 @@ mod tests { }; let mut action = state.handle_general_receive( + Duration::ZERO, Message { header, body: MessageBody::FollowUp(FollowUpMessage { @@ -590,6 +608,46 @@ mod tests { ); } + #[test] + fn test_delay_asymmetry() { + let mut state = SlaveState::::new(Default::default(), ()); + state.mean_delay = Some(Duration::from_micros(100)); + + let header = Header { + two_step_flag: false, + correction_field: TimeInterval(1000.into()), + ..Default::default() + }; + + let body = MessageBody::Sync(SyncMessage { + origin_timestamp: Time::from_micros(0).into(), + }); + + let mut action = state.handle_event_receive( + Duration::from_micros(100), + Message { + header, + body, + suffix: TlvSet::default(), + }, + Time::from_micros(50), + &mut TestClock, + ); + + assert!(action.next().is_none()); + drop(action); + assert_eq!( + state.filter.last_measurement.take(), + Some(Measurement { + event_time: Time::from_micros(49), + offset: Some(Duration::from_micros(-151)), + delay: None, + raw_sync_offset: Some(Duration::from_micros(-51)), + raw_delay_offset: None, + }) + ); + } + #[test] fn test_sync_with_delay() { let mut state = SlaveState::::new(Default::default(), ()); @@ -626,6 +684,7 @@ mod tests { }; let mut action = state.handle_event_receive( + Duration::ZERO, Message { header, body: MessageBody::Sync(SyncMessage { @@ -676,7 +735,12 @@ mod tests { _ => panic!("Incorrect message type"), }; - let mut action = state.handle_timestamp(context, Time::from_micros(100), &mut TestClock); + let mut action = state.handle_timestamp( + Duration::ZERO, + context, + Time::from_micros(100), + &mut TestClock, + ); assert!(action.next().is_none()); drop(action); assert_eq!(state.filter.last_measurement.take(), None); @@ -693,6 +757,7 @@ mod tests { }); let mut action = state.handle_general_receive( + Duration::ZERO, Message { header, body, @@ -726,6 +791,7 @@ mod tests { }; let mut action = state.handle_event_receive( + Duration::ZERO, Message { header, body: MessageBody::Sync(SyncMessage { @@ -767,11 +833,17 @@ mod tests { _ => panic!("Incorrect message type"), }; - let mut action = state.handle_timestamp(context, Time::from_micros(1100), &mut TestClock); + let mut action = state.handle_timestamp( + Duration::ZERO, + context, + Time::from_micros(1100), + &mut TestClock, + ); assert!(action.next().is_none()); assert_eq!(state.filter.last_measurement.take(), None); let mut action = state.handle_general_receive( + Duration::ZERO, Message { header: Header { correction_field: TimeInterval(2000.into()), @@ -800,6 +872,7 @@ mod tests { ); let mut action = state.handle_general_receive( + Duration::ZERO, Message { header: Header { correction_field: TimeInterval(2000.into()), @@ -838,6 +911,7 @@ mod tests { state.mean_delay = Some(Duration::from_micros(100)); let mut action = state.handle_general_receive( + Duration::ZERO, Message { header: Header { sequence_id: 15, @@ -859,6 +933,7 @@ mod tests { assert_eq!(state.filter.last_measurement.take(), None); let mut action = state.handle_event_receive( + Duration::ZERO, Message { header: Header { two_step_flag: true, @@ -894,6 +969,7 @@ mod tests { state.mean_delay = Some(Duration::from_micros(100)); let mut action = state.handle_event_receive( + Duration::ZERO, Message { header: Header { two_step_flag: true, @@ -914,6 +990,7 @@ mod tests { assert_eq!(state.filter.last_measurement.take(), None); let mut action = state.handle_general_receive( + Duration::ZERO, Message { header: Header { sequence_id: 14, @@ -935,6 +1012,7 @@ mod tests { assert_eq!(state.filter.last_measurement.take(), None); let mut action = state.handle_general_receive( + Duration::ZERO, Message { header: Header { sequence_id: 15, @@ -962,6 +1040,7 @@ mod tests { state.mean_delay = Some(Duration::from_micros(100)); let mut action = state.handle_event_receive( + Duration::ZERO, Message { header: Header { two_step_flag: true, @@ -983,6 +1062,7 @@ mod tests { assert_eq!(state.filter.last_measurement.take(), None); let mut action = state.handle_event_receive( + Duration::ZERO, Message { header: Header { two_step_flag: true, @@ -1003,6 +1083,7 @@ mod tests { assert_eq!(state.filter.last_measurement.take(), None); let mut action = state.handle_general_receive( + Duration::ZERO, Message { header: Header { sequence_id: 15, @@ -1063,6 +1144,7 @@ mod tests { }; let mut action = state.handle_event_receive( + Duration::ZERO, Message { header: Header { two_step_flag: false, @@ -1108,7 +1190,12 @@ mod tests { panic!("Unexpected action"); }; - let mut action = state.handle_timestamp(context, Time::from_micros(100), &mut TestClock); + let mut action = state.handle_timestamp( + Duration::ZERO, + context, + Time::from_micros(100), + &mut TestClock, + ); assert!(action.next().is_none()); assert_eq!(state.filter.last_measurement.take(), None); @@ -1122,6 +1209,7 @@ mod tests { }; let mut action = state.handle_general_receive( + Duration::ZERO, Message { header: Header { correction_field: TimeInterval(2000.into()), @@ -1147,6 +1235,7 @@ mod tests { assert_eq!(state.filter.last_measurement.take(), None); let mut action = state.handle_general_receive( + Duration::ZERO, Message { header: Header { correction_field: TimeInterval(2000.into()), @@ -1169,6 +1258,7 @@ mod tests { assert_eq!(state.filter.last_measurement.take(), None); let mut action = state.handle_general_receive( + Duration::ZERO, Message { header: Header { correction_field: TimeInterval(2000.into()),