Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
feat: Support for attempted_by
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias committed Nov 30, 2023
1 parent 132ceee commit 388b544
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 14 deletions.
52 changes: 38 additions & 14 deletions hook-common/src/pgqueue.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
//! # PgQueue
//!
//! A job queue implementation backed by a PostgreSQL table.
use std::str::FromStr;

use chrono::prelude::*;
Expand All @@ -6,7 +10,7 @@ use sqlx::postgres::{PgPool, PgPoolOptions};
use thiserror::Error;

/// Enumeration of errors for operations with PgQueue.
/// Errors can originate from sqlx and are wrapped by us to provide additional context.
/// Errors that can originate from sqlx and are wrapped by us to provide additional context.
#[derive(Error, Debug)]
pub enum PgQueueError {
#[error("connection failed with: {error}")]
Expand All @@ -23,17 +27,17 @@ pub enum PgQueueError {
}

/// Enumeration of possible statuses for a Job.
/// Available: A job that is waiting in the queue to be picked up by a worker.
/// Completed: A job that was successfully completed by a worker.
/// Failed: A job that was unsuccessfully completed by a worker.
/// Running: A job that was picked up by a worker and it's currentlly being run.
#[derive(Debug, PartialEq, sqlx::Type)]
#[sqlx(type_name = "job_status")]
#[sqlx(rename_all = "lowercase")]
pub enum JobStatus {
/// A job that is waiting in the queue to be picked up by a worker.
Available,
/// A job that was successfully completed by a worker.
Completed,
/// A job that was unsuccessfully completed by a worker.
Failed,
/// A job that was picked up by a worker and it's currentlly being run.
Running,
}

Expand All @@ -58,12 +62,23 @@ pub type JobParameters<J> = sqlx::types::Json<J>;
/// A Job to be executed by a worker dequeueing a PgQueue.
#[derive(sqlx::FromRow)]
pub struct Job<J> {
/// A unique id identifying a job.
pub id: i64,
/// A number corresponding to the current job attempt.
pub attempt: i32,
/// A datetime corresponding to when the current job attempt started.
pub attempted_at: Option<DateTime<Utc>>,
/// A vector of identifiers that have attempted this job. E.g. thread ids, pod names, etc...
pub attempted_by: Vec<String>,
/// A datetime corresponding to when the job was finished (either successfully or unsuccessfully).
pub finished_at: Option<DateTime<Utc>>,
/// A datetime corresponding to when the job was created.
pub created_at: DateTime<Utc>,
/// A datetime corresponding to when the first job attempt was started.
pub started_at: Option<DateTime<Utc>>,
/// The current status of the job.
pub status: JobStatus,
/// Arbitrary job parameters stored as JSON.
pub parameters: sqlx::types::Json<J>,
}

Expand All @@ -90,16 +105,21 @@ impl<J> NewJob<J> {

/// A queue implemented on top of a PostgreSQL table.
pub struct PgQueue {
/// The identifier of the PostgreSQL table this queue runs on.
table: String,
/// A connection pool used to connect to the PostgreSQL database.
pool: PgPool,
/// The identifier of the worker listening on this queue.
worker: String,
}

pub type PgQueueResult<T> = std::result::Result<T, PgQueueError>;

impl PgQueue {
/// Initialize a new PgQueue backed by table in PostgreSQL.
pub async fn new(table: &str, url: &str) -> PgQueueResult<Self> {
pub async fn new(table: &str, url: &str, worker: &str) -> PgQueueResult<Self> {
let table = table.to_owned();
let worker = worker.to_owned();
let pool = PgPoolOptions::new()
.connect(url)
.await
Expand All @@ -108,6 +128,7 @@ impl PgQueue {
Ok(Self {
table,
pool,
worker,
})
}

Expand All @@ -132,7 +153,8 @@ UPDATE
SET
started_at = NOW(),
status = 'running'::job_status,
attempt = "{0}".attempt + 1
attempt = "{0}".attempt + 1,
attempted_by = array_append("{0}".attempted_by, $1::text)
FROM
available_in_queue
WHERE
Expand All @@ -142,9 +164,7 @@ RETURNING
"#, &self.table);

let item: Job<J> = sqlx::query_as(&base_query)
.bind(&self.table)
.bind(&self.table)
.bind(&self.table)
.bind(&self.worker)
.fetch_one(&self.pool)
.await
.map_err(|error| PgQueueError::QueryError { command: "UPDATE".to_owned(), error})?;
Expand Down Expand Up @@ -199,17 +219,21 @@ mod tests {
};
let new_job = NewJob::new(job_parameters);

let queue = PgQueue::new("job_queue", "postgres://posthog:posthog@localhost:15432/test_database").await.unwrap();
let worker_id = std::process::id().to_string();
let queue = PgQueue::new("job_queue", "postgres://posthog:posthog@localhost:15432/test_database", &worker_id)
.await
.expect("failed to connect to local test postgresql database");

queue.enqueue(new_job).await.unwrap();
queue.enqueue(new_job).await.expect("failed to enqueue job");

let job: Job<JobParameters> = queue.dequeue().await.unwrap();
let job: Job<JobParameters> = queue.dequeue().await.expect("failed to dequeue job");

assert_eq!(job.attempt, 1);
assert_eq!(job.parameters.method, "POST".to_string());
assert_eq!(job.parameters.body, "{\"event\":\"event-name\"}".to_string());
assert_eq!(job.parameters.url, "https://localhost".to_string());
assert_eq!(job.finished_at, None);
assert!(job.finished_at.is_none());
assert_eq!(job.status, JobStatus::Running);
assert!(job.attempted_by.contains(&worker_id));
}
}
2 changes: 2 additions & 0 deletions migrations/20231129172339_job_queue_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ CREATE TYPE job_status AS ENUM(
CREATE TABLE job_queue(
id BIGSERIAL PRIMARY KEY,
attempt INT NOT NULL DEFAULT 0,
attempted_at TIMESTAMPTZ DEFAULT NULL,
attempted_by TEXT[] DEFAULT ARRAY[]::TEXT[],
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
finished_at TIMESTAMPTZ DEFAULT NULL,
started_at TIMESTAMPTZ DEFAULT NULL,
Expand Down

0 comments on commit 388b544

Please sign in to comment.