From 71a2344f1da990693993498aed8d3a8a5ffb0ed2 Mon Sep 17 00:00:00 2001 From: Kyle Espinola Date: Fri, 20 Oct 2023 09:27:47 +0200 Subject: [PATCH] feat: implement the job-retry subcommand --- api/src/background_worker/worker.rs | 22 ++++++++--- api/src/main.rs | 57 ++++++++++++++++++++--------- 2 files changed, 57 insertions(+), 22 deletions(-) diff --git a/api/src/background_worker/worker.rs b/api/src/background_worker/worker.rs index 1867b11..2e3508f 100644 --- a/api/src/background_worker/worker.rs +++ b/api/src/background_worker/worker.rs @@ -1,7 +1,4 @@ -use hub_core::{ - thiserror, tokio, - tracing::{error, info}, -}; +use hub_core::{thiserror, tokio, tracing::error}; use sea_orm::{error::DbErr, ActiveModelTrait}; use serde::{Deserialize, Serialize}; @@ -103,7 +100,6 @@ where if let Err(e) = job_tracking_am.update(db_conn).await { error!("Error updating job tracking: {}", e); } - info!("Successfully processed job {}", job.id); }, Err(e) => { let job_tracking_am = @@ -129,6 +125,22 @@ where } } + /// This method is responsible for retrying failed jobs. + /// It fetches all failed jobs of a specific type from the database, + /// deserializes their payloads, and attempts to process them again. + /// If the job is processed successfully, its status is updated to "completed". + /// If the job fails again, an error is logged and the job is skipped. + /// The method returns an empty result if it finishes without panicking. + /// + /// # Args + /// + /// * `&self` - A reference to the Worker instance. + /// + /// # Results + /// + /// * `Result<(), WorkerError>` - An empty result indicating successful execution. + /// # Errors + /// `Err(WorkerError)` pub async fn retry(&self) -> Result<(), WorkerError> { let db_pool = self.db_pool.clone(); let conn = db_pool.get(); diff --git a/api/src/main.rs b/api/src/main.rs index fcc3152..6a4d828 100644 --- a/api/src/main.rs +++ b/api/src/main.rs @@ -15,7 +15,7 @@ use holaplex_hub_nfts::{ metrics::Metrics, proto, Actions, AppState, Args, Services, Subcommand, }; -use hub_core::{clap, prelude::*, tokio, tracing::info}; +use hub_core::{prelude::*, tokio}; use poem::{get, listener::TcpListener, middleware::AddData, post, EndpointExt, Route, Server}; use redis::Client as RedisClient; @@ -35,7 +35,7 @@ pub fn main() { match command { None => serve(common, port, db, hub_uploads, redis_url), - Some(Subcommand::RetryJobs) => retry_jobs(common), + Some(Subcommand::RetryJobs) => retry_jobs(common, redis_url, db, hub_uploads), } }); } @@ -76,25 +76,13 @@ fn serve( let metadata_json_upload_task_context = MetadataJsonUploadContext::new(hub_uploads, solana.clone(), polygon.clone()); - let job_queue = JobQueue::new(redis_client, connection.clone()); + let job_queue = JobQueue::new(redis_client.clone(), connection.clone()); let worker = Worker::::new( job_queue.clone(), connection.clone(), metadata_json_upload_task_context, ); - let matches = clap::Command::new("hub-nfts") - .subcommand(clap::command!("jobs").subcommand(clap::command!("retry"))) - .get_matches(); - - if let Some(("jobs", jobs_command)) = matches.subcommand() { - if let Some(("retry", _retry_command)) = jobs_command.subcommand() { - worker.retry().await?; - - return Ok(()); - } - } - let schema = build_schema(); let state = AppState::new( @@ -106,6 +94,7 @@ fn serve( polygon.clone(), common.asset_proxy, job_queue.clone(), + redis_client, ); let cons = common.consumer_cfg.build::().await?; @@ -140,6 +129,40 @@ fn serve( }) } -fn retry_jobs(common: hub_core::Common) -> Result<()> { - common.rt.block_on(async move { todo!() }) +fn retry_jobs( + common: hub_core::Common, + redis_url: String, + db: holaplex_hub_nfts::db::DbArgs, + hub_uploads: holaplex_hub_nfts::hub_uploads::HubUploadArgs, +) -> Result<()> { + common.rt.block_on(async move { + let connection = Connection::new(db) + .await + .context("failed to get database connection")?; + let redis_client = RedisClient::open(redis_url)?; + let hub_uploads = HubUploadClient::new(hub_uploads)?; + + let producer = common + .producer_cfg + .clone() + .build::() + .await?; + + let solana = Solana::new(producer.clone()); + let polygon = Polygon::new(producer.clone()); + + let metadata_json_upload_task_context = + MetadataJsonUploadContext::new(hub_uploads, solana.clone(), polygon.clone()); + + let job_queue = JobQueue::new(redis_client, connection.clone()); + let worker = Worker::::new( + job_queue.clone(), + connection.clone(), + metadata_json_upload_task_context, + ); + + worker.retry().await?; + + Ok(()) + }) }