Skip to content

Commit

Permalink
Merge pull request #230 from cognitedata/backoff-log-source
Browse files Browse the repository at this point in the history
feat: Minor improvements to error visibility
  • Loading branch information
kodiakhq[bot] authored Feb 6, 2024
2 parents 2c7bd88 + badfe9d commit 1b1c220
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 12 deletions.
8 changes: 1 addition & 7 deletions src/backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,10 @@ impl Default for BackoffConfig {

type SourceError = Box<dyn std::error::Error + Send + Sync>;

// TODO: Currently, retrying can't fail, but there should be a global maximum timeout that
// causes an error if the total time retrying exceeds that amount.
// See https://github.com/influxdata/rskafka/issues/65
#[derive(Debug, thiserror::Error)]
#[allow(missing_copy_implementations)]
pub enum BackoffError {
#[error("Retry exceeded deadline")]
#[error("Retry exceeded deadline. Source: {source}")]
DeadlineExceded {
deadline: Duration,
source: SourceError,
Expand Down Expand Up @@ -103,9 +100,6 @@ impl Backoff {
}

/// Perform an async operation that retries with a backoff
// TODO: Currently, this can't fail, but there should be a global maximum timeout that
// causes an error if the total time retrying exceeds that amount.
// See https://github.com/influxdata/rskafka/issues/65
pub async fn retry_with_backoff<F, F1, B, E>(
&mut self,
request_name: &str,
Expand Down
28 changes: 24 additions & 4 deletions src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_trait::async_trait;
use rand::prelude::*;
use std::fmt::Display;
use std::ops::ControlFlow;
use std::sync::Arc;
use thiserror::Error;
Expand Down Expand Up @@ -53,6 +54,26 @@ pub enum Error {

pub type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(Debug, Error)]
pub struct MultiError(Vec<Box<dyn std::error::Error + Send + Sync>>);

impl Display for MultiError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut needs_comma = false;
if self.0.len() > 1 {
write!(f, "Multiple errors occured: ")?;
}
for err in &self.0 {
if needs_comma {
write!(f, ", ")?;
}
needs_comma = true;
write!(f, "{err}")?;
}
Ok(())
}
}

/// How to connect to a `Transport`
#[async_trait]
trait ConnectionHandler {
Expand Down Expand Up @@ -470,6 +491,7 @@ where
let mut backoff = Backoff::new(backoff_config);
backoff
.retry_with_backoff("broker_connect", || async {
let mut errors = Vec::<Box<dyn std::error::Error + Send + Sync>>::new();
for broker in &brokers {
let conn = broker
.connect(
Expand All @@ -485,16 +507,14 @@ where
Ok(transport) => transport,
Err(e) => {
warn!(%e, "Failed to connect to broker");
errors.push(Box::new(e));
continue;
}
};

return ControlFlow::Break(connection);
}

let err = Box::<dyn std::error::Error + Send + Sync>::from(
"Failed to connect to any broker, backing off".to_string(),
);
let err = Box::<dyn std::error::Error + Send + Sync>::from(MultiError(errors));
let err: Arc<dyn std::error::Error + Send + Sync> = err.into();
ControlFlow::Continue(ErrorOrThrottle::Error(err))
})
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub mod client;

mod connection;

pub use connection::Error as ConnectionError;

#[cfg(feature = "unstable-fuzzing")]
pub mod messenger;
#[cfg(not(feature = "unstable-fuzzing"))]
Expand Down
10 changes: 9 additions & 1 deletion tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,15 @@ async fn test_client_backoff_terminates() {

match client_builder.build().await {
Err(rskafka::client::error::Error::Connection(e)) => {
assert_eq!(e.to_string(), "all retries failed: Retry exceeded deadline");
// Error can be slightly different depending on the exact underlying error.
assert!(
e.to_string().starts_with(concat!(
"all retries failed: Retry exceeded deadline. ",
"Source: error connecting to broker \"localhost:9000\""
)),
"expected error to start with \"all retries failed...\", actual: {}",
e
);
}
_ => {
unreachable!();
Expand Down

0 comments on commit 1b1c220

Please sign in to comment.