From cb09fc95fda68f0920ea0f4fad5fb402f8f993ae Mon Sep 17 00:00:00 2001 From: David Venhoek Date: Thu, 28 Sep 2023 15:13:53 +0200 Subject: [PATCH] Implemented cross-clock synchronization for hardware timestamping. --- Cargo.lock | 2 +- statime-linux/Cargo.toml | 2 +- statime-linux/src/clock/mod.rs | 12 +-- statime-linux/src/config/mod.rs | 3 +- statime-linux/src/main.rs | 160 +++++++++++++++++++++++++++++--- statime-linux/src/socket.rs | 6 +- 6 files changed, 155 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 993d51ebe..374480a67 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -170,7 +170,7 @@ checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b" [[package]] name = "clock-steering" version = "0.1.0" -source = "git+https://github.com/pendulum-project/clock-steering.git?rev=3ab6721#3ab6721439d602968e4f6acde085266b7c27636b" +source = "git+https://github.com/pendulum-project/clock-steering.git?rev=46974d3#46974d35e30d9cf1dc0caef7bac71fda478b8efa" dependencies = [ "libc", ] diff --git a/statime-linux/Cargo.toml b/statime-linux/Cargo.toml index f35f6f785..eea90d12f 100644 --- a/statime-linux/Cargo.toml +++ b/statime-linux/Cargo.toml @@ -19,5 +19,5 @@ rand = { version = "0.8.5", default-features = false, features = ["std", "std_rn serde = { version = "1.0.188", features = ["derive"] } -clock-steering = { git = "https://github.com/pendulum-project/clock-steering.git", rev = "3ab6721" } +clock-steering = { git = "https://github.com/pendulum-project/clock-steering.git", rev = "46974d3" } timestamped-socket = { git = "https://github.com/pendulum-project/timestamped-socket.git", rev = "021bebb", features = ["serde"] } diff --git a/statime-linux/src/clock/mod.rs b/statime-linux/src/clock/mod.rs index 1203afe80..51feb75e9 100644 --- a/statime-linux/src/clock/mod.rs +++ b/statime-linux/src/clock/mod.rs @@ -7,7 +7,7 @@ use statime::{Clock, Duration, Time, TimePropertiesDS}; #[derive(Debug, Clone)] pub struct LinuxClock { - clock: clock_steering::unix::UnixClock, + pub clock: clock_steering::unix::UnixClock, } impl LinuxClock { @@ -20,16 +20,6 @@ impl LinuxClock { Ok(Self { clock }) } - - pub fn timespec(&self) -> std::io::Result { - use clock_steering::Clock; - - let now = self.clock.now()?; - Ok(libc::timespec { - tv_sec: now.seconds, - tv_nsec: now.nanos as _, - }) - } } impl clock_steering::Clock for LinuxClock { diff --git a/statime-linux/src/config/mod.rs b/statime-linux/src/config/mod.rs index 968892967..651c596a4 100644 --- a/statime-linux/src/config/mod.rs +++ b/statime-linux/src/config/mod.rs @@ -13,7 +13,6 @@ pub struct Config { pub domain: u8, pub priority1: u8, pub priority2: u8, - pub hardware_clock: Option, #[serde(rename = "port")] pub ports: Vec, } @@ -22,6 +21,8 @@ pub struct Config { pub struct PortConfig { pub interface: InterfaceName, #[serde(default)] + pub hardware_clock: Option, + #[serde(default)] pub network_mode: NetworkMode, pub announce_interval: i8, pub sync_interval: i8, diff --git a/statime-linux/src/main.rs b/statime-linux/src/main.rs index 92e15c081..b9e8d9152 100644 --- a/statime-linux/src/main.rs +++ b/statime-linux/src/main.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashMap, future::Future, path::PathBuf, pin::{pin, Pin}, @@ -9,8 +10,9 @@ use clap::Parser; use fern::colors::Color; use rand::{rngs::StdRng, SeedableRng}; use statime::{ - BasicFilter, ClockIdentity, InBmca, InstanceConfig, Port, PortAction, PortActionIterator, - PtpInstance, SdoId, Time, TimePropertiesDS, TimeSource, TimestampContext, MAX_DATA_LEN, + BasicFilter, ClockIdentity, Filter, InBmca, InstanceConfig, Measurement, Port, PortAction, + PortActionIterator, PtpInstance, SdoId, Time, TimePropertiesDS, TimeSource, TimestampContext, + MAX_DATA_LEN, }; use statime_linux::{ clock::LinuxClock, @@ -23,7 +25,7 @@ use statime_linux::{ use timestamped_socket::{ interface::InterfaceIterator, networkaddress::NetworkAddress, - socket::{Open, Socket}, + socket::{InterfaceTimestampMode, Open, Socket}, }; use tokio::{ sync::mpsc::{Receiver, Sender}, @@ -152,6 +154,97 @@ impl Future for Timer { } } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)] +enum ClockSyncMode { + #[default] + FromSystem, + ToSystem, +} + +fn start_clock_task(clock: LinuxClock) -> tokio::sync::watch::Sender { + let (mode_sender, mode_receiver) = tokio::sync::watch::channel(ClockSyncMode::FromSystem); + + tokio::spawn(clock_task(clock, mode_receiver)); + + mode_sender +} + +fn clock_timestamp_to_time(t: clock_steering::Timestamp) -> statime::Time { + Time::from_nanos((t.seconds as u64) * 1_000_000_000 + (t.nanos as u64)) +} + +async fn clock_task( + clock: LinuxClock, + mut mode_receiver: tokio::sync::watch::Receiver, +) { + let mut measurement_timer = pin!(Timer::new()); + let mut update_timer = pin!(Timer::new()); + + measurement_timer.as_mut().reset(std::time::Duration::ZERO); + + let mut filter = BasicFilter::new(0.25); + let mut filter_clock = LinuxClock::CLOCK_REALTIME; + + let mut current_mode = *mode_receiver.borrow_and_update(); + loop { + tokio::select! { + () = &mut measurement_timer => { + let (t1, t2, t3) = clock.clock.system_offset().expect("Unable to determine offset from system clock"); + let t1 = clock_timestamp_to_time(t1); + let t2 = clock_timestamp_to_time(t2); + let t3 = clock_timestamp_to_time(t3); + + let delay = (t3-t1)/2; + let offset_a = t2 - t1; + let offset_b = t3 - t2; + + let m = match current_mode { + ClockSyncMode::FromSystem => Measurement { + event_time: t2, + offset: Some(offset_a - delay), + delay: Some(delay), + raw_sync_offset: Some(offset_a), + raw_delay_offset: Some(-offset_b), + }, + ClockSyncMode::ToSystem => Measurement { + event_time: t1+delay, + offset: Some(offset_b - delay), + delay: Some(delay), + raw_sync_offset: Some(offset_b), + raw_delay_offset: Some(-offset_a), + }, + }; + + let update = filter.measurement(m, &mut filter_clock); + if let Some(timeout) = update.next_update { + update_timer.as_mut().reset(timeout); + } + + measurement_timer.as_mut().reset(std::time::Duration::from_millis(250)); + } + () = &mut update_timer => { + let update = filter.update(&mut filter_clock); + if let Some(timeout) = update.next_update { + update_timer.as_mut().reset(timeout); + } + } + _ = mode_receiver.changed() => { + let new_mode = *mode_receiver.borrow_and_update(); + if new_mode != current_mode { + let mut new_filter = BasicFilter::new(0.25); + std::mem::swap(&mut filter, &mut new_filter); + new_filter.demobilize(&mut filter_clock); + match new_mode { + ClockSyncMode::FromSystem => filter_clock = clock.clone(), + ClockSyncMode::ToSystem => filter_clock = LinuxClock::CLOCK_REALTIME, + } + current_mode = new_mode; + } + } + } + } +} + #[tokio::main] async fn main() { actual_main().await; @@ -170,12 +263,6 @@ async fn actual_main() { let log_level = log::LevelFilter::from_str(&config.loglevel).unwrap(); setup_logger(log_level).expect("could not setup logging"); - let local_clock = if let Some(hardware_clock) = config.hardware_clock { - LinuxClock::open(hardware_clock).expect("could not open hardware clock") - } else { - LinuxClock::CLOCK_REALTIME - }; - let clock_identity = ClockIdentity(get_clock_id().expect("could not get clock identity")); let instance_config = InstanceConfig { @@ -202,11 +289,37 @@ async fn actual_main() { let mut main_task_senders = Vec::with_capacity(config.ports.len()); let mut main_task_receivers = Vec::with_capacity(config.ports.len()); + let mut internal_sync_senders = vec![]; + + let mut clock_name_map = HashMap::new(); + let mut clock_port_map = Vec::with_capacity(config.ports.len()); + for port_config in config.ports { let interface = port_config.interface; let network_mode = port_config.network_mode; + let (port_clock, timestamping) = match &port_config.hardware_clock { + Some(path) => { + let clock = LinuxClock::open(path).expect("Unable to open clock"); + if let Some(id) = clock_name_map.get(path) { + clock_port_map.push(Some(*id)); + } else { + let id = internal_sync_senders.len(); + clock_port_map.push(Some(id)); + clock_name_map.insert(path.clone(), id); + internal_sync_senders.push(start_clock_task(clock.clone())); + } + (clock, InterfaceTimestampMode::HardwarePTPAll) + } + None => { + clock_port_map.push(None); + ( + LinuxClock::CLOCK_REALTIME, + InterfaceTimestampMode::SoftwareAll, + ) + } + }; let rng = StdRng::from_entropy(); - let port = instance.add_port(port_config.into(), 0.25, local_clock.clone(), rng); + let port = instance.add_port(port_config.into(), 0.25, port_clock, rng); let (main_task_sender, port_task_receiver) = tokio::sync::mpsc::channel(1); let (port_task_sender, main_task_receiver) = tokio::sync::mpsc::channel(1); @@ -221,8 +334,8 @@ async fn actual_main() { match network_mode { statime_linux::config::NetworkMode::Ipv4 => { - let event_socket = - open_ipv4_event_socket(interface).expect("Could not open event socket"); + let event_socket = open_ipv4_event_socket(interface, timestamping) + .expect("Could not open event socket"); let general_socket = open_ipv4_general_socket(interface).expect("Could not open general socket"); @@ -235,8 +348,8 @@ async fn actual_main() { )); } statime_linux::config::NetworkMode::Ipv6 => { - let event_socket = - open_ipv6_event_socket(interface).expect("Could not open event socket"); + let event_socket = open_ipv6_event_socket(interface, timestamping) + .expect("Could not open event socket"); let general_socket = open_ipv6_general_socket(interface).expect("Could not open general socket"); @@ -256,6 +369,8 @@ async fn actual_main() { bmca_notify_sender, main_task_receivers, main_task_senders, + internal_sync_senders, + clock_port_map, ) .await } @@ -265,6 +380,8 @@ async fn run( bmca_notify_sender: tokio::sync::watch::Sender, mut main_task_receivers: Vec, StdRng, LinuxClock, BasicFilter>>>, main_task_senders: Vec, StdRng, LinuxClock, BasicFilter>>>, + internal_sync_senders: Vec>, + clock_port_map: Vec>, ) -> ! { // run bmca over all of the ports at the same time. The ports don't perform // their normal actions at this time: bmca is stop-the-world! @@ -300,6 +417,21 @@ async fn run( instance.bmca(&mut mut_bmca_ports); + let mut clock_states: Vec<_> = internal_sync_senders + .iter() + .map(|_| ClockSyncMode::FromSystem) + .collect(); + for (idx, port) in mut_bmca_ports.iter().enumerate() { + if port.is_steering() { + if let Some(id) = clock_port_map[idx] { + clock_states[id] = ClockSyncMode::ToSystem; + } + } + } + for (mode, sender) in clock_states.into_iter().zip(internal_sync_senders.iter()) { + sender.send(mode).expect("Clock mode change failed"); + } + drop(mut_bmca_ports); for (port, sender) in bmca_ports.into_iter().zip(main_task_senders.iter()) { diff --git a/statime-linux/src/socket.rs b/statime-linux/src/socket.rs index a684aba4d..21f23d46b 100644 --- a/statime-linux/src/socket.rs +++ b/statime-linux/src/socket.rs @@ -42,8 +42,9 @@ impl PtpTargetAddress for SocketAddrV6 { pub fn open_ipv4_event_socket( interface: InterfaceName, + timestamping: InterfaceTimestampMode, ) -> std::io::Result> { - let socket = open_interface_udp4(interface, EVENT_PORT, InterfaceTimestampMode::SoftwareAll)?; + let socket = open_interface_udp4(interface, EVENT_PORT, timestamping)?; socket.join_multicast(SocketAddrV4::new(IPV4_PRIMARY_MULTICAST, 0))?; socket.join_multicast(SocketAddrV4::new(IPV4_PDELAY_MULTICAST, 0))?; Ok(socket) @@ -60,8 +61,9 @@ pub fn open_ipv4_general_socket( pub fn open_ipv6_event_socket( interface: InterfaceName, + timestamping: InterfaceTimestampMode, ) -> std::io::Result> { - let socket = open_interface_udp6(interface, EVENT_PORT, InterfaceTimestampMode::SoftwareAll)?; + let socket = open_interface_udp6(interface, EVENT_PORT, timestamping)?; socket.join_multicast(SocketAddrV6::new(IPV6_PRIMARY_MULTICAST, 0, 0, 0))?; socket.join_multicast(SocketAddrV6::new(IPV6_PDELAY_MULTICAST, 0, 0, 0))?; Ok(socket)