Skip to content

Commit

Permalink
Refactor: even when generic-snapshot-data is enabled, the old chunk…
Browse files Browse the repository at this point in the history
…ed transport be still available

If `generic-snapshot-data` is enabled, the old chunk based transport
such as `RaftNetwork::install_snapshot()` for sending and
`Raft::install_snapshot()` for receiving should be still be available
but just deprecated.
This way the application can upgrade Openraft without modification,
except several `#[allow(deprecated)]` attributes.
  • Loading branch information
drmingdrmer committed Mar 6, 2024
1 parent 44cc427 commit 897bb37
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 30 deletions.
3 changes: 2 additions & 1 deletion openraft/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ mod factory;
#[allow(clippy::module_inception)] mod network;
mod rpc_option;
mod rpc_type;
#[cfg(not(feature = "generic-snapshot-data"))] pub(crate) mod stream_snapshot;

pub mod snapshot_transport;

pub use backoff::Backoff;
pub use factory::RaftNetworkFactory;
Expand Down
41 changes: 37 additions & 4 deletions openraft/src/network/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,25 @@ where C: RaftTypeConfig
option: RPCOption,
) -> Result<AppendEntriesResponse<C::NodeId>, RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>>;

/// Send an InstallSnapshot RPC to the target.
#[cfg(feature = "generic-snapshot-data")]
#[deprecated(
since = "0.9.0",
note = "with `generic-snapshot-data` enabled, use `full_snapshot()` instead to send full snapshot"
)]
async fn install_snapshot(
&mut self,
_rpc: crate::raft::InstallSnapshotRequest<C>,
_option: RPCOption,
) -> Result<
crate::raft::InstallSnapshotResponse<C::NodeId>,
RPCError<C::NodeId, C::Node, RaftError<C::NodeId, crate::error::InstallSnapshotError>>,
> {
unimplemented!()
}

/// Send an InstallSnapshot RPC to the target.
#[cfg(not(feature = "generic-snapshot-data"))]
#[deprecated(since = "0.9.0", note = "use `snapshot()` instead for sending a complete snapshot")]
async fn install_snapshot(
&mut self,
_rpc: crate::raft::InstallSnapshotRequest<C>,
Expand Down Expand Up @@ -82,6 +98,23 @@ where C: RaftTypeConfig
option: RPCOption,
) -> Result<SnapshotResponse<C::NodeId>, StreamingError<C, Fatal<C::NodeId>>>;

