From 717ed633f0a2cc5e59a9b0c3ebfc34806c4f0a45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Fri, 15 Mar 2024 10:34:19 +0800 Subject: [PATCH] Change: Consolidate `RaftMetrics` type parameters into `C` The `RaftMetrics` type, along with related metric types, previously used separate type parameters (`NID`, `N`) which have now been replaced with a single generic parameter `C` bound by `RaftTypeConfig`. This modification simplifies the type signatures and usage. Upgrade tip: To adapt to this change, update the metric-related type parameters from separate `NID` and `N` to the single generic `C` constrained by `RaftTypeConfig`: ```rust RaftMetrics --> RaftMetrics RaftDataMetrics --> RaftDataMetrics RaftServerMetrics --> RaftServerMetrics Metric --> Metric Wait --> Wait ``` --- .../src/api.rs | 3 +- .../src/lib.rs | 2 +- .../src/api.rs | 3 +- .../src/lib.rs | 2 +- .../src/api.rs | 3 +- .../src/lib.rs | 2 +- examples/raft-kv-memstore/src/client.rs | 4 +- .../src/network/management.rs | 3 +- examples/raft-kv-rocksdb/src/client.rs | 3 +- .../raft-kv-rocksdb/src/network/management.rs | 3 +- openraft/src/core/raft_core.rs | 6 +- openraft/src/engine/mod.rs | 2 +- openraft/src/metrics/metric.rs | 37 ++++---- openraft/src/metrics/metric_display.rs | 12 +-- openraft/src/metrics/raft_metrics.rs | 95 ++++++++----------- openraft/src/metrics/wait.rs | 86 +++++++---------- openraft/src/metrics/wait_condition.rs | 24 ++--- openraft/src/metrics/wait_test.rs | 42 ++++---- openraft/src/raft/mod.rs | 12 +-- openraft/src/raft/raft_inner.rs | 6 +- tests/tests/fixtures/mod.rs | 11 +-- 21 files changed, 157 insertions(+), 204 deletions(-) diff --git a/examples/raft-kv-memstore-generic-snapshot-data/src/api.rs b/examples/raft-kv-memstore-generic-snapshot-data/src/api.rs index 89394adc5..62550634f 100644 --- a/examples/raft-kv-memstore-generic-snapshot-data/src/api.rs +++ b/examples/raft-kv-memstore-generic-snapshot-data/src/api.rs @@ -14,6 +14,7 @@ use crate::decode; use crate::encode; use crate::typ; use crate::NodeId; +use crate::TypeConfig; pub async fn write(app: &mut App, req: String) -> String { let res = app.raft.client_write(decode(&req)).await; @@ -99,6 +100,6 @@ pub async fn init(app: &mut App) -> String { pub async fn metrics(app: &mut App) -> String { let metrics = app.raft.metrics().borrow().clone(); - let res: Result, Infallible> = Ok(metrics); + let res: Result, Infallible> = Ok(metrics); encode(res) } diff --git a/examples/raft-kv-memstore-generic-snapshot-data/src/lib.rs b/examples/raft-kv-memstore-generic-snapshot-data/src/lib.rs index 24356a7bd..b6cb211e9 100644 --- a/examples/raft-kv-memstore-generic-snapshot-data/src/lib.rs +++ b/examples/raft-kv-memstore-generic-snapshot-data/src/lib.rs @@ -58,7 +58,7 @@ pub mod typ { pub type RPCError = openraft::error::RPCError>; pub type StreamingError = openraft::error::StreamingError; - pub type RaftMetrics = openraft::RaftMetrics; + pub type RaftMetrics = openraft::RaftMetrics; pub type ClientWriteError = openraft::error::ClientWriteError; pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError; diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/src/api.rs b/examples/raft-kv-memstore-opendal-snapshot-data/src/api.rs index 89394adc5..62550634f 100644 --- a/examples/raft-kv-memstore-opendal-snapshot-data/src/api.rs +++ b/examples/raft-kv-memstore-opendal-snapshot-data/src/api.rs @@ -14,6 +14,7 @@ use crate::decode; use crate::encode; use crate::typ; use crate::NodeId; +use crate::TypeConfig; pub async fn write(app: &mut App, req: String) -> String { let res = app.raft.client_write(decode(&req)).await; @@ -99,6 +100,6 @@ pub async fn init(app: &mut App) -> String { pub async fn metrics(app: &mut App) -> String { let metrics = app.raft.metrics().borrow().clone(); - let res: Result, Infallible> = Ok(metrics); + let res: Result, Infallible> = Ok(metrics); encode(res) } diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs b/examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs index b7cc20a0c..ca46be7f3 100644 --- a/examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs +++ b/examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs @@ -57,7 +57,7 @@ pub mod typ { pub type RPCError = openraft::error::RPCError>; pub type StreamingError = openraft::error::StreamingError; - pub type RaftMetrics = openraft::RaftMetrics; + pub type RaftMetrics = openraft::RaftMetrics; pub type ClientWriteError = openraft::error::ClientWriteError; pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError; diff --git a/examples/raft-kv-memstore-singlethreaded/src/api.rs b/examples/raft-kv-memstore-singlethreaded/src/api.rs index 52684de5f..2ce84589b 100644 --- a/examples/raft-kv-memstore-singlethreaded/src/api.rs +++ b/examples/raft-kv-memstore-singlethreaded/src/api.rs @@ -13,6 +13,7 @@ use crate::app::App; use crate::decode; use crate::encode; use crate::NodeId; +use crate::TypeConfig; pub async fn write(app: &mut App, req: String) -> String { let res = app.raft.client_write(decode(&req)).await; @@ -88,6 +89,6 @@ pub async fn init(app: &mut App) -> String { pub async fn metrics(app: &mut App) -> String { let metrics = app.raft.metrics().borrow().clone(); - let res: Result, Infallible> = Ok(metrics); + let res: Result, Infallible> = Ok(metrics); encode(res) } diff --git a/examples/raft-kv-memstore-singlethreaded/src/lib.rs b/examples/raft-kv-memstore-singlethreaded/src/lib.rs index e3c4f8185..c6ff3323c 100644 --- a/examples/raft-kv-memstore-singlethreaded/src/lib.rs +++ b/examples/raft-kv-memstore-singlethreaded/src/lib.rs @@ -70,7 +70,7 @@ pub mod typ { pub type RaftError = openraft::error::RaftError; pub type RPCError = openraft::error::RPCError>; - pub type RaftMetrics = openraft::RaftMetrics; + pub type RaftMetrics = openraft::RaftMetrics; pub type ClientWriteError = openraft::error::ClientWriteError; pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError; diff --git a/examples/raft-kv-memstore/src/client.rs b/examples/raft-kv-memstore/src/client.rs index 99c57e530..10eff8b3b 100644 --- a/examples/raft-kv-memstore/src/client.rs +++ b/examples/raft-kv-memstore/src/client.rs @@ -6,7 +6,6 @@ use std::time::Duration; use openraft::error::ForwardToLeader; use openraft::error::NetworkError; use openraft::error::RemoteError; -use openraft::BasicNode; use openraft::RaftMetrics; use openraft::TryAsRef; use serde::de::DeserializeOwned; @@ -17,6 +16,7 @@ use tokio::time::timeout; use crate::typ; use crate::NodeId; use crate::Request; +use crate::TypeConfig; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Empty {} @@ -103,7 +103,7 @@ impl ExampleClient { /// Metrics contains various information about the cluster, such as current leader, /// membership config, replication status etc. /// See [`RaftMetrics`]. - pub async fn metrics(&self) -> Result, typ::RPCError> { + pub async fn metrics(&self) -> Result, typ::RPCError> { self.do_send_rpc_to_leader("metrics", None::<&()>).await } diff --git a/examples/raft-kv-memstore/src/network/management.rs b/examples/raft-kv-memstore/src/network/management.rs index 912cdbf42..1e20cf0f9 100644 --- a/examples/raft-kv-memstore/src/network/management.rs +++ b/examples/raft-kv-memstore/src/network/management.rs @@ -12,6 +12,7 @@ use openraft::RaftMetrics; use crate::app::App; use crate::NodeId; +use crate::TypeConfig; // --- Cluster management @@ -49,6 +50,6 @@ pub async fn init(app: Data) -> actix_web::Result { pub async fn metrics(app: Data) -> actix_web::Result { let metrics = app.raft.metrics().borrow().clone(); - let res: Result, Infallible> = Ok(metrics); + let res: Result, Infallible> = Ok(metrics); Ok(Json(res)) } diff --git a/examples/raft-kv-rocksdb/src/client.rs b/examples/raft-kv-rocksdb/src/client.rs index f784f6137..00ff3aacc 100644 --- a/examples/raft-kv-rocksdb/src/client.rs +++ b/examples/raft-kv-rocksdb/src/client.rs @@ -16,6 +16,7 @@ use crate::typ; use crate::Node; use crate::NodeId; use crate::Request; +use crate::TypeConfig; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Empty {} @@ -102,7 +103,7 @@ impl ExampleClient { /// Metrics contains various information about the cluster, such as current leader, /// membership config, replication status etc. /// See [`RaftMetrics`]. - pub async fn metrics(&self) -> Result, typ::RPCError> { + pub async fn metrics(&self) -> Result, typ::RPCError> { self.do_send_rpc_to_leader("cluster/metrics", None::<&()>).await } diff --git a/examples/raft-kv-rocksdb/src/network/management.rs b/examples/raft-kv-rocksdb/src/network/management.rs index 20ba75de8..ab1fc10b0 100644 --- a/examples/raft-kv-rocksdb/src/network/management.rs +++ b/examples/raft-kv-rocksdb/src/network/management.rs @@ -13,6 +13,7 @@ use crate::app::App; use crate::Node; use crate::NodeId; use crate::Server; +use crate::TypeConfig; // --- Cluster management @@ -60,6 +61,6 @@ async fn init(req: Request>) -> tide::Result { async fn metrics(req: Request>) -> tide::Result { let metrics = req.state().raft.metrics().borrow().clone(); - let res: Result, Infallible> = Ok(metrics); + let res: Result, Infallible> = Ok(metrics); Ok(Response::builder(StatusCode::Ok).body(Body::from_json(&res)?).build()) } diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 5f7733e4c..ba29029ad 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -196,9 +196,9 @@ where /// A Receiver to receive callback from other components. pub(crate) rx_notify: mpsc::UnboundedReceiver>, - pub(crate) tx_metrics: watch::Sender>, - pub(crate) tx_data_metrics: watch::Sender>, - pub(crate) tx_server_metrics: watch::Sender>, + pub(crate) tx_metrics: watch::Sender>, + pub(crate) tx_data_metrics: watch::Sender>, + pub(crate) tx_server_metrics: watch::Sender>, pub(crate) command_state: CommandState, diff --git a/openraft/src/engine/mod.rs b/openraft/src/engine/mod.rs index ad051db6e..09722b02f 100644 --- a/openraft/src/engine/mod.rs +++ b/openraft/src/engine/mod.rs @@ -48,7 +48,7 @@ mod tests { mod startup_test; mod trigger_purge_log_test; } -#[cfg(test)] mod testing; +#[cfg(test)] pub(crate) mod testing; pub(crate) use command::Command; pub(crate) use command::Condition; diff --git a/openraft/src/metrics/metric.rs b/openraft/src/metrics/metric.rs index 48a9a7c0a..15c876c60 100644 --- a/openraft/src/metrics/metric.rs +++ b/openraft/src/metrics/metric.rs @@ -3,29 +3,28 @@ use std::cmp::Ordering; use crate::metrics::metric_display::MetricDisplay; use crate::LogId; use crate::LogIdOptionExt; -use crate::Node; -use crate::NodeId; use crate::RaftMetrics; +use crate::RaftTypeConfig; use crate::Vote; /// A metric entry of a Raft node. /// /// This is used to specify which metric to observe. #[derive(Debug)] -pub enum Metric -where NID: NodeId +pub enum Metric +where C: RaftTypeConfig { Term(u64), - Vote(Vote), + Vote(Vote), LastLogIndex(Option), - Applied(Option>), + Applied(Option>), AppliedIndex(Option), - Snapshot(Option>), - Purged(Option>), + Snapshot(Option>), + Purged(Option>), } -impl Metric -where NID: NodeId +impl Metric +where C: RaftTypeConfig { pub(crate) fn name(&self) -> &'static str { match self { @@ -39,18 +38,16 @@ where NID: NodeId } } - pub(crate) fn value(&self) -> MetricDisplay<'_, NID> { + pub(crate) fn value(&self) -> MetricDisplay<'_, C> { MetricDisplay { metric: self } } } /// Metric can be compared with RaftMetrics by comparing the corresponding field of RaftMetrics. -impl PartialEq> for RaftMetrics -where - NID: NodeId, - N: Node, +impl PartialEq> for RaftMetrics +where C: RaftTypeConfig { - fn eq(&self, other: &Metric) -> bool { + fn eq(&self, other: &Metric) -> bool { match other { Metric::Term(v) => self.current_term == *v, Metric::Vote(v) => &self.vote == v, @@ -64,12 +61,10 @@ where } /// Metric can be compared with RaftMetrics by comparing the corresponding field of RaftMetrics. -impl PartialOrd> for RaftMetrics -where - NID: NodeId, - N: Node, +impl PartialOrd> for RaftMetrics +where C: RaftTypeConfig { - fn partial_cmp(&self, other: &Metric) -> Option { + fn partial_cmp(&self, other: &Metric) -> Option { match other { Metric::Term(v) => Some(self.current_term.cmp(v)), Metric::Vote(v) => self.vote.partial_cmp(v), diff --git a/openraft/src/metrics/metric_display.rs b/openraft/src/metrics/metric_display.rs index 7468c11b5..f27585474 100644 --- a/openraft/src/metrics/metric_display.rs +++ b/openraft/src/metrics/metric_display.rs @@ -3,17 +3,17 @@ use std::fmt::Formatter; use crate::display_ext::DisplayOption; use crate::metrics::Metric; -use crate::NodeId; +use crate::RaftTypeConfig; /// Display the value of a metric. -pub(crate) struct MetricDisplay<'a, NID> -where NID: NodeId +pub(crate) struct MetricDisplay<'a, C> +where C: RaftTypeConfig { - pub(crate) metric: &'a Metric, + pub(crate) metric: &'a Metric, } -impl<'a, NID> fmt::Display for MetricDisplay<'a, NID> -where NID: NodeId +impl<'a, C> fmt::Display for MetricDisplay<'a, C> +where C: RaftTypeConfig { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self.metric { diff --git a/openraft/src/metrics/raft_metrics.rs b/openraft/src/metrics/raft_metrics.rs index e0bd6362b..d4d200fb0 100644 --- a/openraft/src/metrics/raft_metrics.rs +++ b/openraft/src/metrics/raft_metrics.rs @@ -5,25 +5,20 @@ use crate::core::ServerState; use crate::display_ext::DisplayOption; use crate::error::Fatal; use crate::metrics::ReplicationMetrics; -use crate::node::Node; use crate::summary::MessageSummary; use crate::LogId; -use crate::NodeId; +use crate::RaftTypeConfig; use crate::StoredMembership; use crate::Vote; /// A set of metrics describing the current state of a Raft node. #[derive(Clone, Debug, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] -pub struct RaftMetrics -where - NID: NodeId, - N: Node, -{ - pub running_state: Result<(), Fatal>, +pub struct RaftMetrics { + pub running_state: Result<(), Fatal>, /// The ID of the Raft node. - pub id: NID, + pub id: C::NodeId, // --- // --- data --- @@ -32,23 +27,23 @@ where pub current_term: u64, /// The last accepted vote. - pub vote: Vote, + pub vote: Vote, /// The last log index has been appended to this Raft node's log. pub last_log_index: Option, /// The last log index has been applied to this Raft node's state machine. - pub last_applied: Option>, + pub last_applied: Option>, /// The id of the last log included in snapshot. /// If there is no snapshot, it is (0,0). - pub snapshot: Option>, + pub snapshot: Option>, /// The last log id that has purged from storage, inclusive. /// /// `purged` is also the first log id Openraft knows, although the corresponding log entry has /// already been deleted. - pub purged: Option>, + pub purged: Option>, // --- // --- cluster --- @@ -57,22 +52,20 @@ where pub state: ServerState, /// The current cluster leader. - pub current_leader: Option, + pub current_leader: Option, /// The current membership config of the cluster. - pub membership_config: Arc>, + pub membership_config: Arc>, // --- // --- replication --- // --- /// The replication states. It is Some() only when this node is leader. - pub replication: Option>, + pub replication: Option>, } -impl fmt::Display for RaftMetrics -where - NID: NodeId, - N: Node, +impl fmt::Display for RaftMetrics +where C: RaftTypeConfig { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Metrics{{")?; @@ -106,22 +99,18 @@ where Ok(()) } } -impl MessageSummary> for RaftMetrics -where - NID: NodeId, - N: Node, +impl MessageSummary> for RaftMetrics +where C: RaftTypeConfig { fn summary(&self) -> String { self.to_string() } } -impl RaftMetrics -where - NID: NodeId, - N: Node, +impl RaftMetrics +where C: RaftTypeConfig { - pub fn new_initial(id: NID) -> Self { + pub fn new_initial(id: C::NodeId) -> Self { Self { running_state: Ok(()), id, @@ -144,18 +133,16 @@ where /// Subset of RaftMetrics, only include data-related metrics #[derive(Clone, Debug, Default, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] -pub struct RaftDataMetrics -where NID: NodeId -{ - pub last_log: Option>, - pub last_applied: Option>, - pub snapshot: Option>, - pub purged: Option>, - pub replication: Option>, +pub struct RaftDataMetrics { + pub last_log: Option>, + pub last_applied: Option>, + pub snapshot: Option>, + pub purged: Option>, + pub replication: Option>, } -impl fmt::Display for RaftDataMetrics -where NID: NodeId +impl fmt::Display for RaftDataMetrics +where C: RaftTypeConfig { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "DataMetrics{{")?; @@ -178,8 +165,8 @@ where NID: NodeId } } -impl MessageSummary> for RaftDataMetrics -where NID: NodeId +impl MessageSummary> for RaftDataMetrics +where C: RaftTypeConfig { fn summary(&self) -> String { self.to_string() @@ -189,22 +176,16 @@ where NID: NodeId /// Subset of RaftMetrics, only include server-related metrics #[derive(Clone, Debug, Default, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] -pub struct RaftServerMetrics -where - NID: NodeId, - N: Node, -{ - pub id: NID, - pub vote: Vote, +pub struct RaftServerMetrics { + pub id: C::NodeId, + pub vote: Vote, pub state: ServerState, - pub current_leader: Option, - pub membership_config: Arc>, + pub current_leader: Option, + pub membership_config: Arc>, } -impl fmt::Display for RaftServerMetrics -where - NID: NodeId, - N: Node, +impl fmt::Display for RaftServerMetrics +where C: RaftTypeConfig { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "ServerMetrics{{")?; @@ -224,10 +205,8 @@ where } } -impl MessageSummary> for RaftServerMetrics -where - NID: NodeId, - N: Node, +impl MessageSummary> for RaftServerMetrics +where C: RaftTypeConfig { fn summary(&self) -> String { self.to_string() diff --git a/openraft/src/metrics/wait.rs b/openraft/src/metrics/wait.rs index 80b8a63ff..23528b36f 100644 --- a/openraft/src/metrics/wait.rs +++ b/openraft/src/metrics/wait.rs @@ -7,13 +7,14 @@ use crate::core::ServerState; use crate::metrics::Condition; use crate::metrics::Metric; use crate::metrics::RaftMetrics; -use crate::node::Node; +use crate::type_config::alias::AsyncRuntimeOf; +use crate::type_config::alias::InstantOf; use crate::AsyncRuntime; use crate::Instant; use crate::LogId; use crate::MessageSummary; -use crate::NodeId; use crate::OptionalSend; +use crate::RaftTypeConfig; use crate::Vote; // Error variants related to metrics. @@ -28,28 +29,19 @@ pub enum WaitError { /// Wait is a wrapper of RaftMetrics channel that impls several utils to wait for metrics to satisfy /// some condition. -pub struct Wait -where - NID: NodeId, - N: Node, - A: AsyncRuntime, -{ +pub struct Wait { pub timeout: Duration, - pub rx: watch::Receiver>, - pub(crate) _phantom: std::marker::PhantomData, + pub rx: watch::Receiver>, } -impl Wait -where - NID: NodeId, - N: Node, - A: AsyncRuntime, +impl Wait +where C: RaftTypeConfig { /// Wait for metrics to satisfy some condition or timeout. #[tracing::instrument(level = "trace", skip(self, func), fields(msg=%msg.to_string()))] - pub async fn metrics(&self, func: T, msg: impl ToString) -> Result, WaitError> - where T: Fn(&RaftMetrics) -> bool + OptionalSend { - let timeout_at = A::Instant::now() + self.timeout; + pub async fn metrics(&self, func: T, msg: impl ToString) -> Result, WaitError> + where T: Fn(&RaftMetrics) -> bool + OptionalSend { + let timeout_at = InstantOf::::now() + self.timeout; let mut rx = self.rx.clone(); loop { @@ -72,7 +64,7 @@ where return Ok(latest); } - let now = A::Instant::now(); + let now = InstantOf::::now(); if now >= timeout_at { return Err(WaitError::Timeout( self.timeout, @@ -82,7 +74,7 @@ where let sleep_time = timeout_at - now; tracing::debug!(?sleep_time, "wait timeout"); - let delay = A::sleep(sleep_time); + let delay = AsyncRuntimeOf::::sleep(sleep_time); tokio::select! { _ = delay => { @@ -113,13 +105,13 @@ where /// Wait for `vote` to become `want` or timeout. #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] - pub async fn vote(&self, want: Vote, msg: impl ToString) -> Result, WaitError> { + pub async fn vote(&self, want: Vote, msg: impl ToString) -> Result, WaitError> { self.eq(Metric::Vote(want), msg).await } /// Wait for `current_leader` to become `Some(leader_id)` until timeout. #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] - pub async fn current_leader(&self, leader_id: NID, msg: impl ToString) -> Result, WaitError> { + pub async fn current_leader(&self, leader_id: C::NodeId, msg: impl ToString) -> Result, WaitError> { self.metrics( |m| m.current_leader == Some(leader_id), &format!("{} .current_leader == {}", msg.to_string(), leader_id), @@ -130,7 +122,7 @@ where /// Wait until applied exactly `want_log`(inclusive) logs or timeout. #[deprecated(since = "0.9.0", note = "use `log_index()` and `applied_index()` instead")] #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] - pub async fn log(&self, want_log_index: Option, msg: impl ToString) -> Result, WaitError> { + pub async fn log(&self, want_log_index: Option, msg: impl ToString) -> Result, WaitError> { self.eq(Metric::LastLogIndex(want_log_index), msg.to_string()).await?; self.eq(Metric::AppliedIndex(want_log_index), msg.to_string()).await } @@ -141,18 +133,14 @@ where note = "use `log_index_at_least()` and `applied_index_at_least()` instead" )] #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] - pub async fn log_at_least( - &self, - want_log: Option, - msg: impl ToString, - ) -> Result, WaitError> { + pub async fn log_at_least(&self, want_log: Option, msg: impl ToString) -> Result, WaitError> { self.ge(Metric::LastLogIndex(want_log), msg.to_string()).await?; self.ge(Metric::AppliedIndex(want_log), msg.to_string()).await } /// Block until the last log index becomes exactly `index`(inclusive) or timeout. #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] - pub async fn log_index(&self, index: Option, msg: impl ToString) -> Result, WaitError> { + pub async fn log_index(&self, index: Option, msg: impl ToString) -> Result, WaitError> { self.eq(Metric::LastLogIndex(index), msg).await } @@ -162,17 +150,13 @@ where &self, index: Option, msg: impl ToString, - ) -> Result, WaitError> { + ) -> Result, WaitError> { self.ge(Metric::LastLogIndex(index), msg).await } /// Block until the applied index becomes exactly `index`(inclusive) or timeout. #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] - pub async fn applied_index( - &self, - index: Option, - msg: impl ToString, - ) -> Result, WaitError> { + pub async fn applied_index(&self, index: Option, msg: impl ToString) -> Result, WaitError> { self.eq(Metric::AppliedIndex(index), msg).await } @@ -183,13 +167,13 @@ where &self, index: Option, msg: impl ToString, - ) -> Result, WaitError> { + ) -> Result, WaitError> { self.ge(Metric::AppliedIndex(index), msg).await } /// Wait for `state` to become `want_state` or timeout. #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] - pub async fn state(&self, want_state: ServerState, msg: impl ToString) -> Result, WaitError> { + pub async fn state(&self, want_state: ServerState, msg: impl ToString) -> Result, WaitError> { self.metrics( |m| m.state == want_state, &format!("{} .state == {:?}", msg.to_string(), want_state), @@ -202,9 +186,9 @@ where #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] pub async fn members( &self, - want_members: BTreeSet, + want_members: BTreeSet, msg: impl ToString, - ) -> Result, WaitError> { + ) -> Result, WaitError> { self.metrics( |m| { let got = m.membership_config.membership().voter_ids().collect::>(); @@ -219,9 +203,9 @@ where #[tracing::instrument(level = "trace", skip_all, fields(msg=msg.to_string().as_str()))] pub async fn voter_ids( &self, - voter_ids: impl IntoIterator, + voter_ids: impl IntoIterator, msg: impl ToString, - ) -> Result, WaitError> { + ) -> Result, WaitError> { let want = voter_ids.into_iter().collect::>(); tracing::debug!("block until voter_ids == {:?}", want); @@ -240,15 +224,19 @@ where #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] pub async fn snapshot( &self, - snapshot_last_log_id: LogId, + snapshot_last_log_id: LogId, msg: impl ToString, - ) -> Result, WaitError> { + ) -> Result, WaitError> { self.eq(Metric::Snapshot(Some(snapshot_last_log_id)), msg).await } /// Wait for `purged` to become `want` or timeout. #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] - pub async fn purged(&self, want: Option>, msg: impl ToString) -> Result, WaitError> { + pub async fn purged( + &self, + want: Option>, + msg: impl ToString, + ) -> Result, WaitError> { self.eq(Metric::Purged(want), msg).await } @@ -258,7 +246,7 @@ where /// ```ignore /// my_raft.wait(None).ge(Metric::Term(2), "become term 2").await?; /// ``` - pub async fn ge(&self, metric: Metric, msg: impl ToString) -> Result, WaitError> { + pub async fn ge(&self, metric: Metric, msg: impl ToString) -> Result, WaitError> { self.until(Condition::ge(metric), msg).await } @@ -268,17 +256,13 @@ where /// ```ignore /// my_raft.wait(None).eq(Metric::Term(2), "become term 2").await?; /// ``` - pub async fn eq(&self, metric: Metric, msg: impl ToString) -> Result, WaitError> { + pub async fn eq(&self, metric: Metric, msg: impl ToString) -> Result, WaitError> { self.until(Condition::eq(metric), msg).await } /// Block until a metric satisfies the specified condition or timeout. #[tracing::instrument(level = "trace", skip_all, fields(cond=cond.to_string(), msg=msg.to_string().as_str()))] - pub(crate) async fn until( - &self, - cond: Condition, - msg: impl ToString, - ) -> Result, WaitError> { + pub(crate) async fn until(&self, cond: Condition, msg: impl ToString) -> Result, WaitError> { self.metrics( |raft_metrics| match &cond { Condition::GE(expect) => raft_metrics >= expect, diff --git a/openraft/src/metrics/wait_condition.rs b/openraft/src/metrics/wait_condition.rs index b488811d9..aa60c71be 100644 --- a/openraft/src/metrics/wait_condition.rs +++ b/openraft/src/metrics/wait_condition.rs @@ -2,27 +2,27 @@ use std::fmt; use crate::metrics::metric_display::MetricDisplay; use crate::metrics::Metric; -use crate::NodeId; +use crate::RaftTypeConfig; /// A condition that the application wait for. #[derive(Debug)] -pub(crate) enum Condition -where NID: NodeId +pub(crate) enum Condition +where C: RaftTypeConfig { - GE(Metric), - EQ(Metric), + GE(Metric), + EQ(Metric), } -impl Condition -where NID: NodeId +impl Condition +where C: RaftTypeConfig { /// Build a new condition which the application will await to meet or exceed. - pub(crate) fn ge(v: Metric) -> Self { + pub(crate) fn ge(v: Metric) -> Self { Self::GE(v) } /// Build a new condition which the application will await to meet. - pub(crate) fn eq(v: Metric) -> Self { + pub(crate) fn eq(v: Metric) -> Self { Self::EQ(v) } @@ -40,7 +40,7 @@ where NID: NodeId } } - pub(crate) fn value(&self) -> MetricDisplay<'_, NID> { + pub(crate) fn value(&self) -> MetricDisplay<'_, C> { match self { Condition::GE(v) => v.value(), Condition::EQ(v) => v.value(), @@ -48,8 +48,8 @@ where NID: NodeId } } -impl fmt::Display for Condition -where NID: NodeId +impl fmt::Display for Condition +where C: RaftTypeConfig { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}{}{}", self.name(), self.op(), self.value()) diff --git a/openraft/src/metrics/wait_test.rs b/openraft/src/metrics/wait_test.rs index 7c24b3009..2a6a73ddc 100644 --- a/openraft/src/metrics/wait_test.rs +++ b/openraft/src/metrics/wait_test.rs @@ -7,18 +7,18 @@ use tokio::sync::watch; use tokio::time::sleep; use crate::core::ServerState; +use crate::engine::testing::UTConfig; use crate::log_id::LogIdOptionExt; use crate::metrics::Wait; use crate::metrics::WaitError; use crate::testing::log_id; +use crate::type_config::alias::NodeIdOf; use crate::vote::CommittedLeaderId; use crate::LogId; use crate::Membership; -use crate::Node; -use crate::NodeId; use crate::RaftMetrics; +use crate::RaftTypeConfig; use crate::StoredMembership; -use crate::TokioRuntime; use crate::Vote; /// Test wait for different state changes @@ -26,7 +26,7 @@ use crate::Vote; async fn test_wait() -> anyhow::Result<()> { { // wait for leader - let (init, w, tx) = init_wait_test::(); + let (init, w, tx) = init_wait_test::(); let h = tokio::spawn(async move { sleep(Duration::from_millis(10)).await; @@ -42,7 +42,7 @@ async fn test_wait() -> anyhow::Result<()> { { // wait for applied log - let (init, w, tx) = init_wait_test::(); + let (init, w, tx) = init_wait_test::(); let h = tokio::spawn(async move { sleep(Duration::from_millis(10)).await; @@ -70,7 +70,7 @@ async fn test_wait() -> anyhow::Result<()> { { // wait for state - let (init, w, tx) = init_wait_test::(); + let (init, w, tx) = init_wait_test::(); let h = tokio::spawn(async move { sleep(Duration::from_millis(10)).await; @@ -87,7 +87,7 @@ async fn test_wait() -> anyhow::Result<()> { { // wait for members - let (init, w, tx) = init_wait_test::(); + let (init, w, tx) = init_wait_test::(); let h = tokio::spawn(async move { sleep(Duration::from_millis(10)).await; @@ -110,7 +110,7 @@ async fn test_wait() -> anyhow::Result<()> { tracing::info!("--- wait for snapshot, Ok"); { - let (init, w, tx) = init_wait_test::(); + let (init, w, tx) = init_wait_test::(); let h = tokio::spawn(async move { sleep(Duration::from_millis(10)).await; @@ -127,7 +127,7 @@ async fn test_wait() -> anyhow::Result<()> { tracing::info!("--- wait for snapshot, only index matches"); { - let (init, w, tx) = init_wait_test::(); + let (init, w, tx) = init_wait_test::(); let h = tokio::spawn(async move { sleep(Duration::from_millis(10)).await; @@ -152,7 +152,7 @@ async fn test_wait() -> anyhow::Result<()> { { // timeout - let (_init, w, _tx) = init_wait_test::(); + let (_init, w, _tx) = init_wait_test::(); let h = tokio::spawn(async move { sleep(Duration::from_millis(200)).await; @@ -176,7 +176,7 @@ async fn test_wait() -> anyhow::Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 8)] async fn test_wait_log_index() -> anyhow::Result<()> { // wait for applied log - let (init, w, tx) = init_wait_test::(); + let (init, w, tx) = init_wait_test::(); let h = tokio::spawn(async move { sleep(Duration::from_millis(10)).await; @@ -203,7 +203,7 @@ async fn test_wait_log_index() -> anyhow::Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 8)] async fn test_wait_vote() -> anyhow::Result<()> { - let (init, w, tx) = init_wait_test::(); + let (init, w, tx) = init_wait_test::(); let h = tokio::spawn(async move { sleep(Duration::from_millis(10)).await; @@ -226,7 +226,7 @@ async fn test_wait_vote() -> anyhow::Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 8)] async fn test_wait_purged() -> anyhow::Result<()> { - let (init, w, tx) = init_wait_test::(); + let (init, w, tx) = init_wait_test::(); let h = tokio::spawn(async move { sleep(Duration::from_millis(10)).await; @@ -242,22 +242,15 @@ async fn test_wait_purged() -> anyhow::Result<()> { Ok(()) } -pub(crate) type InitResult = ( - RaftMetrics, - Wait, - watch::Sender>, -); +pub(crate) type InitResult = (RaftMetrics, Wait, watch::Sender>); /// Build a initial state for testing of Wait: /// Returns init metrics, Wait, and the tx to send an updated metrics. -fn init_wait_test() -> InitResult -where - NID: NodeId, - N: Node, -{ +fn init_wait_test() -> InitResult +where C: RaftTypeConfig { let init = RaftMetrics { running_state: Ok(()), - id: NID::default(), + id: NodeIdOf::::default(), state: ServerState::Learner, current_term: 0, vote: Vote::default(), @@ -275,7 +268,6 @@ where let w = Wait { timeout: Duration::from_millis(100), rx, - _phantom: std::marker::PhantomData, }; (init, w, tx) diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index 91c9446e0..7b6d61632 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -13,7 +13,6 @@ pub(crate) use self::external_request::BoxCoreFn; pub(in crate::raft) mod core_state; use std::fmt::Debug; -use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; @@ -668,7 +667,7 @@ where C: RaftTypeConfig /// Returns Err() if it should keep waiting. fn check_replication_upto_date( &self, - metrics: &RaftMetrics, + metrics: &RaftMetrics, node_id: C::NodeId, membership_log_id: Option>, ) -> Result>, ()> { @@ -856,17 +855,17 @@ where C: RaftTypeConfig } /// Get a handle to the metrics channel. - pub fn metrics(&self) -> watch::Receiver> { + pub fn metrics(&self) -> watch::Receiver> { self.inner.rx_metrics.clone() } /// Get a handle to the data metrics channel. - pub fn data_metrics(&self) -> watch::Receiver> { + pub fn data_metrics(&self) -> watch::Receiver> { self.inner.rx_data_metrics.clone() } /// Get a handle to the server metrics channel. - pub fn server_metrics(&self) -> watch::Receiver> { + pub fn server_metrics(&self) -> watch::Receiver> { self.inner.rx_server_metrics.clone() } @@ -890,7 +889,7 @@ where C: RaftTypeConfig /// // wait for raft state to become a follower /// r.wait(None).state(State::Follower, "state").await?; /// ``` - pub fn wait(&self, timeout: Option) -> Wait { + pub fn wait(&self, timeout: Option) -> Wait { let timeout = match timeout { Some(t) => t, None => Duration::from_secs(86400 * 365 * 100), @@ -898,7 +897,6 @@ where C: RaftTypeConfig Wait { timeout, rx: self.inner.rx_metrics.clone(), - _phantom: PhantomData, } } diff --git a/openraft/src/raft/raft_inner.rs b/openraft/src/raft/raft_inner.rs index a1bf298b6..ed286b5ab 100644 --- a/openraft/src/raft/raft_inner.rs +++ b/openraft/src/raft/raft_inner.rs @@ -34,9 +34,9 @@ where C: RaftTypeConfig pub(in crate::raft) runtime_config: Arc, pub(in crate::raft) tick_handle: TickHandle, pub(in crate::raft) tx_api: mpsc::UnboundedSender>, - pub(in crate::raft) rx_metrics: watch::Receiver>, - pub(in crate::raft) rx_data_metrics: watch::Receiver>, - pub(in crate::raft) rx_server_metrics: watch::Receiver>, + pub(in crate::raft) rx_metrics: watch::Receiver>, + pub(in crate::raft) rx_data_metrics: watch::Receiver>, + pub(in crate::raft) rx_server_metrics: watch::Receiver>, // TODO(xp): it does not need to be a async mutex. #[allow(clippy::type_complexity)] diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs index ea994ee79..1f6f551e9 100644 --- a/tests/tests/fixtures/mod.rs +++ b/tests/tests/fixtures/mod.rs @@ -58,7 +58,6 @@ use openraft::RaftState; use openraft::RaftTypeConfig; use openraft::ServerState; use openraft::TokioInstant; -use openraft::TokioRuntime; use openraft::Vote; use openraft_memstore::ClientRequest; use openraft_memstore::ClientResponse; @@ -571,7 +570,7 @@ impl TypedRaftRouter { /// Get a payload of the latest metrics from each node in the cluster. #[allow(clippy::significant_drop_in_scrutinee)] - pub fn latest_metrics(&self) -> Vec> { + pub fn latest_metrics(&self) -> Vec> { let rt = self.nodes.lock().unwrap(); let mut metrics = vec![]; for node in rt.values() { @@ -582,7 +581,7 @@ impl TypedRaftRouter { metrics } - pub fn get_metrics(&self, node_id: &MemNodeId) -> anyhow::Result> { + pub fn get_metrics(&self, node_id: &MemNodeId) -> anyhow::Result> { let node = self.get_raft_handle(node_id)?; let metrics = node.metrics().borrow().clone(); Ok(metrics) @@ -613,16 +612,16 @@ impl TypedRaftRouter { func: T, timeout: Option, msg: &str, - ) -> anyhow::Result> + ) -> anyhow::Result> where - T: Fn(&RaftMetrics) -> bool + Send, + T: Fn(&RaftMetrics) -> bool + Send, { let wait = self.wait(node_id, timeout); let rst = wait.metrics(func, format!("node-{} {}", node_id, msg)).await?; Ok(rst) } - pub fn wait(&self, node_id: &MemNodeId, timeout: Option) -> Wait { + pub fn wait(&self, node_id: &MemNodeId, timeout: Option) -> Wait { let node = { let rt = self.nodes.lock().unwrap(); rt.get(node_id).expect("target node not found in routing table").clone().0