Skip to content

Commit

Permalink
Fix race condition in integ test that was leading to frequent failuers
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Oct 30, 2024
1 parent 135e845 commit 560fb5b
Showing 1 changed file with 37 additions and 25 deletions.
62 changes: 37 additions & 25 deletions crates/integ/tests/api_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ use tracing::info;

async fn wait_for_state(
client: &Client,
run_id: Option<i64>,
pipeline_id: &str,
expected_state: &str,
) -> anyhow::Result<()> {
) -> anyhow::Result<i64> {
let mut last_state = "None".to_string();
while last_state != expected_state {
loop {
tokio::time::sleep(Duration::from_millis(100)).await;

let jobs = client
.get_pipeline_jobs()
.id(pipeline_id)
Expand All @@ -30,6 +33,10 @@ async fn wait_for_state(
.unwrap();
let job = jobs.data.first().unwrap();

if Some(job.run_id) == run_id {
continue;
}

let state = job.state.clone();
if last_state != state {
info!("Job transitioned to {}", state);
Expand All @@ -40,10 +47,10 @@ async fn wait_for_state(
bail!("Job transitioned to failed");
}

tokio::time::sleep(Duration::from_millis(100)).await;
if last_state == expected_state {
return Ok(job.run_id);
}
}

Ok(())
}

fn get_client() -> Arc<Client> {
Expand All @@ -66,8 +73,8 @@ fn get_client() -> Arc<Client> {
.clone()
}

async fn start_pipeline(run_id: u32, query: &str, udfs: &[&str]) -> anyhow::Result<String> {
let pipeline_name = format!("pipeline_{}", run_id);
async fn start_pipeline(test_id: u32, query: &str, udfs: &[&str]) -> anyhow::Result<String> {
let pipeline_name = format!("pipeline_{}", test_id);
info!("Creating pipeline {}", pipeline_name);

let pipeline_id = get_client()
Expand All @@ -94,21 +101,21 @@ async fn start_pipeline(run_id: u32, query: &str, udfs: &[&str]) -> anyhow::Resu
}

async fn start_and_monitor(
run_id: u32,
test_id: u32,
query: &str,
udfs: &[&str],
checkpoints_to_wait: u32,
) -> anyhow::Result<(String, String)> {
) -> anyhow::Result<(String, String, i64)> {
let api_client = get_client();

println!("Starting pipeline");
let pipeline_id = start_pipeline(run_id, query, udfs)
let pipeline_id = start_pipeline(test_id, query, udfs)
.await
.expect("failed to start pipeline");

// wait for job to enter running phase
println!("Waiting until running");
wait_for_state(&api_client, &pipeline_id, "Running")
let run_id = wait_for_state(&api_client, None, &pipeline_id, "Running")
.await
.unwrap();

Expand Down Expand Up @@ -151,7 +158,7 @@ async fn start_and_monitor(

assert!(!details.data.is_empty());

return Ok((pipeline_id, job.id.clone()));
return Ok((pipeline_id, job.id.clone(), run_id));
}
}

Expand All @@ -161,9 +168,10 @@ async fn start_and_monitor(

async fn patch_and_wait(
pipeline_id: &str,
run_id: Option<i64>,
body: builder::PipelinePatch,
expected_state: &str,
) -> anyhow::Result<()> {
) -> anyhow::Result<i64> {
println!("Patching with {:?}", body);
get_client()
.patch_pipeline()
Expand All @@ -173,9 +181,7 @@ async fn patch_and_wait(
.await?;

println!("Waiting for {}", expected_state);
wait_for_state(&get_client(), pipeline_id, expected_state).await?;

Ok(())
wait_for_state(&get_client(), run_id, pipeline_id, expected_state).await
}

#[tokio::test]
Expand All @@ -184,8 +190,8 @@ async fn basic_pipeline() {

// create a source
println!("Creating source");
let run_id: u32 = random();
let source_name = format!("source_{}", run_id);
let test_id: u32 = random();
let source_name = format!("source_{}", test_id);

let source_id = api_client
.create_connection_table()
Expand Down Expand Up @@ -222,7 +228,7 @@ async fn basic_pipeline() {
assert_eq!(valid.errors, Vec::<String>::new());
assert!(valid.graph.is_some());

let (pipeline_id, job_id) = start_and_monitor(run_id, &query, &[], 10).await.unwrap();
let (pipeline_id, job_id, _) = start_and_monitor(test_id, &query, &[], 10).await.unwrap();

// get error messages
let errors = api_client
Expand Down Expand Up @@ -268,17 +274,19 @@ async fn basic_pipeline() {
}

// stop job
patch_and_wait(
let run_id = patch_and_wait(
&pipeline_id,
None,
PipelinePatch::builder().stop(StopType::Checkpoint),
"Stopped",
)
.await
.unwrap();

// start job
patch_and_wait(
let run_id = patch_and_wait(
&pipeline_id,
Some(run_id),
PipelinePatch::builder().stop(StopType::None),
"Running",
)
Expand All @@ -287,8 +295,9 @@ async fn basic_pipeline() {

// rescale job
println!("Rescaling pipeline");
patch_and_wait(
let run_id = patch_and_wait(
&pipeline_id,
Some(run_id),
PipelinePatch::builder().parallelism(2),
"Running",
)
Expand Down Expand Up @@ -317,13 +326,14 @@ async fn basic_pipeline() {
.await
.unwrap();

wait_for_state(&api_client, &pipeline_id, "Running")
wait_for_state(&api_client, Some(run_id), &pipeline_id, "Running")
.await
.unwrap();

// stop job
patch_and_wait(
&pipeline_id,
None,
PipelinePatch::builder().stop(StopType::Immediate),
"Stopped",
)
Expand Down Expand Up @@ -387,11 +397,12 @@ select my_double(cast(counter as bigint)) from impulse;

let run_id: u32 = random();

let (pipeline_id, _job_id) = start_and_monitor(run_id, query, &[udf], 3).await.unwrap();
let (pipeline_id, _job_id, _) = start_and_monitor(run_id, query, &[udf], 3).await.unwrap();

// stop job
patch_and_wait(
&pipeline_id,
None,
PipelinePatch::builder().stop(StopType::Checkpoint),
"Stopped",
)
Expand Down Expand Up @@ -601,7 +612,7 @@ async fn connection_table() {
])
);

let (pipeline_id, _) = start_and_monitor(
let (pipeline_id, _, _) = start_and_monitor(
run_id,
&format!("select * from {};", connection_table.name),
&[],
Expand All @@ -613,6 +624,7 @@ async fn connection_table() {
// stop job
patch_and_wait(
&pipeline_id,
None,
PipelinePatch::builder().stop(StopType::Immediate),
"Stopped",
)
Expand Down

0 comments on commit 560fb5b

Please sign in to comment.