/// Send a complete Snapshot to the target.
///
/// This method is responsible to fragment the snapshot and send it to the target node.
/// Before returning from this method, the snapshot should be completely transmitted and
/// installed on the target node, or rejected because of `vote` being smaller than the
/// remote one.
///
/// The default implementation just calls several `install_snapshot` RPC for each fragment.
///
/// The `vote` is the leader vote which is used to check if the leader is still valid by a
/// follower.
/// When the follower finished receiving snapshot, it calls `Raft::install_full_snapshot()`
/// with this vote.
///
/// `cancel` get `Ready` when the caller decides to cancel this snapshot transmission.
// If generic-snapshot-data disabled,
// provide a default implementation that relies on AsyncRead + AsyncSeek + Unpin
#[cfg(not(feature = "generic-snapshot-data"))]
async fn full_snapshot(
&mut self,
Expand All @@ -90,10 +123,10 @@ where C: RaftTypeConfig
cancel: impl Future<Output = ReplicationClosed> + OptionalSend,
option: RPCOption,
) -> Result<SnapshotResponse<C::NodeId>, StreamingError<C, Fatal<C::NodeId>>> {
use crate::network::stream_snapshot;
use crate::network::stream_snapshot::SnapshotTransport;
use crate::network::snapshot_transport::Chunked;
use crate::network::snapshot_transport::SnapshotTransport;

let resp = stream_snapshot::Chunked::send_snapshot(self, vote, snapshot, cancel, option).await?;
let resp = Chunked::send_snapshot(self, vote, snapshot, cancel, option).await?;
Ok(resp)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
//! Provide a default chunked snapshot transport implementation for SnapshotData that implements
//! AsyncWrite + AsyncRead + AsyncSeek + Unpin.
use std::future::Future;
use std::io::SeekFrom;
use std::time::Duration;
Expand Down Expand Up @@ -28,8 +31,13 @@ use crate::StorageIOError;
use crate::ToStorageResult;
use crate::Vote;

/// Defines the sending and receiving API for snapshot transport.
#[add_async_trait]
pub(crate) trait SnapshotTransport<C: RaftTypeConfig> {
pub trait SnapshotTransport<C: RaftTypeConfig> {
/// Send a snapshot to a target node via `Net` in chunks.
///
/// The implementation should watch the `cancel` future and return at once if it is ready.
// TODO: remove dependency on RaftNetwork
async fn send_snapshot<Net>(
_net: &mut Net,
_vote: Vote<C::NodeId>,
Expand All @@ -38,23 +46,21 @@ pub(crate) trait SnapshotTransport<C: RaftTypeConfig> {
_option: RPCOption,
) -> Result<SnapshotResponse<C::NodeId>, StreamingError<C, Fatal<C::NodeId>>>
where
Net: RaftNetwork<C> + ?Sized,
{
unimplemented!("send_snapshot is only implemented with SnapshotData with AsyncRead + AsyncSeek ...")
}
Net: RaftNetwork<C> + ?Sized;

/// Receive a chunk of snapshot. If the snapshot is done, return the snapshot.
async fn receive_snapshot(
_streaming: &mut Option<Streaming<C>>,
_req: InstallSnapshotRequest<C>,
) -> Result<Option<Snapshot<C>>, StorageError<C::NodeId>> {
unimplemented!("receive_snapshot is only implemented with SnapshotData with AsyncWrite + AsyncSeek ...")
}
) -> Result<Option<Snapshot<C>>, StorageError<C::NodeId>>;
}

/// Send and Receive snapshot by chunks.
pub(crate) struct Chunked {}
pub struct Chunked {}

impl<C: RaftTypeConfig> SnapshotTransport<C> for Chunked {
impl<C: RaftTypeConfig> SnapshotTransport<C> for Chunked
where C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io::AsyncSeek + Unpin
{
/// Stream snapshot by chunks.
///
/// This function is for backward compatibility and provides a default implement for
Expand Down Expand Up @@ -175,7 +181,7 @@ impl<C: RaftTypeConfig> SnapshotTransport<C> for Chunked {

if done {
let streaming = streaming.take().unwrap();
let mut data = streaming.snapshot_data;
let mut data = streaming.into_snapshot_data();

data.as_mut()
.shutdown()
Expand All @@ -191,32 +197,47 @@ impl<C: RaftTypeConfig> SnapshotTransport<C> for Chunked {
}

/// The Raft node is streaming in a snapshot from the leader.
pub(crate) struct Streaming<C>
pub struct Streaming<C>
where C: RaftTypeConfig
{
/// The offset of the last byte written to the snapshot.
pub(crate) offset: u64,
offset: u64,

/// The ID of the snapshot being written.
pub(crate) snapshot_id: SnapshotId,
snapshot_id: SnapshotId,

/// A handle to the snapshot writer.
pub(crate) snapshot_data: Box<C::SnapshotData>,
snapshot_data: Box<C::SnapshotData>,
}

impl<C> Streaming<C>
where C: RaftTypeConfig
{
pub(crate) fn new(snapshot_id: SnapshotId, snapshot_data: Box<C::SnapshotData>) -> Self {
pub fn new(snapshot_id: SnapshotId, snapshot_data: Box<C::SnapshotData>) -> Self {
Self {
offset: 0,
snapshot_id,
snapshot_data,
}
}

pub fn snapshot_id(&self) -> &SnapshotId {
&self.snapshot_id
}

/// Consumes the `Streaming` and returns the snapshot data.
pub fn into_snapshot_data(self) -> Box<C::SnapshotData> {
self.snapshot_data
}
}

impl<C> Streaming<C>
where
C: RaftTypeConfig,
C::SnapshotData: tokio::io::AsyncWrite + tokio::io::AsyncSeek + Unpin,
{
/// Receive a chunk of snapshot data.
pub(crate) async fn receive(&mut self, req: InstallSnapshotRequest<C>) -> Result<bool, StorageError<C::NodeId>> {
pub async fn receive(&mut self, req: InstallSnapshotRequest<C>) -> Result<bool, StorageError<C::NodeId>> {
// TODO: check id?

// Always seek to the target offset if not an exact match.
Expand Down
17 changes: 11 additions & 6 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ where C: RaftTypeConfig
tx_shutdown: Mutex::new(Some(tx_shutdown)),
core_state: Mutex::new(CoreState::Running(core_handle)),

#[cfg(not(feature = "generic-snapshot-data"))]
snapshot: Mutex::new(None),
};

Expand Down Expand Up @@ -409,7 +408,13 @@ where C: RaftTypeConfig
///
/// If receiving is finished `done == true`, it installs the snapshot to the state machine.
/// Nothing will be done if the input snapshot is older than the state machine.
#[cfg(not(feature = "generic-snapshot-data"))]
#[cfg_attr(
feature = "generic-snapshot-data",
deprecated(
since = "0.9.0",
note = "with `generic-snapshot-shot` enabled, use `Raft::install_full_snapshot()` instead"
)
)]
#[tracing::instrument(level = "debug", skip_all)]
pub async fn install_snapshot(
&self,
Expand All @@ -418,8 +423,8 @@ where C: RaftTypeConfig
where
C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io::AsyncSeek + Unpin,
{
use crate::network::stream_snapshot::Chunked;
use crate::network::stream_snapshot::SnapshotTransport;
use crate::network::snapshot_transport::Chunked;
use crate::network::snapshot_transport::SnapshotTransport;

tracing::debug!(req = display(&req), "Raft::install_snapshot()");

Expand All @@ -428,7 +433,7 @@ where C: RaftTypeConfig

let mut streaming = self.inner.snapshot.lock().await;

let curr_id = streaming.as_ref().map(|s| &s.snapshot_id);
let curr_id = streaming.as_ref().map(|s| s.snapshot_id());

if curr_id != Some(&req.meta.snapshot_id) {
if req.offset != 0 {
Expand Down Expand Up @@ -457,7 +462,7 @@ where C: RaftTypeConfig
}
}
};
*streaming = Some(crate::network::stream_snapshot::Streaming::new(
*streaming = Some(crate::network::snapshot_transport::Streaming::new(
req.meta.snapshot_id.clone(),
snapshot_data,
));
Expand Down
3 changes: 1 addition & 2 deletions openraft/src/raft/raft_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ where C: RaftTypeConfig
pub(in crate::raft) core_state: Mutex<CoreState<C::NodeId, C::AsyncRuntime>>,

/// The ongoing snapshot transmission.
#[cfg(not(feature = "generic-snapshot-data"))]
pub(in crate::raft) snapshot: Mutex<Option<crate::network::stream_snapshot::Streaming<C>>>,
pub(in crate::raft) snapshot: Mutex<Option<crate::network::snapshot_transport::Streaming<C>>>,
}

impl<C> RaftInner<C>
Expand Down

0 comments on commit 897bb37

Please sign in to comment.