From 07bcb0b23e804ac91b44ffbb223a80611ba68ca0 Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Fri, 1 Dec 2023 23:32:48 +0900
Subject: [PATCH] Various bugfixes: (#4224)

- the scaling down of pipelines was broken when dealing with one small
  index: it would systematically remove the index's only pipeline
- various log fixes
---
 quickwit/quickwit-common/src/lib.rs           |  22 ++++
 .../src/control_plane.rs                      |   2 +-
 .../src/indexing_scheduler/scheduling/mod.rs  | 113 ++++++++++++------
 .../quickwit-ingest/src/ingest_v2/ingester.rs |   4 +-
 .../quickwit-proto/src/types/pipeline_uid.rs  |   8 +-
 5 files changed, 109 insertions(+), 40 deletions(-)

diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs
index eac5881219b..74abc5a4ca1 100644
--- a/quickwit/quickwit-common/src/lib.rs
+++ b/quickwit/quickwit-common/src/lib.rs
@@ -176,6 +176,17 @@ where T: Debug
     }
 }
 
+#[inline]
+pub const fn div_ceil_u32(lhs: u32, rhs: u32) -> u32 {
+    let d = lhs / rhs;
+    let r = lhs % rhs;
+    if r > 0 {
+        d + 1
+    } else {
+        d
+    }
+}
+
 #[inline]
 pub const fn div_ceil(lhs: i64, rhs: i64) -> i64 {
     let d = lhs / rhs;
@@ -268,4 +279,15 @@ mod tests {
         assert_eq!(div_ceil(-5, -2), 3);
         assert_eq!(div_ceil(-6, -2), 3);
     }
+
+    #[test]
+    fn test_div_ceil_u32() {
+        assert_eq!(div_ceil_u32(5, 1), 5);
+        assert_eq!(div_ceil_u32(5, 2), 3);
+        assert_eq!(div_ceil_u32(6, 2), 3);
+        assert_eq!(div_ceil_u32(3, 3), 1);
+        assert_eq!(div_ceil_u32(2, 3), 1);
+        assert_eq!(div_ceil_u32(1, 3), 1);
+        assert_eq!(div_ceil_u32(0, 3), 0);
+    }
 }
diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs
index 3cc934edc46..da7f9c6eda8 100644
--- a/quickwit/quickwit-control-plane/src/control_plane.rs
+++ b/quickwit/quickwit-control-plane/src/control_plane.rs
@@ -129,7 +129,7 @@ impl Actor for ControlPlane {
         self.model
             .load_from_metastore(&mut self.metastore, ctx.progress())
             .await
-            .context("failed to intialize the model")?;
+            .context("failed to initialize the model")?;
 
         self.indexing_scheduler
             .schedule_indexing_plan_if_needed(&self.model);
diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
index e1a724d484d..c94be5f5d2f 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
@@ -36,7 +36,7 @@ use crate::indexing_scheduler::scheduling::scheduling_logic_model::{
 /// If we have several pipelines below this threshold we
 /// reduce the number of pipelines.
 ///
-/// Note that even for 2 pipelines, this creates an hysteris effet.
+/// Note that even for 2 pipelines, this creates a hysteresis effect.
 ///
 /// Starting from a single pipeline.
 /// An overall load above 80% is enough to trigger the creation of a
 /// second pipeline.
 ///
 /// Coming back to a single pipeline requires having a load per pipeline
 /// of 30%, which translates into an overall load of 60%.
-const CPU_PER_PIPELINE_LOAD_THRESHOLD: CpuCapacity = CpuCapacity::from_cpu_millis(1_200);
+const CPU_PER_PIPELINE_LOAD_LOWER_THRESHOLD: CpuCapacity = CpuCapacity::from_cpu_millis(1_200);
 
 /// That's 80% of a period
 const MAX_LOAD_PER_PIPELINE: CpuCapacity = CpuCapacity::from_cpu_millis(3_200);
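To make the hysteresis window described in the comment above concrete, here is a minimal, self-contained sketch (an editor's illustration, not part of the patch). It assumes the 4,000 mcpu pipeline period implied by `MAX_LOAD_PER_PIPELINE` being "80% of a period"; the `target_num_pipelines` helper is hypothetical and only mirrors the thresholds:

```rust
const MAX_LOAD_PER_PIPELINE_MCPU: u32 = 3_200; // 80% of a 4_000 mcpu period
const LOWER_THRESHOLD_MCPU: u32 = 1_200; // 30% of a period

/// Hypothetical helper: next pipeline count for a given overall load.
fn target_num_pipelines(num_pipelines: u32, overall_load_mcpu: u32) -> u32 {
    let load_per_pipeline = overall_load_mcpu / num_pipelines.max(1);
    if load_per_pipeline > MAX_LOAD_PER_PIPELINE_MCPU {
        num_pipelines + 1
    } else if num_pipelines > 1 && load_per_pipeline < LOWER_THRESHOLD_MCPU {
        num_pipelines - 1
    } else {
        num_pipelines
    }
}

fn main() {
    // One pipeline above 80% of a period: scale up to two.
    assert_eq!(target_num_pipelines(1, 3_300), 2);
    // Two pipelines at 60% overall (30% each): inside the window, so the
    // count is stable in both directions. That is the hysteresis.
    assert_eq!(target_num_pipelines(2, 2_400), 2);
    // Only once the per-pipeline load drops below 30% do we scale down.
    assert_eq!(target_num_pipelines(2, 2_300), 1);
}
```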
@@ -188,8 +188,10 @@ fn convert_scheduling_solution_to_physical_plan_single_node_single_source(
             load_per_shard,
         } => {
             // For the moment we do something voluntarily suboptimal.
-            let max_num_pipelines = (shard_ids.len() as u32) * load_per_shard.get()
-                / CPU_PER_PIPELINE_LOAD_THRESHOLD.cpu_millis();
+            let max_num_pipelines = quickwit_common::div_ceil_u32(
+                shard_ids.len() as u32 * load_per_shard.get(),
+                CPU_PER_PIPELINE_LOAD_LOWER_THRESHOLD.cpu_millis(),
+            );
             if previous_tasks.len() > max_num_pipelines as usize {
                 previous_tasks = &previous_tasks[..max_num_pipelines as usize];
             }
@@ -209,9 +211,6 @@ fn convert_scheduling_solution_to_physical_plan_single_node_single_source(
                     .take(max_shard_in_pipeline)
                     .collect();
                 remaining_num_shards_to_schedule_on_node -= shard_ids.len() as u32;
-                if remaining_num_shards_to_schedule_on_node == 0 {
-                    break;
-                }
                 let new_task = IndexingTask {
                     index_uid: previous_task.index_uid.clone(),
                     source_id: previous_task.source_id.clone(),
@@ -219,6 +218,9 @@ fn convert_scheduling_solution_to_physical_plan_single_node_single_source(
                     shard_ids,
                 };
                 new_tasks.push(new_task);
+                if remaining_num_shards_to_schedule_on_node == 0 {
+                    break;
+                }
             }
             new_tasks
         }
@@ -295,6 +297,12 @@ fn convert_scheduling_solution_to_physical_plan_single_node(
     tasks
 }
 
+fn pick_indexer(capacity_per_node: &[(String, u32)]) -> impl Iterator<Item = &str> {
+    capacity_per_node.iter().flat_map(|(node_id, capacity)| {
+        std::iter::repeat(node_id.as_str()).take(*capacity as usize)
+    })
+}
+
 /// This function takes a scheduling solution (which abstracts the notion of pipelines,
 /// and shard ids) and builds a physical plan, attempting to make as little change as possible
 /// to the existing pipelines.
@@ -329,6 +337,8 @@ fn convert_scheduling_solution_to_physical_plan(
         }
     }
 
+    // We still need to do some extra work for sharded sources: assign missing shards, and
+    // possibly add extra pipelines.
     for source in sources {
         let SourceToScheduleType::Sharded { shard_ids, .. } = &source.source_type else {
             continue;
         };
@@ -347,45 +357,41 @@ fn convert_scheduling_solution_to_physical_plan(
                 indexing_task.shard_ids.retain(|&shard| {
                     let shard_added = scheduled_shards.insert(shard);
                     if shard_added {
+                        true
+                    } else {
                         error!(
                             "this should never happen. shard was allocated into two pipelines."
                         );
-                        true
-                    } else {
                         false
                     }
                 });
-                num_shards_for_indexer_source -= 1;
+                num_shards_for_indexer_source -= indexing_task.shard_ids.len() as u32;
             }
         }
         remaining_capacity_per_node.push((indexer.to_string(), num_shards_for_indexer_source));
     }
 
-    // Missing shards is the list of shards that is not scheduled into a pipeline yet.
-    let missing_shards: Vec<ShardId> = shard_ids
+    // `missing_shards` is an iterator over the shards that are not yet scheduled into a
+    // pipeline.
+    let missing_shards = shard_ids
         .iter()
         .filter(|shard_id| !scheduled_shards.contains(shard_id))
-        .copied()
-        .collect();
+        .copied();
+
+    // Let's assign the missing shards.
 
-    // Let's allocate the missing shards.
     // TODO that's the logic that has to change. Eventually we want to remove shards that
     // were previously allocated and create new shards to replace them.
 
     let max_shard_per_pipeline = compute_max_num_shards_per_pipeline(&source.source_type);
-    for missing_shard in missing_shards {
-        let (last_indexer_str, remaining_shard) =
-            remaining_capacity_per_node.last_mut().unwrap();
-        *remaining_shard -= 1;
+    for (missing_shard, indexer_str) in
+        missing_shards.zip(pick_indexer(&remaining_capacity_per_node))
+    {
         add_shard_to_indexer(
             missing_shard,
-            last_indexer_str,
+            indexer_str,
             &source.source_uid,
             max_shard_per_pipeline,
             &mut new_physical_plan,
         );
-        if *remaining_shard == 0 {
-            remaining_capacity_per_node.pop();
-        }
     }
 }
 
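The rewritten loop above pairs each missing shard with an indexer slot by zipping against `pick_indexer`, which repeats each node once per unit of remaining capacity. A self-contained sketch of that pairing (an editor's illustration; the node names, capacities, and shard ids are made up):

```rust
// Same shape as the `pick_indexer` helper added in this patch.
fn pick_indexer(capacity_per_node: &[(String, u32)]) -> impl Iterator<Item = &str> {
    capacity_per_node.iter().flat_map(|(node_id, capacity)| {
        std::iter::repeat(node_id.as_str()).take(*capacity as usize)
    })
}

fn main() {
    // Two remaining slots on node-a, one on node-b (made-up values).
    let capacity = vec![("node-a".to_string(), 2), ("node-b".to_string(), 1)];
    // Three shards missing from the physical plan (arbitrary ids).
    let missing_shards = [17u64, 18, 19];

    let assignments: Vec<(u64, &str)> = missing_shards
        .iter()
        .copied()
        .zip(pick_indexer(&capacity))
        .collect();
    // Zipping never assigns more shards to a node than it has slots for.
    assert_eq!(assignments, [(17, "node-a"), (18, "node-a"), (19, "node-b")]);
}
```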
@@ -689,8 +695,10 @@ mod tests {
             &indexer_id_to_cpu_capacities,
             Some(&indexing_plan),
         );
-        let indexing_tasks = new_plan.indexer(NODE).unwrap();
-        indexing_tasks.to_vec()
+        let mut indexing_tasks = new_plan.indexer(NODE).unwrap().to_vec();
+        // We sort indexing tasks for normalization purposes.
+        indexing_tasks.sort_by_key(|task| task.shard_ids[0]);
+        indexing_tasks
     }
 
     #[test]
@@ -728,24 +736,25 @@ mod tests {
         );
         assert_eq!(indexing_tasks_2.len(), 3);
         assert_eq!(&indexing_tasks_2[0].shard_ids, &[0, 1, 2, 3, 4]);
-        assert_eq!(&indexing_tasks_2[1].shard_ids, &[7]);
-        assert_eq!(&indexing_tasks_2[2].shard_ids, &[5, 6, 8, 9, 10]);
+        assert_eq!(&indexing_tasks_2[1].shard_ids, &[5, 6, 8, 9, 10]);
+        assert_eq!(&indexing_tasks_2[2].shard_ids, &[7]);
+
         // Now the load comes back to normal
         // The hysteresis takes effect. We do not switch back to 2 pipelines.
-        let pipeline_tasks2: Vec<(PipelineUid, &[ShardId])> = indexing_tasks_1
+        let pipeline_tasks_2: Vec<(PipelineUid, &[ShardId])> = indexing_tasks_2
            .iter()
            .map(|task| (task.pipeline_uid(), &task.shard_ids[..]))
            .collect();
+        assert_eq!(indexing_tasks_2.len(), 3);
         let indexing_tasks_3 = group_shards_into_pipelines_aux(
             &source_uid,
             &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
-            &pipeline_tasks2,
+            &pipeline_tasks_2,
             mcpu(400),
         );
-        assert_eq!(indexing_tasks_3.len(), 2);
-        assert_eq!(&indexing_tasks_3[0].shard_ids, &[0, 1, 2, 3, 4, 5, 6, 7,]);
-        assert_eq!(&indexing_tasks_3[1].shard_ids, &[8, 9, 10]);
-        let pipeline_tasks3: Vec<(PipelineUid, &[ShardId])> = indexing_tasks_2
+        assert_eq!(&indexing_tasks_3, &indexing_tasks_2);
+
+        let pipeline_tasks3: Vec<(PipelineUid, &[ShardId])> = indexing_tasks_3
            .iter()
            .map(|task| (task.pipeline_uid(), &task.shard_ids[..]))
            .collect();
@@ -754,10 +763,42 @@ mod tests {
             &source_uid,
             &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
             &pipeline_tasks3,
-            mcpu(320),
+            mcpu(200),
         );
         assert_eq!(indexing_tasks_4.len(), 2);
-        assert_eq!(&indexing_tasks_4[0].shard_ids, &[0, 1, 2, 3, 4, 10]);
-        assert_eq!(&indexing_tasks_4[1].shard_ids, &[5, 6, 7, 8, 9]);
+        assert_eq!(&indexing_tasks_4[0].shard_ids, &[0, 1, 2, 3, 4, 7]);
+        assert_eq!(&indexing_tasks_4[1].shard_ids, &[5, 6, 8, 9, 10]);
+    }
+
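The `test_group_shards_into_pipeline_single_small_pipeline` test below is the regression test for the scale-down bug from the commit message. A standalone sketch of the arithmetic it guards against (an editor's illustration; the 100 mcpu load mirrors the `mcpu(100)` used in the test):

```rust
// Copied from the `quickwit_common::div_ceil_u32` helper added in this patch.
const fn div_ceil_u32(lhs: u32, rhs: u32) -> u32 {
    let d = lhs / rhs;
    let r = lhs % rhs;
    if r > 0 { d + 1 } else { d }
}

fn main() {
    // One small index: a single shard with a 100 mcpu load.
    let load_mcpu = 1u32 * 100;
    let lower_threshold_mcpu = 1_200u32; // CPU_PER_PIPELINE_LOAD_LOWER_THRESHOLD

    // Old behavior: floor division yielded max_num_pipelines == 0, so the
    // scheduler systematically truncated away the index's only pipeline.
    assert_eq!(load_mcpu / lower_threshold_mcpu, 0);

    // New behavior: ceil division keeps at least one pipeline whenever
    // there is any load at all.
    assert_eq!(div_ceil_u32(load_mcpu, lower_threshold_mcpu), 1);
}
```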
+    /// We want to make sure that for small pipelines, we still reschedule them with the
+    /// same pipeline uid.
+    #[test]
+    fn test_group_shards_into_pipeline_single_small_pipeline() {
+        let source_uid = source_id();
+        let pipeline_uid = PipelineUid::from_u128(1u128);
+        let indexing_tasks = group_shards_into_pipelines_aux(
+            &source_uid,
+            &[12],
+            &[(pipeline_uid, &[12])],
+            mcpu(100),
+        );
+        assert_eq!(indexing_tasks.len(), 1);
+        let indexing_task = &indexing_tasks[0];
+        assert_eq!(&indexing_task.shard_ids, &[12]);
+        assert_eq!(indexing_task.pipeline_uid.unwrap(), pipeline_uid);
+    }
+
+    #[test]
+    fn test_pick_indexer_for_shard() {
+        let indexer_capacity = vec![
+            ("node1".to_string(), 1),
+            ("node2".to_string(), 0),
+            ("node3".to_string(), 2),
+            ("node4".to_string(), 2),
+            ("node5".to_string(), 0),
+            ("node6".to_string(), 0),
+        ];
+        let indexers: Vec<&str> = super::pick_indexer(&indexer_capacity).collect();
+        assert_eq!(indexers, &["node1", "node3", "node3", "node4", "node4"]);
+    }
 }
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
index c22fba85af4..fe4d071175a 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -48,7 +48,7 @@ use quickwit_proto::ingest::ingester::{
 use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, Shard, ShardState};
 use quickwit_proto::types::{NodeId, Position, QueueId};
 use tokio::sync::{watch, RwLock};
-use tracing::{error, info, warn};
+use tracing::{debug, error, info, warn};
 
 use super::fetch::FetchStreamTask;
 use super::models::IngesterShard;
@@ -468,7 +468,7 @@ impl IngesterService for Ingester {
             .expect("rate limiter should be initialized");
 
         if !rate_limiter.acquire_bytes(requested_capacity) {
-            warn!("failed to persist records to shard `{queue_id}`: rate limited");
+            debug!("failed to persist records to shard `{queue_id}`: rate limited");
 
             let persist_failure = PersistFailure {
                 subrequest_id: subrequest.subrequest_id,
diff --git a/quickwit/quickwit-proto/src/types/pipeline_uid.rs b/quickwit/quickwit-proto/src/types/pipeline_uid.rs
index 279e0249ed5..e571dedd9e0 100644
--- a/quickwit/quickwit-proto/src/types/pipeline_uid.rs
+++ b/quickwit/quickwit-proto/src/types/pipeline_uid.rs
@@ -27,9 +27,15 @@ use ulid::Ulid;
 const ULID_SIZE: usize = 16;
 
 /// A pipeline uid identifies an indexing pipeline and an indexing task.
-#[derive(Debug, Clone, Copy, Default, Hash, Eq, PartialEq, Ord, PartialOrd)]
+#[derive(Clone, Copy, Default, Hash, Eq, PartialEq, Ord, PartialOrd)]
 pub struct PipelineUid(Ulid);
 
+impl std::fmt::Debug for PipelineUid {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        write!(f, "Pipeline({})", self.0)
+    }
+}
+
 impl PipelineUid {
     pub fn from_u128(ulid_u128: u128) -> PipelineUid {
         PipelineUid(Ulid::from_bytes(ulid_u128.to_le_bytes()))
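Finally, a small sketch of what the manual `Debug` impl above buys in log output (an editor's illustration, not part of the patch; the derived impl would print the ULID's internal representation instead):

```rust
use ulid::Ulid;

struct PipelineUid(Ulid);

impl std::fmt::Debug for PipelineUid {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "Pipeline({})", self.0)
    }
}

fn main() {
    let uid = PipelineUid(Ulid::from_bytes(1u128.to_le_bytes()));
    // Prints `Pipeline(<26-char ULID>)` rather than the derived
    // `PipelineUid(Ulid(..))` form, keeping scheduler logs compact.
    println!("{uid:?}");
}
```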