From d121e81943d4f695cd36691878069abe412aa99d Mon Sep 17 00:00:00 2001 From: Einar Omang Date: Tue, 23 Jan 2024 09:49:47 +0100 Subject: [PATCH] feat: Capture errors from connecting to broker --- src/connection.rs | 28 ++++++++++++++++++++++++---- tests/client.rs | 10 +++++++++- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 62b5df9..beba5cf 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,5 +1,6 @@ use async_trait::async_trait; use rand::prelude::*; +use std::fmt::Display; use std::ops::ControlFlow; use std::sync::Arc; use thiserror::Error; @@ -53,6 +54,26 @@ pub enum Error { pub type Result = std::result::Result; +#[derive(Debug, Error)] +pub struct MultiError(Vec>); + +impl Display for MultiError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut needs_comma = false; + if self.0.len() > 1 { + write!(f, "Multiple errors occured: ")?; + } + for err in &self.0 { + if needs_comma { + write!(f, ", ")?; + } + needs_comma = true; + write!(f, "{err}")?; + } + Ok(()) + } +} + /// How to connect to a `Transport` #[async_trait] trait ConnectionHandler { @@ -470,6 +491,7 @@ where let mut backoff = Backoff::new(backoff_config); backoff .retry_with_backoff("broker_connect", || async { + let mut errors = Vec::>::new(); for broker in &brokers { let conn = broker .connect( @@ -485,16 +507,14 @@ where Ok(transport) => transport, Err(e) => { warn!(%e, "Failed to connect to broker"); + errors.push(Box::new(e)); continue; } }; return ControlFlow::Break(connection); } - - let err = Box::::from( - "Failed to connect to any broker, backing off".to_string(), - ); + let err = Box::::from(MultiError(errors)); let err: Arc = err.into(); ControlFlow::Continue(ErrorOrThrottle::Error(err)) }) diff --git a/tests/client.rs b/tests/client.rs index 4259d62..955babb 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -721,7 +721,15 @@ async fn test_client_backoff_terminates() { match client_builder.build().await { Err(rskafka::client::error::Error::Connection(e)) => { - assert_eq!(e.to_string(), "all retries failed: Retry exceeded deadline"); + // Error can be slightly different depending on the exact underlying error. + assert!( + e.to_string().starts_with(concat!( + "all retries failed: Retry exceeded deadline. ", + "Source: error connecting to broker \"localhost:9000\"" + )), + "expected error to start with \"all retries failed...\", actual: {}", + e.to_string() + ); } _ => { unreachable!();