Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

feat: Use new WebhookJobError struct for error reporting #13

Merged
merged 5 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 78 additions & 78 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ http-body-util = "0.1.0"
metrics = "0.21.1"
metrics-exporter-prometheus = "0.12.1"
rdkafka = { version = "0.35.0", features = ["cmake-build", "ssl"] }
reqwest = { version = "0.11" }
regex = "1.10.2"
serde = { version = "1.0" }
serde_derive = { version = "1.0" }
Expand Down
1 change: 1 addition & 0 deletions hook-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ chrono = { workspace = true }
http = { workspace = true }
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
reqwest = { workspace = true }
regex = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
Expand Down
8 changes: 5 additions & 3 deletions hook-common/src/kafka_messages/app_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ pub enum AppMetricCategory {
ComposeWebhook,
}

#[derive(Serialize)]
#[derive(Serialize, Debug)]
pub enum ErrorType {
Timeout,
Connection,
HttpStatus(u16),
Parse,
}

#[derive(Serialize)]
#[derive(Serialize, Debug)]
pub struct ErrorDetails {
pub error: Error,
// TODO: The plugin-server sends the entire raw event with errors. In order to do this, we'll
Expand All @@ -30,7 +31,7 @@ pub struct ErrorDetails {
// event: Value,
}

#[derive(Serialize)]
#[derive(Serialize, Debug)]
pub struct Error {
pub name: String,
#[serde(skip_serializing_if = "Option::is_none")]
Expand Down Expand Up @@ -84,6 +85,7 @@ where
ErrorType::Connection => "Connection Error".to_owned(),
ErrorType::Timeout => "Timeout".to_owned(),
ErrorType::HttpStatus(s) => format!("HTTP Status: {}", s),
ErrorType::Parse => "Parse Error".to_owned(),
};
serializer.serialize_str(&error_type)
}
Expand Down
4 changes: 2 additions & 2 deletions hook-common/src/kafka_messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use chrono::{DateTime, Utc};
use serde::Serializer;
use uuid::Uuid;

fn serialize_uuid<S>(uuid: &Uuid, serializer: S) -> Result<S::Ok, S::Error>
pub fn serialize_uuid<S>(uuid: &Uuid, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&uuid.to_string())
}

