Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
Don't hold connection for non-txn jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
bretthoerner committed Feb 2, 2024
1 parent da6250b commit 6dcf73b
Showing 1 changed file with 35 additions and 12 deletions.
47 changes: 35 additions & 12 deletions hook-common/src/pgqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ pub enum PgQueueError {
pub enum PgJobError<T> {
#[error("retry is an invalid state for this PgJob: {error}")]
RetryInvalidError { job: T, error: String },
#[error("connection failed with: {error}")]
ConnectionError { error: sqlx::Error },
#[error("{command} query failed with: {error}")]
QueryError { command: String, error: sqlx::Error },
#[error("transaction {command} failed with: {error}")]
Expand Down Expand Up @@ -217,20 +219,34 @@ pub trait PgQueueJob {
#[derive(Debug)]
pub struct PgJob<J, M> {
pub job: Job<J, M>,
pub connection: sqlx::pool::PoolConnection<sqlx::postgres::Postgres>,
pub pool: PgPool,
}

impl<J: std::marker::Send, M: std::marker::Send> PgJob<J, M> {
async fn acquire_conn(
&mut self,
) -> Result<sqlx::pool::PoolConnection<sqlx::postgres::Postgres>, PgJobError<Box<PgJob<J, M>>>>
{
self.pool
.acquire()
.await
.map_err(|error| PgJobError::ConnectionError { error })
}
}

#[async_trait]
impl<J: std::marker::Send, M: std::marker::Send> PgQueueJob for PgJob<J, M> {
async fn complete(mut self) -> Result<CompletedJob, PgJobError<Box<PgJob<J, M>>>> {
let completed_job = self
.job
.complete(&mut *self.connection)
.await
.map_err(|error| PgJobError::QueryError {
command: "UPDATE".to_owned(),
error,
})?;
let mut connection = self.acquire_conn().await?;

let completed_job =
self.job
.complete(&mut *connection)
.await
.map_err(|error| PgJobError::QueryError {
command: "UPDATE".to_owned(),
error,
})?;

Ok(completed_job)
}
Expand All @@ -239,9 +255,11 @@ impl<J: std::marker::Send, M: std::marker::Send> PgQueueJob for PgJob<J, M> {
mut self,
error: E,
) -> Result<FailedJob<E>, PgJobError<Box<PgJob<J, M>>>> {
let mut connection = self.acquire_conn().await?;

let failed_job = self
.job
.fail(error, &mut *self.connection)
.fail(error, &mut *connection)
.await
.map_err(|error| PgJobError::QueryError {
command: "UPDATE".to_owned(),
Expand All @@ -264,11 +282,13 @@ impl<J: std::marker::Send, M: std::marker::Send> PgQueueJob for PgJob<J, M> {
});
}

let mut connection = self.acquire_conn().await?;

let retried_job = self
.job
.retryable()
.queue(queue)
.retry(error, retry_interval, &mut *self.connection)
.retry(error, retry_interval, &mut *connection)
.await
.map_err(|error| PgJobError::QueryError {
command: "UPDATE".to_owned(),
Expand Down Expand Up @@ -600,7 +620,10 @@ RETURNING
.await;

match query_result {
Ok(job) => Ok(Some(PgJob { job, connection })),
Ok(job) => Ok(Some(PgJob {
job,
pool: self.pool.clone(),
})),

// Although connection would be closed once it goes out of scope, sqlx recommends explicitly calling close().
// See: https://docs.rs/sqlx/latest/sqlx/postgres/any/trait.AnyConnectionBackend.html#tymethod.close.
Expand Down

0 comments on commit 6dcf73b

Please sign in to comment.