From 9dc6b05c070579abc3f474dc01a3e57c74f4f599 Mon Sep 17 00:00:00 2001 From: Einar Omang Date: Tue, 23 Jan 2024 09:07:40 +0100 Subject: [PATCH 1/3] feat: Log original error when backing off timed out This commit also removes a few unhelpful comments that are no longer correct. --- src/backoff.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/backoff.rs b/src/backoff.rs index 2595620..4cbe42e 100644 --- a/src/backoff.rs +++ b/src/backoff.rs @@ -28,13 +28,10 @@ impl Default for BackoffConfig { type SourceError = Box; -// TODO: Currently, retrying can't fail, but there should be a global maximum timeout that -// causes an error if the total time retrying exceeds that amount. -// See https://github.com/influxdata/rskafka/issues/65 #[derive(Debug, thiserror::Error)] #[allow(missing_copy_implementations)] pub enum BackoffError { - #[error("Retry exceeded deadline")] + #[error("Retry exceeded deadline. Source: {source}")] DeadlineExceded { deadline: Duration, source: SourceError, @@ -103,9 +100,6 @@ impl Backoff { } /// Perform an async operation that retries with a backoff - // TODO: Currently, this can't fail, but there should be a global maximum timeout that - // causes an error if the total time retrying exceeds that amount. - // See https://github.com/influxdata/rskafka/issues/65 pub async fn retry_with_backoff( &mut self, request_name: &str, From bd45435ccd643696c5e60b4ebbd1a711d311c56b Mon Sep 17 00:00:00 2001 From: Einar Omang Date: Tue, 23 Jan 2024 09:27:12 +0100 Subject: [PATCH 2/3] feat: Expose connection::Error as ConnectionError --- src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 01739db..e8522cb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,6 +27,8 @@ pub mod client; mod connection; +pub use connection::Error as ConnectionError; + #[cfg(feature = "unstable-fuzzing")] pub mod messenger; #[cfg(not(feature = "unstable-fuzzing"))] From badfe9d56c21d2f938924fe4238c728cfb19f076 Mon Sep 17 00:00:00 2001 From: Einar Omang Date: Tue, 23 Jan 2024 09:49:47 +0100 Subject: [PATCH 3/3] feat: Capture errors from connecting to broker --- src/connection.rs | 28 ++++++++++++++++++++++++---- tests/client.rs | 10 +++++++++- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 62b5df9..beba5cf 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,5 +1,6 @@ use async_trait::async_trait; use rand::prelude::*; +use std::fmt::Display; use std::ops::ControlFlow; use std::sync::Arc; use thiserror::Error; @@ -53,6 +54,26 @@ pub enum Error { pub type Result = std::result::Result; +#[derive(Debug, Error)] +pub struct MultiError(Vec>); + +impl Display for MultiError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut needs_comma = false; + if self.0.len() > 1 { + write!(f, "Multiple errors occured: ")?; + } + for err in &self.0 { + if needs_comma { + write!(f, ", ")?; + } + needs_comma = true; + write!(f, "{err}")?; + } + Ok(()) + } +} + /// How to connect to a `Transport` #[async_trait] trait ConnectionHandler { @@ -470,6 +491,7 @@ where let mut backoff = Backoff::new(backoff_config); backoff .retry_with_backoff("broker_connect", || async { + let mut errors = Vec::>::new(); for broker in &brokers { let conn = broker .connect( @@ -485,16 +507,14 @@ where Ok(transport) => transport, Err(e) => { warn!(%e, "Failed to connect to broker"); + errors.push(Box::new(e)); continue; } }; return ControlFlow::Break(connection); } - - let err = Box::::from( - "Failed to connect to any broker, backing off".to_string(), - ); + let err = Box::::from(MultiError(errors)); let err: Arc = err.into(); ControlFlow::Continue(ErrorOrThrottle::Error(err)) }) diff --git a/tests/client.rs b/tests/client.rs index 4259d62..7ec8486 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -721,7 +721,15 @@ async fn test_client_backoff_terminates() { match client_builder.build().await { Err(rskafka::client::error::Error::Connection(e)) => { - assert_eq!(e.to_string(), "all retries failed: Retry exceeded deadline"); + // Error can be slightly different depending on the exact underlying error. + assert!( + e.to_string().starts_with(concat!( + "all retries failed: Retry exceeded deadline. ", + "Source: error connecting to broker \"localhost:9000\"" + )), + "expected error to start with \"all retries failed...\", actual: {}", + e + ); } _ => { unreachable!();