From 68519508d2e12c8edcb7a4c4f95f2d5f8daeb5ae Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Fri, 13 Oct 2023 12:32:31 +0900
Subject: [PATCH] Extending the control plane model (#3938)

* The control plane model now offers a concise view of the Metastore. The
  indexing scheduler no longer queries the metastore at all.

Other refactoring:

We rely on SourceUid more broadly in the code. The unit tests were using
IndexUid in a broken way in places. This is now fixed.

This PR introduces a LogicalIndexingTask. The idea is to have it hold all of the
information necessary for us to do the placement. Right now this means only the
`max_num_pipelines_per_indexer` parameter (allowing us to remove the ugly lookup
in the source config map).

In the next PR we will most likely also break the 1:1 relationship between
LogicalIndexingTask and IndexingTask. The idea would be to have a single logical
indexing task, and possibly break it into several IndexingTasks.

This PR also cleans up the hack that populated the list of indexing tasks with
ingest v2 tasks by mutating the list of tasks.
---
 .../quickwit-config/src/source_config/mod.rs  |   1 +
 .../src/control_plane.rs                      |  55 +--
 .../src/control_plane_model.rs                | 209 ++++---
 .../src/indexing_plan.rs                      | 395 +++++++++---------
 .../{scheduler.rs => indexing_scheduler.rs}   | 230 ++++------
 .../src/ingest/ingest_controller.rs           |  47 ++-
 quickwit/quickwit-control-plane/src/lib.rs    |  11 +-
 quickwit/quickwit-control-plane/src/tests.rs  |   8 +-
 .../src/metastore/index_metadata/mod.rs       |  10 +-
 quickwit/quickwit-proto/src/types.rs          |   5 +-
 10 files changed, 524 insertions(+), 447 deletions(-)
 rename quickwit/quickwit-control-plane/src/{scheduler.rs => indexing_scheduler.rs} (71%)

diff --git a/quickwit/quickwit-config/src/source_config/mod.rs b/quickwit/quickwit-config/src/source_config/mod.rs
index 895abf6c0b2..0c632371c31 100644
--- a/quickwit/quickwit-config/src/source_config/mod.rs
+++ b/quickwit/quickwit-config/src/source_config/mod.rs
@@ -43,6 +43,7 @@ pub const CLI_INGEST_SOURCE_ID: &str = "_ingest-cli-source";
 pub const INGEST_API_SOURCE_ID: &str = "_ingest-api-source";
 
 /// Reserved source ID used for native Quickwit ingest.
+/// (this is for ingest v2)
 pub const INGEST_SOURCE_ID: &str = "_ingest-source";
 
 pub const RESERVED_SOURCE_IDS: &[&str] =
diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs
index 30d200363cf..7a5e6ff4793 100644
--- a/quickwit/quickwit-control-plane/src/control_plane.rs
+++ b/quickwit/quickwit-control-plane/src/control_plane.rs
@@ -27,7 +27,7 @@ use quickwit_actors::{
 };
 use quickwit_config::{IndexConfig, SourceConfig};
 use quickwit_ingest::IngesterPool;
-use quickwit_metastore::Metastore;
+use quickwit_metastore::{IndexMetadata, Metastore};
 use quickwit_proto::control_plane::{
     ControlPlaneError, ControlPlaneResult, GetOrCreateOpenShardsRequest,
     GetOrCreateOpenShardsResponse,
@@ -42,8 +42,8 @@ use serde::Serialize;
 use tracing::error;
 
 use crate::control_plane_model::{ControlPlaneModel, ControlPlaneModelMetrics};
+use crate::indexing_scheduler::{IndexingScheduler, IndexingSchedulerState};
 use crate::ingest::IngestController;
-use crate::scheduler::{IndexingScheduler, IndexingSchedulerState};
 use crate::IndexerPool;
 
 /// Interval between two controls (or checks) of the desired plan VS running plan.
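// A rough sketch, for orientation only, of the two types this patch revolves
// around, assembled from definitions that appear further down in the diff
// (derives and exact visibility simplified; imports mirror the ones used in
// control_plane_model.rs below).
use std::num::NonZeroUsize;
use quickwit_proto::{IndexUid, ShardId, SourceId};

// `SourceUid` replaces the ad-hoc `(IndexUid, SourceId)` tuples previously used
// as map keys throughout the control plane.
pub struct SourceUid {
    pub index_uid: IndexUid,
    pub source_id: SourceId,
}

// `LogicalIndexingTask` carries everything the placement logic needs, including
// the `max_num_pipelines_per_indexer` limit that previously required a lookup
// in the source config map.
pub(crate) struct LogicalIndexingTask {
    pub source_uid: SourceUid,
    pub shard_ids: Vec<ShardId>,
    pub max_num_pipelines_per_indexer: NonZeroUsize,
}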
@@ -126,15 +126,14 @@ impl Actor for ControlPlane { self.model .load_from_metastore(&*self.metastore, ctx.progress()) .await - .context("Failed to intiialize the model")?; + .context("failed to intialize the model")?; if let Err(error) = self .indexing_scheduler - .schedule_indexing_plan_if_needed() - .await + .schedule_indexing_plan_if_needed(&self.model) { // TODO inspect error. - error!("Error when scheduling indexing plan: `{}`.", error); + error!("error when scheduling indexing plan: `{}`.", error); } ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop) @@ -153,7 +152,7 @@ impl Handler for ControlPlane { _message: ControlPlanLoop, ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { - if let Err(error) = self.indexing_scheduler.control_running_plan().await { + if let Err(error) = self.indexing_scheduler.control_running_plan(&self.model) { error!("error when controlling the running plan: `{}`", error); } ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop) @@ -176,7 +175,7 @@ fn convert_metastore_error( ) -> Result, ActorExitStatus> { // If true, we know that the transactions has not been recorded in the Metastore. // If false, we simply are not sure whether the transaction has been recorded or not. - let metastore_failure_is_certain = match &metastore_error { + let is_transaction_certainly_aborted = match &metastore_error { MetastoreError::AlreadyExists(_) | MetastoreError::FailedPrecondition { .. } | MetastoreError::Forbidden { .. } @@ -190,15 +189,16 @@ fn convert_metastore_error( | MetastoreError::Connection { .. } | MetastoreError::Db { .. } => false, }; - if metastore_failure_is_certain { - // If the metastore failure is certain, this is actually a good thing. + if is_transaction_certainly_aborted { + // If the metastore transaction is certain to have been aborted, + // this is actually a good thing. // We do not need to restart the control plane. - error!(err=?metastore_error, transaction_outcome="certainly-failed", "metastore-error: The transaction certainly failed. We do not need to restart the control plane."); + error!(err=?metastore_error, transaction_outcome="aborted", "metastore error"); Ok(Err(ControlPlaneError::Metastore(metastore_error))) } else { - // If the metastore failure is uncertain, we need to restart the control plane + // If the metastore transaction may have been executed, we need to restart the control plane // so that it gets resynced with the metastore state. - error!(err=?metastore_error, transaction_outcome="uncertain", "metastore-error: Transaction outcome is uncertain. Restarting control plane."); + error!(err=?metastore_error, transaction_outcome="maybe-executed", "metastore error"); Err(ActorExitStatus::from(anyhow::anyhow!(metastore_error))) } } @@ -222,14 +222,17 @@ impl Handler for ControlPlane { } }; - let index_uid = match self.metastore.create_index(index_config).await { + let index_uid = match self.metastore.create_index(index_config.clone()).await { Ok(index_uid) => index_uid, Err(metastore_error) => { return convert_metastore_error(metastore_error); } }; - self.model.add_index(index_uid.clone()); + let index_metadata: IndexMetadata = + IndexMetadata::new_with_index_uid(index_uid.clone(), index_config); + + self.model.add_index(index_metadata); let response = CreateIndexResponse { index_uid: index_uid.into(), @@ -262,7 +265,7 @@ impl Handler for ControlPlane { // TODO: Refine the event. Notify index will have the effect to reload the entire state from // the metastore. 
We should update the state of the control plane. - self.indexing_scheduler.on_index_change().await?; + self.indexing_scheduler.on_index_change(&self.model).await?; Ok(Ok(response)) } @@ -287,21 +290,22 @@ impl Handler for ControlPlane { return Ok(Err(ControlPlaneError::from(error))); } }; - let source_id = source_config.source_id.clone(); if let Err(metastore_error) = self .metastore - .add_source(index_uid.clone(), source_config) + .add_source(index_uid.clone(), source_config.clone()) .await { return convert_metastore_error(metastore_error); }; - self.model.add_source(&index_uid, &source_id); + self.model + .add_source(&index_uid, source_config) + .context("failed to add source")?; // TODO: Refine the event. Notify index will have the effect to reload the entire state from // the metastore. We should update the state of the control plane. - self.indexing_scheduler.on_index_change().await?; + self.indexing_scheduler.on_index_change(&self.model).await?; let response = EmptyResponse {}; Ok(Ok(response)) @@ -333,7 +337,7 @@ impl Handler for ControlPlane { // TODO: Refine the event. Notify index will have the effect to reload the entire state from // the metastore. We should update the state of the control plane. - self.indexing_scheduler.on_index_change().await?; + self.indexing_scheduler.on_index_change(&self.model).await?; let response = EmptyResponse {}; Ok(Ok(response)) @@ -362,7 +366,7 @@ impl Handler for ControlPlane { } self.model.delete_source(&index_uid, &request.source_id); - self.indexing_scheduler.on_index_change().await?; + self.indexing_scheduler.on_index_change(&self.model).await?; let response = EmptyResponse {}; Ok(Ok(response)) } @@ -542,6 +546,9 @@ mod tests { let indexer_pool = IndexerPool::default(); let ingester_pool = IngesterPool::default(); + let index_metadata = IndexMetadata::for_test("test-index", "ram://test"); + let index_uid = index_metadata.index_uid.clone(); + let mut mock_metastore = MockMetastore::default(); mock_metastore .expect_add_source() @@ -553,7 +560,7 @@ mod tests { }); mock_metastore .expect_list_indexes_metadatas() - .returning(|_| Ok(Vec::new())); + .returning(move |_| Ok(vec![index_metadata.clone()])); let metastore = Arc::new(mock_metastore); let replication_factor = 1; @@ -568,7 +575,7 @@ mod tests { ); let source_config = SourceConfig::for_test("test-source", SourceParams::void()); let add_source_request = AddSourceRequest { - index_uid: "test-index:0".to_string(), + index_uid: index_uid.to_string(), source_config_json: serde_json::to_string(&source_config).unwrap(), }; control_plane_mailbox diff --git a/quickwit/quickwit-control-plane/src/control_plane_model.rs b/quickwit/quickwit-control-plane/src/control_plane_model.rs index 439878134df..93fd3648721 100644 --- a/quickwit/quickwit-control-plane/src/control_plane_model.rs +++ b/quickwit/quickwit-control-plane/src/control_plane_model.rs @@ -24,16 +24,18 @@ use fnv::{FnvHashMap, FnvHashSet}; #[cfg(test)] use itertools::Itertools; use quickwit_common::Progress; -use quickwit_config::INGEST_SOURCE_ID; -use quickwit_metastore::{ListIndexesQuery, Metastore}; +use quickwit_config::{SourceConfig, INGEST_SOURCE_ID}; +use quickwit_metastore::{IndexMetadata, ListIndexesQuery, Metastore}; use quickwit_proto::control_plane::ControlPlaneResult; use quickwit_proto::ingest::{Shard, ShardState}; -use quickwit_proto::metastore::ListShardsSubrequest; +use quickwit_proto::metastore::{EntityKind, ListShardsSubrequest, MetastoreError}; use quickwit_proto::types::IndexId; use quickwit_proto::{metastore, IndexUid, 
NodeId, NodeIdRef, ShardId, SourceId}; use serde::Serialize; use tracing::{error, info}; +use crate::SourceUid; + type NextShardId = ShardId; #[derive(Debug, Eq, PartialEq)] struct ShardTableEntry { @@ -82,7 +84,8 @@ impl ShardTableEntry { /// Upon starts, it loads its entire state from the metastore. #[derive(Default, Debug)] pub struct ControlPlaneModel { - index_table: FnvHashMap, + index_uid_table: FnvHashMap, + index_table: FnvHashMap, shard_table: ShardTable, } @@ -110,33 +113,35 @@ impl ControlPlaneModel { ) -> ControlPlaneResult<()> { let now = Instant::now(); self.clear(); - let indexes = progress + let index_metadatas = progress .protect_future(metastore.list_indexes_metadatas(ListIndexesQuery::All)) .await?; - self.index_table.reserve(indexes.len()); + let num_indexes = index_metadatas.len(); + self.index_table.reserve(num_indexes); - let num_indexes = indexes.len(); let mut num_sources = 0; let mut num_shards = 0; - let mut subrequests = Vec::with_capacity(indexes.len()); + let mut subrequests = Vec::with_capacity(index_metadatas.len()); + + for index_metadata in index_metadatas { + self.add_index(index_metadata); + } - for index in indexes { - for source_id in index.sources.into_keys() { + for index_metadata in self.index_table.values() { + for source_id in index_metadata.sources.keys() { if source_id != INGEST_SOURCE_ID { continue; } let request = ListShardsSubrequest { - index_uid: index.index_uid.clone().into(), - source_id, + index_uid: index_metadata.index_uid.clone().into(), + source_id: source_id.to_string(), shard_state: Some(ShardState::Open as i32), }; num_sources += 1; subrequests.push(request); } - self.index_table - .insert(index.index_config.index_id, index.index_uid); } if !subrequests.is_empty() { let list_shards_request = metastore::ListShardsRequest { subrequests }; @@ -151,10 +156,10 @@ impl ControlPlaneModel { for list_shards_subresponse in list_shard_response.subresponses { num_shards += list_shards_subresponse.shards.len(); - let key = ( - list_shards_subresponse.index_uid.into(), - list_shards_subresponse.source_id, - ); + let source_uid = SourceUid { + index_uid: list_shards_subresponse.index_uid.into(), + source_id: list_shards_subresponse.source_id, + }; let shards: FnvHashMap = list_shards_subresponse .shards .into_iter() @@ -164,7 +169,9 @@ impl ControlPlaneModel { shards, next_shard_id: list_shards_subresponse.next_shard_id, }; - self.shard_table.table_entries.insert(key, table_entry); + self.shard_table + .table_entries + .insert(source_uid, table_entry); } } info!( @@ -178,19 +185,58 @@ impl ControlPlaneModel { Ok(()) } - pub(crate) fn add_index(&mut self, index_uid: IndexUid) { - let index_id = index_uid.index_id().to_string(); - self.index_table.insert(index_id, index_uid); + pub fn list_shards(&self, source_uid: &SourceUid) -> Vec { + self.shard_table.list_shards(source_uid) + } + + pub(crate) fn get_source_configs( + &self, + ) -> impl Iterator + '_ { + self.index_table.values().flat_map(|index_metadata| { + index_metadata + .sources + .iter() + .map(move |(source_id, source_config)| { + ( + SourceUid { + index_uid: index_metadata.index_uid.clone(), + source_id: source_id.clone(), + }, + source_config, + ) + }) + }) + } + + pub(crate) fn add_index(&mut self, index_metadata: IndexMetadata) { + let index_uid = index_metadata.index_uid.clone(); + self.index_uid_table + .insert(index_metadata.index_id().to_string(), index_uid.clone()); + self.index_table.insert(index_uid, index_metadata); } pub(crate) fn delete_index(&mut self, index_uid: 
&IndexUid) { // TODO: We need to let the routers and ingesters know. - self.index_table.remove(index_uid.index_id()); + self.index_table.remove(index_uid); self.shard_table.delete_index(index_uid.index_id()); } - pub(crate) fn add_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) { - self.shard_table.add_source(index_uid, source_id); + /// Adds a source to a given index. Returns an error if a source with the same source_id already + /// exists. + pub(crate) fn add_source( + &mut self, + index_uid: &IndexUid, + source_config: SourceConfig, + ) -> ControlPlaneResult<()> { + self.shard_table + .add_source(index_uid, &source_config.source_id); + let index_metadata = self.index_table.get_mut(index_uid).ok_or_else(|| { + MetastoreError::NotFound(EntityKind::Index { + index_id: index_uid.to_string(), + }) + })?; + index_metadata.add_source(source_config)?; + Ok(()) } pub(crate) fn delete_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) { @@ -221,7 +267,7 @@ impl ControlPlaneModel { } pub fn index_uid(&self, index_id: &str) -> Option { - self.index_table.get(index_id).cloned() + self.index_uid_table.get(index_id).cloned() } /// Updates the shard table. @@ -252,7 +298,7 @@ impl ControlPlaneModel { // A table that keeps track of the existing shards for each index and source. #[derive(Debug, Default)] struct ShardTable { - table_entries: FnvHashMap<(IndexUid, SourceId), ShardTableEntry>, + table_entries: FnvHashMap, } impl ShardTable { @@ -260,10 +306,12 @@ impl ShardTable { /// /// TODO check and document the behavior on error (if the source was already here). fn add_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) { - let key = (index_uid.clone(), source_id.clone()); + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: source_id.clone(), + }; let table_entry = ShardTableEntry::default(); - let previous_table_entry_opt = self.table_entries.insert(key, table_entry); - + let previous_table_entry_opt = self.table_entries.insert(source_uid, table_entry); if let Some(previous_table_entry) = previous_table_entry_opt { if !previous_table_entry.is_default() { error!( @@ -276,14 +324,28 @@ impl ShardTable { } fn delete_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) { - let key = (index_uid.clone(), source_id.clone()); - self.table_entries.remove(&key); + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: source_id.clone(), + }; + self.table_entries.remove(&source_uid); } /// Removes all the entries that match the target index ID. 
fn delete_index(&mut self, index_id: &str) { self.table_entries - .retain(|(index_uid, _), _| index_uid.index_id() != index_id); + .retain(|source_uid, _| source_uid.index_uid.index_id() != index_id); + } + + fn list_shards(&self, source_uid: &SourceUid) -> Vec { + let Some(shard_table_entry) = self.table_entries.get(source_uid) else { + return Vec::new(); + }; + shard_table_entry + .shards + .values() + .map(|shard| shard.shard_id) + .collect() } /// Finds open shards for a given index and source and whose leaders are not in the set of @@ -294,8 +356,11 @@ impl ShardTable { source_id: &SourceId, unavailable_ingesters: &FnvHashSet, ) -> Option<(Vec, NextShardId)> { - let key = (index_uid.clone(), source_id.clone()); - let table_entry = self.table_entries.get(&key)?; + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: source_id.clone(), + }; + let table_entry = self.table_entries.get(&source_uid)?; let open_shards: Vec = table_entry .shards .values() @@ -323,8 +388,11 @@ impl ShardTable { shards: &[Shard], next_shard_id: NextShardId, ) { - let key = (index_uid.clone(), source_id.clone()); - match self.table_entries.entry(key) { + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: source_id.clone(), + }; + match self.table_entries.entry(source_uid) { Entry::Occupied(mut entry) => { for shard in shards { let table_entry = entry.get_mut(); @@ -358,8 +426,11 @@ impl ShardTable { source_id: &SourceId, shard_ids: &[ShardId], ) { - let key = (index_uid.clone(), source_id.clone()); - if let Some(table_entry) = self.table_entries.get_mut(&key) { + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: source_id.clone(), + }; + if let Some(table_entry) = self.table_entries.get_mut(&source_uid) { for shard_id in shard_ids { if let Some(shard) = table_entry.shards.get_mut(shard_id) { shard.shard_state = ShardState::Closed as i32; @@ -375,8 +446,11 @@ impl ShardTable { source_id: &SourceId, shard_ids: &[ShardId], ) { - let key = (index_uid.clone(), source_id.clone()); - if let Some(table_entry) = self.table_entries.get_mut(&key) { + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: source_id.clone(), + }; + if let Some(table_entry) = self.table_entries.get_mut(&source_uid) { for shard_id in shard_ids { table_entry.shards.remove(shard_id); } @@ -399,8 +473,11 @@ mod tests { let mut shard_table = ShardTable::default(); shard_table.add_source(&index_uid, &source_id); assert_eq!(shard_table.table_entries.len(), 1); - let key = (index_uid.clone(), source_id.clone()); - let table_entry = shard_table.table_entries.get(&key).unwrap(); + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: source_id.clone(), + }; + let table_entry = shard_table.table_entries.get(&source_uid).unwrap(); assert!(table_entry.shards.is_empty()); assert_eq!(table_entry.next_shard_id, 1); } @@ -496,8 +573,11 @@ mod tests { assert_eq!(shard_table.table_entries.len(), 1); - let key = (index_uid_0.clone(), source_id.clone()); - let table_entry = shard_table.table_entries.get(&key).unwrap(); + let source_uid = SourceUid { + index_uid: index_uid_0.clone(), + source_id: source_id.clone(), + }; + let table_entry = shard_table.table_entries.get(&source_uid).unwrap(); let shards = table_entry.shards(); assert_eq!(shards.len(), 1); assert_eq!(shards[0], shard_01); @@ -523,8 +603,11 @@ mod tests { assert_eq!(shard_table.table_entries.len(), 1); - let key = (index_uid_0.clone(), source_id.clone()); - let table_entry = 
shard_table.table_entries.get(&key).unwrap(); + let source_uid = SourceUid { + index_uid: index_uid_0.clone(), + source_id: source_id.clone(), + }; + let table_entry = shard_table.table_entries.get(&source_uid).unwrap(); let shards = table_entry.shards(); assert_eq!(shards.len(), 2); assert_eq!(shards[0], shard_01); @@ -571,15 +654,21 @@ mod tests { assert_eq!(shard_table.table_entries.len(), 2); - let key = (index_uid_0.clone(), source_id.clone()); - let table_entry = shard_table.table_entries.get(&key).unwrap(); + let source_uid_0 = SourceUid { + index_uid: index_uid_0.clone(), + source_id: source_id.clone(), + }; + let table_entry = shard_table.table_entries.get(&source_uid_0).unwrap(); let shards = table_entry.shards(); assert_eq!(shards.len(), 1); assert_eq!(shards[0], shard_01); assert_eq!(table_entry.next_shard_id, 3); - let key = (index_uid_1.clone(), source_id.clone()); - let table_entry = shard_table.table_entries.get(&key).unwrap(); + let source_uid_1 = SourceUid { + index_uid: index_uid_1.clone(), + source_id: source_id.clone(), + }; + let table_entry = shard_table.table_entries.get(&source_uid_1).unwrap(); assert!(table_entry.is_empty()); assert_eq!(table_entry.next_shard_id, 2); } @@ -643,25 +732,31 @@ mod tests { assert_eq!(model.index_table.len(), 2); assert_eq!( - model.index_table["test-index-0"], + model.index_uid("test-index-0").unwrap(), IndexUid::from("test-index-0:0".to_string()) ); assert_eq!( - model.index_table["test-index-1"], + model.index_uid("test-index-1").unwrap(), IndexUid::from("test-index-1:0".to_string()) ); assert_eq!(model.shard_table.table_entries.len(), 2); - let key = ("test-index-0:0".into(), INGEST_SOURCE_ID.to_string()); - let table_entry = model.shard_table.table_entries.get(&key).unwrap(); + let source_uid_0 = SourceUid { + index_uid: "test-index-0:0".into(), + source_id: INGEST_SOURCE_ID.to_string(), + }; + let table_entry = model.shard_table.table_entries.get(&source_uid_0).unwrap(); let shards = table_entry.shards(); assert_eq!(shards.len(), 1); assert_eq!(shards[0].shard_id, 42); assert_eq!(table_entry.next_shard_id, 43); - let key = ("test-index-1:0".into(), INGEST_SOURCE_ID.to_string()); - let table_entry = model.shard_table.table_entries.get(&key).unwrap(); + let source_uid_1 = SourceUid { + index_uid: "test-index-1:0".into(), + source_id: INGEST_SOURCE_ID.to_string(), + }; + let table_entry = model.shard_table.table_entries.get(&source_uid_1).unwrap(); let shards = table_entry.shards(); assert_eq!(shards.len(), 0); assert_eq!(table_entry.next_shard_id, 1); diff --git a/quickwit/quickwit-control-plane/src/indexing_plan.rs b/quickwit/quickwit-control-plane/src/indexing_plan.rs index 6433e4ffd57..fa83c3b49bd 100644 --- a/quickwit/quickwit-control-plane/src/indexing_plan.rs +++ b/quickwit/quickwit-control-plane/src/indexing_plan.rs @@ -18,18 +18,17 @@ // along with this program. If not, see . 
use std::cmp::Ordering; -use std::collections::HashMap; -use std::hash::Hash; +use std::num::NonZeroUsize; +use fnv::FnvHashMap; use itertools::Itertools; use quickwit_common::rendezvous_hasher::sort_by_rendez_vous_hash; -use quickwit_config::{SourceConfig, CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID, INGEST_SOURCE_ID}; use quickwit_proto::indexing::IndexingTask; use quickwit_proto::metastore::SourceType; -use quickwit_proto::{IndexUid, SourceId}; use serde::Serialize; -use crate::IndexerNodeInfo; +use crate::control_plane_model::ControlPlaneModel; +use crate::{IndexerNodeInfo, SourceUid}; /// A [`PhysicalIndexingPlan`] defines the list of indexing tasks /// each indexer, identified by its node ID, should run. @@ -37,7 +36,7 @@ use crate::IndexerNodeInfo; /// to identify if the plan is up to date with the metastore. #[derive(Debug, PartialEq, Clone, Serialize, Default)] pub struct PhysicalIndexingPlan { - indexing_tasks_per_node_id: HashMap>, + indexing_tasks_per_node_id: FnvHashMap>, } impl PhysicalIndexingPlan { @@ -63,11 +62,20 @@ impl PhysicalIndexingPlan { .unwrap_or(0) } - pub fn assign_indexing_task(&mut self, node_id: String, indexing_task: IndexingTask) { + fn assign_indexing_task( + &mut self, + node_id: String, + logical_indexing_task: LogicalIndexingTask, + ) { + let physical_indexing_task = IndexingTask { + index_uid: logical_indexing_task.source_uid.index_uid.to_string(), + source_id: logical_indexing_task.source_uid.source_id, + shard_ids: logical_indexing_task.shard_ids, + }; self.indexing_tasks_per_node_id .entry(node_id) .or_default() - .push(indexing_task); + .push(physical_indexing_task); } /// Returns the number of indexing tasks for the given node ID, index ID and source ID. @@ -84,7 +92,7 @@ impl PhysicalIndexingPlan { } /// Returns the hashmap of (node ID, indexing tasks). - pub fn indexing_tasks_per_node(&self) -> &HashMap> { + pub fn indexing_tasks_per_node(&self) -> &FnvHashMap> { &self.indexing_tasks_per_node_id } @@ -140,13 +148,10 @@ impl PhysicalIndexingPlan { /// task. pub(crate) fn build_physical_indexing_plan( indexers: &[(String, IndexerNodeInfo)], - source_configs: &HashMap, - mut indexing_tasks: Vec, + mut indexing_tasks: Vec, ) -> PhysicalIndexingPlan { // Sort by (index_id, source_id) to make the algorithm deterministic. - indexing_tasks.sort_by(|left, right| { - (&left.index_uid, &left.source_id).cmp(&(&right.index_uid, &right.source_id)) - }); + indexing_tasks.sort_by(|left, right| left.source_uid.cmp(&right.source_uid)); let mut node_ids = indexers .iter() .map(|indexer| indexer.0.clone()) @@ -156,11 +161,7 @@ pub(crate) fn build_physical_indexing_plan( let mut plan = PhysicalIndexingPlan::new(node_ids.clone()); for indexing_task in indexing_tasks { sort_by_rendez_vous_hash(&mut node_ids, &indexing_task); - let source_config = source_configs - // TODO(fmassot): remove this lame allocation to access the source... - .get(&SourceUid::from(indexing_task.clone())) - .expect("SourceConfig should always be present."); - let candidates = select_node_candidates(&node_ids, &plan, source_config, &indexing_task); + let candidates = select_node_candidates(&node_ids, &plan, &indexing_task); // It's theoretically possible to have no candidate as all indexers can already // have more than `max_num_pipelines_per_indexer` assigned for a given source. 
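// A minimal sketch of how a logical task is flattened into the physical
// `IndexingTask` shipped to an indexer over gRPC, mirroring the body of
// `assign_indexing_task` earlier in this file. The type and field names are
// taken from this patch; the helper function itself is hypothetical.
fn to_physical_task(task: LogicalIndexingTask) -> IndexingTask {
    IndexingTask {
        // The IndexUid is serialized to its string form ("index_id:incarnation_id").
        index_uid: task.source_uid.index_uid.to_string(),
        source_id: task.source_uid.source_id,
        // Only ingest v2 tasks carry shard ids; Kafka-like sources leave this empty.
        shard_ids: task.shard_ids,
    }
}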
@@ -219,18 +220,18 @@ impl<'a> Ord for NodeScore<'a> { fn select_node_candidates<'a>( node_ids: &'a [String], physical_plan: &PhysicalIndexingPlan, - source_config: &SourceConfig, - indexing_task: &IndexingTask, + indexing_task: &LogicalIndexingTask, ) -> Vec<&'a str> { + let index_uid_str: String = indexing_task.source_uid.index_uid.to_string(); node_ids .iter() .map(String::as_str) .filter(|node_id| { physical_plan.num_indexing_tasks_for( node_id, - &indexing_task.index_uid, - &indexing_task.source_id, - ) < source_config.max_num_pipelines_per_indexer.get() + &index_uid_str, + &indexing_task.source_uid.source_id, + ) < indexing_task.max_num_pipelines_per_indexer.get() }) .collect_vec() } @@ -244,12 +245,6 @@ fn compute_node_score(node_id: &str, physical_plan: &PhysicalIndexingPlan) -> f3 - physical_plan.num_indexing_tasks_for_node(node_id) as f32 } -#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] -pub(crate) struct SourceUid { - pub index_uid: IndexUid, - pub source_id: SourceId, -} - impl From for SourceUid { fn from(indexing_task: IndexingTask) -> Self { Self { @@ -259,6 +254,13 @@ impl From for SourceUid { } } +#[derive(Hash, Debug, Clone, Eq, PartialEq)] +pub(crate) struct LogicalIndexingTask { + pub source_uid: SourceUid, + pub shard_ids: Vec, + pub max_num_pipelines_per_indexer: NonZeroUsize, +} + /// Builds the indexing plan = `[Vec for SourceUid { /// forward documents to the right indexers. /// - Ignore disabled sources, `CLI_INGEST_SOURCE_ID` and files sources (Quickwit is not aware of /// the files locations and thus are ignored). -pub(crate) fn build_indexing_plan( - source_configs: &HashMap, +pub(crate) fn list_indexing_tasks( num_indexers: usize, -) -> Vec { - let mut indexing_tasks: Vec = Vec::new(); - for (source_uid, source_config) in source_configs - .iter() + model: &ControlPlaneModel, +) -> Vec { + let mut indexing_tasks: Vec = Vec::new(); + for (source_uid, source_config) in model + .get_source_configs() .sorted_by(|(left, _), (right, _)| left.cmp(right)) { if !source_config.enabled { continue; } - if [CLI_INGEST_SOURCE_ID, INGEST_SOURCE_ID].contains(&source_config.source_id.as_str()) { - continue; - } - // Ignore file sources as we don't know the file location. - if source_config.source_type() == SourceType::File { - continue; - } - let num_pipelines = if source_config.source_id == INGEST_API_SOURCE_ID { - num_indexers - } else { - // The num desired pipelines is constrained by the number of indexer and the maximum - // of pipelines that can run on each indexer. - std::cmp::min( - source_config.desired_num_pipelines.get(), - source_config.max_num_pipelines_per_indexer.get() * num_indexers, - ) - }; - for _ in 0..num_pipelines { - indexing_tasks.push(IndexingTask { - index_uid: source_uid.index_uid.to_string(), - source_id: source_uid.source_id.clone(), - shard_ids: Vec::new(), - }); + match source_config.source_type() { + SourceType::Cli | SourceType::File | SourceType::Vec | SourceType::Void => { + continue; + } + SourceType::IngestV1 => { + let logical_indexing_task = LogicalIndexingTask { + source_uid: source_uid.clone(), + shard_ids: Vec::new(), + max_num_pipelines_per_indexer: source_config.max_num_pipelines_per_indexer, + }; + // TODO fix that ugly logic. + indexing_tasks.extend(std::iter::repeat(logical_indexing_task).take(num_indexers)); + } + SourceType::IngestV2 => { + // TODO for the moment for ingest v2 we create one indexing task with all of the + // shards. We probably want to have something more unified between + // IngestV2 and kafka-alike... 
e.g. one indexing task with a bunch + // of weighted shards, one indexing task. + // + // The construction of the physical plan would then have to split that into more + // chewable physical tasks (one per pipeline, one per reasonable + // subset of shards). + + // TODO: More precisely, we want the shards that are open or closing or such that + // `shard.publish_position_inclusive` + // < `shard.replication_position_inclusive`. + // let list_shard_subrequest = ListShardsSubrequest { + // index_uid: source_uid.index_uid.clone().into(), + // source_id: source_uid.source_id.clone(), + // shard_state: None, + // }; + let shard_ids = model.list_shards(&source_uid); + let indexing_task = LogicalIndexingTask { + source_uid: source_uid.clone(), + shard_ids, + max_num_pipelines_per_indexer: source_config.max_num_pipelines_per_indexer, + }; + indexing_tasks.push(indexing_task); + } + SourceType::Kafka + | SourceType::Kinesis + | SourceType::GcpPubsub + | SourceType::Nats + | SourceType::Pulsar => { + let num_pipelines = + // The num desired pipelines is constrained by the number of indexer and the maximum + // of pipelines that can run on each indexer. + std::cmp::min( + source_config.desired_num_pipelines.get(), + source_config.max_num_pipelines_per_indexer.get() * num_indexers, + ); + let logical_indexing_task = LogicalIndexingTask { + source_uid: source_uid.clone(), + shard_ids: Vec::new(), + max_num_pipelines_per_indexer: source_config.max_num_pipelines_per_indexer, + }; + indexing_tasks.extend(std::iter::repeat(logical_indexing_task).take(num_pipelines)); + } } } indexing_tasks @@ -314,17 +351,18 @@ pub(crate) fn build_indexing_plan( #[cfg(test)] mod tests { - use std::collections::HashMap; use std::num::NonZeroUsize; + use fnv::FnvHashMap; use itertools::Itertools; use proptest::prelude::*; use quickwit_common::rand::append_random_suffix; use quickwit_config::service::QuickwitService; use quickwit_config::{ - FileSourceParams, KafkaSourceParams, SourceConfig, SourceInputFormat, SourceParams, - CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID, + FileSourceParams, IndexConfig, KafkaSourceParams, SourceConfig, SourceInputFormat, + SourceParams, CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID, }; + use quickwit_metastore::IndexMetadata; use quickwit_proto::indexing::{IndexingServiceClient, IndexingTask}; use quickwit_proto::IndexUid; use rand::seq::SliceRandom; @@ -332,7 +370,8 @@ mod tests { use tonic::transport::Endpoint; use super::{build_physical_indexing_plan, SourceUid}; - use crate::indexing_plan::build_indexing_plan; + use crate::control_plane_model::ControlPlaneModel; + use crate::indexing_plan::{list_indexing_tasks, LogicalIndexingTask}; use crate::IndexerNodeInfo; fn kafka_source_params_for_test() -> SourceParams { @@ -368,7 +407,7 @@ mod tests { fn count_indexing_tasks_count_for_test( num_indexers: usize, - source_configs: &HashMap, + source_configs: &FnvHashMap, ) -> usize { source_configs .iter() @@ -383,37 +422,33 @@ mod tests { #[tokio::test] async fn test_build_indexing_plan_one_source() { - let indexers = cluster_members_for_test(4, QuickwitService::Indexer).await; - let mut source_configs_map = HashMap::new(); - let index_source_id = SourceUid { - index_uid: "one-source-index:11111111111111111111111111" - .to_string() - .into(), - source_id: "source-0".to_string(), + let source_config = SourceConfig { + source_id: "source_id".to_string(), + max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + desired_num_pipelines: NonZeroUsize::new(3).unwrap(), + enabled: true, + source_params: 
kafka_source_params_for_test(), + transform_config: None, + input_format: SourceInputFormat::Json, }; - source_configs_map.insert( - index_source_id.clone(), - SourceConfig { - source_id: index_source_id.source_id.to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(3).unwrap(), - enabled: true, - source_params: kafka_source_params_for_test(), - transform_config: None, - input_format: SourceInputFormat::Json, - }, - ); - - let indexing_tasks = build_indexing_plan(&source_configs_map, indexers.len()); - + let mut model = ControlPlaneModel::default(); + let index_metadata = IndexMetadata::for_test("test-index", "ram://test"); + let index_uid = index_metadata.index_uid.clone(); + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: source_config.source_id.clone(), + }; + model.add_index(index_metadata); + model.add_source(&index_uid, source_config).unwrap(); + let indexing_tasks = list_indexing_tasks(4, &model); assert_eq!(indexing_tasks.len(), 3); for indexing_task in indexing_tasks { assert_eq!( indexing_task, - IndexingTask { - index_uid: index_source_id.index_uid.to_string(), - source_id: index_source_id.source_id.to_string(), + LogicalIndexingTask { + source_uid: source_uid.clone(), shard_ids: Vec::new(), + max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), } ); } @@ -422,36 +457,40 @@ mod tests { #[tokio::test] async fn test_build_indexing_plan_with_ingest_api_source() { let indexers = cluster_members_for_test(4, QuickwitService::Indexer).await; - let mut source_configs_map = HashMap::new(); - let index_source_id = SourceUid { - index_uid: "ingest-api-index:11111111111111111111111111" - .to_string() - .into(), + + let index_metadata = IndexMetadata::for_test("test-index", "ram://test"); + let index_uid = index_metadata.index_uid.clone(); + let source_uid = SourceUid { + index_uid: index_uid.clone(), source_id: INGEST_API_SOURCE_ID.to_string(), }; - source_configs_map.insert( - index_source_id.clone(), - SourceConfig { - source_id: index_source_id.source_id.to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(3).unwrap(), - enabled: true, - source_params: SourceParams::IngestApi, - transform_config: None, - input_format: SourceInputFormat::Json, - }, - ); - let indexing_tasks = build_indexing_plan(&source_configs_map, indexers.len()); + let source_config = SourceConfig { + source_id: source_uid.source_id.to_string(), + max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + desired_num_pipelines: NonZeroUsize::new(3).unwrap(), + enabled: true, + source_params: SourceParams::IngestApi, + transform_config: None, + input_format: SourceInputFormat::Json, + }; + + let mut control_plane_model = ControlPlaneModel::default(); + control_plane_model.add_index(index_metadata); + control_plane_model + .add_source(&index_uid, source_config) + .unwrap(); + let indexing_tasks: Vec = + list_indexing_tasks(indexers.len(), &control_plane_model); assert_eq!(indexing_tasks.len(), 4); for indexing_task in indexing_tasks { assert_eq!( indexing_task, - IndexingTask { - index_uid: index_source_id.index_uid.to_string(), - source_id: index_source_id.source_id.to_string(), + LogicalIndexingTask { + source_uid: source_uid.clone(), shard_ids: Vec::new(), + max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), } ); } @@ -460,7 +499,7 @@ mod tests { #[tokio::test] async fn test_build_indexing_plan_with_sources_to_ignore() { let indexers = 
cluster_members_for_test(4, QuickwitService::Indexer).await; - let mut source_configs_map = HashMap::new(); + let mut source_configs_map = FnvHashMap::default(); let file_index_source_id = SourceUid { index_uid: "one-source-index:11111111111111111111111111" .to_string() @@ -515,7 +554,9 @@ mod tests { input_format: SourceInputFormat::Json, }, ); - let indexing_tasks = build_indexing_plan(&source_configs_map, indexers.len()); + + let control_plane_model = ControlPlaneModel::default(); + let indexing_tasks = list_indexing_tasks(indexers.len(), &control_plane_model); assert_eq!(indexing_tasks.len(), 0); } @@ -529,7 +570,7 @@ mod tests { // Rdv hashing for (index 2, source) returns [node 1, node 2]. let index_2 = "2"; let source_2 = "0"; - let mut source_configs_map = HashMap::new(); + // let mut source_configs_map = FnvHashMap::default(); let kafka_index_source_id_1 = SourceUid { index_uid: IndexUid::from_parts(index_1, "11111111111111111111111111"), source_id: source_1.to_string(), @@ -538,57 +579,24 @@ mod tests { index_uid: IndexUid::from_parts(index_2, "11111111111111111111111111"), source_id: source_2.to_string(), }; - source_configs_map.insert( - kafka_index_source_id_1.clone(), - SourceConfig { - source_id: kafka_index_source_id_1.source_id, - max_num_pipelines_per_indexer: NonZeroUsize::new(2).unwrap(), - desired_num_pipelines: NonZeroUsize::new(4).unwrap(), - enabled: true, - source_params: kafka_source_params_for_test(), - transform_config: None, - input_format: SourceInputFormat::Json, - }, - ); - source_configs_map.insert( - kafka_index_source_id_2.clone(), - SourceConfig { - source_id: kafka_index_source_id_2.source_id, - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(2).unwrap(), - enabled: true, - source_params: kafka_source_params_for_test(), - transform_config: None, - input_format: SourceInputFormat::Json, - }, - ); let mut indexing_tasks = Vec::new(); for _ in 0..3 { - indexing_tasks.push(IndexingTask { - index_uid: IndexUid::from_parts( - index_1.to_string(), - "11111111111111111111111111".to_string(), - ) - .to_string(), - source_id: source_1.to_string(), + indexing_tasks.push(LogicalIndexingTask { + source_uid: kafka_index_source_id_1.clone(), shard_ids: Vec::new(), + max_num_pipelines_per_indexer: NonZeroUsize::new(2).unwrap(), }); } for _ in 0..2 { - indexing_tasks.push(IndexingTask { - index_uid: IndexUid::from_parts( - index_2.to_string(), - "11111111111111111111111111".to_string(), - ) - .to_string(), - source_id: source_2.to_string(), + indexing_tasks.push(LogicalIndexingTask { + source_uid: kafka_index_source_id_2.clone(), shard_ids: Vec::new(), + max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), }); } let indexers = cluster_members_for_test(2, QuickwitService::Indexer).await; - let physical_plan = - build_physical_indexing_plan(&indexers, &source_configs_map, indexing_tasks.clone()); + let physical_plan = build_physical_indexing_plan(&indexers, indexing_tasks.clone()); assert_eq!(physical_plan.indexing_tasks_per_node_id.len(), 2); let indexer_1_tasks = physical_plan .indexing_tasks_per_node_id @@ -600,64 +608,60 @@ mod tests { .unwrap(); // (index 1, source) tasks are first placed on indexer 2 by rdv hashing. // Thus task 0 => indexer 2, task 1 => indexer 1, task 2 => indexer 2, task 3 => indexer 1. 
- let expected_indexer_1_tasks = indexing_tasks + let expected_indexer_1_tasks: Vec = indexing_tasks .iter() .cloned() .enumerate() .filter(|(idx, _)| idx % 2 == 1) - .map(|(_, task)| task) + .map(|(_, task)| task.into()) .collect_vec(); assert_eq!(indexer_1_tasks, &expected_indexer_1_tasks); // (index 1, source) tasks are first placed on node 1 by rdv hashing. - let expected_indexer_2_tasks = indexing_tasks + let expected_indexer_2_tasks: Vec = indexing_tasks .into_iter() .enumerate() .filter(|(idx, _)| idx % 2 == 0) - .map(|(_, task)| task) + .map(|(_, task)| task.into()) .collect_vec(); assert_eq!(indexer_2_tasks, &expected_indexer_2_tasks); } + impl From for IndexingTask { + fn from(task: LogicalIndexingTask) -> Self { + IndexingTask { + index_uid: task.source_uid.index_uid.to_string(), + source_id: task.source_uid.source_id.clone(), + shard_ids: task.shard_ids, + } + } + } + #[tokio::test] async fn test_build_physical_indexing_plan_with_not_enough_indexers() { quickwit_common::setup_logging_for_tests(); let index_1 = "test-indexing-plan-1"; let source_1 = "source-1"; - let mut source_configs_map = HashMap::new(); - let kafka_index_source_id_1 = SourceUid { + let source_uid = SourceUid { index_uid: IndexUid::from_parts(index_1, "11111111111111111111111111"), source_id: source_1.to_string(), }; - source_configs_map.insert( - kafka_index_source_id_1.clone(), - SourceConfig { - source_id: kafka_index_source_id_1.source_id, - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(2).unwrap(), - enabled: true, - source_params: kafka_source_params_for_test(), - transform_config: None, - input_format: SourceInputFormat::Json, - }, - ); let indexing_tasks = vec![ - IndexingTask { - index_uid: IndexUid::from_parts(index_1, "11111111111111111111111111").to_string(), - source_id: source_1.to_string(), + LogicalIndexingTask { + source_uid: source_uid.clone(), shard_ids: Vec::new(), + max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), }, - IndexingTask { - index_uid: IndexUid::from_parts(index_1, "11111111111111111111111111").to_string(), - source_id: source_1.to_string(), + LogicalIndexingTask { + source_uid: source_uid.clone(), shard_ids: Vec::new(), + max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), }, ]; let indexers = cluster_members_for_test(1, QuickwitService::Indexer).await; // This case should never happens but we just check that the plan building is resilient // enough, it will ignore the tasks that cannot be allocated. 
- let physical_plan = - build_physical_indexing_plan(&indexers, &source_configs_map, indexing_tasks); + let physical_plan = build_physical_indexing_plan(&indexers, indexing_tasks); assert_eq!(physical_plan.num_indexing_tasks(), 1); } @@ -668,19 +672,36 @@ mod tests { let mut indexers = tokio::runtime::Runtime::new().unwrap().block_on( cluster_members_for_test(num_indexers, QuickwitService::Indexer) ); - let source_configs: HashMap = index_id_sources + + let index_uids: fnv::FnvHashSet = + index_id_sources.iter() + .map(|(index_uid, _)| index_uid.clone()) + .collect(); + + let mut control_plane_model = ControlPlaneModel::default(); + for index_uid in index_uids { + let index_config = IndexConfig::for_test(index_uid.index_id(), &format!("ram://test/{index_uid}")); + control_plane_model.add_index(IndexMetadata::new_with_index_uid(index_uid, index_config)); + } + for (index_uid, source_config) in &index_id_sources { + control_plane_model.add_source(index_uid, source_config.clone()).unwrap(); + } + + let source_configs: FnvHashMap = index_id_sources .into_iter() .map(|(index_uid, source_config)| { - (SourceUid { index_uid: index_uid.into(), source_id: source_config.source_id.to_string(), }, source_config) + (SourceUid { index_uid: index_uid.clone(), source_id: source_config.source_id.to_string(), }, source_config) }) .collect(); - let mut indexing_tasks = build_indexing_plan(&source_configs, indexers.len()); + + + let mut indexing_tasks = list_indexing_tasks(indexers.len(), &control_plane_model); let num_indexing_tasks = indexing_tasks.len(); assert_eq!(indexing_tasks.len(), count_indexing_tasks_count_for_test(indexers.len(), &source_configs)); - let physical_indexing_plan = build_physical_indexing_plan(&indexers, &source_configs, indexing_tasks.clone()); + let physical_indexing_plan = build_physical_indexing_plan(&indexers, indexing_tasks.clone()); indexing_tasks.shuffle(&mut rand::thread_rng()); indexers.shuffle(&mut rand::thread_rng()); - let physical_indexing_plan_with_shuffle = build_physical_indexing_plan(&indexers, &source_configs, indexing_tasks.clone()); + let physical_indexing_plan_with_shuffle = build_physical_indexing_plan(&indexers, indexing_tasks.clone()); assert_eq!(physical_indexing_plan, physical_indexing_plan_with_shuffle); // All indexing tasks must have been assigned to an indexer. assert_eq!(physical_indexing_plan.num_indexing_tasks(), num_indexing_tasks); @@ -694,10 +715,10 @@ mod tests { prop_compose! 
{ fn gen_kafka_source() - (index_idx in 0usize..100usize, desired_num_pipelines in 1usize..51usize, max_num_pipelines_per_indexer in 1usize..5usize) -> (String, SourceConfig) { - let index_id = format!("index-id-{index_idx}"); + (index_idx in 0usize..100usize, desired_num_pipelines in 1usize..51usize, max_num_pipelines_per_indexer in 1usize..5usize) -> (IndexUid, SourceConfig) { + let index_uid = IndexUid::from_parts(format!("index-id-{index_idx}"), "" /* this is the index uid */); let source_id = append_random_suffix("kafka-source"); - (index_id, SourceConfig { + (index_uid, SourceConfig { source_id, desired_num_pipelines: NonZeroUsize::new(desired_num_pipelines).unwrap(), max_num_pipelines_per_indexer: NonZeroUsize::new(max_num_pipelines_per_indexer).unwrap(), diff --git a/quickwit/quickwit-control-plane/src/scheduler.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler.rs similarity index 71% rename from quickwit/quickwit-control-plane/src/scheduler.rs rename to quickwit/quickwit-control-plane/src/indexing_scheduler.rs index 5471886ce00..468cba7e76e 100644 --- a/quickwit/quickwit-control-plane/src/scheduler.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler.rs @@ -18,23 +18,22 @@ // along with this program. If not, see . use std::cmp::Ordering; -use std::collections::{HashMap, HashSet}; use std::fmt; use std::sync::Arc; use std::time::{Duration, Instant}; use anyhow::Context; +use fnv::{FnvHashMap, FnvHashSet}; use itertools::Itertools; -use quickwit_config::{SourceConfig, INGEST_SOURCE_ID}; -use quickwit_metastore::{ListIndexesQuery, Metastore}; +use quickwit_metastore::Metastore; use quickwit_proto::indexing::{ApplyIndexingPlanRequest, IndexingService, IndexingTask}; -use quickwit_proto::metastore::{ListShardsRequest, ListShardsSubrequest}; -use quickwit_proto::{NodeId, ShardId}; +use quickwit_proto::NodeId; use serde::Serialize; use tracing::{debug, error, info, warn}; +use crate::control_plane_model::ControlPlaneModel; use crate::indexing_plan::{ - build_indexing_plan, build_physical_indexing_plan, PhysicalIndexingPlan, SourceUid, + build_physical_indexing_plan, list_indexing_tasks, PhysicalIndexingPlan, }; use crate::{IndexerNodeInfo, IndexerPool}; @@ -54,24 +53,26 @@ pub struct IndexingSchedulerState { pub last_applied_plan_timestamp: Option, } -/// The [`IndexingScheduler`] is responsible for scheduling indexing tasks to indexers. -/// The scheduling executes the following steps: -/// 1. Fetches all indexes metadata. -/// 2. Builds an indexing plan = `[Vec]`, from the indexes metadatas. See -/// [`build_indexing_plan`] for the implementation details. -/// 3. Builds a [`PhysicalIndexingPlan`] from the list of indexing tasks. See +/// The [`IndexingScheduler`] is responsible for listing indexing tasks and assiging them to +/// indexers. +/// We call this duty `scheduling`. Contrary to what the name suggests, most indexing tasks are +/// ever running. We just borrowed the terminology to Kubernetes. +/// +/// Scheduling executes the following steps: +/// 1. List all of the logical indexing tasks, from the model. (See [`list_indexing_tasks`]) +/// 2. Builds a [`PhysicalIndexingPlan`] from the list of logical indexing tasks. See /// [`build_physical_indexing_plan`] for the implementation details. -/// 4. Apply the [`PhysicalIndexingPlan`]: for each indexer, the scheduler send the indexing tasks +/// 3. Apply the [`PhysicalIndexingPlan`]: for each indexer, the scheduler send the indexing tasks /// by gRPC. 
An indexer immediately returns an Ok and apply asynchronously the received plan. Any /// errors (network) happening in this step are ignored. The scheduler runs a control loop that /// regularly checks if indexers are effectively running their plans (more details in the next /// section). /// /// All events altering the list of indexes and sources are proxied through -/// through the control plane. The control plane state is therefore guaranteed to be up-to-date +/// through the control plane. The control plane model is therefore guaranteed to be up-to-date /// (at the cost of making the control plane a single point of failure). /// -/// They then trigger the production of a new `PhysicalIndexingPlan`. +/// Each change to the model triggers the production of a new `PhysicalIndexingPlan`. /// /// A `ControlPlanLoop` event is scheduled every `CONTROL_PLAN_LOOP_INTERVAL` and steers /// the cluster toward the last applied [`PhysicalIndexingPlan`]. @@ -90,7 +91,7 @@ pub struct IndexingSchedulerState { /// Concretely, it will send the faulty nodes of the plan they are supposed to follow. // /// Finally, in order to give the time for each indexer to run their indexing tasks, the control -/// plase will wait at least [`MIN_DURATION_BETWEEN_SCHEDULING`] before comparing the desired +/// plane will wait at least [`MIN_DURATION_BETWEEN_SCHEDULING`] before comparing the desired /// plan with the running plan. pub struct IndexingScheduler { cluster_id: String, @@ -134,27 +135,17 @@ impl IndexingScheduler { self.state.clone() } - pub(crate) async fn schedule_indexing_plan_if_needed(&mut self) -> anyhow::Result<()> { + pub(crate) fn schedule_indexing_plan_if_needed( + &mut self, + model: &ControlPlaneModel, + ) -> anyhow::Result<()> { let mut indexers = self.get_indexers_from_indexer_pool(); if indexers.is_empty() { warn!("No indexer available, cannot schedule an indexing plan."); return Ok(()); }; - let source_configs: HashMap = self.fetch_source_configs().await?; - let mut indexing_tasks = build_indexing_plan(&source_configs, indexers.len()); - - // TODO: This is a temporary hack to always create an indexing pipeline for the shards that - // exist. 
- let shards = self.fetch_shards(&source_configs).await?; - for (source_uid, shard_ids) in shards { - indexing_tasks.push(IndexingTask { - index_uid: source_uid.index_uid.into(), - source_id: source_uid.source_id.clone(), - shard_ids, - }); - } - let new_physical_plan = - build_physical_indexing_plan(&indexers, &source_configs, indexing_tasks); + let indexing_tasks = list_indexing_tasks(indexers.len(), model); + let new_physical_plan = build_physical_indexing_plan(&indexers, indexing_tasks); if let Some(last_applied_plan) = &self.state.last_applied_physical_plan { let plans_diff = get_indexing_plans_diff( last_applied_plan.indexing_tasks_per_node(), @@ -165,93 +156,16 @@ impl IndexingScheduler { return Ok(()); } } - self.apply_physical_indexing_plan(&mut indexers, new_physical_plan) - .await; + self.apply_physical_indexing_plan(&mut indexers, new_physical_plan); self.state.num_schedule_indexing_plan += 1; Ok(()) } - async fn fetch_source_configs(&self) -> anyhow::Result> { - let indexes_metadatas = self - .metastore - .list_indexes_metadatas(ListIndexesQuery::All) - .await?; - let source_configs: HashMap = indexes_metadatas - .into_iter() - .flat_map(|index_metadata| { - index_metadata - .sources - .into_iter() - .map(move |(source_id, source_config)| { - ( - SourceUid { - index_uid: index_metadata.index_uid.clone(), - source_id, - }, - source_config, - ) - }) - }) - .collect(); - Ok(source_configs) - } - - /// Returns, for all of the ingest api source, a map with the list of their available shard ids. - async fn fetch_shards( - &self, - source_configs: &HashMap, - ) -> anyhow::Result>> { - let mut list_shards_subrequests = Vec::new(); - - for (source_uid, source_config) in source_configs { - if source_uid.source_id != INGEST_SOURCE_ID || !source_config.enabled { - continue; - } - // TODO: More precisely, we want the shards that are open or closing or such that - // `shard.publish_position_inclusive` - // < `shard.replication_position_inclusive`. - let list_shard_subrequest = ListShardsSubrequest { - index_uid: source_uid.index_uid.clone().into(), - source_id: source_uid.source_id.clone(), - shard_state: None, - }; - list_shards_subrequests.push(list_shard_subrequest); - } - let list_shards_request = ListShardsRequest { - subrequests: list_shards_subrequests, - }; - let list_shards_response = self - .metastore - .list_shards(list_shards_request) - .await - .context("failed to list shards from metastore")?; - - let mut shards = HashMap::new(); - - for list_shards_subresponse in list_shards_response.subresponses { - let source_uid = SourceUid { - index_uid: list_shards_subresponse.index_uid.into(), - source_id: list_shards_subresponse.source_id, - }; - if list_shards_subresponse.shards.is_empty() { - continue; - } - let shard_ids = list_shards_subresponse - .shards - .into_iter() - .filter(|shard| shard.is_indexable()) - .map(|shard| shard.shard_id) - .collect(); - shards.insert(source_uid, shard_ids); - } - Ok(shards) - } - /// Checks if the last applied plan corresponds to the running indexing tasks present in the /// chitchat cluster state. If true, do nothing. /// - If node IDs differ, schedule a new indexing plan. /// - If indexing tasks differ, apply again the last plan. 
- pub(crate) async fn control_running_plan(&mut self) -> anyhow::Result<()> { + pub(crate) fn control_running_plan(&mut self, model: &ControlPlaneModel) -> anyhow::Result<()> { let last_applied_plan = if let Some(last_applied_plan) = self.state.last_applied_physical_plan.as_ref() { last_applied_plan @@ -259,7 +173,7 @@ impl IndexingScheduler { // If there is no plan, the node is probably starting and the scheduler did not find // indexers yet. In this case, we want to schedule as soon as possible to find new // indexers. - self.schedule_indexing_plan_if_needed().await?; + self.schedule_indexing_plan_if_needed(model)?; return Ok(()); }; @@ -272,7 +186,7 @@ impl IndexingScheduler { } let mut indexers = self.get_indexers_from_indexer_pool(); - let running_indexing_tasks_by_node_id: HashMap> = indexers + let running_indexing_tasks_by_node_id: FnvHashMap> = indexers .iter() .map(|indexer| (indexer.0.clone(), indexer.1.indexing_tasks.clone())) .collect(); @@ -283,12 +197,11 @@ impl IndexingScheduler { ); if !indexing_plans_diff.has_same_nodes() { info!(plans_diff=?indexing_plans_diff, "Running plan and last applied plan node IDs differ: schedule an indexing plan."); - self.schedule_indexing_plan_if_needed().await?; + self.schedule_indexing_plan_if_needed(model)?; } else if !indexing_plans_diff.has_same_tasks() { // Some nodes may have not received their tasks, apply it again. info!(plans_diff=?indexing_plans_diff, "Running tasks and last applied tasks differ: reapply last plan."); - self.apply_physical_indexing_plan(&mut indexers, last_applied_plan.clone()) - .await; + self.apply_physical_indexing_plan(&mut indexers, last_applied_plan.clone()); } Ok(()) } @@ -297,7 +210,7 @@ impl IndexingScheduler { self.indexer_pool.pairs() } - async fn apply_physical_indexing_plan( + fn apply_physical_indexing_plan( &mut self, indexers: &mut [(String, IndexerNodeInfo)], new_physical_plan: PhysicalIndexingPlan, @@ -305,6 +218,8 @@ impl IndexingScheduler { debug!("Apply physical indexing plan: {:?}", new_physical_plan); for (node_id, indexing_tasks) in new_physical_plan.indexing_tasks_per_node() { // We don't want to block on a slow indexer so we apply this change asynchronously + // TODO not blocking is cool, but we need to make sure there is not accumulation + // possible here. 
tokio::spawn({ let indexer = indexers .iter() @@ -332,19 +247,21 @@ impl IndexingScheduler { // Should be called whenever a change in the list of index/shard // has happened - pub(crate) async fn on_index_change(&mut self) -> anyhow::Result<()> { - self.schedule_indexing_plan_if_needed() - .await + pub(crate) async fn on_index_change( + &mut self, + model: &ControlPlaneModel, + ) -> anyhow::Result<()> { + self.schedule_indexing_plan_if_needed(model) .context("error when scheduling indexing plan")?; Ok(()) } } struct IndexingPlansDiff<'a> { - pub missing_node_ids: HashSet<&'a str>, - pub unplanned_node_ids: HashSet<&'a str>, - pub missing_tasks_by_node_id: HashMap<&'a str, Vec<&'a IndexingTask>>, - pub unplanned_tasks_by_node_id: HashMap<&'a str, Vec<&'a IndexingTask>>, + pub missing_node_ids: FnvHashSet<&'a str>, + pub unplanned_node_ids: FnvHashSet<&'a str>, + pub missing_tasks_by_node_id: FnvHashMap<&'a str, Vec<&'a IndexingTask>>, + pub unplanned_tasks_by_node_id: FnvHashMap<&'a str, Vec<&'a IndexingTask>>, } impl<'a> IndexingPlansDiff<'a> { @@ -412,29 +329,30 @@ impl<'a> fmt::Debug for IndexingPlansDiff<'a> { /// Returns the difference between the `running_plan` retrieved from the chitchat state and /// the last plan applied by the scheduler. fn get_indexing_plans_diff<'a>( - running_plan: &'a HashMap>, - last_applied_plan: &'a HashMap>, + running_plan: &'a FnvHashMap>, + last_applied_plan: &'a FnvHashMap>, ) -> IndexingPlansDiff<'a> { // Nodes diff. - let running_node_ids: HashSet<&str> = running_plan + let running_node_ids: FnvHashSet<&str> = running_plan .iter() .map(|(node_id, _)| node_id.as_str()) .collect(); - let planned_node_ids: HashSet<&str> = last_applied_plan + let planned_node_ids: FnvHashSet<&str> = last_applied_plan .iter() .map(|(node_id, _)| node_id.as_str()) .collect(); - let missing_node_ids: HashSet<&str> = planned_node_ids + let missing_node_ids: FnvHashSet<&str> = planned_node_ids .difference(&running_node_ids) .copied() .collect(); - let unplanned_node_ids: HashSet<&str> = running_node_ids + let unplanned_node_ids: FnvHashSet<&str> = running_node_ids .difference(&planned_node_ids) .copied() .collect(); // Tasks diff. 
- let mut missing_tasks_by_node_id: HashMap<&str, Vec<&IndexingTask>> = HashMap::new(); - let mut unplanned_tasks_by_node_id: HashMap<&str, Vec<&IndexingTask>> = HashMap::new(); + let mut missing_tasks_by_node_id: FnvHashMap<&str, Vec<&IndexingTask>> = FnvHashMap::default(); + let mut unplanned_tasks_by_node_id: FnvHashMap<&str, Vec<&IndexingTask>> = + FnvHashMap::default(); for node_id in running_node_ids.iter().chain(planned_node_ids.iter()) { let running_tasks = running_plan .get(*node_id) @@ -466,20 +384,20 @@ fn get_indexing_tasks_diff<'a>( ) -> (Vec<&'a IndexingTask>, Vec<&'a IndexingTask>) { let mut missing_tasks: Vec<&IndexingTask> = Vec::new(); let mut unplanned_tasks: Vec<&IndexingTask> = Vec::new(); - let grouped_running_tasks: HashMap<&IndexingTask, usize> = running_tasks + let grouped_running_tasks: FnvHashMap<&IndexingTask, usize> = running_tasks .iter() .group_by(|&task| task) .into_iter() .map(|(key, group)| (key, group.count())) .collect(); - let grouped_last_applied_tasks: HashMap<&IndexingTask, usize> = last_applied_tasks + let grouped_last_applied_tasks: FnvHashMap<&IndexingTask, usize> = last_applied_tasks .iter() .group_by(|&task| task) .into_iter() .map(|(key, group)| (key, group.count())) .collect(); - let all_tasks: HashSet<&IndexingTask> = - HashSet::from_iter(running_tasks.iter().chain(last_applied_tasks.iter())); + let all_tasks: FnvHashSet<&IndexingTask> = + FnvHashSet::from_iter(running_tasks.iter().chain(last_applied_tasks.iter())); for task in all_tasks { let running_task_count = grouped_running_tasks.get(task).unwrap_or(&0); let desired_task_count = grouped_last_applied_tasks.get(task).unwrap_or(&0); @@ -506,14 +424,14 @@ mod tests { #[test] fn test_indexing_plans_diff() { { - let running_plan = HashMap::new(); - let desired_plan = HashMap::new(); + let running_plan = FnvHashMap::default(); + let desired_plan = FnvHashMap::default(); let indexing_plans_diff = get_indexing_plans_diff(&running_plan, &desired_plan); assert!(indexing_plans_diff.is_empty()); } { - let mut running_plan = HashMap::new(); - let mut desired_plan = HashMap::new(); + let mut running_plan = FnvHashMap::default(); + let mut desired_plan = FnvHashMap::default(); let task_1 = IndexingTask { index_uid: "index-1:11111111111111111111111111".to_string(), source_id: "source-1".to_string(), @@ -536,8 +454,8 @@ mod tests { assert!(indexing_plans_diff.is_empty()); } { - let mut running_plan = HashMap::new(); - let mut desired_plan = HashMap::new(); + let mut running_plan = FnvHashMap::default(); + let mut desired_plan = FnvHashMap::default(); let task_1 = IndexingTask { index_uid: "index-1:11111111111111111111111111".to_string(), source_id: "source-1".to_string(), @@ -557,17 +475,17 @@ mod tests { assert!(!indexing_plans_diff.has_same_tasks()); assert_eq!( indexing_plans_diff.unplanned_tasks_by_node_id, - HashMap::from_iter([("indexer-1", vec![&task_1])]) + FnvHashMap::from_iter([("indexer-1", vec![&task_1])]) ); assert_eq!( indexing_plans_diff.missing_tasks_by_node_id, - HashMap::from_iter([("indexer-1", vec![&task_2])]) + FnvHashMap::from_iter([("indexer-1", vec![&task_2])]) ); } { // Task assigned to indexer-1 in desired plan but another one running. 
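            // Here the desired plan assigns task_1 to indexer-1 while the
            // running plan reports indexer-2 executing task_2, so both the node
            // sets and the per-node task sets differ.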
- let mut running_plan = HashMap::new(); - let mut desired_plan = HashMap::new(); + let mut running_plan = FnvHashMap::default(); + let mut desired_plan = FnvHashMap::default(); let task_1 = IndexingTask { index_uid: "index-1:11111111111111111111111111".to_string(), source_id: "source-1".to_string(), @@ -587,25 +505,25 @@ mod tests { assert!(!indexing_plans_diff.has_same_tasks()); assert_eq!( indexing_plans_diff.missing_node_ids, - HashSet::from_iter(["indexer-1"]) + FnvHashSet::from_iter(["indexer-1"]) ); assert_eq!( indexing_plans_diff.unplanned_node_ids, - HashSet::from_iter(["indexer-2"]) + FnvHashSet::from_iter(["indexer-2"]) ); assert_eq!( indexing_plans_diff.missing_tasks_by_node_id, - HashMap::from_iter([("indexer-1", vec![&task_1]), ("indexer-2", Vec::new())]) + FnvHashMap::from_iter([("indexer-1", vec![&task_1]), ("indexer-2", Vec::new())]) ); assert_eq!( indexing_plans_diff.unplanned_tasks_by_node_id, - HashMap::from_iter([("indexer-2", vec![&task_2]), ("indexer-1", Vec::new())]) + FnvHashMap::from_iter([("indexer-2", vec![&task_2]), ("indexer-1", Vec::new())]) ); } { // Diff with 3 same tasks running but only one on the desired plan. - let mut running_plan = HashMap::new(); - let mut desired_plan = HashMap::new(); + let mut running_plan = FnvHashMap::default(); + let mut desired_plan = FnvHashMap::default(); let task_1 = IndexingTask { index_uid: "index-1:11111111111111111111111111".to_string(), source_id: "source-1".to_string(), @@ -623,13 +541,13 @@ mod tests { assert!(!indexing_plans_diff.has_same_tasks()); assert_eq!( indexing_plans_diff.missing_tasks_by_node_id, - HashMap::from_iter([("indexer-1", vec![&task_1, &task_1])]) + FnvHashMap::from_iter([("indexer-1", vec![&task_1, &task_1])]) ); } { // Diff with 3 same tasks on desired plan but only one running. 
- let mut running_plan = HashMap::new(); - let mut desired_plan = HashMap::new(); + let mut running_plan = FnvHashMap::default(); + let mut desired_plan = FnvHashMap::default(); let task_1 = IndexingTask { index_uid: "index-1:11111111111111111111111111".to_string(), source_id: "source-1".to_string(), @@ -647,7 +565,7 @@ mod tests { assert!(!indexing_plans_diff.has_same_tasks()); assert_eq!( indexing_plans_diff.unplanned_tasks_by_node_id, - HashMap::from_iter([("indexer-1", vec![&task_1, &task_1])]) + FnvHashMap::from_iter([("indexer-1", vec![&task_1, &task_1])]) ); } } diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 9d3875cba9f..5e94de90854 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -214,6 +214,7 @@ impl IngestController { source_id: get_open_shards_subrequest.source_id.clone(), }) })?; + if !open_shards.is_empty() { let get_open_shards_subresponse = GetOpenShardsSubresponse { index_uid: index_uid.into(), @@ -237,9 +238,11 @@ impl IngestController { follower_id: follower_id.map(|follower_id| follower_id.into()), next_shard_id, }; + open_shards_subrequests.push(open_shards_subrequest); } } + if !open_shards_subrequests.is_empty() { let open_shards_request = metastore::OpenShardsRequest { subrequests: open_shards_subrequests, @@ -265,10 +268,9 @@ impl IngestController { get_open_shards_subresponses.push(get_open_shards_subresponse); } } - let get_open_shards_response = GetOrCreateOpenShardsResponse { + Ok(GetOrCreateOpenShardsResponse { subresponses: get_open_shards_subresponses, - }; - Ok(get_open_shards_response) + }) } } @@ -281,7 +283,8 @@ pub enum PingError { #[cfg(test)] mod tests { - use quickwit_metastore::MockMetastore; + use quickwit_config::{SourceConfig, SourceParams}; + use quickwit_metastore::{IndexMetadata, MockMetastore}; use quickwit_proto::control_plane::GetOrCreateOpenShardsSubrequest; use quickwit_proto::ingest::ingester::{ IngesterServiceClient, MockIngesterService, PingResponse, @@ -487,25 +490,31 @@ mod tests { #[tokio::test] async fn test_ingest_controller_get_open_shards() { - let index_uid_0_str: &'static str = "test-index-0:0"; - let index_uid_0: IndexUid = index_uid_0_str.to_string().into(); - let index_uid_1_str: &'static str = "test-index-1:0"; - let index_uid_1: IndexUid = index_uid_1_str.to_string().into(); let source_id: &'static str = "test-source"; + let index_0 = "test-index-0"; + let index_metadata_0 = IndexMetadata::for_test(index_0, "ram://indexes/test-index0"); + let index_uid_0 = index_metadata_0.index_uid.clone(); + let index_uid_0_str = index_uid_0.to_string(); + let index_1 = "test-index-1"; + let index_metadata_1 = IndexMetadata::for_test(index_1, "ram://indexes/test-index1"); + let index_uid_1 = index_metadata_1.index_uid.clone(); + let index_uid_1_str = index_uid_1.to_string(); + let progress = Progress::default(); + let index_uid_1_str_clone = index_uid_1_str.clone(); let mut mock_metastore = MockMetastore::default(); mock_metastore .expect_open_shards() .once() .returning(move |request| { assert_eq!(request.subrequests.len(), 1); - assert_eq!(&request.subrequests[0].index_uid, index_uid_1_str); + assert_eq!(&request.subrequests[0].index_uid, &index_uid_1_str_clone); assert_eq!(&request.subrequests[0].source_id, source_id); let subresponses = vec![metastore::OpenShardsSubresponse { - index_uid: index_uid_1_str.to_string(), + index_uid: 
index_uid_1_str_clone.to_string(), source_id: "test-source".to_string(), open_shards: vec![Shard { shard_id: 1, @@ -540,10 +549,14 @@ mod tests { let mut model = ControlPlaneModel::default(); - model.add_index(index_uid_0.clone()); - model.add_index(index_uid_1.clone()); - model.add_source(&index_uid_0, &source_id.into()); - model.add_source(&index_uid_1, &source_id.into()); + let source_config = SourceConfig::for_test(source_id, SourceParams::stdin()); + + model.add_index(index_metadata_0.clone()); + model.add_index(index_metadata_1.clone()); + model + .add_source(&index_uid_0, source_config.clone()) + .unwrap(); + model.add_source(&index_uid_1, source_config).unwrap(); let shards = vec![ Shard { @@ -559,16 +572,19 @@ mod tests { ..Default::default() }, ]; + model.update_shards(&index_uid_0, &source_id.into(), &shards, 3); let request = GetOrCreateOpenShardsRequest { subrequests: Vec::new(), unavailable_ingesters: Vec::new(), }; + let response = ingest_controller .get_or_create_open_shards(request, &mut model, &progress) .await .unwrap(); + assert_eq!(response.subresponses.len(), 0); let subrequests = vec![ @@ -592,6 +608,7 @@ mod tests { .get_or_create_open_shards(request, &mut model, &progress) .await .unwrap(); + assert_eq!(response.subresponses.len(), 2); assert_eq!(response.subresponses[0].index_uid, index_uid_0_str); @@ -603,7 +620,7 @@ mod tests { "test-ingester-1" ); - assert_eq!(response.subresponses[1].index_uid, index_uid_1_str); + assert_eq!(&response.subresponses[1].index_uid, &index_uid_1_str); assert_eq!(response.subresponses[1].source_id, source_id); assert_eq!(response.subresponses[1].open_shards.len(), 1); assert_eq!(response.subresponses[1].open_shards[0].shard_id, 1); diff --git a/quickwit/quickwit-control-plane/src/lib.rs b/quickwit/quickwit-control-plane/src/lib.rs index c6a3d274170..738615fe3ac 100644 --- a/quickwit/quickwit-control-plane/src/lib.rs +++ b/quickwit/quickwit-control-plane/src/lib.rs @@ -20,8 +20,8 @@ pub mod control_plane; pub(crate) mod control_plane_model; pub mod indexing_plan; +pub mod indexing_scheduler; pub mod ingest; -pub mod scheduler; use async_trait::async_trait; use quickwit_common::pubsub::EventSubscriber; @@ -29,8 +29,17 @@ use quickwit_common::tower::Pool; use quickwit_proto::control_plane::{ControlPlaneService, ControlPlaneServiceClient}; use quickwit_proto::indexing::{IndexingServiceClient, IndexingTask}; use quickwit_proto::metastore::{CloseShardsRequest, DeleteShardsRequest}; +use quickwit_proto::{IndexUid, SourceId}; use tracing::error; +/// It can however appear only once in a given index. +/// In itself, `SourceId` is not unique, but the pair `(IndexUid, SourceId)` is. +#[derive(PartialEq, Eq, Debug, PartialOrd, Ord, Hash, Clone)] +pub struct SourceUid { + pub index_uid: IndexUid, + pub source_id: SourceId, +} + /// Indexer-node specific information stored in the pool of available indexer nodes #[derive(Debug, Clone)] pub struct IndexerNodeInfo { diff --git a/quickwit/quickwit-control-plane/src/tests.rs b/quickwit/quickwit-control-plane/src/tests.rs index d8c0a0c8b8d..4873cbf0483 100644 --- a/quickwit/quickwit-control-plane/src/tests.rs +++ b/quickwit/quickwit-control-plane/src/tests.rs @@ -17,12 +17,12 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . 
-use std::collections::HashMap; use std::num::NonZeroUsize; use std::sync::Arc; use std::time::Duration; use chitchat::transport::ChannelTransport; +use fnv::FnvHashMap; use futures::{Stream, StreamExt}; use quickwit_actors::{Inbox, Mailbox, Observe, Universe}; use quickwit_cluster::{create_cluster_for_test, Cluster, ClusterChange}; @@ -38,7 +38,7 @@ use quickwit_proto::NodeId; use serde_json::json; use crate::control_plane::{ControlPlane, CONTROL_PLAN_LOOP_INTERVAL}; -use crate::scheduler::MIN_DURATION_BETWEEN_SCHEDULING; +use crate::indexing_scheduler::MIN_DURATION_BETWEEN_SCHEDULING; use crate::IndexerNodeInfo; fn index_metadata_for_test( @@ -72,7 +72,7 @@ fn index_metadata_for_test( pub fn test_indexer_change_stream( cluster_change_stream: impl Stream + Send + 'static, - indexing_clients: HashMap>, + indexing_clients: FnvHashMap>, ) -> impl Stream> + Send + 'static { cluster_change_stream.filter_map(move |cluster_change| { let indexing_clients = indexing_clients.clone(); @@ -128,7 +128,7 @@ async fn start_control_plane( let indexer_pool = Pool::default(); let ingester_pool = Pool::default(); let change_stream = cluster.ready_nodes_change_stream().await; - let mut indexing_clients = HashMap::new(); + let mut indexing_clients = FnvHashMap::default(); for indexer in indexers { let (indexing_service_mailbox, indexing_service_inbox) = universe.create_test_mailbox(); diff --git a/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs b/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs index 909b2be6b3c..e8144d3d163 100644 --- a/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs @@ -55,8 +55,14 @@ pub struct IndexMetadata { impl IndexMetadata { /// Panics if `index_config` is missing `index_uri`. pub fn new(index_config: IndexConfig) -> Self { + let index_uid = IndexUid::new(index_config.index_id.clone()); + IndexMetadata::new_with_index_uid(index_uid, index_config) + } + + /// Panics if `index_config` is missing `index_uri`. + pub fn new_with_index_uid(index_uid: IndexUid, index_config: IndexConfig) -> Self { IndexMetadata { - index_uid: IndexUid::new(index_config.index_id.clone()), + index_uid, index_config, checkpoint: Default::default(), create_timestamp: OffsetDateTime::now_utc().unix_timestamp(), @@ -65,6 +71,8 @@ impl IndexMetadata { } /// Returns an [`IndexMetadata`] object with multiple hard coded values for tests. + /// + /// An incarnation id of `0` will be used to complete the index id into a index uuid. #[cfg(any(test, feature = "testsuite"))] pub fn for_test(index_id: &str, index_uri: &str) -> Self { let index_uid = IndexUid::from_parts(index_id, "0"); diff --git a/quickwit/quickwit-proto/src/types.rs b/quickwit/quickwit-proto/src/types.rs index 0ff71123fbe..d818005ede3 100644 --- a/quickwit/quickwit-proto/src/types.rs +++ b/quickwit/quickwit-proto/src/types.rs @@ -60,13 +60,14 @@ pub fn split_queue_id(queue_id: &str) -> Option<(IndexUid, SourceId, ShardId)> { pub struct IndexUid(String); impl fmt::Display for IndexUid { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{}", self.0) } } impl IndexUid { - /// Creates a new index uid form index_id and incarnation_id + /// Creates a new index uid from index_id. + /// A random UUID will be used as incarnation pub fn new(index_id: impl Into) -> Self { Self::from_parts(index_id, Ulid::new().to_string()) }
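For reference, a minimal usage sketch of the `IndexUid` constructors shown above; the index id value is made up for illustration and the sketch only assumes the colon-joined "index-id:incarnation-id" format used in the tests:

    use quickwit_proto::IndexUid;

    fn main() {
        // `from_parts` joins the index id and the incarnation id with a colon,
        // producing the "index-id:incarnation-id" form the tests above rely on
        // (e.g. "test-index-0:0").
        let pinned_uid = IndexUid::from_parts("test-index-0", "0");
        assert_eq!(pinned_uid.to_string(), "test-index-0:0");

        // `new` draws a fresh ULID for the incarnation id, so two calls with
        // the same index id denote two distinct incarnations of that index.
        let fresh_uid = IndexUid::new("test-index-0");
        assert_ne!(fresh_uid.to_string(), pinned_uid.to_string());
    }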