Skip to content

Commit

Permalink
Merge pull request #913 from drmingdrmer/44-tracing-log-option
Browse files Browse the repository at this point in the history
Fix: Do not report snapshot.last_log_id to metrics until snapshot is finished building/installing
  • Loading branch information
drmingdrmer authored Oct 18, 2023
2 parents 4b78dd3 + c4a44a2 commit d8e0417
Show file tree
Hide file tree
Showing 25 changed files with 86 additions and 36 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ repository = "https://github.com/datafuselabs/openraft"


[workspace.dependencies]
anyerror = { version = "0.1.8" }
anyerror = { version = "0.1.10" }
anyhow = "1.0.63"
async-entry = "0.3.1"
async-trait = "0.1.36"
Expand All @@ -32,7 +32,7 @@ rand = "0.8"
serde = { version="1.0.114", features=["derive", "rc"]}
serde_json = "1.0.57"
tempfile = { version = "3.4.0" }
thiserror = "1.0.33"
thiserror = "1.0.49"
tokio = { version="1.8", default-features=false, features=["fs", "io-util", "macros", "rt", "rt-multi-thread", "sync", "time"] }
tracing = { version = "0.1.29" }
tracing-appender = "0.2.0"
Expand Down
1 change: 0 additions & 1 deletion cluster_benchmark/tests/benchmark/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#![deny(unused_crate_dependencies)]
#![deny(unused_qualifications)]
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]

pub(crate) mod network;
pub(crate) mod store;
Expand Down
14 changes: 12 additions & 2 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ where
vote: *st.io_state().vote(),
last_log_index: st.last_log_id().index(),
last_applied: st.io_applied().copied(),
snapshot: st.snapshot_meta.last_log_id,
snapshot: st.io_snapshot_last_log_id().copied(),
purged: st.io_purged().copied(),

// --- cluster ---
Expand Down Expand Up @@ -1311,7 +1311,15 @@ where
meta.summary(),
func_name!()
);

// Update in-memory state first, then the io state.
// In-memory state should always be ahead or equal to the io state.

let last_log_id = meta.last_log_id;
self.engine.finish_building_snapshot(meta);

let st = self.engine.state.io_state_mut();
st.update_snapshot(last_log_id);
}
sm::Response::ReceiveSnapshotChunk(_) => {
tracing::info!("sm::StateMachine command done: ReceiveSnapshotChunk: {}", func_name!());
Expand All @@ -1324,7 +1332,9 @@ where
);

if let Some(meta) = meta {
self.engine.state.io_state_mut().update_applied(meta.last_log_id);
let st = self.engine.state.io_state_mut();
st.update_applied(meta.last_log_id);
st.update_snapshot(meta.last_log_id);
}
}
sm::Response::Apply(res) => {
Expand Down
1 change: 1 addition & 0 deletions openraft/src/core/sm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ where
let cmd_res = CommandResult::new(seq, res);
let _ = resp_tx.send(Notify::sm(cmd_res));
});
tracing::info!("{} returning; spawned building snapshot task", func_name!());
}

#[tracing::instrument(level = "info", skip_all)]
Expand Down
3 changes: 1 addition & 2 deletions openraft/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#![doc = include_str!("../README.md")]
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]
#![cfg_attr(feature = "bench", feature(test))]
// TODO: `clippy::result-large-err`: StorageError is 136 bytes, try to reduce the size.
#![allow(clippy::bool_assert_comparison, clippy::type_complexity, clippy::result_large_err)]
Expand All @@ -16,7 +15,7 @@
//! `RUSTC_BOOTSTRAP=1`.
//!
//! - `bt`: Enable backtrace: generate backtrace for errors. This requires unstable feature
//! `error_generic_member_access` and `provide_any` thus it can not be used with stable rust.
//! `error_generic_member_access` thus it can not be used with stable rust.
//!
//! - `serde`: Add serde::Serialize and serde:Deserialize bound to data types. If you'd like to use
//! `serde` to serialize messages.
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/progress/bench/vec_progress_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ fn progress_update_01234_567(b: &mut Bencher) {
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, 0..=7, 0);

let mut id = 0u64;
let mut values = vec![0, 1, 2, 3, 4, 5, 6, 7];
let mut values = [0, 1, 2, 3, 4, 5, 6, 7];
b.iter(|| {
id = (id + 1) & 7;
values[id as usize] += 1;
Expand Down
4 changes: 4 additions & 0 deletions openraft/src/progress/entry/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ impl LogStateReader<u64> for LogState {
todo!()
}

fn io_snapshot_last_log_id(&self) -> Option<&LogId<u64>> {
todo!()
}

fn io_purged(&self) -> Option<&LogId<u64>> {
todo!()
}
Expand Down
22 changes: 11 additions & 11 deletions openraft/src/progress/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ mod t {
#[test]
fn vec_progress_new() -> anyhow::Result<()> {
let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
let progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7].into_iter(), 0);
let progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7], 0);

