Skip to content

Commit

Permalink
Various bugfix: (#4224)
Browse files Browse the repository at this point in the history
- the scaling down of pipeline was broken when dealing with one small
index. It would remove the one pipeline systematically.
- various log fixes
  • Loading branch information
fulmicoton authored Dec 1, 2023
1 parent 546df3d commit 07bcb0b
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 40 deletions.
22 changes: 22 additions & 0 deletions quickwit/quickwit-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,17 @@ where T: Debug
}
}

#[inline]
pub const fn div_ceil_u32(lhs: u32, rhs: u32) -> u32 {
let d = lhs / rhs;
let r = lhs % rhs;
if r > 0 {
d + 1
} else {
d
}
}

#[inline]
pub const fn div_ceil(lhs: i64, rhs: i64) -> i64 {
let d = lhs / rhs;
Expand Down Expand Up @@ -268,4 +279,15 @@ mod tests {
assert_eq!(div_ceil(-5, -2), 3);
assert_eq!(div_ceil(-6, -2), 3);
}

#[test]
fn test_div_ceil_u32() {
assert_eq!(div_ceil_u32(5, 1), 5);
assert_eq!(div_ceil_u32(5, 2), 3);
assert_eq!(div_ceil_u32(6, 2), 3);
assert_eq!(div_ceil_u32(3, 3), 1);
assert_eq!(div_ceil_u32(2, 3), 1);
assert_eq!(div_ceil_u32(1, 3), 1);
assert_eq!(div_ceil_u32(0, 3), 0);
}
}
2 changes: 1 addition & 1 deletion quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl Actor for ControlPlane {
self.model
.load_from_metastore(&mut self.metastore, ctx.progress())
.await
.context("failed to intialize the model")?;
.context("failed to initialize the model")?;

self.indexing_scheduler
.schedule_indexing_plan_if_needed(&self.model);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ use crate::indexing_scheduler::scheduling::scheduling_logic_model::{
/// If we have several pipelines below this threshold we
/// reduce the number of pipelines.
///
/// Note that even for 2 pipelines, this creates an hysteris effet.
/// Note that even for 2 pipelines, this creates an hysteris effect.
///
/// Starting from a single pipeline.
/// An overall load above 80% is enough to trigger the creation of a
/// second pipeline.
///
/// Coming back to a single pipeline requires having a load per pipeline
/// of 30%. Which translates into an overall load of 60%.
const CPU_PER_PIPELINE_LOAD_THRESHOLD: CpuCapacity = CpuCapacity::from_cpu_millis(1_200);
const CPU_PER_PIPELINE_LOAD_LOWER_THRESHOLD: CpuCapacity = CpuCapacity::from_cpu_millis(1_200);

/// That's 80% of a period
const MAX_LOAD_PER_PIPELINE: CpuCapacity = CpuCapacity::from_cpu_millis(3_200);
Expand Down Expand Up @@ -188,8 +188,10 @@ fn convert_scheduling_solution_to_physical_plan_single_node_single_source(
load_per_shard,
} => {
// For the moment we do something voluntarily suboptimal.
let max_num_pipelines = (shard_ids.len() as u32) * load_per_shard.get()
/ CPU_PER_PIPELINE_LOAD_THRESHOLD.cpu_millis();
let max_num_pipelines = quickwit_common::div_ceil_u32(
shard_ids.len() as u32 * load_per_shard.get(),
CPU_PER_PIPELINE_LOAD_LOWER_THRESHOLD.cpu_millis(),
);
if previous_tasks.len() > max_num_pipelines as usize {
previous_tasks = &previous_tasks[..max_num_pipelines as usize];
}
Expand All @@ -209,16 +211,16 @@ fn convert_scheduling_solution_to_physical_plan_single_node_single_source(
.take(max_shard_in_pipeline)
.collect();
remaining_num_shards_to_schedule_on_node -= shard_ids.len() as u32;
if remaining_num_shards_to_schedule_on_node == 0 {
break;
}
let new_task = IndexingTask {
index_uid: previous_task.index_uid.clone(),
source_id: previous_task.source_id.clone(),
pipeline_uid: previous_task.pipeline_uid,
shard_ids,
};
new_tasks.push(new_task);
if remaining_num_shards_to_schedule_on_node == 0 {
break;
}
}
new_tasks
}
Expand Down Expand Up @@ -295,6 +297,12 @@ fn convert_scheduling_solution_to_physical_plan_single_node(
tasks
}

fn pick_indexer(capacity_per_node: &[(String, u32)]) -> impl Iterator<Item = &str> {
capacity_per_node.iter().flat_map(|(node_id, capacity)| {
std::iter::repeat(node_id.as_str()).take(*capacity as usize)
})
}

/// This function takes a scheduling solution (which abstracts the notion of pipelines,
/// and shard ids) and builds a physical plan, attempting to make as little change as possible
/// to the existing pipelines.
Expand Down Expand Up @@ -329,6 +337,8 @@ fn convert_scheduling_solution_to_physical_plan(
}
}

// We still need to do some extra work for sharded sources: assign missing shards, and possibly
// adding extra pipelines.
for source in sources {
let SourceToScheduleType::Sharded { shard_ids, .. } = &source.source_type else {
continue;
Expand All @@ -347,45 +357,41 @@ fn convert_scheduling_solution_to_physical_plan(
indexing_task.shard_ids.retain(|&shard| {
let shard_added = scheduled_shards.insert(shard);
if shard_added {
true
} else {
error!(
"this should never happen. shard was allocated into two pipelines."
);
true
} else {
false
}
});
num_shards_for_indexer_source -= 1;
num_shards_for_indexer_source -= indexing_task.shard_ids.len() as u32;
}
}
remaining_capacity_per_node.push((indexer.to_string(), num_shards_for_indexer_source));
}

// Missing shards is the list of shards that is not scheduled into a pipeline yet.
let missing_shards: Vec<ShardId> = shard_ids
// Missing shards is an iterator over the shards that are not scheduled into a pipeline yet.
let missing_shards = shard_ids
.iter()
.filter(|shard_id| !scheduled_shards.contains(shard_id))
.copied()
.collect();
.copied();

// Let's assign the missing shards.

// Let's allocate the missing shards.
// TODO that's the logic that has to change. Eventually we want to remove shards that
// were previously allocated and create new shards to replace them.
let max_shard_per_pipeline = compute_max_num_shards_per_pipeline(&source.source_type);
for missing_shard in missing_shards {
let (last_indexer_str, remaining_shard) =
remaining_capacity_per_node.last_mut().unwrap();
*remaining_shard -= 1;
for (missing_shard, indexer_str) in
missing_shards.zip(pick_indexer(&remaining_capacity_per_node))
{
add_shard_to_indexer(
missing_shard,
last_indexer_str,
indexer_str,
&source.source_uid,
max_shard_per_pipeline,
&mut new_physical_plan,
);
if *remaining_shard == 0 {
remaining_capacity_per_node.pop();
}
}
}

Expand Down Expand Up @@ -689,8 +695,10 @@ mod tests {
&indexer_id_to_cpu_capacities,
Some(&indexing_plan),
);
let indexing_tasks = new_plan.indexer(NODE).unwrap();
indexing_tasks.to_vec()
let mut indexing_tasks = new_plan.indexer(NODE).unwrap().to_vec();
// We sort indexing tasks for normalization purpose
indexing_tasks.sort_by_key(|task| task.shard_ids[0]);
indexing_tasks
}

