From 5728df2147e5ce2a9cd791c6e34c1c13af3b85c9 Mon Sep 17 00:00:00 2001 From: Arun Jangra Date: Fri, 23 Aug 2024 18:34:07 +0530 Subject: [PATCH] feat : added worker queues and listeners to consume the message (#92) * feat : added worker queues and listeners to consume the message * changelog * refactor : removed excess queues * feat : removed duplicate macro * feat : renamed vars * refactor : code refactor according the comments above --- .env.example | 2 + .env.test | 2 + CHANGELOG.md | 1 + crates/orchestrator/src/main.rs | 22 -- crates/orchestrator/src/queue/job_queue.rs | 233 ++++++++++++++++----- crates/orchestrator/src/queue/sqs/mod.rs | 25 ++- 6 files changed, 208 insertions(+), 77 deletions(-) diff --git a/.env.example b/.env.example index 9d229975..fdfc8881 100644 --- a/.env.example +++ b/.env.example @@ -29,6 +29,8 @@ AWS_DEFAULT_REGION= # SQS SQS_JOB_PROCESSING_QUEUE_URL= SQS_JOB_VERIFICATION_QUEUE_URL= +SQS_JOB_HANDLE_FAILURE_QUEUE_URL= +SQS_WORKER_TRIGGER_QUEUE_URL= # S3 AWS_S3_BUCKET_NAME= diff --git a/.env.test b/.env.test index 0e8f36a3..11d500ba 100644 --- a/.env.test +++ b/.env.test @@ -7,6 +7,8 @@ AWS_S3_BUCKET_REGION="us-east-1" AWS_ENDPOINT_URL="http://localhost.localstack.cloud:4566" SQS_JOB_PROCESSING_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_processing_queue" SQS_JOB_VERIFICATION_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_verification_queue" +SQS_JOB_HANDLE_FAILURE_QUEUE_URL= +SQS_WORKER_TRIGGER_QUEUE_URL= AWS_DEFAULT_REGION="localhost" ##### On chain config ##### diff --git a/CHANGELOG.md b/CHANGELOG.md index 7aa715a1..356fa58d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## Added +- Worker queues to listen for trigger events. - Tests for prover client. - Added Rust Cache for Coverage Test CI. 
- support for fetching PIE file from storage client in proving job. diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs index b2394093..1739f453 100644 --- a/crates/orchestrator/src/main.rs +++ b/crates/orchestrator/src/main.rs @@ -2,12 +2,6 @@ use dotenvy::dotenv; use orchestrator::config::config; use orchestrator::queue::init_consumers; use orchestrator::routes::app_router; -use orchestrator::workers::data_submission_worker::DataSubmissionWorker; -use orchestrator::workers::proof_registration::ProofRegistrationWorker; -use orchestrator::workers::proving::ProvingWorker; -use orchestrator::workers::snos::SnosWorker; -use orchestrator::workers::update_state::UpdateStateWorker; -use orchestrator::workers::*; use utils::env_utils::get_env_var_or_default; /// Start the server @@ -27,22 +21,6 @@ async fn main() { // init consumer init_consumers().await.expect("Failed to init consumers"); - // spawn a thread for each workers - // changes in rollup mode - sovereign, validity, validiums etc. 
- // will likely involve changes in these workers as well - tokio::spawn(start_cron(Box::new(SnosWorker), 60)); - tokio::spawn(start_cron(Box::new(ProvingWorker), 60)); - tokio::spawn(start_cron(Box::new(ProofRegistrationWorker), 60)); - tokio::spawn(start_cron(Box::new(UpdateStateWorker), 60)); - tokio::spawn(start_cron(Box::new(DataSubmissionWorker), 60)); - tracing::info!("Listening on http://{}", address); axum::serve(listener, app).await.expect("Failed to start axum server"); } - -async fn start_cron(worker: Box, interval: u64) { - loop { - worker.run_worker_if_enabled().await.expect("Error in running the worker."); - tokio::time::sleep(tokio::time::Duration::from_secs(interval)).await; - } -} diff --git a/crates/orchestrator/src/queue/job_queue.rs b/crates/orchestrator/src/queue/job_queue.rs index e881d2ef..d6769aea 100644 --- a/crates/orchestrator/src/queue/job_queue.rs +++ b/crates/orchestrator/src/queue/job_queue.rs @@ -3,7 +3,7 @@ use std::time::Duration; use color_eyre::eyre::Context; use color_eyre::Result as EyreResult; -use omniqueue::QueueError; +use omniqueue::{Delivery, QueueError}; use serde::{Deserialize, Serialize}; use tokio::time::sleep; use tracing::log; @@ -14,9 +14,18 @@ use crate::jobs::{handle_job_failure, process_job, verify_job, JobError, OtherEr pub const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue"; pub const JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_job_verification_queue"; -// Below is the Data Letter Queue for the the above two jobs. +// Below is the Dead Letter Queue for the above two jobs. 
pub const JOB_HANDLE_FAILURE_QUEUE: &str = "madara_orchestrator_job_handle_failure_queue"; +// Queues for SNOS worker trigger listening +pub const WORKER_TRIGGER_QUEUE: &str = "madara_orchestrator_worker_trigger_queue"; + +use crate::workers::data_submission_worker::DataSubmissionWorker; +use crate::workers::proof_registration::ProofRegistrationWorker; +use crate::workers::proving::ProvingWorker; +use crate::workers::snos::SnosWorker; +use crate::workers::update_state::UpdateStateWorker; +use crate::workers::Worker; use thiserror::Error; #[derive(Error, Debug, PartialEq)] @@ -27,6 +36,9 @@ pub enum ConsumptionError { #[error("Failed to handle job with id {job_id:?}. Error: {error_msg:?}")] FailedToHandleJob { job_id: Uuid, error_msg: String }, + #[error("Failed to spawn {worker_trigger_type:?} worker. Error: {error_msg:?}")] + FailedToSpawnWorker { worker_trigger_type: WorkerTriggerType, error_msg: String }, + #[error("Other error: {0}")] Other(#[from] OtherError), } @@ -36,6 +48,25 @@ pub struct JobQueueMessage { pub(crate) id: Uuid, } +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] +pub enum WorkerTriggerType { + Snos, + Proving, + ProofRegistration, + DataSubmission, + UpdateState, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct WorkerTriggerMessage { + pub(crate) worker: WorkerTriggerType, +} + +enum DeliveryReturnType { + Message(Delivery), + NoMessage, +} + pub async fn add_job_to_process_queue(id: Uuid) -> EyreResult<()> { log::info!("Adding job with id {:?} to processing queue", id); add_job_to_queue(id, JOB_PROCESSING_QUEUE.to_string(), None).await @@ -52,60 +83,158 @@ where Fut: Future>, { log::info!("Consuming from queue {:?}", queue); - let config = config().await; - let delivery = match config.queue().consume_message_from_queue(queue.clone()).await { - Ok(d) => d, - Err(QueueError::NoData) => { - return Ok(()); - } - Err(e) => { - return Err(ConsumptionError::FailedToConsumeFromQueue { error_msg: e.to_string() }); - } + 
let delivery = get_delivery_from_queue(&queue).await?; + + let message = match delivery { + DeliveryReturnType::Message(message) => message, + DeliveryReturnType::NoMessage => return Ok(()), + }; + + let job_message = parse_job_message(&message)?; + + if let Some(job_message) = job_message { + handle_job_message(job_message, message, handler).await?; + } + + Ok(()) +} + +/// Function to consume the message from the worker trigger queues and spawn the worker +/// for respective message received. +pub async fn consume_worker_trigger_messages_from_queue( + queue: String, + handler: F, +) -> Result<(), ConsumptionError> +where + F: FnOnce(Box) -> Fut, + Fut: Future>, +{ + log::info!("Consuming from queue {:?}", queue); + let delivery = get_delivery_from_queue(&queue).await?; + + let message = match delivery { + DeliveryReturnType::Message(message) => message, + DeliveryReturnType::NoMessage => return Ok(()), }; - let job_message: Option = delivery + + let job_message = parse_worker_message(&message)?; + + if let Some(job_message) = job_message { + handle_worker_message(job_message, message, handler).await?; + } + + Ok(()) +} + +fn parse_job_message(message: &Delivery) -> Result, ConsumptionError> { + message .payload_serde_json() .wrap_err("Payload Serde Error") - .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?; - - match job_message { - Some(job_message) => { - log::info!("Handling job with id {:?} for queue {:?}", job_message.id, queue); - match handler(job_message.id).await { - Ok(_) => delivery - .ack() - .await - .map_err(|(e, _)| e) - .wrap_err("Queue Error") - .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?, - Err(e) => { - log::error!("Failed to handle job with id {:?}. 
Error: {:?}", job_message.id, e); - - // if the queue as a retry logic at the source, it will be attempted - // after the nack - match delivery.nack().await { - Ok(_) => Err(ConsumptionError::FailedToHandleJob { - job_id: job_message.id, - error_msg: "Job handling failed, message nack-ed".to_string(), - })?, - Err(delivery_nack_error) => Err(ConsumptionError::FailedToHandleJob { - job_id: job_message.id, - error_msg: delivery_nack_error.0.to_string(), - })?, - } - } - }; + .map_err(|e| ConsumptionError::Other(OtherError::from(e))) +} + +fn parse_worker_message(message: &Delivery) -> Result, ConsumptionError> { + message + .payload_serde_json() + .wrap_err("Payload Serde Error") + .map_err(|e| ConsumptionError::Other(OtherError::from(e))) +} + +async fn handle_job_message( + job_message: JobQueueMessage, + message: Delivery, + handler: F, +) -> Result<(), ConsumptionError> +where + F: FnOnce(Uuid) -> Fut, + Fut: Future>, +{ + log::info!("Handling job with id {:?}", job_message.id); + + match handler(job_message.id).await { + Ok(_) => { + message + .ack() + .await + .map_err(|(e, _)| e) + .wrap_err("Queue Error") + .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?; + Ok(()) } - None => return Ok(()), - }; + Err(e) => { + log::error!("Failed to handle job with id {:?}. 
Error: {:?}", job_message.id, e); + match message.nack().await { + Ok(_) => Err(ConsumptionError::FailedToHandleJob { + job_id: job_message.id, + error_msg: "Job handling failed, message nack-ed".to_string(), + }), + Err(delivery_nack_error) => Err(ConsumptionError::FailedToHandleJob { + job_id: job_message.id, + error_msg: delivery_nack_error.0.to_string(), + }), + } + } + } +} - Ok(()) +async fn handle_worker_message( + job_message: WorkerTriggerMessage, + message: Delivery, + handler: F, +) -> Result<(), ConsumptionError> +where + F: FnOnce(Box) -> Fut, + Fut: Future>, +{ + log::info!("Handling worker trigger for worker type : {:?}", job_message.worker); + let worker_handler = get_worker_handler_from_worker_trigger_type(job_message.worker.clone()); + + match handler(worker_handler).await { + Ok(_) => { + message + .ack() + .await + .map_err(|(e, _)| e) + .wrap_err("Queue Error") + .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?; + Ok(()) + } + Err(e) => { + log::error!("Failed to handle worker trigger {:?}. Error: {:?}", job_message.worker, e); + message.nack().await.map_err(|(e, _)| ConsumptionError::Other(OtherError::from(e.to_string())))?; + Err(ConsumptionError::FailedToSpawnWorker { + worker_trigger_type: job_message.worker, + error_msg: "Worker handling failed, message nack-ed".to_string(), + }) + } + } +} + +/// To get Box handler from `WorkerTriggerType`. 
+fn get_worker_handler_from_worker_trigger_type(worker_trigger_type: WorkerTriggerType) -> Box { + match worker_trigger_type { + WorkerTriggerType::Snos => Box::new(SnosWorker), + WorkerTriggerType::Proving => Box::new(ProvingWorker), + WorkerTriggerType::DataSubmission => Box::new(DataSubmissionWorker), + WorkerTriggerType::ProofRegistration => Box::new(ProofRegistrationWorker), + WorkerTriggerType::UpdateState => Box::new(UpdateStateWorker), + } +} + +/// To get the delivery from the message queue using the queue name +async fn get_delivery_from_queue(queue: &str) -> Result { + match config().await.queue().consume_message_from_queue(queue.to_string()).await { + Ok(d) => Ok(DeliveryReturnType::Message(d)), + Err(QueueError::NoData) => Ok(DeliveryReturnType::NoMessage), + Err(e) => Err(ConsumptionError::FailedToConsumeFromQueue { error_msg: e.to_string() }), + } } macro_rules! spawn_consumer { - ($queue_type :expr, $handler : expr) => { + ($queue_type :expr, $handler : expr, $consume_function: expr) => { tokio::spawn(async move { loop { - match consume_job_from_queue($queue_type, $handler).await { + match $consume_function($queue_type, $handler).await { Ok(_) => {} Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", $queue_type, e), } @@ -116,10 +245,16 @@ macro_rules! 
spawn_consumer { } pub async fn init_consumers() -> Result<(), JobError> { - spawn_consumer!(JOB_PROCESSING_QUEUE.to_string(), process_job); - spawn_consumer!(JOB_VERIFICATION_QUEUE.to_string(), verify_job); - spawn_consumer!(JOB_HANDLE_FAILURE_QUEUE.to_string(), handle_job_failure); + spawn_consumer!(JOB_PROCESSING_QUEUE.to_string(), process_job, consume_job_from_queue); + spawn_consumer!(JOB_VERIFICATION_QUEUE.to_string(), verify_job, consume_job_from_queue); + spawn_consumer!(JOB_HANDLE_FAILURE_QUEUE.to_string(), handle_job_failure, consume_job_from_queue); + spawn_consumer!(WORKER_TRIGGER_QUEUE.to_string(), spawn_worker, consume_worker_trigger_messages_from_queue); + Ok(()) +} +/// To spawn the worker by passing the worker struct +async fn spawn_worker(worker: Box) -> color_eyre::Result<()> { + worker.run_worker_if_enabled().await.expect("Error in running the worker."); Ok(()) } diff --git a/crates/orchestrator/src/queue/sqs/mod.rs b/crates/orchestrator/src/queue/sqs/mod.rs index 1598189a..2e770017 100644 --- a/crates/orchestrator/src/queue/sqs/mod.rs +++ b/crates/orchestrator/src/queue/sqs/mod.rs @@ -1,8 +1,12 @@ +use std::collections::HashMap; use std::time::Duration; -use crate::queue::job_queue::JOB_PROCESSING_QUEUE; +use crate::queue::job_queue::{ + JOB_HANDLE_FAILURE_QUEUE, JOB_PROCESSING_QUEUE, JOB_VERIFICATION_QUEUE, WORKER_TRIGGER_QUEUE, +}; use async_trait::async_trait; use color_eyre::Result; +use lazy_static::lazy_static; use omniqueue::backends::{SqsBackend, SqsConfig, SqsConsumer, SqsProducer}; use omniqueue::{Delivery, QueueError}; use utils::env_utils::get_env_var_or_panic; @@ -10,6 +14,16 @@ use utils::env_utils::get_env_var_or_panic; use crate::queue::QueueProvider; pub struct SqsQueue; +lazy_static! { + /// Maps Queue Name to Env var of queue URL. 
+ pub static ref QUEUE_NAME_TO_ENV_VAR_MAPPING: HashMap<&'static str, &'static str> = HashMap::from([ + (JOB_PROCESSING_QUEUE, "SQS_JOB_PROCESSING_QUEUE_URL"), + (JOB_VERIFICATION_QUEUE, "SQS_JOB_VERIFICATION_QUEUE_URL"), + (JOB_HANDLE_FAILURE_QUEUE, "SQS_JOB_HANDLE_FAILURE_QUEUE_URL"), + (WORKER_TRIGGER_QUEUE, "SQS_WORKER_TRIGGER_QUEUE_URL"), + ]); +} + #[async_trait] impl QueueProvider for SqsQueue { async fn send_message_to_queue(&self, queue: String, payload: String, delay: Option) -> Result<()> { @@ -31,12 +45,11 @@ impl QueueProvider for SqsQueue { } } +/// To fetch the queue URL from the environment variables fn get_queue_url(queue_name: String) -> String { - if queue_name == JOB_PROCESSING_QUEUE { - get_env_var_or_panic("SQS_JOB_PROCESSING_QUEUE_URL") - } else { - get_env_var_or_panic("SQS_JOB_VERIFICATION_QUEUE_URL") - } + get_env_var_or_panic( + QUEUE_NAME_TO_ENV_VAR_MAPPING.get(queue_name.as_str()).expect("Not able to get the queue env var name."), + ) } // TODO: store the producer and consumer in memory to avoid creating a new one every time