Skip to content

Commit

Permalink
Feature: Add DecomposeResult to simplify error handling
Browse files Browse the repository at this point in the history
This commit treats remote errors occurring during RPC, like a `Fatal`
error, as an `Unreachable` error. This is due to Openraft's current
inability to distinguish between an unreachable node and a broken node.

- **Helper trait `DecomposeResult`:** Introduced to simplify handling
  composite errors. It converts a result of the
  form `Result<R, ErrorAOrB>`
  into a nested result `Result<Result<R, ErrorA>, ErrorB>`.
  • Loading branch information
drmingdrmer committed Jun 21, 2024
1 parent afb1897 commit fb49efb
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 24 deletions.
6 changes: 4 additions & 2 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Error types exposed by this crate.
pub mod decompose;
pub mod into_ok;
mod replication_closed;
mod streaming_error;

Expand Down Expand Up @@ -256,7 +258,7 @@ where
StorageError(#[from] StorageError<NID>),

#[error(transparent)]
RPCError(#[from] RPCError<NID, N, RaftError<NID, Infallible>>),
RPCError(#[from] RPCError<NID, N>),
}

/// Error occurs when invoking a remote raft API.
Expand All @@ -270,7 +272,7 @@ where
serde(bound(serialize = "E: serde::Serialize")),
serde(bound(deserialize = "E: for <'d> serde::Deserialize<'d>"))
)]
pub enum RPCError<NID: NodeId, N: Node, E: Error> {
pub enum RPCError<NID: NodeId, N: Node, E: Error = Infallible> {
#[error(transparent)]
Timeout(#[from] Timeout<NID>),

Expand Down
96 changes: 96 additions & 0 deletions openraft/src/error/decompose.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use std::error::Error;

use crate::error::into_ok::into_ok;
use crate::error::Fatal;
use crate::error::Infallible;
use crate::error::RPCError;
use crate::error::RaftError;
use crate::error::StreamingError;
use crate::error::Unreachable;
use crate::RaftTypeConfig;

/// Simplifies error handling by extracting the inner error from a composite error.
/// For example, converting `Result<R, CompositeError>`
/// to `Result<Result<R, Self::InnerError>, OuterError>`,
/// where `SomeCompositeError` is a composite of `Self::InnerError` and `OuterError`.
pub trait DecomposeResult<C, R, OuterError>
where C: RaftTypeConfig
{
type InnerError;

fn decompose(self) -> Result<Result<R, Self::InnerError>, OuterError>;

/// Convert `Result<R, CompositeErr>`
/// to `Result<R, E>`,
/// if `Self::InnerError` is a infallible type.
fn decompose_infallible(self) -> Result<R, OuterError>
where
Self::InnerError: Into<Infallible>,
Self: Sized,
{
self.decompose().map(into_ok)
}
}

impl<C, R, E> DecomposeResult<C, R, RaftError<C::NodeId>> for Result<R, RaftError<C::NodeId, E>>
where C: RaftTypeConfig
{
type InnerError = E;

fn decompose(self) -> Result<Result<R, E>, RaftError<C::NodeId>> {
match self {
Ok(r) => Ok(Ok(r)),
Err(e) => match e {
RaftError::APIError(e) => Ok(Err(e)),
RaftError::Fatal(e) => Err(RaftError::Fatal(e)),
},
}
}
}

impl<C, R, E> DecomposeResult<C, R, RPCError<C::NodeId, C::Node>>
for Result<R, RPCError<C::NodeId, C::Node, RaftError<C::NodeId, E>>>
where
C: RaftTypeConfig,
E: Error,
{
type InnerError = E;

/// `RaftError::Fatal` is considered as `RPCError::Unreachable`.
fn decompose(self) -> Result<Result<R, E>, RPCError<C::NodeId, C::Node>> {
match self {
Ok(r) => Ok(Ok(r)),
Err(e) => match e {
RPCError::Timeout(e) => Err(RPCError::Timeout(e)),
RPCError::Unreachable(e) => Err(RPCError::Unreachable(e)),
RPCError::PayloadTooLarge(e) => Err(RPCError::PayloadTooLarge(e)),
RPCError::Network(e) => Err(RPCError::Network(e)),
RPCError::RemoteError(e) => match e.source {
RaftError::APIError(e) => Ok(Err(e)),
RaftError::Fatal(e) => Err(RPCError::Unreachable(Unreachable::new(&e))),
},
},
}
}
}

impl<C, R> DecomposeResult<C, R, StreamingError<C>> for Result<R, StreamingError<C, Fatal<C::NodeId>>>
where C: RaftTypeConfig
{
type InnerError = Infallible;

/// `Fatal` is considered as `RPCError::Unreachable`.
fn decompose(self) -> Result<Result<R, Self::InnerError>, StreamingError<C>> {
match self {
Ok(r) => Ok(Ok(r)),
Err(e) => match e {
StreamingError::Closed(e) => Err(StreamingError::Closed(e)),
StreamingError::StorageError(e) => Err(StreamingError::StorageError(e)),
StreamingError::Timeout(e) => Err(StreamingError::Timeout(e)),
StreamingError::Unreachable(e) => Err(StreamingError::Unreachable(e)),
StreamingError::Network(e) => Err(StreamingError::Network(e)),
StreamingError::RemoteError(e) => Err(StreamingError::Unreachable(Unreachable::new(&e.source))),
},
}
}
}
23 changes: 23 additions & 0 deletions openraft/src/error/into_ok.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use crate::error::Infallible;

/// Trait to convert `Result<T, E>` to `T`, if `E` is a `never` type.
pub(crate) trait UnwrapInfallible<T> {
fn into_ok(self) -> T;
}

impl<T, E> UnwrapInfallible<T> for Result<T, E>
where E: Into<Infallible>
{
fn into_ok(self) -> T {
match self {
Ok(t) => t,
Err(_) => unreachable!(),
}
}
}

/// Convert `Result<T, E>` to `T`, if `E` is a `never` type.
pub(crate) fn into_ok<T, E>(result: Result<T, E>) -> T
where E: Into<Infallible> {
UnwrapInfallible::into_ok(result)
}
36 changes: 22 additions & 14 deletions openraft/src/error/streaming_error.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::error::Error;

use crate::error::Fatal;
use crate::error::Infallible;
use crate::error::NetworkError;
use crate::error::RPCError;
use crate::error::RaftError;
use crate::error::RemoteError;
use crate::error::ReplicationClosed;
use crate::error::ReplicationError;
Expand All @@ -21,7 +22,7 @@ use crate::StorageError;
serde(bound(serialize = "E: serde::Serialize")),
serde(bound(deserialize = "E: for <'d> serde::Deserialize<'d>"))
)]
pub enum StreamingError<C: RaftTypeConfig, E: Error> {
pub enum StreamingError<C: RaftTypeConfig, E: Error = Infallible> {
/// The replication stream is closed intentionally.
#[error(transparent)]
Closed(#[from] ReplicationClosed),
Expand All @@ -47,25 +48,32 @@ pub enum StreamingError<C: RaftTypeConfig, E: Error> {
RemoteError(#[from] RemoteError<C::NodeId, C::Node, E>),
}

impl<C: RaftTypeConfig, E> From<StreamingError<C, E>> for ReplicationError<C::NodeId, C::Node>
where
E: Error,
RaftError<C::NodeId>: From<E>,
{
fn from(e: StreamingError<C, E>) -> Self {
impl<C: RaftTypeConfig> From<StreamingError<C, Fatal<C::NodeId>>> for ReplicationError<C::NodeId, C::Node> {
fn from(e: StreamingError<C, Fatal<C::NodeId>>) -> Self {
match e {
StreamingError::Closed(e) => ReplicationError::Closed(e),
StreamingError::StorageError(e) => ReplicationError::StorageError(e),
StreamingError::Timeout(e) => ReplicationError::RPCError(RPCError::Timeout(e)),
StreamingError::Unreachable(e) => ReplicationError::RPCError(RPCError::Unreachable(e)),
StreamingError::Network(e) => ReplicationError::RPCError(RPCError::Network(e)),
StreamingError::RemoteError(e) => {
let remote_err = RemoteError {
target: e.target,
target_node: e.target_node,
source: RaftError::from(e.source),
};
ReplicationError::RPCError(RPCError::RemoteError(remote_err))
// Fatal on remote error is considered as unreachable.
ReplicationError::RPCError(RPCError::Unreachable(Unreachable::new(&e.source)))
}
}
}
}

impl<C: RaftTypeConfig> From<StreamingError<C>> for ReplicationError<C::NodeId, C::Node> {
fn from(e: StreamingError<C>) -> Self {
match e {
StreamingError::Closed(e) => ReplicationError::Closed(e),
StreamingError::StorageError(e) => ReplicationError::StorageError(e),
StreamingError::Timeout(e) => ReplicationError::RPCError(RPCError::Timeout(e)),
StreamingError::Unreachable(e) => ReplicationError::RPCError(RPCError::Unreachable(e)),
StreamingError::Network(e) => ReplicationError::RPCError(RPCError::Network(e)),
StreamingError::RemoteError(_e) => {
unreachable!("Infallible error should not be converted to ReplicationError")
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions openraft/src/replication/callbacks.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Callbacks for ReplicationCore internal communication.
use core::fmt;

use crate::error::Fatal;
use crate::error::StreamingError;
use crate::raft::SnapshotResponse;
use crate::type_config::alias::InstantOf;
Expand All @@ -23,14 +22,14 @@ pub(crate) struct SnapshotCallback<C: RaftTypeConfig> {
pub(crate) snapshot_meta: SnapshotMeta<C::NodeId, C::Node>,

/// The result of the snapshot replication.
pub(crate) result: Result<SnapshotResponse<C::NodeId>, StreamingError<C, Fatal<C::NodeId>>>,
pub(crate) result: Result<SnapshotResponse<C::NodeId>, StreamingError<C>>,
}

impl<C: RaftTypeConfig> SnapshotCallback<C> {
pub(in crate::replication) fn new(
start_time: InstantOf<C>,
snapshot_meta: SnapshotMeta<C::NodeId, C::Node>,
result: Result<SnapshotResponse<C::NodeId>, StreamingError<C, Fatal<C::NodeId>>>,
result: Result<SnapshotResponse<C::NodeId>, StreamingError<C>>,
) -> Self {
Self {
start_time,
Expand Down
8 changes: 5 additions & 3 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ use crate::config::Config;
use crate::core::notify::Notify;
use crate::core::sm::handle::SnapshotReader;
use crate::display_ext::DisplayOptionExt;
use crate::error::decompose::DecomposeResult;
use crate::error::HigherVote;
use crate::error::PayloadTooLarge;
use crate::error::RPCError;
use crate::error::RaftError;
use crate::error::ReplicationClosed;
use crate::error::ReplicationError;
use crate::error::Timeout;
Expand Down Expand Up @@ -449,7 +449,7 @@ where
RPCError::Timeout(to)
})?; // return Timeout error

let append_resp = append_res?;
let append_resp = DecomposeResult::<C, _, _>::decompose_infallible(append_res)?;

tracing::debug!(
req = display(&sending_range),
Expand Down Expand Up @@ -496,7 +496,7 @@ where

/// Send the error result to RaftCore.
/// RaftCore will then submit another replication command.
fn send_progress_error(&mut self, request_id: RequestId, err: RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>) {
fn send_progress_error(&mut self, request_id: RequestId, err: RPCError<C::NodeId, C::Node>) {
let _ = self.tx_raft_core.send(Notify::Network {
response: Response::Progress {
target: self.target,
Expand Down Expand Up @@ -779,6 +779,8 @@ where
tracing::warn!(error = display(e), "failed to send snapshot");
}

let res = res.decompose_infallible();

if let Some(tx_noty) = weak_tx.upgrade() {
let data = Data::new_snapshot_callback(request_id, start_time, meta, res);
let send_res = tx_noty.send(Replicate::new_data(data));
Expand Down
3 changes: 1 addition & 2 deletions openraft/src/replication/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ where C: RaftTypeConfig
}

use crate::display_ext::DisplayOptionExt;
use crate::error::Fatal;
use crate::error::StreamingError;
use crate::log_id_range::LogIdRange;
use crate::raft::SnapshotResponse;
Expand Down Expand Up @@ -151,7 +150,7 @@ where C: RaftTypeConfig
request_id: RequestId,
start_time: InstantOf<C>,
snapshot_meta: SnapshotMeta<C::NodeId, C::Node>,
result: Result<SnapshotResponse<C::NodeId>, StreamingError<C, Fatal<C::NodeId>>>,
result: Result<SnapshotResponse<C::NodeId>, StreamingError<C>>,
) -> Self {
Self::SnapshotCallback(DataWithId::new(
request_id,
Expand Down

0 comments on commit fb49efb

Please sign in to comment.