#[test]
Expand Down Expand Up @@ -728,24 +736,25 @@ mod tests {
);
assert_eq!(indexing_tasks_2.len(), 3);
assert_eq!(&indexing_tasks_2[0].shard_ids, &[0, 1, 2, 3, 4]);
assert_eq!(&indexing_tasks_2[1].shard_ids, &[7]);
assert_eq!(&indexing_tasks_2[2].shard_ids, &[5, 6, 8, 9, 10]);
assert_eq!(&indexing_tasks_2[1].shard_ids, &[5, 6, 8, 9, 10]);
assert_eq!(&indexing_tasks_2[2].shard_ids, &[7]);

// Now the load comes back to normal
// The hysteresis takes effect. We do not switch back to 2 pipelines.
let pipeline_tasks2: Vec<(PipelineUid, &[ShardId])> = indexing_tasks_1
let pipeline_tasks_2: Vec<(PipelineUid, &[ShardId])> = indexing_tasks_2
.iter()
.map(|task| (task.pipeline_uid(), &task.shard_ids[..]))
.collect();
assert_eq!(indexing_tasks_2.len(), 3);
let indexing_tasks_3 = group_shards_into_pipelines_aux(
&source_uid,
&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
&pipeline_tasks2,
&pipeline_tasks_2,
mcpu(400),
);
assert_eq!(indexing_tasks_3.len(), 2);
assert_eq!(&indexing_tasks_3[0].shard_ids, &[0, 1, 2, 3, 4, 5, 6, 7,]);
assert_eq!(&indexing_tasks_3[1].shard_ids, &[8, 9, 10]);
let pipeline_tasks3: Vec<(PipelineUid, &[ShardId])> = indexing_tasks_2
assert_eq!(&indexing_tasks_3, &indexing_tasks_2);

