diff --git a/hook-api/src/handlers/app.rs b/hook-api/src/handlers/app.rs
index 7b1e840..f8d4b24 100644
--- a/hook-api/src/handlers/app.rs
+++ b/hook-api/src/handlers/app.rs
@@ -30,9 +30,7 @@ mod tests {
     #[sqlx::test(migrations = "../migrations")]
     async fn index(db: PgPool) {
-        let pg_queue = PgQueue::new_from_pool("test_index", db)
-            .await
-            .expect("failed to construct pg_queue");
+        let pg_queue = PgQueue::new_from_pool("test_index", db).await;

         let app = add_routes(Router::new(), pg_queue);

diff --git a/hook-api/src/handlers/webhook.rs b/hook-api/src/handlers/webhook.rs
index 3712aa2..2575655 100644
--- a/hook-api/src/handlers/webhook.rs
+++ b/hook-api/src/handlers/webhook.rs
@@ -127,9 +127,7 @@ mod tests {
     #[sqlx::test(migrations = "../migrations")]
     async fn webhook_success(db: PgPool) {
-        let pg_queue = PgQueue::new_from_pool("test_index", db)
-            .await
-            .expect("failed to construct pg_queue");
+        let pg_queue = PgQueue::new_from_pool("test_index", db).await;

         let app = add_routes(Router::new(), pg_queue);

@@ -172,9 +170,7 @@ mod tests {
     #[sqlx::test(migrations = "../migrations")]
     async fn webhook_bad_url(db: PgPool) {
-        let pg_queue = PgQueue::new_from_pool("test_index", db)
-            .await
-            .expect("failed to construct pg_queue");
+        let pg_queue = PgQueue::new_from_pool("test_index", db).await;

         let app = add_routes(Router::new(), pg_queue);

@@ -212,9 +208,7 @@ mod tests {
     #[sqlx::test(migrations = "../migrations")]
     async fn webhook_payload_missing_fields(db: PgPool) {
-        let pg_queue = PgQueue::new_from_pool("test_index", db)
-            .await
-            .expect("failed to construct pg_queue");
+        let pg_queue = PgQueue::new_from_pool("test_index", db).await;

         let app = add_routes(Router::new(), pg_queue);

@@ -235,9 +229,7 @@ mod tests {
     #[sqlx::test(migrations = "../migrations")]
     async fn webhook_payload_not_json(db: PgPool) {
-        let pg_queue = PgQueue::new_from_pool("test_index", db)
-            .await
-            .expect("failed to construct pg_queue");
+        let pg_queue = PgQueue::new_from_pool("test_index", db).await;

         let app = add_routes(Router::new(), pg_queue);

@@ -258,9 +250,7 @@ mod tests {
     #[sqlx::test(migrations = "../migrations")]
     async fn webhook_payload_body_too_large(db: PgPool) {
-        let pg_queue = PgQueue::new_from_pool("test_index", db)
-            .await
-            .expect("failed to construct pg_queue");
+        let pg_queue = PgQueue::new_from_pool("test_index", db).await;

         let app = add_routes(Router::new(), pg_queue);

diff --git a/hook-common/src/pgqueue.rs b/hook-common/src/pgqueue.rs
index 4dab918..fd1fcc6 100644
--- a/hook-common/src/pgqueue.rs
+++ b/hook-common/src/pgqueue.rs
@@ -10,32 +10,48 @@ use serde;
 use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions};
 use thiserror::Error;

-/// Enumeration of errors for operations with PgQueue.
-/// Errors that can originate from sqlx and are wrapped by us to provide additional context.
+/// Enumeration of parsing errors in PgQueue.
 #[derive(Error, Debug)]
-pub enum PgQueueError {
-    #[error("pool creation failed with: {error}")]
-    PoolCreationError { error: sqlx::Error },
-    #[error("connection failed with: {error}")]
-    ConnectionError { error: sqlx::Error },
-    #[error("{command} query failed with: {error}")]
-    QueryError { command: String, error: sqlx::Error },
+pub enum ParseError {
     #[error("{0} is not a valid JobStatus")]
     ParseJobStatusError(String),
     #[error("{0} is not a valid HttpMethod")]
     ParseHttpMethodError(String),
 }

+/// Enumeration of database-related errors in PgQueue.
+/// Errors that can originate from sqlx and are wrapped by us to provide additional context.
 #[derive(Error, Debug)]
-pub enum PgJobError {
-    #[error("retry is an invalid state for this PgJob: {error}")]
-    RetryInvalidError { job: T, error: String },
+pub enum DatabaseError {
+    #[error("pool creation failed with: {error}")]
+    PoolCreationError { error: sqlx::Error },
+    #[error("connection failed with: {error}")]
+    ConnectionError { error: sqlx::Error },
     #[error("{command} query failed with: {error}")]
     QueryError { command: String, error: sqlx::Error },
     #[error("transaction {command} failed with: {error}")]
     TransactionError { command: String, error: sqlx::Error },
 }

+/// An error that occurs when a job cannot be retried.
+/// Returns the underlying job so that a client can fail it.
+#[derive(Error, Debug)]
+#[error("retry is an invalid state for this job: {error}")]
+pub struct RetryInvalidError {
+    pub job: T,
+    pub error: String,
+}
+
+/// Enumeration of errors that can occur when retrying a job.
+/// They are kept in a separate enum because a failed retry may need to return the underlying job.
+#[derive(Error, Debug)]
+pub enum RetryError {
+    #[error(transparent)]
+    DatabaseError(#[from] DatabaseError),
+    #[error(transparent)]
+    RetryInvalidError(#[from] RetryInvalidError),
+}
+
 /// Enumeration of possible statuses for a Job.
 #[derive(Debug, PartialEq, sqlx::Type)]
 #[sqlx(type_name = "job_status")]
@@ -57,7 +73,7 @@ pub enum JobStatus {
 /// Allow casting JobStatus from strings.
 impl FromStr for JobStatus {
-    type Err = PgQueueError;
+    type Err = ParseError;

     fn from_str(s: &str) -> Result {
         match s {
@@ -65,7 +81,7 @@ impl FromStr for JobStatus {
             "completed" => Ok(JobStatus::Completed),
             "failed" => Ok(JobStatus::Failed),
             "running" => Ok(JobStatus::Running),
-            invalid => Err(PgQueueError::ParseJobStatusError(invalid.to_owned())),
+            invalid => Err(ParseError::ParseJobStatusError(invalid.to_owned())),
         }
     }
 }
@@ -198,19 +214,19 @@ RETURNING

 #[async_trait]
 pub trait PgQueueJob {
-    async fn complete(mut self) -> Result>>;
+    async fn complete(mut self) -> Result;

     async fn fail(
         mut self,
         error: E,
-    ) -> Result, PgJobError>>;
+    ) -> Result, DatabaseError>;

     async fn retry(
         mut self,
         error: E,
         retry_interval: time::Duration,
         queue: &str,
-    ) -> Result>>;
+    ) -> Result>>;
 }

 /// A Job that can be updated in PostgreSQL.
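Reviewer note: `complete` and `fail` now surface plain `DatabaseError`, while `retry` returns the richer `RetryError` so a caller can recover the job when it is out of attempts. A minimal sketch of the intended call-site shape, mirroring the hook-worker changes further down in this diff (the `job`, `interval`, and `queue` bindings, the string payloads, and the `.into()` conversion into the caller's error type are illustrative assumptions, not part of the patch):

```rust
use hook_common::pgqueue::{PgQueueJob, RetryError, RetryInvalidError};

// Sketch only: assumes a surrounding worker loop where `job: impl PgQueueJob`,
// `interval`, and `queue` are already in scope, and the enclosing function's
// error type converts From<DatabaseError>.
match job.retry("request failed", interval, queue).await {
    // The retry was accepted and the row was re-enqueued.
    Ok(_) => {}
    // Out of attempts: the job comes back inside the error so the caller
    // can explicitly fail it instead of silently dropping it.
    Err(RetryError::RetryInvalidError(RetryInvalidError { job, .. })) => {
        job.fail("maximum attempts reached").await?;
    }
    // Database problems keep their own variant and can be bubbled up as-is.
    Err(RetryError::DatabaseError(db_error)) => return Err(db_error.into()),
}
```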
@@ -222,12 +238,12 @@ pub struct PgJob {

 #[async_trait]
 impl PgQueueJob for PgJob {
-    async fn complete(mut self) -> Result>>> {
+    async fn complete(mut self) -> Result {
         let completed_job = self
             .job
             .complete(&mut *self.connection)
             .await
-            .map_err(|error| PgJobError::QueryError {
+            .map_err(|error| DatabaseError::QueryError {
                 command: "UPDATE".to_owned(),
                 error,
             })?;
@@ -238,12 +254,12 @@ impl PgQueueJob for PgJob {
     async fn fail(
         mut self,
         error: E,
-    ) -> Result, PgJobError>>> {
+    ) -> Result, DatabaseError> {
         let failed_job = self
             .job
             .fail(error, &mut *self.connection)
             .await
-            .map_err(|error| PgJobError::QueryError {
+            .map_err(|error| DatabaseError::QueryError {
                 command: "UPDATE".to_owned(),
                 error,
             })?;
@@ -256,12 +272,12 @@ impl PgQueueJob for PgJob {
         error: E,
         retry_interval: time::Duration,
         queue: &str,
-    ) -> Result>>> {
+    ) -> Result>>> {
         if self.job.is_gte_max_attempts() {
-            return Err(PgJobError::RetryInvalidError {
+            return Err(RetryError::from(RetryInvalidError {
                 job: Box::new(self),
                 error: "Maximum attempts reached".to_owned(),
-            });
+            }));
         }

         let retried_job = self
             .job
             .retryable()
             .queue(queue)
             .retry(error, retry_interval, &mut *self.connection)
             .await
-            .map_err(|error| PgJobError::QueryError {
+            .map_err(|error| DatabaseError::QueryError {
                 command: "UPDATE".to_owned(),
                 error,
             })?;
@@ -289,14 +305,12 @@ pub struct PgTransactionJob<'c, J, M> {

 #[async_trait]
 impl<'c, J: std::marker::Send, M: std::marker::Send> PgQueueJob for PgTransactionJob<'c, J, M> {
-    async fn complete(
-        mut self,
-    ) -> Result>>> {
+    async fn complete(mut self) -> Result {
         let completed_job = self
             .job
             .complete(&mut *self.transaction)
             .await
-            .map_err(|error| PgJobError::QueryError {
+            .map_err(|error| DatabaseError::QueryError {
                 command: "UPDATE".to_owned(),
                 error,
             })?;
@@ -304,7 +318,7 @@ impl<'c, J: std::marker::Send, M: std::marker::Send> PgQueueJob for PgTransactio
         self.transaction
             .commit()
             .await
-            .map_err(|error| PgJobError::TransactionError {
+            .map_err(|error| DatabaseError::TransactionError {
                 command: "COMMIT".to_owned(),
                 error,
             })?;
@@ -315,12 +329,12 @@ impl<'c, J: std::marker::Send, M: std::marker::Send> PgQueueJob for PgTransactio
     async fn fail(
         mut self,
         error: S,
-    ) -> Result, PgJobError>>> {
+    ) -> Result, DatabaseError> {
         let failed_job = self
             .job
             .fail(error, &mut *self.transaction)
             .await
-            .map_err(|error| PgJobError::QueryError {
+            .map_err(|error| DatabaseError::QueryError {
                 command: "UPDATE".to_owned(),
                 error,
             })?;
@@ -328,7 +342,7 @@ impl<'c, J: std::marker::Send, M: std::marker::Send> PgQueueJob for PgTransactio
         self.transaction
             .commit()
             .await
-            .map_err(|error| PgJobError::TransactionError {
+            .map_err(|error| DatabaseError::TransactionError {
                 command: "COMMIT".to_owned(),
                 error,
             })?;
@@ -341,14 +355,14 @@ impl<'c, J: std::marker::Send, M: std::marker::Send> PgQueueJob for PgTransactio
         error: E,
         retry_interval: time::Duration,
         queue: &str,
-    ) -> Result>>> {
+    ) -> Result>>> {
         // Ideally, the transition to RetryableJob should be fallible.
         // But taking ownership of self when we return this error makes things difficult.
         if self.job.is_gte_max_attempts() {
-            return Err(PgJobError::RetryInvalidError {
+            return Err(RetryError::from(RetryInvalidError {
                 job: Box::new(self),
                 error: "Maximum attempts reached".to_owned(),
-            });
+            }));
         }

         let retried_job = self
             .job
             .retryable()
             .queue(queue)
             .retry(error, retry_interval, &mut *self.transaction)
             .await
-            .map_err(|error| PgJobError::QueryError {
+            .map_err(|error| DatabaseError::QueryError {
                 command: "UPDATE".to_owned(),
                 error,
             })?;
@@ -365,7 +379,7 @@ impl<'c, J: std::marker::Send, M: std::marker::Send> PgQueueJob for PgTransactio
         self.transaction
             .commit()
             .await
-            .map_err(|error| PgJobError::TransactionError {
+            .map_err(|error| DatabaseError::TransactionError {
                 command: "COMMIT".to_owned(),
                 error,
             })?;
@@ -515,7 +529,7 @@ pub struct PgQueue {
     pool: PgPool,
 }

-pub type PgQueueResult = std::result::Result;
+pub type PgQueueResult = std::result::Result;

 impl PgQueue {
     /// Initialize a new PgQueue backed by table in PostgreSQL by intializing a connection pool to the database in `url`.
@@ -532,7 +546,7 @@ impl PgQueue {
     ) -> PgQueueResult {
         let name = queue_name.to_owned();
         let options = PgConnectOptions::from_str(url)
-            .map_err(|error| PgQueueError::PoolCreationError { error })?
+            .map_err(|error| DatabaseError::PoolCreationError { error })?
             .application_name(app_name);
         let pool = PgPoolOptions::new()
             .max_connections(max_connections)
@@ -547,10 +561,10 @@ impl PgQueue {
     ///
     /// * `queue_name`: A name for the queue we are going to initialize.
     /// * `pool`: A database connection pool to be used by this queue.
-    pub async fn new_from_pool(queue_name: &str, pool: PgPool) -> PgQueueResult {
+    pub async fn new_from_pool(queue_name: &str, pool: PgPool) -> PgQueue {
         let name = queue_name.to_owned();

-        Ok(Self { name, pool })
+        Self { name, pool }
     }

     /// Dequeue a `Job` from this `PgQueue`.
@@ -566,7 +580,7 @@ impl PgQueue {
             .pool
             .acquire()
             .await
-            .map_err(|error| PgQueueError::ConnectionError { error })?;
+            .map_err(|error| DatabaseError::ConnectionError { error })?;

         // The query that follows uses a FOR UPDATE SKIP LOCKED clause.
         // For more details on this see: 2ndquadrant.com/en/blog/what-is-select-skip-locked-for-in-postgresql-9-5.
@@ -618,7 +632,7 @@ RETURNING
             }
             Err(e) => {
                 let _ = connection.close().await;
-                Err(PgQueueError::QueryError {
+                Err(DatabaseError::QueryError {
                     command: "UPDATE".to_owned(),
                     error: e,
                 })
@@ -641,7 +655,7 @@ RETURNING
             .pool
             .begin()
             .await
-            .map_err(|error| PgQueueError::ConnectionError { error })?;
+            .map_err(|error| DatabaseError::ConnectionError { error })?;

         // The query that follows uses a FOR UPDATE SKIP LOCKED clause.
         // For more details on this see: 2ndquadrant.com/en/blog/what-is-select-skip-locked-for-in-postgresql-9-5.
@@ -690,7 +704,7 @@ RETURNING
             // Transaction is rolledback on drop.
             Err(sqlx::Error::RowNotFound) => Ok(None),
-            Err(e) => Err(PgQueueError::QueryError {
+            Err(e) => Err(DatabaseError::QueryError {
                 command: "UPDATE".to_owned(),
                 error: e,
             }),
@@ -722,7 +736,7 @@ VALUES
             .bind(&job.target)
             .execute(&self.pool)
             .await
-            .map_err(|error| PgQueueError::QueryError {
+            .map_err(|error| DatabaseError::QueryError {
                 command: "INSERT".to_owned(),
                 error,
             })?;
@@ -788,9 +802,7 @@ mod tests {
         let worker_id = worker_id();
         let new_job = NewJob::new(1, job_metadata, job_parameters, &job_target);
-        let queue = PgQueue::new_from_pool("test_can_dequeue_job", db)
-            .await
-            .expect("failed to connect to local test postgresql database");
+        let queue = PgQueue::new_from_pool("test_can_dequeue_job", db).await;

         queue.enqueue(new_job).await.expect("failed to enqueue job");

@@ -812,9 +824,7 @@ mod tests {
     #[sqlx::test(migrations = "../migrations")]
     async fn test_dequeue_returns_none_on_no_jobs(db: PgPool) {
         let worker_id = worker_id();
-        let queue = PgQueue::new_from_pool("test_dequeue_returns_none_on_no_jobs", db)
-            .await
-            .expect("failed to connect to local test postgresql database");
+        let queue = PgQueue::new_from_pool("test_dequeue_returns_none_on_no_jobs", db).await;

         let pg_job: Option> = queue
             .dequeue(&worker_id)
@@ -832,9 +842,7 @@ mod tests {
         let worker_id = worker_id();
         let new_job = NewJob::new(1, job_metadata, job_parameters, &job_target);
-        let queue = PgQueue::new_from_pool("test_can_dequeue_tx_job", db)
-            .await
-            .expect("failed to connect to local test postgresql database");
+        let queue = PgQueue::new_from_pool("test_can_dequeue_tx_job", db).await;

         queue.enqueue(new_job).await.expect("failed to enqueue job");

@@ -857,9 +865,7 @@ mod tests {
     #[sqlx::test(migrations = "../migrations")]
     async fn test_dequeue_tx_returns_none_on_no_jobs(db: PgPool) {
         let worker_id = worker_id();
-        let queue = PgQueue::new_from_pool("test_dequeue_tx_returns_none_on_no_jobs", db)
-            .await
-            .expect("failed to connect to local test postgresql database");
+        let queue = PgQueue::new_from_pool("test_dequeue_tx_returns_none_on_no_jobs", db).await;

         let tx_job: Option> = queue
             .dequeue_tx(&worker_id)
@@ -882,9 +888,7 @@ mod tests {
             .queue(&queue_name)
             .provide();
-        let queue = PgQueue::new_from_pool(&queue_name, db)
-            .await
-            .expect("failed to connect to local test postgresql database");
+        let queue = PgQueue::new_from_pool(&queue_name, db).await;

         queue.enqueue(new_job).await.expect("failed to enqueue job");

         let job: PgJob = queue
@@ -936,9 +940,7 @@ mod tests {
             .queue(&retry_queue_name)
             .provide();
-        let queue = PgQueue::new_from_pool(&queue_name, db.clone())
-            .await
-            .expect("failed to connect to queue in local test postgresql database");
+        let queue = PgQueue::new_from_pool(&queue_name, db.clone()).await;

         queue.enqueue(new_job).await.expect("failed to enqueue job");

         let job: PgJob = queue
@@ -965,9 +967,7 @@ mod tests {

         assert!(retried_job_not_found.is_none());
-        let queue = PgQueue::new_from_pool(&retry_queue_name, db)
-            .await
-            .expect("failed to connect to retry queue in local test postgresql database");
+        let queue = PgQueue::new_from_pool(&retry_queue_name, db).await;

         let retried_job: PgJob = queue
             .dequeue(&worker_id)
@@ -997,9 +997,8 @@ mod tests {
         let new_job = NewJob::new(1, job_metadata, job_parameters, &job_target);
         let retry_policy = RetryPolicy::build(0, time::Duration::from_secs(0)).provide();
-        let queue = PgQueue::new_from_pool("test_cannot_retry_job_without_remaining_attempts", db)
-            .await
-            .expect("failed to connect to local test postgresql database");
+        let queue =
+ PgQueue::new_from_pool("test_cannot_retry_job_without_remaining_attempts", db).await; queue.enqueue(new_job).await.expect("failed to enqueue job"); diff --git a/hook-common/src/webhook.rs b/hook-common/src/webhook.rs index 4122c20..bfa0ee3 100644 --- a/hook-common/src/webhook.rs +++ b/hook-common/src/webhook.rs @@ -8,7 +8,7 @@ use serde::{de::Visitor, Deserialize, Serialize}; use crate::kafka_messages::app_metrics; use crate::kafka_messages::{deserialize_datetime, serialize_datetime}; -use crate::pgqueue::PgQueueError; +use crate::pgqueue::ParseError; /// Supported HTTP methods for webhooks. #[derive(Debug, PartialEq, Clone, Copy)] @@ -22,7 +22,7 @@ pub enum HttpMethod { /// Allow casting `HttpMethod` from strings. impl FromStr for HttpMethod { - type Err = PgQueueError; + type Err = ParseError; fn from_str(s: &str) -> Result { match s.to_ascii_uppercase().as_ref() { @@ -31,7 +31,7 @@ impl FromStr for HttpMethod { "PATCH" => Ok(HttpMethod::PATCH), "POST" => Ok(HttpMethod::POST), "PUT" => Ok(HttpMethod::PUT), - invalid => Err(PgQueueError::ParseHttpMethodError(invalid.to_owned())), + invalid => Err(ParseError::ParseHttpMethodError(invalid.to_owned())), } } } diff --git a/hook-janitor/src/webhooks.rs b/hook-janitor/src/webhooks.rs index c1390a7..5920cff 100644 --- a/hook-janitor/src/webhooks.rs +++ b/hook-janitor/src/webhooks.rs @@ -836,9 +836,7 @@ mod tests { WebhookCleaner::new_from_pool(db.clone(), mock_producer, APP_METRICS_TOPIC.to_owned()) .expect("unable to create webhook cleaner"); - let queue = PgQueue::new_from_pool("webhooks", db.clone()) - .await - .expect("failed to connect to local test postgresql database"); + let queue = PgQueue::new_from_pool("webhooks", db.clone()).await; async fn get_count_from_new_conn(db: &PgPool, status: &str) -> i64 { let mut conn = db.acquire().await.unwrap(); diff --git a/hook-worker/src/error.rs b/hook-worker/src/error.rs index 614fe72..914ffb1 100644 --- a/hook-worker/src/error.rs +++ b/hook-worker/src/error.rs @@ -24,10 +24,10 @@ pub enum WebhookError { /// Enumeration of errors related to initialization and consumption of webhook jobs. 
 #[derive(Error, Debug)]
 pub enum WorkerError {
+    #[error("a database error occurred when executing a job")]
+    DatabaseError(#[from] pgqueue::DatabaseError),
+    #[error("a parsing error occurred in the underlying queue")]
+    QueueParseError(#[from] pgqueue::ParseError),
     #[error("timed out while waiting for jobs to be available")]
     TimeoutError,
-    #[error("an error occurred in the underlying queue")]
-    QueueError(#[from] pgqueue::PgQueueError),
-    #[error("an error occurred in the underlying job: {0}")]
-    PgJobError(String),
 }
diff --git a/hook-worker/src/worker.rs b/hook-worker/src/worker.rs
index f277a8b..58f71f9 100644
--- a/hook-worker/src/worker.rs
+++ b/hook-worker/src/worker.rs
@@ -5,7 +5,10 @@ use std::time;
 use chrono::Utc;
 use hook_common::health::HealthHandle;
 use hook_common::{
-    pgqueue::{Job, PgJob, PgJobError, PgQueue, PgQueueError, PgQueueJob, PgTransactionJob},
+    pgqueue::{
+        DatabaseError, Job, PgJob, PgQueue, PgQueueJob, PgTransactionJob, RetryError,
+        RetryInvalidError,
+    },
     retry::RetryPolicy,
     webhook::{HttpMethod, WebhookJobError, WebhookJobMetadata, WebhookJobParameters},
 };
@@ -269,10 +272,10 @@ async fn process_webhook_job(
             metrics::histogram!("webhook_jobs_end_to_end_duration_seconds", &labels)
                 .record((end_to_end_duration.num_milliseconds() as f64) / 1_000_f64);

-            webhook_job
-                .complete()
-                .await
-                .map_err(|error| WorkerError::PgJobError(error.to_string()))?;
+            webhook_job.complete().await.map_err(|error| {
+                metrics::counter!("webhook_jobs_database_error", &labels).increment(1);
+                error
+            })?;

             metrics::counter!("webhook_jobs_completed", &labels).increment(1);
             metrics::histogram!("webhook_jobs_processing_duration_seconds", &labels)
@@ -286,7 +289,7 @@ async fn process_webhook_job(
                 .await
                 .map_err(|job_error| {
                     metrics::counter!("webhook_jobs_database_error", &labels).increment(1);
-                    WorkerError::PgJobError(job_error.to_string())
+                    job_error
                 })?;

             metrics::counter!("webhook_jobs_failed", &labels).increment(1);
@@ -299,7 +302,7 @@ async fn process_webhook_job(
                 .await
                 .map_err(|job_error| {
                     metrics::counter!("webhook_jobs_database_error", &labels).increment(1);
-                    WorkerError::PgJobError(job_error.to_string())
+                    job_error
                 })?;

             metrics::counter!("webhook_jobs_failed", &labels).increment(1);
@@ -312,7 +315,7 @@ async fn process_webhook_job(
                 .await
                 .map_err(|job_error| {
                     metrics::counter!("webhook_jobs_database_error", &labels).increment(1);
-                    WorkerError::PgJobError(job_error.to_string())
+                    job_error
                 })?;

             metrics::counter!("webhook_jobs_failed", &labels).increment(1);
@@ -334,25 +337,24 @@ async fn process_webhook_job(
                     Ok(())
                 }
-                Err(PgJobError::RetryInvalidError {
+                Err(RetryError::RetryInvalidError(RetryInvalidError {
                     job: webhook_job,
                     ..
-                }) => {
+                })) => {
                     webhook_job
                         .fail(WebhookJobError::from(&error))
                         .await
                         .map_err(|job_error| {
                             metrics::counter!("webhook_jobs_database_error", &labels).increment(1);
-                            WorkerError::PgJobError(job_error.to_string())
+                            job_error
                         })?;

                     metrics::counter!("webhook_jobs_failed", &labels).increment(1);

                     Ok(())
                 }
-                Err(job_error) => {
+                Err(RetryError::DatabaseError(job_error)) => {
                     metrics::counter!("webhook_jobs_database_error", &labels).increment(1);
-
-                    Err(WorkerError::PgJobError(job_error.to_string()))
+                    Err(WorkerError::from(job_error))
                 }
             }
         }
@@ -362,7 +364,7 @@ async fn process_webhook_job(
                 .await
                 .map_err(|job_error| {
                     metrics::counter!("webhook_jobs_database_error", &labels).increment(1);
-                    WorkerError::PgJobError(job_error.to_string())
+                    job_error
                 })?;

             metrics::counter!("webhook_jobs_failed", &labels).increment(1);
@@ -494,7 +496,7 @@ mod tests {
         max_attempts: i32,
         job_parameters: WebhookJobParameters,
         job_metadata: WebhookJobMetadata,
-    ) -> Result<(), PgQueueError> {
+    ) -> Result<(), DatabaseError> {
         let job_target = job_parameters.url.to_owned();
         let new_job = NewJob::new(max_attempts, job_metadata, job_parameters, &job_target);
         queue.enqueue(new_job).await?;
@@ -535,9 +537,7 @@ mod tests {
     async fn test_wait_for_job(db: PgPool) {
         let worker_id = worker_id();
         let queue_name = "test_wait_for_job".to_string();
-        let queue = PgQueue::new_from_pool(&queue_name, db)
-            .await
-            .expect("failed to connect to PG");
+        let queue = PgQueue::new_from_pool(&queue_name, db).await;

         let webhook_job_parameters = WebhookJobParameters {
             body: "a webhook job body. much wow.".to_owned(),
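Reviewer note: with the `#[from]` attributes on the new `WorkerError` variants, database and parse failures now convert into `WorkerError` with `?` or an explicit `From` call (as in the `Err(WorkerError::from(job_error))` arm above) instead of being flattened into strings. A small illustrative sketch; the helper function names are hypothetical and not part of this patch:

```rust
use hook_common::pgqueue::{DatabaseError, ParseError};
use hook_worker::error::WorkerError;

// Hypothetical helpers: thiserror's #[from] derives provide these conversions.
fn report_db_error(e: DatabaseError) -> WorkerError {
    WorkerError::from(e) // becomes WorkerError::DatabaseError(e)
}

fn report_parse_error(e: ParseError) -> WorkerError {
    e.into() // becomes WorkerError::QueueParseError(e)
}
```

Keeping the original error as a typed source rather than a stringified message is what lets the worker's match arms and the `webhook_jobs_database_error` metric distinguish genuine database failures from exhausted retries.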