Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: even when generic-snapshot-data is enabled, the old chunked transport be still available #1036

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion openraft/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ mod factory;
#[allow(clippy::module_inception)] mod network;
mod rpc_option;
mod rpc_type;
#[cfg(not(feature = "generic-snapshot-data"))] pub(crate) mod stream_snapshot;

pub mod snapshot_transport;

pub use backoff::Backoff;
pub use factory::RaftNetworkFactory;
Expand Down
41 changes: 37 additions & 4 deletions openraft/src/network/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,25 @@ where C: RaftTypeConfig
option: RPCOption,
) -> Result<AppendEntriesResponse<C::NodeId>, RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>>;

/// Send an InstallSnapshot RPC to the target.
#[cfg(feature = "generic-snapshot-data")]
#[deprecated(
since = "0.9.0",
note = "with `generic-snapshot-data` enabled, use `full_snapshot()` instead to send full snapshot"
)]
async fn install_snapshot(
&mut self,
_rpc: crate::raft::InstallSnapshotRequest<C>,
_option: RPCOption,
) -> Result<
crate::raft::InstallSnapshotResponse<C::NodeId>,
RPCError<C::NodeId, C::Node, RaftError<C::NodeId, crate::error::InstallSnapshotError>>,
> {
unimplemented!()
}

/// Send an InstallSnapshot RPC to the target.
#[cfg(not(feature = "generic-snapshot-data"))]
#[deprecated(since = "0.9.0", note = "use `snapshot()` instead for sending a complete snapshot")]
async fn install_snapshot(
&mut self,
_rpc: crate::raft::InstallSnapshotRequest<C>,
Expand Down Expand Up @@ -82,6 +98,23 @@ where C: RaftTypeConfig
option: RPCOption,
) -> Result<SnapshotResponse<C::NodeId>, StreamingError<C, Fatal<C::NodeId>>>;

