diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 58d929bc2..f5b13d884 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -63,44 +63,6 @@ jobs: args: --features bench nothing-to-run --manifest-path openraft/Cargo.toml - # Run feature specific test for crate openraft. - openraft-features: - runs-on: ubuntu-latest - - strategy: - fail-fast: false - matrix: - include: - - toolchain: "nightly" - features: "generic-snapshot-data" - - - steps: - - name: Setup | Checkout - uses: actions/checkout@v2 - - - - name: Setup | Toolchain - uses: actions-rs/toolchain@v1.0.6 - with: - toolchain: "${{ matrix.toolchain }}" - override: true - - - # - A store with defensive checks returns error when unexpected accesses are sent to RaftStore. - # - Raft should not depend on defensive error to work correctly. - - name: Test crate `openraft/` - uses: actions-rs/cargo@v1 - with: - command: test - args: --features "${{ matrix.features }}" --manifest-path openraft/Cargo.toml - env: - # Parallel tests block each other and result in timeout. - RUST_TEST_THREADS: 2 - RUST_LOG: debug - RUST_BACKTRACE: full - - # Run openraft unit test `openraft/` and integration test `tests/`. openraft-test: runs-on: ubuntu-latest @@ -409,7 +371,7 @@ jobs: example: - "memstore" - "raft-kv-memstore" - - "raft-kv-memstore-generic-snapshot-data" + - "raft-kv-memstore-network-v2" - "raft-kv-memstore-opendal-snapshot-data" - "raft-kv-memstore-singlethreaded" - "raft-kv-rocksdb" diff --git a/Cargo.toml b/Cargo.toml index ef96f9368..10706e9ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,7 +60,7 @@ exclude = [ "examples/memstore", "examples/raft-kv-memstore", "examples/raft-kv-memstore-singlethreaded", - "examples/raft-kv-memstore-generic-snapshot-data", + "examples/raft-kv-memstore-network-v2", "examples/raft-kv-memstore-opendal-snapshot-data", "examples/raft-kv-rocksdb", ] diff --git a/examples/raft-kv-memstore-generic-snapshot-data/.gitignore b/examples/raft-kv-memstore-network-v2/.gitignore similarity index 100% rename from examples/raft-kv-memstore-generic-snapshot-data/.gitignore rename to examples/raft-kv-memstore-network-v2/.gitignore diff --git a/examples/raft-kv-memstore-generic-snapshot-data/Cargo.toml b/examples/raft-kv-memstore-network-v2/Cargo.toml similarity index 87% rename from examples/raft-kv-memstore-generic-snapshot-data/Cargo.toml rename to examples/raft-kv-memstore-network-v2/Cargo.toml index 7748a7be5..d755ca49f 100644 --- a/examples/raft-kv-memstore-generic-snapshot-data/Cargo.toml +++ b/examples/raft-kv-memstore-network-v2/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "raft-kv-memstore-generic-snapshot-data" +name = "raft-kv-memstore-network-v2" version = "0.1.0" readme = "README.md" @@ -17,7 +17,7 @@ repository = "https://github.com/datafuselabs/openraft" [dependencies] memstore = { path = "../memstore", features = [] } -openraft = { path = "../../openraft", features = ["serde", "generic-snapshot-data", "type-alias"] } +openraft = { path = "../../openraft", features = ["serde", "type-alias"] } clap = { version = "4.1.11", features = ["derive", "env"] } reqwest = { version = "0.11.9", features = ["json"] } diff --git a/examples/raft-kv-memstore-generic-snapshot-data/README.md b/examples/raft-kv-memstore-network-v2/README.md similarity index 57% rename from examples/raft-kv-memstore-generic-snapshot-data/README.md rename to examples/raft-kv-memstore-network-v2/README.md index 44d879371..84ba7cfb2 100644 --- a/examples/raft-kv-memstore-generic-snapshot-data/README.md +++ b/examples/raft-kv-memstore-network-v2/README.md @@ -1,13 +1,13 @@ -# Example Openraft kv-store with `generic-snapshot-data` enabled +# Example Openraft kv-store using `RaftNetworkV2` -With `generic-snapshot-data` feature flag enabled, Openraft allows application to use any data type for snapshot data, +With `RaftNetworkV2`, Openraft allows application to use any data type for snapshot data, instead of a single-file like data format with `AsyncSeek + AsyncRead + AsyncWrite + Unpin` bounds. This example is similar to the basic raft-kv-memstore example -but focuses on how to handle snapshot with `generic-snapshot-data` enabled. +but focuses on how to handle snapshot with `RaftNetworkV2::full_snapshot()`. Other aspects are minimized. -To send a complete snapshot, Refer to implementation of `RaftNetwork::full_snapshot()` in this example. +To send a complete snapshot, Refer to implementation of `RaftNetworkV2::full_snapshot()` in this example. To receive a complete snapshot, Refer to implementation of `api::snapshot()` in this example. diff --git a/examples/raft-kv-memstore-generic-snapshot-data/src/api.rs b/examples/raft-kv-memstore-network-v2/src/api.rs similarity index 100% rename from examples/raft-kv-memstore-generic-snapshot-data/src/api.rs rename to examples/raft-kv-memstore-network-v2/src/api.rs diff --git a/examples/raft-kv-memstore-generic-snapshot-data/src/app.rs b/examples/raft-kv-memstore-network-v2/src/app.rs similarity index 100% rename from examples/raft-kv-memstore-generic-snapshot-data/src/app.rs rename to examples/raft-kv-memstore-network-v2/src/app.rs diff --git a/examples/raft-kv-memstore-generic-snapshot-data/src/lib.rs b/examples/raft-kv-memstore-network-v2/src/lib.rs similarity index 100% rename from examples/raft-kv-memstore-generic-snapshot-data/src/lib.rs rename to examples/raft-kv-memstore-network-v2/src/lib.rs diff --git a/examples/raft-kv-memstore-generic-snapshot-data/src/network.rs b/examples/raft-kv-memstore-network-v2/src/network.rs similarity index 96% rename from examples/raft-kv-memstore-generic-snapshot-data/src/network.rs rename to examples/raft-kv-memstore-network-v2/src/network.rs index 75343a811..cf78e0fb8 100644 --- a/examples/raft-kv-memstore-generic-snapshot-data/src/network.rs +++ b/examples/raft-kv-memstore-network-v2/src/network.rs @@ -2,6 +2,7 @@ use std::future::Future; use openraft::error::RemoteError; use openraft::error::ReplicationClosed; +use openraft::network::v2::RaftNetworkV2; use openraft::network::RPCOption; use openraft::raft::AppendEntriesRequest; use openraft::raft::AppendEntriesResponse; @@ -10,7 +11,6 @@ use openraft::raft::VoteRequest; use openraft::raft::VoteResponse; use openraft::BasicNode; use openraft::OptionalSend; -use openraft::RaftNetwork; use openraft::RaftNetworkFactory; use openraft::Snapshot; use openraft::Vote; @@ -36,7 +36,7 @@ impl RaftNetworkFactory for Router { } } -impl RaftNetwork for Connection { +impl RaftNetworkV2 for Connection { async fn append_entries( &mut self, req: AppendEntriesRequest, diff --git a/examples/raft-kv-memstore-generic-snapshot-data/src/router.rs b/examples/raft-kv-memstore-network-v2/src/router.rs similarity index 100% rename from examples/raft-kv-memstore-generic-snapshot-data/src/router.rs rename to examples/raft-kv-memstore-network-v2/src/router.rs diff --git a/examples/raft-kv-memstore-generic-snapshot-data/src/store.rs b/examples/raft-kv-memstore-network-v2/src/store.rs similarity index 100% rename from examples/raft-kv-memstore-generic-snapshot-data/src/store.rs rename to examples/raft-kv-memstore-network-v2/src/store.rs diff --git a/examples/raft-kv-memstore-generic-snapshot-data/test-cluster.sh b/examples/raft-kv-memstore-network-v2/test-cluster.sh similarity index 100% rename from examples/raft-kv-memstore-generic-snapshot-data/test-cluster.sh rename to examples/raft-kv-memstore-network-v2/test-cluster.sh diff --git a/examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/main.rs b/examples/raft-kv-memstore-network-v2/tests/cluster/main.rs similarity index 100% rename from examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/main.rs rename to examples/raft-kv-memstore-network-v2/tests/cluster/main.rs diff --git a/examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/test_cluster.rs b/examples/raft-kv-memstore-network-v2/tests/cluster/test_cluster.rs similarity index 94% rename from examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/test_cluster.rs rename to examples/raft-kv-memstore-network-v2/tests/cluster/test_cluster.rs index 8affab779..9d0742b63 100644 --- a/examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/test_cluster.rs +++ b/examples/raft-kv-memstore-network-v2/tests/cluster/test_cluster.rs @@ -4,10 +4,10 @@ use std::panic::PanicInfo; use std::time::Duration; use openraft::BasicNode; -use raft_kv_memstore_generic_snapshot_data::new_raft; -use raft_kv_memstore_generic_snapshot_data::router::Router; -use raft_kv_memstore_generic_snapshot_data::store::Request; -use raft_kv_memstore_generic_snapshot_data::typ; +use raft_kv_memstore_network_v2::new_raft; +use raft_kv_memstore_network_v2::router::Router; +use raft_kv_memstore_network_v2::store::Request; +use raft_kv_memstore_network_v2::typ; use tokio::task; use tokio::task::LocalSet; use tracing_subscriber::EnvFilter; diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml b/examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml index 35d9efb43..44a57723c 100644 --- a/examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml +++ b/examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml @@ -18,7 +18,7 @@ repository = "https://github.com/datafuselabs/openraft" [dependencies] memstore = { path = "../memstore", features = [] } -openraft = { path = "../../openraft", features = ["serde", "generic-snapshot-data", "type-alias"] } +openraft = { path = "../../openraft", features = ["serde", "type-alias"] } serde = { version = "1.0.114", features = ["derive"] } serde_json = "1.0.57" diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/README.md b/examples/raft-kv-memstore-opendal-snapshot-data/README.md index b7942ad78..86919fe2a 100644 --- a/examples/raft-kv-memstore-opendal-snapshot-data/README.md +++ b/examples/raft-kv-memstore-opendal-snapshot-data/README.md @@ -1,15 +1,13 @@ # Example Openraft kv-store with snapshot stored in remote storage -With `generic-snapshot-data` feature flag enabled, Openraft allows application to use any data type for snapshot data, -instead of a single-file like data format with `AsyncSeek + AsyncRead + AsyncWrite + Unpin` bounds. - -This example shows how to save and retrieve snapshot data from remote storage, allowing users to follow a similar pattern for implementing business logic such as snapshot backups. +This example shows how to save and retrieve snapshot data from remote storage, +allowing users to follow a similar pattern for implementing business logic such as snapshot backups. This example is similar to the basic raft-kv-memstore example but focuses on how to store and fetch snapshot data from remote storage. Other aspects are minimized. -To send a complete snapshot, Refer to implementation of `RaftNetwork::full_snapshot()` in this example. +To send a complete snapshot, Refer to implementation of `RaftNetworkV2::full_snapshot()` in this example. To receive a complete snapshot, Refer to implementation of `api::snapshot()` in this example. diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/src/network.rs b/examples/raft-kv-memstore-opendal-snapshot-data/src/network.rs index 75343a811..cf78e0fb8 100644 --- a/examples/raft-kv-memstore-opendal-snapshot-data/src/network.rs +++ b/examples/raft-kv-memstore-opendal-snapshot-data/src/network.rs @@ -2,6 +2,7 @@ use std::future::Future; use openraft::error::RemoteError; use openraft::error::ReplicationClosed; +use openraft::network::v2::RaftNetworkV2; use openraft::network::RPCOption; use openraft::raft::AppendEntriesRequest; use openraft::raft::AppendEntriesResponse; @@ -10,7 +11,6 @@ use openraft::raft::VoteRequest; use openraft::raft::VoteResponse; use openraft::BasicNode; use openraft::OptionalSend; -use openraft::RaftNetwork; use openraft::RaftNetworkFactory; use openraft::Snapshot; use openraft::Vote; @@ -36,7 +36,7 @@ impl RaftNetworkFactory for Router { } } -impl RaftNetwork for Connection { +impl RaftNetworkV2 for Connection { async fn append_entries( &mut self, req: AppendEntriesRequest, diff --git a/openraft/Cargo.toml b/openraft/Cargo.toml index 330e13935..05c3993e2 100644 --- a/openraft/Cargo.toml +++ b/openraft/Cargo.toml @@ -93,18 +93,6 @@ singlethreaded = ["openraft-macros/singlethreaded"] loosen-follower-log-revert = [] -# Enable this feature flag to eliminate the `AsyncRead + AsyncWrite + AsyncSeek -# + Unpin` bound from `RaftTypeConfig::SnapshotData`. -# -# Enabling this feature allows applications to use a custom snapshot data format -# and transport fragmentation, diverging from the default implementation which -# typically relies on a single-file structure . -# -# By default it is off. -# This feature is introduced in 0.9.0 -generic-snapshot-data = [] - - # Enables "log" feature in `tracing` crate, to let tracing events emit log # record. # See: https://docs.rs/tracing/latest/tracing/#emitting-log-records @@ -119,7 +107,6 @@ tracing-log = [ "tracing/log" ] features = [ "bt", "compat", - "generic-snapshot-data", "loosen-follower-log-revert", "serde", "tracing-log", diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index ca6e9b616..f2ca41e8c 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -59,9 +59,9 @@ use crate::metrics::RaftDataMetrics; use crate::metrics::RaftMetrics; use crate::metrics::RaftServerMetrics; use crate::metrics::ReplicationMetrics; +use crate::network::v2::RaftNetworkV2; use crate::network::RPCOption; use crate::network::RPCTypes; -use crate::network::RaftNetwork; use crate::network::RaftNetworkFactory; use crate::progress::entry::ProgressEntry; use crate::progress::Inflight; diff --git a/openraft/src/docs/feature_flags/feature-flags-toc.md b/openraft/src/docs/feature_flags/feature-flags-toc.md index e889d6dd5..c2fc31e55 100644 --- a/openraft/src/docs/feature_flags/feature-flags-toc.md +++ b/openraft/src/docs/feature_flags/feature-flags-toc.md @@ -1,7 +1,6 @@ - [feature-flag `bench`](#feature-flag-bench) - [feature-flag `bt`](#feature-flag-bt) - [feature-flag `compat`](#feature-flag-compat) -- [feature-flag `generic-snapshot-data`](#feature-flag-generic-snapshot-data) - [feature-flag `loosen-follower-log-revert`](#feature-flag-loosen-follower-log-revert) - [feature-flag `serde`](#feature-flag-serde) - [feature-flag `single-term-leader`](#feature-flag-single-term-leader) diff --git a/openraft/src/docs/feature_flags/feature-flags.md b/openraft/src/docs/feature_flags/feature-flags.md index d87ef9d47..1f83fc29f 100644 --- a/openraft/src/docs/feature_flags/feature-flags.md +++ b/openraft/src/docs/feature_flags/feature-flags.md @@ -17,33 +17,6 @@ This feature works ONLY with nightly rust, because it requires unstable feature Enables compatibility supporting types. -## feature-flag `generic-snapshot-data` - -Enable this feature flag -to eliminate the `AsyncRead + AsyncWrite + AsyncSeek + Unpin` bound -from [`RaftTypeConfig::SnapshotData`](crate::RaftTypeConfig::SnapshotData) -Enabling this feature allows applications to use a custom snapshot data format and transport fragmentation, -diverging from the default implementation which typically relies on a single-file structure. - -By default, it is off. -This feature is introduced in 0.9.0 - -On the sending end (leader that sends snapshot to follower): - -- Without `generic-snapshot-data`: [`RaftNetwork::full_snapshot()`] - provides a default implementation that invokes the chunk-based API - [`RaftNetwork::install_snapshot()`] for transmit. - -- With `generic-snapshot-data` enabled: [`RaftNetwork::full_snapshot()`] - must be implemented to provide application customized snapshot transmission. - Application does not need to implement [`RaftNetwork::install_snapshot()`]. - -On the receiving end(follower): - -- `Raft::install_snapshot()` is available only when `generic-snapshot-data` is disabled. - -Refer to example `examples/raft-kv-memstore-generic-snapshot-data` with `generic-snapshot-data` enabled. - ## feature-flag `loosen-follower-log-revert` Permit the follower's log to roll back to an earlier state without causing the leader to panic. diff --git a/openraft/src/docs/getting_started/getting-started.md b/openraft/src/docs/getting_started/getting-started.md index 7574a0ffb..a1d7348ac 100644 --- a/openraft/src/docs/getting_started/getting-started.md +++ b/openraft/src/docs/getting_started/getting-started.md @@ -166,16 +166,22 @@ The caller always assumes a completed writing is persistent. The raft correctness highly depends on a reliable store. -## 4. Implement [`RaftNetwork`] +## 4. Implement [`RaftNetwork`] or [`RaftNetworkV2`]. + +Raft nodes communicate with each other to achieve consensus about the logs. +The trait [`RaftNetwork`] and [`RaftNetworkV2`] defines the data transmission protocol. + +Your application can use either [`RaftNetwork`] or [`RaftNetworkV2`]. +The only difference between them is: +- [`RaftNetwork`] sends snapshot in chunks with [`RaftNetwork::install_snapshot()`][`install_snapshot()`], +- while [`RaftNetworkV2`] sends snapshot in one piece with [`RaftNetworkV2::full_snapshot()`][`full_snapshot()`]. -Raft nodes need to communicate with each other to achieve consensus about the logs. -The trait [`RaftNetwork`] defines the data transmission requirements. ```ignore pub trait RaftNetwork: Send + Sync + 'static { async fn vote(&mut self, rpc: VoteRequest) -> Result<...>; async fn append_entries(&mut self, rpc: AppendEntriesRequest) -> Result<...>; - async fn snapshot(&mut self, vote: Vote, snapshot: Snapshot) -> Result<...>; + async fn install_snapshot(&mut self, vote: Vote, snapshot: Snapshot) -> Result<...>; } ``` @@ -186,11 +192,12 @@ and receiving messages between Raft nodes. Here is the list of methods that need to be implemented for the [`RaftNetwork`] trait: -| [`RaftNetwork`] method | forward request | to target | -|------------------------|--------------------------|------------------------------------------------| -| [`append_entries()`] | [`AppendEntriesRequest`] | remote node [`Raft::append_entries()`] | -| [`full_snapshot()`] | [`Snapshot`] | remote node [`Raft::install_full_snapshot()`] | -| [`vote()`] | [`VoteRequest`] | remote node [`Raft::vote()`] | +| [`RaftNetwork`] method | forward request | to target | +|------------------------|----------------------------|-----------------------------------------------| +| [`append_entries()`] | [`AppendEntriesRequest`] | remote node [`Raft::append_entries()`] | +| [`vote()`] | [`VoteRequest`] | remote node [`Raft::vote()`] | +| [`install_snapshot()`] | [`InstallSnapshotRequest`] | remote node [`Raft::install_snapshot()`] | +| [`full_snapshot()`] | [`Snapshot`] | remote node [`Raft::install_full_snapshot()`] | [Mem KV Network](https://github.com/datafuselabs/openraft/blob/main/examples/raft-kv-memstore/src/network/raft_network_impl.rs) demonstrates how to forward messages to other Raft nodes using [`reqwest`](https://docs.rs/reqwest/latest/reqwest/) as network transport layer. @@ -202,9 +209,9 @@ When the server receives a Raft RPC, it simply passes it to its `raft` instance For a real-world implementation, you may want to use [Tonic gRPC](https://github.com/hyperium/tonic) to handle gRPC-based communication between Raft nodes. The [databend-meta](https://github.com/datafuselabs/databend/blob/6603392a958ba8593b1f4b01410bebedd484c6a9/metasrv/src/network.rs#L89) project provides an excellent real-world example of a Tonic gRPC-based Raft network implementation. -### Implement [`RaftNetworkFactory`] +### Implement [`RaftNetworkFactory`]. -[`RaftNetworkFactory`] is a singleton responsible for creating [`RaftNetwork`] instances for each replication target node. +[`RaftNetworkFactory`] is a singleton responsible for creating [`RaftNetworkV2`] instances for each replication target node. ```ignore pub trait RaftNetworkFactory: Send + Sync + 'static { @@ -353,7 +360,8 @@ Additionally, two test scripts for setting up a cluster are available: [`Raft`]: `crate::Raft` [`Raft::append_entries()`]: `crate::Raft::append_entries` [`Raft::vote()`]: `crate::Raft::vote` -[`Raft::install_full_snapshot()`]: `crate::Raft::install_full_snapshot` +[`Raft::install_full_snapshot()`]: `crate::Raft::install_full_snapshot` +[`Raft::install_snapshot()`]: `crate::Raft::install_snapshot` [`AppendEntriesRequest`]: `crate::raft::AppendEntriesRequest` [`VoteRequest`]: `crate::raft::VoteRequest` @@ -396,11 +404,13 @@ Additionally, two test scripts for setting up a cluster are available: [`get_snapshot_builder()`]: `crate::storage::RaftStateMachine::get_snapshot_builder` [`RaftNetworkFactory`]: `crate::network::RaftNetworkFactory` -[`RaftNetworkFactory::new_client()`]: `crate::network::RaftNetworkFactory::new_client` [`RaftNetwork`]: `crate::network::RaftNetwork` +[`RaftNetworkFactory::new_client()`]: `crate::network::RaftNetworkFactory::new_client` [`append_entries()`]: `crate::RaftNetwork::append_entries` [`vote()`]: `crate::RaftNetwork::vote` -[`full_snapshot()`]: `crate::RaftNetwork::full_snapshot` +[`install_snapshot()`]: `crate::RaftNetwork::install_snapshot` +[`full_snapshot()`]: `crate::network::v2::RaftNetworkV2::full_snapshot` +[`RaftNetworkV2`]: `crate::network::v2::RaftNetworkV2` [`RaftSnapshotBuilder`]: `crate::storage::RaftSnapshotBuilder` diff --git a/openraft/src/docs/upgrade_guide/upgrade-v08-v09.md b/openraft/src/docs/upgrade_guide/upgrade-v08-v09.md index 326d28dd2..fe17e1594 100644 --- a/openraft/src/docs/upgrade_guide/upgrade-v08-v09.md +++ b/openraft/src/docs/upgrade_guide/upgrade-v08-v09.md @@ -193,7 +193,7 @@ To use arbitrary snapshot data, the application needs to: [`Raft::install_full_snapshot()`]: `crate::Raft::install_full_snapshot` [`RaftNetwork`]: `crate::network::RaftNetwork` -[`RaftNetwork::full_snapshot()`]: `crate::network::RaftNetwork::full_snapshot` +[`RaftNetwork::full_snapshot()`]: `crate::network::v2::RaftNetworkV2::full_snapshot` [`RaftLogStorage::save_committed()`]: `crate::storage::RaftLogStorage::save_committed` diff --git a/openraft/src/network/mod.rs b/openraft/src/network/mod.rs index b4cb9915b..ad2bdd029 100644 --- a/openraft/src/network/mod.rs +++ b/openraft/src/network/mod.rs @@ -1,15 +1,16 @@ //! The Raft network interface. mod backoff; -mod factory; -#[allow(clippy::module_inception)] mod network; mod rpc_option; mod rpc_type; +pub mod v1; +pub mod v2; + pub mod snapshot_transport; pub use backoff::Backoff; -pub use factory::RaftNetworkFactory; -pub use network::RaftNetwork; pub use rpc_option::RPCOption; pub use rpc_type::RPCTypes; +pub use v1::RaftNetwork; +pub use v1::RaftNetworkFactory; diff --git a/openraft/src/network/snapshot_transport.rs b/openraft/src/network/snapshot_transport.rs index 19e262637..3c3c93f0c 100644 --- a/openraft/src/network/snapshot_transport.rs +++ b/openraft/src/network/snapshot_transport.rs @@ -349,21 +349,16 @@ where } } -#[cfg(feature = "generic-snapshot-data")] #[cfg(test)] mod tests { - use std::future::Future; use std::io::Cursor; use std::time::Duration; use crate::engine::testing::UTConfig; - use crate::error::Fatal; use crate::error::InstallSnapshotError; use crate::error::RPCError; use crate::error::RaftError; - use crate::error::ReplicationClosed; use crate::error::SnapshotMismatch; - use crate::error::StreamingError; use crate::network::snapshot_transport::Chunked; use crate::network::snapshot_transport::SnapshotTransport; use crate::network::RPCOption; @@ -371,10 +366,8 @@ mod tests { use crate::raft::AppendEntriesResponse; use crate::raft::InstallSnapshotRequest; use crate::raft::InstallSnapshotResponse; - use crate::raft::SnapshotResponse; use crate::raft::VoteRequest; use crate::raft::VoteResponse; - use crate::OptionalSend; use crate::RaftNetwork; use crate::RaftTypeConfig; use crate::Snapshot; @@ -406,16 +399,6 @@ mod tests { unimplemented!() } - async fn full_snapshot( - &mut self, - _vote: Vote, - _snapshot: Snapshot, - _cancel: impl Future + OptionalSend, - _option: RPCOption, - ) -> Result, StreamingError>> { - unimplemented!() - } - async fn install_snapshot( &mut self, rpc: InstallSnapshotRequest, @@ -440,7 +423,7 @@ mod tests { }, }; let err = RaftError::APIError(InstallSnapshotError::SnapshotMismatch(mismatch)); - return Err(RPCError::RemoteError(crate::error::RemoteError::new(0, err))); + Err(RPCError::RemoteError(crate::error::RemoteError::new(0, err))) } else { Ok(InstallSnapshotResponse { vote: rpc.vote }) } diff --git a/openraft/src/network/factory.rs b/openraft/src/network/v1/factory.rs similarity index 95% rename from openraft/src/network/factory.rs rename to openraft/src/network/v1/factory.rs index dbcdfe95e..11913e073 100644 --- a/openraft/src/network/factory.rs +++ b/openraft/src/network/v1/factory.rs @@ -1,6 +1,6 @@ use openraft_macros::add_async_trait; -use crate::network::RaftNetwork; +use crate::network::v2::RaftNetworkV2; use crate::OptionalSend; use crate::OptionalSync; use crate::RaftTypeConfig; @@ -18,7 +18,7 @@ pub trait RaftNetworkFactory: OptionalSend + OptionalSync + 'static where C: RaftTypeConfig { /// Actual type of the network handling a single connection. - type Network: RaftNetwork; + type Network: RaftNetworkV2; /// Create a new network instance sending RPCs to the target node. /// diff --git a/openraft/src/network/v1/mod.rs b/openraft/src/network/v1/mod.rs new file mode 100644 index 000000000..5f8ef9410 --- /dev/null +++ b/openraft/src/network/v1/mod.rs @@ -0,0 +1,5 @@ +mod factory; +mod network; + +pub use factory::RaftNetworkFactory; +pub use network::RaftNetwork; diff --git a/openraft/src/network/v1/network.rs b/openraft/src/network/v1/network.rs new file mode 100644 index 000000000..58f31c739 --- /dev/null +++ b/openraft/src/network/v1/network.rs @@ -0,0 +1,63 @@ +use std::time::Duration; + +use openraft_macros::add_async_trait; + +use crate::error::RPCError; +use crate::error::RaftError; +use crate::network::rpc_option::RPCOption; +use crate::network::Backoff; +use crate::raft::AppendEntriesRequest; +use crate::raft::AppendEntriesResponse; +use crate::raft::VoteRequest; +use crate::raft::VoteResponse; +use crate::OptionalSend; +use crate::OptionalSync; +use crate::RaftTypeConfig; + +/// A trait defining the interface for a Raft network between cluster members. +/// +/// See the [network chapter of the guide](crate::docs::getting_started#4-implement-raftnetwork) +/// for details and discussion on this trait and how to implement it. +/// +/// A single network instance is used to connect to a single target node. The network instance is +/// constructed by the [`RaftNetworkFactory`](`crate::network::RaftNetworkFactory`). +#[add_async_trait] +pub trait RaftNetwork: OptionalSend + OptionalSync + 'static +where C: RaftTypeConfig +{ + /// Send an AppendEntries RPC to the target. + async fn append_entries( + &mut self, + rpc: AppendEntriesRequest, + option: RPCOption, + ) -> Result, RPCError>>; + + /// Send an InstallSnapshot RPC to the target. + async fn install_snapshot( + &mut self, + _rpc: crate::raft::InstallSnapshotRequest, + _option: RPCOption, + ) -> Result, RPCError>>; + + /// Send a RequestVote RPC to the target. + async fn vote( + &mut self, + rpc: VoteRequest, + option: RPCOption, + ) -> Result, RPCError>>; + + /// Build a backoff instance if the target node is temporarily(or permanently) unreachable. + /// + /// When a [`Unreachable`](`crate::error::Unreachable`) error is returned from the `Network` + /// methods, Openraft does not retry connecting to a node immediately. Instead, it sleeps + /// for a while and retries. The duration of the sleep is determined by the backoff + /// instance. + /// + /// The backoff is an infinite iterator that returns the ith sleep interval before the ith + /// retry. The returned instance will be dropped if a successful RPC is made. + /// + /// By default it returns a constant backoff of 500 ms. + fn backoff(&self) -> Backoff { + Backoff::new(std::iter::repeat(Duration::from_millis(500))) + } +} diff --git a/openraft/src/network/v2/adapt_v1.rs b/openraft/src/network/v2/adapt_v1.rs new file mode 100644 index 000000000..746ee62d2 --- /dev/null +++ b/openraft/src/network/v2/adapt_v1.rs @@ -0,0 +1,61 @@ +use std::future::Future; + +use crate::error::Fatal; +use crate::error::RPCError; +use crate::error::RaftError; +use crate::error::ReplicationClosed; +use crate::error::StreamingError; +use crate::network::v2::RaftNetworkV2; +use crate::network::Backoff; +use crate::network::RPCOption; +use crate::raft::AppendEntriesRequest; +use crate::raft::AppendEntriesResponse; +use crate::raft::SnapshotResponse; +use crate::raft::VoteRequest; +use crate::raft::VoteResponse; +use crate::OptionalSend; +use crate::RaftNetwork; +use crate::RaftTypeConfig; +use crate::Snapshot; +use crate::Vote; + +impl RaftNetworkV2 for V1 +where + C: RaftTypeConfig, + V1: RaftNetwork, + C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io::AsyncSeek + Unpin, +{ + async fn append_entries( + &mut self, + rpc: AppendEntriesRequest, + option: RPCOption, + ) -> Result, RPCError>> { + RaftNetwork::::append_entries(self, rpc, option).await + } + + async fn vote( + &mut self, + rpc: VoteRequest, + option: RPCOption, + ) -> Result, RPCError>> { + RaftNetwork::::vote(self, rpc, option).await + } + + async fn full_snapshot( + &mut self, + vote: Vote, + snapshot: Snapshot, + cancel: impl Future + OptionalSend, + option: RPCOption, + ) -> Result, StreamingError>> { + use crate::network::snapshot_transport::Chunked; + use crate::network::snapshot_transport::SnapshotTransport; + + let resp = Chunked::send_snapshot(self, vote, snapshot, cancel, option).await?; + Ok(resp) + } + + fn backoff(&self) -> Backoff { + RaftNetwork::::backoff(self) + } +} diff --git a/openraft/src/network/v2/mod.rs b/openraft/src/network/v2/mod.rs new file mode 100644 index 000000000..5680df60b --- /dev/null +++ b/openraft/src/network/v2/mod.rs @@ -0,0 +1,4 @@ +mod adapt_v1; +mod network; + +pub use network::RaftNetworkV2; diff --git a/openraft/src/network/network.rs b/openraft/src/network/v2/network.rs similarity index 56% rename from openraft/src/network/network.rs rename to openraft/src/network/v2/network.rs index 50ee678f4..5c17af78f 100644 --- a/openraft/src/network/network.rs +++ b/openraft/src/network/v2/network.rs @@ -8,8 +8,8 @@ use crate::error::RPCError; use crate::error::RaftError; use crate::error::ReplicationClosed; use crate::error::StreamingError; -use crate::network::rpc_option::RPCOption; use crate::network::Backoff; +use crate::network::RPCOption; use crate::raft::AppendEntriesRequest; use crate::raft::AppendEntriesResponse; use crate::raft::SnapshotResponse; @@ -28,8 +28,16 @@ use crate::Vote; /// /// A single network instance is used to connect to a single target node. The network instance is /// constructed by the [`RaftNetworkFactory`](`crate::network::RaftNetworkFactory`). +/// +/// V2 network API removes `install_snapshot()` method that sends snapshot in chunks +/// and introduces `full_snapshot()` method that let application fully customize snapshot transmit. +/// +/// Compatibility: [`RaftNetworkV2`] is automatically implemented for [`RaftNetwork`] +/// implementations. +/// +/// [`RaftNetwork`]: crate::network::v1::RaftNetwork #[add_async_trait] -pub trait RaftNetwork: OptionalSend + OptionalSync + 'static +pub trait RaftNetworkV2: OptionalSend + OptionalSync + 'static where C: RaftTypeConfig { /// Send an AppendEntries RPC to the target. @@ -39,29 +47,6 @@ where C: RaftTypeConfig option: RPCOption, ) -> Result, RPCError>>; - /// Send an InstallSnapshot RPC to the target. - #[cfg(feature = "generic-snapshot-data")] - #[deprecated( - since = "0.9.0", - note = "with `generic-snapshot-data` enabled, use `full_snapshot()` instead to send full snapshot" - )] - async fn install_snapshot( - &mut self, - _rpc: crate::raft::InstallSnapshotRequest, - _option: RPCOption, - ) -> Result, RPCError>> - { - unimplemented!() - } - - /// Send an InstallSnapshot RPC to the target. - #[cfg(not(feature = "generic-snapshot-data"))] - async fn install_snapshot( - &mut self, - _rpc: crate::raft::InstallSnapshotRequest, - _option: RPCOption, - ) -> Result, RPCError>>; - /// Send a RequestVote RPC to the target. async fn vote( &mut self, @@ -84,7 +69,6 @@ where C: RaftTypeConfig /// with this vote. /// /// `cancel` get `Ready` when the caller decides to cancel this snapshot transmission. - #[cfg(feature = "generic-snapshot-data")] async fn full_snapshot( &mut self, vote: Vote, @@ -93,38 +77,6 @@ where C: RaftTypeConfig option: RPCOption, ) -> Result, StreamingError>>; - /// Send a complete Snapshot to the target. - /// - /// This method is responsible to fragment the snapshot and send it to the target node. - /// Before returning from this method, the snapshot should be completely transmitted and - /// installed on the target node, or rejected because of `vote` being smaller than the - /// remote one. - /// - /// The default implementation just calls several `install_snapshot` RPC for each fragment. - /// - /// The `vote` is the leader vote which is used to check if the leader is still valid by a - /// follower. - /// When the follower finished receiving snapshot, it calls `Raft::install_full_snapshot()` - /// with this vote. - /// - /// `cancel` get `Ready` when the caller decides to cancel this snapshot transmission. - // If generic-snapshot-data disabled, - // provide a default implementation that relies on AsyncRead + AsyncSeek + Unpin - #[cfg(not(feature = "generic-snapshot-data"))] - async fn full_snapshot( - &mut self, - vote: Vote, - snapshot: Snapshot, - cancel: impl Future + OptionalSend, - option: RPCOption, - ) -> Result, StreamingError>> { - use crate::network::snapshot_transport::Chunked; - use crate::network::snapshot_transport::SnapshotTransport; - - let resp = Chunked::send_snapshot(self, vote, snapshot, cancel, option).await?; - Ok(resp) - } - /// Build a backoff instance if the target node is temporarily(or permanently) unreachable. /// /// When a [`Unreachable`](`crate::error::Unreachable`) error is returned from the `Network` diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index 8f0d704d9..b91aba44a 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -62,7 +62,6 @@ use crate::metrics::RaftMetrics; use crate::metrics::RaftServerMetrics; use crate::metrics::Wait; use crate::metrics::WaitError; -use crate::network::RaftNetworkFactory; use crate::raft::raft_inner::RaftInner; use crate::raft::responder::Responder; pub use crate::raft::runtime_config_handle::RuntimeConfigHandle; @@ -78,6 +77,7 @@ use crate::AsyncRuntime; use crate::LogId; use crate::LogIdOptionExt; use crate::OptionalSend; +use crate::RaftNetworkFactory; use crate::RaftState; pub use crate::RaftTypeConfig; use crate::Snapshot; @@ -210,13 +210,14 @@ where C: RaftTypeConfig /// Raft's runtime config. See the docs on the `Config` object for more details. /// /// ### `network` - /// An implementation of the `RaftNetworkFactory` trait which will be used by Raft for sending - /// RPCs to peer nodes within the cluster. See the docs on the `RaftNetworkFactory` trait - /// for more details. + /// An implementation of the [`RaftNetworkFactory`] trait which will be used by Raft for + /// sending RPCs to peer nodes within the cluster. /// /// ### `storage` /// An implementation of the [`RaftLogStorage`] and [`RaftStateMachine`] trait which will be /// used by Raft for data storage. + /// + /// [`RaftNetworkFactory`]: crate::network::RaftNetworkFactory #[tracing::instrument(level="debug", skip_all, fields(cluster=%config.cluster_name))] pub async fn new( id: C::NodeId, @@ -423,13 +424,6 @@ where C: RaftTypeConfig /// /// If receiving is finished `done == true`, it installs the snapshot to the state machine. /// Nothing will be done if the input snapshot is older than the state machine. - #[cfg_attr( - feature = "generic-snapshot-data", - deprecated( - since = "0.9.0", - note = "with `generic-snapshot-shot` enabled, use `Raft::install_full_snapshot()` instead" - ) - )] #[tracing::instrument(level = "debug", skip_all)] pub async fn install_snapshot( &self, diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index 6731fa4fa..0e2544ebf 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -37,11 +37,10 @@ use crate::error::ReplicationError; use crate::error::Timeout; use crate::log_id::LogIdOptionExt; use crate::log_id_range::LogIdRange; +use crate::network::v2::RaftNetworkV2; use crate::network::Backoff; use crate::network::RPCOption; use crate::network::RPCTypes; -use crate::network::RaftNetwork; -use crate::network::RaftNetworkFactory; use crate::raft::AppendEntriesRequest; use crate::raft::AppendEntriesResponse; use crate::replication::callbacks::SnapshotCallback; @@ -58,6 +57,7 @@ use crate::AsyncRuntime; use crate::Instant; use crate::LogId; use crate::RaftLogId; +use crate::RaftNetworkFactory; use crate::RaftTypeConfig; use crate::StorageError; use crate::StorageIOError; diff --git a/openraft/src/type_config.rs b/openraft/src/type_config.rs index f6676019a..379be536c 100644 --- a/openraft/src/type_config.rs +++ b/openraft/src/type_config.rs @@ -60,14 +60,6 @@ pub trait RaftTypeConfig: /// See the [storage chapter of the guide][sto] for details on log compaction / snapshotting. /// /// [sto]: crate::docs::getting_started#3-implement-raftlogstorage-and-raftstatemachine - #[cfg(not(feature = "generic-snapshot-data"))] - type SnapshotData: tokio::io::AsyncRead - + tokio::io::AsyncWrite - + tokio::io::AsyncSeek - + OptionalSend - + Unpin - + 'static; - #[cfg(feature = "generic-snapshot-data")] type SnapshotData: OptionalSend + 'static; /// Asynchronous runtime type.