diff --git a/api/src/background_worker/job_queue.rs b/api/src/background_worker/job_queue.rs index bcac193..31c267e 100644 --- a/api/src/background_worker/job_queue.rs +++ b/api/src/background_worker/job_queue.rs @@ -70,7 +70,7 @@ impl JobQueue { let payload = serde_json::to_string(&job_to_enqueue)?; redis::cmd("LPUSH") - .arg("job_queue") + .arg(T::QUEUE) .arg(payload) .query_async(&mut conn) .await?; @@ -94,7 +94,7 @@ impl JobQueue { let db_conn = self.db_pool.get(); let res: Option<(String, String)> = redis::cmd("BRPOP") - .arg("job_queue") + .arg(T::QUEUE) .arg(0) .query_async(&mut conn) .await?; diff --git a/api/src/background_worker/tasks/metadata_json_upload_task.rs b/api/src/background_worker/tasks/metadata_json_upload_task.rs index f348c78..fd73361 100644 --- a/api/src/background_worker/tasks/metadata_json_upload_task.rs +++ b/api/src/background_worker/tasks/metadata_json_upload_task.rs @@ -666,8 +666,15 @@ impl Context { #[async_trait::async_trait] impl BackgroundTask for MetadataJsonUploadTask { + const QUEUE: &'static str = "job_queue"; + const NAME: &'static str = "MetadataJsonUploadTask"; + fn name(&self) -> &'static str { - "MetadataJsonUploadTask" + Self::NAME + } + + fn queue(&self) -> &'static str { + Self::QUEUE } fn payload(&self) -> Result { diff --git a/api/src/background_worker/tasks/mod.rs b/api/src/background_worker/tasks/mod.rs index 48f6b0d..87d56ea 100644 --- a/api/src/background_worker/tasks/mod.rs +++ b/api/src/background_worker/tasks/mod.rs @@ -36,6 +36,12 @@ pub enum BackgroundTaskError { #[async_trait::async_trait] pub trait BackgroundTask: Send + Sync + std::fmt::Debug { + /// The name of the task + const NAME: &'static str; + + /// The queue of the task + const QUEUE: &'static str; + /// Process the task /// # Arguments /// * `self` - The task @@ -55,6 +61,7 @@ pub trait BackgroundTask: Send + Sync + std::fmt::Debug /// * `anyhow::Error` - Unable to serialize the payload fn payload(&self) -> Result; fn name(&self) -> &'static str; + fn queue(&self) -> &'static str; } pub use metadata_json_upload_task::{ diff --git a/api/src/background_worker/worker.rs b/api/src/background_worker/worker.rs index 32cad80..1867b11 100644 --- a/api/src/background_worker/worker.rs +++ b/api/src/background_worker/worker.rs @@ -6,6 +6,7 @@ use sea_orm::{error::DbErr, ActiveModelTrait}; use serde::{Deserialize, Serialize}; use super::{ + job::Job, job_queue::{JobQueue, JobQueueError}, tasks::BackgroundTask, }; @@ -68,6 +69,8 @@ where tokio::spawn({ let db_pool = db_pool.clone(); let context = context.clone(); + let job_queue = job_queue.clone(); + async move { let db_conn = db_pool.get(); let db_pool_process = db_pool.clone(); @@ -106,8 +109,15 @@ where let job_tracking_am = job_trackings::Entity::update_status(model, "failed"); if let Err(e) = job_tracking_am.update(db_conn).await { - error!("Error updating job tracking: {}", e); + error!("Error updating job tracking after failure: {}", e); + } + + let requeue_result = job_queue.enqueue(job.task).await; + + if let Err(e) = requeue_result { + error!("Error requeueing job {}: {}", job.id, e); } + error!("Error processing job {}: {}", job.id, e); }, } @@ -118,4 +128,47 @@ where }); } } + + pub async fn retry(&self) -> Result<(), WorkerError> { + let db_pool = self.db_pool.clone(); + let conn = db_pool.get(); + + let failed_jobs = job_trackings::Entity::filter_failed_for_job_type(T::NAME.to_string()) + .all(conn) + .await?; + + for failed_job in failed_jobs { + let task_payload_result: Result = + serde_json::from_value(failed_job.clone().payload); + + match task_payload_result { + Ok(task_payload) => { + let job = Job::new(failed_job.id, task_payload); + + let task_results = job + .task + .process(db_pool.clone(), self.context.clone()) + .await; + + if let Err(e) = task_results { + error!("Error retrying job: {}", e); + continue; + } + + let job_tracking_am = + job_trackings::Entity::update_status(failed_job, "completed"); + + if let Err(e) = job_tracking_am.update(conn).await { + error!("Error updating job tracking: {}", e); + } + }, + Err(e) => { + error!("Error deserializing job: {}", e); + continue; + }, + } + } + + Ok(()) + } } diff --git a/api/src/entities/job_trackings.rs b/api/src/entities/job_trackings.rs index ce3bea0..35db8bd 100644 --- a/api/src/entities/job_trackings.rs +++ b/api/src/entities/job_trackings.rs @@ -1,5 +1,5 @@ use hub_core::chrono; -use sea_orm::{entity::prelude::*, Set}; +use sea_orm::{entity::prelude::*, QueryOrder, Set}; use serde_json::Value as Json; #[derive(Clone, Debug, PartialEq, DeriveEntityModel)] @@ -49,4 +49,14 @@ impl Entity { active_model } + + pub fn filter_failed_for_job_type(job_type: String) -> Select { + Self::find() + .filter( + Column::Status + .eq("failed") + .and(Column::JobType.eq(job_type)), + ) + .order_by_asc(Column::CreatedAt) + } } diff --git a/api/src/main.rs b/api/src/main.rs index 0f8f2a1..bc3c267 100644 --- a/api/src/main.rs +++ b/api/src/main.rs @@ -15,7 +15,7 @@ use holaplex_hub_nfts::{ metrics::Metrics, proto, Actions, AppState, Args, Services, }; -use hub_core::{prelude::*, tokio}; +use hub_core::{clap, prelude::*, tokio, tracing::info}; use poem::{get, listener::TcpListener, middleware::AddData, post, EndpointExt, Route, Server}; use redis::Client as RedisClient; @@ -36,18 +36,15 @@ pub fn main() { let connection = Connection::new(db) .await .context("failed to get database connection")?; - - let schema = build_schema(); + let hub_uploads = HubUploadClient::new(hub_uploads)?; + let credits = common.credits_cfg.build::().await?; + let metrics = Metrics::new()?; let producer = common .producer_cfg .clone() .build::() .await?; - let credits = common.credits_cfg.build::().await?; - let hub_uploads = HubUploadClient::new(hub_uploads)?; - - let metrics = Metrics::new()?; let event_processor = events::Processor::new( connection.clone(), @@ -71,6 +68,20 @@ pub fn main() { metadata_json_upload_task_context, ); + let matches = clap::Command::new("hub-nfts") + .subcommand(clap::command!("jobs").subcommand(clap::command!("retry"))) + .get_matches(); + + if let Some(("jobs", jobs_command)) = matches.subcommand() { + if let Some(("retry", _retry_command)) = jobs_command.subcommand() { + worker.retry().await?; + + return Ok(()); + } + } + + let schema = build_schema(); + let state = AppState::new( schema, connection.clone(),