Skip to content

Commit

Permalink
Merge pull request #951 from drmingdrmer/32-handle-failed-append
Browse files Browse the repository at this point in the history
Feature: add `PayloadTooLarge` error
  • Loading branch information
drmingdrmer authored Nov 26, 2023
2 parents 9c04cb0 + 961469c commit f4d047f
Show file tree
Hide file tree
Showing 9 changed files with 428 additions and 47 deletions.
160 changes: 160 additions & 0 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use std::collections::BTreeSet;
use std::error::Error;
use std::fmt;
use std::fmt::Debug;
use std::time::Duration;

Expand Down Expand Up @@ -256,6 +257,10 @@ pub enum RPCError<NID: NodeId, N: Node, E: Error> {
#[error(transparent)]
Unreachable(#[from] Unreachable),

/// The RPC payload is too large and should be split into smaller chunks.
#[error(transparent)]
PayloadTooLarge(#[from] PayloadTooLarge),

/// Failed to send the RPC request and should retry immediately.
#[error(transparent)]
Network(#[from] NetworkError),
Expand All @@ -276,6 +281,7 @@ where
match self {
RPCError::Timeout(_) => None,
RPCError::Unreachable(_) => None,
RPCError::PayloadTooLarge(_) => None,
RPCError::Network(_) => None,
RPCError::RemoteError(remote_err) => remote_err.source.forward_to_leader(),
}
Expand Down Expand Up @@ -359,6 +365,136 @@ impl Unreachable {
}
}

/// Error indicating that an RPC is too large and cannot be sent.
///
/// This is a retryable error:
/// A [`RaftNetwork`] implementation returns this error to inform Openraft to divide an
/// [`AppendEntriesRequest`] into smaller chunks.
/// Openraft will immediately retry sending in smaller chunks.
/// If the request cannot be divided(contains only one entry), Openraft interprets it as
/// [`Unreachable`].
///
/// A hint can be provided to help Openraft in splitting the request.
///
/// The application should also set an appropriate value for [`Config::max_payload_entries`] to
/// avoid returning this error if possible.
///
/// Example:
///
/// ```ignore
/// impl<C: RaftTypeConfig> RaftNetwork<C> for MyNetwork {
/// fn append_entries(&self,
/// rpc: AppendEntriesRequest<C>,
/// option: RPCOption
/// ) -> Result<_, RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>> {
/// if rpc.entries.len() > 10 {
/// return Err(PayloadTooLarge::new_entries_hint(10).into());
/// }
/// // ...
/// }
/// }
/// ```
///
/// [`RaftNetwork`]: crate::network::RaftNetwork
/// [`AppendEntriesRequest`]: crate::raft::AppendEntriesRequest
/// [`Config::max_payload_entries`]: crate::config::Config::max_payload_entries
///
/// [`InstallSnapshotRequest`]: crate::raft::InstallSnapshotRequest
/// [`Config::snapshot_max_chunk_size`]: crate::config::Config::snapshot_max_chunk_size
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct PayloadTooLarge {
action: RPCTypes,

/// An optional hint indicating the anticipated number of entries.
/// Used only for append-entries replication.
entries_hint: u64,

/// An optional hint indicating the anticipated size in bytes.
/// Used for snapshot replication.
bytes_hint: u64,

#[source]
source: Option<AnyError>,
}

impl fmt::Display for PayloadTooLarge {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "RPC",)?;
write!(f, "({})", self.action)?;
write!(f, " payload too large:",)?;

write!(f, " hint:(")?;
match self.action {
RPCTypes::Vote => {
unreachable!("vote rpc should not have payload")
}
RPCTypes::AppendEntries => {
write!(f, "entries:{}", self.entries_hint)?;
}
RPCTypes::InstallSnapshot => {
write!(f, "bytes:{}", self.bytes_hint)?;
}
}
write!(f, ")")?;

if let Some(s) = &self.source {
write!(f, ", source: {}", s)?;
}

Ok(())
}
}

impl PayloadTooLarge {
/// Create a new PayloadTooLarge, with entries hint, without the causing error.
pub fn new_entries_hint(entries_hint: u64) -> Self {
debug_assert!(entries_hint > 0, "entries_hint should be greater than 0");

Self {
action: RPCTypes::AppendEntries,
entries_hint,
bytes_hint: u64::MAX,
source: None,
}
}

// No used yet.
/// Create a new PayloadTooLarge, with bytes hint, without the causing error.
#[allow(dead_code)]
pub(crate) fn new_bytes_hint(bytes_hint: u64) -> Self {
debug_assert!(bytes_hint > 0, "bytes_hint should be greater than 0");

Self {
action: RPCTypes::InstallSnapshot,
entries_hint: u64::MAX,
bytes_hint,
source: None,
}
}

/// Set the source error that causes this PayloadTooLarge error.
pub fn with_source_error(mut self, e: &(impl Error + 'static)) -> Self {
self.source = Some(AnyError::new(e));
self
}

pub fn action(&self) -> RPCTypes {
self.action
}

/// Get the hint for entries number.
pub fn entries_hint(&self) -> u64 {
self.entries_hint
}

// No used yet.
#[allow(dead_code)]
pub(crate) fn bytes_hint(&self) -> u64 {
self.bytes_hint
}
}

#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[error("timeout after {timeout:?} when {action} {id}->{target}")]
Expand Down Expand Up @@ -532,3 +668,27 @@ impl<NID: NodeId> From<Result<(), RejectAppendEntries<NID>>> for AppendEntriesRe
}
}
}

