From ec42d9a1772dc48769f3f43218f43f3340ac9d2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Wed, 24 Jul 2024 15:08:58 +0800 Subject: [PATCH] Refactor: `RaftMetrics::heartbeat` stores last-acked timestamp instead of duration since last ack Other changes: Display human readable Instant in log --- openraft/src/core/raft_core.rs | 11 +- .../engine/handler/replication_handler/mod.rs | 13 +- openraft/src/metrics/mod.rs | 3 +- openraft/src/metrics/raft_metrics.rs | 3 +- openraft/src/metrics/serde_instant.rs | 6 +- .../src/progress/bench/vec_progress_update.rs | 2 +- openraft/src/progress/mod.rs | 125 +++++++++++++----- openraft/src/proposer/candidate.rs | 2 +- openraft/src/proposer/leader.rs | 10 +- openraft/src/raft/impl_raft_blocking_write.rs | 6 +- openraft/src/type_config.rs | 1 + .../t10_server_metrics_and_data_metrics.rs | 14 +- 12 files changed, 127 insertions(+), 69 deletions(-) diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index c14ed729d..37b030f12 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -526,16 +526,7 @@ where let replication = Some(replication_prog.iter().map(|(id, p)| (*id, *p.borrow())).collect()); let clock_prog = &leader.clock_progress; - let heartbeat = Some( - clock_prog - .iter() - .map(|(id, opt_t)| { - let millis_since_last_ack = opt_t.map(|t| t.elapsed().as_millis() as u64); - - (*id, millis_since_last_ack) - }) - .collect(), - ); + let heartbeat = Some(clock_prog.iter().map(|(id, opt_t)| (*id, opt_t.map(SerdeInstant::new))).collect()); (replication, heartbeat) } else { diff --git a/openraft/src/engine/handler/replication_handler/mod.rs b/openraft/src/engine/handler/replication_handler/mod.rs index 8d703eb42..e5c07b3c8 100644 --- a/openraft/src/engine/handler/replication_handler/mod.rs +++ b/openraft/src/engine/handler/replication_handler/mod.rs @@ -102,19 +102,19 @@ where C: RaftTypeConfig { let end = self.state.last_log_id().next_index(); - let default_v = ProgressEntry::empty(end); + let default_v = || ProgressEntry::empty(end); let old_progress = self.leader.progress.clone(); self.leader.progress = - old_progress.upgrade_quorum_set(em.membership().to_quorum_set(), &learner_ids, default_v); + old_progress.upgrade_quorum_set(em.membership().to_quorum_set(), learner_ids.clone(), default_v); } { let old_progress = self.leader.clock_progress.clone(); self.leader.clock_progress = - old_progress.upgrade_quorum_set(em.membership().to_quorum_set(), &learner_ids, None); + old_progress.upgrade_quorum_set(em.membership().to_quorum_set(), learner_ids, || None); } } @@ -159,7 +159,12 @@ where C: RaftTypeConfig tracing::debug!( granted = display(granted.as_ref().map(|x| x.display()).display()), - clock_progress = debug(&self.leader.clock_progress), + clock_progress = display( + &self + .leader + .clock_progress + .display_with(|f, id, v| { write!(f, "{}: {}", id, v.as_ref().map(|x| x.display()).display()) }) + ), "granted leader vote clock after updating" ); diff --git a/openraft/src/metrics/mod.rs b/openraft/src/metrics/mod.rs index f62f337a4..16ba028be 100644 --- a/openraft/src/metrics/mod.rs +++ b/openraft/src/metrics/mod.rs @@ -50,8 +50,9 @@ pub(crate) use wait_condition::Condition; use crate::type_config::alias::LogIdOf; use crate::type_config::alias::NodeIdOf; +use crate::type_config::alias::SerdeInstantOf; pub(crate) type ReplicationMetrics = BTreeMap, Option>>; /// Heartbeat metrics, a mapping between a node's ID and milliseconds since the /// last acknowledged heartbeat or replication to this node. -pub(crate) type HeartbeatMetrics = BTreeMap, Option>; +pub(crate) type HeartbeatMetrics = BTreeMap, Option>>; diff --git a/openraft/src/metrics/raft_metrics.rs b/openraft/src/metrics/raft_metrics.rs index 552e6c55c..803b6ca92 100644 --- a/openraft/src/metrics/raft_metrics.rs +++ b/openraft/src/metrics/raft_metrics.rs @@ -9,6 +9,7 @@ use crate::metrics::HeartbeatMetrics; use crate::metrics::ReplicationMetrics; use crate::metrics::SerdeInstant; use crate::type_config::alias::InstantOf; +use crate::type_config::alias::SerdeInstantOf; use crate::Instant; use crate::LogId; use crate::RaftTypeConfig; @@ -90,7 +91,7 @@ pub struct RaftMetrics { /// lost synchronization with the cluster. /// An older value may suggest a higher probability of the leader being partitioned from the /// cluster. - pub last_quorum_acked: Option>>, + pub last_quorum_acked: Option>, /// The current membership config of the cluster. pub membership_config: Arc>, diff --git a/openraft/src/metrics/serde_instant.rs b/openraft/src/metrics/serde_instant.rs index 42d301143..117e0a629 100644 --- a/openraft/src/metrics/serde_instant.rs +++ b/openraft/src/metrics/serde_instant.rs @@ -147,7 +147,7 @@ mod serde_impl { use super::SerdeInstant; use crate::engine::testing::UTConfig; - use crate::type_config::alias::InstantOf; + use crate::type_config::alias::SerdeInstantOf; use crate::type_config::TypeConfigExt; #[test] @@ -158,7 +158,7 @@ mod serde_impl { println!("json: {}", json); println!("Now: {:?}", now); - let deserialized: SerdeInstant> = serde_json::from_str(&json).unwrap(); + let deserialized: SerdeInstantOf = serde_json::from_str(&json).unwrap(); println!("Des: {:?}", *deserialized); // Convert Instant to SerdeInstant is inaccurate. @@ -171,7 +171,7 @@ mod serde_impl { // Test serialization format let nano = "1721829051211301916"; - let deserialized: SerdeInstant> = serde_json::from_str(nano).unwrap(); + let deserialized: SerdeInstantOf = serde_json::from_str(nano).unwrap(); let serialized = serde_json::to_string(&deserialized).unwrap(); assert_eq!( diff --git a/openraft/src/progress/bench/vec_progress_update.rs b/openraft/src/progress/bench/vec_progress_update.rs index 814e81de5..3cb5d3f1f 100644 --- a/openraft/src/progress/bench/vec_progress_update.rs +++ b/openraft/src/progress/bench/vec_progress_update.rs @@ -11,7 +11,7 @@ use crate::quorum::Joint; fn progress_update_01234_567(b: &mut Bencher) { let membership: Vec> = vec![vec![0, 1, 2, 3, 4], vec![5, 6, 7]]; let quorum_set = Joint::from(membership); - let mut progress = VecProgress::::new(quorum_set, 0..=7, 0); + let mut progress = VecProgress::::new(quorum_set, 0..=7, || 0); let mut id = 0u64; let mut values = [0, 1, 2, 3, 4, 5, 6, 7]; diff --git a/openraft/src/progress/mod.rs b/openraft/src/progress/mod.rs index 5f3335148..db1e19b74 100644 --- a/openraft/src/progress/mod.rs +++ b/openraft/src/progress/mod.rs @@ -28,13 +28,14 @@ use crate::quorum::QuorumSet; /// /// When one of the value is updated, it uses a `QuorumSet` to calculate the committed value. /// `ID` is the identifier of every progress value. -/// `V` is type of a progress entry. +/// `V` is type of progress entry. /// `P` is the progress data of `V`, a progress entry `V` could contain other user data. /// `QS` is a quorum set implementation. pub(crate) trait Progress where - ID: 'static, + ID: PartialEq + 'static, V: Borrow

