From c1aa1b5ab551877ffc40975c4e3d3d376309524b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Sat, 10 Feb 2024 22:26:43 +0800 Subject: [PATCH] Feature: add `Raft::install_complete_snapshot()` to install a snapshot Using this method, the application provides a full snapshot to Openraft, which is then used to install and replace the state machine. It is entirely the responsibility of the application to acquire a snapshot through any means: be it in chunks, as a stream, or via shared storage solutions like S3. This method necessitates that the caller supplies a valid `Vote` to confirm the legitimacy of the leader, mirroring the requirements of other Raft protocol APIs such as `append_entries` and `vote`. - Part of #606 --- openraft/src/core/raft_core.rs | 3 + openraft/src/core/raft_msg/mod.rs | 11 ++ openraft/src/core/sm/command.rs | 24 +++- openraft/src/core/sm/mod.rs | 11 ++ openraft/src/engine/command.rs | 2 + openraft/src/engine/engine_impl.rs | 37 ++++++ .../engine/handler/following_handler/mod.rs | 54 +++++++++ openraft/src/raft/mod.rs | 18 +++ openraft/src/storage/mod.rs | 10 +- tests/tests/client_api/main.rs | 1 + .../t13_install_complete_snapshot.rs | 108 ++++++++++++++++++ 11 files changed, 275 insertions(+), 4 deletions(-) create mode 100644 tests/tests/client_api/t13_install_complete_snapshot.rs diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index a27531293..f848b8c96 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -1134,6 +1134,9 @@ where self.handle_install_snapshot_request(rpc, tx); } + RaftMsg::InstallCompleteSnapshot { vote, snapshot, tx } => { + self.engine.handle_install_complete_snapshot(vote, snapshot, tx); + } RaftMsg::CheckIsLeaderRequest { tx } => { if self.engine.state.is_leader(&self.engine.config.id) { self.handle_check_is_leader_request(tx).await; diff --git a/openraft/src/core/raft_msg/mod.rs b/openraft/src/core/raft_msg/mod.rs index 8af1e9fcb..3a935bf8d 100644 --- a/openraft/src/core/raft_msg/mod.rs +++ b/openraft/src/core/raft_msg/mod.rs @@ -22,6 +22,8 @@ use crate::type_config::alias::NodeOf; use crate::ChangeMembers; use crate::MessageSummary; use crate::RaftTypeConfig; +use crate::Snapshot; +use crate::Vote; pub(crate) mod external_command; @@ -67,6 +69,12 @@ where C: RaftTypeConfig tx: InstallSnapshotTx, }, + InstallCompleteSnapshot { + vote: Vote, + snapshot: Snapshot, + tx: ResultSender>, + }, + ClientWriteRequest { app_data: C::D, tx: ClientWriteTx, @@ -114,6 +122,9 @@ where C: RaftTypeConfig RaftMsg::InstallSnapshot { rpc, .. } => { format!("InstallSnapshot: {}", rpc.summary()) } + RaftMsg::InstallCompleteSnapshot { vote, snapshot, .. } => { + format!("InstallCompleteSnapshot: vote: {}, snapshot: {}", vote, snapshot) + } RaftMsg::ClientWriteRequest { .. } => "ClientWriteRequest".to_string(), RaftMsg::CheckIsLeaderRequest { .. } => "CheckIsLeaderRequest".to_string(), RaftMsg::Initialize { members, .. } => { diff --git a/openraft/src/core/sm/command.rs b/openraft/src/core/sm/command.rs index b003058b1..6e9c0a28b 100644 --- a/openraft/src/core/sm/command.rs +++ b/openraft/src/core/sm/command.rs @@ -74,6 +74,11 @@ where C: RaftTypeConfig Command::new(payload) } + pub(crate) fn install_complete_snapshot(snapshot: Snapshot) -> Self { + let payload = CommandPayload::InstallCompleteSnapshot { snapshot }; + Command::new(payload) + } + pub(crate) fn cancel_snapshot(snapshot_meta: SnapshotMeta) -> Self { let payload = CommandPayload::FinalizeSnapshot { install: false, @@ -103,14 +108,18 @@ where C: RaftTypeConfig BuildSnapshot, /// Get the latest built snapshot. - GetSnapshot { tx: ResultSender>> }, + GetSnapshot { + tx: ResultSender>>, + }, /// Receive a chunk of snapshot. /// /// If it is the final chunk, the snapshot stream will be closed and saved. /// /// Installing a snapshot includes two steps: ReceiveSnapshotChunk and FinalizeSnapshot. - ReceiveSnapshotChunk { req: InstallSnapshotRequest }, + ReceiveSnapshotChunk { + req: InstallSnapshotRequest, + }, /// After receiving all chunks, finalize the snapshot by installing it or discarding it, /// if the snapshot is stale(the snapshot last log id is smaller than the local committed). @@ -120,8 +129,14 @@ where C: RaftTypeConfig snapshot_meta: SnapshotMeta, }, + InstallCompleteSnapshot { + snapshot: Snapshot, + }, + /// Apply the log entries to the state machine. - Apply { entries: Vec }, + Apply { + entries: Vec, + }, } impl Debug for CommandPayload @@ -137,6 +152,9 @@ where C: RaftTypeConfig CommandPayload::FinalizeSnapshot { install, snapshot_meta } => { write!(f, "FinalizeSnapshot: install:{} {:?}", install, snapshot_meta) } + CommandPayload::InstallCompleteSnapshot { snapshot } => { + write!(f, "InstallCompleteSnapshot: meta: {:?}", snapshot.meta) + } CommandPayload::Apply { entries } => write!(f, "Apply: {}", DisplaySlice::<_>(entries)), } } diff --git a/openraft/src/core/sm/mod.rs b/openraft/src/core/sm/mod.rs index 7d3fd885f..6e94587ec 100644 --- a/openraft/src/core/sm/mod.rs +++ b/openraft/src/core/sm/mod.rs @@ -166,6 +166,17 @@ where let res = CommandResult::new(cmd.seq, Ok(Response::InstallSnapshot(resp))); let _ = self.resp_tx.send(Notify::sm(res)); } + CommandPayload::InstallCompleteSnapshot { snapshot } => { + tracing::info!("{}: install complete snapshot", func_name!()); + + let meta = snapshot.meta.clone(); + self.state_machine.install_snapshot(&meta, snapshot.snapshot).await?; + + tracing::info!("Done install complete snapshot, meta: {}", meta); + + let res = CommandResult::new(cmd.seq, Ok(Response::InstallSnapshot(Some(meta)))); + let _ = self.resp_tx.send(Notify::sm(res)); + } CommandPayload::Apply { entries } => { let resp = self.apply(entries).await?; let res = CommandResult::new(cmd.seq, Ok(Response::Apply(resp))); diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index 72e5fb75e..10056b20c 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -229,6 +229,7 @@ where AppendEntries(ValueSender, Infallible>>), ReceiveSnapshotChunk(ValueSender>), InstallSnapshot(ValueSender, InstallSnapshotError>>), + InstallCompleteSnapshot(ValueSender, Infallible>>), Initialize(ValueSender>>), } @@ -251,6 +252,7 @@ where Respond::AppendEntries(x) => x.send(), Respond::ReceiveSnapshotChunk(x) => x.send(), Respond::InstallSnapshot(x) => x.send(), + Respond::InstallCompleteSnapshot(x) => x.send(), Respond::Initialize(x) => x.send(), } } diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index d23afc89a..3744555b5 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -45,6 +45,7 @@ use crate::LogIdOptionExt; use crate::Membership; use crate::RaftLogId; use crate::RaftTypeConfig; +use crate::Snapshot; use crate::SnapshotMeta; use crate::Vote; @@ -479,6 +480,42 @@ where C: RaftTypeConfig Some(()) } + /// Install a completely received snapshot on a follower. + #[tracing::instrument(level = "debug", skip_all)] + pub(crate) fn handle_install_complete_snapshot( + &mut self, + vote: Vote, + snapshot: Snapshot, + tx: ResultSender>, + ) { + tracing::info!(vote = display(vote), snapshot = display(&snapshot), "{}", func_name!()); + + let vote_res = self.vote_handler().accept_vote(&vote, tx, |state, _rejected| { + Ok(InstallSnapshotResponse { + vote: *state.vote_ref(), + }) + }); + + let Some(tx) = vote_res else { + return; + }; + + let mut fh = self.following_handler(); + fh.install_complete_snapshot(snapshot); + let res = Ok(InstallSnapshotResponse { + vote: *self.state.vote_ref(), + }); + + self.output.push_command(Command::Respond { + // When there is an error, there may still be queued IO, we need to run them before sending back + // response. + when: Some(Condition::StateMachineCommand { + command_seq: self.output.last_sm_seq(), + }), + resp: Respond::new(res, tx), + }); + } + #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn install_snapshot(&mut self, req: InstallSnapshotRequest) -> Result<(), InstallSnapshotError> { tracing::info!(req = display(req.summary()), "{}", func_name!()); diff --git a/openraft/src/engine/handler/following_handler/mod.rs b/openraft/src/engine/handler/following_handler/mod.rs index 9e2cef7df..7adcca828 100644 --- a/openraft/src/engine/handler/following_handler/mod.rs +++ b/openraft/src/engine/handler/following_handler/mod.rs @@ -24,6 +24,7 @@ use crate::MessageSummary; use crate::RaftLogId; use crate::RaftState; use crate::RaftTypeConfig; +use crate::Snapshot; use crate::SnapshotMeta; use crate::SnapshotSegmentId; use crate::StoredMembership; @@ -347,6 +348,59 @@ where C: RaftTypeConfig self.log_handler().purge_log(); } + /// Follower/Learner handles install-full-snapshot. + /// + /// Refer to [`snapshot_replication`](crate::docs::protocol::replication::snapshot_replication) + /// for the reason the following workflow is needed. + #[tracing::instrument(level = "debug", skip_all)] + pub(crate) fn install_complete_snapshot(&mut self, snapshot: Snapshot) { + let meta = &snapshot.meta; + tracing::info!("install_complete_snapshot: meta:{:?}", meta); + + let snap_last_log_id = meta.last_log_id; + + if snap_last_log_id.as_ref() <= self.state.committed() { + tracing::info!( + "No need to install snapshot; snapshot last_log_id({}) <= committed({})", + snap_last_log_id.summary(), + self.state.committed().summary() + ); + return; + } + + // snapshot_last_log_id can not be None + let snap_last_log_id = snap_last_log_id.unwrap(); + + // 1. Truncate all logs if conflict. + // 2. Install snapshot. + // 3. Purge logs the snapshot covers. + + let mut snap_handler = self.snapshot_handler(); + let updated = snap_handler.update_snapshot(meta.clone()); + if !updated { + return; + } + + let local = self.state.get_log_id(snap_last_log_id.index); + if let Some(local) = local { + if local != snap_last_log_id { + // Conflict, delete all non-committed logs. + self.truncate_logs(self.state.committed().next_index()); + } + } + + self.state.update_accepted(Some(snap_last_log_id)); + self.state.committed = Some(snap_last_log_id); + self.update_committed_membership(EffectiveMembership::new_from_stored_membership( + meta.last_membership.clone(), + )); + + self.output.push_command(Command::from(sm::Command::install_complete_snapshot(snapshot))); + + self.state.purge_upto = Some(snap_last_log_id); + self.log_handler().purge_log(); + } + /// Find the last 2 membership entries in a list of entries. /// /// A follower/learner reverts the effective membership to the previous one, diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index 3eaab3b7f..e09bdda42 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -72,6 +72,7 @@ use crate::RaftState; pub use crate::RaftTypeConfig; use crate::Snapshot; use crate::StorageHelper; +use crate::Vote; /// Define types for a Raft type configuration. /// @@ -361,6 +362,23 @@ where C: RaftTypeConfig self.call_core(RaftMsg::ExternalCommand { cmd }, rx).await } + /// Install a completely received snapshot to the state machine. + /// + /// This method is used to implement a totally application defined snapshot transmission. + /// The application receives a snapshot from the leader, in chunks or a stream, and + /// then rebuild a snapshot, then pass the snapshot to Raft to install. + #[tracing::instrument(level = "debug", skip_all)] + pub async fn install_complete_snapshot( + &self, + vote: Vote, + snapshot: Snapshot, + ) -> Result, RaftError> { + tracing::debug!("Raft::install_complete_snapshot()"); + + let (tx, rx) = oneshot::channel(); + self.call_core(RaftMsg::InstallCompleteSnapshot { vote, snapshot, tx }, rx).await + } + /// Submit an InstallSnapshot RPC to this Raft node. /// /// These RPCs are sent by the cluster leader in order to bring a new node or a slow node diff --git a/openraft/src/storage/mod.rs b/openraft/src/storage/mod.rs index 891148757..f17fcb1ef 100644 --- a/openraft/src/storage/mod.rs +++ b/openraft/src/storage/mod.rs @@ -99,7 +99,7 @@ where } /// The data associated with the current snapshot. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Snapshot where C: RaftTypeConfig { @@ -118,6 +118,14 @@ where C: RaftTypeConfig } } +impl fmt::Display for Snapshot +where C: RaftTypeConfig +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Snapshot{{meta: {}}}", self.meta) + } +} + /// The state about logs. /// /// Invariance: last_purged_log_id <= last_applied <= last_log_id diff --git a/tests/tests/client_api/main.rs b/tests/tests/client_api/main.rs index 128ce2c65..3a81b34dc 100644 --- a/tests/tests/client_api/main.rs +++ b/tests/tests/client_api/main.rs @@ -11,6 +11,7 @@ mod t10_client_writes; mod t11_client_reads; mod t12_trigger_purge_log; mod t13_get_snapshot; +mod t13_install_complete_snapshot; mod t13_trigger_snapshot; mod t16_with_raft_state; mod t50_lagging_network_write; diff --git a/tests/tests/client_api/t13_install_complete_snapshot.rs b/tests/tests/client_api/t13_install_complete_snapshot.rs new file mode 100644 index 000000000..a647839ba --- /dev/null +++ b/tests/tests/client_api/t13_install_complete_snapshot.rs @@ -0,0 +1,108 @@ +use std::sync::Arc; +use std::time::Duration; + +use maplit::btreeset; +use openraft::testing::log_id; +use openraft::Config; +use openraft::Vote; + +use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::RaftRouter; + +#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] +async fn install_complete_snapshot() -> anyhow::Result<()> { + let config = Arc::new( + Config { + enable_heartbeat: false, + enable_elect: false, + ..Default::default() + } + .validate()?, + ); + + let mut router = RaftRouter::new(config.clone()); + + tracing::info!("--- initializing cluster"); + let mut log_index = router.new_cluster(btreeset! {0,1,2}, btreeset! {}).await?; + + tracing::info!(log_index, "--- isolate node 2 so that it can receive snapshot"); + router.set_unreachable(2, true); + + tracing::info!(log_index, "--- write to make node-0,1 have more logs"); + { + log_index += router.client_request_many(0, "foo", 3).await?; + router.wait(&0, timeout()).applied_index(Some(log_index), "write more log").await?; + router.wait(&1, timeout()).applied_index(Some(log_index), "write more log").await?; + } + + let snap; + + tracing::info!(log_index, "--- trigger and get snapshot from node-0"); + { + let n0 = router.get_raft_handle(&0)?; + n0.trigger().snapshot().await?; + + router.wait(&0, timeout()).snapshot(log_id(1, 0, log_index), "node-1 snapshot").await?; + + snap = n0.get_snapshot().await?.unwrap(); + } + + tracing::info!(log_index, "--- fails to install snapshot with smaller vote"); + { + let n1 = router.get_raft_handle(&1)?; + + let resp = n1.install_complete_snapshot(Vote::new(0, 0), snap.clone()).await?; + assert_eq!( + Vote::new_committed(1, 0), + resp.vote, + "node-1 vote is higher, and is returned" + ); + n1.with_raft_state(|state| { + assert_eq!( + None, state.snapshot_meta.last_log_id, + "node-1 snapshot is not installed" + ); + }) + .await?; + } + + tracing::info!( + log_index, + "--- no install snapshot on node-1 because of snapshot last log id equals committed" + ); + { + let n1 = router.get_raft_handle(&1)?; + + let resp = n1.install_complete_snapshot(Vote::new_committed(1, 0), snap.clone()).await?; + assert_eq!(Vote::new_committed(1, 0), resp.vote,); + n1.with_raft_state(move |state| { + assert_eq!( + None, state.snapshot_meta.last_log_id, + "node-1 snapshot is not installed" + ); + }) + .await?; + } + + tracing::info!(log_index, "--- succeed to install snapshot on node-2 with higher vote"); + { + let n2 = router.get_raft_handle(&2)?; + + let resp = n2.install_complete_snapshot(Vote::new_committed(1, 0), snap.clone()).await?; + assert_eq!(Vote::new_committed(1, 0), resp.vote,); + n2.with_raft_state(move |state| { + assert_eq!( + Some(log_id(1, 0, log_index)), + state.snapshot_meta.last_log_id, + "node-1 snapshot is installed" + ); + }) + .await?; + } + + Ok(()) +} + +fn timeout() -> Option { + Some(Duration::from_millis(1_000)) +}