diff --git a/crates/integ/tests/api_tests.rs b/crates/integ/tests/api_tests.rs index dbc630e84..b5391fefb 100644 --- a/crates/integ/tests/api_tests.rs +++ b/crates/integ/tests/api_tests.rs @@ -17,11 +17,14 @@ use tracing::info; async fn wait_for_state( client: &Client, + run_id: Option, pipeline_id: &str, expected_state: &str, -) -> anyhow::Result<()> { +) -> anyhow::Result { let mut last_state = "None".to_string(); - while last_state != expected_state { + loop { + tokio::time::sleep(Duration::from_millis(100)).await; + let jobs = client .get_pipeline_jobs() .id(pipeline_id) @@ -30,6 +33,10 @@ async fn wait_for_state( .unwrap(); let job = jobs.data.first().unwrap(); + if Some(job.run_id) == run_id { + continue; + } + let state = job.state.clone(); if last_state != state { info!("Job transitioned to {}", state); @@ -40,10 +47,10 @@ async fn wait_for_state( bail!("Job transitioned to failed"); } - tokio::time::sleep(Duration::from_millis(100)).await; + if last_state == expected_state { + return Ok(job.run_id); + } } - - Ok(()) } fn get_client() -> Arc { @@ -66,8 +73,8 @@ fn get_client() -> Arc { .clone() } -async fn start_pipeline(run_id: u32, query: &str, udfs: &[&str]) -> anyhow::Result { - let pipeline_name = format!("pipeline_{}", run_id); +async fn start_pipeline(test_id: u32, query: &str, udfs: &[&str]) -> anyhow::Result { + let pipeline_name = format!("pipeline_{}", test_id); info!("Creating pipeline {}", pipeline_name); let pipeline_id = get_client() @@ -94,21 +101,21 @@ async fn start_pipeline(run_id: u32, query: &str, udfs: &[&str]) -> anyhow::Resu } async fn start_and_monitor( - run_id: u32, + test_id: u32, query: &str, udfs: &[&str], checkpoints_to_wait: u32, -) -> anyhow::Result<(String, String)> { +) -> anyhow::Result<(String, String, i64)> { let api_client = get_client(); println!("Starting pipeline"); - let pipeline_id = start_pipeline(run_id, query, udfs) + let pipeline_id = start_pipeline(test_id, query, udfs) .await .expect("failed to start pipeline"); // wait for job to enter running phase println!("Waiting until running"); - wait_for_state(&api_client, &pipeline_id, "Running") + let run_id = wait_for_state(&api_client, None, &pipeline_id, "Running") .await .unwrap(); @@ -151,7 +158,7 @@ async fn start_and_monitor( assert!(!details.data.is_empty()); - return Ok((pipeline_id, job.id.clone())); + return Ok((pipeline_id, job.id.clone(), run_id)); } } @@ -161,9 +168,10 @@ async fn start_and_monitor( async fn patch_and_wait( pipeline_id: &str, + run_id: Option, body: builder::PipelinePatch, expected_state: &str, -) -> anyhow::Result<()> { +) -> anyhow::Result { println!("Patching with {:?}", body); get_client() .patch_pipeline() @@ -173,9 +181,7 @@ async fn patch_and_wait( .await?; println!("Waiting for {}", expected_state); - wait_for_state(&get_client(), pipeline_id, expected_state).await?; - - Ok(()) + wait_for_state(&get_client(), run_id, pipeline_id, expected_state).await } #[tokio::test] @@ -184,8 +190,8 @@ async fn basic_pipeline() { // create a source println!("Creating source"); - let run_id: u32 = random(); - let source_name = format!("source_{}", run_id); + let test_id: u32 = random(); + let source_name = format!("source_{}", test_id); let source_id = api_client .create_connection_table() @@ -222,7 +228,7 @@ async fn basic_pipeline() { assert_eq!(valid.errors, Vec::::new()); assert!(valid.graph.is_some()); - let (pipeline_id, job_id) = start_and_monitor(run_id, &query, &[], 10).await.unwrap(); + let (pipeline_id, job_id, _) = start_and_monitor(test_id, &query, &[], 10).await.unwrap(); // get error messages let errors = api_client @@ -268,8 +274,9 @@ async fn basic_pipeline() { } // stop job - patch_and_wait( + let run_id = patch_and_wait( &pipeline_id, + None, PipelinePatch::builder().stop(StopType::Checkpoint), "Stopped", ) @@ -277,8 +284,9 @@ async fn basic_pipeline() { .unwrap(); // start job - patch_and_wait( + let run_id = patch_and_wait( &pipeline_id, + Some(run_id), PipelinePatch::builder().stop(StopType::None), "Running", ) @@ -287,8 +295,9 @@ async fn basic_pipeline() { // rescale job println!("Rescaling pipeline"); - patch_and_wait( + let run_id = patch_and_wait( &pipeline_id, + Some(run_id), PipelinePatch::builder().parallelism(2), "Running", ) @@ -317,13 +326,14 @@ async fn basic_pipeline() { .await .unwrap(); - wait_for_state(&api_client, &pipeline_id, "Running") + wait_for_state(&api_client, Some(run_id), &pipeline_id, "Running") .await .unwrap(); // stop job patch_and_wait( &pipeline_id, + None, PipelinePatch::builder().stop(StopType::Immediate), "Stopped", ) @@ -387,11 +397,12 @@ select my_double(cast(counter as bigint)) from impulse; let run_id: u32 = random(); - let (pipeline_id, _job_id) = start_and_monitor(run_id, query, &[udf], 3).await.unwrap(); + let (pipeline_id, _job_id, _) = start_and_monitor(run_id, query, &[udf], 3).await.unwrap(); // stop job patch_and_wait( &pipeline_id, + None, PipelinePatch::builder().stop(StopType::Checkpoint), "Stopped", ) @@ -601,7 +612,7 @@ async fn connection_table() { ]) ); - let (pipeline_id, _) = start_and_monitor( + let (pipeline_id, _, _) = start_and_monitor( run_id, &format!("select * from {};", connection_table.name), &[], @@ -613,6 +624,7 @@ async fn connection_table() { // stop job patch_and_wait( &pipeline_id, + None, PipelinePatch::builder().stop(StopType::Immediate), "Stopped", )