Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
refactor: Two clients one for each mode
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias committed Dec 14, 2023
1 parent 537bfcd commit c65b913
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 29 deletions.
147 changes: 129 additions & 18 deletions hook-consumer/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;
use std::time;

use async_std::task;
use hook_common::pgqueue::{PgJobError, PgQueue, PgQueueError, PgTransactionJob};
use hook_common::pgqueue::{PgJob, PgJobError, PgQueue, PgQueueError, PgTransactionJob};
use http::StatusCode;
use reqwest::header;
use serde::{de::Visitor, Deserialize, Serialize};
Expand Down Expand Up @@ -147,8 +147,6 @@ pub struct WebhookConsumer<'p> {
client: reqwest::Client,
/// Maximum number of concurrent jobs being processed.
max_concurrent_jobs: usize,
/// Indicates whether we are holding an open transaction while processing or not.
transactional: bool,
}

impl<'p> WebhookConsumer<'p> {
Expand All @@ -158,7 +156,6 @@ impl<'p> WebhookConsumer<'p> {
poll_interval: time::Duration,
request_timeout: time::Duration,
max_concurrent_jobs: usize,
transactional: bool,
) -> Result<Self, WebhookConsumerError> {
let mut headers = header::HeaderMap::new();
headers.insert(
Expand All @@ -177,23 +174,82 @@ impl<'p> WebhookConsumer<'p> {
poll_interval,
client,
max_concurrent_jobs,
transactional,
})
}

/// Wait until a job becomes available in our queue.
async fn wait_for_job_tx<'a>(
&self,
) -> Result<PgTransactionJob<'a, WebhookJobParameters>, WebhookConsumerError> {
async fn wait_for_job<'a>(&self) -> Result<PgJob<WebhookJobParameters>, WebhookConsumerError> {
loop {
if let Some(job) = self.queue.dequeue_tx(&self.name).await? {
if let Some(job) = self.queue.dequeue(&self.name).await? {
return Ok(job);
} else {
task::sleep(self.poll_interval).await;
}
}
}

/// Run this consumer to continuously process any jobs that become available.
pub async fn run(&self) -> Result<(), WebhookConsumerError> {
let semaphore = Arc::new(sync::Semaphore::new(self.max_concurrent_jobs));

loop {
let webhook_job = self.wait_for_job().await?;

// reqwest::Client internally wraps with Arc, so this allocation is cheap.
let client = self.client.clone();
let permit = semaphore.clone().acquire_owned().await.unwrap();

tokio::spawn(async move {
let result = process_webhook_job(client, webhook_job).await;
drop(permit);
result
});
}
}
}

/// A consumer to poll `PgQueue` and spawn tasks to process webhooks when a job becomes available.
pub struct WebhookTransactionConsumer<'p> {
/// An identifier for this consumer. Used to mark jobs we have consumed.
name: String,
/// The queue we will be dequeuing jobs from.
queue: &'p PgQueue,
/// The interval for polling the queue.
poll_interval: time::Duration,
/// The client used for HTTP requests.
client: reqwest::Client,
/// Maximum number of concurrent jobs being processed.
max_concurrent_jobs: usize,
}

