From f614660ffa62172711d298ec5b051b471df10a3e Mon Sep 17 00:00:00 2001 From: Brett Hoerner Date: Mon, 5 Feb 2024 10:00:20 -0700 Subject: [PATCH] add semaphore todo --- hook-worker/src/worker.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/hook-worker/src/worker.rs b/hook-worker/src/worker.rs index 605cde5..437a1d3 100644 --- a/hook-worker/src/worker.rs +++ b/hook-worker/src/worker.rs @@ -182,6 +182,10 @@ impl<'p> WebhookWorker<'p> { if transactional { loop { report_semaphore_utilization(); + // TODO: We could grab semaphore permits here using something like: + // `min(semaphore.available_permits(), dequeue_batch_size)` + // And then dequeue only up to that many jobs. We'd then need to hand back the + // difference in permits based on how many jobs were dequeued. let mut batch = self.wait_for_jobs_tx().await; dequeue_batch_size_histogram.record(batch.jobs.len() as f64); @@ -227,6 +231,10 @@ impl<'p> WebhookWorker<'p> { } else { loop { report_semaphore_utilization(); + // TODO: We could grab semaphore permits here using something like: + // `min(semaphore.available_permits(), dequeue_batch_size)` + // And then dequeue only up to that many jobs. We'd then need to hand back the + // difference in permits based on how many jobs were dequeued. let batch = self.wait_for_jobs().await; dequeue_batch_size_histogram.record(batch.jobs.len() as f64);