diff --git a/hook-common/src/pgqueue.rs b/hook-common/src/pgqueue.rs index 4dab918..af91fbd 100644 --- a/hook-common/src/pgqueue.rs +++ b/hook-common/src/pgqueue.rs @@ -1,14 +1,17 @@ //! # PgQueue //! //! A job queue implementation backed by a PostgreSQL table. -use std::str::FromStr; use std::time; +use std::{str::FromStr, sync::Arc}; use async_trait::async_trait; use chrono; use serde; +use sqlx::postgres::any::AnyConnectionBackend; use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions}; use thiserror::Error; +use tokio::sync::Mutex; +use tracing::error; /// Enumeration of errors for operations with PgQueue. /// Errors that can originate from sqlx and are wrapped by us to provide additional context. @@ -24,16 +27,22 @@ pub enum PgQueueError { ParseJobStatusError(String), #[error("{0} is not a valid HttpMethod")] ParseHttpMethodError(String), + #[error("transaction was already closed")] + TransactionAlreadyClosedError, } #[derive(Error, Debug)] pub enum PgJobError { #[error("retry is an invalid state for this PgJob: {error}")] RetryInvalidError { job: T, error: String }, + #[error("connection failed with: {error}")] + ConnectionError { error: sqlx::Error }, #[error("{command} query failed with: {error}")] QueryError { command: String, error: sqlx::Error }, #[error("transaction {command} failed with: {error}")] TransactionError { command: String, error: sqlx::Error }, + #[error("transaction was already closed")] + TransactionAlreadyClosedError, } /// Enumeration of possible statuses for a Job. @@ -217,20 +226,39 @@ pub trait PgQueueJob { #[derive(Debug)] pub struct PgJob { pub job: Job, - pub connection: sqlx::pool::PoolConnection, + pub pool: PgPool, +} + +// Container struct for a batch of PgJobs. +pub struct PgBatch { + pub jobs: Vec>, +} + +impl PgJob { + async fn acquire_conn( + &mut self, + ) -> Result, PgJobError>>> + { + self.pool + .acquire() + .await + .map_err(|error| PgJobError::ConnectionError { error }) + } } #[async_trait] impl PgQueueJob for PgJob { async fn complete(mut self) -> Result>>> { - let completed_job = self - .job - .complete(&mut *self.connection) - .await - .map_err(|error| PgJobError::QueryError { - command: "UPDATE".to_owned(), - error, - })?; + let mut connection = self.acquire_conn().await?; + + let completed_job = + self.job + .complete(&mut *connection) + .await + .map_err(|error| PgJobError::QueryError { + command: "UPDATE".to_owned(), + error, + })?; Ok(completed_job) } @@ -239,9 +267,11 @@ impl PgQueueJob for PgJob { mut self, error: E, ) -> Result, PgJobError>>> { + let mut connection = self.acquire_conn().await?; + let failed_job = self .job - .fail(error, &mut *self.connection) + .fail(error, &mut *connection) .await .map_err(|error| PgJobError::QueryError { command: "UPDATE".to_owned(), @@ -264,11 +294,13 @@ impl PgQueueJob for PgJob { }); } + let mut connection = self.acquire_conn().await?; + let retried_job = self .job .retryable() .queue(queue) - .retry(error, retry_interval, &mut *self.connection) + .retry(error, retry_interval, &mut *connection) .await .map_err(|error| PgJobError::QueryError { command: "UPDATE".to_owned(), @@ -284,7 +316,39 @@ impl PgQueueJob for PgJob { #[derive(Debug)] pub struct PgTransactionJob<'c, J, M> { pub job: Job, - pub transaction: sqlx::Transaction<'c, sqlx::postgres::Postgres>, + + /// The open transaction this job came from. If multiple jobs were queried at once, then this + /// transaction will be shared between them (across async tasks and threads as necessary). See + /// below for more information. + shared_txn: Arc>>>, +} + +// Container struct for a batch of PgTransactionJob. Includes a reference to the shared transaction +// for committing the work when all of the jobs are finished. +pub struct PgTransactionBatch<'c, J, M> { + pub jobs: Vec>, + + /// The open transaction the jobs in the Vec came from. This should be used to commit or + /// rollback when all of the work is finished. + shared_txn: Arc>>>, +} + +impl<'c, J, M> PgTransactionBatch<'_, J, M> { + pub async fn commit(self) -> PgQueueResult<()> { + let mut txn_guard = self.shared_txn.lock().await; + + txn_guard + .as_deref_mut() + .ok_or(PgQueueError::TransactionAlreadyClosedError)? + .commit() + .await + .map_err(|e| PgQueueError::QueryError { + command: "COMMIT".to_owned(), + error: e, + })?; + + Ok(()) + } } #[async_trait] @@ -292,22 +356,20 @@ impl<'c, J: std::marker::Send, M: std::marker::Send> PgQueueJob for PgTransactio async fn complete( mut self, ) -> Result>>> { - let completed_job = self - .job - .complete(&mut *self.transaction) - .await - .map_err(|error| PgJobError::QueryError { - command: "UPDATE".to_owned(), - error, - })?; + let mut txn_guard = self.shared_txn.lock().await; - self.transaction - .commit() - .await - .map_err(|error| PgJobError::TransactionError { - command: "COMMIT".to_owned(), - error, - })?; + let txn_ref = txn_guard + .as_deref_mut() + .ok_or(PgJobError::TransactionAlreadyClosedError)?; + + let completed_job = + self.job + .complete(txn_ref) + .await + .map_err(|error| PgJobError::QueryError { + command: "UPDATE".to_owned(), + error, + })?; Ok(completed_job) } @@ -316,22 +378,20 @@ impl<'c, J: std::marker::Send, M: std::marker::Send> PgQueueJob for PgTransactio mut self, error: S, ) -> Result, PgJobError>>> { - let failed_job = self - .job - .fail(error, &mut *self.transaction) - .await - .map_err(|error| PgJobError::QueryError { - command: "UPDATE".to_owned(), - error, - })?; + let mut txn_guard = self.shared_txn.lock().await; - self.transaction - .commit() - .await - .map_err(|error| PgJobError::TransactionError { - command: "COMMIT".to_owned(), - error, - })?; + let txn_ref = txn_guard + .as_deref_mut() + .ok_or(PgJobError::TransactionAlreadyClosedError)?; + + let failed_job = + self.job + .fail(error, txn_ref) + .await + .map_err(|error| PgJobError::QueryError { + command: "UPDATE".to_owned(), + error, + })?; Ok(failed_job) } @@ -351,25 +411,23 @@ impl<'c, J: std::marker::Send, M: std::marker::Send> PgQueueJob for PgTransactio }); } + let mut txn_guard = self.shared_txn.lock().await; + + let txn_ref = txn_guard + .as_deref_mut() + .ok_or(PgJobError::TransactionAlreadyClosedError)?; + let retried_job = self .job .retryable() .queue(queue) - .retry(error, retry_interval, &mut *self.transaction) + .retry(error, retry_interval, txn_ref) .await .map_err(|error| PgJobError::QueryError { command: "UPDATE".to_owned(), error, })?; - self.transaction - .commit() - .await - .map_err(|error| PgJobError::TransactionError { - command: "COMMIT".to_owned(), - error, - })?; - Ok(retried_job) } } @@ -553,15 +611,16 @@ impl PgQueue { Ok(Self { name, pool }) } - /// Dequeue a `Job` from this `PgQueue`. - /// The `Job` will be updated to `'running'` status, so any other `dequeue` calls will skip it. + /// Dequeue up to `limit` `Job`s from this `PgQueue`. + /// The `Job`s will be updated to `'running'` status, so any other `dequeue` calls will skip it. pub async fn dequeue< J: for<'d> serde::Deserialize<'d> + std::marker::Send + std::marker::Unpin + 'static, M: for<'d> serde::Deserialize<'d> + std::marker::Send + std::marker::Unpin + 'static, >( &self, attempted_by: &str, - ) -> PgQueueResult>> { + limit: u32, + ) -> PgQueueResult>> { let mut connection = self .pool .acquire() @@ -583,7 +642,7 @@ WITH available_in_queue AS ( ORDER BY attempt, scheduled_at - LIMIT 1 + LIMIT $2 FOR UPDATE SKIP LOCKED ) UPDATE @@ -592,7 +651,7 @@ SET attempted_at = NOW(), status = 'running'::job_status, attempt = attempt + 1, - attempted_by = array_append(attempted_by, $2::text) + attempted_by = array_append(attempted_by, $3::text) FROM available_in_queue WHERE @@ -601,14 +660,29 @@ RETURNING job_queue.* "#; - let query_result: Result, sqlx::Error> = sqlx::query_as(base_query) + let query_result: Result>, sqlx::Error> = sqlx::query_as(base_query) .bind(&self.name) + .bind(limit as i64) .bind(attempted_by) - .fetch_one(&mut *connection) + .fetch_all(&mut *connection) .await; match query_result { - Ok(job) => Ok(Some(PgJob { job, connection })), + Ok(jobs) => { + if jobs.is_empty() { + return Ok(None); + } + + let pg_jobs: Vec> = jobs + .into_iter() + .map(|job| PgJob { + job, + pool: self.pool.clone(), + }) + .collect(); + + Ok(Some(PgBatch { jobs: pg_jobs })) + } // Although connection would be closed once it goes out of scope, sqlx recommends explicitly calling close(). // See: https://docs.rs/sqlx/latest/sqlx/postgres/any/trait.AnyConnectionBackend.html#tymethod.close. @@ -616,6 +690,7 @@ RETURNING let _ = connection.close().await; Ok(None) } + Err(e) => { let _ = connection.close().await; Err(PgQueueError::QueryError { @@ -626,9 +701,10 @@ RETURNING } } - /// Dequeue a `Job` from this `PgQueue` and hold the transaction. - /// Any other `dequeue_tx` calls will skip rows locked, so by holding a transaction we ensure only one worker can dequeue a job. - /// Holding a transaction open can have performance implications, but it means no `'running'` state is required. + /// Dequeue up to `limit` `Job`s from this `PgQueue` and hold the transaction. Any other + /// `dequeue_tx` calls will skip rows locked, so by holding a transaction we ensure only one + /// worker can dequeue a job. Holding a transaction open can have performance implications, but + /// it means no `'running'` state is required. pub async fn dequeue_tx< 'a, J: for<'d> serde::Deserialize<'d> + std::marker::Send + std::marker::Unpin + 'static, @@ -636,7 +712,8 @@ RETURNING >( &self, attempted_by: &str, - ) -> PgQueueResult>> { + limit: u32, + ) -> PgQueueResult>> { let mut tx = self .pool .begin() @@ -658,7 +735,7 @@ WITH available_in_queue AS ( ORDER BY attempt, scheduled_at - LIMIT 1 + LIMIT $2 FOR UPDATE SKIP LOCKED ) UPDATE @@ -667,7 +744,7 @@ SET attempted_at = NOW(), status = 'running'::job_status, attempt = attempt + 1, - attempted_by = array_append(attempted_by, $2::text) + attempted_by = array_append(attempted_by, $3::text) FROM available_in_queue WHERE @@ -676,20 +753,38 @@ RETURNING job_queue.* "#; - let query_result: Result, sqlx::Error> = sqlx::query_as(base_query) + let query_result: Result>, sqlx::Error> = sqlx::query_as(base_query) .bind(&self.name) + .bind(limit as i64) .bind(attempted_by) - .fetch_one(&mut *tx) + .fetch_all(&mut *tx) .await; match query_result { - Ok(job) => Ok(Some(PgTransactionJob { - job, - transaction: tx, - })), + Ok(jobs) => { + if jobs.is_empty() { + return Ok(None); + } + + let shared_txn = Arc::new(Mutex::new(Some(tx))); + + let pg_jobs: Vec> = jobs + .into_iter() + .map(|job| PgTransactionJob { + job, + shared_txn: shared_txn.clone(), + }) + .collect(); + + Ok(Some(PgTransactionBatch { + jobs: pg_jobs, + shared_txn: shared_txn.clone(), + })) + } - // Transaction is rolledback on drop. + // Transaction is rolled back on drop. Err(sqlx::Error::RowNotFound) => Ok(None), + Err(e) => Err(PgQueueError::QueryError { command: "UPDATE".to_owned(), error: e, @@ -736,7 +831,7 @@ mod tests { use super::*; use crate::retry::RetryPolicy; - #[derive(serde::Serialize, serde::Deserialize, PartialEq, Debug)] + #[derive(serde::Serialize, serde::Deserialize, PartialEq, Debug, Clone)] struct JobMetadata { team_id: u32, plugin_config_id: i32, @@ -753,7 +848,7 @@ mod tests { } } - #[derive(serde::Serialize, serde::Deserialize, PartialEq, Debug)] + #[derive(serde::Serialize, serde::Deserialize, PartialEq, Debug, Clone)] struct JobParameters { method: String, body: String, @@ -795,10 +890,13 @@ mod tests { queue.enqueue(new_job).await.expect("failed to enqueue job"); let pg_job: PgJob = queue - .dequeue(&worker_id) + .dequeue(&worker_id, 1) .await - .expect("failed to dequeue job") - .expect("didn't find a job to dequeue"); + .expect("failed to dequeue jobs") + .expect("didn't find any jobs to dequeue") + .jobs + .pop() + .unwrap(); assert_eq!(pg_job.job.attempt, 1); assert!(pg_job.job.attempted_by.contains(&worker_id)); @@ -816,12 +914,62 @@ mod tests { .await .expect("failed to connect to local test postgresql database"); - let pg_job: Option> = queue - .dequeue(&worker_id) + let pg_jobs: Option> = queue + .dequeue(&worker_id, 1) .await - .expect("failed to dequeue job"); + .expect("failed to dequeue jobs"); + + assert!(pg_jobs.is_none()); + } + + #[sqlx::test(migrations = "../migrations")] + async fn test_can_dequeue_multiple_jobs(db: PgPool) { + let job_target = job_target(); + let job_metadata = JobMetadata::default(); + let job_parameters = JobParameters::default(); + let worker_id = worker_id(); - assert!(pg_job.is_none()); + let queue = PgQueue::new_from_pool("test_can_dequeue_multiple_jobs", db) + .await + .expect("failed to connect to local test postgresql database"); + + for _ in 0..5 { + queue + .enqueue(NewJob::new( + 1, + job_metadata.clone(), + job_parameters.clone(), + &job_target, + )) + .await + .expect("failed to enqueue job"); + } + + // Only get 4 jobs, leaving one in the queue. + let limit = 4; + let batch: PgBatch = queue + .dequeue(&worker_id, limit) + .await + .expect("failed to dequeue job") + .expect("didn't find a job to dequeue"); + + // Complete those 4. + assert_eq!(batch.jobs.len(), limit as usize); + for job in batch.jobs { + job.complete().await.expect("failed to complete job"); + } + + // Try to get up to 4 jobs, but only 1 remains. + let batch: PgBatch = queue + .dequeue(&worker_id, limit) + .await + .expect("failed to dequeue job") + .expect("didn't find a job to dequeue"); + + assert_eq!(batch.jobs.len(), 1); // Only one job should have been left in the queue. + for job in batch.jobs { + job.complete().await.expect("failed to complete job"); + } } #[sqlx::test(migrations = "../migrations")] @@ -830,19 +978,21 @@ mod tests { let job_metadata = JobMetadata::default(); let job_parameters = JobParameters::default(); let worker_id = worker_id(); - let new_job = NewJob::new(1, job_metadata, job_parameters, &job_target); let queue = PgQueue::new_from_pool("test_can_dequeue_tx_job", db) .await .expect("failed to connect to local test postgresql database"); + let new_job = NewJob::new(1, job_metadata, job_parameters, &job_target); queue.enqueue(new_job).await.expect("failed to enqueue job"); - let tx_job: PgTransactionJob<'_, JobParameters, JobMetadata> = queue - .dequeue_tx(&worker_id) + let mut batch: PgTransactionBatch<'_, JobParameters, JobMetadata> = queue + .dequeue_tx(&worker_id, 1) .await - .expect("failed to dequeue job") - .expect("didn't find a job to dequeue"); + .expect("failed to dequeue jobs") + .expect("didn't find any jobs to dequeue"); + + let tx_job = batch.jobs.pop().unwrap(); assert_eq!(tx_job.job.attempt, 1); assert!(tx_job.job.attempted_by.contains(&worker_id)); @@ -852,6 +1002,65 @@ mod tests { assert_eq!(*tx_job.job.parameters.as_ref(), JobParameters::default()); assert_eq!(tx_job.job.status, JobStatus::Running); assert_eq!(tx_job.job.target, job_target); + + // Transactional jobs must be completed, failed or retried before being dropped. This is + // to prevent logic bugs when using the shared txn. + tx_job.complete().await.expect("failed to complete job"); + + batch.commit().await.expect("failed to commit transaction"); + } + + #[sqlx::test(migrations = "../migrations")] + async fn test_can_dequeue_multiple_tx_jobs(db: PgPool) { + let job_target = job_target(); + let job_metadata = JobMetadata::default(); + let job_parameters = JobParameters::default(); + let worker_id = worker_id(); + + let queue = PgQueue::new_from_pool("test_can_dequeue_multiple_tx_jobs", db) + .await + .expect("failed to connect to local test postgresql database"); + + for _ in 0..5 { + queue + .enqueue(NewJob::new( + 1, + job_metadata.clone(), + job_parameters.clone(), + &job_target, + )) + .await + .expect("failed to enqueue job"); + } + + // Only get 4 jobs, leaving one in the queue. + let limit = 4; + let mut batch: PgTransactionBatch<'_, JobParameters, JobMetadata> = queue + .dequeue_tx(&worker_id, limit) + .await + .expect("failed to dequeue job") + .expect("didn't find a job to dequeue"); + + assert_eq!(batch.jobs.len(), limit as usize); + + // Complete those 4 and commit. + for job in std::mem::take(&mut batch.jobs) { + job.complete().await.expect("failed to complete job"); + } + batch.commit().await.expect("failed to commit transaction"); + + // Try to get up to 4 jobs, but only 1 remains. + let mut batch: PgTransactionBatch<'_, JobParameters, JobMetadata> = queue + .dequeue_tx(&worker_id, limit) + .await + .expect("failed to dequeue job") + .expect("didn't find a job to dequeue"); + assert_eq!(batch.jobs.len(), 1); // Only one job should have been left in the queue. + + for job in std::mem::take(&mut batch.jobs) { + job.complete().await.expect("failed to complete job"); + } + batch.commit().await.expect("failed to commit transaction"); } #[sqlx::test(migrations = "../migrations")] @@ -861,12 +1070,12 @@ mod tests { .await .expect("failed to connect to local test postgresql database"); - let tx_job: Option> = queue - .dequeue_tx(&worker_id) + let batch: Option> = queue + .dequeue_tx(&worker_id, 1) .await .expect("failed to dequeue job"); - assert!(tx_job.is_none()); + assert!(batch.is_none()); } #[sqlx::test(migrations = "../migrations")] @@ -888,10 +1097,13 @@ mod tests { queue.enqueue(new_job).await.expect("failed to enqueue job"); let job: PgJob = queue - .dequeue(&worker_id) + .dequeue(&worker_id, 1) .await .expect("failed to dequeue job") - .expect("didn't find a job to dequeue"); + .expect("didn't find a job to dequeue") + .jobs + .pop() + .unwrap(); let retry_interval = retry_policy.retry_interval(job.job.attempt as u32, None); let retry_queue = retry_policy.retry_queue(&job.job.queue).to_owned(); @@ -905,10 +1117,13 @@ mod tests { .expect("failed to retry job"); let retried_job: PgJob = queue - .dequeue(&worker_id) + .dequeue(&worker_id, 1) .await .expect("failed to dequeue job") - .expect("didn't find retried job to dequeue"); + .expect("didn't find retried job to dequeue") + .jobs + .pop() + .unwrap(); assert_eq!(retried_job.job.attempt, 2); assert!(retried_job.job.attempted_by.contains(&worker_id)); @@ -942,10 +1157,13 @@ mod tests { queue.enqueue(new_job).await.expect("failed to enqueue job"); let job: PgJob = queue - .dequeue(&worker_id) + .dequeue(&worker_id, 1) .await .expect("failed to dequeue job") - .expect("didn't find a job to dequeue"); + .expect("didn't find a job to dequeue") + .jobs + .pop() + .unwrap(); let retry_interval = retry_policy.retry_interval(job.job.attempt as u32, None); let retry_queue = retry_policy.retry_queue(&job.job.queue).to_owned(); @@ -958,8 +1176,8 @@ mod tests { .await .expect("failed to retry job"); - let retried_job_not_found: Option> = queue - .dequeue(&worker_id) + let retried_job_not_found: Option> = queue + .dequeue(&worker_id, 1) .await .expect("failed to dequeue job"); @@ -970,10 +1188,13 @@ mod tests { .expect("failed to connect to retry queue in local test postgresql database"); let retried_job: PgJob = queue - .dequeue(&worker_id) + .dequeue(&worker_id, 1) .await .expect("failed to dequeue job") - .expect("job not found in retry queue"); + .expect("job not found in retry queue") + .jobs + .pop() + .unwrap(); assert_eq!(retried_job.job.attempt, 2); assert!(retried_job.job.attempted_by.contains(&worker_id)); @@ -1004,10 +1225,13 @@ mod tests { queue.enqueue(new_job).await.expect("failed to enqueue job"); let job: PgJob = queue - .dequeue(&worker_id) + .dequeue(&worker_id, 1) .await .expect("failed to dequeue job") - .expect("didn't find a job to dequeue"); + .expect("didn't find a job to dequeue") + .jobs + .pop() + .unwrap(); let retry_interval = retry_policy.retry_interval(job.job.attempt as u32, None); diff --git a/hook-janitor/src/webhooks.rs b/hook-janitor/src/webhooks.rs index 5cdf431..e3b137c 100644 --- a/hook-janitor/src/webhooks.rs +++ b/hook-janitor/src/webhooks.rs @@ -870,10 +870,13 @@ mod tests { { // The fixtures include an available job, so let's complete it while the txn is open. let webhook_job: PgJob = queue - .dequeue(&"worker_id") + .dequeue(&"worker_id", 1) .await .expect("failed to dequeue job") - .expect("didn't find a job to dequeue"); + .expect("didn't find a job to dequeue") + .jobs + .pop() + .unwrap(); webhook_job .complete() .await @@ -896,10 +899,13 @@ mod tests { let new_job = NewJob::new(1, job_metadata, job_parameters, &"target"); queue.enqueue(new_job).await.expect("failed to enqueue job"); let webhook_job: PgJob = queue - .dequeue(&"worker_id") + .dequeue(&"worker_id", 1) .await .expect("failed to dequeue job") - .expect("didn't find a job to dequeue"); + .expect("didn't find a job to dequeue") + .jobs + .pop() + .unwrap(); webhook_job .complete() .await diff --git a/hook-worker/src/config.rs b/hook-worker/src/config.rs index 477ff74..32e49f7 100644 --- a/hook-worker/src/config.rs +++ b/hook-worker/src/config.rs @@ -37,6 +37,9 @@ pub struct Config { #[envconfig(default = "true")] pub transactional: bool, + + #[envconfig(default = "1")] + pub dequeue_batch_size: u32, } impl Config { diff --git a/hook-worker/src/main.rs b/hook-worker/src/main.rs index 6cad3fd..fede7d2 100644 --- a/hook-worker/src/main.rs +++ b/hook-worker/src/main.rs @@ -47,6 +47,7 @@ async fn main() -> Result<(), WorkerError> { let worker = WebhookWorker::new( &config.worker_name, &queue, + config.dequeue_batch_size, config.poll_interval.0, config.request_timeout.0, config.max_concurrent_jobs, diff --git a/hook-worker/src/worker.rs b/hook-worker/src/worker.rs index c526c3f..437a1d3 100644 --- a/hook-worker/src/worker.rs +++ b/hook-worker/src/worker.rs @@ -2,7 +2,9 @@ use std::collections; use std::sync::Arc; use std::time; +use futures::future::join_all; use hook_common::health::HealthHandle; +use hook_common::pgqueue::{PgBatch, PgTransactionBatch}; use hook_common::{ pgqueue::{Job, PgJob, PgJobError, PgQueue, PgQueueError, PgQueueJob, PgTransactionJob}, retry::RetryPolicy, @@ -68,6 +70,8 @@ pub struct WebhookWorker<'p> { name: String, /// The queue we will be dequeuing jobs from. queue: &'p PgQueue, + /// The maximum number of jobs to dequeue in one query. + dequeue_batch_size: u32, /// The interval for polling the queue. poll_interval: time::Duration, /// The client used for HTTP requests. @@ -81,9 +85,11 @@ pub struct WebhookWorker<'p> { } impl<'p> WebhookWorker<'p> { + #[allow(clippy::too_many_arguments)] pub fn new( name: &str, queue: &'p PgQueue, + dequeue_batch_size: u32, poll_interval: time::Duration, request_timeout: time::Duration, max_concurrent_jobs: usize, @@ -106,6 +112,7 @@ impl<'p> WebhookWorker<'p> { Self { name: name.to_owned(), queue, + dequeue_batch_size, poll_interval, client, max_concurrent_jobs, @@ -114,16 +121,20 @@ impl<'p> WebhookWorker<'p> { } } - /// Wait until a job becomes available in our queue. - async fn wait_for_job<'a>(&self) -> PgJob { + /// Wait until at least one job becomes available in our queue. + async fn wait_for_jobs<'a>(&self) -> PgBatch { let mut interval = tokio::time::interval(self.poll_interval); loop { interval.tick().await; self.liveness.report_healthy().await; - match self.queue.dequeue(&self.name).await { - Ok(Some(job)) => return job, + match self + .queue + .dequeue(&self.name, self.dequeue_batch_size) + .await + { + Ok(Some(batch)) => return batch, Ok(None) => continue, Err(error) => { error!("error while trying to dequeue job: {}", error); @@ -133,18 +144,22 @@ impl<'p> WebhookWorker<'p> { } } - /// Wait until a job becomes available in our queue in transactional mode. - async fn wait_for_job_tx<'a>( + /// Wait until at least one job becomes available in our queue in transactional mode. + async fn wait_for_jobs_tx<'a>( &self, - ) -> PgTransactionJob<'a, WebhookJobParameters, WebhookJobMetadata> { + ) -> PgTransactionBatch<'a, WebhookJobParameters, WebhookJobMetadata> { let mut interval = tokio::time::interval(self.poll_interval); loop { interval.tick().await; self.liveness.report_healthy().await; - match self.queue.dequeue_tx(&self.name).await { - Ok(Some(job)) => return job, + match self + .queue + .dequeue_tx(&self.name, self.dequeue_batch_size) + .await + { + Ok(Some(batch)) => return batch, Ok(None) => continue, Err(error) => { error!("error while trying to dequeue_tx job: {}", error); @@ -162,70 +177,104 @@ impl<'p> WebhookWorker<'p> { .set(1f64 - semaphore.available_permits() as f64 / self.max_concurrent_jobs as f64); }; + let dequeue_batch_size_histogram = metrics::histogram!("webhook_dequeue_batch_size"); + if transactional { loop { report_semaphore_utilization(); - let webhook_job = self.wait_for_job_tx().await; - spawn_webhook_job_processing_task( - self.client.clone(), - semaphore.clone(), - self.retry_policy.clone(), - webhook_job, - ) - .await; + // TODO: We could grab semaphore permits here using something like: + // `min(semaphore.available_permits(), dequeue_batch_size)` + // And then dequeue only up to that many jobs. We'd then need to hand back the + // difference in permits based on how many jobs were dequeued. + let mut batch = self.wait_for_jobs_tx().await; + dequeue_batch_size_histogram.record(batch.jobs.len() as f64); + + // Get enough permits for the jobs before spawning a task. + let permits = semaphore + .clone() + .acquire_many_owned(batch.jobs.len() as u32) + .await + .expect("semaphore has been closed"); + + let client = self.client.clone(); + let retry_policy = self.retry_policy.clone(); + + tokio::spawn(async move { + let mut futures = Vec::new(); + + // We have to `take` the Vec of jobs from the batch to avoid a borrow checker + // error below when we commit. + for job in std::mem::take(&mut batch.jobs) { + let client = client.clone(); + let retry_policy = retry_policy.clone(); + + let future = + async move { process_webhook_job(client, job, &retry_policy).await }; + + futures.push(future); + } + + let results = join_all(futures).await; + for result in results { + if let Err(e) = result { + error!("error processing webhook job: {}", e); + } + } + + let _ = batch.commit().await.map_err(|e| { + error!("error committing transactional batch: {}", e); + }); + + drop(permits); + }); } } else { loop { report_semaphore_utilization(); - let webhook_job = self.wait_for_job().await; - spawn_webhook_job_processing_task( - self.client.clone(), - semaphore.clone(), - self.retry_policy.clone(), - webhook_job, - ) - .await; + // TODO: We could grab semaphore permits here using something like: + // `min(semaphore.available_permits(), dequeue_batch_size)` + // And then dequeue only up to that many jobs. We'd then need to hand back the + // difference in permits based on how many jobs were dequeued. + let batch = self.wait_for_jobs().await; + dequeue_batch_size_histogram.record(batch.jobs.len() as f64); + + // Get enough permits for the jobs before spawning a task. + let permits = semaphore + .clone() + .acquire_many_owned(batch.jobs.len() as u32) + .await + .expect("semaphore has been closed"); + + let client = self.client.clone(); + let retry_policy = self.retry_policy.clone(); + + tokio::spawn(async move { + let mut futures = Vec::new(); + + for job in batch.jobs { + let client = client.clone(); + let retry_policy = retry_policy.clone(); + + let future = + async move { process_webhook_job(client, job, &retry_policy).await }; + + futures.push(future); + } + + let results = join_all(futures).await; + for result in results { + if let Err(e) = result { + error!("error processing webhook job: {}", e); + } + } + + drop(permits); + }); } } } } -/// Spawn a Tokio task to process a Webhook Job once we successfully acquire a permit. -/// -/// # Arguments -/// -/// * `client`: An HTTP client to execute the webhook job request. -/// * `semaphore`: A semaphore used for rate limiting purposes. This function will panic if this semaphore is closed. -/// * `retry_policy`: The retry policy used to set retry parameters if a job fails and has remaining attempts. -/// * `webhook_job`: The webhook job to process as dequeued from `hook_common::pgqueue::PgQueue`. -async fn spawn_webhook_job_processing_task( - client: reqwest::Client, - semaphore: Arc, - retry_policy: RetryPolicy, - webhook_job: W, -) -> tokio::task::JoinHandle> { - let permit = semaphore - .acquire_owned() - .await - .expect("semaphore has been closed"); - - let labels = [("queue", webhook_job.queue())]; - - metrics::counter!("webhook_jobs_total", &labels).increment(1); - - tokio::spawn(async move { - let result = process_webhook_job(client, webhook_job, &retry_policy).await; - drop(permit); - match result { - Ok(_) => Ok(()), - Err(error) => { - error!("failed to process webhook job: {}", error); - Err(error) - } - } - }) -} - /// Process a webhook job by transitioning it to its appropriate state after its request is sent. /// After we finish, the webhook job will be set as completed (if the request was successful), retryable (if the request /// was unsuccessful but we can still attempt a retry), or failed (if the request was unsuccessful and no more retries @@ -248,6 +297,7 @@ async fn process_webhook_job( let parameters = webhook_job.parameters(); let labels = [("queue", webhook_job.queue())]; + metrics::counter!("webhook_jobs_total", &labels).increment(1); let now = tokio::time::Instant::now(); @@ -543,6 +593,7 @@ mod tests { let worker = WebhookWorker::new( &worker_id, &queue, + 1, time::Duration::from_millis(100), time::Duration::from_millis(5000), 10, @@ -550,7 +601,7 @@ mod tests { liveness, ); - let consumed_job = worker.wait_for_job().await; + let consumed_job = worker.wait_for_jobs().await.jobs.pop().unwrap(); assert_eq!(consumed_job.job.attempt, 1); assert!(consumed_job.job.attempted_by.contains(&worker_id));