diff --git a/openraft/src/engine/leader_log_ids.rs b/openraft/src/engine/leader_log_ids.rs new file mode 100644 index 000000000..826ba7467 --- /dev/null +++ b/openraft/src/engine/leader_log_ids.rs @@ -0,0 +1,53 @@ +use std::fmt; + +use crate::type_config::alias::LogIdOf; +use crate::RaftTypeConfig; + +/// The first and the last log id belonging to a Leader. +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct LeaderLogIds { + first_last: Option<(LogIdOf, LogIdOf)>, +} + +impl fmt::Display for LeaderLogIds +where C: RaftTypeConfig +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.first_last { + None => write!(f, "None"), + Some((first, last)) => write!(f, "({}, {})", first, last), + } + } +} + +impl LeaderLogIds +where C: RaftTypeConfig +{ + pub(crate) fn new(log_ids: Option<(LogIdOf, LogIdOf)>) -> Self { + Self { first_last: log_ids } + } + + /// Used only in tests + #[allow(dead_code)] + pub(crate) fn new1(log_id: LogIdOf) -> Self { + Self { + first_last: Some((log_id.clone(), log_id)), + } + } + + /// Used only in tests + #[allow(dead_code)] + pub(crate) fn new2(first: LogIdOf, last: LogIdOf) -> Self { + Self { + first_last: Some((first, last)), + } + } + + pub(crate) fn first(&self) -> Option<&LogIdOf> { + self.first_last.as_ref().map(|x| &x.0) + } + + pub(crate) fn last(&self) -> Option<&LogIdOf> { + self.first_last.as_ref().map(|x| &x.1) + } +} diff --git a/openraft/src/engine/log_id_list.rs b/openraft/src/engine/log_id_list.rs index 258e446c3..21b97adce 100644 --- a/openraft/src/engine/log_id_list.rs +++ b/openraft/src/engine/log_id_list.rs @@ -1,7 +1,10 @@ +use crate::engine::leader_log_ids::LeaderLogIds; use crate::log_id::RaftLogId; use crate::storage::RaftLogReaderExt; +use crate::type_config::alias::LogIdOf; use crate::LogId; use crate::LogIdOptionExt; +use crate::RaftLogReader; use crate::RaftTypeConfig; use crate::StorageError; @@ -43,24 +46,15 @@ where C: RaftTypeConfig /// A-------B-------C : find(A,B); find(B,C) // 
both find `B`, need to de-dup /// A-------C-------C : find(A,C) /// ``` - pub(crate) async fn load_log_ids( - last_purged_log_id: Option>, - last_log_id: Option>, - sto: &mut LRX, - ) -> Result, StorageError> + pub(crate) async fn get_key_log_ids( + first: LogId, + last: LogId, + sto: &mut LR, + ) -> Result>, StorageError> where - LRX: RaftLogReaderExt, + LR: RaftLogReader + ?Sized, { - let mut res = vec![]; - - let last = match last_log_id { - None => return Ok(LogIdList::new(res)), - Some(x) => x, - }; - let first = match last_purged_log_id { - None => sto.get_log_id(0).await?, - Some(x) => x, - }; + let mut res: Vec> = vec![]; // Recursion stack let mut stack = vec![(first, last.clone())]; @@ -114,13 +108,16 @@ where C: RaftTypeConfig res.push(last); } - Ok(LogIdList::new(res)) + Ok(res) } } impl LogIdList where C: RaftTypeConfig { + /// Create a new `LogIdList`. + /// + /// It stores the last purged log id, and a series of key log ids. pub fn new(key_log_ids: impl IntoIterator>) -> Self { Self { key_log_ids: key_log_ids.into_iter().collect(), @@ -310,18 +307,20 @@ where C: RaftTypeConfig /// Note that the 0-th log does not belong to any leader(but a membership log to initialize a /// cluster) but this method does not differentiate between them. #[allow(dead_code)] - pub(crate) fn by_last_leader(&self) -> &[LogId] { + pub(crate) fn by_last_leader(&self) -> LeaderLogIds { let ks = &self.key_log_ids; let l = ks.len(); if l < 2 { - return ks; + let last = self.last(); + return LeaderLogIds::new(last.map(|x| (x.clone(), x.clone()))); } // There are at most two(adjacent) key log ids with the same leader_id if ks[l - 1].leader_id() == ks[l - 2].leader_id() { - &ks[l - 2..] + LeaderLogIds::new(Some((ks[l - 2].clone(), ks[l - 1].clone()))) } else { - &ks[l - 1..] 
+ let last = self.last().cloned().unwrap(); + LeaderLogIds::new(Some((last.clone(), last))) } } } diff --git a/openraft/src/engine/mod.rs b/openraft/src/engine/mod.rs index d75828cf9..e2567c063 100644 --- a/openraft/src/engine/mod.rs +++ b/openraft/src/engine/mod.rs @@ -31,11 +31,12 @@ mod command_kind; mod engine_config; mod engine_impl; mod engine_output; -mod log_id_list; mod replication_progress; pub(crate) mod command; pub(crate) mod handler; +pub(crate) mod leader_log_ids; +pub(crate) mod log_id_list; pub(crate) mod time_state; #[cfg(test)] diff --git a/openraft/src/engine/tests/log_id_list_test.rs b/openraft/src/engine/tests/log_id_list_test.rs index 96e367c60..88e543f4e 100644 --- a/openraft/src/engine/tests/log_id_list_test.rs +++ b/openraft/src/engine/tests/log_id_list_test.rs @@ -1,3 +1,4 @@ +use crate::engine::leader_log_ids::LeaderLogIds; use crate::engine::testing::UTConfig; use crate::engine::LogIdList; use crate::testing::log_id; @@ -357,23 +358,29 @@ fn test_log_id_list_get_log_id() -> anyhow::Result<()> { fn test_log_id_list_by_last_leader() -> anyhow::Result<()> { // len == 0 let ids = LogIdList::::default(); - assert_eq!(ids.by_last_leader(), &[]); + assert_eq!(ids.by_last_leader(), LeaderLogIds::new(None)); // len == 1 let ids = LogIdList::::new([log_id(1, 1, 1)]); - assert_eq!(&[log_id(1, 1, 1)], ids.by_last_leader()); + assert_eq!(LeaderLogIds::new1(log_id(1, 1, 1)), ids.by_last_leader()); // len == 2, the last leader has only one log let ids = LogIdList::::new([log_id(1, 1, 1), log_id(3, 1, 3)]); - assert_eq!(&[log_id(3, 1, 3)], ids.by_last_leader()); + assert_eq!(LeaderLogIds::new1(log_id(3, 1, 3)), ids.by_last_leader()); // len == 2, the last leader has two logs let ids = LogIdList::::new([log_id(1, 1, 1), log_id(1, 1, 3)]); - assert_eq!(&[log_id(1, 1, 1), log_id(1, 1, 3)], ids.by_last_leader()); + assert_eq!( + LeaderLogIds::new2(log_id(1, 1, 1), log_id(1, 1, 3)), + ids.by_last_leader() + ); // len > 2, the last leader has only more than 
one logs let ids = LogIdList::::new([log_id(1, 1, 1), log_id(7, 1, 8), log_id(7, 1, 10)]); - assert_eq!(&[log_id(7, 1, 8), log_id(7, 1, 10)], ids.by_last_leader()); + assert_eq!( + LeaderLogIds::new2(log_id(7, 1, 8), log_id(7, 1, 10)), + ids.by_last_leader() + ); Ok(()) } diff --git a/openraft/src/proposer/candidate.rs b/openraft/src/proposer/candidate.rs index 2ec282816..44e9bc2b4 100644 --- a/openraft/src/proposer/candidate.rs +++ b/openraft/src/proposer/candidate.rs @@ -2,6 +2,7 @@ use std::fmt; use crate::display_ext::DisplayInstantExt; use crate::display_ext::DisplayOptionExt; +use crate::engine::leader_log_ids::LeaderLogIds; use crate::progress::Progress; use crate::progress::VecProgress; use crate::proposer::Leader; @@ -110,8 +111,13 @@ where vote.into_committed() }; - let last_leader_log_ids = self.last_log_id().cloned().into_iter().collect::>(); + // TODO: tricky: the new LeaderId is different from the last log id + // Thus only the last().index is used. + // Thus the first() is ignored. + // But we should not fake the first() there. 
+ let last = self.last_log_id(); + let last_leader_log_ids = LeaderLogIds::new(last.map(|last| (last.clone(), last.clone()))); - Leader::new(vote, self.quorum_set.clone(), self.learner_ids, &last_leader_log_ids) + Leader::new(vote, self.quorum_set.clone(), self.learner_ids, last_leader_log_ids) } } diff --git a/openraft/src/proposer/leader.rs b/openraft/src/proposer/leader.rs index 843ad9c06..ddccc6f01 100644 --- a/openraft/src/proposer/leader.rs +++ b/openraft/src/proposer/leader.rs @@ -1,7 +1,7 @@ use std::fmt; use crate::display_ext::DisplayInstantExt; -use crate::display_ext::DisplaySliceExt; +use crate::engine::leader_log_ids::LeaderLogIds; use crate::progress::entry::ProgressEntry; use crate::progress::Progress; use crate::progress::VecProgress; @@ -82,19 +82,19 @@ where vote: CommittedVote, quorum_set: QS, learner_ids: impl IntoIterator, - last_leader_log_id: &[LogIdOf], + last_leader_log_id: LeaderLogIds, ) -> Self { debug_assert!( Some(vote.committed_leader_id()) >= last_leader_log_id.last().map(|x| x.committed_leader_id().clone()), "vote {} must GE last_leader_log_id.last() {}", vote, - last_leader_log_id.display() + last_leader_log_id ); debug_assert!( Some(vote.committed_leader_id()) >= last_leader_log_id.first().map(|x| x.committed_leader_id().clone()), "vote {} must GE last_leader_log_id.first() {}", vote, - last_leader_log_id.display() + last_leader_log_id ); let learner_ids = learner_ids.into_iter().collect::>(); @@ -222,6 +222,7 @@ where #[cfg(test)] mod tests { + use crate::engine::leader_log_ids::LeaderLogIds; use crate::engine::testing::UTConfig; use crate::entry::RaftEntry; use crate::progress::Progress; @@ -238,7 +239,12 @@ mod tests { tracing::info!("--- vote greater than last log id, create new noop_log_id"); { let vote = Vote::new(2, 2).into_committed(); - let leader = Leader::::new(vote, vec![1, 2, 3], vec![], &[log_id(1, 2, 1), log_id(1, 2, 3)]); + let leader = Leader::::new( + vote, + vec![1, 2, 3], + vec![], + 
LeaderLogIds::new2(log_id(1, 2, 1), log_id(1, 2, 3)), + ); assert_eq!(leader.noop_log_id(), Some(&log_id(2, 2, 4))); assert_eq!(leader.last_log_id(), Some(&log_id(1, 2, 3))); @@ -247,7 +253,12 @@ mod tests { tracing::info!("--- vote equals last log id, reuse noop_log_id"); { let vote = Vote::new(1, 2).into_committed(); - let leader = Leader::::new(vote, vec![1, 2, 3], vec![], &[log_id(1, 2, 1), log_id(1, 2, 3)]); + let leader = Leader::::new( + vote, + vec![1, 2, 3], + vec![], + LeaderLogIds::new2(log_id(1, 2, 1), log_id(1, 2, 3)), + ); assert_eq!(leader.noop_log_id(), Some(&log_id(1, 2, 1))); assert_eq!(leader.last_log_id(), Some(&log_id(1, 2, 3))); @@ -256,7 +267,7 @@ mod tests { tracing::info!("--- vote equals last log id, reuse noop_log_id, last_leader_log_id.len()==1"); { let vote = Vote::new(1, 2).into_committed(); - let leader = Leader::::new(vote, vec![1, 2, 3], vec![], &[log_id(1, 2, 3)]); + let leader = Leader::::new(vote, vec![1, 2, 3], vec![], LeaderLogIds::new1(log_id(1, 2, 3))); assert_eq!(leader.noop_log_id(), Some(&log_id(1, 2, 3))); assert_eq!(leader.last_log_id(), Some(&log_id(1, 2, 3))); @@ -265,7 +276,7 @@ mod tests { tracing::info!("--- no last log ids, create new noop_log_id, last_leader_log_id.len()==0"); { let vote = Vote::new(1, 2).into_committed(); - let leader = Leader::::new(vote, vec![1, 2, 3], vec![], &[]); + let leader = Leader::::new(vote, vec![1, 2, 3], vec![], LeaderLogIds::new(None)); assert_eq!(leader.noop_log_id(), Some(&log_id(1, 2, 0))); assert_eq!(leader.last_log_id(), None); @@ -275,7 +286,7 @@ mod tests { #[test] fn test_leader_established() { let vote = Vote::new(2, 2).into_committed(); - let mut leader = Leader::::new(vote, vec![1, 2, 3], vec![], &[log_id(1, 2, 3)]); + let mut leader = Leader::::new(vote, vec![1, 2, 3], vec![], LeaderLogIds::new1(log_id(1, 2, 3))); let mut entries = vec![Entry::::new_blank(log_id(5, 5, 2))]; leader.assign_log_ids(&mut entries); @@ -291,7 +302,7 @@ mod tests { #[test] fn 
test_1_entry_none_last_log_id() { let vote = Vote::new(0, 0).into_committed(); - let mut leading = Leader::::new(vote, vec![1, 2, 3], vec![], &[]); + let mut leading = Leader::::new(vote, vec![1, 2, 3], vec![], LeaderLogIds::new(None)); let mut entries: Vec> = vec![blank_ent(1, 1, 1)]; leading.assign_log_ids(&mut entries); @@ -303,7 +314,7 @@ mod tests { #[test] fn test_no_entries_provided() { let vote = Vote::new(2, 2).into_committed(); - let mut leading = Leader::::new(vote, vec![1, 2, 3], vec![], &[log_id(1, 1, 8)]); + let mut leading = Leader::::new(vote, vec![1, 2, 3], vec![], LeaderLogIds::new1(log_id(1, 1, 8))); let mut entries: Vec> = vec![]; leading.assign_log_ids(&mut entries); @@ -313,7 +324,7 @@ mod tests { #[test] fn test_multiple_entries() { let vote = Vote::new(2, 2).into_committed(); - let mut leading = Leader::::new(vote, vec![1, 2, 3], [], &[log_id(1, 1, 8)]); + let mut leading = Leader::::new(vote, vec![1, 2, 3], [], LeaderLogIds::new1(log_id(1, 1, 8))); let mut entries: Vec> = vec![blank_ent(1, 1, 1), blank_ent(1, 1, 1), blank_ent(1, 1, 1)]; @@ -326,7 +337,12 @@ mod tests { #[test] fn test_leading_last_quorum_acked_time_leader_is_voter() { - let mut leading = Leader::>::new(Vote::new(2, 1).into_committed(), vec![1, 2, 3], [4], &[]); + let mut leading = Leader::>::new( + Vote::new(2, 1).into_committed(), + vec![1, 2, 3], + [4], + LeaderLogIds::new(None), + ); let now1 = UTConfig::<()>::now(); @@ -337,7 +353,12 @@ mod tests { #[test] fn test_leading_last_quorum_acked_time_leader_is_learner() { - let mut leading = Leader::>::new(Vote::new(2, 4).into_committed(), vec![1, 2, 3], [4], &[]); + let mut leading = Leader::>::new( + Vote::new(2, 4).into_committed(), + vec![1, 2, 3], + [4], + LeaderLogIds::new(None), + ); let t2 = UTConfig::<()>::now(); let _ = leading.clock_progress.increase_to(&2, Some(t2)); @@ -352,7 +373,12 @@ mod tests { #[test] fn test_leading_last_quorum_acked_time_leader_is_not_member() { - let mut leading = 
Leader::>::new(Vote::new(2, 5).into_committed(), vec![1, 2, 3], [4], &[]); + let mut leading = Leader::>::new( + Vote::new(2, 5).into_committed(), + vec![1, 2, 3], + [4], + LeaderLogIds::new(None), + ); let t2 = UTConfig::<()>::now(); let _ = leading.clock_progress.increase_to(&2, Some(t2)); diff --git a/openraft/src/storage/helper.rs b/openraft/src/storage/helper.rs index c50aa84a3..d7b79c95e 100644 --- a/openraft/src/storage/helper.rs +++ b/openraft/src/storage/helper.rs @@ -10,8 +10,10 @@ use crate::engine::LogIdList; use crate::entry::RaftPayload; use crate::log_id::RaftLogId; use crate::raft_state::IOState; +use crate::storage::log_reader_ext::RaftLogReaderExt; use crate::storage::RaftLogStorage; use crate::storage::RaftStateMachine; +use crate::type_config::alias::LogIdOf; use crate::type_config::TypeConfigExt; use crate::utime::Leased; use crate::EffectiveMembership; @@ -120,7 +122,8 @@ where last_purged_log_id.display(), last_log_id.display() ); - let log_ids = LogIdList::load_log_ids(last_purged_log_id.clone(), last_log_id, &mut log_reader).await?; + + let log_id_list = self.get_key_log_ids(last_purged_log_id.clone(), last_log_id.clone()).await?; let snapshot = self.state_machine.get_current_snapshot().await?; @@ -162,7 +165,7 @@ where // before serving. vote: Leased::new(now, Duration::default(), vote), purged_next: last_purged_log_id.next_index(), - log_ids, + log_ids: log_id_list, membership_state: mem_state, snapshot_meta, @@ -309,4 +312,45 @@ where Ok(res) } + + // TODO: store purged: Option separately. + /// Get key-log-ids from the log store. + /// + /// Key-log-ids are the first log id of each Leader. 
+ async fn get_key_log_ids( + &mut self, + purged: Option>, + last: Option>, + ) -> Result, StorageError> { + let mut log_reader = self.log_store.get_log_reader().await; + + let last = match last { + None => return Ok(LogIdList::new(vec![])), + Some(x) => x, + }; + + if purged.index() == Some(last.index) { + return Ok(LogIdList::new(vec![last])); + } + + let first = log_reader.get_log_id(purged.next_index()).await?; + + let mut log_ids = log_reader.get_key_log_ids(first, last).await?; + + if !log_ids.is_empty() { + if let Some(purged) = purged { + if purged.leader_id() == log_ids[0].leader_id() { + if log_ids.len() >= 2 { + log_ids[0] = purged; + } else { + log_ids.insert(0, purged); + } + } else { + log_ids.insert(0, purged); + } + } + } + + Ok(LogIdList::new(log_ids)) + } } diff --git a/openraft/src/storage/log_reader_ext.rs b/openraft/src/storage/log_reader_ext.rs index 137500dfa..55509b71b 100644 --- a/openraft/src/storage/log_reader_ext.rs +++ b/openraft/src/storage/log_reader_ext.rs @@ -37,6 +37,6 @@ where C: RaftTypeConfig impl RaftLogReaderExt for LR where C: RaftTypeConfig, - LR: RaftLogReader, + LR: RaftLogReader + ?Sized, { } diff --git a/openraft/src/storage/v2/raft_log_reader.rs b/openraft/src/storage/v2/raft_log_reader.rs index 09708b4f5..6b936b140 100644 --- a/openraft/src/storage/v2/raft_log_reader.rs +++ b/openraft/src/storage/v2/raft_log_reader.rs @@ -4,6 +4,8 @@ use std::ops::RangeBounds; use openraft_macros::add_async_trait; use openraft_macros::since; +use crate::engine::LogIdList; +use crate::LogId; use crate::OptionalSend; use crate::OptionalSync; use crate::RaftTypeConfig; @@ -62,4 +64,51 @@ where C: RaftTypeConfig async fn limited_get_log_entries(&mut self, start: u64, end: u64) -> Result, StorageError> { self.try_get_log_entries(start..end).await } + + /// Retrieves a list of key log ids that mark the beginning of each Leader. 
+ /// + /// This method returns the log ids that represent leadership transitions in the log history, + /// including: + /// - The id of the first log entry in the storage (regardless of Leader); + /// - The id of the first log entry from each Leader; + /// - The id of the last log entry in the storage (regardless of Leader); + /// + /// # Example + /// + /// Given: + /// Log entries: `[(2,2), (2,3), (5,4), (5,5)]` (format: `(term, index)`) + /// + /// Returns: `[(2,2), (5,4), (5,5)]` + /// + /// # Usage + /// + /// This method is called only during node startup to build an initial log index. + /// + /// # Implementation Notes + /// + /// - Optional method: If your [`RaftLogStorage`] implementation doesn't maintain this + /// information, do not implement it and use the default implementation. + /// - Default implementation: Uses a binary search algorithm to find the key log ids. + /// - Time complexity: `O(k * log(n))` where: + /// - `k` = number of unique Leaders + /// - `n` = average number of logs per Leader + /// + /// # Arguments + /// + /// - `first`: the first log id to return. + /// - `last`: the last log id to return. + /// + /// # Returns + /// + /// Returns a vector of log ids marking leadership transitions and boundaries. 
+ /// + /// [`RaftLogStorage`]: crate::storage::RaftLogStorage + #[since(version = "0.10.0")] + async fn get_key_log_ids( + &mut self, + first: LogId, + last: LogId, + ) -> Result>, StorageError> { + LogIdList::get_key_log_ids(first, last, self).await + } } diff --git a/openraft/src/testing/log/suite.rs b/openraft/src/testing/log/suite.rs index a5771273c..0b50b47c9 100644 --- a/openraft/src/testing/log/suite.rs +++ b/openraft/src/testing/log/suite.rs @@ -719,7 +719,28 @@ where ); } - tracing::info!("--- log terms: [x,x,x,x,x,x,x], last_purged_log_id: (3,6), e.g., all purged expect [(3,6)]"); + tracing::info!("--- (case: purge(T1),T2,T2) log terms: [x,x,x,x,x,3,3], last_purged_log_id: (2,4), expect [(2,4),(3,5),(3,6)]"); + { + store.purge(log_id(2, 0, 4)).await?; + + let initial = StorageHelper::new(&mut store, &mut sm).get_initial_state().await?; + assert_eq!( + vec![log_id(2, 0, 4), log_id(3, 0, 5), log_id(3, 0, 6)], + initial.log_ids.key_log_ids() + ); + } + + tracing::info!( + "--- (case: purge(T2),T2) log terms: [x,x,x,x,x,x,3], last_purged_log_id: (3,5), expect [(3,5),(3,6)]" + ); + { + store.purge(log_id(3, 0, 5)).await?; + + let initial = StorageHelper::new(&mut store, &mut sm).get_initial_state().await?; + assert_eq!(vec![log_id(3, 0, 5), log_id(3, 0, 6)], initial.log_ids.key_log_ids()); + } + + tracing::info!("--- (case: purge(T2)) log terms: [x,x,x,x,x,x,x], last_purged_log_id: (3,6), e.g., all purged expect [(3,6)]"); { store.purge(log_id(3, 0, 6)).await?; @@ -727,6 +748,16 @@ where assert_eq!(vec![log_id(3, 0, 6)], initial.log_ids.key_log_ids()); } + tracing::info!( + "--- (case: purge(T2),T2,T2) log terms: [x,x,x,x,x,x,x,3,3], last_purged_log_id: (3,6), e.g., all purged expect [(3,6),(3,8)]" + ); + { + append(&mut store, [blank_ent_0::(3, 7), blank_ent_0::(3, 8)]).await?; + + let initial = StorageHelper::new(&mut store, &mut sm).get_initial_state().await?; + assert_eq!(vec![log_id(3, 0, 6), log_id(3, 0, 8)], initial.log_ids.key_log_ids()); + } + Ok(()) 
}