assert_eq!(
vec![
Expand All @@ -393,7 +393,7 @@ mod t {
#[test]
fn vec_progress_get() -> anyhow::Result<()> {
let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7].into_iter(), 0);
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7], 0);

let _ = progress.update(&6, 5);
assert_eq!(&5, progress.get(&6));
Expand All @@ -414,7 +414,7 @@ mod t {
#[test]
fn vec_progress_iter() -> anyhow::Result<()> {
let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7].into_iter(), 0);
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7], 0);

let _ = progress.update(&7, 7);
let _ = progress.update(&3, 3);
Expand All @@ -441,10 +441,10 @@ mod t {
#[test]
fn vec_progress_move_up() -> anyhow::Result<()> {
let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6].into_iter(), 0);
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6], 0);

// initial: 0-0, 1-0, 2-0, 3-0, 4-0
let cases = vec![
let cases = [
((1, 2), &[(1, 2), (0, 0), (2, 0), (3, 0), (4, 0), (6, 0)], 0), //
((2, 3), &[(2, 3), (1, 2), (0, 0), (3, 0), (4, 0), (6, 0)], 0), //
((1, 3), &[(2, 3), (1, 3), (0, 0), (3, 0), (4, 0), (6, 0)], 1), // no move
Expand Down Expand Up @@ -473,7 +473,7 @@ mod t {
#[test]
fn vec_progress_update() -> anyhow::Result<()> {
let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6].into_iter(), 0);
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6], 0);

// initial: 0,0,0,0,0
let cases = vec![
Expand Down Expand Up @@ -515,10 +515,10 @@ mod t {
let pv = |p, user_data| ProgressEntry { progress: p, user_data };

let quorum_set: Vec<u64> = vec![0, 1, 2];
let mut progress = VecProgress::<u64, ProgressEntry, u64, _>::new(quorum_set, [3].into_iter(), pv(0, "foo"));
let mut progress = VecProgress::<u64, ProgressEntry, u64, _>::new(quorum_set, [3], pv(0, "foo"));

// initial: 0,0,0,0
let cases = vec![
let cases = [
(3, pv(9, "a"), Ok(&0)), // 0,0,0,9 // learner won't affect granted
(1, pv(2, "b"), Ok(&0)), // 0,2,0,9
(2, pv(3, "c"), Ok(&2)), // 0,2,3,9
Expand All @@ -543,7 +543,7 @@ mod t {
#[test]
fn vec_progress_update_does_not_move_learner_elt() -> anyhow::Result<()> {
let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6].into_iter(), 0);
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6], 0);

assert_eq!(Some(5), progress.index(&6));

Expand All @@ -563,7 +563,7 @@ mod t {

// Initially, committed is 5

let mut p012 = VecProgress::<u64, u64, u64, _>::new(qs012, [5].into_iter(), 0);
let mut p012 = VecProgress::<u64, u64, u64, _>::new(qs012, [5], 0);

let _ = p012.update(&0, 5);
let _ = p012.update(&1, 6);
Expand Down Expand Up @@ -597,7 +597,7 @@ mod t {
#[test]
fn vec_progress_is_voter() -> anyhow::Result<()> {
let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
let progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7].into_iter(), 0);
let progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7], 0);

assert_eq!(Some(true), progress.is_voter(&1));
assert_eq!(Some(true), progress.is_voter(&3));
Expand Down
35 changes: 34 additions & 1 deletion openraft/src/raft_state/io_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,25 @@ pub(crate) struct LogIOId<NID: NodeId> {
///
/// These states are updated only when the io complete and thus may fall behind to the state stored
/// in [`RaftState`](`crate::RaftState`),.
///
/// The log ids that are tracked includes:
///
/// ```text
/// | log ids
/// | *------------+---------+---------+---------+------------------>
/// | | | | `---> flushed
/// | | | `-------------> applied
/// | | `-----------------------> snapshot
/// | `---------------------------------> purged
/// ```
#[derive(Debug, Clone, Copy)]
#[derive(Default)]
#[derive(PartialEq, Eq)]
pub(crate) struct IOState<NID: NodeId> {
/// Whether it is building a snapshot
building_snapshot: bool,

// The last flushed vote.
/// The last flushed vote.
pub(crate) vote: Vote<NID>,

/// The last log id that has been flushed to storage.
Expand All @@ -33,6 +44,9 @@ pub(crate) struct IOState<NID: NodeId> {
/// The last log id that has been applied to state machine.
pub(crate) applied: Option<LogId<NID>>,

/// The last log id in the currently persisted snapshot.
pub(crate) snapshot: Option<LogId<NID>>,

/// The last log id that has been purged from storage.
///
/// `RaftState::last_purged_log_id()`
Expand All @@ -46,13 +60,15 @@ impl<NID: NodeId> IOState<NID> {
vote: Vote<NID>,
flushed: LogIOId<NID>,
applied: Option<LogId<NID>>,
snapshot: Option<LogId<NID>>,
purged: Option<LogId<NID>>,
) -> Self {
Self {
building_snapshot: false,
vote,
flushed,
applied,
snapshot,
purged,
}
}
Expand Down Expand Up @@ -83,6 +99,23 @@ impl<NID: NodeId> IOState<NID> {
self.applied.as_ref()
}

pub(crate) fn update_snapshot(&mut self, log_id: Option<LogId<NID>>) {
tracing::debug!(snapshot = display(DisplayOption(&log_id)), "{}", func_name!());

debug_assert!(
log_id > self.snapshot,
"snapshot log id should be monotonically increasing: current: {:?}, update: {:?}",
self.snapshot,
log_id
);

self.snapshot = log_id;
}

pub(crate) fn snapshot(&self) -> Option<&LogId<NID>> {
self.snapshot.as_ref()
}

pub(crate) fn set_building_snapshot(&mut self, building: bool) {
self.building_snapshot = building;
}
Expand Down
5 changes: 5 additions & 0 deletions openraft/src/raft_state/log_state_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ pub(crate) trait LogStateReader<NID: NodeId> {
/// This is actually happened io-state which might fall behind committed log id.
fn io_applied(&self) -> Option<&LogId<NID>>;

/// The last log id in the last persisted snapshot.
///
/// This is actually happened io-state which might fall behind `Self::snapshot_last_log_id()`.
fn io_snapshot_last_log_id(&self) -> Option<&LogId<NID>>;

/// The last known purged log id, inclusive.
///
/// This is actually purged log id from storage.
Expand Down
4 changes: 4 additions & 0 deletions openraft/src/raft_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ where
self.io_state.applied()
}

fn io_snapshot_last_log_id(&self) -> Option<&LogId<NID>> {
self.io_state.snapshot()
}

fn io_purged(&self) -> Option<&LogId<NID>> {
self.io_state.purged()
}
Expand Down
12 changes: 9 additions & 3 deletions openraft/src/storage/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,6 @@ where
);
let log_ids = LogIdList::load_log_ids(last_purged_log_id, last_log_id, self.log_store).await?;

// TODO: `flushed` is not set.
let io_state = IOState::new(vote, LogIOId::default(), last_applied, last_purged_log_id);

let snapshot = self.state_machine.get_current_snapshot().await?;

// If there is not a snapshot and there are logs purged, which means the snapshot is not persisted,
Expand All @@ -146,6 +143,15 @@ where
};
let snapshot_meta = snapshot.map(|x| x.meta).unwrap_or_default();

// TODO: `flushed` is not set.
let io_state = IOState::new(
vote,
LogIOId::default(),
last_applied,
snapshot_meta.last_log_id,
last_purged_log_id,
);

let now = <C::AsyncRuntime as AsyncRuntime>::Instant::now();

Ok(RaftState {
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain
Original file line number Diff line number Diff line change
@@ -1 +1 @@
nightly-2023-06-01
nightly-2023-10-17
1 change: 0 additions & 1 deletion tests/tests/append_entries/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]

#[macro_use]
#[path = "../fixtures/mod.rs"]
Expand Down
1 change: 0 additions & 1 deletion tests/tests/client_api/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]

#[macro_use]
#[path = "../fixtures/mod.rs"]
Expand Down
1 change: 0 additions & 1 deletion tests/tests/elect/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]

#[macro_use]
#[path = "../fixtures/mod.rs"]
Expand Down
1 change: 0 additions & 1 deletion tests/tests/life_cycle/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]