impl<'p> WebhookTransactionConsumer<'p> {
pub fn new(
name: &str,
queue: &'p PgQueue,
poll_interval: time::Duration,
request_timeout: time::Duration,
max_concurrent_jobs: usize,
) -> Result<Self, WebhookConsumerError> {
let mut headers = header::HeaderMap::new();
headers.insert(
header::CONTENT_TYPE,
header::HeaderValue::from_static("application/json"),
);

let client = reqwest::Client::builder()
.default_headers(headers)
.timeout(request_timeout)
.build()?;

Ok(Self {
name: name.to_owned(),
queue,
poll_interval,
client,
max_concurrent_jobs,
})
}

/// Wait until a job becomes available in our queue.
async fn wait_for_job<'a>(
&self,
Expand All @@ -212,17 +268,14 @@ impl<'p> WebhookConsumer<'p> {
let semaphore = Arc::new(sync::Semaphore::new(self.max_concurrent_jobs));

loop {
let webhook_job = match self.transactional {
true => self.wait_for_job_tx().await,
false => self.wait_for_job().await,
}?;
let webhook_job = self.wait_for_job().await?;

// reqwest::Client internally wraps with Arc, so this allocation is cheap.
let client = self.client.clone();
let permit = semaphore.clone().acquire_owned().await.unwrap();

tokio::spawn(async move {
let result = process_webhook_job(client, webhook_job).await;
let result = process_webhook_job_tx(client, webhook_job).await;
drop(permit);
result
});
Expand All @@ -243,7 +296,7 @@ impl<'p> WebhookConsumer<'p> {
///
/// * `webhook_job`: The webhook job to process as dequeued from `hook_common::pgqueue::PgQueue`.
/// * `request_timeout`: A timeout for the HTTP request.
async fn process_webhook_job(
async fn process_webhook_job_tx(
client: reqwest::Client,
webhook_job: PgTransactionJob<'_, WebhookJobParameters>,
) -> Result<(), WebhookConsumerError> {
Expand Down Expand Up @@ -290,6 +343,66 @@ async fn process_webhook_job(
}
}

/// Process a webhook job by transitioning it to its appropriate state after its request is sent.
/// After we finish, the webhook job will be set as completed (if the request was successful), retryable (if the request
/// was unsuccessful but we can still attempt a retry), or failed (if the request was unsuccessful and no more retries
/// may be attempted).
///
/// A webhook job is considered retryable after a failing request if:
/// 1. The job has attempts remaining (i.e. hasn't reached `max_attempts`), and...
/// 2. The status code indicates retrying at a later point could resolve the issue. This means: 429 and any 5XX.
///
/// # Arguments
///
/// * `webhook_job`: The webhook job to process as dequeued from `hook_common::pgqueue::PgQueue`.
/// * `request_timeout`: A timeout for the HTTP request.
async fn process_webhook_job(
client: reqwest::Client,
webhook_job: PgJob<WebhookJobParameters>,
) -> Result<(), WebhookConsumerError> {
match send_webhook(
client,
&webhook_job.job.parameters.method,
&webhook_job.job.parameters.url,
&webhook_job.job.parameters.headers,
webhook_job.job.parameters.body.clone(),
)
.await
{
Ok(_) => {
webhook_job
.complete()
.await
.map_err(|error| WebhookConsumerError::PgJobError(error.to_string()))?;
Ok(())
}
Err(WebhookConsumerError::RetryableWebhookError {
reason,
retry_after,
}) => match webhook_job.retry(reason.to_string(), retry_after).await {
Ok(_) => Ok(()),
Err(PgJobError::RetryInvalidError {
job: webhook_job,
error: fail_error,
}) => {
webhook_job
.fail(fail_error.to_string())
.await
.map_err(|job_error| WebhookConsumerError::PgJobError(job_error.to_string()))?;
Ok(())
}
Err(job_error) => Err(WebhookConsumerError::PgJobError(job_error.to_string())),
},
Err(error) => {
webhook_job
.fail(error.to_string())
.await
.map_err(|job_error| WebhookConsumerError::PgJobError(job_error.to_string()))?;
Ok(())
}
}
}

/// Make an HTTP request to a webhook endpoint.
///
/// # Arguments
Expand Down Expand Up @@ -466,7 +579,6 @@ mod tests {
time::Duration::from_millis(100),
time::Duration::from_millis(5000),
10,
false,
)
.expect("consumer failed to initialize");
let consumed_job = consumer
Expand Down Expand Up @@ -511,13 +623,12 @@ mod tests {
enqueue_job(&queue, 1, webhook_job.clone())
.await
.expect("failed to enqueue job");
let consumer = WebhookConsumer::new(
let consumer = WebhookTransactionConsumer::new(
&worker_id,
&queue,
time::Duration::from_millis(100),
time::Duration::from_millis(5000),
10,
true,
)
.expect("consumer failed to initialize");
let consumed_job = consumer
Expand Down
34 changes: 23 additions & 11 deletions hook-consumer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use envconfig::Envconfig;

use hook_common::pgqueue::{PgQueue, RetryPolicy};
use hook_consumer::config::Config;
use hook_consumer::consumer::WebhookConsumer;
use hook_consumer::consumer::{WebhookConsumer, WebhookTransactionConsumer};
use hook_consumer::error::WebhookConsumerError;

#[tokio::main]
Expand All @@ -23,16 +23,28 @@ async fn main() -> Result<(), WebhookConsumerError> {
.await
.expect("failed to initialize queue");

let consumer = WebhookConsumer::new(
&config.consumer_name,
&queue,
config.poll_interval.0,
config.request_timeout.0,
config.max_concurrent_jobs,
config.transactional,
)?;

let _ = consumer.run().await;
match config.transactional {
true => {
let consumer = WebhookTransactionConsumer::new(
&config.consumer_name,
&queue,
config.poll_interval.0,
config.request_timeout.0,
config.max_concurrent_jobs,
)?;
consumer.run().await?;
}
false => {
let consumer = WebhookConsumer::new(
&config.consumer_name,
&queue,
config.poll_interval.0,
config.request_timeout.0,
config.max_concurrent_jobs,
)?;
consumer.run().await?;
}
};

Ok(())
}

0 comments on commit c65b913

Please sign in to comment.