diff --git a/Cargo.lock b/Cargo.lock index 29a40a0ef..193a5b9cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -170,7 +170,7 @@ checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b" [[package]] name = "clock-steering" version = "0.1.0" -source = "git+https://github.com/pendulum-project/clock-steering.git?rev=3ab6721#3ab6721439d602968e4f6acde085266b7c27636b" +source = "git+https://github.com/pendulum-project/clock-steering.git?rev=8ca7481#8ca748164fab25c709eb7f93af75a0ea915dba69" dependencies = [ "libc", ] diff --git a/statime-linux/Cargo.toml b/statime-linux/Cargo.toml index 8051e5800..ae1437e27 100644 --- a/statime-linux/Cargo.toml +++ b/statime-linux/Cargo.toml @@ -19,5 +19,5 @@ rand = { version = "0.8.5", default-features = false, features = ["std", "std_rn serde = { version = "1.0.188", features = ["derive"] } -clock-steering = { git = "https://github.com/pendulum-project/clock-steering.git", rev = "3ab6721" } +clock-steering = { git = "https://github.com/pendulum-project/clock-steering.git", rev = "8ca7481" } timestamped-socket = { git = "https://github.com/pendulum-project/timestamped-socket.git", rev = "b98af7e", features = ["serde"] } diff --git a/statime-linux/src/clock/mod.rs b/statime-linux/src/clock/mod.rs index 1203afe80..57e0a505b 100644 --- a/statime-linux/src/clock/mod.rs +++ b/statime-linux/src/clock/mod.rs @@ -7,7 +7,7 @@ use statime::{Clock, Duration, Time, TimePropertiesDS}; #[derive(Debug, Clone)] pub struct LinuxClock { - clock: clock_steering::unix::UnixClock, + pub clock: clock_steering::unix::UnixClock, } impl LinuxClock { @@ -20,16 +20,6 @@ impl LinuxClock { Ok(Self { clock }) } - - pub fn timespec(&self) -> std::io::Result { - use clock_steering::Clock; - - let now = self.clock.now()?; - Ok(libc::timespec { - tv_sec: now.seconds, - tv_nsec: now.nanos as _, - }) - } } impl clock_steering::Clock for LinuxClock { @@ -68,8 +58,10 @@ impl clock_steering::Clock for LinuxClock { } } -fn time_from_timestamp(timestamp: clock_steering::Timestamp) -> Time { - let seconds: u64 = timestamp.seconds.try_into().unwrap(); +fn time_from_timestamp(timestamp: clock_steering::Timestamp, fallback: Time) -> Time { + let Ok(seconds): Result = timestamp.seconds.try_into() else { + return fallback; + }; let nanos = seconds * 1_000_000_000 + timestamp.nanos as u64; Time::from_nanos_subnanos(nanos, 0) @@ -82,14 +74,14 @@ impl Clock for LinuxClock { use clock_steering::Clock; let timestamp = self.clock.now().unwrap(); - time_from_timestamp(timestamp) + time_from_timestamp(timestamp, Time::from_fixed_nanos(0)) } fn set_frequency(&mut self, freq: f64) -> Result { use clock_steering::Clock; log::trace!("Setting clock frequency to {:e}ppm", freq); let timestamp = self.clock.set_frequency(freq)?; - Ok(time_from_timestamp(timestamp)) + Ok(time_from_timestamp(timestamp, statime::Clock::now(self))) } fn step_clock(&mut self, time_offset: Duration) -> Result { @@ -112,21 +104,11 @@ impl Clock for LinuxClock { ); let timestamp = self.clock.step_clock(offset)?; - Ok(time_from_timestamp(timestamp)) + Ok(time_from_timestamp(timestamp, statime::Clock::now(self))) } - fn set_properties(&mut self, time_properties: &TimePropertiesDS) -> Result<(), Self::Error> { - use clock_steering::Clock; - - let leap_indicator = match time_properties.leap_indicator() { - statime::LeapIndicator::NoLeap => clock_steering::LeapIndicator::NoWarning, - statime::LeapIndicator::Leap61 => clock_steering::LeapIndicator::Leap61, - statime::LeapIndicator::Leap59 => clock_steering::LeapIndicator::Leap59, - }; - - if time_properties.is_ptp() { - self.clock.set_leap_seconds(leap_indicator)?; - } + fn set_properties(&mut self, _time_properties: &TimePropertiesDS) -> Result<(), Self::Error> { + // For now just ignore these Ok(()) } diff --git a/statime-linux/src/config/mod.rs b/statime-linux/src/config/mod.rs index 968892967..651c596a4 100644 --- a/statime-linux/src/config/mod.rs +++ b/statime-linux/src/config/mod.rs @@ -13,7 +13,6 @@ pub struct Config { pub domain: u8, pub priority1: u8, pub priority2: u8, - pub hardware_clock: Option, #[serde(rename = "port")] pub ports: Vec, } @@ -22,6 +21,8 @@ pub struct Config { pub struct PortConfig { pub interface: InterfaceName, #[serde(default)] + pub hardware_clock: Option, + #[serde(default)] pub network_mode: NetworkMode, pub announce_interval: i8, pub sync_interval: i8, diff --git a/statime-linux/src/main.rs b/statime-linux/src/main.rs index 92e15c081..ca89ff287 100644 --- a/statime-linux/src/main.rs +++ b/statime-linux/src/main.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashMap, future::Future, path::PathBuf, pin::{pin, Pin}, @@ -9,8 +10,9 @@ use clap::Parser; use fern::colors::Color; use rand::{rngs::StdRng, SeedableRng}; use statime::{ - BasicFilter, ClockIdentity, InBmca, InstanceConfig, Port, PortAction, PortActionIterator, - PtpInstance, SdoId, Time, TimePropertiesDS, TimeSource, TimestampContext, MAX_DATA_LEN, + BasicFilter, ClockIdentity, Filter, InBmca, InstanceConfig, Measurement, Port, PortAction, + PortActionIterator, PtpInstance, SdoId, Time, TimePropertiesDS, TimeSource, TimestampContext, + MAX_DATA_LEN, }; use statime_linux::{ clock::LinuxClock, @@ -23,7 +25,7 @@ use statime_linux::{ use timestamped_socket::{ interface::InterfaceIterator, networkaddress::NetworkAddress, - socket::{Open, Socket}, + socket::{InterfaceTimestampMode, Open, Socket}, }; use tokio::{ sync::mpsc::{Receiver, Sender}, @@ -152,6 +154,102 @@ impl Future for Timer { } } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)] +enum ClockSyncMode { + #[default] + FromSystem, + ToSystem, +} + +fn start_clock_task(clock: LinuxClock) -> tokio::sync::watch::Sender { + let (mode_sender, mode_receiver) = tokio::sync::watch::channel(ClockSyncMode::FromSystem); + + tokio::spawn(clock_task(clock, mode_receiver)); + + mode_sender +} + +fn clock_timestamp_to_time(t: clock_steering::Timestamp) -> statime::Time { + Time::from_nanos((t.seconds as u64) * 1_000_000_000 + (t.nanos as u64)) +} + +async fn clock_task( + clock: LinuxClock, + mut mode_receiver: tokio::sync::watch::Receiver, +) { + let mut measurement_timer = pin!(Timer::new()); + let mut update_timer = pin!(Timer::new()); + + measurement_timer.as_mut().reset(std::time::Duration::ZERO); + + let mut filter = BasicFilter::new(0.25); + + let mut current_mode = *mode_receiver.borrow_and_update(); + let mut filter_clock = match current_mode { + ClockSyncMode::FromSystem => clock.clone(), + ClockSyncMode::ToSystem => LinuxClock::CLOCK_REALTIME, + }; + loop { + tokio::select! { + () = &mut measurement_timer => { + let (t1, t2, t3) = clock.clock.system_offset().expect("Unable to determine offset from system clock"); + let t1 = clock_timestamp_to_time(t1); + let t2 = clock_timestamp_to_time(t2); + let t3 = clock_timestamp_to_time(t3); + + log::debug!("Interclock measurement: {} {} {}", t1, t2, t3); + + let delay = (t3-t1)/2; + let offset_a = t2 - t1; + let offset_b = t3 - t2; + + let m = match current_mode { + ClockSyncMode::FromSystem => Measurement { + event_time: t2, + offset: Some(offset_a - delay), + delay: Some(delay), + raw_sync_offset: Some(offset_a), + raw_delay_offset: Some(-offset_b), + }, + ClockSyncMode::ToSystem => Measurement { + event_time: t1+delay, + offset: Some(offset_b - delay), + delay: Some(delay), + raw_sync_offset: Some(offset_b), + raw_delay_offset: Some(-offset_a), + }, + }; + + let update = filter.measurement(m, &mut filter_clock); + if let Some(timeout) = update.next_update { + update_timer.as_mut().reset(timeout); + } + + measurement_timer.as_mut().reset(std::time::Duration::from_millis(250)); + } + () = &mut update_timer => { + let update = filter.update(&mut filter_clock); + if let Some(timeout) = update.next_update { + update_timer.as_mut().reset(timeout); + } + } + _ = mode_receiver.changed() => { + let new_mode = *mode_receiver.borrow_and_update(); + if new_mode != current_mode { + let mut new_filter = BasicFilter::new(0.25); + std::mem::swap(&mut filter, &mut new_filter); + new_filter.demobilize(&mut filter_clock); + match new_mode { + ClockSyncMode::FromSystem => filter_clock = clock.clone(), + ClockSyncMode::ToSystem => filter_clock = LinuxClock::CLOCK_REALTIME, + } + current_mode = new_mode; + } + } + } + } +} + #[tokio::main] async fn main() { actual_main().await; @@ -170,12 +268,6 @@ async fn actual_main() { let log_level = log::LevelFilter::from_str(&config.loglevel).unwrap(); setup_logger(log_level).expect("could not setup logging"); - let local_clock = if let Some(hardware_clock) = config.hardware_clock { - LinuxClock::open(hardware_clock).expect("could not open hardware clock") - } else { - LinuxClock::CLOCK_REALTIME - }; - let clock_identity = ClockIdentity(get_clock_id().expect("could not get clock identity")); let instance_config = InstanceConfig { @@ -202,11 +294,37 @@ async fn actual_main() { let mut main_task_senders = Vec::with_capacity(config.ports.len()); let mut main_task_receivers = Vec::with_capacity(config.ports.len()); + let mut internal_sync_senders = vec![]; + + let mut clock_name_map = HashMap::new(); + let mut clock_port_map = Vec::with_capacity(config.ports.len()); + for port_config in config.ports { let interface = port_config.interface; let network_mode = port_config.network_mode; + let (port_clock, timestamping) = match &port_config.hardware_clock { + Some(path) => { + let clock = LinuxClock::open(path).expect("Unable to open clock"); + if let Some(id) = clock_name_map.get(path) { + clock_port_map.push(Some(*id)); + } else { + let id = internal_sync_senders.len(); + clock_port_map.push(Some(id)); + clock_name_map.insert(path.clone(), id); + internal_sync_senders.push(start_clock_task(clock.clone())); + } + (clock, InterfaceTimestampMode::HardwarePTPAll) + } + None => { + clock_port_map.push(None); + ( + LinuxClock::CLOCK_REALTIME, + InterfaceTimestampMode::SoftwareAll, + ) + } + }; let rng = StdRng::from_entropy(); - let port = instance.add_port(port_config.into(), 0.25, local_clock.clone(), rng); + let port = instance.add_port(port_config.into(), 0.25, port_clock, rng); let (main_task_sender, port_task_receiver) = tokio::sync::mpsc::channel(1); let (port_task_sender, main_task_receiver) = tokio::sync::mpsc::channel(1); @@ -221,8 +339,8 @@ async fn actual_main() { match network_mode { statime_linux::config::NetworkMode::Ipv4 => { - let event_socket = - open_ipv4_event_socket(interface).expect("Could not open event socket"); + let event_socket = open_ipv4_event_socket(interface, timestamping) + .expect("Could not open event socket"); let general_socket = open_ipv4_general_socket(interface).expect("Could not open general socket"); @@ -235,8 +353,8 @@ async fn actual_main() { )); } statime_linux::config::NetworkMode::Ipv6 => { - let event_socket = - open_ipv6_event_socket(interface).expect("Could not open event socket"); + let event_socket = open_ipv6_event_socket(interface, timestamping) + .expect("Could not open event socket"); let general_socket = open_ipv6_general_socket(interface).expect("Could not open general socket"); @@ -256,6 +374,8 @@ async fn actual_main() { bmca_notify_sender, main_task_receivers, main_task_senders, + internal_sync_senders, + clock_port_map, ) .await } @@ -265,6 +385,8 @@ async fn run( bmca_notify_sender: tokio::sync::watch::Sender, mut main_task_receivers: Vec, StdRng, LinuxClock, BasicFilter>>>, main_task_senders: Vec, StdRng, LinuxClock, BasicFilter>>>, + internal_sync_senders: Vec>, + clock_port_map: Vec>, ) -> ! { // run bmca over all of the ports at the same time. The ports don't perform // their normal actions at this time: bmca is stop-the-world! @@ -300,6 +422,18 @@ async fn run( instance.bmca(&mut mut_bmca_ports); + let mut clock_states = vec![ClockSyncMode::FromSystem; internal_sync_senders.len()]; + for (idx, port) in mut_bmca_ports.iter().enumerate() { + if port.is_steering() { + if let Some(id) = clock_port_map[idx] { + clock_states[id] = ClockSyncMode::ToSystem; + } + } + } + for (mode, sender) in clock_states.into_iter().zip(internal_sync_senders.iter()) { + sender.send(mode).expect("Clock mode change failed"); + } + drop(mut_bmca_ports); for (port, sender) in bmca_ports.into_iter().zip(main_task_senders.iter()) { @@ -356,7 +490,15 @@ async fn port_task( loop { let mut actions = tokio::select! { result = event_socket.recv(&mut event_buffer) => match result { - Ok(packet) => port.handle_timecritical_receive(&event_buffer[..packet.bytes_read], timestamp_to_time(packet.timestamp.expect("Missing timestamp on recv"))), + Ok(packet) => { + if let Some(timestamp) = packet.timestamp { + log::trace!("Recv timestamp: {:?}", packet.timestamp); + port.handle_timecritical_receive(&event_buffer[..packet.bytes_read], timestamp_to_time(timestamp)) + } else { + log::error!("Missing recv timestamp"); + PortActionIterator::empty() + } + } Err(error) => panic!("Error receiving: {error:?}"), }, result = general_socket.recv(&mut general_buffer) => match result { @@ -425,17 +567,21 @@ async fn handle_actions( let time = event_socket .send_to(data, A::PRIMARY_EVENT) .await - .unwrap() - .unwrap(); + .expect("Failed to send event message"); // anything we send later will have a later pending (send) timestamp - pending_timestamp = Some((context, timestamp_to_time(time))); + if let Some(time) = time { + log::trace!("Send timestamp {:?}", time); + pending_timestamp = Some((context, timestamp_to_time(time))); + } else { + log::error!("Missing send timestamp"); + } } PortAction::SendGeneral { data } => { general_socket .send_to(data, A::PRIMARY_GENERAL) .await - .unwrap(); + .expect("Failed to send general message"); } PortAction::ResetAnnounceTimer { duration } => { timers.port_announce_timer.as_mut().reset(duration); diff --git a/statime-linux/src/socket.rs b/statime-linux/src/socket.rs index a490398b9..2a2b7cbef 100644 --- a/statime-linux/src/socket.rs +++ b/statime-linux/src/socket.rs @@ -42,8 +42,9 @@ impl PtpTargetAddress for SocketAddrV6 { pub fn open_ipv4_event_socket( interface: InterfaceName, + timestamping: InterfaceTimestampMode, ) -> std::io::Result> { - let socket = open_interface_udp4(interface, EVENT_PORT, InterfaceTimestampMode::SoftwareAll)?; + let socket = open_interface_udp4(interface, EVENT_PORT, timestamping)?; socket.join_multicast(SocketAddrV4::new(IPV4_PRIMARY_MULTICAST, 0), interface)?; socket.join_multicast(SocketAddrV4::new(IPV4_PDELAY_MULTICAST, 0), interface)?; Ok(socket) @@ -60,8 +61,9 @@ pub fn open_ipv4_general_socket( pub fn open_ipv6_event_socket( interface: InterfaceName, + timestamping: InterfaceTimestampMode, ) -> std::io::Result> { - let socket = open_interface_udp6(interface, EVENT_PORT, InterfaceTimestampMode::SoftwareAll)?; + let socket = open_interface_udp6(interface, EVENT_PORT, timestamping)?; socket.join_multicast( SocketAddrV6::new(IPV6_PRIMARY_MULTICAST, 0, 0, 0), interface, diff --git a/statime/src/port/mod.rs b/statime/src/port/mod.rs index b6f3daeb1..d607ad6d5 100644 --- a/statime/src/port/mod.rs +++ b/statime/src/port/mod.rs @@ -131,6 +131,11 @@ pub struct PortActionIterator<'a> { } impl<'a> PortActionIterator<'a> { + pub fn empty() -> Self { + Self { + internal: ArrayVec::new().into_iter(), + } + } fn from(list: ArrayVec, MAX_ACTIONS>) -> Self { Self { internal: list.into_iter(), diff --git a/statime/src/port/state/slave.rs b/statime/src/port/state/slave.rs index b98fb3135..7ef192709 100644 --- a/statime/src/port/state/slave.rs +++ b/statime/src/port/state/slave.rs @@ -397,8 +397,6 @@ impl SlaveState { self.last_raw_sync_offset = Some(raw_sync_offset); self.sync_state = SyncState::Empty; - - log::debug!("Raw sync measurement {:?}", result.raw_sync_offset); } else if let DelayState::Measuring { send_time: Some(send_time), recv_time: Some(recv_time), @@ -419,6 +417,8 @@ impl SlaveState { return None; } + log::info!("Measurement: {:?}", result); + Some(result) } }