#[cfg(test)]
mod tests {
use anyerror::AnyError;

use crate::error::PayloadTooLarge;

#[test]
fn test_append_too_large() -> anyhow::Result<()> {
let a = PayloadTooLarge::new_entries_hint(5);
assert_eq!("RPC(AppendEntries) payload too large: hint:(entries:5)", a.to_string());

let a = PayloadTooLarge::new_bytes_hint(5);
assert_eq!("RPC(InstallSnapshot) payload too large: hint:(bytes:5)", a.to_string());

let a = PayloadTooLarge::new_entries_hint(5).with_source_error(&AnyError::error("test"));
assert_eq!(
"RPC(AppendEntries) payload too large: hint:(entries:5), source: test",
a.to_string()
);

Ok(())
}
}
6 changes: 6 additions & 0 deletions openraft/src/log_id_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::fmt::Formatter;
use crate::less_equal;
use crate::validate::Validate;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::MessageSummary;
use crate::NodeId;

Expand Down Expand Up @@ -43,6 +44,11 @@ impl<NID: NodeId> LogIdRange<NID> {
last_log_id: last,
}
}

#[allow(dead_code)]
pub(crate) fn len(&self) -> u64 {
self.last_log_id.next_index() - self.prev_log_id.next_index()
}
}

#[cfg(test)]
Expand Down
4 changes: 3 additions & 1 deletion openraft/src/network/rpc_type.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::fmt;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, Copy)]
#[derive(PartialEq, Eq)]
#[derive(Hash)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub enum RPCTypes {
Vote,
Expand Down
26 changes: 26 additions & 0 deletions openraft/src/replication/hint.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//! Defines config hint for replication RPC
/// Temporary config hint for replication
#[derive(Clone, Debug, Default)]
pub(crate) struct ReplicationHint {
n: u64,

/// How many times this hint can be used.
ttl: u64,
}

impl ReplicationHint {
/// Create a new `ReplicationHint`
pub(crate) fn new(n: u64, ttl: u64) -> Self {
Self { n, ttl }
}

pub(crate) fn get(&mut self) -> Option<u64> {
if self.ttl > 0 {
self.ttl -= 1;
Some(self.n)
} else {
None
}
}
}
Loading

0 comments on commit f4d047f

Please sign in to comment.