Skip to content

Commit

Permalink
Capture errors from connecting to broker
Browse files Browse the repository at this point in the history
  • Loading branch information
einarmo committed Jan 23, 2024
1 parent 1eba7e3 commit 76a22c7
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
15 changes: 12 additions & 3 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ where
let mut backoff = Backoff::new(backoff_config);
backoff
.retry_with_backoff("broker_connect", || async {
let mut errors = Vec::new();
for broker in &brokers {
let conn = broker
.connect(
Expand All @@ -485,16 +486,24 @@ where
Ok(transport) => transport,
Err(e) => {
warn!(%e, "Failed to connect to broker");
errors.push(e);
continue;
}
};

return ControlFlow::Break(connection);
}

let err = Box::<dyn std::error::Error + Send + Sync>::from(
"Failed to connect to any broker, backing off".to_string(),
);
// errors should always contain at least 1 element here.
let errors_string = errors
.into_iter()
.map(|e| e.to_string())
.collect::<Vec<String>>()
.join(", ");

let err = Box::<dyn std::error::Error + Send + Sync>::from(format!(
"Failed to connect to any broker, backing off. Errors: {errors_string}"
));
let err: Arc<dyn std::error::Error + Send + Sync> = err.into();
ControlFlow::Continue(ErrorOrThrottle::Error(err))
})
Expand Down
7 changes: 6 additions & 1 deletion tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,12 @@ async fn test_client_backoff_terminates() {

match client_builder.build().await {
Err(rskafka::client::error::Error::Connection(e)) => {
assert_eq!(e.to_string(), "all retries failed: Retry exceeded deadline");
// Error can be slightly different depending on the exact underlying error.
assert!(e.to_string().starts_with(concat!(
"all retries failed: Retry exceeded deadline. ",
"Source: Failed to connect to any broker, backing off. ",
"Errors: error connecting to broker \"localhost:9000\""
)));
}
_ => {
unreachable!();
Expand Down

0 comments on commit 76a22c7

Please sign in to comment.