From fb49efb37e209ab0cd9ea7c4bc243f18f4e9a658 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Thu, 20 Jun 2024 20:00:18 +0800 Subject: [PATCH] Feature: Add `DecomposeResult` to simplify error handling This commit treats remote errors occurring during RPC, like a `Fatal` error, as an `Unreachable` error. This is due to Openraft's current inability to distinguish between an unreachable node and a broken node. - **Helper trait `DecomposeResult`:** Introduced to simplify handling composite errors. It converts a result of the form `Result` into a nested result `Result, ErrorB>`. --- openraft/src/error.rs | 6 +- openraft/src/error/decompose.rs | 96 +++++++++++++++++++++++++++ openraft/src/error/into_ok.rs | 23 +++++++ openraft/src/error/streaming_error.rs | 36 ++++++---- openraft/src/replication/callbacks.rs | 5 +- openraft/src/replication/mod.rs | 8 ++- openraft/src/replication/request.rs | 3 +- 7 files changed, 153 insertions(+), 24 deletions(-) create mode 100644 openraft/src/error/decompose.rs create mode 100644 openraft/src/error/into_ok.rs diff --git a/openraft/src/error.rs b/openraft/src/error.rs index 628cfe72e..4c56d5ee0 100644 --- a/openraft/src/error.rs +++ b/openraft/src/error.rs @@ -1,5 +1,7 @@ //! Error types exposed by this crate. +pub mod decompose; +pub mod into_ok; mod replication_closed; mod streaming_error; @@ -256,7 +258,7 @@ where StorageError(#[from] StorageError), #[error(transparent)] - RPCError(#[from] RPCError>), + RPCError(#[from] RPCError), } /// Error occurs when invoking a remote raft API. @@ -270,7 +272,7 @@ where serde(bound(serialize = "E: serde::Serialize")), serde(bound(deserialize = "E: for <'d> serde::Deserialize<'d>")) )] -pub enum RPCError { +pub enum RPCError { #[error(transparent)] Timeout(#[from] Timeout), diff --git a/openraft/src/error/decompose.rs b/openraft/src/error/decompose.rs new file mode 100644 index 000000000..52a4022ae --- /dev/null +++ b/openraft/src/error/decompose.rs @@ -0,0 +1,96 @@ +use std::error::Error; + +use crate::error::into_ok::into_ok; +use crate::error::Fatal; +use crate::error::Infallible; +use crate::error::RPCError; +use crate::error::RaftError; +use crate::error::StreamingError; +use crate::error::Unreachable; +use crate::RaftTypeConfig; + +/// Simplifies error handling by extracting the inner error from a composite error. +/// For example, converting `Result` +/// to `Result, OuterError>`, +/// where `SomeCompositeError` is a composite of `Self::InnerError` and `OuterError`. +pub trait DecomposeResult +where C: RaftTypeConfig +{ + type InnerError; + + fn decompose(self) -> Result, OuterError>; + + /// Convert `Result` + /// to `Result`, + /// if `Self::InnerError` is a infallible type. + fn decompose_infallible(self) -> Result + where + Self::InnerError: Into, + Self: Sized, + { + self.decompose().map(into_ok) + } +} + +impl DecomposeResult> for Result> +where C: RaftTypeConfig +{ + type InnerError = E; + + fn decompose(self) -> Result, RaftError> { + match self { + Ok(r) => Ok(Ok(r)), + Err(e) => match e { + RaftError::APIError(e) => Ok(Err(e)), + RaftError::Fatal(e) => Err(RaftError::Fatal(e)), + }, + } + } +} + +impl DecomposeResult> + for Result>> +where + C: RaftTypeConfig, + E: Error, +{ + type InnerError = E; + + /// `RaftError::Fatal` is considered as `RPCError::Unreachable`. + fn decompose(self) -> Result, RPCError> { + match self { + Ok(r) => Ok(Ok(r)), + Err(e) => match e { + RPCError::Timeout(e) => Err(RPCError::Timeout(e)), + RPCError::Unreachable(e) => Err(RPCError::Unreachable(e)), + RPCError::PayloadTooLarge(e) => Err(RPCError::PayloadTooLarge(e)), + RPCError::Network(e) => Err(RPCError::Network(e)), + RPCError::RemoteError(e) => match e.source { + RaftError::APIError(e) => Ok(Err(e)), + RaftError::Fatal(e) => Err(RPCError::Unreachable(Unreachable::new(&e))), + }, + }, + } + } +} + +impl DecomposeResult> for Result>> +where C: RaftTypeConfig +{ + type InnerError = Infallible; + + /// `Fatal` is considered as `RPCError::Unreachable`. + fn decompose(self) -> Result, StreamingError> { + match self { + Ok(r) => Ok(Ok(r)), + Err(e) => match e { + StreamingError::Closed(e) => Err(StreamingError::Closed(e)), + StreamingError::StorageError(e) => Err(StreamingError::StorageError(e)), + StreamingError::Timeout(e) => Err(StreamingError::Timeout(e)), + StreamingError::Unreachable(e) => Err(StreamingError::Unreachable(e)), + StreamingError::Network(e) => Err(StreamingError::Network(e)), + StreamingError::RemoteError(e) => Err(StreamingError::Unreachable(Unreachable::new(&e.source))), + }, + } + } +} diff --git a/openraft/src/error/into_ok.rs b/openraft/src/error/into_ok.rs new file mode 100644 index 000000000..a023ef541 --- /dev/null +++ b/openraft/src/error/into_ok.rs @@ -0,0 +1,23 @@ +use crate::error::Infallible; + +/// Trait to convert `Result` to `T`, if `E` is a `never` type. +pub(crate) trait UnwrapInfallible { + fn into_ok(self) -> T; +} + +impl UnwrapInfallible for Result +where E: Into +{ + fn into_ok(self) -> T { + match self { + Ok(t) => t, + Err(_) => unreachable!(), + } + } +} + +/// Convert `Result` to `T`, if `E` is a `never` type. +pub(crate) fn into_ok(result: Result) -> T +where E: Into { + UnwrapInfallible::into_ok(result) +} diff --git a/openraft/src/error/streaming_error.rs b/openraft/src/error/streaming_error.rs index bf2584a91..0a91b63e6 100644 --- a/openraft/src/error/streaming_error.rs +++ b/openraft/src/error/streaming_error.rs @@ -1,8 +1,9 @@ use std::error::Error; +use crate::error::Fatal; +use crate::error::Infallible; use crate::error::NetworkError; use crate::error::RPCError; -use crate::error::RaftError; use crate::error::RemoteError; use crate::error::ReplicationClosed; use crate::error::ReplicationError; @@ -21,7 +22,7 @@ use crate::StorageError; serde(bound(serialize = "E: serde::Serialize")), serde(bound(deserialize = "E: for <'d> serde::Deserialize<'d>")) )] -pub enum StreamingError { +pub enum StreamingError { /// The replication stream is closed intentionally. #[error(transparent)] Closed(#[from] ReplicationClosed), @@ -47,12 +48,8 @@ pub enum StreamingError { RemoteError(#[from] RemoteError), } -impl From> for ReplicationError -where - E: Error, - RaftError: From, -{ - fn from(e: StreamingError) -> Self { +impl From>> for ReplicationError { + fn from(e: StreamingError>) -> Self { match e { StreamingError::Closed(e) => ReplicationError::Closed(e), StreamingError::StorageError(e) => ReplicationError::StorageError(e), @@ -60,12 +57,23 @@ where StreamingError::Unreachable(e) => ReplicationError::RPCError(RPCError::Unreachable(e)), StreamingError::Network(e) => ReplicationError::RPCError(RPCError::Network(e)), StreamingError::RemoteError(e) => { - let remote_err = RemoteError { - target: e.target, - target_node: e.target_node, - source: RaftError::from(e.source), - }; - ReplicationError::RPCError(RPCError::RemoteError(remote_err)) + // Fatal on remote error is considered as unreachable. + ReplicationError::RPCError(RPCError::Unreachable(Unreachable::new(&e.source))) + } + } + } +} + +impl From> for ReplicationError { + fn from(e: StreamingError) -> Self { + match e { + StreamingError::Closed(e) => ReplicationError::Closed(e), + StreamingError::StorageError(e) => ReplicationError::StorageError(e), + StreamingError::Timeout(e) => ReplicationError::RPCError(RPCError::Timeout(e)), + StreamingError::Unreachable(e) => ReplicationError::RPCError(RPCError::Unreachable(e)), + StreamingError::Network(e) => ReplicationError::RPCError(RPCError::Network(e)), + StreamingError::RemoteError(_e) => { + unreachable!("Infallible error should not be converted to ReplicationError") } } } diff --git a/openraft/src/replication/callbacks.rs b/openraft/src/replication/callbacks.rs index 867c7b331..c2ee789b4 100644 --- a/openraft/src/replication/callbacks.rs +++ b/openraft/src/replication/callbacks.rs @@ -1,7 +1,6 @@ //! Callbacks for ReplicationCore internal communication. use core::fmt; -use crate::error::Fatal; use crate::error::StreamingError; use crate::raft::SnapshotResponse; use crate::type_config::alias::InstantOf; @@ -23,14 +22,14 @@ pub(crate) struct SnapshotCallback { pub(crate) snapshot_meta: SnapshotMeta, /// The result of the snapshot replication. - pub(crate) result: Result, StreamingError>>, + pub(crate) result: Result, StreamingError>, } impl SnapshotCallback { pub(in crate::replication) fn new( start_time: InstantOf, snapshot_meta: SnapshotMeta, - result: Result, StreamingError>>, + result: Result, StreamingError>, ) -> Self { Self { start_time, diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index db5007387..4d68508a5 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -28,10 +28,10 @@ use crate::config::Config; use crate::core::notify::Notify; use crate::core::sm::handle::SnapshotReader; use crate::display_ext::DisplayOptionExt; +use crate::error::decompose::DecomposeResult; use crate::error::HigherVote; use crate::error::PayloadTooLarge; use crate::error::RPCError; -use crate::error::RaftError; use crate::error::ReplicationClosed; use crate::error::ReplicationError; use crate::error::Timeout; @@ -449,7 +449,7 @@ where RPCError::Timeout(to) })?; // return Timeout error - let append_resp = append_res?; + let append_resp = DecomposeResult::::decompose_infallible(append_res)?; tracing::debug!( req = display(&sending_range), @@ -496,7 +496,7 @@ where /// Send the error result to RaftCore. /// RaftCore will then submit another replication command. - fn send_progress_error(&mut self, request_id: RequestId, err: RPCError>) { + fn send_progress_error(&mut self, request_id: RequestId, err: RPCError) { let _ = self.tx_raft_core.send(Notify::Network { response: Response::Progress { target: self.target, @@ -779,6 +779,8 @@ where tracing::warn!(error = display(e), "failed to send snapshot"); } + let res = res.decompose_infallible(); + if let Some(tx_noty) = weak_tx.upgrade() { let data = Data::new_snapshot_callback(request_id, start_time, meta, res); let send_res = tx_noty.send(Replicate::new_data(data)); diff --git a/openraft/src/replication/request.rs b/openraft/src/replication/request.rs index f2980629a..975fb5793 100644 --- a/openraft/src/replication/request.rs +++ b/openraft/src/replication/request.rs @@ -52,7 +52,6 @@ where C: RaftTypeConfig } use crate::display_ext::DisplayOptionExt; -use crate::error::Fatal; use crate::error::StreamingError; use crate::log_id_range::LogIdRange; use crate::raft::SnapshotResponse; @@ -151,7 +150,7 @@ where C: RaftTypeConfig request_id: RequestId, start_time: InstantOf, snapshot_meta: SnapshotMeta, - result: Result, StreamingError>>, + result: Result, StreamingError>, ) -> Self { Self::SnapshotCallback(DataWithId::new( request_id,