diff --git a/Cargo.toml b/Cargo.toml index be63a939d..69ef1b8fa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ repository = "https://github.com/datafuselabs/openraft" [workspace.dependencies] -anyerror = { version = "0.1.8" } +anyerror = { version = "0.1.10" } anyhow = "1.0.63" async-entry = "0.3.1" async-trait = "0.1.36" @@ -32,7 +32,7 @@ rand = "0.8" serde = { version="1.0.114", features=["derive", "rc"]} serde_json = "1.0.57" tempfile = { version = "3.4.0" } -thiserror = "1.0.33" +thiserror = "1.0.49" tokio = { version="1.8", default-features=false, features=["fs", "io-util", "macros", "rt", "rt-multi-thread", "sync", "time"] } tracing = { version = "0.1.29" } tracing-appender = "0.2.0" diff --git a/cluster_benchmark/tests/benchmark/main.rs b/cluster_benchmark/tests/benchmark/main.rs index 9230edbdd..d6f9b7b7c 100644 --- a/cluster_benchmark/tests/benchmark/main.rs +++ b/cluster_benchmark/tests/benchmark/main.rs @@ -1,7 +1,6 @@ #![deny(unused_crate_dependencies)] #![deny(unused_qualifications)] #![cfg_attr(feature = "bt", feature(error_generic_member_access))] -#![cfg_attr(feature = "bt", feature(provide_any))] pub(crate) mod network; pub(crate) mod store; diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 3489d7f31..972a51872 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -521,7 +521,7 @@ where vote: *st.io_state().vote(), last_log_index: st.last_log_id().index(), last_applied: st.io_applied().copied(), - snapshot: st.snapshot_meta.last_log_id, + snapshot: st.io_snapshot_last_log_id().copied(), purged: st.io_purged().copied(), // --- cluster --- @@ -1311,7 +1311,15 @@ where meta.summary(), func_name!() ); + + // Update in-memory state first, then the io state. + // In-memory state should always be ahead or equal to the io state. + + let last_log_id = meta.last_log_id; self.engine.finish_building_snapshot(meta); + + let st = self.engine.state.io_state_mut(); + st.update_snapshot(last_log_id); } sm::Response::ReceiveSnapshotChunk(_) => { tracing::info!("sm::StateMachine command done: ReceiveSnapshotChunk: {}", func_name!()); @@ -1324,7 +1332,9 @@ where ); if let Some(meta) = meta { - self.engine.state.io_state_mut().update_applied(meta.last_log_id); + let st = self.engine.state.io_state_mut(); + st.update_applied(meta.last_log_id); + st.update_snapshot(meta.last_log_id); } } sm::Response::Apply(res) => { diff --git a/openraft/src/core/sm/mod.rs b/openraft/src/core/sm/mod.rs index e39b652a3..4a50e1daf 100644 --- a/openraft/src/core/sm/mod.rs +++ b/openraft/src/core/sm/mod.rs @@ -228,6 +228,7 @@ where let cmd_res = CommandResult::new(seq, res); let _ = resp_tx.send(Notify::sm(cmd_res)); }); + tracing::info!("{} returning; spawned building snapshot task", func_name!()); } #[tracing::instrument(level = "info", skip_all)] diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index 79a44964e..b0e4c89ea 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -1,6 +1,5 @@ #![doc = include_str!("../README.md")] #![cfg_attr(feature = "bt", feature(error_generic_member_access))] -#![cfg_attr(feature = "bt", feature(provide_any))] #![cfg_attr(feature = "bench", feature(test))] // TODO: `clippy::result-large-err`: StorageError is 136 bytes, try to reduce the size. #![allow(clippy::bool_assert_comparison, clippy::type_complexity, clippy::result_large_err)] @@ -16,7 +15,7 @@ //! `RUSTC_BOOTSTRAP=1`. //! //! - `bt`: Enable backtrace: generate backtrace for errors. This requires unstable feature -//! `error_generic_member_access` and `provide_any` thus it can not be used with stable rust. +//! `error_generic_member_access` thus it can not be used with stable rust. //! //! - `serde`: Add serde::Serialize and serde:Deserialize bound to data types. If you'd like to use //! `serde` to serialize messages. diff --git a/openraft/src/progress/bench/vec_progress_update.rs b/openraft/src/progress/bench/vec_progress_update.rs index 6d23127ed..814e81de5 100644 --- a/openraft/src/progress/bench/vec_progress_update.rs +++ b/openraft/src/progress/bench/vec_progress_update.rs @@ -14,7 +14,7 @@ fn progress_update_01234_567(b: &mut Bencher) { let mut progress = VecProgress::::new(quorum_set, 0..=7, 0); let mut id = 0u64; - let mut values = vec![0, 1, 2, 3, 4, 5, 6, 7]; + let mut values = [0, 1, 2, 3, 4, 5, 6, 7]; b.iter(|| { id = (id + 1) & 7; values[id as usize] += 1; diff --git a/openraft/src/progress/entry/tests.rs b/openraft/src/progress/entry/tests.rs index 136ba22b2..af8186e81 100644 --- a/openraft/src/progress/entry/tests.rs +++ b/openraft/src/progress/entry/tests.rs @@ -121,6 +121,10 @@ impl LogStateReader for LogState { todo!() } + fn io_snapshot_last_log_id(&self) -> Option<&LogId> { + todo!() + } + fn io_purged(&self) -> Option<&LogId> { todo!() } diff --git a/openraft/src/progress/mod.rs b/openraft/src/progress/mod.rs index e91b2e0bc..1234c6cf8 100644 --- a/openraft/src/progress/mod.rs +++ b/openraft/src/progress/mod.rs @@ -370,7 +370,7 @@ mod t { #[test] fn vec_progress_new() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let progress = VecProgress::::new(quorum_set, [6, 7].into_iter(), 0); + let progress = VecProgress::::new(quorum_set, [6, 7], 0); assert_eq!( vec![ @@ -393,7 +393,7 @@ mod t { #[test] fn vec_progress_get() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let mut progress = VecProgress::::new(quorum_set, [6, 7].into_iter(), 0); + let mut progress = VecProgress::::new(quorum_set, [6, 7], 0); let _ = progress.update(&6, 5); assert_eq!(&5, progress.get(&6)); @@ -414,7 +414,7 @@ mod t { #[test] fn vec_progress_iter() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let mut progress = VecProgress::::new(quorum_set, [6, 7].into_iter(), 0); + let mut progress = VecProgress::::new(quorum_set, [6, 7], 0); let _ = progress.update(&7, 7); let _ = progress.update(&3, 3); @@ -441,10 +441,10 @@ mod t { #[test] fn vec_progress_move_up() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let mut progress = VecProgress::::new(quorum_set, [6].into_iter(), 0); + let mut progress = VecProgress::::new(quorum_set, [6], 0); // initial: 0-0, 1-0, 2-0, 3-0, 4-0 - let cases = vec![ + let cases = [ ((1, 2), &[(1, 2), (0, 0), (2, 0), (3, 0), (4, 0), (6, 0)], 0), // ((2, 3), &[(2, 3), (1, 2), (0, 0), (3, 0), (4, 0), (6, 0)], 0), // ((1, 3), &[(2, 3), (1, 3), (0, 0), (3, 0), (4, 0), (6, 0)], 1), // no move @@ -473,7 +473,7 @@ mod t { #[test] fn vec_progress_update() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let mut progress = VecProgress::::new(quorum_set, [6].into_iter(), 0); + let mut progress = VecProgress::::new(quorum_set, [6], 0); // initial: 0,0,0,0,0 let cases = vec![ @@ -515,10 +515,10 @@ mod t { let pv = |p, user_data| ProgressEntry { progress: p, user_data }; let quorum_set: Vec = vec![0, 1, 2]; - let mut progress = VecProgress::::new(quorum_set, [3].into_iter(), pv(0, "foo")); + let mut progress = VecProgress::::new(quorum_set, [3], pv(0, "foo")); // initial: 0,0,0,0 - let cases = vec![ + let cases = [ (3, pv(9, "a"), Ok(&0)), // 0,0,0,9 // learner won't affect granted (1, pv(2, "b"), Ok(&0)), // 0,2,0,9 (2, pv(3, "c"), Ok(&2)), // 0,2,3,9 @@ -543,7 +543,7 @@ mod t { #[test] fn vec_progress_update_does_not_move_learner_elt() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let mut progress = VecProgress::::new(quorum_set, [6].into_iter(), 0); + let mut progress = VecProgress::::new(quorum_set, [6], 0); assert_eq!(Some(5), progress.index(&6)); @@ -563,7 +563,7 @@ mod t { // Initially, committed is 5 - let mut p012 = VecProgress::::new(qs012, [5].into_iter(), 0); + let mut p012 = VecProgress::::new(qs012, [5], 0); let _ = p012.update(&0, 5); let _ = p012.update(&1, 6); @@ -597,7 +597,7 @@ mod t { #[test] fn vec_progress_is_voter() -> anyhow::Result<()> { let quorum_set: Vec = vec![0, 1, 2, 3, 4]; - let progress = VecProgress::::new(quorum_set, [6, 7].into_iter(), 0); + let progress = VecProgress::::new(quorum_set, [6, 7], 0); assert_eq!(Some(true), progress.is_voter(&1)); assert_eq!(Some(true), progress.is_voter(&3)); diff --git a/openraft/src/raft_state/io_state.rs b/openraft/src/raft_state/io_state.rs index b322a37e5..6e21318aa 100644 --- a/openraft/src/raft_state/io_state.rs +++ b/openraft/src/raft_state/io_state.rs @@ -17,6 +17,17 @@ pub(crate) struct LogIOId { /// /// These states are updated only when the io complete and thus may fall behind to the state stored /// in [`RaftState`](`crate::RaftState`),. +/// +/// The log ids that are tracked includes: +/// +/// ```text +/// | log ids +/// | *------------+---------+---------+---------+------------------> +/// | | | | `---> flushed +/// | | | `-------------> applied +/// | | `-----------------------> snapshot +/// | `---------------------------------> purged +/// ``` #[derive(Debug, Clone, Copy)] #[derive(Default)] #[derive(PartialEq, Eq)] @@ -24,7 +35,7 @@ pub(crate) struct IOState { /// Whether it is building a snapshot building_snapshot: bool, - // The last flushed vote. + /// The last flushed vote. pub(crate) vote: Vote, /// The last log id that has been flushed to storage. @@ -33,6 +44,9 @@ pub(crate) struct IOState { /// The last log id that has been applied to state machine. pub(crate) applied: Option>, + /// The last log id in the currently persisted snapshot. + pub(crate) snapshot: Option>, + /// The last log id that has been purged from storage. /// /// `RaftState::last_purged_log_id()` @@ -46,6 +60,7 @@ impl IOState { vote: Vote, flushed: LogIOId, applied: Option>, + snapshot: Option>, purged: Option>, ) -> Self { Self { @@ -53,6 +68,7 @@ impl IOState { vote, flushed, applied, + snapshot, purged, } } @@ -83,6 +99,23 @@ impl IOState { self.applied.as_ref() } + pub(crate) fn update_snapshot(&mut self, log_id: Option>) { + tracing::debug!(snapshot = display(DisplayOption(&log_id)), "{}", func_name!()); + + debug_assert!( + log_id > self.snapshot, + "snapshot log id should be monotonically increasing: current: {:?}, update: {:?}", + self.snapshot, + log_id + ); + + self.snapshot = log_id; + } + + pub(crate) fn snapshot(&self) -> Option<&LogId> { + self.snapshot.as_ref() + } + pub(crate) fn set_building_snapshot(&mut self, building: bool) { self.building_snapshot = building; } diff --git a/openraft/src/raft_state/log_state_reader.rs b/openraft/src/raft_state/log_state_reader.rs index bfd489c8d..4f8814121 100644 --- a/openraft/src/raft_state/log_state_reader.rs +++ b/openraft/src/raft_state/log_state_reader.rs @@ -54,6 +54,11 @@ pub(crate) trait LogStateReader { /// This is actually happened io-state which might fall behind committed log id. fn io_applied(&self) -> Option<&LogId>; + /// The last log id in the last persisted snapshot. + /// + /// This is actually happened io-state which might fall behind `Self::snapshot_last_log_id()`. + fn io_snapshot_last_log_id(&self) -> Option<&LogId>; + /// The last known purged log id, inclusive. /// /// This is actually purged log id from storage. diff --git a/openraft/src/raft_state/mod.rs b/openraft/src/raft_state/mod.rs index e14aafd24..a37675b9d 100644 --- a/openraft/src/raft_state/mod.rs +++ b/openraft/src/raft_state/mod.rs @@ -138,6 +138,10 @@ where self.io_state.applied() } + fn io_snapshot_last_log_id(&self) -> Option<&LogId> { + self.io_state.snapshot() + } + fn io_purged(&self) -> Option<&LogId> { self.io_state.purged() } diff --git a/openraft/src/storage/helper.rs b/openraft/src/storage/helper.rs index 06bd27ec4..6c43b0280 100644 --- a/openraft/src/storage/helper.rs +++ b/openraft/src/storage/helper.rs @@ -125,9 +125,6 @@ where ); let log_ids = LogIdList::load_log_ids(last_purged_log_id, last_log_id, self.log_store).await?; - // TODO: `flushed` is not set. - let io_state = IOState::new(vote, LogIOId::default(), last_applied, last_purged_log_id); - let snapshot = self.state_machine.get_current_snapshot().await?; // If there is not a snapshot and there are logs purged, which means the snapshot is not persisted, @@ -146,6 +143,15 @@ where }; let snapshot_meta = snapshot.map(|x| x.meta).unwrap_or_default(); + // TODO: `flushed` is not set. + let io_state = IOState::new( + vote, + LogIOId::default(), + last_applied, + snapshot_meta.last_log_id, + last_purged_log_id, + ); + let now = ::Instant::now(); Ok(RaftState { diff --git a/rust-toolchain b/rust-toolchain index df18b6ae3..86cef4f63 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -nightly-2023-06-01 +nightly-2023-10-17 diff --git a/tests/tests/append_entries/main.rs b/tests/tests/append_entries/main.rs index ebd4b51ce..517affc56 100644 --- a/tests/tests/append_entries/main.rs +++ b/tests/tests/append_entries/main.rs @@ -1,5 +1,4 @@ #![cfg_attr(feature = "bt", feature(error_generic_member_access))] -#![cfg_attr(feature = "bt", feature(provide_any))] #[macro_use] #[path = "../fixtures/mod.rs"] diff --git a/tests/tests/client_api/main.rs b/tests/tests/client_api/main.rs index cee88f732..f77c67b18 100644 --- a/tests/tests/client_api/main.rs +++ b/tests/tests/client_api/main.rs @@ -1,5 +1,4 @@ #![cfg_attr(feature = "bt", feature(error_generic_member_access))] -#![cfg_attr(feature = "bt", feature(provide_any))] #[macro_use] #[path = "../fixtures/mod.rs"] diff --git a/tests/tests/elect/main.rs b/tests/tests/elect/main.rs index 0a8a87c4e..f5deb8425 100644 --- a/tests/tests/elect/main.rs +++ b/tests/tests/elect/main.rs @@ -1,5 +1,4 @@ #![cfg_attr(feature = "bt", feature(error_generic_member_access))] -#![cfg_attr(feature = "bt", feature(provide_any))] #[macro_use] #[path = "../fixtures/mod.rs"] diff --git a/tests/tests/life_cycle/main.rs b/tests/tests/life_cycle/main.rs index 56064993c..61556ecda 100644 --- a/tests/tests/life_cycle/main.rs +++ b/tests/tests/life_cycle/main.rs @@ -1,5 +1,4 @@ #![cfg_attr(feature = "bt", feature(error_generic_member_access))] -#![cfg_attr(feature = "bt", feature(provide_any))] #[macro_use] #[path = "../fixtures/mod.rs"] diff --git a/tests/tests/log_store/main.rs b/tests/tests/log_store/main.rs index 7e6af4778..5f348fb6f 100644 --- a/tests/tests/log_store/main.rs +++ b/tests/tests/log_store/main.rs @@ -1,5 +1,4 @@ #![cfg_attr(feature = "bt", feature(error_generic_member_access))] -#![cfg_attr(feature = "bt", feature(provide_any))] #[macro_use] #[path = "../fixtures/mod.rs"] diff --git a/tests/tests/membership/main.rs b/tests/tests/membership/main.rs index 5a77e8376..09f04eca4 100644 --- a/tests/tests/membership/main.rs +++ b/tests/tests/membership/main.rs @@ -1,5 +1,4 @@ #![cfg_attr(feature = "bt", feature(error_generic_member_access))] -#![cfg_attr(feature = "bt", feature(provide_any))] #[macro_use] #[path = "../fixtures/mod.rs"] diff --git a/tests/tests/metrics/main.rs b/tests/tests/metrics/main.rs index f84e6ef06..409d07b78 100644 --- a/tests/tests/metrics/main.rs +++ b/tests/tests/metrics/main.rs @@ -1,5 +1,4 @@ #![cfg_attr(feature = "bt", feature(error_generic_member_access))] -#![cfg_attr(feature = "bt", feature(provide_any))] #[macro_use] #[path = "../fixtures/mod.rs"] diff --git a/tests/tests/replication/main.rs b/tests/tests/replication/main.rs index 72c1c97b5..f02e67012 100644 --- a/tests/tests/replication/main.rs +++ b/tests/tests/replication/main.rs @@ -1,5 +1,4 @@ #![cfg_attr(feature = "bt", feature(error_generic_member_access))] -#![cfg_attr(feature = "bt", feature(provide_any))] #[macro_use] #[path = "../fixtures/mod.rs"] diff --git a/tests/tests/snapshot_building/main.rs b/tests/tests/snapshot_building/main.rs index a0e8afe57..d6ad74cbf 100644 --- a/tests/tests/snapshot_building/main.rs +++ b/tests/tests/snapshot_building/main.rs @@ -1,5 +1,4 @@ #![cfg_attr(feature = "bt", feature(error_generic_member_access))] -#![cfg_attr(feature = "bt", feature(provide_any))] #[macro_use] #[path = "../fixtures/mod.rs"] diff --git a/tests/tests/snapshot_streaming/main.rs b/tests/tests/snapshot_streaming/main.rs index 8310b4ecd..c7a75da05 100644 --- a/tests/tests/snapshot_streaming/main.rs +++ b/tests/tests/snapshot_streaming/main.rs @@ -1,5 +1,4 @@ #![cfg_attr(feature = "bt", feature(error_generic_member_access))] -#![cfg_attr(feature = "bt", feature(provide_any))] #[macro_use] #[path = "../fixtures/mod.rs"] diff --git a/tests/tests/snapshot_streaming/t10_api_install_snapshot.rs b/tests/tests/snapshot_streaming/t10_api_install_snapshot.rs index 6b00e189d..f018608b0 100644 --- a/tests/tests/snapshot_streaming/t10_api_install_snapshot.rs +++ b/tests/tests/snapshot_streaming/t10_api_install_snapshot.rs @@ -38,7 +38,7 @@ async fn snapshot_arguments() -> Result<()> { let n = router.remove_node(0).ok_or_else(|| anyhow::anyhow!("node not found"))?; let make_req = || InstallSnapshotRequest { - /// force it to be a follower + // force it to be a follower vote: Vote::new_committed(2, 1), meta: SnapshotMeta { snapshot_id: "ss1".into(), diff --git a/tests/tests/state_machine/main.rs b/tests/tests/state_machine/main.rs index dc3382996..ac1439200 100644 --- a/tests/tests/state_machine/main.rs +++ b/tests/tests/state_machine/main.rs @@ -1,5 +1,4 @@ #![cfg_attr(feature = "bt", feature(error_generic_member_access))] -#![cfg_attr(feature = "bt", feature(provide_any))] #[macro_use] #[path = "../fixtures/mod.rs"]