From aab1e5e347c6fa8ed832ce5cb5b902ad10bccd6e Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Fri, 15 Dec 2023 11:20:14 +0900
Subject: [PATCH] Issue/4244 delete index delete shard (#4270)

* Remove shards from ingesters on source/index deletion.

The PR works by adding an RPC on the ingester to "retain shards": the control
plane sends the list of shards that it expects to see on an indexer, and the
ingester deletes the shards that are not in that list.

The control plane performs this RPC when a source is deleted, when an index is
deleted, or upon restart.

This PR is incomplete and needs some work, as described in #4274. The "retain"
mechanism was preferred over sending the precise list of shards to remove in
anticipation of #4274.

Also:
- Bugfix in the num shards metric (we were returning the number of sources)
- Adding an ingester -> shard table in the control plane model

Closes #4244.

Co-authored-by: Adrien Guillo
---
 .../src/control_plane.rs                      | 238 +++++++++++++++++-
 .../src/indexing_scheduler/mod.rs             |   2 +-
 .../src/ingest/ingest_controller.rs           | 214 +++++++++++++++---
 .../quickwit-control-plane/src/model/mod.rs   | 103 +++++---
 .../src/model/shard_table.rs                  | 184 +++++++++++++-
 .../quickwit-ingest/src/ingest_v2/fetch.rs    |   3 +-
 .../quickwit-ingest/src/ingest_v2/ingester.rs |  96 ++++++-
 .../protos/quickwit/ingester.proto            |  17 ++
 .../quickwit/quickwit.ingest.ingester.rs      | 226 +++++++++++++++++
 quickwit/quickwit-proto/src/ingest/mod.rs     |  10 +
 10 files changed, 1012 insertions(+), 81 deletions(-)

diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs
index 419df93f884..3f211b70f66 100644
--- a/quickwit/quickwit-control-plane/src/control_plane.rs
+++ b/quickwit/quickwit-control-plane/src/control_plane.rs
@@ -17,6 +17,7 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see .

+use std::collections::BTreeSet;
 use std::time::Duration;

 use anyhow::Context;
@@ -134,8 +135,13 @@ impl Actor for ControlPlane {
         self.indexing_scheduler
             .schedule_indexing_plan_if_needed(&self.model);

+        self.ingest_controller
+            .sync_with_all_ingesters(&self.model)
+            .await;
+
         ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop)
             .await;
+
         Ok(())
     }
 }
@@ -184,7 +190,10 @@ impl Handler for ControlPlane {
         shard_positions_update: ShardPositionsUpdate,
         _ctx: &ActorContext,
     ) -> Result<(), ActorExitStatus> {
-        let Some(shard_entries) = self.model.list_shards(&shard_positions_update.source_uid) else {
+        let Some(shard_entries) = self
+            .model
+            .list_shards_for_source(&shard_positions_update.source_uid)
+        else {
             // The source no longer exists.
             return Ok(());
         };
@@ -332,15 +341,24 @@ impl Handler for ControlPlane {
             return convert_metastore_error(metastore_error);
         };

-        self.model.delete_index(&index_uid);
+        let ingester_needing_resync: BTreeSet = self
+            .model
+            .list_shards_for_index(&index_uid)
+            .flat_map(|shard_entry| shard_entry.ingester_nodes())
+            .collect();

-        let response = EmptyResponse {};
+        self.ingest_controller
+            .sync_with_ingesters(&ingester_needing_resync, &self.model)
+            .await;
+
+        self.model.delete_index(&index_uid);

         // TODO: Refine the event. Notify index will have the effect to reload the entire state from
         // the metastore. We should update the state of the control plane.
self.indexing_scheduler .schedule_indexing_plan_if_needed(&self.model); + let response = EmptyResponse {}; Ok(Ok(response)) } } @@ -422,16 +440,42 @@ impl Handler for ControlPlane { &mut self, request: DeleteSourceRequest, _ctx: &ActorContext, - ) -> Result { + ) -> Result, ActorExitStatus> { let index_uid: IndexUid = request.index_uid.clone().into(); let source_id = request.source_id.clone(); + + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: source_id.clone(), + }; + if let Err(metastore_error) = self.metastore.delete_source(request).await { + // TODO If the metastore fails returns an error but somehow succeed deleting the source, + // the control plane will restart and the shards will be remaining on the ingesters. + // + // This is tracked in #4274 return convert_metastore_error(metastore_error); }; - self.model.delete_source(&index_uid, &source_id); + + let ingester_needing_resync: BTreeSet = + if let Some(shards) = self.model.list_shards_for_source(&source_uid) { + shards + .flat_map(|shard_entry| shard_entry.ingester_nodes()) + .collect() + } else { + BTreeSet::new() + }; + + self.ingest_controller + .sync_with_ingesters(&ingester_needing_resync, &self.model) + .await; + + self.model.delete_source(&source_uid); + self.indexing_scheduler .schedule_indexing_plan_if_needed(&self.model); let response = EmptyResponse {}; + Ok(Ok(response)) } } @@ -531,6 +575,7 @@ mod tests { }; use quickwit_proto::control_plane::GetOrCreateOpenShardsSubrequest; use quickwit_proto::indexing::{ApplyIndexingPlanRequest, CpuCapacity, IndexingServiceClient}; + use quickwit_proto::ingest::ingester::{IngesterServiceClient, RetainShardsResponse}; use quickwit_proto::ingest::{Shard, ShardState}; use quickwit_proto::metastore::{ DeleteShardsResponse, EntityKind, ListIndexesMetadataRequest, ListIndexesMetadataResponse, @@ -1246,4 +1291,187 @@ mod tests { universe.assert_quit().await; } + + #[tokio::test] + async fn test_delete_index() { + quickwit_common::setup_logging_for_tests(); + let universe = Universe::default(); + let node_id = NodeId::new("control-plane-node".to_string()); + let indexer_pool = IndexerPool::default(); + + let ingester_pool = IngesterPool::default(); + let mut ingester_mock = IngesterServiceClient::mock(); + ingester_mock + .expect_retain_shards() + .times(2) + .returning(|mut request| { + assert_eq!(request.retain_shards_for_sources.len(), 1); + let retain_shards_for_source = request.retain_shards_for_sources.pop().unwrap(); + assert_eq!(&retain_shards_for_source.shard_ids, &[15]); + Ok(RetainShardsResponse {}) + }); + ingester_pool.insert("node1".into(), ingester_mock.into()); + + let mut index_0 = IndexMetadata::for_test("test-index-0", "ram:///test-index-0"); + let index_uid_clone = index_0.index_uid.clone(); + + let mut mock_metastore = MetastoreServiceClient::mock(); + mock_metastore.expect_delete_index().return_once( + move |delete_index_request: DeleteIndexRequest| { + assert_eq!(delete_index_request.index_uid, index_uid_clone.to_string()); + Ok(EmptyResponse {}) + }, + ); + + let mut source = SourceConfig::ingest_v2_default(); + source.enabled = true; + index_0.add_source(source.clone()).unwrap(); + + let index_0_clone = index_0.clone(); + mock_metastore.expect_list_indexes_metadata().return_once( + move |list_indexes_request: ListIndexesMetadataRequest| { + assert_eq!(list_indexes_request, ListIndexesMetadataRequest::all()); + Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(vec![ + index_0_clone.clone() + ]) + .unwrap()) + }, + ); + + let 
index_uid_clone = index_0.index_uid.clone(); + mock_metastore.expect_list_shards().return_once( + move |_list_shards_request: ListShardsRequest| { + let list_shards_resp = ListShardsResponse { + subresponses: vec![ListShardsSubresponse { + index_uid: index_uid_clone.to_string(), + source_id: INGEST_SOURCE_ID.to_string(), + shards: vec![Shard { + index_uid: index_uid_clone.to_string(), + source_id: source.source_id.to_string(), + shard_id: 15, + leader_id: "node1".to_string(), + follower_id: None, + shard_state: ShardState::Open as i32, + publish_position_inclusive: None, + publish_token: None, + }], + next_shard_id: 18, + }], + }; + Ok(list_shards_resp) + }, + ); + + let (control_plane_mailbox, _control_plane_handle) = ControlPlane::spawn( + &universe, + "cluster".to_string(), + node_id, + indexer_pool, + ingester_pool, + MetastoreServiceClient::from(mock_metastore), + 1, + ); + // This update should not trigger anything in the control plane. + control_plane_mailbox + .ask(DeleteIndexRequest { + index_uid: index_0.index_uid.to_string(), + }) + .await + .unwrap() + .unwrap(); + + universe.assert_quit().await; + } + #[tokio::test] + async fn test_delete_source() { + quickwit_common::setup_logging_for_tests(); + let universe = Universe::default(); + let node_id = NodeId::new("control-plane-node".to_string()); + let indexer_pool = IndexerPool::default(); + + let ingester_pool = IngesterPool::default(); + let mut ingester_mock = IngesterServiceClient::mock(); + ingester_mock + .expect_retain_shards() + .times(2) + .returning(|mut request| { + assert_eq!(request.retain_shards_for_sources.len(), 1); + let retain_shards_for_source = request.retain_shards_for_sources.pop().unwrap(); + assert_eq!(&retain_shards_for_source.shard_ids, &[15]); + Ok(RetainShardsResponse {}) + }); + ingester_pool.insert("node1".into(), ingester_mock.into()); + + let mut index_0 = IndexMetadata::for_test("test-index-0", "ram:///test-index-0"); + let index_uid_clone = index_0.index_uid.clone(); + + let mut mock_metastore = MetastoreServiceClient::mock(); + mock_metastore.expect_delete_source().return_once( + move |delete_source_request: DeleteSourceRequest| { + assert_eq!(delete_source_request.index_uid, index_uid_clone.to_string()); + assert_eq!(&delete_source_request.source_id, INGEST_SOURCE_ID); + Ok(EmptyResponse {}) + }, + ); + + let mut source = SourceConfig::ingest_v2_default(); + source.enabled = true; + index_0.add_source(source.clone()).unwrap(); + + let index_0_clone = index_0.clone(); + mock_metastore.expect_list_indexes_metadata().return_once( + move |list_indexes_request: ListIndexesMetadataRequest| { + assert_eq!(list_indexes_request, ListIndexesMetadataRequest::all()); + Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(vec![ + index_0_clone.clone() + ]) + .unwrap()) + }, + ); + + let index_uid_clone = index_0.index_uid.clone(); + mock_metastore.expect_list_shards().return_once( + move |_list_shards_request: ListShardsRequest| { + let list_shards_resp = ListShardsResponse { + subresponses: vec![ListShardsSubresponse { + index_uid: index_uid_clone.to_string(), + source_id: INGEST_SOURCE_ID.to_string(), + shards: vec![Shard { + index_uid: index_uid_clone.to_string(), + source_id: source.source_id.to_string(), + shard_id: 15, + leader_id: "node1".to_string(), + follower_id: None, + shard_state: ShardState::Open as i32, + publish_position_inclusive: None, + publish_token: None, + }], + next_shard_id: 18, + }], + }; + Ok(list_shards_resp) + }, + ); + + let (control_plane_mailbox, _control_plane_handle) = 
ControlPlane::spawn( + &universe, + "cluster".to_string(), + node_id, + indexer_pool, + ingester_pool, + MetastoreServiceClient::from(mock_metastore), + 1, + ); + // This update should not trigger anything in the control plane. + control_plane_mailbox + .ask(DeleteSourceRequest { + index_uid: index_0.index_uid.to_string(), + source_id: INGEST_SOURCE_ID.to_string(), + }) + .await + .unwrap() + .unwrap(); + + universe.assert_quit().await; + } } diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index 8b54bdebdbd..09adddb7abb 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -140,7 +140,7 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec { SourceType::IngestV2 => { // Expect: the source should exist since we just read it from `get_source_configs`. let shard_ids: Vec = model - .list_shards(&source_uid) + .list_shards_for_source(&source_uid) .expect("source should exist") .map(|shard| shard.shard_id) .collect(); diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 496d79afc24..c54cd366eb8 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -17,8 +17,9 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::HashMap; +use std::collections::{BTreeSet, HashMap}; use std::fmt; +use std::future::Future; use std::time::Duration; use fnv::{FnvHashMap, FnvHashSet}; @@ -31,7 +32,8 @@ use quickwit_proto::control_plane::{ GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSuccess, }; use quickwit_proto::ingest::ingester::{ - CloseShardsRequest, IngesterService, InitShardsRequest, PingRequest, + CloseShardsRequest, IngesterService, InitShardsRequest, PingRequest, RetainShardsForSource, + RetainShardsRequest, }; use quickwit_proto::ingest::{IngestV2Error, Shard, ShardIds, ShardState}; use quickwit_proto::metastore; @@ -39,7 +41,7 @@ use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient}; use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceUid}; use rand::seq::SliceRandom; use tokio::time::timeout; -use tracing::{info, warn}; +use tracing::{error, info, warn}; use crate::metrics::CONTROL_PLANE_METRICS; use crate::model::{ControlPlaneModel, ScalingMode, ShardEntry, ShardStats}; @@ -60,6 +62,23 @@ const PING_LEADER_TIMEOUT: Duration = if cfg!(test) { Duration::from_secs(2) }; +const FIRE_AND_FORGET_TIMEOUT: Duration = Duration::from_secs(3); + +/// Spawn a new task to execute the given future, +/// and stops polling it/drops it after a timeout. +/// +/// All errors are ignored, and not even logged. +fn fire_and_forget( + fut: impl Future + Send + 'static, + operation: impl std::fmt::Display + Send + Sync + 'static, +) { + tokio::spawn(async move { + if let Err(_timeout_elapsed) = tokio::time::timeout(FIRE_AND_FORGET_TIMEOUT, fut).await { + error!(operation=%operation, "timeout elapsed"); + } + }); +} + pub struct IngestController { metastore: MetastoreServiceClient, ingester_pool: IngesterPool, @@ -89,6 +108,60 @@ impl IngestController { } } + /// Sends a retain shard request to the given list of ingesters. + /// + /// If the request fails, we just log an error. 
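+    ///
+    /// A minimal sketch of the request this sync builds, assuming the target
+    /// ingester hosts shards 1 and 3 of a single source (values are illustrative):
+    ///
+    /// ```ignore
+    /// let request = RetainShardsRequest {
+    ///     retain_shards_for_sources: vec![RetainShardsForSource {
+    ///         index_uid: "test-index:0".to_string(),
+    ///         source_id: "test-source".to_string(),
+    ///         shard_ids: vec![1, 3],
+    ///     }],
+    /// };
+    /// ```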
+ pub(crate) async fn sync_with_ingesters( + &self, + ingesters: &BTreeSet, + model: &ControlPlaneModel, + ) { + for ingester in ingesters { + self.sync_with_ingester(ingester, model).await; + } + } + + pub(crate) async fn sync_with_all_ingesters(&self, model: &ControlPlaneModel) { + let ingesters: Vec = self.ingester_pool.keys(); + for ingester in ingesters { + self.sync_with_ingester(&ingester, model).await; + } + } + + async fn sync_with_ingester(&self, ingester: &NodeId, model: &ControlPlaneModel) { + info!(ingester = %ingester, "sync_with_ingester"); + let Some(mut ingester_client) = self.ingester_pool.get(ingester) else { + // TODO: (Maybe) We should mark the ingester as unavailable, and stop advertise its + // shard to routers. + warn!("failed to sync with ingester `{ingester}`: not available"); + return; + }; + + let mut retain_shards_req = RetainShardsRequest::default(); + for (source_uid, shard_ids) in &*model.list_shards_for_node(ingester) { + let shards_for_source = RetainShardsForSource { + index_uid: source_uid.index_uid.to_string(), + source_id: source_uid.source_id.clone(), + shard_ids: shard_ids.iter().copied().collect(), + }; + retain_shards_req + .retain_shards_for_sources + .push(shards_for_source); + } + info!(ingester = %ingester, "retain shards ingester"); + let operation: String = format!("retain shards `{ingester}`"); + fire_and_forget( + async move { + if let Err(retain_shards_err) = + ingester_client.retain_shards(retain_shards_req).await + { + error!(%retain_shards_err, "retain shards error"); + } + }, + operation, + ); + } + /// Pings an ingester to determine whether it is available for hosting a shard. If a follower ID /// is provided, the leader candidate is in charge of pinging the follower candidate as /// well. @@ -357,7 +430,6 @@ impl IngestController { for open_shards_subresponse in open_shards_response.subresponses { let index_uid: IndexUid = open_shards_subresponse.index_uid.clone().into(); let source_id = open_shards_subresponse.source_id.clone(); - model.insert_newly_opened_shards( &index_uid, &source_id, @@ -569,7 +641,7 @@ fn find_scale_down_candidate( ) -> Option<(NodeId, ShardId)> { let mut per_leader_candidates: HashMap<&String, (usize, &ShardEntry)> = HashMap::new(); - for shard in model.list_shards(source_uid)? { + for shard in model.list_shards_for_source(source_uid)? 
{ if shard.is_open() { per_leader_candidates .entry(&shard.leader_id) @@ -606,7 +678,7 @@ mod tests { use quickwit_proto::control_plane::GetOrCreateOpenShardsSubrequest; use quickwit_proto::ingest::ingester::{ CloseShardsResponse, IngesterServiceClient, InitShardsResponse, MockIngesterService, - PingResponse, + PingResponse, RetainShardsResponse, }; use quickwit_proto::ingest::{Shard, ShardState}; use quickwit_proto::metastore::MetastoreError; @@ -1008,7 +1080,7 @@ mod tests { GetOrCreateOpenShardsFailureReason::SourceNotFound ); - assert_eq!(model.observable_state().num_shards, 2); + assert_eq!(model.observable_state().num_shards, 3); } #[tokio::test] @@ -1026,6 +1098,8 @@ mod tests { let shards = vec![Shard { shard_id: 1, + index_uid: index_uid.to_string(), + source_id: source_id.clone(), leader_id: "test-ingester-0".to_string(), shard_state: ShardState::Open as i32, ..Default::default() @@ -1049,7 +1123,7 @@ mod tests { .unwrap(); let shard_1 = model - .all_shards_mut() + .all_shards() .find(|shard| shard.shard_id == 1) .unwrap(); assert!(shard_1.is_closed()); @@ -1075,18 +1149,24 @@ mod tests { let shards = vec![ Shard { shard_id: 1, + index_uid: index_uid.to_string(), + source_id: source_id.clone(), leader_id: "test-ingester-0".to_string(), shard_state: ShardState::Open as i32, ..Default::default() }, Shard { shard_id: 2, + index_uid: index_uid.to_string(), + source_id: source_id.clone(), leader_id: "test-ingester-0".to_string(), shard_state: ShardState::Closed as i32, ..Default::default() }, Shard { shard_id: 3, + index_uid: index_uid.to_string(), + source_id: source_id.clone(), leader_id: "test-ingester-1".to_string(), shard_state: ShardState::Open as i32, ..Default::default() @@ -1107,19 +1187,19 @@ mod tests { .unwrap(); let shard_1 = model - .all_shards_mut() + .all_shards() .find(|shard| shard.shard_id == 1) .unwrap(); assert!(shard_1.is_unavailable()); let shard_2 = model - .all_shards_mut() + .all_shards() .find(|shard| shard.shard_id == 2) .unwrap(); assert!(shard_2.is_closed()); let shard_3 = model - .all_shards_mut() + .all_shards() .find(|shard| shard.shard_id == 3) .unwrap(); assert!(shard_3.is_open()); @@ -1145,17 +1225,16 @@ mod tests { let progress = Progress::default(); let shards = vec![Shard { + index_uid: index_uid.to_string(), + source_id: source_id.clone(), shard_id: 1, leader_id: "test-ingester".to_string(), shard_state: ShardState::Open as i32, ..Default::default() }]; model.insert_newly_opened_shards(&index_uid, &source_id, shards, 2); + let shard_entries: Vec = model.all_shards().cloned().collect(); - let shard_entries: Vec = model - .all_shards_mut() - .map(|shard_entry| shard_entry.clone()) - .collect(); assert_eq!(shard_entries.len(), 1); assert_eq!(shard_entries[0].ingestion_rate, 0); @@ -1174,26 +1253,22 @@ mod tests { .handle_local_shards_update(local_shards_update, &mut model, &progress) .await; - let shard_entries: Vec = model - .all_shards_mut() - .map(|shard_entry| shard_entry.clone()) - .collect(); + let shard_entries: Vec = model.all_shards().cloned().collect(); assert_eq!(shard_entries.len(), 1); assert_eq!(shard_entries[0].ingestion_rate, 1); // Test update shard ingestion rate with failing scale down. 
let shards = vec![Shard { shard_id: 2, + index_uid: index_uid.to_string(), + source_id: source_id.clone(), leader_id: "test-ingester".to_string(), shard_state: ShardState::Open as i32, ..Default::default() }]; model.insert_newly_opened_shards(&index_uid, &source_id, shards, 2); - let shard_entries: Vec = model - .all_shards_mut() - .map(|shard_entry| shard_entry.clone()) - .collect(); + let shard_entries: Vec = model.all_shards().cloned().collect(); assert_eq!(shard_entries.len(), 2); let mut ingester_mock = IngesterServiceClient::mock(); @@ -1371,23 +1446,20 @@ mod tests { ingest_controller .try_scale_up_shards(source_uid.clone(), shard_stats, &mut model, &progress) .await; - assert_eq!(model.all_shards_mut().count(), 0); + assert_eq!(model.all_shards().count(), 0); // Test failed to init shards. ingest_controller .try_scale_up_shards(source_uid.clone(), shard_stats, &mut model, &progress) .await; - assert_eq!(model.all_shards_mut().count(), 0); + assert_eq!(model.all_shards().count(), 0); // Test successfully opened shard. ingest_controller .try_scale_up_shards(source_uid.clone(), shard_stats, &mut model, &progress) .await; assert_eq!( - model - .all_shards_mut() - .filter(|shard| shard.is_open()) - .count(), + model.all_shards().filter(|shard| shard.is_open()).count(), 1 ); } @@ -1422,6 +1494,8 @@ mod tests { let shards = vec![Shard { shard_id: 1, + index_uid: index_uid.to_string(), + source_id: source_id.clone(), leader_id: "test-ingester".to_string(), shard_state: ShardState::Open as i32, ..Default::default() @@ -1465,16 +1539,18 @@ mod tests { ingest_controller .try_scale_down_shards(source_uid.clone(), shard_stats, &mut model, &progress) .await; - assert!(model.all_shards_mut().all(|shard| shard.is_open())); + assert!(model.all_shards().all(|shard| shard.is_open())); // Test successfully closed shard. 
ingest_controller .try_scale_down_shards(source_uid.clone(), shard_stats, &mut model, &progress) .await; - assert!(model.all_shards_mut().all(|shard| shard.is_closed())); + assert!(model.all_shards().all(|shard| shard.is_closed())); let shards = vec![Shard { shard_id: 2, + index_uid: index_uid.to_string(), + source_id: source_id.clone(), leader_id: "test-ingester".to_string(), shard_state: ShardState::Open as i32, ..Default::default() @@ -1485,7 +1561,7 @@ mod tests { ingest_controller .try_scale_down_shards(source_uid.clone(), shard_stats, &mut model, &progress) .await; - assert!(model.all_shards_mut().any(|shard| shard.is_open())); + assert!(model.all_shards().any(|shard| shard.is_open())); } #[test] @@ -1505,35 +1581,47 @@ mod tests { Shard { shard_id: 1, leader_id: "test-ingester-0".to_string(), + index_uid: index_uid.clone().into(), + source_id: source_id.clone(), shard_state: ShardState::Open as i32, ..Default::default() }, Shard { shard_id: 2, + index_uid: index_uid.clone().into(), + source_id: source_id.clone(), leader_id: "test-ingester-0".to_string(), shard_state: ShardState::Open as i32, ..Default::default() }, Shard { shard_id: 3, + index_uid: index_uid.clone().into(), + source_id: source_id.clone(), leader_id: "test-ingester-0".to_string(), shard_state: ShardState::Closed as i32, ..Default::default() }, Shard { shard_id: 4, + index_uid: index_uid.clone().into(), + source_id: source_id.clone(), leader_id: "test-ingester-1".to_string(), shard_state: ShardState::Open as i32, ..Default::default() }, Shard { shard_id: 5, + index_uid: index_uid.clone().into(), + source_id: source_id.clone(), leader_id: "test-ingester-1".to_string(), shard_state: ShardState::Open as i32, ..Default::default() }, Shard { shard_id: 6, + index_uid: index_uid.clone().into(), + source_id: source_id.clone(), leader_id: "test-ingester-1".to_string(), shard_state: ShardState::Open as i32, ..Default::default() @@ -1579,4 +1667,66 @@ mod tests { assert_eq!(leader_id, "test-ingester-0"); assert_eq!(shard_id, 2); } + + #[tokio::test] + async fn test_sync_with_ingesters() { + let metastore = MetastoreServiceClient::mock().into(); + let ingester_pool = IngesterPool::default(); + let replication_factor = 2; + + let ingest_controller = + IngestController::new(metastore, ingester_pool.clone(), replication_factor); + + let index_uid: IndexUid = "test-index:0".into(); + let source_id: SourceId = "test-source".into(); + let mut model = ControlPlaneModel::default(); + let shards = vec![ + Shard { + shard_id: 1, + index_uid: index_uid.to_string(), + source_id: source_id.clone(), + leader_id: "node-1".to_string(), + follower_id: Some("node-2".to_string()), + shard_state: ShardState::Open as i32, + ..Default::default() + }, + Shard { + shard_id: 2, + index_uid: index_uid.to_string(), + source_id: source_id.clone(), + leader_id: "node-2".to_string(), + follower_id: Some("node-3".to_string()), + shard_state: ShardState::Open as i32, + ..Default::default() + }, + Shard { + shard_id: 3, + index_uid: index_uid.to_string(), + source_id: source_id.clone(), + leader_id: "node-2".to_string(), + follower_id: Some("node-1".to_string()), + shard_state: ShardState::Open as i32, + ..Default::default() + }, + ]; + model.insert_newly_opened_shards(&index_uid, &source_id, shards, 2); + + let mut ingester_mock1 = IngesterServiceClient::mock(); + let ingester_mock2 = IngesterServiceClient::mock(); + let ingester_mock3 = IngesterServiceClient::mock(); + ingester_mock1 + .expect_retain_shards() + .once() + .returning(|mut request| { + 
assert_eq!(request.retain_shards_for_sources.len(), 1); + let retain_shards_for_source = request.retain_shards_for_sources.pop().unwrap(); + assert_eq!(&retain_shards_for_source.shard_ids, &[1, 3]); + Ok(RetainShardsResponse {}) + }); + ingester_pool.insert("node-1".into(), ingester_mock1.into()); + ingester_pool.insert("node-2".into(), ingester_mock2.into()); + ingester_pool.insert("node-3".into(), ingester_mock3.into()); + let node_id = "node-1".into(); + ingest_controller.sync_with_ingester(&node_id, &model).await; + } } diff --git a/quickwit/quickwit-control-plane/src/model/mod.rs b/quickwit/quickwit-control-plane/src/model/mod.rs index 785475b863a..ba7cdee7a00 100644 --- a/quickwit/quickwit-control-plane/src/model/mod.rs +++ b/quickwit/quickwit-control-plane/src/model/mod.rs @@ -19,6 +19,9 @@ mod shard_table; +use std::borrow::Cow; +use std::collections::BTreeSet; +use std::ops::Deref; use std::time::Instant; use anyhow::bail; @@ -27,17 +30,15 @@ use quickwit_common::Progress; use quickwit_config::SourceConfig; use quickwit_ingest::ShardInfos; use quickwit_metastore::{IndexMetadata, ListIndexesMetadataResponseExt}; -use quickwit_proto::control_plane::ControlPlaneResult; +use quickwit_proto::control_plane::{ControlPlaneError, ControlPlaneResult}; use quickwit_proto::ingest::Shard; use quickwit_proto::metastore::{ - self, EntityKind, ListIndexesMetadataRequest, ListShardsSubrequest, MetastoreError, - MetastoreService, MetastoreServiceClient, SourceType, + self, EntityKind, ListIndexesMetadataRequest, ListShardsSubrequest, ListShardsSubresponse, + MetastoreError, MetastoreService, MetastoreServiceClient, SourceType, }; use quickwit_proto::types::{IndexId, IndexUid, NodeId, ShardId, SourceId, SourceUid}; use serde::Serialize; -pub(super) use shard_table::{ - NextShardId, ScalingMode, ShardEntry, ShardStats, ShardTable, ShardTableEntry, -}; +pub(super) use shard_table::{NextShardId, ScalingMode, ShardEntry, ShardStats, ShardTable}; use tracing::{info, instrument, warn}; /// The control plane maintains a model in sync with the metastore. 
@@ -69,7 +70,7 @@ impl ControlPlaneModel { pub fn observable_state(&self) -> ControlPlaneModelMetrics { ControlPlaneModelMetrics { - num_shards: self.shard_table.table_entries.len(), + num_shards: self.shard_table.num_shards(), } } @@ -120,24 +121,24 @@ impl ControlPlaneModel { .protect_future(metastore.list_shards(list_shards_request)) .await?; - self.shard_table - .table_entries - .reserve(list_shard_response.subresponses.len()); - for list_shards_subresponse in list_shard_response.subresponses { num_shards += list_shards_subresponse.shards.len(); - + let ListShardsSubresponse { + index_uid, + source_id, + shards, + next_shard_id, + } = list_shards_subresponse; let source_uid = SourceUid { - index_uid: list_shards_subresponse.index_uid.into(), - source_id: list_shards_subresponse.source_id, + index_uid: IndexUid::parse(&index_uid).map_err(|invalid_index_uri| { + ControlPlaneError::Internal(format!( + "invalid index uid received from the metastore: {invalid_index_uri:?}" + )) + })?, + source_id, }; - let table_entry = ShardTableEntry::from_shards( - list_shards_subresponse.shards, - list_shards_subresponse.next_shard_id, - ); self.shard_table - .table_entries - .insert(source_uid, table_entry); + .initialize_source_shards(source_uid, shards, next_shard_id); } } info!( @@ -205,16 +206,17 @@ impl ControlPlaneModel { Ok(()) } - pub(crate) fn delete_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) { + pub(crate) fn delete_source(&mut self, source_uid: &SourceUid) { // Removing shards from shard table. - self.shard_table.delete_source(index_uid, source_id); + self.shard_table + .delete_source(&source_uid.index_uid, &source_uid.source_id); // Remove source from index config. - let Some(index_model) = self.index_table.get_mut(index_uid) else { - warn!(index_uid=%index_uid, source_id=%source_id, "delete source: index not found"); + let Some(index_model) = self.index_table.get_mut(&source_uid.index_uid) else { + warn!(index_uid=%source_uid.index_uid, source_id=%source_uid.source_id, "delete source: index not found"); return; }; - if index_model.sources.remove(source_id).is_none() { - warn!(index_uid=%index_uid, source_id=%source_id, "delete source: source not found"); + if index_model.sources.remove(&source_uid.source_id).is_none() { + warn!(index_uid=%source_uid.index_uid, source_id=%source_uid.source_id, "delete source: source not found"); }; } @@ -237,12 +239,38 @@ impl ControlPlaneModel { Ok(has_changed) } - pub fn all_shards_mut(&mut self) -> impl Iterator + '_ { + pub(crate) fn all_shards_mut(&mut self) -> impl Iterator + '_ { self.shard_table.all_shards_mut() } + #[cfg(test)] + pub(crate) fn all_shards(&self) -> impl Iterator + '_ { + self.shard_table.all_shards() + } + + pub fn list_shards_for_node( + &self, + ingester: &NodeId, + ) -> impl Deref>> + '_ { + if let Some(shards_for_node) = self.shard_table.list_shards_for_node(ingester) { + Cow::Borrowed(shards_for_node) + } else { + Cow::Owned(FnvHashMap::default()) + } + } + + pub fn list_shards_for_index<'a>( + &'a self, + index_uid: &'a IndexUid, + ) -> impl Iterator + 'a { + self.shard_table.list_shards_for_index(index_uid) + } + /// Lists the shards of a given source. Returns `None` if the source does not exist. 
- pub fn list_shards(&self, source_uid: &SourceUid) -> Option> { + pub fn list_shards_for_source( + &self, + source_uid: &SourceUid, + ) -> Option> { self.shard_table.list_shards(source_uid) } @@ -372,7 +400,10 @@ mod tests { source_id: INGEST_SOURCE_ID.to_string(), shards: vec![Shard { shard_id: 42, + index_uid: "test-index-0:0".to_string(), + source_id: INGEST_SOURCE_ID.to_string(), shard_state: ShardState::Open as i32, + leader_id: "node1".to_string(), ..Default::default() }], next_shard_id: 43, @@ -408,14 +439,17 @@ mod tests { "test-index-2:0" ); - assert_eq!(model.shard_table.table_entries.len(), 2); + assert_eq!(model.shard_table.num_shards(), 1); let source_uid_0 = SourceUid { index_uid: "test-index-0:0".into(), source_id: INGEST_SOURCE_ID.to_string(), }; - let table_entry = model.shard_table.table_entries.get(&source_uid_0).unwrap(); - let shards = table_entry.shards(); + let shards: Vec<&ShardEntry> = model + .shard_table + .list_shards(&source_uid_0) + .unwrap() + .collect(); assert_eq!(shards.len(), 1); assert_eq!(shards[0].shard_id, 42); @@ -426,8 +460,11 @@ mod tests { index_uid: "test-index-1:0".into(), source_id: INGEST_SOURCE_ID.to_string(), }; - let table_entry = model.shard_table.table_entries.get(&source_uid_1).unwrap(); - let shards = table_entry.shards(); + let shards: Vec<&ShardEntry> = model + .shard_table + .list_shards(&source_uid_1) + .unwrap() + .collect(); assert_eq!(shards.len(), 0); let next_shard_id = model.next_shard_id(&source_uid_1).unwrap(); diff --git a/quickwit/quickwit-control-plane/src/model/shard_table.rs b/quickwit/quickwit-control-plane/src/model/shard_table.rs index a129e227028..33b2c20119b 100644 --- a/quickwit/quickwit-control-plane/src/model/shard_table.rs +++ b/quickwit/quickwit-control-plane/src/model/shard_table.rs @@ -18,6 +18,7 @@ // along with this program. If not, see . use std::collections::hash_map::Entry; +use std::collections::BTreeSet; use std::ops::{Deref, DerefMut}; use std::time::Duration; @@ -129,17 +130,111 @@ impl ShardTableEntry { } } -// A table that keeps track of the existing shards for each index and source. +// A table that keeps track of the existing shards for each index and source, +// and for each ingester, the list of shards it is supposed to host. +// +// (All mutable methods must maintain the two consistent) #[derive(Debug, Default)] pub(crate) struct ShardTable { - pub table_entries: FnvHashMap, + table_entries: FnvHashMap, + ingester_shards: FnvHashMap>>, +} + +// Removes the shards from the ingester_shards map. +// +// This function is used to maintain the shard table invariant. +fn remove_shard_from_ingesters_internal( + source_uid: &SourceUid, + shard: &Shard, + ingester_shards: &mut FnvHashMap>>, +) { + for node in shard.ingester_nodes() { + let ingester_shards = ingester_shards + .get_mut(&node) + .expect("shard table reached inconsistent state"); + let shard_ids = ingester_shards.get_mut(source_uid).unwrap(); + shard_ids.remove(&shard.shard_id); + } } impl ShardTable { /// Removes all the entries that match the target index ID. 
pub fn delete_index(&mut self, index_id: &str) { + let shards_removed = self + .table_entries + .iter() + .filter(|(source_uid, _)| source_uid.index_uid.index_id() == index_id) + .flat_map(|(source_uid, shard_table_entry)| { + shard_table_entry + .shard_entries + .values() + .map(move |shard_entry: &ShardEntry| (source_uid, &shard_entry.shard)) + }); + for (source_uid, shard) in shards_removed { + remove_shard_from_ingesters_internal(source_uid, shard, &mut self.ingester_shards); + } self.table_entries .retain(|source_uid, _| source_uid.index_uid.index_id() != index_id); + self.check_invariant(); + } + + /// Checks whether the shard table is consistent. + /// + /// Panics if it is not. + fn check_invariant(&self) { + // This function is expensive! Let's not call it in release mode. + if !cfg!(debug_assertions) { + return; + }; + let mut shard_sets_in_shard_table = FnvHashSet::default(); + for (source_uid, shard_table_entry) in &self.table_entries { + for (shard_id, shard_entry) in &shard_table_entry.shard_entries { + debug_assert_eq!(shard_id, &shard_entry.shard.shard_id); + debug_assert_eq!(source_uid.index_uid.as_str(), &shard_entry.shard.index_uid); + for node in shard_entry.shard.ingester_nodes() { + shard_sets_in_shard_table.insert((node, source_uid, shard_id)); + } + } + } + for (node, ingester_shards) in &self.ingester_shards { + for (source_uid, shard_ids) in ingester_shards { + for shard_id in shard_ids { + let shard_table_entry = self.table_entries.get(source_uid).unwrap(); + debug_assert!(shard_table_entry.shard_entries.contains_key(shard_id)); + debug_assert!(shard_sets_in_shard_table.remove(&( + node.clone(), + source_uid, + shard_id + ))); + } + } + } + } + + /// Lists all the shards hosted on a given node, regardless of whether it is a + /// leader or a follower. + pub fn list_shards_for_node( + &self, + ingester: &NodeId, + ) -> Option<&FnvHashMap>> { + self.ingester_shards.get(ingester) + } + + pub fn list_shards_for_index<'a>( + &'a self, + index_uid: &'a IndexUid, + ) -> impl Iterator + 'a { + self.table_entries + .iter() + .filter(move |(source_uid, _)| source_uid.index_uid == *index_uid) + .flat_map(|(_, shard_table_entry)| shard_table_entry.shard_entries.values()) + } + + pub fn num_shards(&self) -> usize { + self.table_entries + .values() + .map(|shard_table_entry| shard_table_entry.shard_entries.len()) + .sum() } /// Adds a new empty entry for the given index and source. 
@@ -161,6 +256,7 @@ impl ShardTable { ); } } + self.check_invariant(); } pub fn delete_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) { @@ -168,10 +264,27 @@ impl ShardTable { index_uid: index_uid.clone(), source_id: source_id.clone(), }; - self.table_entries.remove(&source_uid); + let Some(shard_table_entry) = self.table_entries.remove(&source_uid) else { + return; + }; + for shard_entry in shard_table_entry.shard_entries.values() { + remove_shard_from_ingesters_internal( + &source_uid, + &shard_entry.shard, + &mut self.ingester_shards, + ); + } + self.check_invariant(); + } + + #[cfg(test)] + pub(crate) fn all_shards(&self) -> impl Iterator + '_ { + self.table_entries + .values() + .flat_map(|table_entry| table_entry.shard_entries.values()) } - pub fn all_shards_mut(&mut self) -> impl Iterator + '_ { + pub(crate) fn all_shards_mut(&mut self) -> impl Iterator + '_ { self.table_entries .values_mut() .flat_map(|table_entry| table_entry.shard_entries.values_mut()) @@ -202,6 +315,23 @@ impl ShardTable { index_uid: index_uid.clone(), source_id: source_id.clone(), }; + for shard in &opened_shards { + if shard.index_uid != source_uid.index_uid.as_str() + || shard.source_id != source_uid.source_id + { + panic!( + "shard source UID `{}/{}` does not match source UID `{source_uid}`", + shard.index_uid, shard.source_id, + ); + } + } + for shard in &opened_shards { + for node in shard.ingester_nodes() { + let ingester_shards = self.ingester_shards.entry(node).or_default(); + let shard_ids = ingester_shards.entry(source_uid.clone()).or_default(); + shard_ids.insert(shard.shard_id); + } + } match self.table_entries.entry(source_uid) { Entry::Occupied(mut entry) => { let table_entry = entry.get_mut(); @@ -232,6 +362,7 @@ impl ShardTable { entry.insert(table_entry); } } + self.check_invariant(); } /// Finds open shards for a given index and source and whose leaders are not in the set of @@ -296,6 +427,7 @@ impl ShardTable { } else { 0.0 }; + ShardStats { num_open_shards, avg_ingestion_rate, @@ -322,13 +454,45 @@ impl ShardTable { /// Removes the shards identified by their index UID, source ID, and shard IDs. pub fn delete_shards(&mut self, source_uid: &SourceUid, shard_ids: &[ShardId]) { + let mut shard_entries_to_remove: Vec = Vec::new(); if let Some(table_entry) = self.table_entries.get_mut(source_uid) { for shard_id in shard_ids { - if table_entry.shard_entries.remove(shard_id).is_none() { + if let Some(shard_entry) = table_entry.shard_entries.remove(shard_id) { + shard_entries_to_remove.push(shard_entry); + } else { warn!(shard = *shard_id, "deleting a non-existing shard"); } } } + for shard_entry in shard_entries_to_remove { + remove_shard_from_ingesters_internal( + source_uid, + &shard_entry.shard, + &mut self.ingester_shards, + ); + } + self.check_invariant(); + } + + /// Set the shards for a given source. + /// This function panics if an entry was previously associated to the source uid. 
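+    ///
+    /// A minimal usage sketch, assuming `shards` and `next_shard_id` were just
+    /// listed from the metastore and the source is not yet present in the table
+    /// (a second call for the same source would panic):
+    ///
+    /// ```ignore
+    /// let source_uid = SourceUid {
+    ///     index_uid: "test-index:0".into(),
+    ///     source_id: "test-source".to_string(),
+    /// };
+    /// shard_table.initialize_source_shards(source_uid, shards, next_shard_id);
+    /// ```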
+ pub(crate) fn initialize_source_shards( + &mut self, + source_uid: SourceUid, + shards: Vec, + next_shard_id: NextShardId, + ) { + for shard in &shards { + for node in shard.ingester_nodes() { + let ingester_shards = self.ingester_shards.entry(node).or_default(); + let shard_ids = ingester_shards.entry(source_uid.clone()).or_default(); + shard_ids.insert(shard.shard_id); + } + } + let table_entry = ShardTableEntry::from_shards(shards, next_shard_id); + let previous_entry = self.table_entries.insert(source_uid, table_entry); + assert!(previous_entry.is_none()); + self.check_invariant(); } pub fn acquire_scaling_permits( @@ -621,21 +785,29 @@ mod tests { let mut shard_table = ShardTable::default(); let shard_01 = Shard { + index_uid: index_uid.to_string(), + source_id: source_id.clone(), shard_id: 1, shard_state: ShardState::Open as i32, ..Default::default() }; let shard_02 = Shard { + index_uid: index_uid.to_string(), + source_id: source_id.clone(), shard_id: 2, shard_state: ShardState::Open as i32, ..Default::default() }; let shard_03 = Shard { + index_uid: index_uid.to_string(), + source_id: source_id.clone(), shard_id: 3, shard_state: ShardState::Unavailable as i32, ..Default::default() }; let shard_04 = Shard { + index_uid: index_uid.to_string(), + source_id: source_id.clone(), shard_id: 4, shard_state: ShardState::Open as i32, ..Default::default() @@ -747,7 +919,7 @@ mod tests { vec![shard_01, shard_02], 3, ); - shard_table.insert_newly_opened_shards(&index_uid_0, &source_id, vec![shard_11], 2); + shard_table.insert_newly_opened_shards(&index_uid_1, &source_id, vec![shard_11], 2); let source_uid_0 = SourceUid { index_uid: index_uid_0, diff --git a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs index 9e3e23b2a76..7ed1a201317 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs @@ -207,7 +207,8 @@ impl FetchStreamTask { } } if !to_position_inclusive.is_eof() { - error!( + // This can happen if we delete the associated source or index. + warn!( client_id=%self.client_id, index_uid=%self.index_uid, source_id=%self.source_id, diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 67c0c746cbf..762cff87cff 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -18,7 +18,7 @@ // along with this program. If not, see . 
use std::collections::hash_map::Entry; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt; use std::iter::once; use std::path::Path; @@ -44,8 +44,9 @@ use quickwit_proto::ingest::ingester::{ ObservationMessage, OpenFetchStreamRequest, OpenObservationStreamRequest, OpenReplicationStreamRequest, OpenReplicationStreamResponse, PersistFailure, PersistFailureReason, PersistRequest, PersistResponse, PersistSuccess, PingRequest, - PingResponse, ReplicateFailureReason, ReplicateSubrequest, SynReplicationMessage, - TruncateShardsRequest, TruncateShardsResponse, + PingResponse, ReplicateFailureReason, ReplicateSubrequest, RetainShardsForSource, + RetainShardsRequest, RetainShardsResponse, SynReplicationMessage, TruncateShardsRequest, + TruncateShardsResponse, }; use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, Shard, ShardState}; use quickwit_proto::types::{queue_id, NodeId, Position, QueueId}; @@ -932,6 +933,41 @@ impl IngesterService for Ingester { ) } + async fn retain_shards( + &mut self, + request: RetainShardsRequest, + ) -> IngestV2Result { + let retain_queue_ids: HashSet = request + .retain_shards_for_sources + .into_iter() + .flat_map(|retain_shards_for_source: RetainShardsForSource| { + retain_shards_for_source + .shard_ids + .into_iter() + .map(move |shard_id| { + queue_id( + &retain_shards_for_source.index_uid, + &retain_shards_for_source.source_id, + shard_id, + ) + }) + }) + .collect(); + let mut state_guard = self.state.write().await; + let remove_queue_ids: HashSet = state_guard + .shards + .keys() + .filter(move |shard_id| !retain_queue_ids.contains(*shard_id)) + .map(ToString::to_string) + .collect(); + info!(queues=?remove_queue_ids, "removing queues"); + for queue_id in remove_queue_ids { + state_guard.delete_shard(&queue_id).await; + } + self.check_decommissioning_status(&mut state_guard); + Ok(RetainShardsResponse {}) + } + async fn truncate_shards( &mut self, truncate_shards_request: TruncateShardsRequest, @@ -2209,6 +2245,60 @@ mod tests { assert!(!state_guard.mrecordlog.queue_exists(&queue_id_02)); } + #[tokio::test] + async fn test_ingester_retain_shards() { + let (_ingester_ctx, mut ingester) = IngesterForTest::default().build().await; + + let shard_17 = Shard { + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 17, + shard_state: ShardState::Open as i32, + ..Default::default() + }; + + let shard_18 = Shard { + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 18, + shard_state: ShardState::Closed as i32, + ..Default::default() + }; + let queue_id_17 = queue_id(&shard_17.index_uid, &shard_17.source_id, shard_17.shard_id); + + let mut state_guard = ingester.state.write().await; + ingester + .init_primary_shard(&mut state_guard, shard_17) + .await + .unwrap(); + ingester + .init_primary_shard(&mut state_guard, shard_18) + .await + .unwrap(); + + drop(state_guard); + + { + let state_guard = ingester.state.read().await; + assert_eq!(state_guard.shards.len(), 2); + } + + let retain_shard_request = RetainShardsRequest { + retain_shards_for_sources: vec![RetainShardsForSource { + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_ids: vec![17u64], + }], + }; + ingester.retain_shards(retain_shard_request).await.unwrap(); + + { + let state_guard = ingester.state.read().await; + assert_eq!(state_guard.shards.len(), 1); + assert!(state_guard.shards.contains_key(&queue_id_17)); + } + } + #[tokio::test] 
async fn test_ingester_close_shards() { let (_ingester_ctx, mut ingester) = IngesterForTest::default().build().await; diff --git a/quickwit/quickwit-proto/protos/quickwit/ingester.proto b/quickwit/quickwit-proto/protos/quickwit/ingester.proto index 99da1935891..a90962bceae 100644 --- a/quickwit/quickwit-proto/protos/quickwit/ingester.proto +++ b/quickwit/quickwit-proto/protos/quickwit/ingester.proto @@ -40,6 +40,10 @@ service IngesterService { // Creates and initializes a set of newly opened shards. This RPC is called by the control plane on leaders. rpc InitShards(InitShardsRequest) returns (InitShardsResponse); + // Only retain the shards that are listed in the request. + // Other shards are deleted. + rpc RetainShards(RetainShardsRequest) returns (RetainShardsResponse); + // Truncates a set of shards at the given positions. This RPC is called by indexers on leaders AND followers. rpc TruncateShards(TruncateShardsRequest) returns (TruncateShardsResponse); @@ -54,6 +58,19 @@ service IngesterService { } +message RetainShardsForSource { + string index_uid = 1; + string source_id = 2; + repeated uint64 shard_ids = 3; +} + +message RetainShardsRequest { + repeated RetainShardsForSource retain_shards_for_sources = 1; +} + +message RetainShardsResponse { +} + message PersistRequest { string leader_id = 1; quickwit.ingest.CommitTypeV2 commit_type = 3; diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs index 10de27e48e4..aba68ec4d95 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs @@ -1,6 +1,28 @@ #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct RetainShardsForSource { + #[prost(string, tag = "1")] + pub index_uid: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub source_id: ::prost::alloc::string::String, + #[prost(uint64, repeated, tag = "3")] + pub shard_ids: ::prost::alloc::vec::Vec, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RetainShardsRequest { + #[prost(message, repeated, tag = "1")] + pub retain_shards_for_sources: ::prost::alloc::vec::Vec, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RetainShardsResponse {} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct PersistRequest { #[prost(string, tag = "1")] pub leader_id: ::prost::alloc::string::String, @@ -534,6 +556,12 @@ pub trait IngesterService: std::fmt::Debug + dyn_clone::DynClone + Send + Sync + &mut self, request: InitShardsRequest, ) -> crate::ingest::IngestV2Result; + /// Only retain the shards that are listed in the request. + /// Other shards are deleted. + async fn retain_shards( + &mut self, + request: RetainShardsRequest, + ) -> crate::ingest::IngestV2Result; /// Truncates a set of shards at the given positions. This RPC is called by indexers on leaders AND followers. 
async fn truncate_shards( &mut self, @@ -664,6 +692,12 @@ impl IngesterService for IngesterServiceClient { ) -> crate::ingest::IngestV2Result { self.inner.init_shards(request).await } + async fn retain_shards( + &mut self, + request: RetainShardsRequest, + ) -> crate::ingest::IngestV2Result { + self.inner.retain_shards(request).await + } async fn truncate_shards( &mut self, request: TruncateShardsRequest, @@ -732,6 +766,12 @@ pub mod ingester_service_mock { ) -> crate::ingest::IngestV2Result { self.inner.lock().await.init_shards(request).await } + async fn retain_shards( + &mut self, + request: super::RetainShardsRequest, + ) -> crate::ingest::IngestV2Result { + self.inner.lock().await.retain_shards(request).await + } async fn truncate_shards( &mut self, request: super::TruncateShardsRequest, @@ -853,6 +893,22 @@ impl tower::Service for Box { Box::pin(fut) } } +impl tower::Service for Box { + type Response = RetainShardsResponse; + type Error = crate::ingest::IngestV2Error; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + fn call(&mut self, request: RetainShardsRequest) -> Self::Future { + let mut svc = self.clone(); + let fut = async move { svc.retain_shards(request).await }; + Box::pin(fut) + } +} impl tower::Service for Box { type Response = TruncateShardsResponse; type Error = crate::ingest::IngestV2Error; @@ -946,6 +1002,11 @@ struct IngesterServiceTowerBlock { InitShardsResponse, crate::ingest::IngestV2Error, >, + retain_shards_svc: quickwit_common::tower::BoxService< + RetainShardsRequest, + RetainShardsResponse, + crate::ingest::IngestV2Error, + >, truncate_shards_svc: quickwit_common::tower::BoxService< TruncateShardsRequest, TruncateShardsResponse, @@ -976,6 +1037,7 @@ impl Clone for IngesterServiceTowerBlock { open_fetch_stream_svc: self.open_fetch_stream_svc.clone(), open_observation_stream_svc: self.open_observation_stream_svc.clone(), init_shards_svc: self.init_shards_svc.clone(), + retain_shards_svc: self.retain_shards_svc.clone(), truncate_shards_svc: self.truncate_shards_svc.clone(), close_shards_svc: self.close_shards_svc.clone(), ping_svc: self.ping_svc.clone(), @@ -1015,6 +1077,12 @@ impl IngesterService for IngesterServiceTowerBlock { ) -> crate::ingest::IngestV2Result { self.init_shards_svc.ready().await?.call(request).await } + async fn retain_shards( + &mut self, + request: RetainShardsRequest, + ) -> crate::ingest::IngestV2Result { + self.retain_shards_svc.ready().await?.call(request).await + } async fn truncate_shards( &mut self, request: TruncateShardsRequest, @@ -1088,6 +1156,15 @@ pub struct IngesterServiceTowerBlockBuilder { >, >, #[allow(clippy::type_complexity)] + retain_shards_layer: Option< + quickwit_common::tower::BoxLayer< + Box, + RetainShardsRequest, + RetainShardsResponse, + crate::ingest::IngestV2Error, + >, + >, + #[allow(clippy::type_complexity)] truncate_shards_layer: Option< quickwit_common::tower::BoxLayer< Box, @@ -1162,6 +1239,12 @@ impl IngesterServiceTowerBlockBuilder { Error = crate::ingest::IngestV2Error, > + Clone + Send + Sync + 'static, >::Future: Send + 'static, + L::Service: tower::Service< + RetainShardsRequest, + Response = RetainShardsResponse, + Error = crate::ingest::IngestV2Error, + > + Clone + Send + Sync + 'static, + >::Future: Send + 'static, L::Service: tower::Service< TruncateShardsRequest, Response = TruncateShardsResponse, @@ -1204,6 +1287,10 @@ impl IngesterServiceTowerBlockBuilder { .init_shards_layer = Some( 
quickwit_common::tower::BoxLayer::new(layer.clone()), ); + self + .retain_shards_layer = Some( + quickwit_common::tower::BoxLayer::new(layer.clone()), + ); self .truncate_shards_layer = Some( quickwit_common::tower::BoxLayer::new(layer.clone()), @@ -1294,6 +1381,19 @@ impl IngesterServiceTowerBlockBuilder { self.init_shards_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); self } + pub fn retain_shards_layer(mut self, layer: L) -> Self + where + L: tower::Layer> + Send + Sync + 'static, + L::Service: tower::Service< + RetainShardsRequest, + Response = RetainShardsResponse, + Error = crate::ingest::IngestV2Error, + > + Clone + Send + Sync + 'static, + >::Future: Send + 'static, + { + self.retain_shards_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self + } pub fn truncate_shards_layer(mut self, layer: L) -> Self where L: tower::Layer> + Send + Sync + 'static, @@ -1412,6 +1512,11 @@ impl IngesterServiceTowerBlockBuilder { } else { quickwit_common::tower::BoxService::new(boxed_instance.clone()) }; + let retain_shards_svc = if let Some(layer) = self.retain_shards_layer { + layer.layer(boxed_instance.clone()) + } else { + quickwit_common::tower::BoxService::new(boxed_instance.clone()) + }; let truncate_shards_svc = if let Some(layer) = self.truncate_shards_layer { layer.layer(boxed_instance.clone()) } else { @@ -1439,6 +1544,7 @@ impl IngesterServiceTowerBlockBuilder { open_fetch_stream_svc, open_observation_stream_svc, init_shards_svc, + retain_shards_svc, truncate_shards_svc, close_shards_svc, ping_svc, @@ -1558,6 +1664,12 @@ where Error = crate::ingest::IngestV2Error, Future = BoxFuture, > + + tower::Service< + RetainShardsRequest, + Response = RetainShardsResponse, + Error = crate::ingest::IngestV2Error, + Future = BoxFuture, + > + tower::Service< TruncateShardsRequest, Response = TruncateShardsResponse, @@ -1613,6 +1725,12 @@ where ) -> crate::ingest::IngestV2Result { self.call(request).await } + async fn retain_shards( + &mut self, + request: RetainShardsRequest, + ) -> crate::ingest::IngestV2Result { + self.call(request).await + } async fn truncate_shards( &mut self, request: TruncateShardsRequest, @@ -1734,6 +1852,16 @@ where .map(|response| response.into_inner()) .map_err(|error| error.into()) } + async fn retain_shards( + &mut self, + request: RetainShardsRequest, + ) -> crate::ingest::IngestV2Result { + self.inner + .retain_shards(request) + .await + .map(|response| response.into_inner()) + .map_err(|error| error.into()) + } async fn truncate_shards( &mut self, request: TruncateShardsRequest, @@ -1857,6 +1985,17 @@ for IngesterServiceGrpcServerAdapter { .map(tonic::Response::new) .map_err(|error| error.into()) } + async fn retain_shards( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + self.inner + .clone() + .retain_shards(request.into_inner()) + .await + .map(tonic::Response::new) + .map_err(|error| error.into()) + } async fn truncate_shards( &self, request: tonic::Request, @@ -2145,6 +2284,38 @@ pub mod ingester_service_grpc_client { ); self.inner.unary(req, path, codec).await } + /// Only retain the shards that are listed in the request. + /// Other shards are deleted. 
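+        ///
+        /// A call sketch, assuming a connected gRPC client and a prepared
+        /// `RetainShardsRequest` (variable names are illustrative, not generated code):
+        ///
+        /// ```ignore
+        /// let response = client.retain_shards(retain_shards_request).await?.into_inner();
+        /// ```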
+ pub async fn retain_shards( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.ingest.ingester.IngesterService/RetainShards", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "quickwit.ingest.ingester.IngesterService", + "RetainShards", + ), + ); + self.inner.unary(req, path, codec).await + } /// Truncates a set of shards at the given positions. This RPC is called by indexers on leaders AND followers. pub async fn truncate_shards( &mut self, @@ -2328,6 +2499,15 @@ pub mod ingester_service_grpc_server { tonic::Response, tonic::Status, >; + /// Only retain the shards that are listed in the request. + /// Other shards are deleted. + async fn retain_shards( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Truncates a set of shards at the given positions. This RPC is called by indexers on leaders AND followers. async fn truncate_shards( &self, @@ -2670,6 +2850,52 @@ pub mod ingester_service_grpc_server { }; Box::pin(fut) } + "/quickwit.ingest.ingester.IngesterService/RetainShards" => { + #[allow(non_camel_case_types)] + struct RetainShardsSvc(pub Arc); + impl< + T: IngesterServiceGrpc, + > tonic::server::UnaryService + for RetainShardsSvc { + type Response = super::RetainShardsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + (*inner).retain_shards(request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = RetainShardsSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/quickwit.ingest.ingester.IngesterService/TruncateShards" => { #[allow(non_camel_case_types)] struct TruncateShardsSvc(pub Arc); diff --git a/quickwit/quickwit-proto/src/ingest/mod.rs b/quickwit/quickwit-proto/src/ingest/mod.rs index e82b1051b07..db3ab069627 100644 --- a/quickwit/quickwit-proto/src/ingest/mod.rs +++ b/quickwit/quickwit-proto/src/ingest/mod.rs @@ -104,6 +104,16 @@ impl ServiceError for IngestV2Error { } } +impl Shard { + /// List of nodes that are storing the shard (the leader, and optionally the follower). + pub fn ingester_nodes(&self) -> impl Iterator + '_ { + [Some(&self.leader_id), self.follower_id.as_ref()] + .into_iter() + .flatten() + .map(|node_id| NodeId::new(node_id.clone())) + } +} + impl DocBatchV2 { pub fn docs(&self) -> impl Iterator + '_ { self.doc_lengths.iter().scan(0, |start_offset, doc_length| {