Skip to content
This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

feat: Track response in error details #29

Merged
merged 8 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ metrics = "0.22.0"
metrics-exporter-prometheus = "0.14.0"
rand = "0.8.5"
rdkafka = { version = "0.36.0", features = ["cmake-build", "ssl", "tracing"] }
reqwest = { version = "0.12.3" }
reqwest = { version = "0.12.3", features = ["stream"] }
serde = { version = "1.0", features = ["derive"] }
serde_derive = { version = "1.0" }
serde_json = { version = "1.0" }
Expand Down
111 changes: 106 additions & 5 deletions hook-worker/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,125 @@
use std::fmt;
use std::time;

use hook_common::pgqueue;
use hook_common::{pgqueue, webhook::WebhookJobError};
use thiserror::Error;

/// Enumeration of errors related to webhook job processing in the WebhookWorker.
/// Enumeration of error classes handled by `WebhookWorker`.
#[derive(Error, Debug)]
pub enum WebhookError {
#[error(transparent)]
Parse(#[from] WebhookParseError),
#[error(transparent)]
Request(#[from] WebhookRequestError),
}

/// Enumeration of parsing errors that can occur as `WebhookWorker` sets up a webhook.
#[derive(Error, Debug)]
pub enum WebhookParseError {
#[error("{0} is not a valid HttpMethod")]
ParseHttpMethodError(String),
#[error("error parsing webhook headers")]
ParseHeadersError(http::Error),
#[error("error parsing webhook url")]
ParseUrlError(url::ParseError),
#[error("a webhook could not be delivered but it could be retried later: {error}")]
}

/// Enumeration of request errors that can occur as `WebhookWorker` sends a request.
#[derive(Error, Debug)]
pub enum WebhookRequestError {
RetryableRequestError {
error: reqwest::Error,
response: Option<String>,
retry_after: Option<time::Duration>,
},
#[error("a webhook could not be delivered and it cannot be retried further: {0}")]
NonRetryableRetryableRequestError(reqwest::Error),
NonRetryableRetryableRequestError {
error: reqwest::Error,
response: Option<String>,
},
}

/// Enumeration of errors that can occur while handling a `reqwest::Response`.
/// Currently, not consumed anywhere. Grouped here to support a common error type for
/// `utils::first_n_bytes_of_response`.
#[derive(Error, Debug)]
pub enum WebhookResponseError {
#[error("failed to parse a response as UTF8")]
ParseUTF8StringError(#[from] std::str::Utf8Error),
#[error("error while iterating over response body chunks")]
StreamIterationError(#[from] reqwest::Error),
#[error("attempted to slice a chunk of length {0} with an out of bounds index of {1}")]
ChunkOutOfBoundsError(usize, usize),
}
Comment on lines +44 to +52
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not handling these at the moment. hook-worker just defaults to an empty response on error.


/// Implement display of `WebhookRequestError` by appending to the underlying `reqwest::Error`
/// any response message if available.
impl fmt::Display for WebhookRequestError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
WebhookRequestError::RetryableRequestError {
error, response, ..
}
| WebhookRequestError::NonRetryableRetryableRequestError { error, response } => {
let response_message = match response {
Some(m) => m.to_string(),
None => "No response from the server".to_string(),
};
writeln!(f, "{}", error)?;
write!(f, "{}", response_message)?;

Ok(())
}
}
}
}

/// Implementation of `WebhookRequestError` designed to further describe the error.
/// In particular, we pass some calls to underyling `reqwest::Error` to provide more details.
impl WebhookRequestError {
pub fn is_timeout(&self) -> bool {
match self {
WebhookRequestError::RetryableRequestError { error, .. }
| WebhookRequestError::NonRetryableRetryableRequestError { error, .. } => {
error.is_timeout()
}
}
}

pub fn is_status(&self) -> bool {
match self {
WebhookRequestError::RetryableRequestError { error, .. }
| WebhookRequestError::NonRetryableRetryableRequestError { error, .. } => {
error.is_status()
}
}
}

pub fn status(&self) -> Option<http::StatusCode> {
match self {
WebhookRequestError::RetryableRequestError { error, .. }
| WebhookRequestError::NonRetryableRetryableRequestError { error, .. } => {
error.status()
}
}
}
}

impl From<&WebhookRequestError> for WebhookJobError {
fn from(error: &WebhookRequestError) -> Self {
if error.is_timeout() {
WebhookJobError::new_timeout(&error.to_string())
} else if error.is_status() {
WebhookJobError::new_http_status(
error.status().expect("status code is defined").into(),
&error.to_string(),
)
} else {
// Catch all other errors as `app_metrics::ErrorType::Connection` errors.
// Not all of `reqwest::Error` may strictly be connection errors, so our supported error types may need an extension
// depending on how strict error reporting has to be.
WebhookJobError::new_connection(&error.to_string())
}
}
}

/// Enumeration of errors related to initialization and consumption of webhook jobs.
Expand Down
1 change: 1 addition & 0 deletions hook-worker/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod config;
pub mod error;
pub mod util;
pub mod worker;
35 changes: 35 additions & 0 deletions hook-worker/src/util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use crate::error::WebhookResponseError;
use futures::StreamExt;
use reqwest::Response;

pub async fn first_n_bytes_of_response(
response: Response,
n: usize,
) -> Result<String, WebhookResponseError> {
let mut body = response.bytes_stream();
let mut buffer = String::with_capacity(n);

while let Some(chunk) = body.next().await {
if buffer.len() >= n {
break;
}

let chunk = chunk?;
let chunk_str = std::str::from_utf8(&chunk)?;
let upper_bound = std::cmp::min(n - buffer.len(), chunk_str.len());

if let Some(partial_chunk_str) = chunk_str.get(0..upper_bound) {
buffer.push_str(partial_chunk_str);
} else {
// For whatever reason we are out of bounds. We should never land here
// given the `std::cmp::min` usage, but I am being extra careful by not
// using a slice index that would panic instead.
return Err(WebhookResponseError::ChunkOutOfBoundsError(
chunk_str.len(),
upper_bound,
));
}
}

Ok(buffer)
}
Loading
Loading