Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Do not report snapshot.last_log_id to metrics until snapshot is finished building/installing #913

Merged
merged 3 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ repository = "https://github.com/datafuselabs/openraft"


[workspace.dependencies]
anyerror = { version = "0.1.8" }
anyerror = { version = "0.1.10" }
anyhow = "1.0.63"
async-entry = "0.3.1"
async-trait = "0.1.36"
Expand All @@ -32,7 +32,7 @@ rand = "0.8"
serde = { version="1.0.114", features=["derive", "rc"]}
serde_json = "1.0.57"
tempfile = { version = "3.4.0" }
thiserror = "1.0.33"
thiserror = "1.0.49"
tokio = { version="1.8", default-features=false, features=["fs", "io-util", "macros", "rt", "rt-multi-thread", "sync", "time"] }
tracing = { version = "0.1.29" }
tracing-appender = "0.2.0"
Expand Down
1 change: 0 additions & 1 deletion cluster_benchmark/tests/benchmark/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#![deny(unused_crate_dependencies)]
#![deny(unused_qualifications)]
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]

pub(crate) mod network;
pub(crate) mod store;
Expand Down
14 changes: 12 additions & 2 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ where
vote: *st.io_state().vote(),
last_log_index: st.last_log_id().index(),
last_applied: st.io_applied().copied(),
snapshot: st.snapshot_meta.last_log_id,
snapshot: st.io_snapshot_last_log_id().copied(),
purged: st.io_purged().copied(),

// --- cluster ---
Expand Down Expand Up @@ -1311,7 +1311,15 @@ where
meta.summary(),
func_name!()
);

// Update in-memory state first, then the io state.
// In-memory state should always be ahead or equal to the io state.

let last_log_id = meta.last_log_id;
self.engine.finish_building_snapshot(meta);

let st = self.engine.state.io_state_mut();
st.update_snapshot(last_log_id);
}
sm::Response::ReceiveSnapshotChunk(_) => {
tracing::info!("sm::StateMachine command done: ReceiveSnapshotChunk: {}", func_name!());
Expand All @@ -1324,7 +1332,9 @@ where
);

if let Some(meta) = meta {
self.engine.state.io_state_mut().update_applied(meta.last_log_id);
let st = self.engine.state.io_state_mut();
st.update_applied(meta.last_log_id);
st.update_snapshot(meta.last_log_id);
}
}
sm::Response::Apply(res) => {
Expand Down
1 change: 1 addition & 0 deletions openraft/src/core/sm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ where
let cmd_res = CommandResult::new(seq, res);
let _ = resp_tx.send(Notify::sm(cmd_res));
});
tracing::info!("{} returning; spawned building snapshot task", func_name!());
}

#[tracing::instrument(level = "info", skip_all)]
Expand Down
3 changes: 1 addition & 2 deletions openraft/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#![doc = include_str!("../README.md")]
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]
#![cfg_attr(feature = "bench", feature(test))]
// TODO: `clippy::result-large-err`: StorageError is 136 bytes, try to reduce the size.
#![allow(clippy::bool_assert_comparison, clippy::type_complexity, clippy::result_large_err)]
Expand All @@ -16,7 +15,7 @@
//! `RUSTC_BOOTSTRAP=1`.
//!
//! - `bt`: Enable backtrace: generate backtrace for errors. This requires unstable feature
//! `error_generic_member_access` and `provide_any` thus it can not be used with stable rust.
//! `error_generic_member_access` thus it can not be used with stable rust.
//!
//! - `serde`: Add serde::Serialize and serde:Deserialize bound to data types. If you'd like to use
//! `serde` to serialize messages.
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/progress/bench/vec_progress_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ fn progress_update_01234_567(b: &mut Bencher) {
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, 0..=7, 0);

let mut id = 0u64;
let mut values = vec![0, 1, 2, 3, 4, 5, 6, 7];
let mut values = [0, 1, 2, 3, 4, 5, 6, 7];
b.iter(|| {
id = (id + 1) & 7;
values[id as usize] += 1;
Expand Down
4 changes: 4 additions & 0 deletions openraft/src/progress/entry/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ impl LogStateReader<u64> for LogState {
todo!()
}

// Stub for the `LogState` test double: not implemented, panics via `todo!()` if called.
fn io_snapshot_last_log_id(&self) -> Option<&LogId<u64>> {
todo!()
}

// Stub for the `LogState` test double: not implemented, panics via `todo!()` if called.
fn io_purged(&self) -> Option<&LogId<u64>> {
todo!()
}
Expand Down
22 changes: 11 additions & 11 deletions openraft/src/progress/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ mod t {
#[test]
fn vec_progress_new() -> anyhow::Result<()> {
let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
let progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7].into_iter(), 0);
let progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7], 0);

assert_eq!(
vec![
Expand All @@ -393,7 +393,7 @@ mod t {
#[test]
fn vec_progress_get() -> anyhow::Result<()> {
let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7].into_iter(), 0);
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7], 0);

let _ = progress.update(&6, 5);
assert_eq!(&5, progress.get(&6));
Expand All @@ -414,7 +414,7 @@ mod t {
#[test]
fn vec_progress_iter() -> anyhow::Result<()> {
let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7].into_iter(), 0);
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7], 0);

