Skip to content

Commit

Permalink
Moved sequence id generators out of state.
Browse files Browse the repository at this point in the history
  • Loading branch information
davidv1992 committed Feb 8, 2024
1 parent b081e36 commit c4f63a0
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 111 deletions.
12 changes: 6 additions & 6 deletions statime/src/port/bmca.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use rand::Rng;

use super::{state::MasterState, InBmca, Port, PortActionIterator, Running};
use super::{InBmca, Port, PortActionIterator, Running};
use crate::{
bmc::bmca::{BestAnnounceMessage, RecommendedState},
config::{AcceptableMasterList, LeapIndicator, TimePropertiesDS, TimeSource},
Expand Down Expand Up @@ -146,7 +146,7 @@ impl<'a, A, C: Clock, F: Filter, R: Rng> Port<InBmca<'a>, A, R, C, F> {
let remote_master = announce_message.header.source_port_identity;

let update_state = match &self.port_state {
PortState::Listening | PortState::Master(_) | PortState::Passive => true,
PortState::Listening | PortState::Master | PortState::Passive => true,
PortState::Slave(old_state) => old_state.remote_master() != remote_master,
};

Expand Down Expand Up @@ -177,7 +177,7 @@ impl<'a, A, C: Clock, F: Filter, R: Rng> Port<InBmca<'a>, A, R, C, F> {
let reset_announce = PortAction::ResetAnnounceReceiptTimer { duration };
self.lifecycle.pending_action = actions![reset_announce];
}
PortState::Master(_) => {
PortState::Master => {
let msg = "slave-only PTP port should not be in master state";
debug_assert!(!default_ds.slave_only, "{msg}");
log::error!("{msg}");
Expand All @@ -186,7 +186,7 @@ impl<'a, A, C: Clock, F: Filter, R: Rng> Port<InBmca<'a>, A, R, C, F> {
} else {
match self.port_state {
PortState::Listening | PortState::Slave(_) | PortState::Passive => {
self.set_forced_port_state(PortState::Master(MasterState::new()));
self.set_forced_port_state(PortState::Master);

// Immediately start sending announces and syncs
let duration = core::time::Duration::from_secs(0);
Expand All @@ -195,12 +195,12 @@ impl<'a, A, C: Clock, F: Filter, R: Rng> Port<InBmca<'a>, A, R, C, F> {
PortAction::ResetSyncTimer { duration }
];
}
PortState::Master(_) => { /* do nothing */ }
PortState::Master => { /* do nothing */ }
}
}
}
RecommendedState::P1(_) | RecommendedState::P2(_) => match self.port_state {
PortState::Listening | PortState::Slave(_) | PortState::Master(_) => {
PortState::Listening | PortState::Slave(_) | PortState::Master => {
self.set_forced_port_state(PortState::Passive)
}
PortState::Passive => {}
Expand Down
152 changes: 73 additions & 79 deletions statime/src/port/master.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,13 @@ use crate::{

impl<'a, A, C, F: Filter, R> Port<Running<'a>, A, R, C, F> {
pub(super) fn send_sync(&mut self) -> PortActionIterator {
match self.port_state {
PortState::Master(ref mut state) => {
log::trace!("sending sync message");

let seq_id = state.sync_seq_ids.generate();
let packet_length = match Message::sync(
&self.lifecycle.state.default_ds,
self.port_identity,
seq_id,
)
.serialize(&mut self.packet_buffer)
if matches!(self.port_state, PortState::Master) {
log::trace!("sending sync message");

let seq_id = self.sync_seq_ids.generate();
let packet_length =
match Message::sync(&self.lifecycle.state.default_ds, self.port_identity, seq_id)
.serialize(&mut self.packet_buffer)
{
Ok(message) => message,
Err(error) => {
Expand All @@ -30,24 +26,24 @@ impl<'a, A, C, F: Filter, R> Port<Running<'a>, A, R, C, F> {
}
};

actions![
PortAction::ResetSyncTimer {
duration: self.config.sync_interval.as_core_duration(),
actions![
PortAction::ResetSyncTimer {
duration: self.config.sync_interval.as_core_duration(),
},
PortAction::SendEvent {
context: TimestampContext {
inner: TimestampContextInner::Sync { id: seq_id },
},
PortAction::SendEvent {
context: TimestampContext {
inner: TimestampContextInner::Sync { id: seq_id },
},
data: &self.packet_buffer[..packet_length],
}
]
}
_ => actions![],
data: &self.packet_buffer[..packet_length],
}
]
} else {
actions![]
}
}

pub(super) fn handle_sync_timestamp(&mut self, id: u16, timestamp: Time) -> PortActionIterator {
if matches!(self.port_state, PortState::Master(_)) {
if matches!(self.port_state, PortState::Master) {
let packet_length = match Message::follow_up(
&self.lifecycle.state.default_ds,
self.port_identity,
Expand Down Expand Up @@ -78,61 +74,60 @@ impl<'a, A, C, F: Filter, R> Port<Running<'a>, A, R, C, F> {
&mut self,
tlv_provider: &mut impl ForwardedTLVProvider,
) -> PortActionIterator {
match self.port_state {
PortState::Master(ref mut state) => {
log::trace!("sending announce message");

let mut tlv_buffer = [0; MAX_DATA_LEN];
let mut tlv_builder = TlvSetBuilder::new(&mut tlv_buffer);

let mut message = Message::announce(
&self.lifecycle.state,
self.port_identity,
state.announce_seq_ids.generate(),
);
let mut tlv_margin = MAX_DATA_LEN - message.wire_size();

while let Some(tlv) = tlv_provider.next_if_smaller(tlv_margin) {
assert!(tlv.size() < tlv_margin);
if self.lifecycle.state.parent_ds.parent_port_identity != tlv.sender_identity {
// Ignore, shouldn't be forwarded
continue;
}
if matches!(self.port_state, PortState::Master) {
log::trace!("sending announce message");

tlv_margin -= tlv.size();
// Will not fail as previous checks ensure sufficient space in buffer.
tlv_builder.add(tlv.tlv).unwrap();
}
let mut tlv_buffer = [0; MAX_DATA_LEN];
let mut tlv_builder = TlvSetBuilder::new(&mut tlv_buffer);

message.suffix = tlv_builder.build();
let mut message = Message::announce(
&self.lifecycle.state,
self.port_identity,
self.announce_seq_ids.generate(),
);
let mut tlv_margin = MAX_DATA_LEN - message.wire_size();

let packet_length = match Message::announce(
&self.lifecycle.state,
self.port_identity,
state.announce_seq_ids.generate(),
)
.serialize(&mut self.packet_buffer)
{
Ok(length) => length,
Err(error) => {
log::error!(
"Statime bug: Could not serialize announce message {:?}",
error
);
return actions![];
}
};
while let Some(tlv) = tlv_provider.next_if_smaller(tlv_margin) {
assert!(tlv.size() < tlv_margin);
if self.lifecycle.state.parent_ds.parent_port_identity != tlv.sender_identity {
// Ignore, shouldn't be forwarded
continue;
}

actions![
PortAction::ResetAnnounceTimer {
duration: self.config.announce_interval.as_core_duration(),
},
PortAction::SendGeneral {
data: &self.packet_buffer[..packet_length]
}
]
tlv_margin -= tlv.size();
// Will not fail as previous checks ensure sufficient space in buffer.
tlv_builder.add(tlv.tlv).unwrap();
}
_ => actions![],

message.suffix = tlv_builder.build();

let packet_length = match Message::announce(
&self.lifecycle.state,
self.port_identity,
self.announce_seq_ids.generate(),
)
.serialize(&mut self.packet_buffer)
{
Ok(length) => length,
Err(error) => {
log::error!(
"Statime bug: Could not serialize announce message {:?}",
error
);
return actions![];
}
};

actions![
PortAction::ResetAnnounceTimer {
duration: self.config.announce_interval.as_core_duration(),
},
PortAction::SendGeneral {
data: &self.packet_buffer[..packet_length]
}
]
} else {
actions![]
}
}

Expand All @@ -142,7 +137,7 @@ impl<'a, A, C, F: Filter, R> Port<Running<'a>, A, R, C, F> {
message: DelayReqMessage,
timestamp: Time,
) -> PortActionIterator {
if matches!(self.port_state, PortState::Master(_)) {
if matches!(self.port_state, PortState::Master) {
log::debug!("Received DelayReq");
let delay_resp_message = Message::delay_resp(
header,
Expand Down Expand Up @@ -181,7 +176,6 @@ mod tests {
messages::{Header, MessageBody},
},
port::{
state::MasterState,
tests::{setup_test_port, setup_test_state},
NoForwardedTLVs,
},
Expand All @@ -194,7 +188,7 @@ mod tests {

let mut port = setup_test_port(&state);

port.set_forced_port_state(PortState::Master(MasterState::default()));
port.set_forced_port_state(PortState::Master);

port.config.delay_mechanism = DelayMechanism::E2E {
interval: Interval::from_log_2(2),
Expand Down Expand Up @@ -308,7 +302,7 @@ mod tests {

let mut port = setup_test_port(&state);

port.set_forced_port_state(PortState::Master(MasterState::default()));
port.set_forced_port_state(PortState::Master);

let mut actions = port.send_announce(&mut NoForwardedTLVs);

Expand Down Expand Up @@ -369,7 +363,7 @@ mod tests {

let mut port = setup_test_port(&state);

port.set_forced_port_state(PortState::Master(MasterState::default()));
port.set_forced_port_state(PortState::Master);
let mut actions = port.send_sync();

assert!(matches!(
Expand Down
22 changes: 18 additions & 4 deletions statime/src/port/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ pub use actions::{
use atomic_refcell::{AtomicRef, AtomicRefCell};
pub use measurement::Measurement;
use rand::Rng;
use state::{MasterState, PortState};
use state::PortState;

use self::sequence_id::SequenceIdGenerator;
pub use crate::datastructures::messages::MAX_DATA_LEN;
#[cfg(doc)]
use crate::PtpInstance;
Expand Down Expand Up @@ -280,6 +281,10 @@ pub struct Port<L, A, R, C, F: Filter> {
packet_buffer: [u8; MAX_DATA_LEN],
lifecycle: L,
rng: R,

announce_seq_ids: SequenceIdGenerator,
sync_seq_ids: SequenceIdGenerator,
delay_seq_ids: SequenceIdGenerator,
}

/// Type state of [`Port`] entered by [`Port::end_bmca`]
Expand Down Expand Up @@ -340,8 +345,8 @@ impl<'a, A: AcceptableMasterList, C: Clock, F: Filter, R: Rng> Port<Running<'a>,
// we didn't hear announce messages from other masters, so become master
// ourselves
match self.port_state {
PortState::Master(_) => (),
_ => self.set_forced_port_state(PortState::Master(MasterState::new())),
PortState::Master => (),
_ => self.set_forced_port_state(PortState::Master),
}

// Immediately start sending syncs and announces
Expand Down Expand Up @@ -382,6 +387,9 @@ impl<'a, A: AcceptableMasterList, C: Clock, F: Filter, R: Rng> Port<Running<'a>,
local_best: None,
state_refcell: self.lifecycle.state_refcell,
},
announce_seq_ids: self.announce_seq_ids,
sync_seq_ids: self.sync_seq_ids,
delay_seq_ids: self.delay_seq_ids,
}
}

Expand Down Expand Up @@ -476,6 +484,9 @@ impl<'a, A, C, F: Filter, R> Port<InBmca<'a>, A, R, C, F> {
state_refcell: self.lifecycle.state_refcell,
state: self.lifecycle.state_refcell.borrow(),
},
announce_seq_ids: self.announce_seq_ids,
sync_seq_ids: self.sync_seq_ids,
delay_seq_ids: self.delay_seq_ids,
},
self.lifecycle.pending_action,
)
Expand Down Expand Up @@ -505,7 +516,7 @@ impl<L, A, R, C, F: Filter> Port<L, A, R, C, F> {

/// Indicate whether this [`Port`] is in the master state.
pub fn is_master(&self) -> bool {
matches!(self.port_state, PortState::Master(_))
matches!(self.port_state, PortState::Master)
}

pub(crate) fn state(&self) -> &PortState<F> {
Expand Down Expand Up @@ -556,6 +567,9 @@ impl<'a, A, C, F: Filter, R: Rng> Port<InBmca<'a>, A, R, C, F> {
local_best: None,
state_refcell,
},
announce_seq_ids: SequenceIdGenerator::new(),
sync_seq_ids: SequenceIdGenerator::new(),
delay_seq_ids: SequenceIdGenerator::new(),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion statime/src/port/slave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ impl<'a, A, C: Clock, F: Filter, R: Rng> Port<Running<'a>, A, R, C, F> {
PortState::Slave(ref mut state) => {
log::debug!("Starting new delay measurement");

let delay_id = state.delay_req_ids.generate();
let delay_id = self.delay_seq_ids.generate();
let delay_req = Message::delay_req(
&self.lifecycle.state.default_ds,
self.port_identity,
Expand Down
Loading

0 comments on commit c4f63a0

Please sign in to comment.