Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Append final errors to error array and treat request errors as retryable #11

Merged
merged 2 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions hook-common/src/pgqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ UPDATE
SET
finished_at = NOW(),
status = 'failed'::job_status
errors = array_append("{0}".errors, $3)
WHERE
"{0}".id = $2
AND queue = $1
Expand All @@ -261,6 +262,7 @@ RETURNING
sqlx::query(&base_query)
.bind(&failed_job.queue)
.bind(failed_job.id)
.bind(&failed_job.error)
.execute(&mut *self.connection)
.await
.map_err(|error| PgJobError::QueryError {
Expand Down Expand Up @@ -394,6 +396,7 @@ UPDATE
SET
finished_at = NOW(),
status = 'failed'::job_status
errors = array_append("{0}".errors, $3)
WHERE
"{0}".id = $2
AND queue = $1
Expand All @@ -406,6 +409,7 @@ RETURNING
sqlx::query(&base_query)
.bind(&failed_job.queue)
.bind(failed_job.id)
.bind(&failed_job.error)
.execute(&mut *self.transaction)
.await
.map_err(|error| PgJobError::QueryError {
Expand Down
19 changes: 12 additions & 7 deletions hook-consumer/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl<'p> WebhookConsumer<'p> {
poll_interval: time::Duration,
request_timeout: time::Duration,
max_concurrent_jobs: usize,
) -> Result<Self, WebhookConsumerError> {
) -> Self {
let mut headers = header::HeaderMap::new();
headers.insert(
header::CONTENT_TYPE,
Expand All @@ -42,15 +42,16 @@ impl<'p> WebhookConsumer<'p> {
let client = reqwest::Client::builder()
.default_headers(headers)
.timeout(request_timeout)
.build()?;
.build()
.expect("failed to construct reqwest client for webhook consumer");
Comment on lines +45 to +46
Copy link
Contributor

@tomasfarias tomasfarias Dec 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is nothing fallible in this method if we make this change (as we'll panic), so we can return Self instead of a Result.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good eye! I feel like clippy or something should tell me that. 🤔


Ok(Self {
Self {
name: name.to_owned(),
queue,
poll_interval,
client,
max_concurrent_jobs,
})
}
}

/// Wait until a job becomes available in our queue.
Expand Down Expand Up @@ -174,7 +175,11 @@ async fn send_webhook(
.headers(headers)
.body(body)
.send()
.await?;
.await
.map_err(|e| WebhookConsumerError::RetryableWebhookError {
reason: e.to_string(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we surface Webhook send errors to users, I think we'll want to tame this with an enum rather than leaving it open ended, but that seems unrelated to this change and I'm leaving it for later.

retry_after: None,
})?;

let status = response.status();

Expand Down Expand Up @@ -328,8 +333,8 @@ mod tests {
time::Duration::from_millis(100),
time::Duration::from_millis(5000),
10,
)
.expect("consumer failed to initialize");
);

let consumed_job = consumer
.wait_for_job()
.await
Expand Down
2 changes: 0 additions & 2 deletions hook-consumer/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ pub enum WebhookConsumerError {
QueueError(#[from] pgqueue::PgQueueError),
#[error("an error occurred in the underlying job")]
PgJobError(String),
#[error("an error occurred when attempting to send a request")]
RequestError(#[from] reqwest::Error),
#[error("a webhook could not be delivered but it could be retried later: {reason}")]
RetryableWebhookError {
reason: String,
Expand Down
2 changes: 1 addition & 1 deletion hook-consumer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async fn main() -> Result<(), WebhookConsumerError> {
config.poll_interval.0,
config.request_timeout.0,
config.max_concurrent_jobs,
)?;
);

let _ = consumer.run().await;

Expand Down