diff --git a/openraft/src/network/mod.rs b/openraft/src/network/mod.rs index c2c7a2ec1..b4cb9915b 100644 --- a/openraft/src/network/mod.rs +++ b/openraft/src/network/mod.rs @@ -5,7 +5,8 @@ mod factory; #[allow(clippy::module_inception)] mod network; mod rpc_option; mod rpc_type; -#[cfg(not(feature = "generic-snapshot-data"))] pub(crate) mod stream_snapshot; + +pub mod snapshot_transport; pub use backoff::Backoff; pub use factory::RaftNetworkFactory; diff --git a/openraft/src/network/network.rs b/openraft/src/network/network.rs index 96f48ae94..d319e37bf 100644 --- a/openraft/src/network/network.rs +++ b/openraft/src/network/network.rs @@ -39,9 +39,25 @@ where C: RaftTypeConfig option: RPCOption, ) -> Result, RPCError>>; + /// Send an InstallSnapshot RPC to the target. + #[cfg(feature = "generic-snapshot-data")] + #[deprecated( + since = "0.9.0", + note = "with `generic-snapshot-data` enabled, use `full_snapshot()` instead to send full snapshot" + )] + async fn install_snapshot( + &mut self, + _rpc: crate::raft::InstallSnapshotRequest, + _option: RPCOption, + ) -> Result< + crate::raft::InstallSnapshotResponse, + RPCError>, + > { + unimplemented!() + } + /// Send an InstallSnapshot RPC to the target. #[cfg(not(feature = "generic-snapshot-data"))] - #[deprecated(since = "0.9.0", note = "use `snapshot()` instead for sending a complete snapshot")] async fn install_snapshot( &mut self, _rpc: crate::raft::InstallSnapshotRequest, @@ -82,6 +98,23 @@ where C: RaftTypeConfig option: RPCOption, ) -> Result, StreamingError>>; + /// Send a complete Snapshot to the target. + /// + /// This method is responsible to fragment the snapshot and send it to the target node. + /// Before returning from this method, the snapshot should be completely transmitted and + /// installed on the target node, or rejected because of `vote` being smaller than the + /// remote one. + /// + /// The default implementation just calls several `install_snapshot` RPC for each fragment. + /// + /// The `vote` is the leader vote which is used to check if the leader is still valid by a + /// follower. + /// When the follower finished receiving snapshot, it calls `Raft::install_full_snapshot()` + /// with this vote. + /// + /// `cancel` get `Ready` when the caller decides to cancel this snapshot transmission. + // If generic-snapshot-data disabled, + // provide a default implementation that relies on AsyncRead + AsyncSeek + Unpin #[cfg(not(feature = "generic-snapshot-data"))] async fn full_snapshot( &mut self, @@ -90,10 +123,10 @@ where C: RaftTypeConfig cancel: impl Future + OptionalSend, option: RPCOption, ) -> Result, StreamingError>> { - use crate::network::stream_snapshot; - use crate::network::stream_snapshot::SnapshotTransport; + use crate::network::snapshot_transport::Chunked; + use crate::network::snapshot_transport::SnapshotTransport; - let resp = stream_snapshot::Chunked::send_snapshot(self, vote, snapshot, cancel, option).await?; + let resp = Chunked::send_snapshot(self, vote, snapshot, cancel, option).await?; Ok(resp) } diff --git a/openraft/src/network/stream_snapshot.rs b/openraft/src/network/snapshot_transport.rs similarity index 82% rename from openraft/src/network/stream_snapshot.rs rename to openraft/src/network/snapshot_transport.rs index 3245c1b4a..134c0b737 100644 --- a/openraft/src/network/stream_snapshot.rs +++ b/openraft/src/network/snapshot_transport.rs @@ -1,3 +1,6 @@ +//! Provide a default chunked snapshot transport implementation for SnapshotData that implements +//! AsyncWrite + AsyncRead + AsyncSeek + Unpin. + use std::future::Future; use std::io::SeekFrom; use std::time::Duration; @@ -28,8 +31,13 @@ use crate::StorageIOError; use crate::ToStorageResult; use crate::Vote; +/// Defines the sending and receiving API for snapshot transport. #[add_async_trait] -pub(crate) trait SnapshotTransport { +pub trait SnapshotTransport { + /// Send a snapshot to a target node via `Net` in chunks. + /// + /// The implementation should watch the `cancel` future and return at once if it is ready. + // TODO: remove dependency on RaftNetwork async fn send_snapshot( _net: &mut Net, _vote: Vote, @@ -38,23 +46,21 @@ pub(crate) trait SnapshotTransport { _option: RPCOption, ) -> Result, StreamingError>> where - Net: RaftNetwork + ?Sized, - { - unimplemented!("send_snapshot is only implemented with SnapshotData with AsyncRead + AsyncSeek ...") - } + Net: RaftNetwork + ?Sized; + /// Receive a chunk of snapshot. If the snapshot is done, return the snapshot. async fn receive_snapshot( _streaming: &mut Option>, _req: InstallSnapshotRequest, - ) -> Result>, StorageError> { - unimplemented!("receive_snapshot is only implemented with SnapshotData with AsyncWrite + AsyncSeek ...") - } + ) -> Result>, StorageError>; } /// Send and Receive snapshot by chunks. -pub(crate) struct Chunked {} +pub struct Chunked {} -impl SnapshotTransport for Chunked { +impl SnapshotTransport for Chunked +where C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io::AsyncSeek + Unpin +{ /// Stream snapshot by chunks. /// /// This function is for backward compatibility and provides a default implement for @@ -175,7 +181,7 @@ impl SnapshotTransport for Chunked { if done { let streaming = streaming.take().unwrap(); - let mut data = streaming.snapshot_data; + let mut data = streaming.into_snapshot_data(); data.as_mut() .shutdown() @@ -191,23 +197,23 @@ impl SnapshotTransport for Chunked { } /// The Raft node is streaming in a snapshot from the leader. -pub(crate) struct Streaming +pub struct Streaming where C: RaftTypeConfig { /// The offset of the last byte written to the snapshot. - pub(crate) offset: u64, + offset: u64, /// The ID of the snapshot being written. - pub(crate) snapshot_id: SnapshotId, + snapshot_id: SnapshotId, /// A handle to the snapshot writer. - pub(crate) snapshot_data: Box, + snapshot_data: Box, } impl Streaming where C: RaftTypeConfig { - pub(crate) fn new(snapshot_id: SnapshotId, snapshot_data: Box) -> Self { + pub fn new(snapshot_id: SnapshotId, snapshot_data: Box) -> Self { Self { offset: 0, snapshot_id, @@ -215,8 +221,23 @@ where C: RaftTypeConfig } } + pub fn snapshot_id(&self) -> &SnapshotId { + &self.snapshot_id + } + + /// Consumes the `Streaming` and returns the snapshot data. + pub fn into_snapshot_data(self) -> Box { + self.snapshot_data + } +} + +impl Streaming +where + C: RaftTypeConfig, + C::SnapshotData: tokio::io::AsyncWrite + tokio::io::AsyncSeek + Unpin, +{ /// Receive a chunk of snapshot data. - pub(crate) async fn receive(&mut self, req: InstallSnapshotRequest) -> Result> { + pub async fn receive(&mut self, req: InstallSnapshotRequest) -> Result> { // TODO: check id? // Always seek to the target offset if not an exact match. diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index 3fece00cf..5ba1caabf 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -254,7 +254,6 @@ where C: RaftTypeConfig tx_shutdown: Mutex::new(Some(tx_shutdown)), core_state: Mutex::new(CoreState::Running(core_handle)), - #[cfg(not(feature = "generic-snapshot-data"))] snapshot: Mutex::new(None), }; @@ -409,7 +408,13 @@ where C: RaftTypeConfig /// /// If receiving is finished `done == true`, it installs the snapshot to the state machine. /// Nothing will be done if the input snapshot is older than the state machine. - #[cfg(not(feature = "generic-snapshot-data"))] + #[cfg_attr( + feature = "generic-snapshot-data", + deprecated( + since = "0.9.0", + note = "with `generic-snapshot-shot` enabled, use `Raft::install_full_snapshot()` instead" + ) + )] #[tracing::instrument(level = "debug", skip_all)] pub async fn install_snapshot( &self, @@ -418,8 +423,8 @@ where C: RaftTypeConfig where C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io::AsyncSeek + Unpin, { - use crate::network::stream_snapshot::Chunked; - use crate::network::stream_snapshot::SnapshotTransport; + use crate::network::snapshot_transport::Chunked; + use crate::network::snapshot_transport::SnapshotTransport; tracing::debug!(req = display(&req), "Raft::install_snapshot()"); @@ -428,7 +433,7 @@ where C: RaftTypeConfig let mut streaming = self.inner.snapshot.lock().await; - let curr_id = streaming.as_ref().map(|s| &s.snapshot_id); + let curr_id = streaming.as_ref().map(|s| s.snapshot_id()); if curr_id != Some(&req.meta.snapshot_id) { if req.offset != 0 { @@ -457,7 +462,7 @@ where C: RaftTypeConfig } } }; - *streaming = Some(crate::network::stream_snapshot::Streaming::new( + *streaming = Some(crate::network::snapshot_transport::Streaming::new( req.meta.snapshot_id.clone(), snapshot_data, )); diff --git a/openraft/src/raft/raft_inner.rs b/openraft/src/raft/raft_inner.rs index d13fdfeec..a1bf298b6 100644 --- a/openraft/src/raft/raft_inner.rs +++ b/openraft/src/raft/raft_inner.rs @@ -44,8 +44,7 @@ where C: RaftTypeConfig pub(in crate::raft) core_state: Mutex>, /// The ongoing snapshot transmission. - #[cfg(not(feature = "generic-snapshot-data"))] - pub(in crate::raft) snapshot: Mutex>>, + pub(in crate::raft) snapshot: Mutex>>, } impl RaftInner