Skip to content

Commit

Permalink
Feature: add Raft::data_metrics() and Raft::server_metrics()
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian committed Jan 14, 2024
1 parent 42e809c commit 13ab097
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 0 deletions.
4 changes: 4 additions & 0 deletions openraft/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ mod wait_condition;
use std::collections::BTreeMap;

pub use metric::Metric;
pub use raft_metrics::is_data_metrics_changed;
pub use raft_metrics::is_server_metrics_changed;
pub use raft_metrics::RaftDataMetrics;
pub use raft_metrics::RaftMetrics;
pub use raft_metrics::RaftServerMetrics;
pub use wait::Wait;
pub use wait::WaitError;
pub(crate) use wait_condition::Condition;
Expand Down
152 changes: 152 additions & 0 deletions openraft/src/metrics/raft_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,155 @@ where
}
}
}

pub fn is_data_metrics_changed<NID, N>(old: &RaftMetrics<NID, N>, new: &RaftMetrics<NID, N>) -> bool
where
NID: NodeId,
N: Node,
{
new.last_log_index.ne(&old.last_log_index)
|| new.last_applied.ne(&old.last_applied)
|| new.snapshot.ne(&old.snapshot)
|| new.purged.ne(&old.purged)
|| new.replication.ne(&old.replication)
}

pub fn is_server_metrics_changed<NID, N>(old: &RaftMetrics<NID, N>, new: &RaftMetrics<NID, N>) -> bool
where
NID: NodeId,
N: Node,
{
new.current_term.ne(&old.current_term)
|| new.vote.ne(&old.vote)
|| new.state.ne(&old.state)
|| new.current_leader.ne(&old.current_leader)
|| new.membership_config.ne(&old.membership_config)
}

/// Subset of RaftMetrics, only include data-related metrics
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct RaftDataMetrics<NID>
where NID: NodeId
{
pub last_log_index: Option<u64>,
pub last_applied: Option<LogId<NID>>,
pub snapshot: Option<LogId<NID>>,
pub purged: Option<LogId<NID>>,
pub replication: Option<ReplicationMetrics<NID>>,
}

impl<NID> fmt::Display for RaftDataMetrics<NID>
where NID: NodeId
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "DataMetrics{{")?;

write!(
f,
"last_log:{}, last_applied:{}, snapshot:{}, purged:{}, replication:{{{}}}",
DisplayOption(&self.last_log_index),
DisplayOption(&self.last_applied),
DisplayOption(&self.snapshot),
DisplayOption(&self.purged),
self.replication
.as_ref()
.map(|x| { x.iter().map(|(k, v)| format!("{}:{}", k, DisplayOption(v))).collect::<Vec<_>>().join(",") })
.unwrap_or_default(),
)?;

write!(f, "}}")?;
Ok(())
}
}

impl<NID> MessageSummary<RaftDataMetrics<NID>> for RaftDataMetrics<NID>
where NID: NodeId
{
fn summary(&self) -> String {
self.to_string()
}
}

impl<NID, N> From<RaftMetrics<NID, N>> for RaftDataMetrics<NID>
where
NID: NodeId,
N: Node,
{
fn from(metrics: RaftMetrics<NID, N>) -> Self {
Self {
last_log_index: metrics.last_log_index,
last_applied: metrics.last_applied,
snapshot: metrics.snapshot,
purged: metrics.purged,
replication: metrics.replication,
}
}
}

/// Subset of RaftMetrics, only include server-related metrics
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct RaftServerMetrics<NID, N>
where
NID: NodeId,
N: Node,
{
pub id: NID,
pub current_term: u64,
pub vote: Vote<NID>,
pub state: ServerState,
pub current_leader: Option<NID>,
pub membership_config: Arc<StoredMembership<NID, N>>,
}

impl<NID, N> fmt::Display for RaftServerMetrics<NID, N>
where
NID: NodeId,
N: Node,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Metrics{{")?;

write!(
f,
"id:{}, {:?}, term:{}, vote:{}, leader:{}, membership:{}",
self.id,
self.state,
self.current_term,
self.vote,
DisplayOption(&self.current_leader),
self.membership_config.summary(),
)?;

write!(f, "}}")?;
Ok(())
}
}

impl<NID, N> MessageSummary<RaftServerMetrics<NID, N>> for RaftServerMetrics<NID, N>
where
NID: NodeId,
N: Node,
{
fn summary(&self) -> String {
self.to_string()
}
}

impl<NID, N> From<RaftMetrics<NID, N>> for RaftServerMetrics<NID, N>
where
NID: NodeId,
N: Node,
{
fn from(metrics: RaftMetrics<NID, N>) -> Self {
Self {
id: metrics.id,
current_term: metrics.current_term,
vote: metrics.vote,
state: metrics.state,
current_leader: metrics.current_leader,
membership_config: metrics.membership_config,
}
}
}
45 changes: 45 additions & 0 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ use crate::error::InitializeError;
use crate::error::InstallSnapshotError;
use crate::error::RaftError;
use crate::membership::IntoNodes;
use crate::metrics::is_data_metrics_changed;
use crate::metrics::is_server_metrics_changed;
use crate::metrics::RaftDataMetrics;
use crate::metrics::RaftMetrics;
use crate::metrics::RaftServerMetrics;
use crate::metrics::Wait;
use crate::metrics::WaitError;
use crate::network::RaftNetworkFactory;
Expand Down Expand Up @@ -174,8 +178,37 @@ where C: RaftTypeConfig
let (tx_api, rx_api) = mpsc::unbounded_channel();
let (tx_notify, rx_notify) = mpsc::unbounded_channel();
let (tx_metrics, rx_metrics) = watch::channel(RaftMetrics::new_initial(id));
let (tx_data_metrics, rx_data_metrics) = watch::channel(RaftDataMetrics::default());
let (tx_server_metrics, rx_server_metrics) = watch::channel(RaftServerMetrics::default());
let (tx_shutdown, rx_shutdown) = oneshot::channel();

