Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: add RaftMetrics::millis_since_quorum_ack #1062

Merged
merged 1 commit into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,10 +527,15 @@ where

/// Report a metrics payload on the current state of the Raft node.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn report_metrics(&self, replication: Option<ReplicationMetrics<C::NodeId>>) {
pub(crate) fn report_metrics(&mut self, replication: Option<ReplicationMetrics<C::NodeId>>) {
let last_quorum_acked = self.last_quorum_acked_time();
let millis_since_quorum_ack = last_quorum_acked.map(|t| t.elapsed().as_millis() as u64);

let st = &self.engine.state;

let membership_config = st.membership_state.effective().stored_membership().clone();
let current_leader = self.current_leader();

let m = RaftMetrics {
running_state: Ok(()),
id: self.id,
Expand All @@ -545,7 +550,8 @@ where

// --- cluster ---
state: st.server_state,
current_leader: self.current_leader(),
current_leader,
millis_since_quorum_ack,
membership_config: membership_config.clone(),

// --- replication ---
Expand All @@ -564,6 +570,7 @@ where
last_applied: st.io_applied().copied(),
snapshot: st.io_snapshot_last_log_id().copied(),
purged: st.io_purged().copied(),
millis_since_quorum_ack,
replication,
};
self.tx_data_metrics.send_if_modified(|metrix| {
Expand All @@ -578,7 +585,7 @@ where
id: self.id,
vote: *st.io_state().vote(),
state: st.server_state,
current_leader: self.current_leader(),
current_leader,
membership_config,
};
self.tx_server_metrics.send_if_modified(|metrix| {
Expand Down Expand Up @@ -665,6 +672,16 @@ where
}
}

/// Retrieves the most recent timestamp that is acknowledged by a quorum.
///
/// This function returns the latest known time at which the leader received acknowledgment
/// from a quorum of followers, indicating its leadership is current and recognized.
/// If the node is not a leader or no acknowledgment has been received, `None` is returned.
fn last_quorum_acked_time(&mut self) -> Option<InstantOf<C>> {
let leading = self.engine.internal_server_state.leading_mut();
leading.and_then(|l| l.last_quorum_acked_time())
}

pub(crate) fn get_leader_node(&self, leader_id: Option<C::NodeId>) -> Option<C::Node> {
let leader_id = match leader_id {
None => return None,
Expand Down
7 changes: 6 additions & 1 deletion openraft/src/engine/handler/vote_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ where C: RaftTypeConfig
///
/// Note: This method does not check last-log-id. handle-vote-request has to deal with
/// last-log-id itself.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_vote(&mut self, vote: &Vote<C::NodeId>) -> Result<(), RejectVoteRequest<C::NodeId>> {
// Partial ord compare:
// Vote does not has to be total ord.
Expand Down Expand Up @@ -137,7 +138,11 @@ where C: RaftTypeConfig
pub(crate) fn become_leading(&mut self) {
if let Some(l) = self.internal_server_state.leading_mut() {
if l.vote.leader_id() == self.state.vote_ref().leader_id() {
// Vote still belongs to the same leader. Just updating vote is enough.
tracing::debug!(
"vote still belongs to the same leader. Just updating vote is enough: node-{}, {}",
self.config.id,
self.state.vote_ref()
);
l.vote = *self.state.vote_ref();
self.server_state_handler().update_server_state_if_changed();
return;
Expand Down
12 changes: 12 additions & 0 deletions openraft/src/instant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ pub trait Instant:
{
/// Return the current instant.
fn now() -> Self;

/// Return the amount of time since the instant.
///
/// The returned duration is guaranteed to be non-negative.
fn elapsed(&self) -> Duration {
let now = Self::now();
if now > *self {
now - *self
} else {
Duration::from_secs(0)
}
}
}

pub type TokioInstant = tokio::time::Instant;
Expand Down
19 changes: 9 additions & 10 deletions openraft/src/leader/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ where
/// Note that the leader may not be in the QuorumSet at all.
/// In such a case, the update operation will be just ignored,
/// and the quorum-acked-time is totally determined by remove voters.
#[allow(dead_code)]
pub(crate) fn quorum_acked_time(&mut self) -> Option<I> {
pub(crate) fn last_quorum_acked_time(&mut self) -> Option<I> {
// For `Leading`, the vote is always the leader's vote.
// Thus vote.voted_for() is this node.

Expand Down Expand Up @@ -142,7 +141,7 @@ mod tests {
use crate::Vote;

#[test]
fn test_leading_quorum_acked_time_leader_is_voter() {
fn test_leading_last_quorum_acked_time_leader_is_voter() {
let mut leading = Leading::<u64, Vec<u64>, InstantOf<UTConfig>>::new(
Vote::new_committed(2, 1),
vec![1, 2, 3],
Expand All @@ -153,12 +152,12 @@ mod tests {
let now1 = InstantOf::<UTConfig>::now();

let _t2 = leading.clock_progress.increase_to(&2, Some(now1));
let t1 = leading.quorum_acked_time();
let t1 = leading.last_quorum_acked_time();
assert_eq!(Some(now1), t1, "n1(leader) and n2 acked, t1 > t2");
}

#[test]
fn test_leading_quorum_acked_time_leader_is_learner() {
fn test_leading_last_quorum_acked_time_leader_is_learner() {
let mut leading = Leading::<u64, Vec<u64>, InstantOf<UTConfig>>::new(
Vote::new_committed(2, 4),
vec![1, 2, 3],
Expand All @@ -168,17 +167,17 @@ mod tests {

let t2 = InstantOf::<UTConfig>::now();
let _ = leading.clock_progress.increase_to(&2, Some(t2));
let t = leading.quorum_acked_time();
let t = leading.last_quorum_acked_time();
assert!(t.is_none(), "n1(leader+learner) does not count in quorum");

let t3 = InstantOf::<UTConfig>::now();
let _ = leading.clock_progress.increase_to(&3, Some(t3));
let t = leading.quorum_acked_time();
let t = leading.last_quorum_acked_time();
assert_eq!(Some(t2), t, "n2 and n3 acked");
}

#[test]
fn test_leading_quorum_acked_time_leader_is_not_member() {
fn test_leading_last_quorum_acked_time_leader_is_not_member() {
let mut leading = Leading::<u64, Vec<u64>, InstantOf<UTConfig>>::new(
Vote::new_committed(2, 5),
vec![1, 2, 3],
Expand All @@ -188,12 +187,12 @@ mod tests {

let t2 = InstantOf::<UTConfig>::now();
let _ = leading.clock_progress.increase_to(&2, Some(t2));
let t = leading.quorum_acked_time();
let t = leading.last_quorum_acked_time();
assert!(t.is_none(), "n1(leader+learner) does not count in quorum");

let t3 = InstantOf::<UTConfig>::now();
let _ = leading.clock_progress.increase_to(&3, Some(t3));
let t = leading.quorum_acked_time();
let t = leading.last_quorum_acked_time();
assert_eq!(Some(t2), t, "n2 and n3 acked");
}
}
40 changes: 38 additions & 2 deletions openraft/src/metrics/raft_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;

use crate::core::ServerState;
use crate::display_ext::DisplayOption;
use crate::display_ext::DisplayOptionExt;
use crate::error::Fatal;
use crate::metrics::ReplicationMetrics;
use crate::summary::MessageSummary;
Expand Down Expand Up @@ -54,6 +55,21 @@ pub struct RaftMetrics<C: RaftTypeConfig> {
/// The current cluster leader.
pub current_leader: Option<C::NodeId>,

/// For a leader, it is the elapsed time in milliseconds since the most recently acknowledged
/// timestamp by a quorum.
///
/// It is `None` if this node is not leader, or the leader is not yet acknowledged by a quorum.
/// Being acknowledged means receiving a reply of
/// `AppendEntries`(`AppendEntriesRequest.vote.committed == true`).
/// Receiving a reply of `RequestVote`(`RequestVote.vote.committed == false`) does not count,
/// because a node will not maintain a lease for a vote with `committed == false`.
///
/// This duration is used by the application to assess the likelihood that the leader has lost
/// synchronization with the cluster.
/// A longer duration without acknowledgment may suggest a higher probability of the leader
/// being partitioned from the cluster.
pub millis_since_quorum_ack: Option<u64>,

/// The current membership config of the cluster.
pub membership_config: Arc<StoredMembership<C::NodeId, C::Node>>,

Expand All @@ -72,14 +88,15 @@ where C: RaftTypeConfig

write!(
f,
"id:{}, {:?}, term:{}, vote:{}, last_log:{}, last_applied:{}, leader:{}",
"id:{}, {:?}, term:{}, vote:{}, last_log:{}, last_applied:{}, leader:{}(since_last_ack:{} ms)",
self.id,
self.state,
self.current_term,
self.vote,
DisplayOption(&self.last_log_index),
DisplayOption(&self.last_applied),
DisplayOption(&self.current_leader),
DisplayOption(&self.millis_since_quorum_ack),
)?;

write!(f, ", ")?;
Expand Down Expand Up @@ -124,6 +141,7 @@ where C: RaftTypeConfig

state: ServerState::Follower,
current_leader: None,
millis_since_quorum_ack: None,
membership_config: Arc::new(StoredMembership::default()),
replication: None,
}
Expand All @@ -138,6 +156,22 @@ pub struct RaftDataMetrics<C: RaftTypeConfig> {
pub last_applied: Option<LogId<C::NodeId>>,
pub snapshot: Option<LogId<C::NodeId>>,
pub purged: Option<LogId<C::NodeId>>,

/// For a leader, it is the elapsed time in milliseconds since the most recently acknowledged
/// timestamp by a quorum.
///
/// It is `None` if this node is not leader, or the leader is not yet acknowledged by a quorum.
/// Being acknowledged means receiving a reply of
/// `AppendEntries`(`AppendEntriesRequest.vote.committed == true`).
/// Receiving a reply of `RequestVote`(`RequestVote.vote.committed == false`) does not count,
/// because a node will not maintain a lease for a vote with `committed == false`.
///
/// This duration is used by the application to assess the likelihood that the leader has lost
/// synchronization with the cluster.
/// A longer duration without acknowledgment may suggest a higher probability of the leader
/// being partitioned from the cluster.
pub millis_since_quorum_ack: Option<u64>,

pub replication: Option<ReplicationMetrics<C::NodeId>>,
}

Expand All @@ -149,11 +183,12 @@ where C: RaftTypeConfig

write!(
f,
"last_log:{}, last_applied:{}, snapshot:{}, purged:{}, replication:{{{}}}",
"last_log:{}, last_applied:{}, snapshot:{}, purged:{}, quorum_acked(leader):{} ms before, replication:{{{}}}",
DisplayOption(&self.last_log),
DisplayOption(&self.last_applied),
DisplayOption(&self.snapshot),
DisplayOption(&self.purged),
self.millis_since_quorum_ack.display(),
self.replication
.as_ref()
.map(|x| { x.iter().map(|(k, v)| format!("{}:{}", k, DisplayOption(v))).collect::<Vec<_>>().join(",") })
Expand Down Expand Up @@ -181,6 +216,7 @@ pub struct RaftServerMetrics<C: RaftTypeConfig> {
pub vote: Vote<C::NodeId>,
pub state: ServerState,
pub current_leader: Option<C::NodeId>,

pub membership_config: Arc<StoredMembership<C::NodeId, C::Node>>,
}

Expand Down
1 change: 1 addition & 0 deletions openraft/src/metrics/wait_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ where C: RaftTypeConfig {
purged: None,

current_leader: None,
millis_since_quorum_ack: None,
membership_config: Arc::new(StoredMembership::new(None, Membership::new(vec![btreeset! {}], None))),

snapshot: None,
Expand Down
1 change: 1 addition & 0 deletions tests/tests/metrics/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod fixtures;
// The later tests may depend on the earlier ones.

mod t10_current_leader;
mod t10_leader_last_ack;
mod t10_purged;
mod t10_server_metrics_and_data_metrics;
mod t20_metrics_state_machine_consistency;
Expand Down
Loading
Loading