Skip to content

Commit

Permalink
Feature: add RaftMetrics::millis_since_quorum_ack
Browse files Browse the repository at this point in the history
`RaftMetrics::millis_since_quorum_ack` is the interval in milliseconds
since the last timestamp a quorum acknowledged.

This duration is used by the application to assess the likelihood that
the leader has lost synchronization with the cluster.
A longer duration without acknowledgment may suggest a higher
probability of the leader being partitioned from the cluster.
  • Loading branch information
drmingdrmer committed Mar 17, 2024
1 parent a9e2fc4 commit 73cd572
Show file tree
Hide file tree
Showing 8 changed files with 233 additions and 16 deletions.
23 changes: 20 additions & 3 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,10 +527,15 @@ where

/// Report a metrics payload on the current state of the Raft node.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn report_metrics(&self, replication: Option<ReplicationMetrics<C::NodeId>>) {
pub(crate) fn report_metrics(&mut self, replication: Option<ReplicationMetrics<C::NodeId>>) {
let last_quorum_acked = self.last_quorum_acked_time();
let millis_since_quorum_ack = last_quorum_acked.map(|t| t.elapsed().as_millis() as u64);

let st = &self.engine.state;

let membership_config = st.membership_state.effective().stored_membership().clone();
let current_leader = self.current_leader();

let m = RaftMetrics {
running_state: Ok(()),
id: self.id,
Expand All @@ -545,7 +550,8 @@ where

// --- cluster ---
state: st.server_state,
current_leader: self.current_leader(),
current_leader,
millis_since_quorum_ack,
membership_config: membership_config.clone(),

// --- replication ---
Expand All @@ -564,6 +570,7 @@ where
last_applied: st.io_applied().copied(),
snapshot: st.io_snapshot_last_log_id().copied(),
purged: st.io_purged().copied(),
millis_since_quorum_ack,
replication,
};
self.tx_data_metrics.send_if_modified(|metrix| {
Expand All @@ -578,7 +585,7 @@ where
id: self.id,
vote: *st.io_state().vote(),
state: st.server_state,
current_leader: self.current_leader(),
current_leader,
membership_config,
};
self.tx_server_metrics.send_if_modified(|metrix| {
Expand Down Expand Up @@ -665,6 +672,16 @@ where
}
}

/// Retrieves the most recent timestamp that is acknowledged by a quorum.
///
/// This function returns the latest known time at which the leader received acknowledgment
/// from a quorum of followers, indicating its leadership is current and recognized.
/// If the node is not a leader or no acknowledgment has been received, `None` is returned.
fn last_quorum_acked_time(&mut self) -> Option<InstantOf<C>> {
let leading = self.engine.internal_server_state.leading_mut();
leading.and_then(|l| l.last_quorum_acked_time())
}

pub(crate) fn get_leader_node(&self, leader_id: Option<C::NodeId>) -> Option<C::Node> {
let leader_id = match leader_id {
None => return None,
Expand Down
7 changes: 6 additions & 1 deletion openraft/src/engine/handler/vote_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ where C: RaftTypeConfig
///
/// Note: This method does not check last-log-id. handle-vote-request has to deal with
/// last-log-id itself.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_vote(&mut self, vote: &Vote<C::NodeId>) -> Result<(), RejectVoteRequest<C::NodeId>> {
// Partial ord compare:
// Vote does not has to be total ord.
Expand Down Expand Up @@ -137,7 +138,11 @@ where C: RaftTypeConfig
pub(crate) fn become_leading(&mut self) {
if let Some(l) = self.internal_server_state.leading_mut() {
if l.vote.leader_id() == self.state.vote_ref().leader_id() {
// Vote still belongs to the same leader. Just updating vote is enough.
tracing::debug!(
"vote still belongs to the same leader. Just updating vote is enough: node-{}, {}",
self.config.id,
self.state.vote_ref()
);
l.vote = *self.state.vote_ref();
self.server_state_handler().update_server_state_if_changed();
return;
Expand Down
12 changes: 12 additions & 0 deletions openraft/src/instant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ pub trait Instant:
{
/// Return the current instant.
fn now() -> Self;

/// Return the amount of time since the instant.
///
/// The returned duration is guaranteed to be non-negative.
fn elapsed(&self) -> Duration {
let now = Self::now();
if now > *self {
now - *self
} else {
Duration::from_secs(0)
}
}
}

pub type TokioInstant = tokio::time::Instant;
Expand Down
19 changes: 9 additions & 10 deletions openraft/src/leader/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ where
/// Note that the leader may not be in the QuorumSet at all.
/// In such a case, the update operation will be just ignored,
/// and the quorum-acked-time is totally determined by remove voters.
#[allow(dead_code)]
pub(crate) fn quorum_acked_time(&mut self) -> Option<I> {
pub(crate) fn last_quorum_acked_time(&mut self) -> Option<I> {
// For `Leading`, the vote is always the leader's vote.
// Thus vote.voted_for() is this node.

Expand Down Expand Up @@ -142,7 +141,7 @@ mod tests {
use crate::Vote;

#[test]
fn test_leading_quorum_acked_time_leader_is_voter() {
fn test_leading_last_quorum_acked_time_leader_is_voter() {
let mut leading = Leading::<u64, Vec<u64>, InstantOf<UTConfig>>::new(
Vote::new_committed(2, 1),
vec![1, 2, 3],
Expand All @@ -153,12 +152,12 @@ mod tests {
let now1 = InstantOf::<UTConfig>::now();

let _t2 = leading.clock_progress.increase_to(&2, Some(now1));
let t1 = leading.quorum_acked_time();
let t1 = leading.last_quorum_acked_time();
assert_eq!(Some(now1), t1, "n1(leader) and n2 acked, t1 > t2");
}

#[test]
fn test_leading_quorum_acked_time_leader_is_learner() {
fn test_leading_last_quorum_acked_time_leader_is_learner() {
let mut leading = Leading::<u64, Vec<u64>, InstantOf<UTConfig>>::new(
Vote::new_committed(2, 4),
vec![1, 2, 3],
Expand All @@ -168,17 +167,17 @@ mod tests {

let t2 = InstantOf::<UTConfig>::now();
let _ = leading.clock_progress.increase_to(&2, Some(t2));
let t = leading.quorum_acked_time();
let t = leading.last_quorum_acked_time();
assert!(t.is_none(), "n1(leader+learner) does not count in quorum");

let t3 = InstantOf::<UTConfig>::now();
let _ = leading.clock_progress.increase_to(&3, Some(t3));
let t = leading.quorum_acked_time();
let t = leading.last_quorum_acked_time();
assert_eq!(Some(t2), t, "n2 and n3 acked");
}

#[test]
fn test_leading_quorum_acked_time_leader_is_not_member() {
fn test_leading_last_quorum_acked_time_leader_is_not_member() {
let mut leading = Leading::<u64, Vec<u64>, InstantOf<UTConfig>>::new(
Vote::new_committed(2, 5),
vec![1, 2, 3],
Expand All @@ -188,12 +187,12 @@ mod tests {

let t2 = InstantOf::<UTConfig>::now();
let _ = leading.clock_progress.increase_to(&2, Some(t2));
let t = leading.quorum_acked_time();
let t = leading.last_quorum_acked_time();
assert!(t.is_none(), "n1(leader+learner) does not count in quorum");

let t3 = InstantOf::<UTConfig>::now();
let _ = leading.clock_progress.increase_to(&3, Some(t3));
let t = leading.quorum_acked_time();
let t = leading.last_quorum_acked_time();
assert_eq!(Some(t2), t, "n2 and n3 acked");
}
}
40 changes: 38 additions & 2 deletions openraft/src/metrics/raft_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;

use crate::core::ServerState;
use crate::display_ext::DisplayOption;
use crate::display_ext::DisplayOptionExt;
use crate::error::Fatal;
use crate::metrics::ReplicationMetrics;
use crate::summary::MessageSummary;
Expand Down Expand Up @@ -54,6 +55,21 @@ pub struct RaftMetrics<C: RaftTypeConfig> {
/// The current cluster leader.
pub current_leader: Option<C::NodeId>,

/// For a leader, it is the elapsed time in milliseconds since the most recently acknowledged
/// timestamp by a quorum.
///
/// It is `None` if this node is not leader, or the leader is not yet acknowledged by a quorum.
/// Being acknowledged means receiving a reply of
/// `AppendEntries`(`AppendEntriesRequest.vote.committed == true`).
/// Receiving a reply of `RequestVote`(`RequestVote.vote.committed == false`) does not count,
/// because a node will not maintain a lease for a vote with `committed == false`.
///
/// This duration is used by the application to assess the likelihood that the leader has lost
/// synchronization with the cluster.
/// A longer duration without acknowledgment may suggest a higher probability of the leader
/// being partitioned from the cluster.
pub millis_since_quorum_ack: Option<u64>,

/// The current membership config of the cluster.
pub membership_config: Arc<StoredMembership<C::NodeId, C::Node>>,

Expand All @@ -72,14 +88,15 @@ where C: RaftTypeConfig

write!(
f,
"id:{}, {:?}, term:{}, vote:{}, last_log:{}, last_applied:{}, leader:{}",
"id:{}, {:?}, term:{}, vote:{}, last_log:{}, last_applied:{}, leader:{}(since_last_ack:{} ms)",
self.id,
self.state,
self.current_term,
self.vote,
DisplayOption(&self.last_log_index),
DisplayOption(&self.last_applied),
DisplayOption(&self.current_leader),
DisplayOption(&self.millis_since_quorum_ack),
)?;

write!(f, ", ")?;
Expand Down Expand Up @@ -124,6 +141,7 @@ where C: RaftTypeConfig

state: ServerState::Follower,
current_leader: None,
millis_since_quorum_ack: None,
membership_config: Arc::new(StoredMembership::default()),
replication: None,
}
Expand All @@ -138,6 +156,22 @@ pub struct RaftDataMetrics<C: RaftTypeConfig> {
pub last_applied: Option<LogId<C::NodeId>>,
pub snapshot: Option<LogId<C::NodeId>>,
pub purged: Option<LogId<C::NodeId>>,

/// For a leader, it is the elapsed time in milliseconds since the most recently acknowledged
/// timestamp by a quorum.
///
/// It is `None` if this node is not leader, or the leader is not yet acknowledged by a quorum.
/// Being acknowledged means receiving a reply of
/// `AppendEntries`(`AppendEntriesRequest.vote.committed == true`).
/// Receiving a reply of `RequestVote`(`RequestVote.vote.committed == false`) does not count,
/// because a node will not maintain a lease for a vote with `committed == false`.
///
/// This duration is used by the application to assess the likelihood that the leader has lost
/// synchronization with the cluster.
/// A longer duration without acknowledgment may suggest a higher probability of the leader
/// being partitioned from the cluster.
pub millis_since_quorum_ack: Option<u64>,

pub replication: Option<ReplicationMetrics<C::NodeId>>,
}

Expand All @@ -149,11 +183,12 @@ where C: RaftTypeConfig

write!(
f,
"last_log:{}, last_applied:{}, snapshot:{}, purged:{}, replication:{{{}}}",
"last_log:{}, last_applied:{}, snapshot:{}, purged:{}, quorum_acked(leader):{} ms before, replication:{{{}}}",
DisplayOption(&self.last_log),
DisplayOption(&self.last_applied),
DisplayOption(&self.snapshot),
DisplayOption(&self.purged),
self.millis_since_quorum_ack.display(),
self.replication
.as_ref()
.map(|x| { x.iter().map(|(k, v)| format!("{}:{}", k, DisplayOption(v))).collect::<Vec<_>>().join(",") })
Expand Down Expand Up @@ -181,6 +216,7 @@ pub struct RaftServerMetrics<C: RaftTypeConfig> {
pub vote: Vote<C::NodeId>,
pub state: ServerState,
pub current_leader: Option<C::NodeId>,

pub membership_config: Arc<StoredMembership<C::NodeId, C::Node>>,
}

Expand Down
1 change: 1 addition & 0 deletions openraft/src/metrics/wait_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ where C: RaftTypeConfig {
purged: None,

current_leader: None,
millis_since_quorum_ack: None,
membership_config: Arc::new(StoredMembership::new(None, Membership::new(vec![btreeset! {}], None))),

snapshot: None,
Expand Down
1 change: 1 addition & 0 deletions tests/tests/metrics/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod fixtures;
// The later tests may depend on the earlier ones.

mod t10_current_leader;
mod t10_leader_last_ack;
mod t10_purged;
mod t10_server_metrics_and_data_metrics;
mod t20_metrics_state_machine_consistency;
Expand Down
Loading

0 comments on commit 73cd572

Please sign in to comment.