/// Send a complete Snapshot to the target.
///
/// This method is responsible to fragment the snapshot and send it to the target node.
/// Before returning from this method, the snapshot should be completely transmitted and
/// installed on the target node, or rejected because of `vote` being smaller than the
/// remote one.
///
/// The default implementation just calls several `install_snapshot` RPC for each fragment.
///
/// The `vote` is the leader vote which is used to check if the leader is still valid by a
/// follower.
/// When the follower finished receiving snapshot, it calls `Raft::install_full_snapshot()`
/// with this vote.
///
/// `cancel` get `Ready` when the caller decides to cancel this snapshot transmission.
// If generic-snapshot-data disabled,
// provide a default implementation that relies on AsyncRead + AsyncSeek + Unpin
#[cfg(not(feature = "generic-snapshot-data"))]
async fn full_snapshot(
&mut self,
Expand All @@ -90,10 +123,10 @@ where C: RaftTypeConfig
cancel: impl Future<Output = ReplicationClosed> + OptionalSend,
option: RPCOption,
) -> Result<SnapshotResponse<C::NodeId>, StreamingError<C, Fatal<C::NodeId>>> {
use crate::network::stream_snapshot;
use crate::network::stream_snapshot::SnapshotTransport;
use crate::network::snapshot_transport::Chunked;
use crate::network::snapshot_transport::SnapshotTransport;

let resp = stream_snapshot::Chunked::send_snapshot(self, vote, snapshot, cancel, option).await?;
let resp = Chunked::send_snapshot(self, vote, snapshot, cancel, option).await?;
Ok(resp)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
//! Provide a default chunked snapshot transport implementation for SnapshotData that implements
//! AsyncWrite + AsyncRead + AsyncSeek + Unpin.
use std::future::Future;
use std::io::SeekFrom;
use std::time::Duration;
Expand Down Expand Up @@ -28,8 +31,13 @@ use crate::StorageIOError;
use crate::ToStorageResult;
use crate::Vote;

/// Defines the sending and receiving API for snapshot transport.
#[add_async_trait]
pub(crate) trait SnapshotTransport<C: RaftTypeConfig> {
pub trait SnapshotTransport<C: RaftTypeConfig> {
/// Send a snapshot to a target node via `Net` in chunks.
///
/// The implementation should watch the `cancel` future and return at once if it is ready.
// TODO: remove dependency on RaftNetwork
async fn send_snapshot<Net>(
_net: &mut Net,
_vote: Vote<C::NodeId>,
Expand All @@ -38,23 +46,21 @@ pub(crate) trait SnapshotTransport<C: RaftTypeConfig> {
_option: RPCOption,
) -> Result<SnapshotResponse<C::NodeId>, StreamingError<C, Fatal<C::NodeId>>>
where
Net: RaftNetwork<C> + ?Sized,
{
unimplemented!("send_snapshot is only implemented with SnapshotData with AsyncRead + AsyncSeek ...")
}
Net: RaftNetwork<C> + ?Sized;

/// Receive a chunk of snapshot. If the snapshot is done, return the snapshot.
async fn receive_snapshot(
_streaming: &mut Option<Streaming<C>>,
_req: InstallSnapshotRequest<C>,
) -> Result<Option<Snapshot<C>>, StorageError<C::NodeId>> {
unimplemented!("receive_snapshot is only implemented with SnapshotData with AsyncWrite + AsyncSeek ...")
}
) -> Result<Option<Snapshot<C>>, StorageError<C::NodeId>>;
}

/// Send and Receive snapshot by chunks.
pub(crate) struct Chunked {}
pub struct Chunked {}

impl<C: RaftTypeConfig> SnapshotTransport<C> for Chunked {
impl<C: RaftTypeConfig> SnapshotTransport<C> for Chunked
where C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io::AsyncSeek + Unpin
{
/// Stream snapshot by chunks.
///
/// This function is for backward compatibility and provides a default implement for
Expand Down Expand Up @@ -175,7 +181,7 @@ impl<C: RaftTypeConfig> SnapshotTransport<C> for Chunked {

if done {
let streaming = streaming.take().unwrap();
let mut data = streaming.snapshot_data;
let mut data = streaming.into_snapshot_data();

data.as_mut()
.shutdown()
Expand All @@ -191,32 +197,47 @@ impl<C: RaftTypeConfig> SnapshotTransport<C> for Chunked {
}

/// The Raft node is streaming in a snapshot from the leader.
pub(crate) struct Streaming<C>
pub struct Streaming<C>
where C: RaftTypeConfig
{
/// The offset of the last byte written to the snapshot.
pub(crate) offset: u64,
offset: u64,

/// The ID of the snapshot being written.
pub(crate) snapshot_id: SnapshotId,
snapshot_id: SnapshotId,

/// A handle to the snapshot writer.
pub(crate) snapshot_data: Box<C::SnapshotData>,
snapshot_data: Box<C::SnapshotData>,
}

impl<C> Streaming<C>
where C: RaftTypeConfig
{
pub(crate) fn new(snapshot_id: SnapshotId, snapshot_data: Box<C::SnapshotData>) -> Self {
pub fn new(snapshot_id: SnapshotId, snapshot_data: Box<C::SnapshotData>) -> Self {
Self {
offset: 0,
snapshot_id,
snapshot_data,
}
}

pub fn snapshot_id(&self) -> &SnapshotId {
&self.snapshot_id
}

/// Consumes the `Streaming` and returns the snapshot data.
pub fn into_snapshot_data(self) -> Box<C::SnapshotData> {
self.snapshot_data
}
}

impl<C> Streaming<C>
where
C: RaftTypeConfig,
C::SnapshotData: tokio::io::AsyncWrite + tokio::io::AsyncSeek + Unpin,
{
/// Receive a chunk of snapshot data.
pub(crate) async fn receive(&mut self, req: InstallSnapshotRequest<C>) -> Result<bool, StorageError<C::NodeId>> {
pub async fn receive(&mut self, req: InstallSnapshotRequest<C>) -> Result<bool, StorageError<C::NodeId>> {
// TODO: check id?

// Always seek to the target offset if not an exact match.
Expand Down
17 changes: 11 additions & 6 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ where C: RaftTypeConfig
tx_shutdown: Mutex::new(Some(tx_shutdown)),
core_state: Mutex::new(CoreState::Running(core_handle)),

#[cfg(not(feature = "generic-snapshot-data"))]
snapshot: Mutex::new(None),
};

Expand Down Expand Up @@ -409,7 +408,13 @@ where C: RaftTypeConfig
///
/// If receiving is finished `done == true`, it installs the snapshot to the state machine.
/// Nothing will be done if the input snapshot is older than the state machine.
#[cfg(not(feature = "generic-snapshot-data"))]
#[cfg_attr(
feature = "generic-snapshot-data",
deprecated(
since = "0.9.0",
note = "with `generic-snapshot-shot` enabled, use `Raft::install_full_snapshot()` instead"
)
)]
#[tracing::instrument(level = "debug", skip_all)]
pub async fn install_snapshot(
&self,
Expand All @@ -418,8 +423,8 @@ where C: RaftTypeConfig
where
C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io::AsyncSeek + Unpin,
{
use crate::network::stream_snapshot::Chunked;
use crate::network::stream_snapshot::SnapshotTransport;
use crate::network::snapshot_transport::Chunked;
use crate::network::snapshot_transport::SnapshotTransport;

tracing::debug!(req = display(&req), "Raft::install_snapshot()");

Expand All @@ -428,7 +433,7 @@ where C: RaftTypeConfig

let mut streaming = self.inner.snapshot.lock().await;

let curr_id = streaming.as_ref().map(|s| &s.snapshot_id);
let curr_id = streaming.as_ref().map(|s| s.snapshot_id());

if curr_id != Some(&req.meta.snapshot_id) {
if req.offset != 0 {
Expand Down Expand Up @@ -457,7 +462,7 @@ where C: RaftTypeConfig
}
}
};
*streaming = Some(crate::network::stream_snapshot::Streaming::new(
*streaming = Some(crate::network::snapshot_transport::Streaming::new(
req.meta.snapshot_id.clone(),
snapshot_data,
));
Expand Down
3 changes: 1 addition & 2 deletions openraft/src/raft/raft_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ where C: RaftTypeConfig
pub(in crate::raft) core_state: Mutex<CoreState<C::NodeId, C::AsyncRuntime>>,

/// The ongoing snapshot transmission.
#[cfg(not(feature = "generic-snapshot-data"))]
pub(in crate::raft) snapshot: Mutex<Option<crate::network::stream_snapshot::Streaming<C>>>,
pub(in crate::raft) snapshot: Mutex<Option<crate::network::snapshot_transport::Streaming<C>>>,
}

impl<C> RaftInner<C>
Expand Down
Loading