diff --git a/examples/raft-kv-memstore-generic-snapshot-data/README.md b/examples/raft-kv-memstore-generic-snapshot-data/README.md index da9d63e07..ad1f1ef03 100644 --- a/examples/raft-kv-memstore-generic-snapshot-data/README.md +++ b/examples/raft-kv-memstore-generic-snapshot-data/README.md @@ -7,7 +7,7 @@ This example is similar to the basic raft-kv-memstore example but focuses on how to handle snapshot with `generic-snapshot-data` enabled. Other aspects are minimized. -To send a complete snapshot, Refer to implementation of `RaftNetwork::snapshot()` in this example. +To send a complete snapshot, Refer to implementation of `RaftNetwork::full_snapshot()` in this example. To receive a complete snapshot, Refer to implementation of `api::snapshot()` in this example. diff --git a/examples/raft-kv-memstore-generic-snapshot-data/src/api.rs b/examples/raft-kv-memstore-generic-snapshot-data/src/api.rs index f3e610f6a..89394adc5 100644 --- a/examples/raft-kv-memstore-generic-snapshot-data/src/api.rs +++ b/examples/raft-kv-memstore-generic-snapshot-data/src/api.rs @@ -60,7 +60,7 @@ pub async fn snapshot(app: &mut App, req: String) -> String { }; let res = app .raft - .install_complete_snapshot(vote, snapshot) + .install_full_snapshot(vote, snapshot) .await .map_err(typ::RaftError::::Fatal); encode(res) diff --git a/examples/raft-kv-memstore-generic-snapshot-data/src/network.rs b/examples/raft-kv-memstore-generic-snapshot-data/src/network.rs index 7d1d59a02..20842ac8c 100644 --- a/examples/raft-kv-memstore-generic-snapshot-data/src/network.rs +++ b/examples/raft-kv-memstore-generic-snapshot-data/src/network.rs @@ -37,9 +37,10 @@ impl RaftNetworkFactory for Router { } impl RaftNetwork for Connection { - async fn send_append_entries( + async fn append_entries( &mut self, req: AppendEntriesRequest, + _option: RPCOption, ) -> Result, typ::RPCError> { let resp = self .router @@ -50,7 +51,7 @@ impl RaftNetwork for Connection { } /// A real application should replace this method with customized implementation. - async fn snapshot( + async fn full_snapshot( &mut self, vote: Vote, snapshot: Snapshot, @@ -65,7 +66,11 @@ impl RaftNetwork for Connection { Ok(resp) } - async fn send_vote(&mut self, req: VoteRequest) -> Result, typ::RPCError> { + async fn vote( + &mut self, + req: VoteRequest, + _option: RPCOption, + ) -> Result, typ::RPCError> { let resp = self .router .send(self.target, "/raft/vote", req) diff --git a/examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/test_cluster.rs b/examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/test_cluster.rs index 9f14ee922..8affab779 100644 --- a/examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/test_cluster.rs +++ b/examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/test_cluster.rs @@ -37,9 +37,9 @@ pub fn log_panic(panic: &PanicInfo) { /// /// - Setup a single node cluster, write some logs, take a snapshot; /// - Add a learner node-2 to receive snapshot replication, via the complete-snapshot API: -/// - The sending end sends snapshot with `RaftNetwork::snapshot()`; +/// - The sending end sends snapshot with `RaftNetwork::full_snapshot()`; /// - The receiving end deliver the received snapshot to `Raft` with -/// `Raft::install_complete_snapshot()`. +/// `Raft::install_full_snapshot()`. #[tokio::test] async fn test_cluster() { std::panic::set_hook(Box::new(|panic| { diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml b/examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml index 8683d818e..8a97ff0fa 100644 --- a/examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml +++ b/examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml @@ -17,6 +17,7 @@ license = "MIT OR Apache-2.0" repository = "https://github.com/datafuselabs/openraft" [dependencies] +memstore = { path = "../memstore", features = [] } openraft = { path = "../../openraft", features = ["serde", "storage-v2", "generic-snapshot-data"] } serde = { version = "1.0.114", features = ["derive"] } diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/README.md b/examples/raft-kv-memstore-opendal-snapshot-data/README.md index 41fb2ce5a..e871f2f18 100644 --- a/examples/raft-kv-memstore-opendal-snapshot-data/README.md +++ b/examples/raft-kv-memstore-opendal-snapshot-data/README.md @@ -9,7 +9,7 @@ This example is similar to the basic raft-kv-memstore example but focuses on how to store and fetch snapshot data from remote storage. Other aspects are minimized. -To send a complete snapshot, Refer to implementation of `RaftNetwork::snapshot()` in this example. +To send a complete snapshot, Refer to implementation of `RaftNetwork::full_snapshot()` in this example. To receive a complete snapshot, Refer to implementation of `api::snapshot()` in this example. diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/src/api.rs b/examples/raft-kv-memstore-opendal-snapshot-data/src/api.rs index f3e610f6a..89394adc5 100644 --- a/examples/raft-kv-memstore-opendal-snapshot-data/src/api.rs +++ b/examples/raft-kv-memstore-opendal-snapshot-data/src/api.rs @@ -60,7 +60,7 @@ pub async fn snapshot(app: &mut App, req: String) -> String { }; let res = app .raft - .install_complete_snapshot(vote, snapshot) + .install_full_snapshot(vote, snapshot) .await .map_err(typ::RaftError::::Fatal); encode(res) diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs b/examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs index 069273af2..c37bebfcb 100644 --- a/examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs +++ b/examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs @@ -90,7 +90,7 @@ pub async fn new_raft(node_id: NodeId, router: Router, op: Operator) -> (typ::Ra let config = Arc::new(config.validate().unwrap()); // Create a instance of where the Raft logs will be stored. - let log_store = Arc::new(LogStore::default()); + let log_store = LogStore::default(); // Create a instance of where the state machine data will be stored. let state_machine_store = Arc::new(StateMachineStore::new(op.clone())); diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/src/network.rs b/examples/raft-kv-memstore-opendal-snapshot-data/src/network.rs index 7d1d59a02..20842ac8c 100644 --- a/examples/raft-kv-memstore-opendal-snapshot-data/src/network.rs +++ b/examples/raft-kv-memstore-opendal-snapshot-data/src/network.rs @@ -37,9 +37,10 @@ impl RaftNetworkFactory for Router { } impl RaftNetwork for Connection { - async fn send_append_entries( + async fn append_entries( &mut self, req: AppendEntriesRequest, + _option: RPCOption, ) -> Result, typ::RPCError> { let resp = self .router @@ -50,7 +51,7 @@ impl RaftNetwork for Connection { } /// A real application should replace this method with customized implementation. - async fn snapshot( + async fn full_snapshot( &mut self, vote: Vote, snapshot: Snapshot, @@ -65,7 +66,11 @@ impl RaftNetwork for Connection { Ok(resp) } - async fn send_vote(&mut self, req: VoteRequest) -> Result, typ::RPCError> { + async fn vote( + &mut self, + req: VoteRequest, + _option: RPCOption, + ) -> Result, typ::RPCError> { let resp = self .router .send(self.target, "/raft/vote", req) diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs b/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs index da907289d..fee80b0f7 100644 --- a/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs +++ b/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs @@ -1,26 +1,20 @@ use std::collections::BTreeMap; use std::fmt::Debug; -use std::ops::RangeBounds; use std::sync::Arc; use std::sync::Mutex; use opendal::Operator; -use openraft::storage::LogFlushed; -use openraft::storage::LogState; -use openraft::storage::RaftLogStorage; use openraft::storage::RaftStateMachine; use openraft::storage::Snapshot; use openraft::BasicNode; use openraft::Entry; use openraft::EntryPayload; use openraft::LogId; -use openraft::RaftLogReader; use openraft::RaftSnapshotBuilder; use openraft::RaftTypeConfig; use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StoredMembership; -use openraft::Vote; use serde::Deserialize; use serde::Serialize; @@ -30,6 +24,8 @@ use crate::typ; use crate::NodeId; use crate::TypeConfig; +pub type LogStore = memstore::LogStore; + #[derive(Serialize, Deserialize, Debug, Clone)] pub enum Request { Set { key: String, value: String }, @@ -95,30 +91,6 @@ impl StateMachineStore { } } -#[derive(Debug, Default)] -pub struct LogStore { - last_purged_log_id: Mutex>>, - - /// The Raft log. - log: Mutex>>, - - committed: Mutex>>, - - /// The current granted vote. - vote: Mutex>>, -} - -impl RaftLogReader for Arc { - async fn try_get_log_entries + Clone + Debug>( - &mut self, - range: RB, - ) -> Result>, StorageError> { - let log = self.log.lock().unwrap(); - let response = log.range(range.clone()).map(|(_, val)| val.clone()).collect::>(); - Ok(response) - } -} - impl RaftSnapshotBuilder for Arc { #[tracing::instrument(level = "trace", skip(self))] async fn build_snapshot(&mut self) -> Result, StorageError> { @@ -269,104 +241,3 @@ impl RaftStateMachine for Arc { self.clone() } } - -impl RaftLogStorage for Arc { - type LogReader = Self; - - async fn get_log_state(&mut self) -> Result, StorageError> { - let log = self.log.lock().unwrap(); - let last = log.iter().next_back().map(|(_, ent)| ent.log_id); - - let last_purged = *self.last_purged_log_id.lock().unwrap(); - - let last = match last { - None => last_purged, - Some(x) => Some(x), - }; - - Ok(LogState { - last_purged_log_id: last_purged, - last_log_id: last, - }) - } - - async fn save_committed(&mut self, committed: Option>) -> Result<(), StorageError> { - let mut c = self.committed.lock().unwrap(); - *c = committed; - Ok(()) - } - - async fn read_committed(&mut self) -> Result>, StorageError> { - let committed = self.committed.lock().unwrap(); - Ok(*committed) - } - - #[tracing::instrument(level = "trace", skip(self))] - async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { - let mut v = self.vote.lock().unwrap(); - *v = Some(*vote); - Ok(()) - } - - async fn read_vote(&mut self) -> Result>, StorageError> { - Ok(*self.vote.lock().unwrap()) - } - - #[tracing::instrument(level = "trace", skip(self, entries, callback))] - async fn append( - &mut self, - entries: I, - callback: LogFlushed<::AsyncRuntime, NodeId>, - ) -> Result<(), StorageError> - where - I: IntoIterator>, - { - // Simple implementation that calls the flush-before-return `append_to_log`. - let mut log = self.log.lock().unwrap(); - for entry in entries { - log.insert(entry.log_id.index, entry); - } - callback.log_io_completed(Ok(())); - - Ok(()) - } - - #[tracing::instrument(level = "debug", skip(self))] - async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { - tracing::debug!("delete_log: [{:?}, +oo)", log_id); - - let mut log = self.log.lock().unwrap(); - let keys = log.range(log_id.index..).map(|(k, _v)| *k).collect::>(); - for key in keys { - log.remove(&key); - } - - Ok(()) - } - - #[tracing::instrument(level = "debug", skip(self))] - async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { - tracing::debug!("delete_log: (-oo, {:?}]", log_id); - - { - let mut ld = self.last_purged_log_id.lock().unwrap(); - assert!(*ld <= Some(log_id)); - *ld = Some(log_id); - } - - { - let mut log = self.log.lock().unwrap(); - - let keys = log.range(..=log_id.index).map(|(k, _v)| *k).collect::>(); - for key in keys { - log.remove(&key); - } - } - - Ok(()) - } - - async fn get_log_reader(&mut self) -> Self::LogReader { - self.clone() - } -} diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/tests/cluster/test_cluster.rs b/examples/raft-kv-memstore-opendal-snapshot-data/tests/cluster/test_cluster.rs index c9d576ab6..87682a428 100644 --- a/examples/raft-kv-memstore-opendal-snapshot-data/tests/cluster/test_cluster.rs +++ b/examples/raft-kv-memstore-opendal-snapshot-data/tests/cluster/test_cluster.rs @@ -38,9 +38,9 @@ pub fn log_panic(panic: &PanicInfo) { /// /// - Setup a single node cluster, write some logs, take a snapshot; /// - Add a learner node-2 to receive snapshot replication, via the complete-snapshot API: -/// - The sending end sends snapshot with `RaftNetwork::snapshot()`; +/// - The sending end sends snapshot with `RaftNetwork::full_snapshot()`; /// - The receiving end deliver the received snapshot to `Raft` with -/// `Raft::install_complete_snapshot()`. +/// `Raft::install_full_snapshot()`. #[tokio::test] async fn test_cluster() { std::panic::set_hook(Box::new(|panic| { diff --git a/examples/raft-kv-memstore-singlethreaded/src/network.rs b/examples/raft-kv-memstore-singlethreaded/src/network.rs index 87f653f14..c7c7e39fa 100644 --- a/examples/raft-kv-memstore-singlethreaded/src/network.rs +++ b/examples/raft-kv-memstore-singlethreaded/src/network.rs @@ -1,5 +1,6 @@ use openraft::error::InstallSnapshotError; use openraft::error::RemoteError; +use openraft::network::RPCOption; use openraft::raft::AppendEntriesRequest; use openraft::raft::AppendEntriesResponse; use openraft::raft::InstallSnapshotRequest; @@ -32,9 +33,10 @@ impl RaftNetworkFactory for Router { } impl RaftNetwork for Connection { - async fn send_append_entries( + async fn append_entries( &mut self, req: AppendEntriesRequest, + _option: RPCOption, ) -> Result, typ::RPCError> { let resp = self .router @@ -44,9 +46,10 @@ impl RaftNetwork for Connection { Ok(resp) } - async fn send_install_snapshot( + async fn install_snapshot( &mut self, req: InstallSnapshotRequest, + _option: RPCOption, ) -> Result, typ::RPCError> { let resp = self .router @@ -56,7 +59,11 @@ impl RaftNetwork for Connection { Ok(resp) } - async fn send_vote(&mut self, req: VoteRequest) -> Result, typ::RPCError> { + async fn vote( + &mut self, + req: VoteRequest, + _option: RPCOption, + ) -> Result, typ::RPCError> { let resp = self .router .send(self.target, "/raft/vote", req) diff --git a/examples/raft-kv-memstore/src/network/raft_network_impl.rs b/examples/raft-kv-memstore/src/network/raft_network_impl.rs index cf2ab8707..db76ed109 100644 --- a/examples/raft-kv-memstore/src/network/raft_network_impl.rs +++ b/examples/raft-kv-memstore/src/network/raft_network_impl.rs @@ -1,6 +1,7 @@ use openraft::error::InstallSnapshotError; use openraft::error::NetworkError; use openraft::error::RemoteError; +use openraft::network::RPCOption; use openraft::network::RaftNetwork; use openraft::network::RaftNetworkFactory; use openraft::raft::AppendEntriesRequest; @@ -77,21 +78,27 @@ pub struct NetworkConnection { } impl RaftNetwork for NetworkConnection { - async fn send_append_entries( + async fn append_entries( &mut self, req: AppendEntriesRequest, + _option: RPCOption, ) -> Result, typ::RPCError> { self.owner.send_rpc(self.target, &self.target_node, "raft-append", req).await } - async fn send_install_snapshot( + async fn install_snapshot( &mut self, req: InstallSnapshotRequest, + _option: RPCOption, ) -> Result, typ::RPCError> { self.owner.send_rpc(self.target, &self.target_node, "raft-snapshot", req).await } - async fn send_vote(&mut self, req: VoteRequest) -> Result, typ::RPCError> { + async fn vote( + &mut self, + req: VoteRequest, + _option: RPCOption, + ) -> Result, typ::RPCError> { self.owner.send_rpc(self.target, &self.target_node, "raft-vote", req).await } } diff --git a/examples/raft-kv-rocksdb/src/network/raft_network_impl.rs b/examples/raft-kv-rocksdb/src/network/raft_network_impl.rs index a3cc85f55..40feacdcc 100644 --- a/examples/raft-kv-rocksdb/src/network/raft_network_impl.rs +++ b/examples/raft-kv-rocksdb/src/network/raft_network_impl.rs @@ -6,6 +6,7 @@ use openraft::error::NetworkError; use openraft::error::RPCError; use openraft::error::RaftError; use openraft::error::RemoteError; +use openraft::network::RPCOption; use openraft::network::RaftNetwork; use openraft::network::RaftNetworkFactory; use openraft::raft::AppendEntriesRequest; @@ -99,7 +100,7 @@ fn to_error(e: toy_rpc::Error, target: N // 99 | ) -> Result, RPCError>> // { // | ___________________________________________________________________________________________^ -// 100 | | tracing::debug!(req = debug(&req), "send_append_entries"); +// 100 | | tracing::debug!(req = debug(&req), "append_entries"); // 101 | | // 102 | | let c = self.c().await?; // ... | @@ -112,11 +113,12 @@ fn to_error(e: toy_rpc::Error, target: N #[allow(clippy::blocks_in_conditions)] impl RaftNetwork for NetworkConnection { #[tracing::instrument(level = "debug", skip_all, err(Debug))] - async fn send_append_entries( + async fn append_entries( &mut self, req: AppendEntriesRequest, + _option: RPCOption, ) -> Result, RPCError>> { - tracing::debug!(req = debug(&req), "send_append_entries"); + tracing::debug!(req = debug(&req), "append_entries"); let c = self.c().await?; tracing::debug!("got connection"); @@ -128,20 +130,22 @@ impl RaftNetwork for NetworkConnection { } #[tracing::instrument(level = "debug", skip_all, err(Debug))] - async fn send_install_snapshot( + async fn install_snapshot( &mut self, req: InstallSnapshotRequest, + _option: RPCOption, ) -> Result, RPCError>> { - tracing::debug!(req = debug(&req), "send_install_snapshot"); + tracing::debug!(req = debug(&req), "install_snapshot"); self.c().await?.raft().snapshot(req).await.map_err(|e| to_error(e, self.target)) } #[tracing::instrument(level = "debug", skip_all, err(Debug))] - async fn send_vote( + async fn vote( &mut self, req: VoteRequest, + _option: RPCOption, ) -> Result, RPCError>> { - tracing::debug!(req = debug(&req), "send_vote"); + tracing::debug!(req = debug(&req), "vote"); self.c().await?.raft().vote(req).await.map_err(|e| to_error(e, self.target)) } } diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 95606f196..629126cac 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -1132,8 +1132,8 @@ where RaftMsg::BeginReceivingSnapshot { vote, tx } => { self.engine.handle_begin_receiving_snapshot(vote, tx); } - RaftMsg::InstallCompleteSnapshot { vote, snapshot, tx } => { - self.engine.handle_install_complete_snapshot(vote, snapshot, tx); + RaftMsg::InstallFullSnapshot { vote, snapshot, tx } => { + self.engine.handle_install_full_snapshot(vote, snapshot, tx); } RaftMsg::CheckIsLeaderRequest { tx } => { if self.engine.state.is_leader(&self.engine.config.id) { diff --git a/openraft/src/core/raft_msg/mod.rs b/openraft/src/core/raft_msg/mod.rs index 5593a3dd0..39871d134 100644 --- a/openraft/src/core/raft_msg/mod.rs +++ b/openraft/src/core/raft_msg/mod.rs @@ -65,7 +65,7 @@ where C: RaftTypeConfig tx: VoteTx, C::NodeId>, }, - InstallCompleteSnapshot { + InstallFullSnapshot { vote: Vote, snapshot: Snapshot, tx: ResultSender, SnapshotResponse>, @@ -128,8 +128,8 @@ where C: RaftTypeConfig RaftMsg::BeginReceivingSnapshot { vote, .. } => { format!("BeginReceivingSnapshot: vote: {}", vote) } - RaftMsg::InstallCompleteSnapshot { vote, snapshot, .. } => { - format!("InstallCompleteSnapshot: vote: {}, snapshot: {}", vote, snapshot) + RaftMsg::InstallFullSnapshot { vote, snapshot, .. } => { + format!("InstallFullSnapshot: vote: {}, snapshot: {}", vote, snapshot) } RaftMsg::ClientWriteRequest { .. } => "ClientWriteRequest".to_string(), RaftMsg::CheckIsLeaderRequest { .. } => "CheckIsLeaderRequest".to_string(), diff --git a/openraft/src/core/sm/command.rs b/openraft/src/core/sm/command.rs index bed676629..5930322cf 100644 --- a/openraft/src/core/sm/command.rs +++ b/openraft/src/core/sm/command.rs @@ -66,8 +66,8 @@ where C: RaftTypeConfig Command::new(payload) } - pub(crate) fn install_complete_snapshot(snapshot: Snapshot) -> Self { - let payload = CommandPayload::InstallCompleteSnapshot { snapshot }; + pub(crate) fn install_full_snapshot(snapshot: Snapshot) -> Self { + let payload = CommandPayload::InstallFullSnapshot { snapshot }; Command::new(payload) } @@ -100,7 +100,7 @@ where C: RaftTypeConfig tx: ResultSender>, HigherVote>, }, - InstallCompleteSnapshot { + InstallFullSnapshot { snapshot: Snapshot, }, @@ -117,8 +117,8 @@ where C: RaftTypeConfig match self { CommandPayload::BuildSnapshot => write!(f, "BuildSnapshot"), CommandPayload::GetSnapshot { .. } => write!(f, "GetSnapshot"), - CommandPayload::InstallCompleteSnapshot { snapshot } => { - write!(f, "InstallCompleteSnapshot: meta: {:?}", snapshot.meta) + CommandPayload::InstallFullSnapshot { snapshot } => { + write!(f, "InstallFullSnapshot: meta: {:?}", snapshot.meta) } CommandPayload::BeginReceivingSnapshot { .. } => { write!(f, "BeginReceivingSnapshot") @@ -138,8 +138,8 @@ where C: RaftTypeConfig (CommandPayload::GetSnapshot { .. }, CommandPayload::GetSnapshot { .. }) => true, (CommandPayload::BeginReceivingSnapshot { .. }, CommandPayload::BeginReceivingSnapshot { .. }) => true, ( - CommandPayload::InstallCompleteSnapshot { snapshot: s1 }, - CommandPayload::InstallCompleteSnapshot { snapshot: s2 }, + CommandPayload::InstallFullSnapshot { snapshot: s1 }, + CommandPayload::InstallFullSnapshot { snapshot: s2 }, ) => s1.meta == s2.meta, (CommandPayload::Apply { entries: entries1 }, CommandPayload::Apply { entries: entries2 }) => { // Entry may not be `Eq`, we just compare log id. diff --git a/openraft/src/core/sm/mod.rs b/openraft/src/core/sm/mod.rs index d7f48c49e..78692202b 100644 --- a/openraft/src/core/sm/mod.rs +++ b/openraft/src/core/sm/mod.rs @@ -126,7 +126,7 @@ where self.get_snapshot(tx).await?; // GetSnapshot does not respond to RaftCore } - CommandPayload::InstallCompleteSnapshot { snapshot } => { + CommandPayload::InstallFullSnapshot { snapshot } => { tracing::info!("{}: install complete snapshot", func_name!()); let meta = snapshot.meta.clone(); diff --git a/openraft/src/docs/feature_flags/feature-flags.md b/openraft/src/docs/feature_flags/feature-flags.md index 21aa533d4..29fb35be4 100644 --- a/openraft/src/docs/feature_flags/feature-flags.md +++ b/openraft/src/docs/feature_flags/feature-flags.md @@ -35,11 +35,11 @@ This feature is introduced in 0.9.0 On the sending end (leader that sends snapshot to follower): -- Without `generic-snapshot-data`: [`RaftNetwork::snapshot()`] +- Without `generic-snapshot-data`: [`RaftNetwork::full_snapshot()`] provides a default implementation that invokes the chunk-based API [`RaftNetwork::install_snapshot()`] for transmit. -- With `generic-snapshot-data` enabled: [`RaftNetwork::snapshot()`] +- With `generic-snapshot-data` enabled: [`RaftNetwork::full_snapshot()`] must be implemented to provide application customized snapshot transmission. Application does not need to implement [`RaftNetwork::install_snapshot()`]. @@ -96,5 +96,5 @@ emit log record. See: [tracing doc: emitting-log-records](https://docs.rs/tracing/latest/tracing/#emitting-log-records) -[`RaftNetwork::snapshot()`]: crate::network::RaftNetwork::snapshot +[`RaftNetwork::full_snapshot()`]: crate::network::RaftNetwork::full_snapshot [`RaftNetwork::install_snapshot()`]: crate::network::RaftNetwork::install_snapshot \ No newline at end of file diff --git a/openraft/src/docs/getting_started/getting-started.md b/openraft/src/docs/getting_started/getting-started.md index 1d519d6bb..6bf00cd5f 100644 --- a/openraft/src/docs/getting_started/getting-started.md +++ b/openraft/src/docs/getting_started/getting-started.md @@ -184,11 +184,11 @@ and receiving messages between Raft nodes. Here is the list of methods that need to be implemented for the [`RaftNetwork`] trait: -| [`RaftNetwork`] method | forward request | to target | -|------------------------|--------------------------|---------------------------------------------------| -| [`append_entries()`] | [`AppendEntriesRequest`] | remote node [`Raft::append_entries()`] | -| [`snapshot()`] | [`Snapshot`] | remote node [`Raft::install_complete_snapshot()`] | -| [`vote()`] | [`VoteRequest`] | remote node [`Raft::vote()`] | +| [`RaftNetwork`] method | forward request | to target | +|------------------------|--------------------------|------------------------------------------------| +| [`append_entries()`] | [`AppendEntriesRequest`] | remote node [`Raft::append_entries()`] | +| [`full_snapshot()`] | [`Snapshot`] | remote node [`Raft::install_full_snapshot()`] | +| [`vote()`] | [`VoteRequest`] | remote node [`Raft::vote()`] | [Mem KV Network](https://github.com/datafuselabs/openraft/blob/main/examples/raft-kv-memstore/src/network/raft_network_impl.rs) demonstrates how to forward messages to other Raft nodes using [`reqwest`](https://docs.rs/reqwest/latest/reqwest/) as network transport layer. @@ -351,7 +351,7 @@ Additionally, two test scripts for setting up a cluster are available: [`Raft`]: `crate::Raft` [`Raft::append_entries()`]: `crate::Raft::append_entries` [`Raft::vote()`]: `crate::Raft::vote` -[`Raft::install_complete_snapshot()`]: `crate::Raft::install_complete_snapshot` +[`Raft::install_full_snapshot()`]: `crate::Raft::install_full_snapshot` [`AppendEntriesRequest`]: `crate::raft::AppendEntriesRequest` [`VoteRequest`]: `crate::raft::VoteRequest` @@ -398,7 +398,7 @@ Additionally, two test scripts for setting up a cluster are available: [`RaftNetwork`]: `crate::network::RaftNetwork` [`append_entries()`]: `crate::RaftNetwork::append_entries` [`vote()`]: `crate::RaftNetwork::vote` -[`snapshot()`]: `crate::RaftNetwork::snapshot` +[`full_snapshot()`]: `crate::RaftNetwork::full_snapshot` [`RaftSnapshotBuilder`]: `crate::storage::RaftSnapshotBuilder` diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index e3b28eb63..c988d11cd 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -231,7 +231,7 @@ where AppendEntries(ValueSender, Infallible>>), ReceiveSnapshotChunk(ValueSender>), InstallSnapshot(ValueSender, InstallSnapshotError>>), - InstallCompleteSnapshot(ValueSender, Infallible>>), + InstallFullSnapshot(ValueSender, Infallible>>), Initialize(ValueSender>>), } @@ -253,7 +253,7 @@ where (Respond::InstallSnapshot(first_sender), Respond::InstallSnapshot(second_sender)) => { first_sender.eq(second_sender) } - (Respond::InstallCompleteSnapshot(first_sender), Respond::InstallCompleteSnapshot(second_sender)) => { + (Respond::InstallFullSnapshot(first_sender), Respond::InstallFullSnapshot(second_sender)) => { first_sender.eq(second_sender) } (Respond::Initialize(first_sender), Respond::Initialize(second_sender)) => first_sender.eq(second_sender), @@ -290,7 +290,7 @@ where Respond::AppendEntries(x) => x.send(), Respond::ReceiveSnapshotChunk(x) => x.send(), Respond::InstallSnapshot(x) => x.send(), - Respond::InstallCompleteSnapshot(x) => x.send(), + Respond::InstallFullSnapshot(x) => x.send(), Respond::Initialize(x) => x.send(), } } diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index 1f9f5fa1a..e79f26993 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -455,7 +455,7 @@ where C: RaftTypeConfig /// Install a completely received snapshot on a follower. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn handle_install_complete_snapshot( + pub(crate) fn handle_install_full_snapshot( &mut self, vote: Vote, snapshot: Snapshot, @@ -472,7 +472,7 @@ where C: RaftTypeConfig }; let mut fh = self.following_handler(); - fh.install_complete_snapshot(snapshot); + fh.install_full_snapshot(snapshot); let res = Ok(SnapshotResponse { vote: *self.state.vote_ref(), }); diff --git a/openraft/src/engine/handler/following_handler/install_snapshot_test.rs b/openraft/src/engine/handler/following_handler/install_snapshot_test.rs index ade389fcc..57eade688 100644 --- a/openraft/src/engine/handler/following_handler/install_snapshot_test.rs +++ b/openraft/src/engine/handler/following_handler/install_snapshot_test.rs @@ -56,7 +56,7 @@ fn test_install_snapshot_lt_last_snapshot() -> anyhow::Result<()> { // `snapshot_meta.last_log_id`. let mut eng = eng(); - eng.following_handler().install_complete_snapshot(Snapshot { + eng.following_handler().install_full_snapshot(Snapshot { meta: SnapshotMeta { last_log_id: Some(log_id(2, 1, 2)), last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), @@ -86,7 +86,7 @@ fn test_install_snapshot_lt_committed() -> anyhow::Result<()> { // Although in this case the state machine is not affected. let mut eng = eng(); - eng.following_handler().install_complete_snapshot(Snapshot { + eng.following_handler().install_full_snapshot(Snapshot { meta: SnapshotMeta { last_log_id: Some(log_id(4, 1, 5)), last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), @@ -113,7 +113,7 @@ fn test_install_snapshot_not_conflict() -> anyhow::Result<()> { // Snapshot will be installed and there are no conflicting logs. let mut eng = eng(); - eng.following_handler().install_complete_snapshot(Snapshot { + eng.following_handler().install_full_snapshot(Snapshot { meta: SnapshotMeta { last_log_id: Some(log_id(4, 1, 6)), last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), @@ -140,7 +140,7 @@ fn test_install_snapshot_not_conflict() -> anyhow::Result<()> { vec![ // Command::from( - sm::Command::install_complete_snapshot(Snapshot { + sm::Command::install_full_snapshot(Snapshot { meta: SnapshotMeta { last_log_id: Some(log_id(4, 1, 6)), last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), @@ -187,7 +187,7 @@ fn test_install_snapshot_conflict() -> anyhow::Result<()> { eng }; - eng.following_handler().install_complete_snapshot(Snapshot { + eng.following_handler().install_full_snapshot(Snapshot { meta: SnapshotMeta { last_log_id: Some(log_id(5, 1, 6)), last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), @@ -215,7 +215,7 @@ fn test_install_snapshot_conflict() -> anyhow::Result<()> { // Command::DeleteConflictLog { since: log_id(2, 1, 4) }, Command::from( - sm::Command::install_complete_snapshot(Snapshot { + sm::Command::install_full_snapshot(Snapshot { meta: SnapshotMeta { last_log_id: Some(log_id(5, 1, 6)), last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), @@ -238,7 +238,7 @@ fn test_install_snapshot_advance_last_log_id() -> anyhow::Result<()> { // Snapshot will be installed and there are no conflicting logs. let mut eng = eng(); - eng.following_handler().install_complete_snapshot(Snapshot { + eng.following_handler().install_full_snapshot(Snapshot { meta: SnapshotMeta { last_log_id: Some(log_id(100, 1, 100)), last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), @@ -268,7 +268,7 @@ fn test_install_snapshot_advance_last_log_id() -> anyhow::Result<()> { assert_eq!( vec![ Command::from( - sm::Command::install_complete_snapshot(Snapshot { + sm::Command::install_full_snapshot(Snapshot { meta: SnapshotMeta { last_log_id: Some(log_id(100, 1, 100)), last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), @@ -293,7 +293,7 @@ fn test_install_snapshot_update_accepted() -> anyhow::Result<()> { // Snapshot will be installed and `accepted` should be updated. let mut eng = eng(); - eng.following_handler().install_complete_snapshot(Snapshot { + eng.following_handler().install_full_snapshot(Snapshot { meta: SnapshotMeta { last_log_id: Some(log_id(100, 1, 100)), last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), diff --git a/openraft/src/engine/handler/following_handler/mod.rs b/openraft/src/engine/handler/following_handler/mod.rs index 7897e1bd7..8780dd77b 100644 --- a/openraft/src/engine/handler/following_handler/mod.rs +++ b/openraft/src/engine/handler/following_handler/mod.rs @@ -243,9 +243,9 @@ where C: RaftTypeConfig /// Refer to [`snapshot_replication`](crate::docs::protocol::replication::snapshot_replication) /// for the reason the following workflow is needed. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn install_complete_snapshot(&mut self, snapshot: Snapshot) { + pub(crate) fn install_full_snapshot(&mut self, snapshot: Snapshot) { let meta = &snapshot.meta; - tracing::info!("install_complete_snapshot: meta:{:?}", meta); + tracing::info!("install_full_snapshot: meta:{:?}", meta); let snap_last_log_id = meta.last_log_id; @@ -285,7 +285,7 @@ where C: RaftTypeConfig meta.last_membership.clone(), )); - self.output.push_command(Command::from(sm::Command::install_complete_snapshot(snapshot))); + self.output.push_command(Command::from(sm::Command::install_full_snapshot(snapshot))); self.state.purge_upto = Some(snap_last_log_id); self.log_handler().purge_log(); diff --git a/openraft/src/error.rs b/openraft/src/error.rs index ba3bcba34..cf7f2a065 100644 --- a/openraft/src/error.rs +++ b/openraft/src/error.rs @@ -240,7 +240,7 @@ where Closed(#[from] ReplicationClosed), // TODO(xp): two sub type: StorageError / TransportError - // TODO(xp): a sub error for just send_append_entries() + // TODO(xp): a sub error for just append_entries() #[error(transparent)] StorageError(#[from] StorageError), diff --git a/openraft/src/network/network.rs b/openraft/src/network/network.rs index c019c5e8e..59eb860e9 100644 --- a/openraft/src/network/network.rs +++ b/openraft/src/network/network.rs @@ -31,18 +31,6 @@ use crate::Vote; /// /// A single network instance is used to connect to a single target node. The network instance is /// constructed by the [`RaftNetworkFactory`](`crate::network::RaftNetworkFactory`). -/// -/// ### 2023-05-03: New API with options -/// -/// - This trait introduced 3 new API `append_entries`, `install_snapshot` and `vote` which accept -/// an additional argument [`RPCOption`], and deprecated the old API `send_append_entries`, -/// `send_install_snapshot` and `send_vote`. -/// -/// - The old API will be **removed** in `0.9`. An application can still implement the old API -/// without any changes. Openraft calls only the new API and the default implementation will -/// delegate to the old API. -/// -/// - Implementing the new APIs will disable the old APIs. #[add_async_trait] pub trait RaftNetwork: OptionalSend + OptionalSync + 'static where C: RaftTypeConfig @@ -52,37 +40,26 @@ where C: RaftTypeConfig &mut self, rpc: AppendEntriesRequest, option: RPCOption, - ) -> Result, RPCError>> { - let _ = option; - #[allow(deprecated)] - self.send_append_entries(rpc).await - } + ) -> Result, RPCError>>; /// Send an InstallSnapshot RPC to the target. + #[cfg(not(feature = "generic-snapshot-data"))] #[deprecated(since = "0.9.0", note = "use `snapshot()` instead for sending a complete snapshot")] async fn install_snapshot( &mut self, - rpc: InstallSnapshotRequest, - option: RPCOption, + _rpc: InstallSnapshotRequest, + _option: RPCOption, ) -> Result< InstallSnapshotResponse, RPCError>, - > { - let _ = option; - #[allow(deprecated)] - self.send_install_snapshot(rpc).await - } + >; /// Send a RequestVote RPC to the target. async fn vote( &mut self, rpc: VoteRequest, option: RPCOption, - ) -> Result, RPCError>> { - let _ = option; - #[allow(deprecated)] - self.send_vote(rpc).await - } + ) -> Result, RPCError>>; /// Send a complete Snapshot to the target. /// @@ -95,71 +72,32 @@ where C: RaftTypeConfig /// /// The `vote` is the leader vote which is used to check if the leader is still valid by a /// follower. - /// When the follower finished receiving snapshot, it calls `Raft::install_complete_snapshot()` + /// When the follower finished receiving snapshot, it calls `Raft::install_full_snapshot()` /// with this vote. /// /// `cancel` get `Ready` when the caller decides to cancel this snapshot transmission. - async fn snapshot( + #[cfg(feature = "generic-snapshot-data")] + async fn full_snapshot( &mut self, vote: Vote, snapshot: Snapshot, cancel: impl Future + OptionalSend, option: RPCOption, - ) -> Result, StreamingError>> { - #[cfg(not(feature = "generic-snapshot-data"))] - { - use crate::network::stream_snapshot; - use crate::network::stream_snapshot::SnapshotTransport; - - let resp = stream_snapshot::Chunked::send_snapshot(self, vote, snapshot, cancel, option).await?; - Ok(resp) - } - #[cfg(feature = "generic-snapshot-data")] - { - let _ = (vote, snapshot, cancel, option); - unimplemented!( - "no default implementation for RaftNetwork::snapshot() if `generic-snapshot-data` feature is enabled" - ) - } - } - - /// Send an AppendEntries RPC to the target Raft node (§5). - #[deprecated( - since = "0.8.4", - note = "use `append_entries` instead. This method will be removed in 0.9" - )] - async fn send_append_entries( - &mut self, - rpc: AppendEntriesRequest, - ) -> Result, RPCError>> { - let _ = rpc; - unimplemented!("send_append_entries is deprecated") - } + ) -> Result, StreamingError>>; - /// Send an InstallSnapshot RPC to the target Raft node (§7). - #[deprecated( - since = "0.8.4", - note = "use `install_snapshot` instead. This method will be removed in 0.9" - )] - async fn send_install_snapshot( + #[cfg(not(feature = "generic-snapshot-data"))] + async fn full_snapshot( &mut self, - rpc: InstallSnapshotRequest, - ) -> Result< - InstallSnapshotResponse, - RPCError>, - > { - let _ = rpc; - unimplemented!("send_install_snapshot is deprecated") - } + vote: Vote, + snapshot: Snapshot, + cancel: impl Future + OptionalSend, + option: RPCOption, + ) -> Result, StreamingError>> { + use crate::network::stream_snapshot; + use crate::network::stream_snapshot::SnapshotTransport; - /// Send a RequestVote RPC to the target Raft node (§5). - #[deprecated(since = "0.8.4", note = "use `vote` instead. This method will be removed in 0.9")] - async fn send_vote( - &mut self, - rpc: VoteRequest, - ) -> Result, RPCError>> { - let _ = rpc; - unimplemented!("send_vote is deprecated") + let resp = stream_snapshot::Chunked::send_snapshot(self, vote, snapshot, cancel, option).await?; + Ok(resp) } /// Build a backoff instance if the target node is temporarily(or permanently) unreachable. diff --git a/openraft/src/network/stream_snapshot.rs b/openraft/src/network/stream_snapshot.rs index c625ea52d..3245c1b4a 100644 --- a/openraft/src/network/stream_snapshot.rs +++ b/openraft/src/network/stream_snapshot.rs @@ -58,7 +58,7 @@ impl SnapshotTransport for Chunked { /// Stream snapshot by chunks. /// /// This function is for backward compatibility and provides a default implement for - /// `RaftNetwork::snapshot()` upon `RafNetwork::install_snapshot()`. This implementation + /// `RaftNetwork::full_snapshot()` upon `RafNetwork::install_snapshot()`. This implementation /// requires `SnapshotData` to be `AsyncRead + AsyncSeek`. /// /// The argument `vote` is the leader's vote which is used to check if the leader is still valid @@ -96,7 +96,7 @@ impl SnapshotTransport for Chunked { snapshot.snapshot.seek(SeekFrom::Start(offset)).await.sto_res(subject_verb)?; // Safe unwrap(): this function is called only by default implementation of - // `RaftNetwork::snapshot()` and it is always set. + // `RaftNetwork::full_snapshot()` and it is always set. let chunk_size = option.snapshot_chunk_size().unwrap(); let mut buf = Vec::with_capacity(chunk_size); while buf.capacity() > buf.len() { diff --git a/openraft/src/raft/message/append_entries.rs b/openraft/src/raft/message/append_entries.rs index 5980110d8..0387ea2ed 100644 --- a/openraft/src/raft/message/append_entries.rs +++ b/openraft/src/raft/message/append_entries.rs @@ -53,11 +53,11 @@ impl MessageSummary> for AppendEntrie /// The response to an `AppendEntriesRequest`. /// -/// [`RaftNetwork::send_append_entries`] returns this type only when received an RPC reply. +/// [`RaftNetwork::append_entries`] returns this type only when received an RPC reply. /// Otherwise it should return [`RPCError`]. /// /// [`RPCError`]: crate::error::RPCError -/// [`RaftNetwork::send_append_entries`]: crate::network::RaftNetwork::send_append_entries +/// [`RaftNetwork::append_entries`]: crate::network::RaftNetwork::append_entries #[derive(Debug)] #[derive(PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] @@ -67,7 +67,7 @@ pub enum AppendEntriesResponse { /// Successfully sent the first portion of log entries. /// - /// [`RaftNetwork::send_append_entries`] can return a partial success. + /// [`RaftNetwork::append_entries`] can return a partial success. /// For example, it tries to send log entries `[1-2..3-10]`, the application is allowed to send /// just `[1-2..1-3]` and return `PartialSuccess(1-3)`, /// @@ -75,12 +75,12 @@ pub enum AppendEntriesResponse { /// /// The returned matching log id must be **greater than or equal to** the first log /// id([`AppendEntriesRequest::prev_log_id`]) of the entries to send. If no RPC reply is - /// received, [`RaftNetwork::send_append_entries`] must return an [`RPCError`] to inform + /// received, [`RaftNetwork::append_entries`] must return an [`RPCError`] to inform /// Openraft that the first log id([`AppendEntriesRequest::prev_log_id`]) may not match on /// the remote target node. /// /// [`RPCError`]: crate::error::RPCError - /// [`RaftNetwork::send_append_entries`]: crate::network::RaftNetwork::send_append_entries + /// [`RaftNetwork::append_entries`]: crate::network::RaftNetwork::append_entries PartialSuccess(Option>), /// The first log id([`AppendEntriesRequest::prev_log_id`]) of the entries to send does not diff --git a/openraft/src/raft/message/install_snapshot.rs b/openraft/src/raft/message/install_snapshot.rs index 7600e74e4..f48806db9 100644 --- a/openraft/src/raft/message/install_snapshot.rs +++ b/openraft/src/raft/message/install_snapshot.rs @@ -55,7 +55,7 @@ pub struct InstallSnapshotResponse { pub vote: Vote, } -/// The response to `Raft::install_complete_snapshot` API. +/// The response to `Raft::install_full_snapshot` API. #[derive(Debug)] #[derive(PartialEq, Eq)] #[derive(derive_more::Display)] diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index c51fbb553..3fece00cf 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -384,15 +384,15 @@ where C: RaftTypeConfig /// The application receives a snapshot from the leader, in chunks or a stream, and /// then rebuild a snapshot, then pass the snapshot to Raft to install. #[tracing::instrument(level = "debug", skip_all)] - pub async fn install_complete_snapshot( + pub async fn install_full_snapshot( &self, vote: Vote, snapshot: Snapshot, ) -> Result, Fatal> { - tracing::info!("Raft::install_complete_snapshot()"); + tracing::info!("Raft::install_full_snapshot()"); let (tx, rx) = C::AsyncRuntime::oneshot(); - let res = self.inner.call_core(RaftMsg::InstallCompleteSnapshot { vote, snapshot, tx }, rx).await; + let res = self.inner.call_core(RaftMsg::InstallFullSnapshot { vote, snapshot, tx }, rx).await; match res { Ok(x) => Ok(x), Err(e) => { @@ -465,7 +465,7 @@ where C: RaftTypeConfig let snapshot = Chunked::receive_snapshot(&mut *streaming, req).await?; if let Some(snapshot) = snapshot { - let resp = self.install_complete_snapshot(req_vote, snapshot).await?; + let resp = self.install_full_snapshot(req_vote, snapshot).await?; Ok(resp.into()) } else { diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index 9b8ed7831..a02e00b33 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -806,7 +806,7 @@ where ReplicationClosed::new("ReplicationCore is dropped") }; - let res = net.snapshot(vote, snapshot, cancel, option).await; + let res = net.full_snapshot(vote, snapshot, cancel, option).await; if let Err(e) = &res { tracing::warn!(error = display(e), "failed to send snapshot"); } diff --git a/tests/tests/client_api/main.rs b/tests/tests/client_api/main.rs index a8a378cdf..94711e97d 100644 --- a/tests/tests/client_api/main.rs +++ b/tests/tests/client_api/main.rs @@ -12,7 +12,7 @@ mod t11_client_reads; mod t12_trigger_purge_log; mod t13_begin_receiving_snapshot; mod t13_get_snapshot; -mod t13_install_complete_snapshot; +mod t13_install_full_snapshot; mod t13_trigger_snapshot; mod t16_with_raft_state; mod t50_lagging_network_write; diff --git a/tests/tests/client_api/t13_install_complete_snapshot.rs b/tests/tests/client_api/t13_install_full_snapshot.rs similarity index 89% rename from tests/tests/client_api/t13_install_complete_snapshot.rs rename to tests/tests/client_api/t13_install_full_snapshot.rs index a647839ba..e40098433 100644 --- a/tests/tests/client_api/t13_install_complete_snapshot.rs +++ b/tests/tests/client_api/t13_install_full_snapshot.rs @@ -10,7 +10,7 @@ use crate::fixtures::init_default_ut_tracing; use crate::fixtures::RaftRouter; #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] -async fn install_complete_snapshot() -> anyhow::Result<()> { +async fn install_full_snapshot() -> anyhow::Result<()> { let config = Arc::new( Config { enable_heartbeat: false, @@ -51,7 +51,7 @@ async fn install_complete_snapshot() -> anyhow::Result<()> { { let n1 = router.get_raft_handle(&1)?; - let resp = n1.install_complete_snapshot(Vote::new(0, 0), snap.clone()).await?; + let resp = n1.install_full_snapshot(Vote::new(0, 0), snap.clone()).await?; assert_eq!( Vote::new_committed(1, 0), resp.vote, @@ -73,7 +73,7 @@ async fn install_complete_snapshot() -> anyhow::Result<()> { { let n1 = router.get_raft_handle(&1)?; - let resp = n1.install_complete_snapshot(Vote::new_committed(1, 0), snap.clone()).await?; + let resp = n1.install_full_snapshot(Vote::new_committed(1, 0), snap.clone()).await?; assert_eq!(Vote::new_committed(1, 0), resp.vote,); n1.with_raft_state(move |state| { assert_eq!( @@ -88,7 +88,7 @@ async fn install_complete_snapshot() -> anyhow::Result<()> { { let n2 = router.get_raft_handle(&2)?; - let resp = n2.install_complete_snapshot(Vote::new_committed(1, 0), snap.clone()).await?; + let resp = n2.install_full_snapshot(Vote::new_committed(1, 0), snap.clone()).await?; assert_eq!(Vote::new_committed(1, 0), resp.vote,); n2.with_raft_state(move |state| { assert_eq!( diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs index 1ef43ffa3..58bcae1ad 100644 --- a/tests/tests/fixtures/mod.rs +++ b/tests/tests/fixtures/mod.rs @@ -32,6 +32,7 @@ use openraft::error::RaftError; use openraft::error::RemoteError; use openraft::error::Unreachable; use openraft::metrics::Wait; +use openraft::network::RPCOption; use openraft::network::RaftNetwork; use openraft::network::RaftNetworkFactory; use openraft::raft::AppendEntriesRequest; @@ -985,9 +986,10 @@ pub struct RaftRouterNetwork { impl RaftNetwork for RaftRouterNetwork { /// Send an AppendEntries RPC to the target Raft node (§5). - async fn send_append_entries( + async fn append_entries( &mut self, mut rpc: AppendEntriesRequest, + _option: RPCOption, ) -> Result, RPCError>> { let from_id = rpc.vote.leader_id().voted_for().unwrap(); @@ -1048,9 +1050,10 @@ impl RaftNetwork for RaftRouterNetwork { } /// Send an InstallSnapshot RPC to the target Raft node (§7). - async fn send_install_snapshot( + async fn install_snapshot( &mut self, rpc: InstallSnapshotRequest, + _option: RPCOption, ) -> Result, RPCError>> { let from_id = rpc.vote.leader_id().voted_for().unwrap(); @@ -1069,9 +1072,10 @@ impl RaftNetwork for RaftRouterNetwork { } /// Send a RequestVote RPC to the target Raft node (§5). - async fn send_vote( + async fn vote( &mut self, rpc: VoteRequest, + _option: RPCOption, ) -> Result, RPCError>> { let from_id = rpc.vote.leader_id().voted_for().unwrap(); diff --git a/tests/tests/replication/t10_append_entries_partial_success.rs b/tests/tests/replication/t10_append_entries_partial_success.rs index 6efc46f2a..a5573564f 100644 --- a/tests/tests/replication/t10_append_entries_partial_success.rs +++ b/tests/tests/replication/t10_append_entries_partial_success.rs @@ -8,7 +8,7 @@ use openraft::Config; use crate::fixtures::init_default_ut_tracing; use crate::fixtures::RaftRouter; -/// RaftNetwork::send_append_entries can return a partial success. +/// RaftNetwork::append_entries can return a partial success. /// For example, it tries to send log entries `[1-2..2-10]`, the application is allowed to send just /// `[1-2..1-3]` and return `PartialSuccess(1-3)`. #[async_entry::test(worker_threads = 4, init = "init_default_ut_tracing()", tracing_span = "debug")]