#[macro_use]
#[path = "../fixtures/mod.rs"]
Expand Down
1 change: 0 additions & 1 deletion tests/tests/log_store/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]

#[macro_use]
#[path = "../fixtures/mod.rs"]
Expand Down
1 change: 0 additions & 1 deletion tests/tests/membership/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]

#[macro_use]
#[path = "../fixtures/mod.rs"]
Expand Down
1 change: 0 additions & 1 deletion tests/tests/metrics/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]

#[macro_use]
#[path = "../fixtures/mod.rs"]
Expand Down
1 change: 0 additions & 1 deletion tests/tests/replication/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]

#[macro_use]
#[path = "../fixtures/mod.rs"]
Expand Down
1 change: 0 additions & 1 deletion tests/tests/snapshot_building/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]

#[macro_use]
#[path = "../fixtures/mod.rs"]
Expand Down
1 change: 0 additions & 1 deletion tests/tests/snapshot_streaming/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]

#[macro_use]
#[path = "../fixtures/mod.rs"]
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/snapshot_streaming/t10_api_install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async fn snapshot_arguments() -> Result<()> {

let n = router.remove_node(0).ok_or_else(|| anyhow::anyhow!("node not found"))?;
let make_req = || InstallSnapshotRequest {
/// force it to be a follower
// force it to be a follower
vote: Vote::new_committed(2, 1),
meta: SnapshotMeta {
snapshot_id: "ss1".into(),
Expand Down
1 change: 0 additions & 1 deletion tests/tests/state_machine/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]

#[macro_use]
#[path = "../fixtures/mod.rs"]
Expand Down

0 comments on commit d8e0417

Please sign in to comment.