Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented cross-clock synchronization for hardware timestamping. #270

Merged
merged 1 commit into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion statime-linux/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ rand = { version = "0.8.5", default-features = false, features = ["std", "std_rn
serde = { version = "1.0.188", features = ["derive"] }


clock-steering = { git = "https://github.com/pendulum-project/clock-steering.git", rev = "3ab6721" }
clock-steering = { git = "https://github.com/pendulum-project/clock-steering.git", rev = "8ca7481" }
timestamped-socket = { git = "https://github.com/pendulum-project/timestamped-socket.git", rev = "b98af7e", features = ["serde"] }
38 changes: 10 additions & 28 deletions statime-linux/src/clock/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use statime::{Clock, Duration, Time, TimePropertiesDS};

#[derive(Debug, Clone)]
pub struct LinuxClock {
clock: clock_steering::unix::UnixClock,
pub clock: clock_steering::unix::UnixClock,
}

impl LinuxClock {
Expand All @@ -20,16 +20,6 @@ impl LinuxClock {

Ok(Self { clock })
}

pub fn timespec(&self) -> std::io::Result<libc::timespec> {
use clock_steering::Clock;

let now = self.clock.now()?;
Ok(libc::timespec {
tv_sec: now.seconds,
tv_nsec: now.nanos as _,
})
}
}

impl clock_steering::Clock for LinuxClock {
Expand Down Expand Up @@ -68,8 +58,10 @@ impl clock_steering::Clock for LinuxClock {
}
}

fn time_from_timestamp(timestamp: clock_steering::Timestamp) -> Time {
let seconds: u64 = timestamp.seconds.try_into().unwrap();
fn time_from_timestamp(timestamp: clock_steering::Timestamp, fallback: Time) -> Time {
let Ok(seconds): Result<u64, _> = timestamp.seconds.try_into() else {
return fallback;
};

let nanos = seconds * 1_000_000_000 + timestamp.nanos as u64;
Time::from_nanos_subnanos(nanos, 0)
Expand All @@ -82,14 +74,14 @@ impl Clock for LinuxClock {
use clock_steering::Clock;

let timestamp = self.clock.now().unwrap();
time_from_timestamp(timestamp)
time_from_timestamp(timestamp, Time::from_fixed_nanos(0))
}

fn set_frequency(&mut self, freq: f64) -> Result<Time, Self::Error> {
use clock_steering::Clock;
log::trace!("Setting clock frequency to {:e}ppm", freq);
let timestamp = self.clock.set_frequency(freq)?;
Ok(time_from_timestamp(timestamp))
Ok(time_from_timestamp(timestamp, statime::Clock::now(self)))
}

fn step_clock(&mut self, time_offset: Duration) -> Result<Time, Self::Error> {
Expand All @@ -112,21 +104,11 @@ impl Clock for LinuxClock {
);

let timestamp = self.clock.step_clock(offset)?;
Ok(time_from_timestamp(timestamp))
Ok(time_from_timestamp(timestamp, statime::Clock::now(self)))
}

fn set_properties(&mut self, time_properties: &TimePropertiesDS) -> Result<(), Self::Error> {
use clock_steering::Clock;

let leap_indicator = match time_properties.leap_indicator() {
statime::LeapIndicator::NoLeap => clock_steering::LeapIndicator::NoWarning,
statime::LeapIndicator::Leap61 => clock_steering::LeapIndicator::Leap61,
statime::LeapIndicator::Leap59 => clock_steering::LeapIndicator::Leap59,
};

if time_properties.is_ptp() {
self.clock.set_leap_seconds(leap_indicator)?;
}
fn set_properties(&mut self, _time_properties: &TimePropertiesDS) -> Result<(), Self::Error> {
// For now just ignore these

Ok(())
}
Expand Down
3 changes: 2 additions & 1 deletion statime-linux/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ pub struct Config {
pub domain: u8,
pub priority1: u8,
pub priority2: u8,
pub hardware_clock: Option<String>,
#[serde(rename = "port")]
pub ports: Vec<PortConfig>,
}
Expand All @@ -22,6 +21,8 @@ pub struct Config {
pub struct PortConfig {
pub interface: InterfaceName,
#[serde(default)]
pub hardware_clock: Option<String>,
#[serde(default)]
pub network_mode: NetworkMode,
pub announce_interval: i8,
pub sync_interval: i8,
Expand Down
184 changes: 165 additions & 19 deletions statime-linux/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
collections::HashMap,
future::Future,
path::PathBuf,
pin::{pin, Pin},
Expand All @@ -9,8 +10,9 @@ use clap::Parser;
use fern::colors::Color;
use rand::{rngs::StdRng, SeedableRng};
use statime::{
BasicFilter, ClockIdentity, InBmca, InstanceConfig, Port, PortAction, PortActionIterator,
PtpInstance, SdoId, Time, TimePropertiesDS, TimeSource, TimestampContext, MAX_DATA_LEN,
BasicFilter, ClockIdentity, Filter, InBmca, InstanceConfig, Measurement, Port, PortAction,
PortActionIterator, PtpInstance, SdoId, Time, TimePropertiesDS, TimeSource, TimestampContext,
MAX_DATA_LEN,
};
use statime_linux::{
clock::LinuxClock,
Expand All @@ -23,7 +25,7 @@ use statime_linux::{
use timestamped_socket::{
interface::InterfaceIterator,
networkaddress::NetworkAddress,
socket::{Open, Socket},
socket::{InterfaceTimestampMode, Open, Socket},
};
use tokio::{
sync::mpsc::{Receiver, Sender},
Expand Down Expand Up @@ -152,6 +154,102 @@ impl Future for Timer {
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
enum ClockSyncMode {
#[default]
FromSystem,
ToSystem,
}

fn start_clock_task(clock: LinuxClock) -> tokio::sync::watch::Sender<ClockSyncMode> {
let (mode_sender, mode_receiver) = tokio::sync::watch::channel(ClockSyncMode::FromSystem);

tokio::spawn(clock_task(clock, mode_receiver));

mode_sender
}

fn clock_timestamp_to_time(t: clock_steering::Timestamp) -> statime::Time {
Time::from_nanos((t.seconds as u64) * 1_000_000_000 + (t.nanos as u64))
}

async fn clock_task(
clock: LinuxClock,
mut mode_receiver: tokio::sync::watch::Receiver<ClockSyncMode>,
) {
let mut measurement_timer = pin!(Timer::new());
let mut update_timer = pin!(Timer::new());

measurement_timer.as_mut().reset(std::time::Duration::ZERO);

let mut filter = BasicFilter::new(0.25);

let mut current_mode = *mode_receiver.borrow_and_update();
let mut filter_clock = match current_mode {
ClockSyncMode::FromSystem => clock.clone(),
ClockSyncMode::ToSystem => LinuxClock::CLOCK_REALTIME,
};
loop {
tokio::select! {
() = &mut measurement_timer => {
let (t1, t2, t3) = clock.clock.system_offset().expect("Unable to determine offset from system clock");
let t1 = clock_timestamp_to_time(t1);
let t2 = clock_timestamp_to_time(t2);
let t3 = clock_timestamp_to_time(t3);

log::debug!("Interclock measurement: {} {} {}", t1, t2, t3);

let delay = (t3-t1)/2;
let offset_a = t2 - t1;
let offset_b = t3 - t2;

let m = match current_mode {
ClockSyncMode::FromSystem => Measurement {
event_time: t2,
offset: Some(offset_a - delay),
delay: Some(delay),
raw_sync_offset: Some(offset_a),
raw_delay_offset: Some(-offset_b),
},
ClockSyncMode::ToSystem => Measurement {
event_time: t1+delay,
offset: Some(offset_b - delay),
delay: Some(delay),
raw_sync_offset: Some(offset_b),
raw_delay_offset: Some(-offset_a),
},
};

let update = filter.measurement(m, &mut filter_clock);
if let Some(timeout) = update.next_update {
update_timer.as_mut().reset(timeout);
}

measurement_timer.as_mut().reset(std::time::Duration::from_millis(250));
}
() = &mut update_timer => {
let update = filter.update(&mut filter_clock);
if let Some(timeout) = update.next_update {
update_timer.as_mut().reset(timeout);
}
}
_ = mode_receiver.changed() => {
let new_mode = *mode_receiver.borrow_and_update();
if new_mode != current_mode {
let mut new_filter = BasicFilter::new(0.25);
std::mem::swap(&mut filter, &mut new_filter);
new_filter.demobilize(&mut filter_clock);
match new_mode {
ClockSyncMode::FromSystem => filter_clock = clock.clone(),
ClockSyncMode::ToSystem => filter_clock = LinuxClock::CLOCK_REALTIME,
}
current_mode = new_mode;
}
}
}
}
}

#[tokio::main]
async fn main() {
actual_main().await;
Expand All @@ -170,12 +268,6 @@ async fn actual_main() {
let log_level = log::LevelFilter::from_str(&config.loglevel).unwrap();
setup_logger(log_level).expect("could not setup logging");

let local_clock = if let Some(hardware_clock) = config.hardware_clock {
LinuxClock::open(hardware_clock).expect("could not open hardware clock")
} else {
LinuxClock::CLOCK_REALTIME
};

let clock_identity = ClockIdentity(get_clock_id().expect("could not get clock identity"));

let instance_config = InstanceConfig {
Expand All @@ -202,11 +294,37 @@ async fn actual_main() {
let mut main_task_senders = Vec::with_capacity(config.ports.len());
let mut main_task_receivers = Vec::with_capacity(config.ports.len());

let mut internal_sync_senders = vec![];

let mut clock_name_map = HashMap::new();
let mut clock_port_map = Vec::with_capacity(config.ports.len());

for port_config in config.ports {
let interface = port_config.interface;
let network_mode = port_config.network_mode;
let (port_clock, timestamping) = match &port_config.hardware_clock {
Some(path) => {
let clock = LinuxClock::open(path).expect("Unable to open clock");
if let Some(id) = clock_name_map.get(path) {
clock_port_map.push(Some(*id));
} else {
let id = internal_sync_senders.len();
clock_port_map.push(Some(id));
clock_name_map.insert(path.clone(), id);
internal_sync_senders.push(start_clock_task(clock.clone()));
}
(clock, InterfaceTimestampMode::HardwarePTPAll)
}
None => {
clock_port_map.push(None);
(
LinuxClock::CLOCK_REALTIME,
InterfaceTimestampMode::SoftwareAll,
)
}
};
let rng = StdRng::from_entropy();
let port = instance.add_port(port_config.into(), 0.25, local_clock.clone(), rng);
let port = instance.add_port(port_config.into(), 0.25, port_clock, rng);

let (main_task_sender, port_task_receiver) = tokio::sync::mpsc::channel(1);
let (port_task_sender, main_task_receiver) = tokio::sync::mpsc::channel(1);
Expand All @@ -221,8 +339,8 @@ async fn actual_main() {

match network_mode {
statime_linux::config::NetworkMode::Ipv4 => {
let event_socket =
open_ipv4_event_socket(interface).expect("Could not open event socket");
let event_socket = open_ipv4_event_socket(interface, timestamping)
.expect("Could not open event socket");
let general_socket =
open_ipv4_general_socket(interface).expect("Could not open general socket");

Expand All @@ -235,8 +353,8 @@ async fn actual_main() {
));
}
statime_linux::config::NetworkMode::Ipv6 => {
let event_socket =
open_ipv6_event_socket(interface).expect("Could not open event socket");
let event_socket = open_ipv6_event_socket(interface, timestamping)
.expect("Could not open event socket");
let general_socket =
open_ipv6_general_socket(interface).expect("Could not open general socket");

Expand All @@ -256,6 +374,8 @@ async fn actual_main() {
bmca_notify_sender,
main_task_receivers,
main_task_senders,
internal_sync_senders,
clock_port_map,
)
.await
}
Expand All @@ -265,6 +385,8 @@ async fn run(
bmca_notify_sender: tokio::sync::watch::Sender<bool>,
mut main_task_receivers: Vec<Receiver<Port<InBmca<'static>, StdRng, LinuxClock, BasicFilter>>>,
main_task_senders: Vec<Sender<Port<InBmca<'static>, StdRng, LinuxClock, BasicFilter>>>,
internal_sync_senders: Vec<tokio::sync::watch::Sender<ClockSyncMode>>,
clock_port_map: Vec<Option<usize>>,
) -> ! {
// run bmca over all of the ports at the same time. The ports don't perform
// their normal actions at this time: bmca is stop-the-world!
Expand Down Expand Up @@ -300,6 +422,18 @@ async fn run(

instance.bmca(&mut mut_bmca_ports);

let mut clock_states = vec![ClockSyncMode::FromSystem; internal_sync_senders.len()];
for (idx, port) in mut_bmca_ports.iter().enumerate() {
if port.is_steering() {
if let Some(id) = clock_port_map[idx] {
clock_states[id] = ClockSyncMode::ToSystem;
}
}
}
for (mode, sender) in clock_states.into_iter().zip(internal_sync_senders.iter()) {
sender.send(mode).expect("Clock mode change failed");
}

drop(mut_bmca_ports);

for (port, sender) in bmca_ports.into_iter().zip(main_task_senders.iter()) {
Expand Down Expand Up @@ -356,7 +490,15 @@ async fn port_task<A: NetworkAddress + PtpTargetAddress>(
loop {
let mut actions = tokio::select! {
result = event_socket.recv(&mut event_buffer) => match result {
Ok(packet) => port.handle_timecritical_receive(&event_buffer[..packet.bytes_read], timestamp_to_time(packet.timestamp.expect("Missing timestamp on recv"))),
Ok(packet) => {
if let Some(timestamp) = packet.timestamp {
log::trace!("Recv timestamp: {:?}", packet.timestamp);
port.handle_timecritical_receive(&event_buffer[..packet.bytes_read], timestamp_to_time(timestamp))
} else {
log::error!("Missing recv timestamp");
PortActionIterator::empty()
}
}
Err(error) => panic!("Error receiving: {error:?}"),
},
result = general_socket.recv(&mut general_buffer) => match result {
Expand Down Expand Up @@ -425,17 +567,21 @@ async fn handle_actions<A: NetworkAddress + PtpTargetAddress>(
let time = event_socket
.send_to(data, A::PRIMARY_EVENT)
.await
.unwrap()
.unwrap();
.expect("Failed to send event message");

// anything we send later will have a later pending (send) timestamp
pending_timestamp = Some((context, timestamp_to_time(time)));
if let Some(time) = time {
log::trace!("Send timestamp {:?}", time);
pending_timestamp = Some((context, timestamp_to_time(time)));
} else {
log::error!("Missing send timestamp");
}
}
PortAction::SendGeneral { data } => {
general_socket
.send_to(data, A::PRIMARY_GENERAL)
.await
.unwrap();
.expect("Failed to send general message");
}
PortAction::ResetAnnounceTimer { duration } => {
timers.port_announce_timer.as_mut().reset(duration);
Expand Down
Loading