From 075be7ab2635b174db80d026cbb57780c8b7d9ef Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Mon, 8 Jul 2024 16:41:26 +0900
Subject: [PATCH] Emitting both a short term average and a long term average
 of shard throughput.

Scaling up relies on the short term average in order to rapidly react to a
change in throughput, while scaling down and the indexing scheduler rely on
the long term average.
---
 .../src/indexing_scheduler/mod.rs             |   2 +-
 .../src/ingest/ingest_controller.rs           |  88 ++++---
 .../quickwit-control-plane/src/ingest/mod.rs  |   1 +
 .../src/ingest/scaling_arbiter.rs             | 119 ++++++++++
 .../src/model/shard_table.rs                  |  63 +++--
 .../src/ingest_v2/broadcast.rs                | 223 ++++++++++++++----
 .../quickwit-ingest/src/ingest_v2/ingester.rs |   2 +-
 .../quickwit-ingest/src/ingest_v2/router.rs   |   6 +-
 8 files changed, 400 insertions(+), 104 deletions(-)
 create mode 100644 quickwit/quickwit-control-plane/src/ingest/scaling_arbiter.rs

diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
index 110e08a5139..a177931adf3 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
@@ -144,7 +144,7 @@ fn compute_load_per_shard(shard_entries: &[&ShardEntry]) -> NonZeroU32 {
     let num_shards = shard_entries.len().max(1) as u64;
     let average_throughput_per_shard_bytes: u64 = shard_entries
         .iter()
-        .map(|shard_entry| shard_entry.ingestion_rate.0 as u64 * bytesize::MIB)
+        .map(|shard_entry| shard_entry.long_term_ingestion_rate.0 as u64 * bytesize::MIB)
         .sum::<u64>()
         .div_ceil(num_shards)
         // A shard throughput cannot exceed PIPELINE_THROUGHPUT in the long term (this is
diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
index 10b774841d1..25185da878e 100644
--- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
+++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
@@ -55,6 +55,7 @@ use tokio::task::JoinHandle;
 use tracing::{debug, enabled, error, info, warn, Level};
 use ulid::Ulid;
 
+use super::scaling_arbiter::ScalingArbiter;
 use crate::control_plane::ControlPlane;
 use crate::ingest::wait_handle::WaitHandle;
 use crate::model::{ControlPlaneModel, ScalingMode, ShardEntry, ShardStats};
@@ -102,10 +103,7 @@ pub struct IngestController {
     // This lock ensures that only one rebalance operation is performed at a time.
     rebalance_lock: Arc<Mutex<()>>,
     pub stats: IngestControllerStats,
-    // Threshold in MiB/s below which we decrease the number of shards.
-    scale_down_shards_threshold_mib_per_sec: f32,
-    // Threshold in MiB/s above which we increase the number of shards.
- scale_up_shards_threshold_mib_per_sec: f32, + scaling_arbiter: ScalingArbiter, } impl fmt::Debug for IngestController { @@ -192,9 +190,9 @@ impl IngestController { replication_factor, rebalance_lock: Arc::new(Mutex::new(())), stats: IngestControllerStats::default(), - scale_up_shards_threshold_mib_per_sec: max_shard_ingestion_throughput_mib_per_sec * 0.8, - scale_down_shards_threshold_mib_per_sec: max_shard_ingestion_throughput_mib_per_sec - * 0.2, + scaling_arbiter: ScalingArbiter::with_max_shard_ingestion_throughput_mib_per_sec( + max_shard_ingestion_throughput_mib_per_sec, + ), } } @@ -291,20 +289,31 @@ impl IngestController { &local_shards_update.source_uid, &local_shards_update.shard_infos, ); - if shard_stats.avg_ingestion_rate >= self.scale_up_shards_threshold_mib_per_sec { - self.try_scale_up_shards(local_shards_update.source_uid, shard_stats, model, progress) + let Some(scaling_mode) = self.scaling_arbiter.should_scale(shard_stats) else { + return Ok(()); + }; + + match scaling_mode { + ScalingMode::Up => { + self.try_scale_up_shards( + local_shards_update.source_uid, + shard_stats, + model, + progress, + ) .await?; - } else if shard_stats.avg_ingestion_rate <= self.scale_down_shards_threshold_mib_per_sec - && shard_stats.num_open_shards > 1 - { - self.try_scale_down_shards( - local_shards_update.source_uid, - shard_stats, - model, - progress, - ) - .await?; + } + ScalingMode::Down => { + self.try_scale_down_shards( + local_shards_update.source_uid, + shard_stats, + model, + progress, + ) + .await?; + } } + Ok(()) } @@ -1111,8 +1120,8 @@ fn find_scale_down_candidate( *num_shards += 1; if shard - .ingestion_rate - .cmp(&candidate.ingestion_rate) + .long_term_ingestion_rate + .cmp(&candidate.long_term_ingestion_rate) .then_with(|| shard.shard_id.cmp(&candidate.shard_id)) .is_gt() { @@ -2082,13 +2091,14 @@ mod tests { let shard_entries: Vec = model.all_shards().cloned().collect(); assert_eq!(shard_entries.len(), 1); - assert_eq!(shard_entries[0].ingestion_rate, 0); + assert_eq!(shard_entries[0].short_term_ingestion_rate, 0); // Test update shard ingestion rate but no scale down because num open shards is 1. let shard_infos = BTreeSet::from_iter([ShardInfo { shard_id: ShardId::from(1), shard_state: ShardState::Open, - ingestion_rate: RateMibPerSec(1), + short_term_ingestion_rate: RateMibPerSec(1), + long_term_ingestion_rate: RateMibPerSec(1), }]); let local_shards_update = LocalShardsUpdate { leader_id: "test-ingester".into(), @@ -2103,7 +2113,7 @@ mod tests { let shard_entries: Vec = model.all_shards().cloned().collect(); assert_eq!(shard_entries.len(), 1); - assert_eq!(shard_entries[0].ingestion_rate, 1); + assert_eq!(shard_entries[0].short_term_ingestion_rate, 1); // Test update shard ingestion rate with failing scale down. 
let shards = vec![Shard { @@ -2155,12 +2165,14 @@ mod tests { ShardInfo { shard_id: ShardId::from(1), shard_state: ShardState::Open, - ingestion_rate: RateMibPerSec(1), + short_term_ingestion_rate: RateMibPerSec(1), + long_term_ingestion_rate: RateMibPerSec(1), }, ShardInfo { shard_id: ShardId::from(2), shard_state: ShardState::Open, - ingestion_rate: RateMibPerSec(1), + short_term_ingestion_rate: RateMibPerSec(1), + long_term_ingestion_rate: RateMibPerSec(1), }, ]); let local_shards_update = LocalShardsUpdate { @@ -2178,12 +2190,14 @@ mod tests { ShardInfo { shard_id: ShardId::from(1), shard_state: ShardState::Open, - ingestion_rate: RateMibPerSec(4), + short_term_ingestion_rate: RateMibPerSec(4), + long_term_ingestion_rate: RateMibPerSec(4), }, ShardInfo { shard_id: ShardId::from(2), shard_state: ShardState::Open, - ingestion_rate: RateMibPerSec(4), + short_term_ingestion_rate: RateMibPerSec(4), + long_term_ingestion_rate: RateMibPerSec(4), }, ]); let local_shards_update = LocalShardsUpdate { @@ -2544,32 +2558,38 @@ mod tests { ShardInfo { shard_id: ShardId::from(1), shard_state: ShardState::Open, - ingestion_rate: quickwit_ingest::RateMibPerSec(1), + short_term_ingestion_rate: quickwit_ingest::RateMibPerSec(1), + long_term_ingestion_rate: quickwit_ingest::RateMibPerSec(1), }, ShardInfo { shard_id: ShardId::from(2), shard_state: ShardState::Open, - ingestion_rate: quickwit_ingest::RateMibPerSec(2), + short_term_ingestion_rate: quickwit_ingest::RateMibPerSec(2), + long_term_ingestion_rate: quickwit_ingest::RateMibPerSec(2), }, ShardInfo { shard_id: ShardId::from(3), shard_state: ShardState::Open, - ingestion_rate: quickwit_ingest::RateMibPerSec(3), + short_term_ingestion_rate: quickwit_ingest::RateMibPerSec(3), + long_term_ingestion_rate: quickwit_ingest::RateMibPerSec(3), }, ShardInfo { shard_id: ShardId::from(4), shard_state: ShardState::Open, - ingestion_rate: quickwit_ingest::RateMibPerSec(4), + short_term_ingestion_rate: quickwit_ingest::RateMibPerSec(4), + long_term_ingestion_rate: quickwit_ingest::RateMibPerSec(4), }, ShardInfo { shard_id: ShardId::from(5), shard_state: ShardState::Open, - ingestion_rate: quickwit_ingest::RateMibPerSec(5), + short_term_ingestion_rate: quickwit_ingest::RateMibPerSec(5), + long_term_ingestion_rate: quickwit_ingest::RateMibPerSec(5), }, ShardInfo { shard_id: ShardId::from(6), shard_state: ShardState::Open, - ingestion_rate: quickwit_ingest::RateMibPerSec(6), + short_term_ingestion_rate: quickwit_ingest::RateMibPerSec(6), + long_term_ingestion_rate: quickwit_ingest::RateMibPerSec(6), }, ]); model.update_shards(&source_uid, &shard_infos); diff --git a/quickwit/quickwit-control-plane/src/ingest/mod.rs b/quickwit/quickwit-control-plane/src/ingest/mod.rs index a9ec456403d..9336482befd 100644 --- a/quickwit/quickwit-control-plane/src/ingest/mod.rs +++ b/quickwit/quickwit-control-plane/src/ingest/mod.rs @@ -18,6 +18,7 @@ // along with this program. If not, see . pub(crate) mod ingest_controller; +mod scaling_arbiter; mod wait_handle; pub use ingest_controller::IngestController; diff --git a/quickwit/quickwit-control-plane/src/ingest/scaling_arbiter.rs b/quickwit/quickwit-control-plane/src/ingest/scaling_arbiter.rs new file mode 100644 index 00000000000..0bd4961f125 --- /dev/null +++ b/quickwit/quickwit-control-plane/src/ingest/scaling_arbiter.rs @@ -0,0 +1,119 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. 
+// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use crate::model::{ScalingMode, ShardStats}; + +pub(crate) struct ScalingArbiter { + // Threshold in MiB/s below which we decrease the number of shards. + scale_down_shards_threshold_mib_per_sec: f32, + // Threshold in MiB/s above which we increase the number of shards. + scale_up_shards_short_term_threshold_mib_per_sec: f32, + scale_up_shards_long_term_threshold_mib_per_sec: f32, +} + +impl ScalingArbiter { + pub fn with_max_shard_ingestion_throughput_mib_per_sec( + max_shard_throughput_mib_per_sec: f32, + ) -> ScalingArbiter { + ScalingArbiter { + scale_up_shards_short_term_threshold_mib_per_sec: max_shard_throughput_mib_per_sec + * 0.8f32, + scale_up_shards_long_term_threshold_mib_per_sec: max_shard_throughput_mib_per_sec + * 0.3f32, + scale_down_shards_threshold_mib_per_sec: max_shard_throughput_mib_per_sec * 0.2f32, + } + } + + pub(crate) fn should_scale(&self, shard_stats: ShardStats) -> Option { + // We scale up based on the short term threshold to scale up more aggressively. + if shard_stats.avg_short_term_ingestion_rate + >= self.scale_up_shards_short_term_threshold_mib_per_sec + { + let long_term_ingestion_rate_after_scale_up = shard_stats.avg_long_term_ingestion_rate + * (shard_stats.num_open_shards as f32) + / (shard_stats.num_open_shards as f32 + 1.0f32); + if long_term_ingestion_rate_after_scale_up + >= self.scale_up_shards_long_term_threshold_mib_per_sec + { + return Some(ScalingMode::Up); + } + } + + // On the other hand, we scale down based on the long term ingestion rate, to avoid + // scaling down just due to a very short drop in ingestion + if shard_stats.avg_long_term_ingestion_rate <= self.scale_down_shards_threshold_mib_per_sec + && shard_stats.num_open_shards > 1 + { + return Some(ScalingMode::Down); + } + + None + } +} + +#[cfg(test)] +mod tests { + use super::ScalingArbiter; + use crate::model::{ScalingMode, ShardStats}; + + #[test] + fn test_scaling_arbiter() { + let scaling_arbiter = ScalingArbiter::with_max_shard_ingestion_throughput_mib_per_sec(10.0); + assert_eq!( + scaling_arbiter.should_scale(ShardStats { + num_open_shards: 1, + avg_short_term_ingestion_rate: 5.0, + avg_long_term_ingestion_rate: 6.0, + }), + None + ); + assert_eq!( + scaling_arbiter.should_scale(ShardStats { + num_open_shards: 1, + avg_short_term_ingestion_rate: 8.1, + avg_long_term_ingestion_rate: 8.1, + }), + Some(ScalingMode::Up) + ); + assert_eq!( + scaling_arbiter.should_scale(ShardStats { + num_open_shards: 2, + avg_short_term_ingestion_rate: 3.0, + avg_long_term_ingestion_rate: 1.5, + }), + Some(ScalingMode::Down) + ); + assert_eq!( + scaling_arbiter.should_scale(ShardStats { + num_open_shards: 1, + avg_short_term_ingestion_rate: 3.0, + avg_long_term_ingestion_rate: 1.5, + }), + None, + ); + assert_eq!( + scaling_arbiter.should_scale(ShardStats { + num_open_shards: 1, + avg_short_term_ingestion_rate: 8.0f32, + 
avg_long_term_ingestion_rate: 3.0f32, + }), + None, + ); + } +} diff --git a/quickwit/quickwit-control-plane/src/model/shard_table.rs b/quickwit/quickwit-control-plane/src/model/shard_table.rs index 8eb75e2ccf2..f07f93839c1 100644 --- a/quickwit/quickwit-control-plane/src/model/shard_table.rs +++ b/quickwit/quickwit-control-plane/src/model/shard_table.rs @@ -44,7 +44,7 @@ const SCALING_DOWN_RATE_LIMITER_SETTINGS: RateLimiterSettings = RateLimiterSetti refill_period: Duration::from_secs(60), }; -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Eq, PartialEq)] pub(crate) enum ScalingMode { Up, Down, @@ -53,7 +53,8 @@ pub(crate) enum ScalingMode { #[derive(Debug, Clone)] pub(crate) struct ShardEntry { pub shard: Shard, - pub ingestion_rate: RateMibPerSec, + pub short_term_ingestion_rate: RateMibPerSec, + pub long_term_ingestion_rate: RateMibPerSec, } impl Deref for ShardEntry { @@ -74,7 +75,8 @@ impl From for ShardEntry { fn from(shard: Shard) -> Self { Self { shard, - ingestion_rate: RateMibPerSec::default(), + short_term_ingestion_rate: RateMibPerSec::default(), + long_term_ingestion_rate: RateMibPerSec::default(), } } } @@ -449,18 +451,21 @@ impl ShardTable { shard_infos: &ShardInfos, ) -> ShardStats { let mut num_open_shards = 0; - let mut ingestion_rate_sum = RateMibPerSec::default(); + let mut short_term_ingestion_rate_sum = RateMibPerSec::default(); + let mut long_term_ingestion_rate_sum = RateMibPerSec::default(); if let Some(table_entry) = self.table_entries.get_mut(source_uid) { for shard_info in shard_infos { let ShardInfo { shard_id, shard_state, - ingestion_rate, + short_term_ingestion_rate, + long_term_ingestion_rate, } = shard_info; if let Some(shard_entry) = table_entry.shard_entries.get_mut(shard_id) { - shard_entry.ingestion_rate = *ingestion_rate; + shard_entry.short_term_ingestion_rate = *short_term_ingestion_rate; + shard_entry.long_term_ingestion_rate = *long_term_ingestion_rate; // `ShardInfos` are broadcasted via Chitchat and eventually consistent. As a // result, we can only trust the `Closed` state, which is final. 
                    if shard_state.is_closed() {
@@ -471,19 +476,27 @@ impl ShardTable {
             for shard_entry in table_entry.shard_entries.values() {
                 if shard_entry.is_open() {
                     num_open_shards += 1;
-                    ingestion_rate_sum += shard_entry.ingestion_rate;
+                    short_term_ingestion_rate_sum += shard_entry.short_term_ingestion_rate;
+                    long_term_ingestion_rate_sum += shard_entry.long_term_ingestion_rate;
                 }
             }
         }
 
-        let avg_ingestion_rate = if num_open_shards > 0 {
-            ingestion_rate_sum.0 as f32 / num_open_shards as f32
+        let avg_short_term_ingestion_rate = if num_open_shards > 0 {
+            short_term_ingestion_rate_sum.0 as f32 / num_open_shards as f32
+        } else {
+            0.0
+        };
+
+        let avg_long_term_ingestion_rate = if num_open_shards > 0 {
+            long_term_ingestion_rate_sum.0 as f32 / num_open_shards as f32
         } else {
             0.0
         };
 
         ShardStats {
             num_open_shards,
-            avg_ingestion_rate,
+            avg_short_term_ingestion_rate,
+            avg_long_term_ingestion_rate,
         }
     }
@@ -574,7 +587,8 @@ impl ShardTable {
 #[derive(Clone, Copy, Default)]
 pub(crate) struct ShardStats {
     pub num_open_shards: usize,
-    pub avg_ingestion_rate: f32,
+    pub avg_short_term_ingestion_rate: f32,
+    pub avg_long_term_ingestion_rate: f32,
 }
 
 #[cfg(test)]
@@ -865,32 +879,39 @@ mod tests {
             ShardInfo {
                 shard_id: ShardId::from(1),
                 shard_state: ShardState::Open,
-                ingestion_rate: RateMibPerSec(1),
+                short_term_ingestion_rate: RateMibPerSec(1),
+                long_term_ingestion_rate: RateMibPerSec(1),
             },
             ShardInfo {
                 shard_id: ShardId::from(2),
                 shard_state: ShardState::Open,
-                ingestion_rate: RateMibPerSec(2),
+                short_term_ingestion_rate: RateMibPerSec(2),
+                long_term_ingestion_rate: RateMibPerSec(2),
             },
             ShardInfo {
                 shard_id: ShardId::from(3),
                 shard_state: ShardState::Open,
-                ingestion_rate: RateMibPerSec(3),
+                short_term_ingestion_rate: RateMibPerSec(3),
+                long_term_ingestion_rate: RateMibPerSec(3),
             },
             ShardInfo {
                 shard_id: ShardId::from(4),
                 shard_state: ShardState::Closed,
-                ingestion_rate: RateMibPerSec(4),
+                short_term_ingestion_rate: RateMibPerSec(4),
+                long_term_ingestion_rate: RateMibPerSec(4),
             },
             ShardInfo {
                 shard_id: ShardId::from(5),
                 shard_state: ShardState::Open,
-                ingestion_rate: RateMibPerSec(5),
+                short_term_ingestion_rate: RateMibPerSec(5),
+                long_term_ingestion_rate: RateMibPerSec(5),
             },
         ]);
         let shard_stats = shard_table.update_shards(&source_uid, &shard_infos);
         assert_eq!(shard_stats.num_open_shards, 2);
-        assert_eq!(shard_stats.avg_ingestion_rate, 1.5);
+        assert_eq!(shard_stats.avg_short_term_ingestion_rate, 1.5);
+
+        assert_eq!(shard_stats.avg_long_term_ingestion_rate, 1.5);
 
         let shard_entries: Vec<ShardEntry> = shard_table
             .get_shards(&source_uid)
@@ -903,22 +924,22 @@ mod tests {
         assert_eq!(shard_entries[0].shard.shard_id(), ShardId::from(1));
         assert_eq!(shard_entries[0].shard.shard_state(), ShardState::Open);
-        assert_eq!(shard_entries[0].ingestion_rate, RateMibPerSec(1));
+        assert_eq!(shard_entries[0].short_term_ingestion_rate, RateMibPerSec(1));
 
         assert_eq!(shard_entries[1].shard.shard_id(), ShardId::from(2));
         assert_eq!(shard_entries[1].shard.shard_state(), ShardState::Open);
-        assert_eq!(shard_entries[1].ingestion_rate, RateMibPerSec(2));
+        assert_eq!(shard_entries[1].short_term_ingestion_rate, RateMibPerSec(2));
 
         assert_eq!(shard_entries[2].shard.shard_id(), ShardId::from(3));
         assert_eq!(
             shard_entries[2].shard.shard_state(),
             ShardState::Unavailable
         );
-        assert_eq!(shard_entries[2].ingestion_rate, RateMibPerSec(3));
+        assert_eq!(shard_entries[2].short_term_ingestion_rate, RateMibPerSec(3));
 
         assert_eq!(shard_entries[3].shard.shard_id(), ShardId::from(4));
         assert_eq!(shard_entries[3].shard.shard_state(),
ShardState::Closed); - assert_eq!(shard_entries[3].ingestion_rate, RateMibPerSec(4)); + assert_eq!(shard_entries[3].short_term_ingestion_rate, RateMibPerSec(4)); } #[test] diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs index 36b91e8fced..8c4502b8270 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs @@ -17,7 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::time::Duration; use bytesize::ByteSize; @@ -25,7 +25,7 @@ use quickwit_cluster::{Cluster, ListenerHandle}; use quickwit_common::pubsub::{Event, EventBroker}; use quickwit_common::shared_consts::INGESTER_PRIMARY_SHARDS_PREFIX; use quickwit_common::sorted_iter::{KeyDiff, SortedByKeyIterator}; -use quickwit_common::tower::Rate; +use quickwit_common::tower::{ConstantRate, Rate}; use quickwit_proto::ingest::ShardState; use quickwit_proto::types::{split_queue_id, NodeId, QueueId, ShardId, SourceUid}; use serde::{Deserialize, Serialize, Serializer}; @@ -50,16 +50,20 @@ pub struct ShardInfo { pub shard_id: ShardId, pub shard_state: ShardState, /// Shard ingestion rate in MiB/s. - pub ingestion_rate: RateMibPerSec, + /// Short term ingestion rate. It is measured over a short period of time. + pub short_term_ingestion_rate: RateMibPerSec, + /// Long term ingestion rate. It is measured over a larger period of time. + pub long_term_ingestion_rate: RateMibPerSec, } impl Serialize for ShardInfo { fn serialize(&self, serializer: S) -> Result { serializer.serialize_str(&format!( - "{}:{}:{}", + "{}:{}:{}:{}", self.shard_id, self.shard_state.as_json_str_name(), - self.ingestion_rate.0, + self.short_term_ingestion_rate.0, + self.long_term_ingestion_rate.0, )) } } @@ -81,7 +85,14 @@ impl<'de> Deserialize<'de> for ShardInfo { let shard_state = ShardState::from_json_str_name(shard_state_str) .ok_or_else(|| serde::de::Error::custom("invalid shard state"))?; - let ingestion_rate = parts + let short_term_ingestion_rate = parts + .next() + .ok_or_else(|| serde::de::Error::custom("invalid shard info"))? + .parse::() + .map(RateMibPerSec) + .map_err(|_| serde::de::Error::custom("invalid shard ingestion rate"))?; + + let long_term_ingestion_rate = parts .next() .ok_or_else(|| serde::de::Error::custom("invalid shard info"))? .parse::() @@ -91,7 +102,8 @@ impl<'de> Deserialize<'de> for ShardInfo { Ok(Self { shard_id, shard_state, - ingestion_rate, + short_term_ingestion_rate, + long_term_ingestion_rate, }) } } @@ -148,6 +160,101 @@ impl LocalShardsSnapshot { pub(super) struct BroadcastLocalShardsTask { cluster: Cluster, weak_state: WeakIngesterState, + shard_throughput_time_series: ShardThroughputTimeSeries, +} + +const SHARD_THROUGHPUT_LONG_TERM_WINDOW_LEN: usize = 12; + +#[derive(Default)] +struct ShardThroughputTimeSeries { + shard_time_series: HashMap<(SourceUid, ShardId), ShardThroughputTimeSerie>, +} + +impl ShardThroughputTimeSeries { + // Records a list of shard throughputs. + // + // A new time series is created for each new shard_ids. + // If a shard_id had a time series, and it is not present in the + // `shard_throughput`, the time series will be removed. 
+ #[allow(clippy::mutable_key_type)] + pub fn record_shard_throughputs( + &mut self, + shard_throughputs: HashMap<(SourceUid, ShardId), (ShardState, ConstantRate)>, + ) { + self.shard_time_series + .retain(|key, _| shard_throughputs.contains_key(key)); + for ((source_uid, shard_id), (shard_state, throughput)) in shard_throughputs { + let throughput_measurement = throughput.rescale(Duration::from_secs(1)).work_bytes(); + let shard_time_serie = self + .shard_time_series + .entry((source_uid.clone(), shard_id.clone())) + .or_default(); + shard_time_serie.shard_state = shard_state; + shard_time_serie.record(throughput_measurement); + } + } + + pub fn get_per_source_shard_infos(&self) -> BTreeMap { + let mut per_source_shard_infos: BTreeMap = BTreeMap::new(); + for ((source_uid, shard_id), shard_time_series) in self.shard_time_series.iter() { + let shard_state = shard_time_series.shard_state; + let short_term_ingestion_rate_mib_per_sec_u64 = + shard_time_series.last().as_u64() / ONE_MIB.as_u64(); + let short_term_ingestion_rate = + RateMibPerSec(short_term_ingestion_rate_mib_per_sec_u64 as u16); + let long_term_ingestion_rate_mib_per_sec_u64 = + shard_time_series.average().as_u64() / ONE_MIB.as_u64(); + let long_term_ingestion_rate = + RateMibPerSec(long_term_ingestion_rate_mib_per_sec_u64 as u16); + let shard_info = ShardInfo { + shard_id: shard_id.clone(), + shard_state, + short_term_ingestion_rate, + long_term_ingestion_rate, + }; + per_source_shard_infos + .entry(source_uid.clone()) + .or_default() + .insert(shard_info); + } + per_source_shard_infos + } +} + +#[derive(Default)] +struct ShardThroughputTimeSerie { + shard_state: ShardState, + measurements: [ByteSize; SHARD_THROUGHPUT_LONG_TERM_WINDOW_LEN], + len: usize, +} + +impl ShardThroughputTimeSerie { + fn last(&self) -> ByteSize { + self.measurements.last().copied().unwrap_or_default() + } + + fn average(&self) -> ByteSize { + if self.len == 0 { + return ByteSize::default(); + } + let sum = self + .measurements + .iter() + .rev() + .take(self.len) + .map(ByteSize::as_u64) + .sum::(); + ByteSize::b(sum / self.len as u64) + } + + fn record(&mut self, new_throughput_measurement: ByteSize) { + self.len = (self.len + 1).min(SHARD_THROUGHPUT_LONG_TERM_WINDOW_LEN); + self.measurements.rotate_left(1); + let Some(last_measurement) = self.measurements.last_mut() else { + return; + }; + *last_measurement = new_throughput_measurement; + } } impl BroadcastLocalShardsTask { @@ -155,17 +262,17 @@ impl BroadcastLocalShardsTask { let mut broadcaster = Self { cluster, weak_state, + shard_throughput_time_series: Default::default(), }; tokio::spawn(async move { broadcaster.run().await }) } - async fn snapshot_local_shards(&self) -> Option { + async fn snapshot_local_shards(&mut self) -> Option { let state = self.weak_state.upgrade()?; let Ok(mut state_guard) = state.lock_partially().await else { return Some(LocalShardsSnapshot::default()); }; - let mut per_source_shard_infos: BTreeMap = BTreeMap::new(); let queue_ids: Vec<(QueueId, ShardState)> = state_guard .shards @@ -182,36 +289,35 @@ impl BroadcastLocalShardsTask { let mut num_open_shards = 0; let mut num_closed_shards = 0; - for (queue_id, shard_state) in queue_ids { - let Some((_rate_limiter, rate_meter)) = state_guard.rate_trackers.get_mut(&queue_id) - else { - warn!( - "rate limiter `{queue_id}` not found: this should never happen, please report" - ); - continue; - }; - let Some((index_uid, source_id, shard_id)) = split_queue_id(&queue_id) else { - continue; - }; - let source_uid = SourceUid { - 
index_uid, - source_id, - }; - // Shard ingestion rate in MiB/s. - let ingestion_rate_per_sec = rate_meter.harvest().rescale(Duration::from_secs(1)); - let ingestion_rate_mib_per_sec_u64 = ingestion_rate_per_sec.work() / ONE_MIB.as_u64(); - let ingestion_rate = RateMibPerSec(ingestion_rate_mib_per_sec_u64 as u16); + #[allow(clippy::mutable_key_type)] + let ingestion_rates: HashMap<(SourceUid, ShardId), (ShardState, ConstantRate)> = queue_ids + .iter() + .flat_map(|(queue_id, shard_state)| { + let Some((_rate_limiter, rate_meter)) = state_guard.rate_trackers.get_mut(queue_id) + else { + warn!( + "rate limiter `{queue_id}` not found: this should never happen, please \ + report" + ); + return None; + }; + let (index_uid, source_id, shard_id) = split_queue_id(queue_id)?; + let source_uid = SourceUid { + index_uid, + source_id, + }; + // Shard ingestion rate in MiB/s. + Some(((source_uid, shard_id), (*shard_state, rate_meter.harvest()))) + }) + .collect(); + + self.shard_throughput_time_series + .record_shard_throughputs(ingestion_rates); + + let per_source_shard_infos = self + .shard_throughput_time_series + .get_per_source_shard_infos(); - let shard_info = ShardInfo { - shard_id, - shard_state, - ingestion_rate, - }; - per_source_shard_infos - .entry(source_uid) - .or_default() - .insert(shard_info); - } for shard_infos in per_source_shard_infos.values() { for shard_info in shard_infos { match shard_info.shard_state { @@ -349,10 +455,11 @@ mod tests { let shard_info = ShardInfo { shard_id: ShardId::from(1), shard_state: ShardState::Open, - ingestion_rate: RateMibPerSec(42), + short_term_ingestion_rate: RateMibPerSec(42), + long_term_ingestion_rate: RateMibPerSec(40), }; let serialized = serde_json::to_string(&shard_info).unwrap(); - assert_eq!(serialized, r#""00000000000000000001:open:42""#); + assert_eq!(serialized, r#""00000000000000000001:open:42:40""#); let deserialized = serde_json::from_str::(&serialized).unwrap(); assert_eq!(deserialized, shard_info); @@ -376,7 +483,8 @@ mod tests { vec![ShardInfo { shard_id: ShardId::from(1), shard_state: ShardState::Open, - ingestion_rate: RateMibPerSec(42), + short_term_ingestion_rate: RateMibPerSec(42), + long_term_ingestion_rate: RateMibPerSec(42), }] .into_iter() .collect(), @@ -416,7 +524,8 @@ mod tests { vec![ShardInfo { shard_id: ShardId::from(1), shard_state: ShardState::Closed, - ingestion_rate: RateMibPerSec(42), + short_term_ingestion_rate: RateMibPerSec(42), + long_term_ingestion_rate: RateMibPerSec(42), }] .into_iter() .collect(), @@ -469,9 +578,10 @@ mod tests { .unwrap(); let (_temp_dir, state) = IngesterState::for_test().await; let weak_state = state.weak(); - let task = BroadcastLocalShardsTask { + let mut task = BroadcastLocalShardsTask { cluster, weak_state, + shard_throughput_time_series: Default::default(), }; let previous_snapshot = task.snapshot_local_shards().await.unwrap(); assert!(previous_snapshot.per_source_shard_infos.is_empty()); @@ -592,7 +702,7 @@ mod tests { let shard_info = event.shard_infos.iter().next().unwrap(); assert_eq!(shard_info.shard_id, ShardId::from(1)); assert_eq!(shard_info.shard_state, ShardState::Open); - assert_eq!(shard_info.ingestion_rate, 42u16); + assert_eq!(shard_info.short_term_ingestion_rate, 42u16); }) .forever(); @@ -608,7 +718,8 @@ mod tests { let value = serde_json::to_string(&vec![ShardInfo { shard_id: ShardId::from(1), shard_state: ShardState::Open, - ingestion_rate: RateMibPerSec(42), + short_term_ingestion_rate: RateMibPerSec(42), + long_term_ingestion_rate: RateMibPerSec(42), }]) 
         .unwrap();
@@ -617,4 +728,26 @@ mod tests {
 
         assert_eq!(local_shards_update_counter.load(Ordering::Acquire), 1);
     }
+
+    #[test]
+    fn test_shard_throughput_time_serie() {
+        let mut time_serie = ShardThroughputTimeSerie::default();
+        assert_eq!(time_serie.last(), ByteSize::mb(0));
+        assert_eq!(time_serie.average(), ByteSize::mb(0));
+        time_serie.record(ByteSize::mb(2));
+        assert_eq!(time_serie.last(), ByteSize::mb(2));
+        assert_eq!(time_serie.average(), ByteSize::mb(2));
+        time_serie.record(ByteSize::mb(1));
+        assert_eq!(time_serie.last(), ByteSize::mb(1));
+        assert_eq!(time_serie.average(), ByteSize::kb(1500));
+        time_serie.record(ByteSize::mb(3));
+        assert_eq!(time_serie.last(), ByteSize::mb(3));
+        assert_eq!(time_serie.average(), ByteSize::mb(2));
+        for _ in 0..SHARD_THROUGHPUT_LONG_TERM_WINDOW_LEN {
+            time_serie.record(ByteSize::mb(4));
+            assert_eq!(time_serie.last(), ByteSize::mb(4));
+        }
+
+        assert_eq!(time_serie.average(), ByteSize::mb(4));
+    }
 }
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
index 163a9f5cab4..591849516c2 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -1574,7 +1574,7 @@ mod tests {
         let shard_info = shard_infos.iter().next().unwrap();
         assert_eq!(shard_info.shard_id, ShardId::from(1));
         assert_eq!(shard_info.shard_state, ShardState::Open);
-        assert_eq!(shard_info.ingestion_rate, 0);
+        assert_eq!(shard_info.short_term_ingestion_rate, 0);
 
         let mut state_guard = ingester.state.lock_fully().await.unwrap();
         state_guard
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs
index 4eaf7803efb..4249e04be67 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs
@@ -1665,12 +1665,14 @@ mod tests {
             ShardInfo {
                 shard_id: ShardId::from(1),
                 shard_state: ShardState::Closed,
-                ingestion_rate: RateMibPerSec(0),
+                short_term_ingestion_rate: RateMibPerSec(0),
+                long_term_ingestion_rate: RateMibPerSec(0),
             },
             ShardInfo {
                 shard_id: ShardId::from(2),
                 shard_state: ShardState::Open,
-                ingestion_rate: RateMibPerSec(0),
+                short_term_ingestion_rate: RateMibPerSec(0),
+                long_term_ingestion_rate: RateMibPerSec(0),
             },
         ]),
    };
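
Illustration (not part of the patch): a minimal, self-contained sketch of the scaling rule described in the commit message, assuming a 10 MiB/s per-shard throughput limit and the 0.8 / 0.3 / 0.2 ratios used by ScalingArbiter. The names Decision and decide are illustrative only. Scale-up reacts to the short term average, but only goes through if the projected long term average after adding one shard stays high enough; scale-down looks only at the long term average.

// Sketch only: mirrors ScalingArbiter::should_scale with plain floats, no Quickwit types.
#[derive(Debug, PartialEq)]
enum Decision {
    Up,
    Down,
    Hold,
}

fn decide(
    num_open_shards: usize,
    short_term_mib: f32,
    long_term_mib: f32,
    max_mib: f32,
) -> Decision {
    let scale_up_short_term = max_mib * 0.8; // react quickly to a burst...
    let scale_up_long_term = max_mib * 0.3; // ...but only if the extra shard would stay busy
    let scale_down = max_mib * 0.2; // shrink only on a sustained drop
    if short_term_mib >= scale_up_short_term {
        // Project the long term average as if one more shard were already open.
        let projected = long_term_mib * num_open_shards as f32 / (num_open_shards as f32 + 1.0);
        if projected >= scale_up_long_term {
            return Decision::Up;
        }
    }
    if long_term_mib <= scale_down && num_open_shards > 1 {
        return Decision::Down;
    }
    Decision::Hold
}

fn main() {
    // Same scenarios as the unit tests in scaling_arbiter.rs, with a 10 MiB/s limit.
    assert_eq!(decide(1, 8.1, 8.1, 10.0), Decision::Up); // sustained high throughput
    assert_eq!(decide(1, 8.0, 3.0, 10.0), Decision::Hold); // short burst, weak long term average
    assert_eq!(decide(2, 3.0, 1.5, 10.0), Decision::Down); // sustained low throughput
    assert_eq!(decide(1, 3.0, 1.5, 10.0), Decision::Hold); // never scale below one shard
}

The projection step is what keeps a brief burst on a single shard from opening a shard that would immediately sit mostly idle, while the long-term-only scale-down check avoids closing shards on a momentary dip.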