diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 94f5a62464d..b6a71cf2544 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -3491,6 +3491,8 @@ mod tests { #[tokio::test] async fn test_ingester_closes_idle_shards() { + // The `CloseIdleShardsTask` task is already unit tested, so this test ensures the task is + // correctly spawned upon starting an ingester. let idle_shard_timeout = Duration::from_millis(200); let (_ingester_ctx, ingester) = IngesterForTest::default() .with_idle_shard_timeout(idle_shard_timeout) @@ -3499,7 +3501,6 @@ mod tests { let index_uid: IndexUid = IndexUid::for_test("test-index", 0); let queue_id_01 = queue_id(&index_uid, "test-source", &ShardId::from(1)); - let queue_id_02 = queue_id(&index_uid, "test-source", &ShardId::from(2)); let doc_mapping_uid = DocMappingUid::random(); let doc_mapping_json = format!( @@ -3515,14 +3516,6 @@ mod tests { doc_mapping_uid: Some(doc_mapping_uid), ..Default::default() }; - let shard_02 = Shard { - index_uid: Some(index_uid.clone()), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(2)), - shard_state: ShardState::Closed as i32, - doc_mapping_uid: Some(doc_mapping_uid), - ..Default::default() - }; let mut state_guard = ingester.state.lock_fully().await.unwrap(); let now = Instant::now(); @@ -3536,32 +3529,21 @@ mod tests { ) .await .unwrap(); - ingester - .init_primary_shard( - &mut state_guard.inner, - &mut state_guard.mrecordlog, - shard_02, - &doc_mapping_json, - now, - ) - .await - .unwrap(); + drop(state_guard); - tokio::time::sleep(Duration::from_millis(100)).await; // 2 times the run interval period of the close idle shards task + for _ in 0..10 { + tokio::time::sleep(Duration::from_millis(100)).await; - let state_guard = ingester.state.lock_partially().await.unwrap(); - state_guard - .shards - .get(&queue_id_01) - .unwrap() - .assert_is_closed(); - state_guard - .shards - .get(&queue_id_02) - .unwrap() - .assert_is_open(); - drop(state_guard); + let state_guard = ingester.state.lock_partially().await.unwrap(); + let shard = state_guard.shards.get(&queue_id_01).unwrap(); + + if shard.is_closed() { + return; + } + drop(state_guard); + } + panic!("idle shard was not closed"); } #[tokio::test]