let mut raft_metrics_rx = rx_metrics.clone();

#[allow(clippy::let_underscore_future)]
let _ = C::AsyncRuntime::spawn(async move {
let mut last = RaftMetrics::new_initial(id);
loop {
let latest = raft_metrics_rx.borrow().clone();
if is_data_metrics_changed(&last, &latest) {
if let Err(err) = tx_data_metrics.send(latest.clone().into()) {
tracing::error!(error=%err, id=display(id), "error reporting data metrics");
}
}

if is_server_metrics_changed(&last, &latest) {
if let Err(err) = tx_server_metrics.send(latest.clone().into()) {
tracing::error!(error=%err, id=display(id), "error reporting server metrics");
}
}

last = latest;
if let Err(e) = raft_metrics_rx.changed().await {
tracing::info!(error=%e, id=display(id), "metrics sender closed, so close data_metrics sender and server_metrics sender");
return;
}
}
});

let tick_handle = Tick::spawn(
Duration::from_millis(config.heartbeat_interval * 3 / 2),
tx_notify.clone(),
Expand Down Expand Up @@ -240,6 +273,8 @@ where C: RaftTypeConfig
tick_handle,
tx_api,
rx_metrics,
rx_data_metrics,
rx_server_metrics,
tx_shutdown: Mutex::new(Some(tx_shutdown)),
core_state: Mutex::new(CoreState::Running(core_handle)),
};
Expand Down Expand Up @@ -822,6 +857,16 @@ where C: RaftTypeConfig
self.inner.rx_metrics.clone()
}

/// Get a handle to the data metrics channel.
pub fn data_metrics(&self) -> watch::Receiver<RaftDataMetrics<C::NodeId>> {
self.inner.rx_data_metrics.clone()
}

/// Get a handle to the server metrics channel.
pub fn server_metrics(&self) -> watch::Receiver<RaftServerMetrics<C::NodeId, C::Node>> {
self.inner.rx_server_metrics.clone()
}

/// Get a handle to wait for the metrics to satisfy some condition.
///
/// If `timeout` is `None`, then it will wait forever(10 years).
Expand Down
4 changes: 4 additions & 0 deletions openraft/src/raft/raft_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use crate::core::raft_msg::external_command::ExternalCommand;
use crate::core::raft_msg::RaftMsg;
use crate::core::TickHandle;
use crate::error::Fatal;
use crate::metrics::RaftDataMetrics;
use crate::metrics::RaftServerMetrics;
use crate::raft::core_state::CoreState;
use crate::AsyncRuntime;
use crate::Config;
Expand All @@ -28,6 +30,8 @@ where C: RaftTypeConfig
pub(in crate::raft) tick_handle: TickHandle<C>,
pub(in crate::raft) tx_api: mpsc::UnboundedSender<RaftMsg<C>>,
pub(in crate::raft) rx_metrics: watch::Receiver<RaftMetrics<C::NodeId, C::Node>>,
pub(in crate::raft) rx_data_metrics: watch::Receiver<RaftDataMetrics<C::NodeId>>,
pub(in crate::raft) rx_server_metrics: watch::Receiver<RaftServerMetrics<C::NodeId, C::Node>>,

// TODO(xp): it does not need to be a async mutex.
#[allow(clippy::type_complexity)]
Expand Down
1 change: 1 addition & 0 deletions tests/tests/metrics/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod fixtures;

mod t10_current_leader;
mod t10_purged;
mod t10_server_metrics_and_data_metrics;
mod t20_metrics_state_machine_consistency;
mod t30_leader_metrics;
mod t40_metrics_wait;
54 changes: 54 additions & 0 deletions tests/tests/metrics/t10_server_metrics_and_data_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::sync::Arc;

use anyhow::Result;
use maplit::btreeset;
use openraft::Config;
#[allow(unused_imports)] use pretty_assertions::assert_eq;
#[allow(unused_imports)] use pretty_assertions::assert_ne;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;

/// Server metrics and data metrics method should work.
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn server_metrics_and_data_metrics() -> Result<()> {
// Setup test dependencies.
let config = Arc::new(
Config {
enable_heartbeat: false,
enable_elect: false,
..Default::default()
}
.validate()?,
);
let mut router = RaftRouter::new(config.clone());

tracing::info!("--- initializing cluster");
let mut log_index = router.new_cluster(btreeset! {0,1,2}, btreeset! {}).await?;

let node = router.get_raft_handle(&0)?;
let mut server_metrics = node.server_metrics();
let data_metrics = node.data_metrics();

let current_leader = router.current_leader(0).await;
let leader = server_metrics.borrow_and_update().current_leader;
assert_eq!(leader, current_leader, "current_leader should be {:?}", current_leader);

// Write some logs.
let n = 10;
tracing::info!(log_index, "--- write {} logs", n);
log_index += router.client_request_many(0, "foo", n).await?;

let last_log_index = data_metrics.borrow().last_log_index;
assert_eq!(
last_log_index,
Some(log_index),
"last_log_index should be {:?}",
Some(log_index)
);
assert!(
!server_metrics.borrow().has_changed(),
"server metrics should not update"
);
Ok(())
}

0 comments on commit 13ab097

Please sign in to comment.