, + P: PartialOrd + Copy, QS: QuorumSet, { /// Update one of the scalar value and re-calculate the committed value with provided function. @@ -91,12 +92,17 @@ where fn iter(&self) -> Iter<(ID, V)>; /// Build a new instance with the new quorum set, inheriting progress data from `self`. - fn upgrade_quorum_set(self, quorum_set: QS, learner_ids: &[ID], default_v: V) -> Self; + fn upgrade_quorum_set( + self, + quorum_set: QS, + learner_ids: impl IntoIterator, + default_v: impl Fn() -> V, + ) -> Self; /// Return if the given id is a voter. /// /// A voter is a node in the quorum set that can grant a value. - /// A learner's progress is also tracked but it will never grant a value. + /// A learner's progress is also tracked, but it will never grant a value. /// /// If the given id is not in this `Progress`, it returns `None`. fn is_voter(&self, id: &ID) -> Option; @@ -128,7 +134,7 @@ where /// /// The first `voter_count` elements are voters, the left are learners. /// Learner elements are always still. - /// A voter element will be moved up to keep them in a descending order, when a new value is + /// A voter element will be moved up to keep them in descending order, when a new value is /// updated. vector: Vec<(ID, V)>, @@ -160,6 +166,38 @@ where } } +pub struct DisplayVecProgress<'a, ID, V, P, QS, Fmt> +where + ID: 'static, + QS: QuorumSet, + Fmt: Fn(&mut Formatter<'_>, &ID, &V) -> std::fmt::Result, +{ + inner: &'a VecProgress, + f: Fmt, +} + +impl<'a, ID, V, P, QS, Fmt> Display for DisplayVecProgress<'a, ID, V, P, QS, Fmt> +where + ID: PartialEq + 'static, + V: Borrow

, + P: PartialOrd + Copy, + QS: QuorumSet, + Fmt: Fn(&mut Formatter<'_>, &ID, &V) -> std::fmt::Result, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{{")?; + for (i, (id, v)) in self.inner.iter().enumerate() { + if i > 0 { + write!(f, ", ")?; + } + (self.f)(f, id, v)?; + } + write!(f, "}}")?; + + Ok(()) + } +} + #[derive(Clone, Debug, Default)] #[derive(PartialEq, Eq)] pub(crate) struct Stat { @@ -170,22 +208,21 @@ pub(crate) struct Stat { impl VecProgress where - ID: PartialEq + Copy + Debug + 'static, - V: Copy + 'static, + ID: 'static, V: Borrow

, - P: PartialOrd + Ord + Copy + 'static, QS: QuorumSet, + P: Copy, { - pub(crate) fn new(quorum_set: QS, learner_ids: impl IntoIterator, default_v: V) -> Self { - let mut vector = quorum_set.ids().map(|id| (id, default_v)).collect::>(); + pub(crate) fn new(quorum_set: QS, learner_ids: impl IntoIterator, default_v: impl Fn() -> V) -> Self { + let mut vector = quorum_set.ids().map(|id| (id, default_v())).collect::>(); let voter_count = vector.len(); - vector.extend(learner_ids.into_iter().map(|id| (id, default_v))); + vector.extend(learner_ids.into_iter().map(|id| (id, default_v()))); Self { quorum_set, - granted: *default_v.borrow(), + granted: *default_v().borrow(), voter_count, vector, stat: Default::default(), @@ -194,7 +231,8 @@ where /// Find the index in of the specified id. #[inline(always)] - pub(crate) fn index(&self, target: &ID) -> Option { + pub(crate) fn index(&self, target: &ID) -> Option + where ID: PartialEq { for (i, elt) in self.vector.iter().enumerate() { if elt.0 == *target { return Some(i); @@ -206,7 +244,8 @@ where /// Move an element at `index` up so that all the values greater than `committed` are sorted. #[inline(always)] - fn move_up(&mut self, index: usize) -> usize { + fn move_up(&mut self, index: usize) -> usize + where P: PartialOrd { self.stat.move_count += 1; for i in (0..index).rev() { if self.vector[i].1.borrow() < self.vector[i + 1].1.borrow() { @@ -223,19 +262,27 @@ where self.vector.iter_mut() } + pub(crate) fn into_iter(self) -> impl Iterator { + self.vector.into_iter() + } + #[allow(dead_code)] pub(crate) fn stat(&self) -> &Stat { &self.stat } + + pub(crate) fn display_with(&self, f: Fmt) -> DisplayVecProgress + where Fmt: Fn(&mut Formatter<'_>, &ID, &V) -> std::fmt::Result { + DisplayVecProgress { inner: self, f } + } } impl Progress for VecProgress where - ID: PartialEq + Debug + Copy + 'static, - V: Copy + 'static, + ID: PartialEq + 'static, V: Borrow

