Skip to content

Commit

Permalink
Feature: data letter queue (#73)
Browse files Browse the repository at this point in the history
* dl-queue: added termination queue

* dl-queue: spwan consumer to a macro_rule

* dl-queue: test for handle_job_failure

* dl-queue: handle_job_failure failed test case

* dl-queue: fixed test cases

* dl-queue: tests fixed

* dl-queue: assert optimised

* dl-queue: DL job rewritten tests

* dl-queue: formatting changes

* dl-queue: update mod.rs

* dl-queue: lint fixes

* dl-queue: using strum for JobStatus Display

* dl-queue: added test cases for  handle_job_failure_with_failed_job_status_works

* fix: testcase
  • Loading branch information
heemankv authored Aug 13, 2024
1 parent 371d08d commit 73355ca
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
- Basic rust-toolchain support.
- `AWS_DEFAULT_REGION="localhost"` var. in .env.test for omniqueue queue testing.
- Added basic rust-toolchain support.
- Implement DL queue for handling failed jobs.
- Added tests for state update job.
- Tests for DA job.

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ snos = { workspace = true }
starknet = { workspace = true }
starknet-core = "0.9.0"
starknet-settlement-client = { workspace = true }
strum_macros = "0.26.4"
thiserror = { workspace = true }
tokio = { workspace = true, features = ["sync", "macros", "rt-multi-thread"] }
tracing = { workspace = true }
Expand Down
27 changes: 27 additions & 0 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,33 @@ pub async fn verify_job(id: Uuid) -> Result<()> {
Ok(())
}

/// Terminates the job and updates the status of the job in the DB.
/// Logs error if the job status `Completed` is existing on DL queue.
pub async fn handle_job_failure(id: Uuid) -> Result<()> {
let config = config().await;

let mut job = get_job(id).await?.clone();
let mut metadata = job.metadata.clone();

if job.status == JobStatus::Completed {
log::error!("Invalid state exists on DL queue: {}", job.status.to_string());
return Ok(());
}
// We assume that a Failure status wil only show up if the message is sent twice from a queue
// Can return silently because it's already been processed.
else if job.status == JobStatus::Failed {
return Ok(());
}

metadata.insert("last_job_status".to_string(), job.status.to_string());
job.metadata = metadata;
job.status = JobStatus::Failed;

config.database().update_job(&job).await?;

Ok(())
}

async fn get_job(id: Uuid) -> Result<JobItem> {
let config = config().await;
let job = config.database().get_job_by_id(id).await?;
Expand Down
15 changes: 14 additions & 1 deletion crates/orchestrator/src/jobs/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,22 +84,35 @@ pub enum JobType {
StateTransition,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, PartialOrd)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, PartialOrd, strum_macros::Display)]
pub enum JobStatus {
/// An acknowledgement that the job has been received by the
/// orchestrator and is waiting to be processed
#[strum(to_string = "Created")]
Created,
/// Some system has taken a lock over the job for processing and no
/// other system to process the job
#[strum(to_string = "Locked for Processing")]
LockedForProcessing,
/// The job has been processed and is pending verification
#[strum(to_string = "Pending Verification")]
PendingVerification,
/// The job has been processed and verified. No other actions needs to be taken
#[strum(to_string = "Completed")]
Completed,
/// The job was processed but the was unable to be verified under the given time
#[strum(to_string = "Verification Timeout")]
VerificationTimeout,
/// The job failed processing
#[strum(to_string = "Verification Failed")]
VerificationFailed,
/// The job failed completing
#[strum(to_string = "Failed")]
Failed,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
Expand Down
41 changes: 21 additions & 20 deletions crates/orchestrator/src/queue/job_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ use tracing::log;
use uuid::Uuid;

use crate::config::config;
use crate::jobs::{process_job, verify_job};
use crate::jobs::{handle_job_failure, process_job, verify_job};

pub const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue";
pub const JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_job_verification_queue";
// Below is the Data Letter Queue for the the above two jobs.
pub const JOB_HANDLE_FAILURE_QUEUE: &str = "madara_orchestrator_job_handle_failure_queue";

#[derive(Debug, Serialize, Deserialize)]
pub struct JobQueueMessage {
Expand Down Expand Up @@ -68,26 +70,25 @@ where
Ok(())
}

pub async fn init_consumers() -> Result<()> {
// TODO: figure out a way to generalize this
tokio::spawn(async move {
loop {
match consume_job_from_queue(JOB_PROCESSING_QUEUE.to_string(), process_job).await {
Ok(_) => {}
Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", JOB_PROCESSING_QUEUE, e),
}
sleep(Duration::from_secs(1)).await;
}
});
tokio::spawn(async move {
loop {
match consume_job_from_queue(JOB_VERIFICATION_QUEUE.to_string(), verify_job).await {
Ok(_) => {}
Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", JOB_VERIFICATION_QUEUE, e),
macro_rules! spawn_consumer {
($queue_type :expr, $handler : expr) => {
tokio::spawn(async move {
loop {
match consume_job_from_queue($queue_type, $handler).await {
Ok(_) => {}
Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", $queue_type, e),
}
sleep(Duration::from_secs(1)).await;
}
sleep(Duration::from_secs(1)).await;
}
});
});
};
}

pub async fn init_consumers() -> Result<()> {
spawn_consumer!(JOB_PROCESSING_QUEUE.to_string(), process_job);
spawn_consumer!(JOB_VERIFICATION_QUEUE.to_string(), verify_job);
spawn_consumer!(JOB_HANDLE_FAILURE_QUEUE.to_string(), handle_job_failure);

Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions crates/orchestrator/src/tests/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ impl TestConfigBuilder {
self.storage.unwrap(),
);

drop_database().await.unwrap();

config_force_init(config).await;

server
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/tests/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn test_database_create_job(#[future] get_config: Guard<Arc<Config>>) -> c
// Test Util Functions
// ==========================================

fn build_job_item(job_type: JobType, job_status: JobStatus, internal_id: u64) -> JobItem {
pub fn build_job_item(job_type: JobType, job_status: JobStatus, internal_id: u64) -> JobItem {
JobItem {
id: Uuid::new_v4(),
internal_id: internal_id.to_string(),
Expand Down
107 changes: 103 additions & 4 deletions crates/orchestrator/src/tests/jobs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
use rstest::rstest;

use crate::config::config;
use crate::jobs::handle_job_failure;
use crate::jobs::types::JobType;
use crate::{jobs::types::JobStatus, tests::config::TestConfigBuilder};

use super::database::build_job_item;

#[cfg(test)]
pub mod da_job;

Expand All @@ -15,18 +24,15 @@ use std::time::Duration;
use mockall::predicate::eq;
use mongodb::bson::doc;
use omniqueue::QueueError;
use rstest::rstest;
use tokio::time::sleep;
use uuid::Uuid;

use crate::config::config;
use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY};
use crate::jobs::job_handler_factory::mock_factory;
use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType, JobVerificationStatus};
use crate::jobs::types::{ExternalId, JobItem, JobVerificationStatus};
use crate::jobs::{create_job, increment_key_in_metadata, process_job, verify_job, Job, MockJob};
use crate::queue::job_queue::{JOB_PROCESSING_QUEUE, JOB_VERIFICATION_QUEUE};
use crate::tests::common::MessagePayloadType;
use crate::tests::config::TestConfigBuilder;

/// Tests `create_job` function when job is not existing in the db.
#[rstest]
Expand Down Expand Up @@ -501,3 +507,96 @@ fn build_job_item_by_type_and_status(job_type: JobType, job_status: JobStatus, i
version: 0,
}
}

#[rstest]
#[case(JobType::DataSubmission, JobStatus::Completed)] // code should panic here, how can completed move to dl queue ?
#[case(JobType::SnosRun, JobStatus::PendingVerification)]
#[case(JobType::ProofCreation, JobStatus::LockedForProcessing)]
#[case(JobType::ProofRegistration, JobStatus::Created)]
#[case(JobType::StateTransition, JobStatus::Completed)]
#[case(JobType::ProofCreation, JobStatus::VerificationTimeout)]
#[case(JobType::DataSubmission, JobStatus::VerificationFailed)]
#[tokio::test]
async fn handle_job_failure_with_failed_job_status_works(#[case] job_type: JobType, #[case] job_status: JobStatus) {
TestConfigBuilder::new().build().await;
let config = config().await;
let database_client = config.database();
let internal_id = 1;

// create a job, with already available "last_job_status"
let mut job_expected = build_job_item(job_type.clone(), JobStatus::Failed, internal_id);
let mut job_metadata = job_expected.metadata.clone();
job_metadata.insert("last_job_status".to_string(), job_status.to_string());
job_expected.metadata = job_metadata.clone();

let job_id = job_expected.id;

// feeding the job to DB
database_client.create_job(job_expected.clone()).await.unwrap();

// calling handle_job_failure
handle_job_failure(job_id).await.expect("handle_job_failure failed to run");

let job_fetched = config.database().get_job_by_id(job_id).await.expect("Unable to fetch Job Data").unwrap();

assert_eq!(job_fetched, job_expected);
}

#[rstest]
#[case::pending_verification(JobType::SnosRun, JobStatus::PendingVerification)]
#[case::verification_timeout(JobType::SnosRun, JobStatus::VerificationTimeout)]
#[tokio::test]
async fn handle_job_failure_with_correct_job_status_works(#[case] job_type: JobType, #[case] job_status: JobStatus) {
TestConfigBuilder::new().build().await;
let config = config().await;
let database_client = config.database();
let internal_id = 1;

// create a job
let job = build_job_item(job_type.clone(), job_status.clone(), internal_id);
let job_id = job.id;

// feeding the job to DB
database_client.create_job(job.clone()).await.unwrap();

// calling handle_job_failure
handle_job_failure(job_id).await.expect("handle_job_failure failed to run");

let job_fetched = config.database().get_job_by_id(job_id).await.expect("Unable to fetch Job Data").unwrap();

// creating expected output
let mut job_expected = job.clone();
let mut job_metadata = job_expected.metadata.clone();
job_metadata.insert("last_job_status".to_string(), job_status.to_string());
job_expected.metadata = job_metadata.clone();
job_expected.status = JobStatus::Failed;

assert_eq!(job_fetched, job_expected);
}

#[rstest]
#[case(JobType::DataSubmission)]
#[tokio::test]
async fn handle_job_failure_job_status_completed_works(#[case] job_type: JobType) {
let job_status = JobStatus::Completed;

TestConfigBuilder::new().build().await;
let config = config().await;
let database_client = config.database();
let internal_id = 1;

// create a job
let job_expected = build_job_item(job_type.clone(), job_status.clone(), internal_id);
let job_id = job_expected.id;

// feeding the job to DB
database_client.create_job(job_expected.clone()).await.unwrap();

// calling handle_job_failure
handle_job_failure(job_id).await.expect("Test call to handle_job_failure should have passed.");

// The completed job status on db is untouched.
let job_fetched = config.database().get_job_by_id(job_id).await.expect("Unable to fetch Job Data").unwrap();

assert_eq!(job_fetched, job_expected);
}

0 comments on commit 73355ca

Please sign in to comment.