Skip to content

Commit

Permalink
Feature: add absolute timestamp RaftMetrics::last_quorum_acked
Browse files Browse the repository at this point in the history
`RaftMetrics::last_quorum_acked` is the absolute timestamp of the most
recent time point that is accepted by a quorum via `AppendEntries` RPC.
This field is a wrapped `Instant` type: `SerdeInstant` which support
serde for `Instant`. This field is added as a replacement of
`millis_since_quorum_ack`, which is a relative time.

`SerdeInstant` serialize `Instant` into a u64 in nano seconds since January 1, 1970 UTC.

Note: Serialization and deserialization are not perfectly accurate and
can be indeterministic, resulting in minor variations each time. These
deviations(could be smaller or greater) are typically less than a
microsecond (10^-6 seconds).
  • Loading branch information
drmingdrmer committed Jul 25, 2024
1 parent bcc566e commit ab35b28
Show file tree
Hide file tree
Showing 13 changed files with 408 additions and 31 deletions.
20 changes: 15 additions & 5 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::core::sm;
use crate::core::ServerState;
use crate::display_ext::DisplayInstantExt;
use crate::display_ext::DisplayOptionExt;
use crate::display_ext::DisplayResultExt;
use crate::display_ext::DisplaySlice;
use crate::display_ext::DisplaySliceExt;
use crate::engine::handler::replication_handler::SendNone;
Expand All @@ -60,6 +61,7 @@ use crate::metrics::RaftDataMetrics;
use crate::metrics::RaftMetrics;
use crate::metrics::RaftServerMetrics;
use crate::metrics::ReplicationMetrics;
use crate::metrics::SerdeInstant;
use crate::network::v2::RaftNetworkV2;
use crate::network::RPCOption;
use crate::network::RPCTypes;
Expand Down Expand Up @@ -498,12 +500,16 @@ where
/// Currently heartbeat is a blank log
#[tracing::instrument(level = "debug", skip_all, fields(id = display(self.id)))]
pub fn send_heartbeat(&mut self, emitter: impl fmt::Display) -> bool {
tracing::debug!(now = debug(C::now()), "send_heartbeat");
tracing::debug!(now = display(C::now().display()), "send_heartbeat");

let mut lh = if let Some((lh, _)) = self.engine.get_leader_handler_or_reject(None) {
lh
} else {
tracing::debug!(now = debug(C::now()), "{} failed to send heartbeat", emitter);
tracing::debug!(
now = display(C::now().display()),
"{} failed to send heartbeat",
emitter
);
return false;
};

Expand Down Expand Up @@ -553,6 +559,7 @@ where
let membership_config = st.membership_state.effective().stored_membership().clone();
let current_leader = self.current_leader();

#[allow(deprecated)]
let m = RaftMetrics {
running_state: Ok(()),
id: self.id,
Expand All @@ -569,19 +576,22 @@ where
state: st.server_state,
current_leader,
millis_since_quorum_ack,
last_quorum_acked: last_quorum_acked.map(SerdeInstant::new),
membership_config: membership_config.clone(),
heartbeat: heartbeat.clone(),

// --- replication ---
replication: replication.clone(),
};

#[allow(deprecated)]
let data_metrics = RaftDataMetrics {
last_log: st.last_log_id().copied(),
last_applied: st.io_applied().copied(),
snapshot: st.io_snapshot_last_log_id().copied(),
purged: st.io_purged().copied(),
millis_since_quorum_ack,
last_quorum_acked: last_quorum_acked.map(SerdeInstant::new),
replication,
heartbeat,
};
Expand Down Expand Up @@ -1265,7 +1275,7 @@ where
// check every timer

let now = C::now();
tracing::debug!("received tick: {}, now: {:?}", i, now);
tracing::debug!("received tick: {}, now: {}", i, now.display());

self.handle_tick_election();

Expand Down Expand Up @@ -1422,7 +1432,7 @@ where
fn handle_tick_election(&mut self) {
let now = C::now();

tracing::debug!("try to trigger election by tick, now: {:?}", now);
tracing::debug!("try to trigger election by tick, now: {}", now.display());

// TODO: leader lease should be extended. Or it has to examine if it is leader
// before electing.
Expand Down Expand Up @@ -1494,7 +1504,7 @@ where
tracing::debug!(
target = display(target),
request_id = display(request_id),
result = debug(&result),
result = display(result.display()),
"handle_replication_progress"
);

Expand Down
8 changes: 5 additions & 3 deletions openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::display_ext::DisplayInstantExt;
use crate::display_ext::DisplayOptionExt;
use crate::display_ext::DisplayResultExt;
use crate::engine::handler::log_handler::LogHandler;
use crate::engine::handler::snapshot_handler::SnapshotHandler;
use crate::engine::Command;
Expand Down Expand Up @@ -147,7 +149,7 @@ where C: RaftTypeConfig
/// accepted.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_leader_clock(&mut self, node_id: C::NodeId, t: InstantOf<C>) {
tracing::debug!(target = display(node_id), t = debug(t), "{}", func_name!());
tracing::debug!(target = display(node_id), t = display(t.display()), "{}", func_name!());

let granted = *self
.leader
Expand All @@ -156,7 +158,7 @@ where C: RaftTypeConfig
.expect("it should always update existing progress");

tracing::debug!(
granted = debug(granted),
granted = display(granted.as_ref().map(|x| x.display()).display()),
clock_progress = debug(&self.leader.clock_progress),
"granted leader vote clock after updating"
);
Expand Down Expand Up @@ -267,7 +269,7 @@ where C: RaftTypeConfig
tracing::debug!(
target = display(target),
request_id = display(request_id),
result = debug(&repl_res),
result = display(repl_res.display()),
progress = display(&self.leader.progress),
"{}",
func_name!()
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/engine/handler/vote_handler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::fmt::Debug;

use crate::core::raft_msg::ResultSender;
use crate::display_ext::DisplayInstantExt;
use crate::engine::handler::leader_handler::LeaderHandler;
use crate::engine::handler::replication_handler::ReplicationHandler;
use crate::engine::handler::replication_handler::SendNone;
Expand Down Expand Up @@ -150,7 +151,7 @@ where C: RaftTypeConfig

// Update vote related timer and lease.

tracing::debug!(now = debug(C::now()), "{}", func_name!());
tracing::debug!(now = display(C::now().display()), "{}", func_name!());

self.update_internal_server_state();

Expand Down
2 changes: 0 additions & 2 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,6 @@ where C: RaftTypeConfig
#[error(transparent)]
Closed(#[from] ReplicationClosed),

// TODO(xp): two sub type: StorageError / TransportError
// TODO(xp): a sub error for just append_entries()
#[error(transparent)]
StorageError(#[from] StorageError<C>),

Expand Down
2 changes: 2 additions & 0 deletions openraft/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mod raft_metrics;
mod wait;

mod metric_display;
mod serde_instant;
mod wait_condition;
#[cfg(test)]
mod wait_test;
Expand All @@ -42,6 +43,7 @@ pub use metric::Metric;
pub use raft_metrics::RaftDataMetrics;
pub use raft_metrics::RaftMetrics;
pub use raft_metrics::RaftServerMetrics;
pub use serde_instant::SerdeInstant;
pub use wait::Wait;
pub use wait::WaitError;
pub(crate) use wait_condition::Condition;
Expand Down
73 changes: 68 additions & 5 deletions openraft/src/metrics/raft_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ use std::sync::Arc;
use crate::core::ServerState;
use crate::display_ext::DisplayBTreeMapOptValue;
use crate::display_ext::DisplayOption;
use crate::display_ext::DisplayOptionExt;
use crate::error::Fatal;
use crate::metrics::HeartbeatMetrics;
use crate::metrics::ReplicationMetrics;
use crate::metrics::SerdeInstant;
use crate::type_config::alias::InstantOf;
use crate::Instant;
use crate::LogId;
use crate::RaftTypeConfig;
use crate::StoredMembership;
Expand Down Expand Up @@ -69,8 +71,27 @@ pub struct RaftMetrics<C: RaftTypeConfig> {
/// synchronization with the cluster.
/// A longer duration without acknowledgment may suggest a higher probability of the leader
/// being partitioned from the cluster.
///
/// Use `last_quorum_acked` instead, which is absolute timestamp.
/// This value relates to the time when metrics is reported, which may behind the current time
/// by an unknown duration(although it should be very small).
#[deprecated(since = "0.10.0", note = "use `last_quorum_acked` instead.")]
pub millis_since_quorum_ack: Option<u64>,

/// For a leader, it is the most recently acknowledged timestamp by a quorum.
///
/// It is `None` if this node is not leader, or the leader is not yet acknowledged by a quorum.
/// Being acknowledged means receiving a reply of
/// `AppendEntries`(`AppendEntriesRequest.vote.committed == true`).
/// Receiving a reply of `RequestVote`(`RequestVote.vote.committed == false`) does not count,
/// because a node will not maintain a lease for a vote with `committed == false`.
///
/// This timestamp can be used by the application to assess the likelihood that the leader has
/// lost synchronization with the cluster.
/// An older value may suggest a higher probability of the leader being partitioned from the
/// cluster.
pub last_quorum_acked: Option<SerdeInstant<InstantOf<C>>>,

/// The current membership config of the cluster.
pub membership_config: Arc<StoredMembership<C>>,

Expand Down Expand Up @@ -98,17 +119,27 @@ where C: RaftTypeConfig

write!(
f,
"id:{}, {:?}, term:{}, vote:{}, last_log:{}, last_applied:{}, leader:{}(since_last_ack:{} ms)",
"id:{}, {:?}, term:{}, vote:{}, last_log:{}, last_applied:{}, leader:{}",
self.id,
self.state,
self.current_term,
self.vote,
DisplayOption(&self.last_log_index),
DisplayOption(&self.last_applied),
DisplayOption(&self.current_leader),
DisplayOption(&self.millis_since_quorum_ack),
)?;

if let Some(quorum_acked) = &self.last_quorum_acked {
write!(
f,
"(quorum_acked_time:{}, {:?} ago)",
quorum_acked,
quorum_acked.elapsed()
)?;
} else {
write!(f, "(quorum_acked_time:None)")?;
}

write!(f, ", ")?;
write!(
f,
Expand All @@ -129,6 +160,7 @@ impl<C> RaftMetrics<C>
where C: RaftTypeConfig
{
pub fn new_initial(id: C::NodeId) -> Self {
#[allow(deprecated)]
Self {
running_state: Ok(()),
id,
Expand All @@ -143,6 +175,7 @@ where C: RaftTypeConfig
state: ServerState::Follower,
current_leader: None,
millis_since_quorum_ack: None,
last_quorum_acked: None,
membership_config: Arc::new(StoredMembership::default()),
replication: None,
heartbeat: None,
Expand Down Expand Up @@ -172,8 +205,23 @@ pub struct RaftDataMetrics<C: RaftTypeConfig> {
/// synchronization with the cluster.
/// A longer duration without acknowledgment may suggest a higher probability of the leader
/// being partitioned from the cluster.
#[deprecated(since = "0.10.0", note = "use `last_quorum_acked` instead.")]
pub millis_since_quorum_ack: Option<u64>,

/// For a leader, it is the most recently acknowledged timestamp by a quorum.
///
/// It is `None` if this node is not leader, or the leader is not yet acknowledged by a quorum.
/// Being acknowledged means receiving a reply of
/// `AppendEntries`(`AppendEntriesRequest.vote.committed == true`).
/// Receiving a reply of `RequestVote`(`RequestVote.vote.committed == false`) does not count,
/// because a node will not maintain a lease for a vote with `committed == false`.
///
/// This timestamp can be used by the application to assess the likelihood that the leader has
/// lost synchronization with the cluster.
/// An older value may suggest a higher probability of the leader being partitioned from the
/// cluster.
pub last_quorum_acked: Option<SerdeInstant<InstantOf<C>>>,

pub replication: Option<ReplicationMetrics<C>>,

/// Heartbeat metrics. It is Some() only when this node is leader.
Expand All @@ -194,12 +242,27 @@ where C: RaftTypeConfig

write!(
f,
"last_log:{}, last_applied:{}, snapshot:{}, purged:{}, quorum_acked(leader):{} ms before, replication:{{{}}}, heartbeat:{{{}}}",
"last_log:{}, last_applied:{}, snapshot:{}, purged:{}",
DisplayOption(&self.last_log),
DisplayOption(&self.last_applied),
DisplayOption(&self.snapshot),
DisplayOption(&self.purged),
self.millis_since_quorum_ack.display(),
)?;

if let Some(quorum_acked) = &self.last_quorum_acked {
write!(
f,
", quorum_acked_time:({}, {:?} ago)",
quorum_acked,
quorum_acked.elapsed()
)?;
} else {
write!(f, ", quorum_acked_time:None")?;
}

write!(
f,
", replication:{{{}}}, heartbeat:{{{}}}",
DisplayOption(&self.replication.as_ref().map(DisplayBTreeMapOptValue)),
DisplayOption(&self.heartbeat.as_ref().map(DisplayBTreeMapOptValue)),
)?;
Expand Down
Loading

0 comments on commit ab35b28

Please sign in to comment.