let _ = progress.update(&7, 7);
let _ = progress.update(&3, 3);
Expand All @@ -441,10 +441,10 @@ mod t {
#[test]
fn vec_progress_move_up() -> anyhow::Result<()> {
let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6].into_iter(), 0);
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6], 0);

// initial: 0-0, 1-0, 2-0, 3-0, 4-0
let cases = vec![
let cases = [
((1, 2), &[(1, 2), (0, 0), (2, 0), (3, 0), (4, 0), (6, 0)], 0), //
((2, 3), &[(2, 3), (1, 2), (0, 0), (3, 0), (4, 0), (6, 0)], 0), //
((1, 3), &[(2, 3), (1, 3), (0, 0), (3, 0), (4, 0), (6, 0)], 1), // no move
Expand Down Expand Up @@ -473,7 +473,7 @@ mod t {
#[test]
fn vec_progress_update() -> anyhow::Result<()> {
let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6].into_iter(), 0);
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6], 0);

// initial: 0,0,0,0,0
let cases = vec![
Expand Down Expand Up @@ -515,10 +515,10 @@ mod t {
let pv = |p, user_data| ProgressEntry { progress: p, user_data };

let quorum_set: Vec<u64> = vec![0, 1, 2];
let mut progress = VecProgress::<u64, ProgressEntry, u64, _>::new(quorum_set, [3].into_iter(), pv(0, "foo"));
let mut progress = VecProgress::<u64, ProgressEntry, u64, _>::new(quorum_set, [3], pv(0, "foo"));

// initial: 0,0,0,0
let cases = vec![
let cases = [
(3, pv(9, "a"), Ok(&0)), // 0,0,0,9 // learner won't affect granted
(1, pv(2, "b"), Ok(&0)), // 0,2,0,9
(2, pv(3, "c"), Ok(&2)), // 0,2,3,9
Expand All @@ -543,7 +543,7 @@ mod t {
#[test]
fn vec_progress_update_does_not_move_learner_elt() -> anyhow::Result<()> {
let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6].into_iter(), 0);
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6], 0);

assert_eq!(Some(5), progress.index(&6));

Expand All @@ -563,7 +563,7 @@ mod t {

// Initially, committed is 5

let mut p012 = VecProgress::<u64, u64, u64, _>::new(qs012, [5].into_iter(), 0);
let mut p012 = VecProgress::<u64, u64, u64, _>::new(qs012, [5], 0);

let _ = p012.update(&0, 5);
let _ = p012.update(&1, 6);
Expand Down Expand Up @@ -597,7 +597,7 @@ mod t {
#[test]
fn vec_progress_is_voter() -> anyhow::Result<()> {
let quorum_set: Vec<u64> = vec![0, 1, 2, 3, 4];
let progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7].into_iter(), 0);
let progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, [6, 7], 0);

assert_eq!(Some(true), progress.is_voter(&1));
assert_eq!(Some(true), progress.is_voter(&3));
Expand Down
35 changes: 34 additions & 1 deletion openraft/src/raft_state/io_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,25 @@ pub(crate) struct LogIOId<NID: NodeId> {
///
/// These states are updated only when the io completes and thus may fall behind the state stored
/// in [`RaftState`](`crate::RaftState`).
///
/// The log ids that are tracked include:
///
/// ```text
/// | log ids
/// | *------------+---------+---------+---------+------------------>
/// | | | | `---> flushed
/// | | | `-------------> applied
/// | | `-----------------------> snapshot
/// | `---------------------------------> purged
/// ```
#[derive(Debug, Clone, Copy)]
#[derive(Default)]
#[derive(PartialEq, Eq)]
pub(crate) struct IOState<NID: NodeId> {
/// Whether it is building a snapshot
building_snapshot: bool,

// The last flushed vote.
/// The last flushed vote.
pub(crate) vote: Vote<NID>,

/// The last log id that has been flushed to storage.
Expand All @@ -33,6 +44,9 @@ pub(crate) struct IOState<NID: NodeId> {
/// The last log id that has been applied to state machine.
pub(crate) applied: Option<LogId<NID>>,

/// The last log id in the currently persisted snapshot.
pub(crate) snapshot: Option<LogId<NID>>,

/// The last log id that has been purged from storage.
///
/// `RaftState::last_purged_log_id()`
Expand All @@ -46,13 +60,15 @@ impl<NID: NodeId> IOState<NID> {
vote: Vote<NID>,
flushed: LogIOId<NID>,
applied: Option<LogId<NID>>,
snapshot: Option<LogId<NID>>,
purged: Option<LogId<NID>>,
) -> Self {
Self {
building_snapshot: false,
vote,
flushed,
applied,
snapshot,
purged,
}
}
Expand Down Expand Up @@ -83,6 +99,23 @@ impl<NID: NodeId> IOState<NID> {
self.applied.as_ref()
}

/// Record `log_id` as the last log id of the snapshot tracked by this io state.
///
/// In debug builds, asserts that `log_id` is strictly greater than the currently stored value;
/// the stored value is then overwritten with `log_id`.
pub(crate) fn update_snapshot(&mut self, log_id: Option<LogId<NID>>) {
tracing::debug!(snapshot = display(DisplayOption(&log_id)), "{}", func_name!());

// Read the current value once so the check and its message use the same value.
let current = self.snapshot;
debug_assert!(
current < log_id,
"snapshot log id should be monotonically increasing: current: {:?}, update: {:?}",
current,
log_id
);

self.snapshot = log_id;
}

/// Return a reference to the stored snapshot log id, or `None` if none is recorded.
pub(crate) fn snapshot(&self) -> Option<&LogId<NID>> {
self.snapshot.as_ref()
}

/// Set the `building_snapshot` flag to `building`.
pub(crate) fn set_building_snapshot(&mut self, building: bool) {
self.building_snapshot = building;
}
Expand Down
5 changes: 5 additions & 0 deletions openraft/src/raft_state/log_state_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ pub(crate) trait LogStateReader<NID: NodeId> {
/// This is the io-state that has actually happened, which might fall behind the committed log id.
fn io_applied(&self) -> Option<&LogId<NID>>;

/// The last log id in the last persisted snapshot.
///
/// This is the io-state that has actually happened, which might fall behind `Self::snapshot_last_log_id()`.
fn io_snapshot_last_log_id(&self) -> Option<&LogId<NID>>;

/// The last known purged log id, inclusive.
///
/// This is actually purged log id from storage.
Expand Down
4 changes: 4 additions & 0 deletions openraft/src/raft_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ where
self.io_state.applied()
}

// Delegates to `self.io_state.snapshot()`.
fn io_snapshot_last_log_id(&self) -> Option<&LogId<NID>> {
self.io_state.snapshot()
}

// Delegates to `self.io_state.purged()`.
fn io_purged(&self) -> Option<&LogId<NID>> {
self.io_state.purged()
}
Expand Down
12 changes: 9 additions & 3 deletions openraft/src/storage/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,6 @@ where
);
let log_ids = LogIdList::load_log_ids(last_purged_log_id, last_log_id, self.log_store).await?;

// TODO: `flushed` is not set.
let io_state = IOState::new(vote, LogIOId::default(), last_applied, last_purged_log_id);

let snapshot = self.state_machine.get_current_snapshot().await?;

// If there is not a snapshot and there are logs purged, which means the snapshot is not persisted,
Expand All @@ -146,6 +143,15 @@ where
};
let snapshot_meta = snapshot.map(|x| x.meta).unwrap_or_default();

// TODO: `flushed` is not set.
let io_state = IOState::new(
vote,
LogIOId::default(),
last_applied,
snapshot_meta.last_log_id,
last_purged_log_id,
);

let now = <C::AsyncRuntime as AsyncRuntime>::Instant::now();

Ok(RaftState {
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain
Original file line number Diff line number Diff line change
@@ -1 +1 @@
nightly-2023-06-01
nightly-2023-10-17
1 change: 0 additions & 1 deletion tests/tests/append_entries/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]

#[macro_use]
#[path = "../fixtures/mod.rs"]
Expand Down
1 change: 0 additions & 1 deletion tests/tests/client_api/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]

#[macro_use]
#[path = "../fixtures/mod.rs"]
Expand Down
1 change: 0 additions & 1 deletion tests/tests/elect/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]

#[macro_use]
#[path = "../fixtures/mod.rs"]
Expand Down
1 change: 0 additions & 1 deletion tests/tests/life_cycle/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]

#[macro_use]
#[path = "../fixtures/mod.rs"]
Expand Down
1 change: 0 additions & 1 deletion tests/tests/log_store/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]