, - P: PartialOrd + Ord + Copy + 'static, - QS: QuorumSet + 'static, + P: PartialOrd + Copy, + QS: QuorumSet, { /// Update one of the scalar value and re-calculate the committed value. /// @@ -271,7 +318,10 @@ where /// - update(c, 4): re-calc: committed becomes 4; /// - update(c, 6): re-calc: committed becomes 5; fn update_with(&mut self, id: &ID, f: F) -> Result<&P, &P> - where F: FnOnce(&mut V) { + where + F: FnOnce(&mut V), + ID: PartialEq, + { self.stat.update_count += 1; let index = match self.index(id) { @@ -364,13 +414,18 @@ where self.vector.as_slice().iter() } - fn upgrade_quorum_set(self, quorum_set: QS, leaner_ids: &[ID], default_v: V) -> Self { - let mut new_prog = Self::new(quorum_set, leaner_ids.iter().copied(), default_v); + fn upgrade_quorum_set( + self, + quorum_set: QS, + leaner_ids: impl IntoIterator, + default_v: impl Fn() -> V, + ) -> Self { + let mut new_prog = Self::new(quorum_set, leaner_ids, default_v); new_prog.stat = self.stat.clone(); - for (id, v) in self.iter() { - let _ = new_prog.update(id, *v); + for (id, v) in self.into_iter() { + let _ = new_prog.update(&id, v); } new_prog } @@ -392,7 +447,7 @@ mod t { #[test] fn vec_progress_new() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let progress = VecProgress::::new(quorum_set, [6, 7], 0); + let progress = VecProgress::::new(quorum_set, [6, 7], || 0); assert_eq!( vec![ @@ -415,7 +470,7 @@ mod t { #[test] fn vec_progress_get() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let mut progress = VecProgress::::new(quorum_set, [6, 7], 0); + let mut progress = VecProgress::::new(quorum_set, [6, 7], || 0); let _ = progress.update(&6, 5); assert_eq!(&5, progress.get(&6)); @@ -436,7 +491,7 @@ mod t { #[test] fn vec_progress_iter() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let mut progress = VecProgress::::new(quorum_set, [6, 7], 0); + let mut progress = VecProgress::::new(quorum_set, [6, 7], || 0); let _ = progress.update(&7, 7); let _ = progress.update(&3, 3); @@ -463,7 +518,7 @@ mod t { #[test] fn vec_progress_move_up() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let mut progress = VecProgress::::new(quorum_set, [6], 0); + let mut progress = VecProgress::::new(quorum_set, [6], || 0); // initial: 0-0, 1-0, 2-0, 3-0, 4-0 let cases = [ @@ -471,7 +526,7 @@ mod t { ((2, 3), &[(2, 3), (1, 2), (0, 0), (3, 0), (4, 0), (6, 0)], 0), // ((1, 3), &[(2, 3), (1, 3), (0, 0), (3, 0), (4, 0), (6, 0)], 1), // no move ((4, 8), &[(4, 8), (2, 3), (1, 3), (0, 0), (3, 0), (6, 0)], 0), // - ((0, 5), &[(4, 8), (0, 5), (2, 3), (1, 3), (3, 0), (6, 0)], 1), // move to 1th + ((0, 5), &[(4, 8), (0, 5), (2, 3), (1, 3), (3, 0), (6, 0)], 1), // move to 1st ]; for (ith, ((id, v), want_vec, want_new_index)) in cases.iter().enumerate() { // Update a value and move it up to keep the order. @@ -495,7 +550,7 @@ mod t { #[test] fn vec_progress_update() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let mut progress = VecProgress::::new(quorum_set, [6], 0); + let mut progress = VecProgress::::new(quorum_set, [6], || 0); // initial: 0,0,0,0,0 let cases = vec![ @@ -537,7 +592,7 @@ mod t { let pv = |p, user_data| ProgressEntry { progress: p, user_data }; let quorum_set: Vec = vec![0, 1, 2]; - let mut progress = VecProgress::::new(quorum_set, [3], pv(0, "foo")); + let mut progress = VecProgress::::new(quorum_set, [3], || pv(0, "foo")); // initial: 0,0,0,0 let cases = [ @@ -565,7 +620,7 @@ mod t { #[test] fn vec_progress_update_does_not_move_learner_elt() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let mut progress = VecProgress::::new(quorum_set, [6], 0); + let mut progress = VecProgress::::new(quorum_set, [6], || 0); assert_eq!(Some(5), progress.index(&6)); @@ -585,7 +640,7 @@ mod t { // Initially, committed is 5 - let mut p012 = VecProgress::::new(qs012, [5], 0); + let mut p012 = VecProgress::::new(qs012, [5], || 0); let _ = p012.update(&0, 5); let _ = p012.update(&1, 6); @@ -594,7 +649,7 @@ mod t { // After upgrading to a bigger quorum set, committed fall back to 0 - let mut p012_345 = p012.upgrade_quorum_set(qs012_345, &[6], 0); + let mut p012_345 = p012.upgrade_quorum_set(qs012_345, [6], || 0); assert_eq!( &0, p012_345.granted(), @@ -608,7 +663,7 @@ mod t { let _ = p012_345.update(&4, 8); assert_eq!(&5, p012_345.granted()); - let p345 = p012_345.upgrade_quorum_set(qs345, &[1], 0); + let p345 = p012_345.upgrade_quorum_set(qs345, [1], || 0); assert_eq!(&8, p345.granted(), "shrink quorum set, greater value becomes committed"); assert_eq!(&6, p345.get(&1), "inherit voter progress"); @@ -619,7 +674,7 @@ mod t { #[test] fn vec_progress_is_voter() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let progress = VecProgress::::new(quorum_set, [6, 7], 0); + let progress = VecProgress::::new(quorum_set, [6, 7], || 0); assert_eq!(Some(true), progress.is_voter(&1)); assert_eq!(Some(true), progress.is_voter(&3)); diff --git a/openraft/src/proposer/candidate.rs b/openraft/src/proposer/candidate.rs index 2d4a63045..1747b8f78 100644 --- a/openraft/src/proposer/candidate.rs +++ b/openraft/src/proposer/candidate.rs @@ -69,7 +69,7 @@ where starting_time, vote, last_log_id, - progress: VecProgress::new(quorum_set.clone(), [], false), + progress: VecProgress::new(quorum_set.clone(), [], || false), quorum_set, learner_ids: learner_ids.into_iter().collect::>(), } diff --git a/openraft/src/proposer/leader.rs b/openraft/src/proposer/leader.rs index b2778bf77..93e24cd9e 100644 --- a/openraft/src/proposer/leader.rs +++ b/openraft/src/proposer/leader.rs @@ -114,12 +114,10 @@ where next_heartbeat: C::now(), last_log_id, noop_log_id, - progress: VecProgress::new( - quorum_set.clone(), - learner_ids.iter().copied(), - ProgressEntry::empty(last_log_id.next_index()), - ), - clock_progress: VecProgress::new(quorum_set, learner_ids, None), + progress: VecProgress::new(quorum_set.clone(), learner_ids.iter().copied(), || { + ProgressEntry::empty(last_log_id.next_index()) + }), + clock_progress: VecProgress::new(quorum_set, learner_ids, || None), }; // Update progress for this Leader. diff --git a/openraft/src/raft/impl_raft_blocking_write.rs b/openraft/src/raft/impl_raft_blocking_write.rs index 845cb1e09..e3470095e 100644 --- a/openraft/src/raft/impl_raft_blocking_write.rs +++ b/openraft/src/raft/impl_raft_blocking_write.rs @@ -5,6 +5,7 @@ use maplit::btreemap; use crate::core::raft_msg::RaftMsg; +use crate::display_ext::DisplayResult; use crate::error::ClientWriteError; use crate::error::RaftError; use crate::raft::message::ClientWriteResult; @@ -165,7 +166,10 @@ where C: RaftTypeConfig> ) .await; - tracing::info!(wait_res = debug(&wait_res), "waiting for replication to new learner"); + tracing::info!( + wait_res = display(DisplayResult(&wait_res)), + "waiting for replication to new learner" + ); Ok(resp) } diff --git a/openraft/src/type_config.rs b/openraft/src/type_config.rs index e1482c3aa..0ef15d850 100644 --- a/openraft/src/type_config.rs +++ b/openraft/src/type_config.rs @@ -142,4 +142,5 @@ pub mod alias { pub type VoteOf = crate::Vote>; pub type LeaderIdOf = crate::LeaderId>; pub type CommittedLeaderIdOf = crate::CommittedLeaderId>; + pub type SerdeInstantOf = crate::metrics::SerdeInstant>; } diff --git a/tests/tests/metrics/t10_server_metrics_and_data_metrics.rs b/tests/tests/metrics/t10_server_metrics_and_data_metrics.rs index 3e391aaaa..af722cd16 100644 --- a/tests/tests/metrics/t10_server_metrics_and_data_metrics.rs +++ b/tests/tests/metrics/t10_server_metrics_and_data_metrics.rs @@ -92,7 +92,9 @@ async fn heartbeat_metrics() -> Result<()> { let refreshed_node1; let refreshed_node2; { + let now = TypeConfig::now(); leader.trigger().heartbeat().await?; + let metrics = leader .wait(timeout()) .metrics( @@ -104,7 +106,7 @@ async fn heartbeat_metrics() -> Result<()> { let node1 = heartbeat.get(&1).unwrap().unwrap(); let node2 = heartbeat.get(&2).unwrap().unwrap(); - (node1 < 100) && (node2 < 100) + (*node1 >= now) && (*node2 >= now) }, "millis_since_quorum_ack refreshed", ) @@ -118,7 +120,7 @@ async fn heartbeat_metrics() -> Result<()> { refreshed_node2 = heartbeat.get(&2).unwrap().unwrap(); } - tracing::info!(log_index, "--- sleep 500 ms, the interval should extend"); + tracing::info!(log_index, "--- sleep 500 ms, the acked time should not change"); { TypeConfig::sleep(Duration::from_millis(500)).await; @@ -129,11 +131,11 @@ async fn heartbeat_metrics() -> Result<()> { .as_ref() .expect("expect heartbeat to be Some as metrics come from the leader node"); - let greater_node1 = heartbeat.get(&1).unwrap().unwrap(); - let greater_node2 = heartbeat.get(&2).unwrap().unwrap(); + let got_node1 = heartbeat.get(&1).unwrap().unwrap(); + let got_node2 = heartbeat.get(&2).unwrap().unwrap(); - assert!(greater_node1 > refreshed_node1); - assert!(greater_node2 > refreshed_node2); + assert!(got_node1 == refreshed_node1); + assert!(got_node2 == refreshed_node2); } Ok(())