From 26672aeb2c8d114e3f82c41916e935a3d753ee8a Mon Sep 17 00:00:00 2001 From: Brett Hoerner Date: Tue, 6 Feb 2024 08:36:10 -0700 Subject: [PATCH] Remove non-transactional mode (#65) --- hook-common/src/pgqueue.rs | 321 +----------------- hook-janitor/src/fixtures/webhook_cleanup.sql | 11 - hook-janitor/src/webhooks.rs | 78 +---- hook-worker/src/config.rs | 3 - hook-worker/src/main.rs | 2 +- hook-worker/src/worker.rs | 173 +++------- 6 files changed, 77 insertions(+), 511 deletions(-) diff --git a/hook-common/src/pgqueue.rs b/hook-common/src/pgqueue.rs index af91fbd..4a8b489 100644 --- a/hook-common/src/pgqueue.rs +++ b/hook-common/src/pgqueue.rs @@ -60,8 +60,6 @@ pub enum JobStatus { Discarded, /// A job that was unsuccessfully completed by a worker. Failed, - /// A job that was picked up by a worker and it's currentlly being run. - Running, } /// Allow casting JobStatus from strings. @@ -73,7 +71,6 @@ impl FromStr for JobStatus { "available" => Ok(JobStatus::Available), "completed" => Ok(JobStatus::Completed), "failed" => Ok(JobStatus::Failed), - "running" => Ok(JobStatus::Running), invalid => Err(PgQueueError::ParseJobStatusError(invalid.to_owned())), } } @@ -222,95 +219,6 @@ pub trait PgQueueJob { ) -> Result>>; } -/// A Job that can be updated in PostgreSQL. -#[derive(Debug)] -pub struct PgJob { - pub job: Job, - pub pool: PgPool, -} - -// Container struct for a batch of PgJobs. -pub struct PgBatch { - pub jobs: Vec>, -} - -impl PgJob { - async fn acquire_conn( - &mut self, - ) -> Result, PgJobError>>> - { - self.pool - .acquire() - .await - .map_err(|error| PgJobError::ConnectionError { error }) - } -} - -#[async_trait] -impl PgQueueJob for PgJob { - async fn complete(mut self) -> Result>>> { - let mut connection = self.acquire_conn().await?; - - let completed_job = - self.job - .complete(&mut *connection) - .await - .map_err(|error| PgJobError::QueryError { - command: "UPDATE".to_owned(), - error, - })?; - - Ok(completed_job) - } - - async fn fail( - mut self, - error: E, - ) -> Result, PgJobError>>> { - let mut connection = self.acquire_conn().await?; - - let failed_job = self - .job - .fail(error, &mut *connection) - .await - .map_err(|error| PgJobError::QueryError { - command: "UPDATE".to_owned(), - error, - })?; - - Ok(failed_job) - } - - async fn retry( - mut self, - error: E, - retry_interval: time::Duration, - queue: &str, - ) -> Result>>> { - if self.job.is_gte_max_attempts() { - return Err(PgJobError::RetryInvalidError { - job: Box::new(self), - error: "Maximum attempts reached".to_owned(), - }); - } - - let mut connection = self.acquire_conn().await?; - - let retried_job = self - .job - .retryable() - .queue(queue) - .retry(error, retry_interval, &mut *connection) - .await - .map_err(|error| PgJobError::QueryError { - command: "UPDATE".to_owned(), - error, - })?; - - Ok(retried_job) - } -} - /// A Job within an open PostgreSQL transaction. /// This implementation allows 'hiding' the job from any other workers running SKIP LOCKED queries. #[derive(Debug)] @@ -611,96 +519,6 @@ impl PgQueue { Ok(Self { name, pool }) } - /// Dequeue up to `limit` `Job`s from this `PgQueue`. - /// The `Job`s will be updated to `'running'` status, so any other `dequeue` calls will skip it. 
- pub async fn dequeue< - J: for<'d> serde::Deserialize<'d> + std::marker::Send + std::marker::Unpin + 'static, - M: for<'d> serde::Deserialize<'d> + std::marker::Send + std::marker::Unpin + 'static, - >( - &self, - attempted_by: &str, - limit: u32, - ) -> PgQueueResult>> { - let mut connection = self - .pool - .acquire() - .await - .map_err(|error| PgQueueError::ConnectionError { error })?; - - // The query that follows uses a FOR UPDATE SKIP LOCKED clause. - // For more details on this see: 2ndquadrant.com/en/blog/what-is-select-skip-locked-for-in-postgresql-9-5. - let base_query = r#" -WITH available_in_queue AS ( - SELECT - id - FROM - job_queue - WHERE - status = 'available' - AND scheduled_at <= NOW() - AND queue = $1 - ORDER BY - attempt, - scheduled_at - LIMIT $2 - FOR UPDATE SKIP LOCKED -) -UPDATE - job_queue -SET - attempted_at = NOW(), - status = 'running'::job_status, - attempt = attempt + 1, - attempted_by = array_append(attempted_by, $3::text) -FROM - available_in_queue -WHERE - job_queue.id = available_in_queue.id -RETURNING - job_queue.* - "#; - - let query_result: Result>, sqlx::Error> = sqlx::query_as(base_query) - .bind(&self.name) - .bind(limit as i64) - .bind(attempted_by) - .fetch_all(&mut *connection) - .await; - - match query_result { - Ok(jobs) => { - if jobs.is_empty() { - return Ok(None); - } - - let pg_jobs: Vec> = jobs - .into_iter() - .map(|job| PgJob { - job, - pool: self.pool.clone(), - }) - .collect(); - - Ok(Some(PgBatch { jobs: pg_jobs })) - } - - // Although connection would be closed once it goes out of scope, sqlx recommends explicitly calling close(). - // See: https://docs.rs/sqlx/latest/sqlx/postgres/any/trait.AnyConnectionBackend.html#tymethod.close. - Err(sqlx::Error::RowNotFound) => { - let _ = connection.close().await; - Ok(None) - } - - Err(e) => { - let _ = connection.close().await; - Err(PgQueueError::QueryError { - command: "UPDATE".to_owned(), - error: e, - }) - } - } - } - /// Dequeue up to `limit` `Job`s from this `PgQueue` and hold the transaction. Any other /// `dequeue_tx` calls will skip rows locked, so by holding a transaction we ensure only one /// worker can dequeue a job. 
Holding a transaction open can have performance implications, but @@ -742,7 +560,6 @@ UPDATE job_queue SET attempted_at = NOW(), - status = 'running'::job_status, attempt = attempt + 1, attempted_by = array_append(attempted_by, $3::text) FROM @@ -875,103 +692,6 @@ mod tests { "https://myhost/endpoint".to_owned() } - #[sqlx::test(migrations = "../migrations")] - async fn test_can_dequeue_job(db: PgPool) { - let job_target = job_target(); - let job_parameters = JobParameters::default(); - let job_metadata = JobMetadata::default(); - let worker_id = worker_id(); - let new_job = NewJob::new(1, job_metadata, job_parameters, &job_target); - - let queue = PgQueue::new_from_pool("test_can_dequeue_job", db) - .await - .expect("failed to connect to local test postgresql database"); - - queue.enqueue(new_job).await.expect("failed to enqueue job"); - - let pg_job: PgJob = queue - .dequeue(&worker_id, 1) - .await - .expect("failed to dequeue jobs") - .expect("didn't find any jobs to dequeue") - .jobs - .pop() - .unwrap(); - - assert_eq!(pg_job.job.attempt, 1); - assert!(pg_job.job.attempted_by.contains(&worker_id)); - assert_eq!(pg_job.job.attempted_by.len(), 1); - assert_eq!(pg_job.job.max_attempts, 1); - assert_eq!(*pg_job.job.parameters.as_ref(), JobParameters::default()); - assert_eq!(pg_job.job.status, JobStatus::Running); - assert_eq!(pg_job.job.target, job_target); - } - - #[sqlx::test(migrations = "../migrations")] - async fn test_dequeue_returns_none_on_no_jobs(db: PgPool) { - let worker_id = worker_id(); - let queue = PgQueue::new_from_pool("test_dequeue_returns_none_on_no_jobs", db) - .await - .expect("failed to connect to local test postgresql database"); - - let pg_jobs: Option> = queue - .dequeue(&worker_id, 1) - .await - .expect("failed to dequeue jobs"); - - assert!(pg_jobs.is_none()); - } - - #[sqlx::test(migrations = "../migrations")] - async fn test_can_dequeue_multiple_jobs(db: PgPool) { - let job_target = job_target(); - let job_metadata = JobMetadata::default(); - let job_parameters = JobParameters::default(); - let worker_id = worker_id(); - - let queue = PgQueue::new_from_pool("test_can_dequeue_multiple_jobs", db) - .await - .expect("failed to connect to local test postgresql database"); - - for _ in 0..5 { - queue - .enqueue(NewJob::new( - 1, - job_metadata.clone(), - job_parameters.clone(), - &job_target, - )) - .await - .expect("failed to enqueue job"); - } - - // Only get 4 jobs, leaving one in the queue. - let limit = 4; - let batch: PgBatch = queue - .dequeue(&worker_id, limit) - .await - .expect("failed to dequeue job") - .expect("didn't find a job to dequeue"); - - // Complete those 4. - assert_eq!(batch.jobs.len(), limit as usize); - for job in batch.jobs { - job.complete().await.expect("failed to complete job"); - } - - // Try to get up to 4 jobs, but only 1 remains. - let batch: PgBatch = queue - .dequeue(&worker_id, limit) - .await - .expect("failed to dequeue job") - .expect("didn't find a job to dequeue"); - - assert_eq!(batch.jobs.len(), 1); // Only one job should have been left in the queue. 
- for job in batch.jobs { - job.complete().await.expect("failed to complete job"); - } - } - #[sqlx::test(migrations = "../migrations")] async fn test_can_dequeue_tx_job(db: PgPool) { let job_target = job_target(); @@ -1000,7 +720,6 @@ mod tests { assert_eq!(tx_job.job.max_attempts, 1); assert_eq!(*tx_job.job.metadata.as_ref(), JobMetadata::default()); assert_eq!(*tx_job.job.parameters.as_ref(), JobParameters::default()); - assert_eq!(tx_job.job.status, JobStatus::Running); assert_eq!(tx_job.job.target, job_target); // Transactional jobs must be completed, failed or retried before being dropped. This is @@ -1096,14 +815,12 @@ mod tests { .expect("failed to connect to local test postgresql database"); queue.enqueue(new_job).await.expect("failed to enqueue job"); - let job: PgJob = queue - .dequeue(&worker_id, 1) + let mut batch: PgTransactionBatch<'_, JobParameters, JobMetadata> = queue + .dequeue_tx(&worker_id, 1) .await .expect("failed to dequeue job") - .expect("didn't find a job to dequeue") - .jobs - .pop() - .unwrap(); + .expect("didn't find a job to dequeue"); + let job = batch.jobs.pop().unwrap(); let retry_interval = retry_policy.retry_interval(job.job.attempt as u32, None); let retry_queue = retry_policy.retry_queue(&job.job.queue).to_owned(); @@ -1115,9 +832,10 @@ mod tests { ) .await .expect("failed to retry job"); + batch.commit().await.expect("failed to commit transaction"); - let retried_job: PgJob = queue - .dequeue(&worker_id, 1) + let retried_job: PgTransactionJob = queue + .dequeue_tx(&worker_id, 1) .await .expect("failed to dequeue job") .expect("didn't find retried job to dequeue") @@ -1133,7 +851,6 @@ mod tests { *retried_job.job.parameters.as_ref(), JobParameters::default() ); - assert_eq!(retried_job.job.status, JobStatus::Running); assert_eq!(retried_job.job.target, job_target); } @@ -1156,14 +873,12 @@ mod tests { .expect("failed to connect to queue in local test postgresql database"); queue.enqueue(new_job).await.expect("failed to enqueue job"); - let job: PgJob = queue - .dequeue(&worker_id, 1) + let mut batch: PgTransactionBatch = queue + .dequeue_tx(&worker_id, 1) .await .expect("failed to dequeue job") - .expect("didn't find a job to dequeue") - .jobs - .pop() - .unwrap(); + .expect("didn't find a job to dequeue"); + let job = batch.jobs.pop().unwrap(); let retry_interval = retry_policy.retry_interval(job.job.attempt as u32, None); let retry_queue = retry_policy.retry_queue(&job.job.queue).to_owned(); @@ -1175,9 +890,10 @@ mod tests { ) .await .expect("failed to retry job"); + batch.commit().await.expect("failed to commit transaction"); - let retried_job_not_found: Option> = queue - .dequeue(&worker_id, 1) + let retried_job_not_found: Option> = queue + .dequeue_tx(&worker_id, 1) .await .expect("failed to dequeue job"); @@ -1187,8 +903,8 @@ mod tests { .await .expect("failed to connect to retry queue in local test postgresql database"); - let retried_job: PgJob = queue - .dequeue(&worker_id, 1) + let retried_job: PgTransactionJob = queue + .dequeue_tx(&worker_id, 1) .await .expect("failed to dequeue job") .expect("job not found in retry queue") @@ -1204,7 +920,6 @@ mod tests { *retried_job.job.parameters.as_ref(), JobParameters::default() ); - assert_eq!(retried_job.job.status, JobStatus::Running); assert_eq!(retried_job.job.target, job_target); } @@ -1224,8 +939,8 @@ mod tests { queue.enqueue(new_job).await.expect("failed to enqueue job"); - let job: PgJob = queue - .dequeue(&worker_id, 1) + let job: PgTransactionJob = queue + .dequeue_tx(&worker_id, 1) .await 
.expect("failed to dequeue job") .expect("didn't find a job to dequeue") diff --git a/hook-janitor/src/fixtures/webhook_cleanup.sql b/hook-janitor/src/fixtures/webhook_cleanup.sql index 5dfa827..e0b9a7a 100644 --- a/hook-janitor/src/fixtures/webhook_cleanup.sql +++ b/hook-janitor/src/fixtures/webhook_cleanup.sql @@ -163,15 +163,4 @@ VALUES 'webhooks', 'available', 'https://myhost/endpoint' - ), - -- team:1, plugin_config:2, running - ( - NULL, - '{"team_id": 1, "plugin_id": 99, "plugin_config_id": 2}', - '2023-12-19 20:01:18.799371+00', - now() - '1 hour' :: interval, - '{}', - 'webhooks', - 'running', - 'https://myhost/endpoint' ); \ No newline at end of file diff --git a/hook-janitor/src/webhooks.rs b/hook-janitor/src/webhooks.rs index e3b137c..7f7fadd 100644 --- a/hook-janitor/src/webhooks.rs +++ b/hook-janitor/src/webhooks.rs @@ -28,8 +28,6 @@ pub enum WebhookCleanerError { AcquireConnError { error: sqlx::Error }, #[error("failed to acquire conn and start txn: {error}")] StartTxnError { error: sqlx::Error }, - #[error("failed to reschedule stuck jobs: {error}")] - RescheduleStuckJobsError { error: sqlx::Error }, #[error("failed to get queue depth: {error}")] GetQueueDepthError { error: sqlx::Error }, #[error("failed to get row count: {error}")] @@ -145,7 +143,6 @@ impl From for AppMetric { struct SerializableTxn<'a>(Transaction<'a, Postgres>); struct CleanupStats { - jobs_unstuck_count: u64, rows_processed: u64, completed_row_count: u64, completed_agg_row_count: u64, @@ -186,45 +183,6 @@ impl WebhookCleaner { }) } - async fn reschedule_stuck_jobs(&self) -> Result { - let mut conn = self - .pg_pool - .acquire() - .await - .map_err(|e| WebhookCleanerError::AcquireConnError { error: e })?; - - // The "non-transactional" worker runs the risk of crashing and leaving jobs permanently in - // the `running` state. This query will reschedule any jobs that have been in the running - // state for more than 2 minutes (which is *much* longer than we expect any Webhook job to - // take). - // - // We don't need to increment the `attempt` counter here because the worker already did that - // when it moved the job into `running`. - // - // If the previous worker was somehow stalled for 2 minutes and completes the task, that - // will mean we sent duplicate Webhooks. Success stats should not be affected, since both - // will update the same job row, which will only be processed once by the janitor. 
- - let base_query = r#" - UPDATE - job_queue - SET - status = 'available'::job_status, - last_attempt_finished_at = NOW(), - scheduled_at = NOW() - WHERE - status = 'running'::job_status - AND attempted_at < NOW() - INTERVAL '2 minutes' - "#; - - let result = sqlx::query(base_query) - .execute(&mut *conn) - .await - .map_err(|e| WebhookCleanerError::RescheduleStuckJobsError { error: e })?; - - Ok(result.rows_affected()) - } - async fn get_queue_depth(&self) -> Result { let mut conn = self .pg_pool @@ -424,8 +382,6 @@ impl WebhookCleaner { let untried_status = [("status", "untried")]; let retries_status = [("status", "retries")]; - let jobs_unstuck_count = self.reschedule_stuck_jobs().await?; - let queue_depth = self.get_queue_depth().await?; metrics::gauge!("queue_depth_oldest_scheduled", &untried_status) .set(queue_depth.oldest_scheduled_at_untried.timestamp() as f64); @@ -479,7 +435,6 @@ impl WebhookCleaner { } Ok(CleanupStats { - jobs_unstuck_count, rows_processed: rows_deleted, completed_row_count, completed_agg_row_count, @@ -500,8 +455,6 @@ impl Cleaner for WebhookCleaner { metrics::counter!("webhook_cleanup_success",).increment(1); metrics::gauge!("webhook_cleanup_last_success_timestamp",) .set(get_current_timestamp_seconds()); - metrics::counter!("webhook_cleanup_jobs_unstuck") - .increment(stats.jobs_unstuck_count); if stats.rows_processed > 0 { let elapsed_time = start_time.elapsed().as_secs_f64(); @@ -546,7 +499,8 @@ mod tests { use hook_common::kafka_messages::app_metrics::{ Error as WebhookError, ErrorDetails, ErrorType, }; - use hook_common::pgqueue::{NewJob, PgJob, PgQueue, PgQueueJob}; + use hook_common::pgqueue::PgQueueJob; + use hook_common::pgqueue::{NewJob, PgQueue, PgTransactionBatch}; use hook_common::webhook::{HttpMethod, WebhookJobMetadata, WebhookJobParameters}; use rdkafka::consumer::{Consumer, StreamConsumer}; use rdkafka::mocking::MockCluster; @@ -624,9 +578,6 @@ mod tests { .await .expect("webbook cleanup_impl failed"); - // The one 'running' job is transitioned to 'available'. - assert_eq!(cleanup_stats.jobs_unstuck_count, 1); - // Rows that are not 'completed' or 'failed' should not be processed. assert_eq!(cleanup_stats.rows_processed, 13); @@ -821,7 +772,6 @@ mod tests { .expect("webbook cleanup_impl failed"); // Reported metrics are all zeroes - assert_eq!(cleanup_stats.jobs_unstuck_count, 0); assert_eq!(cleanup_stats.rows_processed, 0); assert_eq!(cleanup_stats.completed_row_count, 0); assert_eq!(cleanup_stats.completed_agg_row_count, 0); @@ -865,22 +815,20 @@ mod tests { assert_eq!(get_count_from_new_conn(&db, "completed").await, 6); assert_eq!(get_count_from_new_conn(&db, "failed").await, 7); assert_eq!(get_count_from_new_conn(&db, "available").await, 1); - assert_eq!(get_count_from_new_conn(&db, "running").await, 1); { // The fixtures include an available job, so let's complete it while the txn is open. 
- let webhook_job: PgJob = queue - .dequeue(&"worker_id", 1) + let mut batch: PgTransactionBatch<'_, WebhookJobParameters, WebhookJobMetadata> = queue + .dequeue_tx(&"worker_id", 1) .await .expect("failed to dequeue job") - .expect("didn't find a job to dequeue") - .jobs - .pop() - .unwrap(); + .expect("didn't find a job to dequeue"); + let webhook_job = batch.jobs.pop().unwrap(); webhook_job .complete() .await .expect("failed to complete job"); + batch.commit().await.expect("failed to commit batch"); } { @@ -898,18 +846,17 @@ mod tests { }; let new_job = NewJob::new(1, job_metadata, job_parameters, &"target"); queue.enqueue(new_job).await.expect("failed to enqueue job"); - let webhook_job: PgJob = queue - .dequeue(&"worker_id", 1) + let mut batch: PgTransactionBatch<'_, WebhookJobParameters, WebhookJobMetadata> = queue + .dequeue_tx(&"worker_id", 1) .await .expect("failed to dequeue job") - .expect("didn't find a job to dequeue") - .jobs - .pop() - .unwrap(); + .expect("didn't find a job to dequeue"); + let webhook_job = batch.jobs.pop().unwrap(); webhook_job .complete() .await .expect("failed to complete job"); + batch.commit().await.expect("failed to commit batch"); } { @@ -950,6 +897,5 @@ mod tests { assert_eq!(get_count_from_new_conn(&db, "completed").await, 2); assert_eq!(get_count_from_new_conn(&db, "failed").await, 0); assert_eq!(get_count_from_new_conn(&db, "available").await, 1); - assert_eq!(get_count_from_new_conn(&db, "running").await, 1); } } diff --git a/hook-worker/src/config.rs b/hook-worker/src/config.rs index 32e49f7..ceb690f 100644 --- a/hook-worker/src/config.rs +++ b/hook-worker/src/config.rs @@ -35,9 +35,6 @@ pub struct Config { #[envconfig(nested = true)] pub retry_policy: RetryPolicyConfig, - #[envconfig(default = "true")] - pub transactional: bool, - #[envconfig(default = "1")] pub dequeue_batch_size: u32, } diff --git a/hook-worker/src/main.rs b/hook-worker/src/main.rs index fede7d2..2997dfc 100644 --- a/hook-worker/src/main.rs +++ b/hook-worker/src/main.rs @@ -67,7 +67,7 @@ async fn main() -> Result<(), WorkerError> { .expect("failed to start serving metrics"); }); - worker.run(config.transactional).await; + worker.run().await; Ok(()) } diff --git a/hook-worker/src/worker.rs b/hook-worker/src/worker.rs index 437a1d3..b83c909 100644 --- a/hook-worker/src/worker.rs +++ b/hook-worker/src/worker.rs @@ -4,9 +4,9 @@ use std::time; use futures::future::join_all; use hook_common::health::HealthHandle; -use hook_common::pgqueue::{PgBatch, PgTransactionBatch}; +use hook_common::pgqueue::PgTransactionBatch; use hook_common::{ - pgqueue::{Job, PgJob, PgJobError, PgQueue, PgQueueError, PgQueueJob, PgTransactionJob}, + pgqueue::{Job, PgJobError, PgQueue, PgQueueError, PgQueueJob, PgTransactionJob}, retry::RetryPolicy, webhook::{HttpMethod, WebhookJobError, WebhookJobMetadata, WebhookJobParameters}, }; @@ -50,20 +50,6 @@ impl WebhookJob for PgTransactionJob<'_, WebhookJobParameters, WebhookJobMetadat } } -impl WebhookJob for PgJob { - fn parameters(&self) -> &WebhookJobParameters { - &self.job.parameters - } - - fn metadata(&self) -> &WebhookJobMetadata { - &self.job.metadata - } - - fn job(&self) -> &Job { - &self.job - } -} - /// A worker to poll `PgQueue` and spawn tasks to process webhooks when a job becomes available. pub struct WebhookWorker<'p> { /// An identifier for this worker. Used to mark jobs we have consumed. @@ -121,29 +107,6 @@ impl<'p> WebhookWorker<'p> { } } - /// Wait until at least one job becomes available in our queue. 
- async fn wait_for_jobs<'a>(&self) -> PgBatch { - let mut interval = tokio::time::interval(self.poll_interval); - - loop { - interval.tick().await; - self.liveness.report_healthy().await; - - match self - .queue - .dequeue(&self.name, self.dequeue_batch_size) - .await - { - Ok(Some(batch)) => return batch, - Ok(None) => continue, - Err(error) => { - error!("error while trying to dequeue job: {}", error); - continue; - } - } - } - } - /// Wait until at least one job becomes available in our queue in transactional mode. async fn wait_for_jobs_tx<'a>( &self, @@ -170,7 +133,7 @@ impl<'p> WebhookWorker<'p> { } /// Run this worker to continuously process any jobs that become available. - pub async fn run(&self, transactional: bool) { + pub async fn run(&self) { let semaphore = Arc::new(sync::Semaphore::new(self.max_concurrent_jobs)); let report_semaphore_utilization = || { metrics::gauge!("webhook_worker_saturation_percent") @@ -179,98 +142,53 @@ impl<'p> WebhookWorker<'p> { let dequeue_batch_size_histogram = metrics::histogram!("webhook_dequeue_batch_size"); - if transactional { - loop { - report_semaphore_utilization(); - // TODO: We could grab semaphore permits here using something like: - // `min(semaphore.available_permits(), dequeue_batch_size)` - // And then dequeue only up to that many jobs. We'd then need to hand back the - // difference in permits based on how many jobs were dequeued. - let mut batch = self.wait_for_jobs_tx().await; - dequeue_batch_size_histogram.record(batch.jobs.len() as f64); - - // Get enough permits for the jobs before spawning a task. - let permits = semaphore - .clone() - .acquire_many_owned(batch.jobs.len() as u32) - .await - .expect("semaphore has been closed"); - - let client = self.client.clone(); - let retry_policy = self.retry_policy.clone(); - - tokio::spawn(async move { - let mut futures = Vec::new(); - - // We have to `take` the Vec of jobs from the batch to avoid a borrow checker - // error below when we commit. - for job in std::mem::take(&mut batch.jobs) { - let client = client.clone(); - let retry_policy = retry_policy.clone(); - - let future = - async move { process_webhook_job(client, job, &retry_policy).await }; - - futures.push(future); - } + loop { + report_semaphore_utilization(); + // TODO: We could grab semaphore permits here using something like: + // `min(semaphore.available_permits(), dequeue_batch_size)` + // And then dequeue only up to that many jobs. We'd then need to hand back the + // difference in permits based on how many jobs were dequeued. + let mut batch = self.wait_for_jobs_tx().await; + dequeue_batch_size_histogram.record(batch.jobs.len() as f64); + + // Get enough permits for the jobs before spawning a task. + let permits = semaphore + .clone() + .acquire_many_owned(batch.jobs.len() as u32) + .await + .expect("semaphore has been closed"); - let results = join_all(futures).await; - for result in results { - if let Err(e) = result { - error!("error processing webhook job: {}", e); - } - } + let client = self.client.clone(); + let retry_policy = self.retry_policy.clone(); - let _ = batch.commit().await.map_err(|e| { - error!("error committing transactional batch: {}", e); - }); + tokio::spawn(async move { + let mut futures = Vec::new(); - drop(permits); - }); - } - } else { - loop { - report_semaphore_utilization(); - // TODO: We could grab semaphore permits here using something like: - // `min(semaphore.available_permits(), dequeue_batch_size)` - // And then dequeue only up to that many jobs. 
We'd then need to hand back the - // difference in permits based on how many jobs were dequeued. - let batch = self.wait_for_jobs().await; - dequeue_batch_size_histogram.record(batch.jobs.len() as f64); - - // Get enough permits for the jobs before spawning a task. - let permits = semaphore - .clone() - .acquire_many_owned(batch.jobs.len() as u32) - .await - .expect("semaphore has been closed"); - - let client = self.client.clone(); - let retry_policy = self.retry_policy.clone(); - - tokio::spawn(async move { - let mut futures = Vec::new(); - - for job in batch.jobs { - let client = client.clone(); - let retry_policy = retry_policy.clone(); - - let future = - async move { process_webhook_job(client, job, &retry_policy).await }; - - futures.push(future); - } + // We have to `take` the Vec of jobs from the batch to avoid a borrow checker + // error below when we commit. + for job in std::mem::take(&mut batch.jobs) { + let client = client.clone(); + let retry_policy = retry_policy.clone(); + + let future = + async move { process_webhook_job(client, job, &retry_policy).await }; - let results = join_all(futures).await; - for result in results { - if let Err(e) = result { - error!("error processing webhook job: {}", e); - } + futures.push(future); + } + + let results = join_all(futures).await; + for result in results { + if let Err(e) = result { + error!("error processing webhook job: {}", e); } + } - drop(permits); + let _ = batch.commit().await.map_err(|e| { + error!("error committing transactional batch: {}", e); }); - } + + drop(permits); + }); } } } @@ -601,7 +519,8 @@ mod tests { liveness, ); - let consumed_job = worker.wait_for_jobs().await.jobs.pop().unwrap(); + let mut batch = worker.wait_for_jobs_tx().await; + let consumed_job = batch.jobs.pop().unwrap(); assert_eq!(consumed_job.job.attempt, 1); assert!(consumed_job.job.attempted_by.contains(&worker_id)); @@ -611,13 +530,13 @@ mod tests { *consumed_job.job.parameters.as_ref(), webhook_job_parameters ); - assert_eq!(consumed_job.job.status, JobStatus::Running); assert_eq!(consumed_job.job.target, webhook_job_parameters.url); consumed_job .complete() .await .expect("job not successfully completed"); + batch.commit().await.expect("failed to commit batch"); assert!(registry.get_status().healthy) }
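
Editor's note (not part of the patch): after this change the transactional path (`dequeue_tx` / `PgTransactionBatch`) is the only way to consume jobs, and the `'running'` status disappears entirely; jobs are simply held locked by the open transaction until the batch commits. Below is a minimal sketch of the resulting enqueue/dequeue flow, mirroring the patterns used in the tests above. It assumes a PostgreSQL instance reachable via a DATABASE_URL environment variable with the job_queue migrations applied; `MyParams` and `MyMetadata` are hypothetical stand-ins for the caller's own serde payload types, and the worker name and batch size are arbitrary.

// Illustrative sketch only; see assumptions in the note above.
use hook_common::pgqueue::{NewJob, PgQueue, PgQueueJob, PgTransactionBatch};
use serde::{Deserialize, Serialize};
use sqlx::postgres::PgPoolOptions;

// Hypothetical payload types standing in for the caller's own structs.
#[derive(Debug, Serialize, Deserialize)]
struct MyParams {
    url: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct MyMetadata {
    team_id: u32,
}

#[tokio::main]
async fn main() {
    let pool = PgPoolOptions::new()
        .connect(&std::env::var("DATABASE_URL").expect("DATABASE_URL not set"))
        .await
        .expect("failed to connect to postgres");

    let queue = PgQueue::new_from_pool("webhooks", pool)
        .await
        .expect("failed to initialize queue");

    // Enqueue one job with a single allowed attempt.
    let target = "https://myhost/endpoint".to_owned();
    let new_job = NewJob::new(
        1,
        MyMetadata { team_id: 1 },
        MyParams { url: target.clone() },
        &target,
    );
    queue.enqueue(new_job).await.expect("failed to enqueue job");

    // Dequeue up to 10 jobs inside a single transaction. The rows stay hidden
    // from other workers by FOR UPDATE SKIP LOCKED until the batch is
    // committed or the transaction is dropped; no 'running' status is set.
    let maybe_batch: Option<PgTransactionBatch<'_, MyParams, MyMetadata>> = queue
        .dequeue_tx("example-worker", 10)
        .await
        .expect("failed to dequeue jobs");

    if let Some(mut batch) = maybe_batch {
        while let Some(job) = batch.jobs.pop() {
            // Do the actual work here, then resolve the job before committing.
            println!("processing job targeting {}", job.job.parameters.as_ref().url);
            job.complete().await.expect("failed to complete job");
        }
        // Committing persists the completions and releases the row locks.
        batch.commit().await.expect("failed to commit transaction");
    }
}

If the worker crashes before `commit`, the transaction is rolled back and the jobs become visible to other `dequeue_tx` callers again, which is what lets this patch drop the janitor's reschedule-stuck-jobs pass.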