#[macro_use]
#[path = "../fixtures/mod.rs"]
Expand Down
1 change: 0 additions & 1 deletion tests/tests/membership/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]

#[macro_use]
#[path = "../fixtures/mod.rs"]
Expand Down
1 change: 0 additions & 1 deletion tests/tests/metrics/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]

#[macro_use]
#[path = "../fixtures/mod.rs"]
Expand Down
1 change: 0 additions & 1 deletion tests/tests/replication/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]

#[macro_use]
#[path = "../fixtures/mod.rs"]
Expand Down
1 change: 0 additions & 1 deletion tests/tests/snapshot_building/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]

#[macro_use]
#[path = "../fixtures/mod.rs"]
Expand Down
1 change: 0 additions & 1 deletion tests/tests/snapshot_streaming/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]

#[macro_use]
#[path = "../fixtures/mod.rs"]
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/snapshot_streaming/t10_api_install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async fn snapshot_arguments() -> Result<()> {

let n = router.remove_node(0).ok_or_else(|| anyhow::anyhow!("node not found"))?;
let make_req = || InstallSnapshotRequest {
/// force it to be a follower
// force it to be a follower
vote: Vote::new_committed(2, 1),
meta: SnapshotMeta {
snapshot_id: "ss1".into(),
Expand Down
1 change: 0 additions & 1 deletion tests/tests/state_machine/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]

#[macro_use]
#[path = "../fixtures/mod.rs"]
Expand Down
Loading