fn serialize_datetime<S>(datetime: &DateTime<Utc>, serializer: S) -> Result<S::Ok, S::Error>
pub fn serialize_datetime<S>(datetime: &DateTime<Utc>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
Expand Down
95 changes: 95 additions & 0 deletions hook-common/src/webhook.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::collections;
use std::convert::From;
use std::fmt;
use std::str::FromStr;

use serde::{de::Visitor, Deserialize, Serialize};

use crate::kafka_messages::{app_metrics, serialize_uuid};
use crate::pgqueue::PgQueueError;

/// Supported HTTP methods for webhooks.
Expand Down Expand Up @@ -134,3 +136,96 @@ pub struct WebhookJobMetadata {
pub plugin_id: Option<i32>,
pub plugin_config_id: Option<i32>,
}

/// An error originating during a Webhook Job invocation.
/// This is to be serialized to be stored as an error whenever retrying or failing a webhook job.
#[derive(Serialize, Debug)]
pub struct WebhookJobError {
pub r#type: app_metrics::ErrorType,
pub details: app_metrics::ErrorDetails,
#[serde(serialize_with = "serialize_uuid")]
pub uuid: uuid::Uuid,
}

/// Webhook jobs boil down to an HTTP request, so it's useful to have a way to convert from &reqwest::Error.
/// For the convertion we check all possible error types with the associated is_* methods provided by reqwest.
/// Some precision may be lost as our app_metrics::ErrorType does not support the same number of variants.
impl From<&reqwest::Error> for WebhookJobError {
fn from(error: &reqwest::Error) -> Self {
if error.is_timeout() {
WebhookJobError::new_timeout(&error.to_string())
} else if error.is_status() {
WebhookJobError::new_http_status(
error.status().expect("status code is defined").into(),
&error.to_string(),
)
} else {
// Catch all other errors as `app_metrics::ErrorType::Connection` errors.
// Not all of `reqwest::Error` may strictly be connection errors, so our supported error types may need an extension
// depending on how strict error reporting has to be.
WebhookJobError::new_connection(&error.to_string())
}
}
}

impl WebhookJobError {
pub fn new_timeout(message: &str) -> Self {
let error_details = app_metrics::Error {
name: "timeout".to_owned(),
message: Some(message.to_owned()),
stack: None,
};
Self {
r#type: app_metrics::ErrorType::Timeout,
details: app_metrics::ErrorDetails {
error: error_details,
},
uuid: uuid::Uuid::now_v7(),
}
}

pub fn new_connection(message: &str) -> Self {
let error_details = app_metrics::Error {
name: "connection error".to_owned(),
message: Some(message.to_owned()),
stack: None,
};
Self {
r#type: app_metrics::ErrorType::Connection,
details: app_metrics::ErrorDetails {
error: error_details,
},
uuid: uuid::Uuid::now_v7(),
}
}

pub fn new_http_status(status_code: u16, message: &str) -> Self {
let error_details = app_metrics::Error {
name: "http status".to_owned(),
message: Some(message.to_owned()),
stack: None,
};
Self {
r#type: app_metrics::ErrorType::HttpStatus(status_code),
details: app_metrics::ErrorDetails {
error: error_details,
},
uuid: uuid::Uuid::now_v7(),
}
}

pub fn new_parse(message: &str) -> Self {
let error_details = app_metrics::Error {
name: "parse error".to_owned(),
message: Some(message.to_owned()),
stack: None,
};
Self {
r#type: app_metrics::ErrorType::Parse,
details: app_metrics::ErrorDetails {
error: error_details,
},
uuid: uuid::Uuid::now_v7(),
}
}
}
2 changes: 1 addition & 1 deletion hook-consumer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ envconfig = { workspace = true }
futures = "0.3"
hook-common = { path = "../hook-common" }
http = { version = "0.2" }
reqwest = { version = "0.11" }
reqwest = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
sqlx = { workspace = true }
Expand Down
120 changes: 71 additions & 49 deletions hook-consumer/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ use std::time;

use async_std::task;
use hook_common::pgqueue::{PgJobError, PgQueue, PgQueueError, PgTransactionJob};
use hook_common::webhook::{HttpMethod, WebhookJobMetadata, WebhookJobParameters};
use hook_common::webhook::{HttpMethod, WebhookJobError, WebhookJobMetadata, WebhookJobParameters};
use http::StatusCode;
use reqwest::header;
use tokio::sync;

use crate::error::WebhookConsumerError;
use crate::error::{ConsumerError, WebhookError};

/// A consumer to poll `PgQueue` and spawn tasks to process webhooks when a job becomes available.
pub struct WebhookConsumer<'p> {
Expand Down Expand Up @@ -57,8 +57,7 @@ impl<'p> WebhookConsumer<'p> {
/// Wait until a job becomes available in our queue.
async fn wait_for_job<'a>(
&self,
) -> Result<PgTransactionJob<'a, WebhookJobParameters, WebhookJobMetadata>, WebhookConsumerError>
{
) -> Result<PgTransactionJob<'a, WebhookJobParameters, WebhookJobMetadata>, ConsumerError> {
loop {
if let Some(job) = self.queue.dequeue_tx(&self.name).await? {
return Ok(job);
Expand All @@ -69,7 +68,7 @@ impl<'p> WebhookConsumer<'p> {
}

/// Run this consumer to continuously process any jobs that become available.
pub async fn run(&self) -> Result<(), WebhookConsumerError> {
pub async fn run(&self) -> Result<(), ConsumerError> {
let semaphore = Arc::new(sync::Semaphore::new(self.max_concurrent_jobs));

loop {
Expand Down Expand Up @@ -99,12 +98,12 @@ impl<'p> WebhookConsumer<'p> {
///
/// # Arguments
///
/// * `client`: An HTTP client to execute the webhook job request.
/// * `webhook_job`: The webhook job to process as dequeued from `hook_common::pgqueue::PgQueue`.
/// * `request_timeout`: A timeout for the HTTP request.
async fn process_webhook_job(
client: reqwest::Client,
webhook_job: PgTransactionJob<'_, WebhookJobParameters, WebhookJobMetadata>,
) -> Result<(), WebhookConsumerError> {
) -> Result<(), ConsumerError> {
match send_webhook(
client,
&webhook_job.job.parameters.method,
Expand All @@ -118,31 +117,53 @@ async fn process_webhook_job(
webhook_job
.complete()
.await
.map_err(|error| WebhookConsumerError::PgJobError(error.to_string()))?;
.map_err(|error| ConsumerError::PgJobError(error.to_string()))?;
Ok(())
}
Err(WebhookError::ParseHeadersError(e)) => {
webhook_job
.fail(WebhookJobError::new_parse(&e.to_string()))
.await
.map_err(|job_error| ConsumerError::PgJobError(job_error.to_string()))?;
Ok(())
}
Err(WebhookConsumerError::RetryableWebhookError {
reason,
retry_after,
}) => match webhook_job.retry(reason.to_string(), retry_after).await {
Ok(_) => Ok(()),
Err(PgJobError::RetryInvalidError {
job: webhook_job,
error: fail_error,
}) => {
webhook_job
.fail(fail_error.to_string())
.await
.map_err(|job_error| WebhookConsumerError::PgJobError(job_error.to_string()))?;
Ok(())
Err(WebhookError::ParseHttpMethodError(e)) => {
webhook_job
.fail(WebhookJobError::new_parse(&e))
.await
.map_err(|job_error| ConsumerError::PgJobError(job_error.to_string()))?;
Ok(())
}
Err(WebhookError::ParseUrlError(e)) => {
webhook_job
.fail(WebhookJobError::new_parse(&e.to_string()))
.await
.map_err(|job_error| ConsumerError::PgJobError(job_error.to_string()))?;
Ok(())
}
Err(WebhookError::RetryableRequestError { error, retry_after }) => {
match webhook_job
.retry(WebhookJobError::from(&error), retry_after)
.await
{
Ok(_) => Ok(()),
Err(PgJobError::RetryInvalidError {
job: webhook_job, ..
}) => {
webhook_job
.fail(WebhookJobError::from(&error))
.await
.map_err(|job_error| ConsumerError::PgJobError(job_error.to_string()))?;
Ok(())
}
Err(job_error) => Err(ConsumerError::PgJobError(job_error.to_string())),
}
Err(job_error) => Err(WebhookConsumerError::PgJobError(job_error.to_string())),
},
Err(error) => {
}
Err(WebhookError::NonRetryableRetryableRequestError(error)) => {
webhook_job
.fail(error.to_string())
.fail(WebhookJobError::from(&error))
.await
.map_err(|job_error| WebhookConsumerError::PgJobError(job_error.to_string()))?;
.map_err(|job_error| ConsumerError::PgJobError(job_error.to_string()))?;
Ok(())
}
}
Expand All @@ -152,23 +173,23 @@ async fn process_webhook_job(
///
/// # Arguments
///
/// * `client`: An HTTP client to execute the HTTP request.
/// * `method`: The HTTP method to use in the HTTP request.
/// * `url`: The URL we are targetting with our request. Parsing this URL fail.
/// * `headers`: Key, value pairs of HTTP headers in a `std::collections::HashMap`. Can fail if headers are not valid.
/// * `body`: The body of the request. Ownership is required.
/// * `timeout`: A timeout for the HTTP request.
async fn send_webhook(
client: reqwest::Client,
method: &HttpMethod,
url: &str,
headers: &collections::HashMap<String, String>,
body: String,
) -> Result<reqwest::Response, WebhookConsumerError> {
) -> Result<reqwest::Response, WebhookError> {
let method: http::Method = method.into();
let url: reqwest::Url = (url).parse().map_err(WebhookConsumerError::ParseUrlError)?;
let url: reqwest::Url = (url).parse().map_err(WebhookError::ParseUrlError)?;
let headers: reqwest::header::HeaderMap = (headers)
.try_into()
.map_err(WebhookConsumerError::ParseHeadersError)?;
.map_err(WebhookError::ParseHeadersError)?;
let body = reqwest::Body::from(body);

let response = client
Expand All @@ -177,27 +198,28 @@ async fn send_webhook(
.body(body)
.send()
.await
.map_err(|e| WebhookConsumerError::RetryableWebhookError {
reason: e.to_string(),
.map_err(|e| WebhookError::RetryableRequestError {
error: e,
retry_after: None,
})?;

let status = response.status();

if status.is_success() {
Ok(response)
} else if is_retryable_status(status) {
let retry_after = parse_retry_after_header(response.headers());

Err(WebhookConsumerError::RetryableWebhookError {
reason: format!("retryable status code {}", status),
retry_after,
})
} else {
Err(WebhookConsumerError::NonRetryableWebhookError(format!(
"non-retryable status code {}",
status
)))
let retry_after = parse_retry_after_header(response.headers());

match response.error_for_status() {
Ok(response) => Ok(response),
Err(err) => {
if is_retryable_status(
err.status()
.expect("status code is set as error is generated from a response"),
) {
Err(WebhookError::RetryableRequestError {
error: err,
retry_after,
})
} else {
Err(WebhookError::NonRetryableRetryableRequestError(err))
}
}
}
}

Expand Down
Loading