let pipeline_tasks3: Vec<(PipelineUid, &[ShardId])> = indexing_tasks_3
.iter()
.map(|task| (task.pipeline_uid(), &task.shard_ids[..]))
.collect();
Expand All @@ -754,10 +763,42 @@ mod tests {
&source_uid,
&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
&pipeline_tasks3,
mcpu(320),
mcpu(200),
);
assert_eq!(indexing_tasks_4.len(), 2);
assert_eq!(&indexing_tasks_4[0].shard_ids, &[0, 1, 2, 3, 4, 10]);
assert_eq!(&indexing_tasks_4[1].shard_ids, &[5, 6, 7, 8, 9]);
assert_eq!(&indexing_tasks_4[0].shard_ids, &[0, 1, 2, 3, 4, 7]);
assert_eq!(&indexing_tasks_4[1].shard_ids, &[5, 6, 8, 9, 10]);
}

/// We want to make sure for small pipelines, we still reschedule them with the same
/// pipeline uid.
#[test]
fn test_group_shards_into_pipeline_single_small_pipeline() {
let source_uid = source_id();
let pipeline_uid = PipelineUid::from_u128(1u128);
let indexing_tasks = group_shards_into_pipelines_aux(
&source_uid,
&[12],
&[(pipeline_uid, &[12])],
mcpu(100),
);
assert_eq!(indexing_tasks.len(), 1);
let indexing_task = &indexing_tasks[0];
assert_eq!(&indexing_task.shard_ids, &[12]);
assert_eq!(indexing_task.pipeline_uid.unwrap(), pipeline_uid);
}

#[test]
fn test_pick_indexer_for_shard() {
let indexer_capacity = vec![
("node1".to_string(), 1),
("node2".to_string(), 0),
("node3".to_string(), 2),
("node4".to_string(), 2),
("node5".to_string(), 0),
("node6".to_string(), 0),
];
let indexers: Vec<&str> = super::pick_indexer(&indexer_capacity).collect();
assert_eq!(indexers, &["node1", "node3", "node3", "node4", "node4"]);
}
}
4 changes: 2 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use quickwit_proto::ingest::ingester::{
use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, Shard, ShardState};
use quickwit_proto::types::{NodeId, Position, QueueId};
use tokio::sync::{watch, RwLock};
use tracing::{error, info, warn};
use tracing::{debug, error, info, warn};

use super::fetch::FetchStreamTask;
use super::models::IngesterShard;
Expand Down Expand Up @@ -468,7 +468,7 @@ impl IngesterService for Ingester {
.expect("rate limiter should be initialized");

if !rate_limiter.acquire_bytes(requested_capacity) {
warn!("failed to persist records to shard `{queue_id}`: rate limited");
debug!("failed to persist records to shard `{queue_id}`: rate limited");

let persist_failure = PersistFailure {
subrequest_id: subrequest.subrequest_id,
Expand Down
8 changes: 7 additions & 1 deletion quickwit/quickwit-proto/src/types/pipeline_uid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,15 @@ use ulid::Ulid;
const ULID_SIZE: usize = 16;

/// A pipeline uid identify an indexing pipeline and an indexing task.
#[derive(Debug, Clone, Copy, Default, Hash, Eq, PartialEq, Ord, PartialOrd)]
#[derive(Clone, Copy, Default, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub struct PipelineUid(Ulid);

impl std::fmt::Debug for PipelineUid {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(f, "Pipeline({})", self.0)
}
}

impl PipelineUid {
pub fn from_u128(ulid_u128: u128) -> PipelineUid {
PipelineUid(Ulid::from_bytes(ulid_u128.to_le_bytes()))
Expand Down

0 comments on commit 07bcb0b

Please sign in to comment.