diff --git a/Cargo.toml b/Cargo.toml index 25cd75bae..933bdfbbf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ version = "0.9.14" edition = "2021" authors = [ "Databend Authors ", - "Anthony Dodd " + "Anthony Dodd ", ] categories = ["algorithms", "asynchronous", "data-structures"] description = "Advanced Raft consensus" @@ -18,16 +18,16 @@ repository = "https://github.com/datafuselabs/openraft" anyerror = { version = "0.1.10" } anyhow = "1.0.63" async-entry = "0.3.1" -byte-unit = "4.0.12" +byte-unit = "5.1.4" bytes = "1.0" chrono = { version = "0.4" } clap = { version = "4.1.11", features = ["derive", "env"] } -derive_more = { version="0.99.9" } +derive_more = { version = "1.0", features = ["std", "from", "try_into", "display"] } futures = "0.3" lazy_static = "1.4.0" maplit = "1.0.2" pretty_assertions = "1.0.0" -proc-macro2 = { version = ">=1.0.0,<1.0.80", features = [] } +proc-macro2 = "1.0" quote = "1.0" rand = "0.8" semver = "1.0.14" @@ -36,7 +36,14 @@ serde_json = "1.0.57" syn = "2.0" tempfile = { version = "3.4.0" } thiserror = "1.0.49" -tokio = { version="1.8", default-features=false, features=["fs", "io-util", "macros", "rt", "rt-multi-thread", "sync", "time"] } +tokio = { version = "1.22", default-features = false, features = [ + "io-util", + "macros", + "rt", + "rt-multi-thread", + "sync", + "time", +] } tracing = { version = "0.1.40" } tracing-appender = "0.2.0" tracing-futures = "0.2.4" diff --git a/examples/raft-kv-memstore-generic-snapshot-data/Cargo.toml b/examples/raft-kv-memstore-generic-snapshot-data/Cargo.toml index d6702cb7e..5f0dc746b 100644 --- a/examples/raft-kv-memstore-generic-snapshot-data/Cargo.toml +++ b/examples/raft-kv-memstore-generic-snapshot-data/Cargo.toml @@ -19,8 +19,6 @@ repository = "https://github.com/datafuselabs/openraft" memstore = { path = "../memstore", features = [] } openraft = { path = "../../openraft", features = ["serde", "storage-v2", "generic-snapshot-data"] } -clap = { version = "4.1.11", features = ["derive", "env"] } -reqwest = { version = "0.11.9", features = ["json"] } serde = { version = "1.0.114", features = ["derive"] } serde_json = "1.0.57" tokio = { version = "1.0", default-features = false, features = ["sync"] } diff --git a/examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/test_cluster.rs b/examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/test_cluster.rs index 8affab779..73d1d0475 100644 --- a/examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/test_cluster.rs +++ b/examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/test_cluster.rs @@ -1,5 +1,6 @@ use std::backtrace::Backtrace; use std::collections::BTreeMap; +#[allow(deprecated)] use std::panic::PanicInfo; use std::time::Duration; @@ -12,6 +13,7 @@ use tokio::task; use tokio::task::LocalSet; use tracing_subscriber::EnvFilter; +#[allow(deprecated)] pub fn log_panic(panic: &PanicInfo) { let backtrace = format!("{:?}", Backtrace::force_capture()); diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml b/examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml index 8a97ff0fa..149a08acf 100644 --- a/examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml +++ b/examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" authors = [ "drdr xp ", "Pedro Paulo de Amorim ", - "Xuanwo " + "Xuanwo ", ] categories = ["algorithms", "asynchronous", "data-structures"] description = "An example distributed key-value store built upon `openraft`." @@ -20,12 +20,13 @@ repository = "https://github.com/datafuselabs/openraft" memstore = { path = "../memstore", features = [] } openraft = { path = "../../openraft", features = ["serde", "storage-v2", "generic-snapshot-data"] } +bytes = "1.0" serde = { version = "1.0.114", features = ["derive"] } serde_json = "1.0.57" tokio = { version = "1.0", default-features = false, features = ["sync"] } tracing = "0.1.29" tracing-subscriber = { version = "0.3.0", features = ["env-filter"] } -opendal = "0.45.0" +opendal = "0.48.0" [dev-dependencies] maplit = "1.0.2" diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs b/examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs index 848af5577..f489f5474 100644 --- a/examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs +++ b/examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs @@ -3,6 +3,8 @@ use std::sync::Arc; +use bytes::Buf; +use opendal::Buffer; use opendal::Operator; use openraft::Config; @@ -69,6 +71,10 @@ pub fn decode(s: &str) -> T { serde_json::from_str(s).unwrap() } +pub fn decode_buffer(b: Buffer) -> T { + serde_json::from_reader(b.reader()).unwrap() +} + pub async fn new_raft(node_id: NodeId, router: Router, op: Operator) -> (typ::Raft, App) { // Create a configuration for the raft instance. let config = Config { diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs b/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs index fee80b0f7..81e60283d 100644 --- a/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs +++ b/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs @@ -18,7 +18,7 @@ use openraft::StoredMembership; use serde::Deserialize; use serde::Serialize; -use crate::decode; +use crate::decode_buffer; use crate::encode; use crate::typ; use crate::NodeId; @@ -212,7 +212,7 @@ impl RaftStateMachine for Arc { // Update the state machine. { let bs = self.storage.read(&new_snapshot.data).await.unwrap(); - let updated_state_machine: StateMachineData = decode(&String::from_utf8_lossy(&bs)); + let updated_state_machine: StateMachineData = decode_buffer(bs); let mut state_machine = self.state_machine.lock().unwrap(); *state_machine = updated_state_machine; } diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/tests/cluster/test_cluster.rs b/examples/raft-kv-memstore-opendal-snapshot-data/tests/cluster/test_cluster.rs index 87682a428..89460bda7 100644 --- a/examples/raft-kv-memstore-opendal-snapshot-data/tests/cluster/test_cluster.rs +++ b/examples/raft-kv-memstore-opendal-snapshot-data/tests/cluster/test_cluster.rs @@ -1,6 +1,7 @@ use std::backtrace::Backtrace; use std::collections::BTreeMap; use std::collections::HashMap; +#[allow(deprecated)] use std::panic::PanicInfo; use std::time::Duration; @@ -13,6 +14,7 @@ use tokio::task; use tokio::task::LocalSet; use tracing_subscriber::EnvFilter; +#[allow(deprecated)] pub fn log_panic(panic: &PanicInfo) { let backtrace = format!("{:?}", Backtrace::force_capture()); @@ -57,7 +59,7 @@ async fn test_cluster() { // This test only use memory service for simplicity. // Feel free to test against fs or s3. - let op = opendal::Operator::via_map(opendal::Scheme::Memory, HashMap::default()).unwrap(); + let op = opendal::Operator::via_iter(opendal::Scheme::Memory, HashMap::::default()).unwrap(); let router = Router::default(); diff --git a/examples/raft-kv-memstore-singlethreaded/Cargo.toml b/examples/raft-kv-memstore-singlethreaded/Cargo.toml index f1ce56fa4..3b7d71fdb 100644 --- a/examples/raft-kv-memstore-singlethreaded/Cargo.toml +++ b/examples/raft-kv-memstore-singlethreaded/Cargo.toml @@ -16,10 +16,12 @@ license = "MIT OR Apache-2.0" repository = "https://github.com/datafuselabs/openraft" [dependencies] -openraft = { path = "../../openraft", features = ["serde", "storage-v2", "singlethreaded"] } +openraft = { path = "../../openraft", features = [ + "serde", + "singlethreaded", + "storage-v2", +] } -clap = { version = "4.1.11", features = ["derive", "env"] } -reqwest = { version = "0.11.9", features = ["json"] } serde = { version = "1.0.114", features = ["derive"] } serde_json = "1.0.57" tokio = { version = "1.0", default-features = false, features = ["sync"] } diff --git a/examples/raft-kv-memstore-singlethreaded/tests/cluster/test_cluster.rs b/examples/raft-kv-memstore-singlethreaded/tests/cluster/test_cluster.rs index ab3a3d829..c2613e6c5 100644 --- a/examples/raft-kv-memstore-singlethreaded/tests/cluster/test_cluster.rs +++ b/examples/raft-kv-memstore-singlethreaded/tests/cluster/test_cluster.rs @@ -1,6 +1,7 @@ use std::backtrace::Backtrace; use std::collections::BTreeMap; use std::collections::BTreeSet; +#[allow(deprecated)] use std::panic::PanicInfo; use std::time::Duration; @@ -21,6 +22,7 @@ use tokio::task; use tokio::task::LocalSet; use tracing_subscriber::EnvFilter; +#[allow(deprecated)] pub fn log_panic(panic: &PanicInfo) { let backtrace = format!("{:?}", Backtrace::force_capture()); diff --git a/examples/raft-kv-memstore/Cargo.toml b/examples/raft-kv-memstore/Cargo.toml index b9004d830..5d1820e3b 100644 --- a/examples/raft-kv-memstore/Cargo.toml +++ b/examples/raft-kv-memstore/Cargo.toml @@ -25,7 +25,7 @@ openraft = { path = "../../openraft", features = ["serde", "storage-v2"] } actix-web = "4.0.0-rc.2" clap = { version = "4.1.11", features = ["derive", "env"] } -reqwest = { version = "0.11.9", features = ["json"] } +reqwest = { version = "0.12.5", features = ["json"] } serde = { version = "1.0.114", features = ["derive"] } serde_json = "1.0.57" tokio = { version = "1.0", default-features = false, features = ["sync"] } diff --git a/examples/raft-kv-memstore/tests/cluster/test_cluster.rs b/examples/raft-kv-memstore/tests/cluster/test_cluster.rs index 1c8a8f14f..486fed51a 100644 --- a/examples/raft-kv-memstore/tests/cluster/test_cluster.rs +++ b/examples/raft-kv-memstore/tests/cluster/test_cluster.rs @@ -1,5 +1,6 @@ use std::backtrace::Backtrace; use std::collections::BTreeMap; +#[allow(deprecated)] use std::panic::PanicInfo; use std::thread; use std::time::Duration; @@ -13,6 +14,7 @@ use raft_kv_memstore::store::Request; use tokio::runtime::Runtime; use tracing_subscriber::EnvFilter; +#[allow(deprecated)] pub fn log_panic(panic: &PanicInfo) { let backtrace = { format!("{:?}", Backtrace::force_capture()) diff --git a/examples/raft-kv-rocksdb/Cargo.toml b/examples/raft-kv-rocksdb/Cargo.toml index 29477d6ee..8f7355066 100644 --- a/examples/raft-kv-rocksdb/Cargo.toml +++ b/examples/raft-kv-rocksdb/Cargo.toml @@ -5,9 +5,9 @@ readme = "README.md" edition = "2021" authors = [ - "drdr xp ", - "Pedro Paulo de Amorim ", - "The Tremor Team", + "drdr xp ", + "Pedro Paulo de Amorim ", + "The Tremor Team", ] categories = ["algorithms", "asynchronous", "data-structures"] description = "An example distributed key-value store built upon `openraft`." @@ -26,14 +26,14 @@ openraft = { path = "../../openraft", features = ["serde", "storage-v2"] } tokio = { version = "1.35.1", features = ["full"] } byteorder = "1.4.3" clap = { version = "4.1.11", features = ["derive", "env"] } -reqwest = { version = "0.11.9", features = ["json"] } +reqwest = { version = "0.12.5", features = ["json"] } rocksdb = "0.22.0" serde = { version = "1.0.114", features = ["derive"] } serde_json = "1.0.57" tide = { version = "0.16" } # for toy-rpc, use `serde_json` instead of the default `serde_bincode`: # bincode which enabled by default by toy-rpc, does not support `#[serde(flatten)]`: https://docs.rs/bincode/2.0.0-alpha.1/bincode/serde/index.html#known-issues -toy-rpc = { version = "0.8.6", features = [ +toy-rpc = { version = "0.10.0", features = [ "ws_tokio", "server", "client", diff --git a/examples/raft-kv-rocksdb/tests/cluster/test_cluster.rs b/examples/raft-kv-rocksdb/tests/cluster/test_cluster.rs index aa79926b7..6f2f415ba 100644 --- a/examples/raft-kv-rocksdb/tests/cluster/test_cluster.rs +++ b/examples/raft-kv-rocksdb/tests/cluster/test_cluster.rs @@ -1,5 +1,6 @@ use std::backtrace::Backtrace; use std::collections::BTreeMap; +#[allow(deprecated)] use std::panic::PanicInfo; use std::thread; use std::time::Duration; @@ -13,6 +14,7 @@ use raft_kv_rocksdb::Node; use tokio::runtime::Handle; use tracing_subscriber::EnvFilter; +#[allow(deprecated)] pub fn log_panic(panic: &PanicInfo) { let backtrace = { format!("{:?}", Backtrace::force_capture()) }; diff --git a/openraft/Cargo.toml b/openraft/Cargo.toml index 7dffad6ac..96284290f 100644 --- a/openraft/Cargo.toml +++ b/openraft/Cargo.toml @@ -27,15 +27,12 @@ maplit = { workspace = true } rand = { workspace = true } serde = { workspace = true, optional = true } serde_json = { workspace = true, optional = true } -tempfile = { workspace = true, optional = true } thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } tracing-futures = { workspace = true } validit = { workspace = true } -or07 = { package = "openraft", version = "0.7.4", optional = true } - [dev-dependencies] anyhow = { workspace = true } async-entry = { workspace = true } diff --git a/openraft/src/config/config.rs b/openraft/src/config/config.rs index 813c5d9d7..747c68634 100644 --- a/openraft/src/config/config.rs +++ b/openraft/src/config/config.rs @@ -1,6 +1,7 @@ //! Raft runtime configuration. use std::ops::Deref; +use std::str::FromStr; use std::sync::atomic::AtomicBool; use std::time::Duration; @@ -54,7 +55,7 @@ fn parse_bytes_with_unit(src: &str) -> Result { reason: e.to_string(), })?; - Ok(res.get_bytes() as u64) + Ok(res.as_u64()) } fn parse_snapshot_policy(src: &str) -> Result { diff --git a/openraft/src/docs/docs.md b/openraft/src/docs/docs.md index e41b37a55..5c8511bfe 100644 --- a/openraft/src/docs/docs.md +++ b/openraft/src/docs/docs.md @@ -41,4 +41,4 @@ Contributors who want to understand the internals of Openraft can find relevant Finally, the archived and discarded documents: - [`obsolete`](crate::docs::obsolete) describes obsolete design documents and why they are discarded; - [`blank-log-heartbeeat`](`crate::docs::obsolete::heartbeat`); - - [`fast-commit`](`crate::docs::obsolete::fast_commit`); + - [`fast-commit`](`crate::docs::obsolete::fast_commit`); \ No newline at end of file diff --git a/openraft/src/docs/mod.rs b/openraft/src/docs/mod.rs index 987855876..33ff577fe 100644 --- a/openraft/src/docs/mod.rs +++ b/openraft/src/docs/mod.rs @@ -1,3 +1,4 @@ +#![allow(rustdoc::redundant_explicit_links)] #![doc = include_str!("docs.md")] #[rustfmt::skip] diff --git a/openraft/src/docs/protocol/snapshot_replication.md b/openraft/src/docs/protocol/snapshot_replication.md index 8bdfdf958..40b169b7e 100644 --- a/openraft/src/docs/protocol/snapshot_replication.md +++ b/openraft/src/docs/protocol/snapshot_replication.md @@ -112,8 +112,7 @@ Installing snapshot includes two steps: The final step is to purge logs up to [`snapshot_meta.last_log_id`]. This step is necessary because: -- 1) A local log that is <= [`snapshot_meta.last_log_id`] may conflict with the leader, and can not be used - anymore. +- 1) A local log that is <= [`snapshot_meta.last_log_id`] may conflict with the leader, and can not be used anymore. - 2) There may be a hole in the logs, if `snapshot_last_log_id > local_last_log_id`: diff --git a/openraft/src/error/streaming_error.rs b/openraft/src/error/streaming_error.rs index 0a91b63e6..992e080e1 100644 --- a/openraft/src/error/streaming_error.rs +++ b/openraft/src/error/streaming_error.rs @@ -66,6 +66,7 @@ impl From>> for Replicatio impl From> for ReplicationError { fn from(e: StreamingError) -> Self { + #[allow(unreachable_patterns)] match e { StreamingError::Closed(e) => ReplicationError::Closed(e), StreamingError::StorageError(e) => ReplicationError::StorageError(e), diff --git a/openraft/src/raft/message/append_entries.rs b/openraft/src/raft/message/append_entries.rs index 0387ea2ed..0e4c0bf40 100644 --- a/openraft/src/raft/message/append_entries.rs +++ b/openraft/src/raft/message/append_entries.rs @@ -26,9 +26,7 @@ pub struct AppendEntriesRequest { pub leader_commit: Option>, } -impl fmt::Debug for AppendEntriesRequest -where C::D: fmt::Debug -{ +impl fmt::Debug for AppendEntriesRequest { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("AppendEntriesRequest") .field("vote", &self.vote) diff --git a/openraft/src/raft/message/install_snapshot.rs b/openraft/src/raft/message/install_snapshot.rs index f48806db9..abd750e42 100644 --- a/openraft/src/raft/message/install_snapshot.rs +++ b/openraft/src/raft/message/install_snapshot.rs @@ -49,7 +49,7 @@ impl MessageSummary> for InstallSna #[derive(Debug)] #[derive(PartialEq, Eq)] #[derive(derive_more::Display)] -#[display(fmt = "{{vote:{}}}", vote)] +#[display("{{vote:{}}}", vote)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub struct InstallSnapshotResponse { pub vote: Vote, @@ -59,7 +59,7 @@ pub struct InstallSnapshotResponse { #[derive(Debug)] #[derive(PartialEq, Eq)] #[derive(derive_more::Display)] -#[display(fmt = "SnapshotResponse{{vote:{}}}", vote)] +#[display("SnapshotResponse{{vote:{}}}", vote)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub struct SnapshotResponse { pub vote: Vote, diff --git a/openraft/src/raft/raft_inner.rs b/openraft/src/raft/raft_inner.rs index abccb4b88..4aeebd490 100644 --- a/openraft/src/raft/raft_inner.rs +++ b/openraft/src/raft/raft_inner.rs @@ -156,11 +156,8 @@ where C: RaftTypeConfig message_summary.unwrap_or_default() ); - match core_res { - // A normal quit is still an unexpected "stop" to the caller. - Ok(_) => Fatal::Stopped, - Err(e) => e, - } + // Safe unwrap: Infallible is unreachable + core_res.unwrap_err() } /// Wait for `RaftCore` task to finish and record the returned value from the task. diff --git a/rust-toolchain b/rust-toolchain index a69346241..bf9d90f10 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -nightly-2024-03-20 +nightly-2024-08-14 diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs index 0880cfc90..f7fe6e151 100644 --- a/tests/tests/fixtures/mod.rs +++ b/tests/tests/fixtures/mod.rs @@ -9,6 +9,7 @@ use std::collections::BTreeSet; use std::collections::HashMap; use std::env; use std::fmt; +#[allow(deprecated)] use std::panic::PanicInfo; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; @@ -114,6 +115,7 @@ pub fn set_panic_hook() { })); } +#[allow(deprecated)] pub fn log_panic(panic: &PanicInfo) { let backtrace = { #[cfg(feature = "bt")] @@ -203,14 +205,19 @@ impl RPCErrorType { /// Pre-hook result, which does not return remote Error. pub type PreHookResult = Result<(), RPCError>; +#[derive(Debug)] #[derive(derive_more::From, derive_more::TryInto)] -pub enum RPCRequest { +pub enum RPCRequest +where C::SnapshotData: fmt::Debug +{ AppendEntries(AppendEntriesRequest), InstallSnapshot(InstallSnapshotRequest), Vote(VoteRequest), } -impl RPCRequest { +impl RPCRequest +where C::SnapshotData: fmt::Debug +{ pub fn get_type(&self) -> RPCTypes { match self { RPCRequest::AppendEntries(_) => RPCTypes::AppendEntries, @@ -560,6 +567,7 @@ impl TypedRaftRouter { RPCError::Unreachable(e) => e.into(), RPCError::PayloadTooLarge(e) => e.into(), RPCError::Network(e) => e.into(), + #[allow(unreachable_patterns)] RPCError::RemoteError(e) => { unreachable!("unexpected RemoteError: {:?}", e); }