This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Dequeue multiple items at a time #60

Merged: bretthoerner merged 6 commits into main from brett/multi-get on Feb 6, 2024

Conversation

@bretthoerner (Contributor) commented Jan 31, 2024

This amortizes, across the jobs in a batch, both the cost of querying PG and the overhead of spawning Tokio tasks (we spawn 1 task per batch rather than 1 per job).

The gist is that PgJob and PgTransactionJob are now wrapped in PgBatch and PgTransactionBatch respectively. Each contains a Vec of jobs. dequeue takes a limit parameter and returns up to that many jobs at once.

For non-txn jobs, everything is easy and mostly the same. I no longer hold a connection open for non-txn jobs (which is, to me, the benefit of non-txn jobs), so each job completes, then grabs a conn from the pool and does the query to finalize its work.

For txn jobs, we need to share the single txn object among them, and with the batch itself. Each one completes and grabs the txn via a mutex to do its work. After all work is done, the top-level Worker code is what calls commit.

In the future both of these could be optimized: the Worker code could do all Webhook stuff in parallel, and then updating the DB could be done in a batch query (or queries) rather than 1 per job. I'd rather do that as part of a future PR.
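
For orientation, a minimal sketch of the transactional shapes described above. This is an illustration only: the field names jobs and shared_txn appear in the diff and discussion below, but the generics, lifetimes, and everything else are assumptions and may differ from the real definitions in hook-common/src/pgqueue.rs.

use std::sync::Arc;

use sqlx::{Postgres, Transaction};
use tokio::sync::Mutex;

// One dequeued job; J stands in for the job's payload/metadata type.
pub struct PgTransactionJob<J> {
    pub job: J,
    // Every job in the batch holds a handle to the same transaction.
    pub shared_txn: Arc<Mutex<Option<Transaction<'static, Postgres>>>>,
}

// Up to `limit` jobs dequeued in a single query, sharing one transaction.
pub struct PgTransactionBatch<J> {
    pub jobs: Vec<PgTransactionJob<J>>,
    // The batch keeps its own handle so the top-level Worker code can
    // commit once every job has completed or failed.
    pub shared_txn: Arc<Mutex<Option<Transaction<'static, Postgres>>>>,
}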

@@ -165,65 +172,87 @@ impl<'p> WebhookWorker<'p> {
if transactional {
@bretthoerner (Contributor, Author):

Note that the duplicated code for txn vs non-txn predated this PR. Refactoring to not have any duplication would be a lot of work, I think, and maybe/hopefully we'll drop one of these modes soon anyway?

@bretthoerner requested a review from a team on January 31, 2024 at 13:28
@xvello (Contributor) left a comment

Looking good! Not approving as I think this deserves another pair of eyes.

I think we're digging our own grave with keeping two parallel implementations. I'd support dropping non-tx for now and switching to it later if we find that tx breaks some of our scalability assumptions.

Right now, I would not be comfortable switching prod to non-tx, so it's almost as good as dead code.

Review threads (resolved): hook-worker/src/config.rs (outdated), hook-common/src/pgqueue.rs, hook-common/src/pgqueue.rs (outdated), hook-worker/src/worker.rs
@bretthoerner (Contributor, Author) commented Feb 2, 2024

> I think we're digging our own grave with keeping two parallel implementations. I'd support dropping non-tx for now and switching to it later if we find that tx breaks some of our scalability assumptions.
>
> Right now, I would not be comfortable switching prod to non-tx, so it's almost as good as dead code.

I would also prefer to just pick an implementation.

That said, I did have one more thought for us to consider. PgQueue is written as a generic queue. If we think we might use the queue for longer running jobs, I can see where holding PG transactions open for, say, 5 minutes (or longer, I'm just pulling numbers out of thin air) would get pretty sketchy. But transaction mode is pretty sweet for Webhooks where our longest timeout will probably be like 30 seconds (and most jobs will complete in hundreds of millis).

@bretthoerner force-pushed the brett/multi-get branch 2 times, most recently from 1f9cafd to 8cb66e0, on February 2, 2024 at 19:15
let mut batch = self.wait_for_jobs_tx().await;
dequeue_batch_size_histogram.record(batch.jobs.len() as f64);

// Get enough permits for the jobs before spawning a task.
@xvello (Contributor) commented Feb 2, 2024

We might be blocked here due to one missing permit.

We might get more throughput if we only pull min(semaphore.available_permits(), dequeue_batch_size) jobs, so that we guarantee we won't block here.

@bretthoerner (Contributor, Author) commented Feb 2, 2024

Don't we want to block?

If I understand correctly and we use min(semaphore.available_permits(), dequeue_batch_size) then we could spawn off the work of 100 jobs by only grabbing 1 permit. If we do that then what's the point of having max_concurrent_jobs at all?

That said, I proposed max_concurrent_jobs because non-transaction mode has no coupling between PG connections and the amount of work in progress. We could drop this from the transaction mode branch since the connection limit and batch size gives you a max of work in progress. (Or hopefully we pick one mode and call it a day.)
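
For reference, a minimal sketch of the "get enough permits before spawning" approach under discussion, using tokio's Semaphore::acquire_many_owned. The function and variable names here are illustrative, not taken from the PR:

use std::sync::Arc;

use tokio::sync::Semaphore;

async fn spawn_batch_when_permitted(semaphore: Arc<Semaphore>, batch_len: u32) {
    // Wait until one permit per job is available, so the number of in-flight
    // jobs never exceeds the semaphore's size (max_concurrent_jobs).
    let permits = semaphore
        .acquire_many_owned(batch_len)
        .await
        .expect("semaphore is never closed");

    tokio::spawn(async move {
        // ... process every job in the batch here ...

        // Dropping the permits releases all of them at once.
        drop(permits);
    });
}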

@xvello (Contributor):

Placed the comment here because this is where we block, but my recommendation applies higher: change the signature of wait_for_jobs() to accept a max job count to pull, so that we can call wait_for_jobs(min(semaphore.available_permits(), dequeue_batch_size)).

That's something we can implement later on, we can skip it for now and revisit if we manage to actually hit this semaphore.

@bretthoerner (Contributor, Author):

Ah-ha, gotcha, that makes sense.

@xvello (Contributor):

Again, I think it's fine to just add a TODO comment when calling wait_for_jobs(), and move on for now.

@bretthoerner (Contributor, Author)
@tomasfarias Any thoughts?

    pub async fn commit(&mut self) -> PgQueueResult<()> {
        let mut txn_guard = self.shared_txn.lock().await;

        let txn = txn_guard.take().unwrap();
Contributor:

What is this take call doing? From the Mutex docs, it seems like we should deref txn_guard.

Contributor:

I'm familiar with the similarly named std::mem::take, and I'm worried that we are replacing the txn held by the mutex.

@bretthoerner (Contributor, Author) commented Feb 5, 2024

Commit takes ownership, so taking it from an Option is the only way I found to call it. I wouldn't say we're replacing it but yes we are taking it. Let me know if I'm missing something. :)

@tomasfarias (Contributor) commented Feb 6, 2024

Doesn't look like you are taking ownership; at least I see just a mutable reference to PgTransactionBatch. If you mean we take ownership of the tx, we do, but in a way that requires leaving something behind in the Option. Why not take ownership of PgTransactionBatch instead? At least it seems it should be fine to be done with the batch once we commit.

I tried this out locally to see if there would be any compilation issues, and there are none (besides needing to bring the sqlx::postgres::any::AnyConnectionBackend trait into scope):

    pub async fn commit(self) -> PgQueueResult<()> {
        let mut txn_guard = self.shared_txn.lock().await;

        txn_guard
            .as_deref_mut()
            .ok_or(PgQueueError::TransactionAlreadyClosedError)?
            .commit()
            .await
            .map_err(|e| PgQueueError::QueryError {
                command: "COMMIT".to_owned(),
                error: e,
            })?;

        Ok(())
    }

Contributor:

I kind of prefer this as sqlx takes ownership of transaction on commit, so it kind of makes sense that we take ownership of the transaction batch when we commit it.

Contributor:

Not very important though, I understood your explanation, thanks!

@bretthoerner (Contributor, Author):

> I kind of prefer this as sqlx takes ownership of transaction on commit, so it kind of makes sense that we take ownership of the transaction batch when we commit it.

Yeah, makes sense to me, thanks!

@bretthoerner (Contributor, Author)

PR to remove non-transaction mode. It's a lot of noise so I wasn't sure if it would help or hurt reviewing this PR. Up to y'all: #65

Comment on lines +206 to +207
// We have to `take` the Vec of jobs from the batch to avoid a borrow checker
// error below when we commit.
for job in std::mem::take(&mut batch.jobs) {
Contributor:

This makes me a bit nervous: the borrow checker is there to help us, and I'm not sure I want to avoid it.

@bretthoerner (Contributor, Author) commented Feb 5, 2024

The borrow checker is still in play, maybe my comment is just bad. I can't find a way to take ownership of each job in the Vec (via complete() etc) and still be allowed to commit at the end.

This is just moving the Vec out of the batch, so we are the sole owner and can complete/fail jobs, and then the batch can still be used to commit.
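
To make that ownership story concrete, here is a small self-contained illustration of the same pattern; the Batch type and println! below are toy stand-ins, not the real PgTransactionBatch:

struct Batch {
    jobs: Vec<String>, // stand-in for the real job type
}

impl Batch {
    fn commit(&mut self) {
        // stand-in for PgTransactionBatch::commit
        println!("committed");
    }
}

fn main() {
    let mut batch = Batch {
        jobs: vec!["job-1".to_string(), "job-2".to_string()],
    };

    // Move the Vec out of the batch, leaving an empty Vec behind. Each
    // iteration now owns its job outright (so a consuming complete()/fail()
    // is allowed), while `batch` itself was never moved and can still be
    // borrowed mutably to commit afterwards.
    for job in std::mem::take(&mut batch.jobs) {
        drop(job); // stand-in for the consuming per-job work
    }

    batch.commit();
}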

@@ -37,6 +37,9 @@ pub struct Config {

    #[envconfig(default = "true")]
    pub transactional: bool,

    #[envconfig(default = "10")]
Contributor:

Any reason for the default? Otherwise I'd keep it at 1 until we need this.

@bretthoerner (Contributor, Author):

Changed to 1 because I agree this is arbitrary, but I think we'll configure this to be non-0 in prod almost immediately. We currently only have 1 task dequeueing. If a round trip to grab a job takes 5ms, that's a 200 job/sec peak as written.
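
(To spell out that arithmetic: a single dequeue loop spending roughly 5 ms per round trip can make about 1000 ms / 5 ms = 200 round trips per second, so a batch size of 1 caps throughput near 200 jobs/sec, while a batch size of N lifts that ceiling toward roughly 200 × N, assuming the batched query stays similarly cheap.)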

@bretthoerner (Contributor, Author)

The current conflict will be un-conflicted when I re-merge the change I had to revert: #67

@tomasfarias (Contributor) left a comment

I have a pending comment regarding ownership of commit, but it's not that critical.

Overall I'm fine with this. I'm still a bit hesitant that we are prematurely optimizing, but after reading the PR a few times, I'm comfortable with moving forward. Ultimately we are saving PG connections, which is not a trivial thing to save, and I can see a future where we will end up batching at some point.

Thank you!

@tomasfarias (Contributor)

By the way, this PR will likely conflict with the error changes I'm doing at #36. Happy to get this merged out first @bretthoerner, and then I can rebase my PR and change the error types here too.

@bretthoerner merged commit 54bf761 into main on Feb 6, 2024
4 checks passed
@bretthoerner deleted the brett/multi-get branch on February 6, 2024 at 15:27