diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 3f93357d32a..56541ee3d25 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -20,7 +20,7 @@ use std::fmt; use std::time::Duration; -use fnv::FnvHashSet; +use fnv::{FnvHashMap, FnvHashSet}; use itertools::Itertools; use quickwit_common::{PrettySample, Progress}; use quickwit_ingest::IngesterPool; @@ -29,14 +29,14 @@ use quickwit_proto::control_plane::{ GetOrCreateOpenShardsFailureReason, GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSuccess, }; -use quickwit_proto::ingest::ingester::{IngesterService, PingRequest}; -use quickwit_proto::ingest::{IngestV2Error, ShardIds, ShardState}; +use quickwit_proto::ingest::ingester::{IngesterService, InitShardsRequest, PingRequest}; +use quickwit_proto::ingest::{IngestV2Error, Shard, ShardIds, ShardState}; use quickwit_proto::metastore; use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient}; use quickwit_proto::types::{IndexUid, NodeId}; use rand::seq::SliceRandom; use tokio::time::timeout; -use tracing::info; +use tracing::{info, warn}; use crate::control_plane_model::ControlPlaneModel; @@ -302,6 +302,9 @@ impl IngestController { let open_shards_response = progress .protect_future(self.metastore.open_shards(open_shards_request)) .await?; + + self.init_shards(&open_shards_response, progress).await; + for open_shards_subresponse in open_shards_response.subresponses { let index_uid: IndexUid = open_shards_subresponse.index_uid.clone().into(); let source_id = open_shards_subresponse.source_id.clone(); @@ -330,6 +333,38 @@ impl IngestController { failures: get_or_create_open_shards_failures, }) } + + /// Calls init shards on the leaders hosting newly opened shards. 
+ async fn init_shards( + &self, + open_shard_response: &metastore::OpenShardsResponse, + progress: &Progress, + ) { + let mut per_leader_opened_shards: FnvHashMap<&String, Vec> = FnvHashMap::default(); + + for subresponse in &open_shard_response.subresponses { + for shard in &subresponse.opened_shards { + per_leader_opened_shards + .entry(&shard.leader_id) + .or_default() + .push(shard.clone()); + } + } + for (leader_id, shards) in per_leader_opened_shards { + let init_shards_request = InitShardsRequest { shards }; + + let Some(mut leader) = self.ingester_pool.get(leader_id) else { + warn!("failed to init shards: ingester `{leader_id}` is unavailable"); + continue; + }; + if let Err(error) = progress + .protect_future(leader.init_shards(init_shards_request)) + .await + { + warn!("failed to init shards: {error}"); + } + } + } } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -345,7 +380,7 @@ mod tests { use quickwit_metastore::IndexMetadata; use quickwit_proto::control_plane::GetOrCreateOpenShardsSubrequest; use quickwit_proto::ingest::ingester::{ - IngesterServiceClient, MockIngesterService, PingResponse, + IngesterServiceClient, InitShardsResponse, MockIngesterService, PingResponse, }; use quickwit_proto::ingest::{Shard, ShardState}; use quickwit_proto::types::SourceId; @@ -606,7 +641,19 @@ mod tests { let ingester: IngesterServiceClient = mock_ingester.into(); ingester_pool.insert("test-ingester-1".into(), ingester.clone()); - let mock_ingester = MockIngesterService::default(); + let mut mock_ingester = MockIngesterService::default(); + mock_ingester + .expect_init_shards() + .once() + .returning(|request| { + assert_eq!(request.shards.len(), 1); + assert_eq!(request.shards[0].index_uid, "test-index-1:0"); + assert_eq!(request.shards[0].source_id, "test-source"); + assert_eq!(request.shards[0].shard_id, 1); + assert_eq!(request.shards[0].leader_id, "test-ingester-2"); + + Ok(InitShardsResponse {}) + }); let ingester: IngesterServiceClient = mock_ingester.into(); ingester_pool.insert("test-ingester-2".into(), ingester.clone()); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index badf55a97a3..ebb8ea83ff6 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -37,13 +37,14 @@ use quickwit_common::ServiceStream; use quickwit_proto::ingest::ingester::{ AckReplicationMessage, CloseShardsRequest, CloseShardsResponse, DecommissionRequest, DecommissionResponse, FetchResponseV2, IngesterService, IngesterServiceClient, - IngesterServiceStream, IngesterStatus, ObservationMessage, OpenFetchStreamRequest, - OpenObservationStreamRequest, OpenReplicationStreamRequest, OpenReplicationStreamResponse, - PersistFailure, PersistFailureReason, PersistRequest, PersistResponse, PersistSuccess, - PingRequest, PingResponse, ReplicateFailureReason, ReplicateRequest, ReplicateSubrequest, - SynReplicationMessage, TruncateShardsRequest, TruncateShardsResponse, + IngesterServiceStream, IngesterStatus, InitShardsRequest, InitShardsResponse, + ObservationMessage, OpenFetchStreamRequest, OpenObservationStreamRequest, + OpenReplicationStreamRequest, OpenReplicationStreamResponse, PersistFailure, + PersistFailureReason, PersistRequest, PersistResponse, PersistSuccess, PingRequest, + PingResponse, ReplicateFailureReason, ReplicateSubrequest, SynReplicationMessage, + TruncateShardsRequest, TruncateShardsResponse, }; -use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, 
IngestV2Result, ShardState}; +use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, Shard, ShardState}; use quickwit_proto::types::{NodeId, Position, QueueId}; use tokio::sync::{watch, RwLock}; use tracing::{error, info, warn}; @@ -55,8 +56,8 @@ use super::mrecordlog_utils::{append_eof_record_if_necessary, check_enough_capac use super::rate_limiter::{RateLimiter, RateLimiterSettings}; use super::rate_meter::RateMeter; use super::replication::{ - ReplicationStreamTask, ReplicationStreamTaskHandle, ReplicationTask, ReplicationTaskHandle, - SYN_REPLICATION_STREAM_CAPACITY, + ReplicationClient, ReplicationStreamTask, ReplicationStreamTaskHandle, ReplicationTask, + ReplicationTaskHandle, SYN_REPLICATION_STREAM_CAPACITY, }; use super::IngesterPool; use crate::ingest_v2::broadcast::BroadcastLocalShardsTask; @@ -218,14 +219,23 @@ impl Ingester { Ok(()) } - async fn create_shard<'a>( + /// Initializes a primary shard by creating a queue in the write-ahead log and inserting a new + /// [`IngesterShard`] into the ingester state. If replication is enabled, this method will + /// also: + /// - open a replication stream between the leader and the follower if one does not already + /// exist. + /// - initialize the replica shard. + async fn init_primary_shard( &self, - state: &'a mut IngesterState, - queue_id: &QueueId, - leader_id: &NodeId, - follower_id_opt: Option<&NodeId>, - ) -> IngestV2Result<&'a IngesterShard> { - match state.mrecordlog.create_queue(queue_id).await { + state: &mut IngesterState, + shard: Shard, + ) -> IngestV2Result<()> { + let queue_id = shard.queue_id(); + + let Entry::Vacant(entry) = state.shards.entry(queue_id.clone()) else { + return Ok(()); + }; + match state.mrecordlog.create_queue(&queue_id).await { Ok(_) => {} Err(CreateQueueError::AlreadyExists) => panic!("queue should not exist"), Err(CreateQueueError::IoError(io_error)) => { @@ -235,7 +245,7 @@ impl Ingester { queue_id, io_error ); return Err(IngestV2Error::IngesterUnavailable { - ingester_id: leader_id.clone(), + ingester_id: shard.leader_id.into(), }); } }; @@ -243,14 +253,28 @@ impl Ingester { let rate_meter = RateMeter::default(); state .rate_trackers - .insert(queue_id.clone(), (rate_limiter, rate_meter)); + .insert(queue_id, (rate_limiter, rate_meter)); + + let primary_shard = if let Some(follower_id) = &shard.follower_id { + let leader_id: NodeId = shard.leader_id.clone().into(); + let follower_id: NodeId = follower_id.clone().into(); - let shard = if let Some(follower_id) = follower_id_opt { - self.init_replication_stream(state, leader_id, follower_id) + let replication_client = self + .init_replication_stream( + &mut state.replication_streams, + leader_id, + follower_id.clone(), + ) .await?; + if let Err(error) = replication_client.init_replica(shard).await { + error!("failed to initialize replica shard: {error}",); + return Err(IngestV2Error::Internal(format!( + "failed to initialize replica shard: {error}" + ))); + } IngesterShard::new_primary( - follower_id.clone(), + follower_id, ShardState::Open, Position::Beginning, Position::Beginning, @@ -258,35 +282,27 @@ impl Ingester { } else { IngesterShard::new_solo(ShardState::Open, Position::Beginning, Position::Beginning) }; - let entry = state.shards.entry(queue_id.clone()); - Ok(entry.or_insert(shard)) - } - - async fn close_shards_inner(&self, state: &mut IngesterState, queue_ids: &[QueueId]) { - for queue_id in queue_ids { - append_eof_record_if_necessary(&mut state.mrecordlog, queue_id).await; - - if let Some(shard) = 
state.shards.get_mut(queue_id) { - shard.shard_state = ShardState::Closed; - shard.notify_new_records(); - } - } - // TODO: Handle replicated shards. + entry.insert(primary_shard); + Ok(()) } async fn init_replication_stream( &self, - state: &mut IngesterState, - leader_id: &NodeId, - follower_id: &NodeId, - ) -> IngestV2Result<()> { - let Entry::Vacant(entry) = state.replication_streams.entry(follower_id.clone()) else { - // A replication stream with this follower is already opened. - return Ok(()); + replication_streams: &mut HashMap, + leader_id: NodeId, + follower_id: NodeId, + ) -> IngestV2Result { + let entry = match replication_streams.entry(follower_id.clone()) { + Entry::Occupied(entry) => { + // A replication stream with this follower is already opened. + return Ok(entry.get().replication_client()); + } + Entry::Vacant(entry) => entry, }; let open_request = OpenReplicationStreamRequest { leader_id: leader_id.clone().into(), follower_id: follower_id.clone().into(), + replication_seqno: 0, }; let open_message = SynReplicationMessage::new_open_request(open_request); let (syn_replication_stream_tx, syn_replication_stream) = @@ -297,7 +313,7 @@ impl Ingester { let mut ingester = self.ingester_pool - .get(follower_id) + .get(&follower_id) .ok_or(IngestV2Error::IngesterUnavailable { ingester_id: follower_id.clone(), })?; @@ -318,8 +334,21 @@ impl Ingester { syn_replication_stream_tx, ack_replication_stream, ); + let replication_client = replication_stream_task_handle.replication_client(); entry.insert(replication_stream_task_handle); - Ok(()) + Ok(replication_client) + } + + async fn close_shards_inner(&self, state: &mut IngesterState, queue_ids: &[QueueId]) { + for queue_id in queue_ids { + append_eof_record_if_necessary(&mut state.mrecordlog, queue_id).await; + + if let Some(shard) = state.shards.get_mut(queue_id) { + shard.shard_state = ShardState::Closed; + shard.notify_new_records(); + } + } + // TODO: Handle replicated shards. 
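// A minimal, self-contained sketch of the "one replication stream per follower" pattern that
// `init_replication_stream` above follows: the follower map is consulted through the `Entry` API
// so an existing stream is reused, and each request obtains a fresh single-use client from the
// stream handle. `StreamHandle`, `Client`, and `open_stream` are simplified stand-ins invented for
// this sketch; they are not the actual Quickwit types.

use std::collections::hash_map::Entry;
use std::collections::HashMap;

struct StreamHandle {
    next_seqno: u64,
}

struct Client {
    seqno: u64,
}

impl StreamHandle {
    // Each client receives the next sequence number, mirroring `replication_client()`.
    fn client(&mut self) -> Client {
        let seqno = self.next_seqno;
        self.next_seqno += 1;
        Client { seqno }
    }
}

fn open_stream(_follower_id: &str) -> StreamHandle {
    // In the real code this is where the replication stream handshake would happen.
    StreamHandle { next_seqno: 0 }
}

fn client_for_follower(
    streams: &mut HashMap<String, StreamHandle>,
    follower_id: &str,
) -> Client {
    let handle = match streams.entry(follower_id.to_string()) {
        // A stream to this follower is already open: reuse it.
        Entry::Occupied(entry) => entry.into_mut(),
        // Otherwise, open a new stream and keep its handle around for later requests.
        Entry::Vacant(entry) => entry.insert(open_stream(follower_id)),
    };
    handle.client()
}

fn main() {
    let mut streams = HashMap::new();
    let first = client_for_follower(&mut streams, "follower-1");
    let second = client_for_follower(&mut streams, "follower-1");
    // The stream is opened once; the two clients carry consecutive sequence numbers.
    assert_eq!(streams.len(), 1);
    assert_eq!((first.seqno, second.seqno), (0, 1));
}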
} } @@ -367,21 +396,18 @@ impl IngesterService for Ingester { } for subrequest in persist_request.subrequests { let queue_id = subrequest.queue_id(); - let follower_id_opt: Option = subrequest.follower_id.map(Into::into); - let shard = if let Some(shard) = state_guard.shards.get_mut(&queue_id) { - shard - } else { - self.create_shard( - &mut state_guard, - &queue_id, - &leader_id, - follower_id_opt.as_ref(), - ) - .await - .expect("TODO") - }; - let from_position_exclusive = shard.replication_position_inclusive.clone(); + let Some(shard) = state_guard.shards.get_mut(&queue_id) else { + let persist_failure = PersistFailure { + subrequest_id: subrequest.subrequest_id, + index_uid: subrequest.index_uid, + source_id: subrequest.source_id, + shard_id: subrequest.shard_id, + reason: PersistFailureReason::ShardNotFound as i32, + }; + persist_failures.push(persist_failure); + continue; + }; if shard.shard_state.is_closed() { let persist_failure = PersistFailure { subrequest_id: subrequest.subrequest_id, @@ -393,6 +419,9 @@ impl IngesterService for Ingester { persist_failures.push(persist_failure); continue; } + let follower_id_opt = shard.follower_id_opt().cloned(); + let from_position_exclusive = shard.replication_position_inclusive.clone(); + let doc_batch = match subrequest.doc_batch { Some(doc_batch) if !doc_batch.is_empty() => doc_batch, _ => { @@ -521,19 +550,15 @@ impl IngesterService for Ingester { let mut replicate_futures = FuturesUnordered::new(); for (follower_id, subrequests) in replicate_subrequests { - let replication_stream = state_guard + let replication_client = state_guard .replication_streams .get(&follower_id) - .expect("replication stream should be initialized"); - let replication_seqno = replication_stream.next_replication_seqno(); - let replicate_request = ReplicateRequest { - leader_id: self.self_node_id.clone().into(), - follower_id: follower_id.clone().into(), - subrequests, - commit_type: persist_request.commit_type, - replication_seqno, - }; - replicate_futures.push(replication_stream.replicate(replicate_request)); + .expect("replication stream should be initialized") + .replication_client(); + let leader_id = self.self_node_id.clone(); + let replicate_future = + replication_client.replicate(leader_id, follower_id, subrequests, commit_type); + replicate_futures.push(replicate_future); } // Drop the write lock AFTER pushing the replicate request into the replication client // channel to ensure that sequential writes in mrecordlog turn into sequential replicate @@ -567,6 +592,7 @@ impl IngesterService for Ingester { // already. let persist_failure_reason = match replicate_failure.reason() { ReplicateFailureReason::Unspecified => PersistFailureReason::Unspecified, + ReplicateFailureReason::ShardNotFound => PersistFailureReason::ShardNotFound, ReplicateFailureReason::ShardClosed => PersistFailureReason::ShardClosed, ReplicateFailureReason::ResourceExhausted => { PersistFailureReason::ResourceExhausted @@ -622,7 +648,9 @@ impl IngesterService for Ingester { // Channel capacity: there is no need to bound the capacity of the channel here because it // is already virtually bounded by the capacity of the SYN replication stream. 
let (ack_replication_stream_tx, ack_replication_stream) = ServiceStream::new_unbounded(); - let open_response = OpenReplicationStreamResponse {}; + let open_response = OpenReplicationStreamResponse { + replication_seqno: 0, + }; let ack_replication_message = AckReplicationMessage::new_open_response(open_response); ack_replication_stream_tx .send(Ok(ack_replication_message)) @@ -689,21 +717,20 @@ impl IngesterService for Ingester { Ok(ping_response) } - async fn close_shards( + async fn init_shards( &mut self, - close_shards_request: CloseShardsRequest, - ) -> IngestV2Result { + init_shards_request: InitShardsRequest, + ) -> IngestV2Result { let mut state_guard = self.state.write().await; - let queue_ids: Vec = close_shards_request - .shards - .iter() - .flat_map(|shards| shards.queue_ids()) - .collect(); - - self.close_shards_inner(&mut state_guard, &queue_ids).await; + if state_guard.status != IngesterStatus::Ready { + return Err(IngestV2Error::Internal("node decommissioned".to_string())); + } - Ok(CloseShardsResponse {}) + for shard in init_shards_request.shards { + self.init_primary_shard(&mut state_guard, shard).await?; + } + Ok(InitShardsResponse {}) } async fn truncate_shards( @@ -756,6 +783,23 @@ impl IngesterService for Ingester { Ok(truncate_response) } + async fn close_shards( + &mut self, + close_shards_request: CloseShardsRequest, + ) -> IngestV2Result { + let queue_ids: Vec = close_shards_request + .shards + .iter() + .flat_map(|shards| shards.queue_ids()) + .collect(); + + let mut state_guard = self.state.write().await; + + self.close_shards_inner(&mut state_guard, &queue_ids).await; + + Ok(CloseShardsResponse {}) + } + async fn decommission( &mut self, _decommission_request: DecommissionRequest, @@ -1114,6 +1158,28 @@ mod tests { async fn test_ingester_persist() { let (ingester_ctx, mut ingester) = IngesterForTest::default().build().await; + let init_shards_request = InitShardsRequest { + shards: vec![ + Shard { + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + shard_state: ShardState::Open as i32, + leader_id: ingester_ctx.node_id.to_string(), + ..Default::default() + }, + Shard { + index_uid: "test-index:1".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + shard_state: ShardState::Open as i32, + leader_id: ingester_ctx.node_id.to_string(), + ..Default::default() + }, + ], + }; + ingester.init_shards(init_shards_request).await.unwrap(); + let persist_request = PersistRequest { leader_id: ingester_ctx.node_id.to_string(), commit_type: CommitTypeV2::Force as i32, @@ -1204,6 +1270,7 @@ mod tests { let open_stream_request = OpenReplicationStreamRequest { leader_id: "test-leader".to_string(), follower_id: "test-follower".to_string(), + replication_seqno: 0, }; let syn_replication_message = SynReplicationMessage::new_open_request(open_stream_request); syn_replication_stream_tx @@ -1246,6 +1313,30 @@ mod tests { IngesterServiceClient::new(follower.clone()), ); + let init_shards_request = InitShardsRequest { + shards: vec![ + Shard { + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + shard_state: ShardState::Open as i32, + leader_id: leader_ctx.node_id.to_string(), + follower_id: Some(follower_ctx.node_id.to_string()), + ..Default::default() + }, + Shard { + index_uid: "test-index:1".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + shard_state: ShardState::Open as i32, + leader_id: leader_ctx.node_id.to_string(), + follower_id: 
Some(follower_ctx.node_id.to_string()), + ..Default::default() + }, + ], + }; + leader.init_shards(init_shards_request).await.unwrap(); + let persist_request = PersistRequest { leader_id: "test-leader".to_string(), commit_type: CommitTypeV2::Force as i32, @@ -1406,6 +1497,30 @@ mod tests { .ingester_pool .insert(follower_ctx.node_id.clone(), follower_grpc_client); + let init_shards_request = InitShardsRequest { + shards: vec![ + Shard { + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + shard_state: ShardState::Open as i32, + leader_id: leader_ctx.node_id.to_string(), + follower_id: Some(follower_ctx.node_id.to_string()), + ..Default::default() + }, + Shard { + index_uid: "test-index:1".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + shard_state: ShardState::Open as i32, + leader_id: leader_ctx.node_id.to_string(), + follower_id: Some(follower_ctx.node_id.to_string()), + ..Default::default() + }, + ], + }; + leader.init_shards(init_shards_request).await.unwrap(); + let persist_request = PersistRequest { leader_id: "test-leader".to_string(), commit_type: CommitTypeV2::Auto as i32, @@ -1563,6 +1678,23 @@ mod tests { .build() .await; + let mut state_guard = ingester.state.write().await; + + let primary_shard = Shard { + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + shard_state: ShardState::Open as i32, + leader_id: ingester_ctx.node_id.to_string(), + ..Default::default() + }; + ingester + .init_primary_shard(&mut state_guard, primary_shard) + .await + .unwrap(); + + drop(state_guard); + let persist_request = PersistRequest { leader_id: ingester_ctx.node_id.to_string(), commit_type: CommitTypeV2::Auto as i32, @@ -1608,6 +1740,24 @@ mod tests { .with_disk_capacity(ByteSize(0)) .build() .await; + + let mut state_guard = ingester.state.write().await; + + let primary_shard = Shard { + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + shard_state: ShardState::Open as i32, + leader_id: ingester_ctx.node_id.to_string(), + ..Default::default() + }; + ingester + .init_primary_shard(&mut state_guard, primary_shard) + .await + .unwrap(); + + drop(state_guard); + let persist_request = PersistRequest { leader_id: ingester_ctx.node_id.to_string(), commit_type: CommitTypeV2::Auto as i32, @@ -1653,6 +1803,28 @@ mod tests { async fn test_ingester_open_fetch_stream() { let (ingester_ctx, mut ingester) = IngesterForTest::default().build().await; + let init_shards_request = InitShardsRequest { + shards: vec![ + Shard { + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + shard_state: ShardState::Open as i32, + leader_id: ingester_ctx.node_id.to_string(), + ..Default::default() + }, + Shard { + index_uid: "test-index:1".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + shard_state: ShardState::Open as i32, + leader_id: ingester_ctx.node_id.to_string(), + ..Default::default() + }, + ], + }; + ingester.init_shards(init_shards_request).await.unwrap(); + let persist_request = PersistRequest { leader_id: ingester_ctx.node_id.to_string(), commit_type: CommitTypeV2::Auto as i32, @@ -1738,16 +1910,32 @@ mod tests { async fn test_ingester_truncate() { let (ingester_ctx, mut ingester) = IngesterForTest::default().build().await; + let shard_01 = Shard { + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + shard_state: ShardState::Open as i32, + 
..Default::default() + }; let queue_id_01 = queue_id("test-index:0", "test-source", 1); + + let shard_02 = Shard { + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 2, + shard_state: ShardState::Open as i32, + ..Default::default() + }; let queue_id_02 = queue_id("test-index:0", "test-source", 2); let mut state_guard = ingester.state.write().await; + ingester - .create_shard(&mut state_guard, &queue_id_01, &ingester_ctx.node_id, None) + .init_primary_shard(&mut state_guard, shard_01) .await .unwrap(); ingester - .create_shard(&mut state_guard, &queue_id_02, &ingester_ctx.node_id, None) + .init_primary_shard(&mut state_guard, shard_02) .await .unwrap(); @@ -1817,16 +2005,44 @@ mod tests { #[tokio::test] async fn test_ingester_close_shards() { - let (ingester_ctx, mut ingester) = IngesterForTest::default().build().await; + let (_ingester_ctx, mut ingester) = IngesterForTest::default().build().await; + + let shard_01 = Shard { + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + shard_state: ShardState::Open as i32, + ..Default::default() + }; + let queue_id_01 = queue_id("test-index:0", "test-source", 1); + + let shard_02 = Shard { + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 2, + shard_state: ShardState::Open as i32, + ..Default::default() + }; + let queue_id_02 = queue_id("test-index:0", "test-source", 2); - let queue_id_01 = queue_id("test-index:0", "test-source:0", 1); - let queue_id_02 = queue_id("test-index:0", "test-source:0", 2); - let queue_id_03 = queue_id("test-index:1", "test-source:1", 3); + let shard_13 = Shard { + index_uid: "test-index:1".to_string(), + source_id: "test-source".to_string(), + shard_id: 3, + shard_state: ShardState::Open as i32, + ..Default::default() + }; + let queue_id_13 = queue_id("test-index:1", "test-source", 3); let mut state_guard = ingester.state.write().await; - for queue_id in &[&queue_id_01, &queue_id_02, &queue_id_03] { + + for (shard, queue_id) in [ + (shard_01, &queue_id_01), + (shard_02, &queue_id_02), + (shard_13, &queue_id_13), + ] { ingester - .create_shard(&mut state_guard, queue_id, &ingester_ctx.node_id, None) + .init_primary_shard(&mut state_guard, shard) .await .unwrap(); let records = [ @@ -1836,7 +2052,7 @@ mod tests { .into_iter(); state_guard .mrecordlog - .append_records(&queue_id_01, None, records) + .append_records(queue_id, None, records) .await .unwrap(); } @@ -1847,7 +2063,7 @@ mod tests { let open_fetch_stream_request = OpenFetchStreamRequest { client_id: client_id.clone(), index_uid: "test-index:0".to_string(), - source_id: "test-source:0".to_string(), + source_id: "test-source".to_string(), shard_id: 1, from_position_exclusive: None, }; @@ -1863,17 +2079,17 @@ mod tests { let close_shard_1 = ShardIds { index_uid: "test-index:0".to_string(), - source_id: "test-source:0".to_string(), + source_id: "test-source".to_string(), shard_ids: vec![1, 2], }; let close_shard_2 = ShardIds { index_uid: "test-index:1".to_string(), - source_id: "test-source:1".to_string(), + source_id: "test-source".to_string(), shard_ids: vec![3], }; let close_shard_with_no_queue = ShardIds { index_uid: "test-index:2".to_string(), - source_id: "test-source:2".to_string(), + source_id: "test-source".to_string(), shard_ids: vec![4], }; let closed_shards = vec![ diff --git a/quickwit/quickwit-ingest/src/ingest_v2/models.rs b/quickwit/quickwit-ingest/src/ingest_v2/models.rs index 578a8470b69..9656b387f66 100644 --- 
a/quickwit/quickwit-ingest/src/ingest_v2/models.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/models.rs @@ -98,6 +98,14 @@ impl IngesterShard { matches!(self.shard_type, IngesterShardType::Replica { .. }) } + pub fn follower_id_opt(&self) -> Option<&NodeId> { + match &self.shard_type { + IngesterShardType::Primary { follower_id } => Some(follower_id), + IngesterShardType::Replica { .. } => None, + IngesterShardType::Solo => None, + } + } + pub fn notify_new_records(&mut self) { // `new_records_tx` is guaranteed to be open because `self` also holds a receiver. self.new_records_tx diff --git a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs index 50755fcab01..38b32ff746f 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs @@ -27,10 +27,11 @@ use futures::{Future, StreamExt}; use quickwit_common::ServiceStream; use quickwit_proto::ingest::ingester::{ ack_replication_message, syn_replication_message, AckReplicationMessage, IngesterStatus, - ReplicateFailure, ReplicateFailureReason, ReplicateRequest, ReplicateResponse, - ReplicateSuccess, SynReplicationMessage, + InitReplicaRequest, InitReplicaResponse, ReplicateFailure, ReplicateFailureReason, + ReplicateRequest, ReplicateResponse, ReplicateSubrequest, ReplicateSuccess, + SynReplicationMessage, }; -use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, ShardState}; +use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, Shard, ShardState}; use quickwit_proto::types::{NodeId, Position}; use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::{mpsc, oneshot, RwLock}; @@ -42,7 +43,6 @@ use super::models::IngesterShard; use super::mrecord::MRecord; use super::mrecordlog_utils::check_enough_capacity; use crate::estimate_size; -use crate::ingest_v2::models::IngesterShardType; use crate::metrics::INGEST_METRICS; pub(super) const SYN_REPLICATION_STREAM_CAPACITY: usize = 5; @@ -54,7 +54,56 @@ const REPLICATION_REQUEST_TIMEOUT: Duration = if cfg!(any(test, feature = "tests Duration::from_secs(3) }; -type OneShotReplicateRequest = (ReplicateRequest, oneshot::Sender); +/// A replication request is sent by the leader to its follower to update the state of a replica +/// shard. 
+#[derive(Debug)] +pub(super) enum ReplicationRequest { + Init(InitReplicaRequest), + Replicate(ReplicateRequest), +} + +impl ReplicationRequest { + fn replication_seqno(&self) -> ReplicationSeqNo { + match self { + ReplicationRequest::Init(init_replica_request) => { + init_replica_request.replication_seqno + } + ReplicationRequest::Replicate(replicate_request) => replicate_request.replication_seqno, + } + } + + fn into_syn_replication_message(self) -> SynReplicationMessage { + match self { + ReplicationRequest::Init(init_replica_request) => { + SynReplicationMessage::new_init_replica_request(init_replica_request) + } + ReplicationRequest::Replicate(replicate_request) => { + SynReplicationMessage::new_replicate_request(replicate_request) + } + } + } +} + +#[derive(Debug)] +pub(super) enum ReplicationResponse { + Init(InitReplicaResponse), + Replicate(ReplicateResponse), +} + +impl ReplicationResponse { + fn replication_seqno(&self) -> ReplicationSeqNo { + match self { + ReplicationResponse::Init(init_replica_response) => { + init_replica_response.replication_seqno + } + ReplicationResponse::Replicate(replicate_response) => { + replicate_response.replication_seqno + } + } + } +} + +type OneShotReplicationRequest = (ReplicationRequest, oneshot::Sender); /// Replication sequence number. type ReplicationSeqNo = u64; @@ -63,7 +112,7 @@ type ReplicationSeqNo = u64; pub(super) struct ReplicationStreamTask { leader_id: NodeId, follower_id: NodeId, - replicate_request_queue_rx: mpsc::Receiver, + replication_request_rx: mpsc::Receiver, syn_replication_stream_tx: mpsc::Sender, ack_replication_stream: ServiceStream>, } @@ -76,13 +125,13 @@ impl ReplicationStreamTask { syn_replication_stream_tx: mpsc::Sender, ack_replication_stream: ServiceStream>, ) -> ReplicationStreamTaskHandle { - let (replicate_request_queue_tx, replicate_request_queue_rx) = - mpsc::channel::(3); + let (replication_request_tx, replication_request_rx) = + mpsc::channel::(3); let replication_stream_task = Self { leader_id, follower_id, - replicate_request_queue_rx, + replication_request_rx, syn_replication_stream_tx, ack_replication_stream, }; @@ -90,8 +139,8 @@ impl ReplicationStreamTask { replication_stream_task.run(); ReplicationStreamTaskHandle { - replicate_request_queue_tx, - replication_seqno_sequence: AtomicU64::default(), + replication_request_tx, + replication_seqno_sequence: Default::default(), enqueue_syn_requests_join_handle, dequeue_ack_responses_join_handle, } @@ -103,9 +152,6 @@ impl ReplicationStreamTask { /// processed and returned in the same order. Conceptually, it is akin to "zipping" the SYN and /// ACK replication streams together. fn run(mut self) -> (JoinHandle<()>, JoinHandle<()>) { - let leader_id = self.leader_id.clone(); - let follower_id = self.follower_id.clone(); - // Response sequencer channel. It ensures that requests and responses are processed and // returned in the same order. // @@ -117,31 +163,21 @@ impl ReplicationStreamTask { // This loop enqueues SYN replication requests into the SYN replication stream and passes // the one-shot response sender to the "dequeue" loop via the sequencer channel. 
let enqueue_syn_requests_fut = async move { - while let Some((replicate_request, oneshot_replicate_response_tx)) = - self.replicate_request_queue_rx.recv().await + while let Some((replication_request, oneshot_replication_response_tx)) = + self.replication_request_rx.recv().await { - assert_eq!( - replicate_request.leader_id, leader_id, - "expected leader ID `{}`, got `{}`", - leader_id, replicate_request.leader_id - ); - assert_eq!( - replicate_request.follower_id, follower_id, - "expected follower ID `{}`, got `{}`", - follower_id, replicate_request.follower_id - ); if response_sequencer_tx .send(( - replicate_request.replication_seqno, - oneshot_replicate_response_tx, + replication_request.replication_seqno(), + oneshot_replication_response_tx, )) .is_err() { // The response sequencer receiver was dropped. return; } - let syn_replication_message = - SynReplicationMessage::new_replicate_request(replicate_request); + let syn_replication_message = replication_request.into_syn_replication_message(); + if self .syn_replication_stream_tx .send(syn_replication_message) @@ -164,26 +200,40 @@ impl ReplicationStreamTask { return; } }; - let replicate_response = into_replicate_response(ack_replication_message); - - let oneshot_replicate_response_tx = match response_sequencer_rx.try_recv() { - Ok((replication_seqno, oneshot_replicate_response_tx)) => { - if replicate_response.replication_seqno != replication_seqno { + let replication_response = match ack_replication_message.message { + Some(ack_replication_message::Message::InitResponse(init_replica_response)) => { + ReplicationResponse::Init(init_replica_response) + } + Some(ack_replication_message::Message::ReplicateResponse( + replicate_response, + )) => ReplicationResponse::Replicate(replicate_response), + Some(ack_replication_message::Message::OpenResponse(_)) => { + warn!("received unexpected ACK replication message"); + continue; + } + None => { + warn!("received empty ACK replication message"); + continue; + } + }; + let oneshot_replication_response_tx = match response_sequencer_rx.try_recv() { + Ok((replication_seqno, oneshot_replication_response_tx)) => { + if replication_response.replication_seqno() != replication_seqno { error!( "received out-of-order replication response: expected replication \ seqno `{}`, got `{}`; closing replication stream from leader \ `{}` to follower `{}`", replication_seqno, - replicate_response.replication_seqno, + replication_response.replication_seqno(), self.leader_id, self.follower_id, ); return; } - oneshot_replicate_response_tx + oneshot_replication_response_tx } Err(TryRecvError::Empty) => { - panic!("the response sequencer should not be empty"); + panic!("response sequencer should not be empty"); } Err(TryRecvError::Disconnected) => { // The response sequencer sender was dropped. @@ -192,7 +242,7 @@ impl ReplicationStreamTask { }; // We intentionally ignore the error here. It is the responsibility of the // `replicate` method to surface it. - let _ = oneshot_replicate_response_tx.send(replicate_response); + let _ = oneshot_replication_response_tx.send(replication_response); } // The ACK replication stream was closed. 
}; @@ -205,42 +255,24 @@ impl ReplicationStreamTask { pub(super) struct ReplicationStreamTaskHandle { replication_seqno_sequence: AtomicU64, - replicate_request_queue_tx: mpsc::Sender, + replication_request_tx: mpsc::Sender, enqueue_syn_requests_join_handle: JoinHandle<()>, dequeue_ack_responses_join_handle: JoinHandle<()>, } impl ReplicationStreamTaskHandle { - /// Enqueues a replication request into the replication stream and waits for the response. Times - /// out after [`REPLICATION_REQUEST_TIMEOUT`] seconds. - pub fn replicate( - &self, - replicate_request: ReplicateRequest, - ) -> impl Future> + Send + 'static { - let (oneshot_replicate_response_tx, oneshot_replicate_response_rx) = oneshot::channel(); - let replicate_request_queue_tx = self.replicate_request_queue_tx.clone(); - - let send_recv_fut = async move { - replicate_request_queue_tx - .send((replicate_request, oneshot_replicate_response_tx)) - .await - .map_err(|_| ReplicationError::Closed)?; - let replicate_response = oneshot_replicate_response_rx - .await - .map_err(|_| ReplicationError::Closed)?; - Ok(replicate_response) - }; - async { - tokio::time::timeout(REPLICATION_REQUEST_TIMEOUT, send_recv_fut) - .await - .map_err(|_| ReplicationError::Timeout)? + /// Returns a [`ReplicationClient`] that can be used to enqueue replication requests + /// into the replication stream. + pub fn replication_client(&self) -> ReplicationClient { + let replication_seqno = self + .replication_seqno_sequence + .fetch_add(1, Ordering::Relaxed); + + ReplicationClient { + replication_seqno, + replication_request_tx: self.replication_request_tx.clone(), } } - - pub fn next_replication_seqno(&self) -> ReplicationSeqNo { - self.replication_seqno_sequence - .fetch_add(1, Ordering::Relaxed) - } } impl Drop for ReplicationStreamTaskHandle { @@ -250,7 +282,7 @@ impl Drop for ReplicationStreamTaskHandle { } } -/// Error returned by [`ReplicationClient::replicate`]. +/// Error returned by the [`ReplicationClient`]. #[derive(Debug, Clone, Copy, thiserror::Error)] #[error("failed to replicate records from leader to follower")] pub(super) enum ReplicationError { @@ -262,6 +294,100 @@ pub(super) enum ReplicationError { Timeout, } +// DO NOT derive or implement `Clone` for this object. +#[derive(Debug)] +pub(super) struct ReplicationClient { + replication_seqno: u64, + replication_request_tx: mpsc::Sender, +} + +/// Single-use client that enqueues replication requests into the replication stream. +/// +/// The `init_replica`, `replicate`, and `submit` methods take `self` instead of `&self` +/// to produce 'static futures and enforce single-use semantics. +impl ReplicationClient { + /// Enqueues an init replica request into the replication stream and waits for the response. + /// Times out after [`REPLICATION_REQUEST_TIMEOUT`] seconds. + pub fn init_replica( + self, + replica_shard: Shard, + ) -> impl Future> + Send + 'static { + let init_replica_request = InitReplicaRequest { + replica_shard: Some(replica_shard), + replication_seqno: self.replication_seqno, + }; + let replication_request = ReplicationRequest::Init(init_replica_request); + + async { + self.submit(replication_request) + .await + .map(|replication_response| { + if let ReplicationResponse::Init(init_replica_response) = replication_response { + init_replica_response + } else { + panic!("response should be an init replica response") + } + }) + } + } + + /// Enqueues a replicate request into the replication stream and waits for the response. 
Times + /// out after [`REPLICATION_REQUEST_TIMEOUT`] seconds. + pub fn replicate( + self, + leader_id: NodeId, + follower_id: NodeId, + subrequests: Vec, + commit_type: CommitTypeV2, + ) -> impl Future> + Send + 'static { + let replicate_request = ReplicateRequest { + leader_id: leader_id.into(), + follower_id: follower_id.into(), + subrequests, + commit_type: commit_type as i32, + replication_seqno: self.replication_seqno, + }; + let replication_request = ReplicationRequest::Replicate(replicate_request); + + async { + self.submit(replication_request) + .await + .map(|replication_response| { + if let ReplicationResponse::Replicate(replicate_response) = replication_response + { + replicate_response + } else { + panic!("response should be a replicate response") + } + }) + } + } + + /// Submits a replication request to the replication stream and waits for the response. + fn submit( + self, + replication_request: ReplicationRequest, + ) -> impl Future> + Send + 'static { + let (oneshot_replication_response_tx, oneshot_replication_response_rx) = oneshot::channel(); + + let send_recv_fut = async move { + self.replication_request_tx + .send((replication_request, oneshot_replication_response_tx)) + .await + .map_err(|_| ReplicationError::Closed)?; + let replicate_response = oneshot_replication_response_rx + .await + .map_err(|_| ReplicationError::Closed)?; + Ok(replicate_response) + }; + async { + tokio::time::timeout(REPLICATION_REQUEST_TIMEOUT, send_recv_fut) + .await + .map_err(|_| ReplicationError::Timeout)? + } + } +} + /// Replication task executed for each replication stream. pub(super) struct ReplicationTask { leader_id: NodeId, @@ -298,6 +424,50 @@ impl ReplicationTask { ReplicationTaskHandle { join_handle } } + async fn init_replica( + &mut self, + init_replica_request: InitReplicaRequest, + ) -> IngestV2Result { + if init_replica_request.replication_seqno != self.current_replication_seqno { + return Err(IngestV2Error::Internal(format!( + "received out-of-order replication request: expected replication seqno `{}`, got \ + `{}`", + self.current_replication_seqno, init_replica_request.replication_seqno + ))); + } + self.current_replication_seqno += 1; + + let Some(replica_shard) = init_replica_request.replica_shard else { + warn!("received empty init replica request"); + + return Err(IngestV2Error::Internal( + "init replica request is empty".to_string(), + )); + }; + let queue_id = replica_shard.queue_id(); + + let mut state_guard = self.state.write().await; + + state_guard + .mrecordlog + .create_queue(&queue_id) + .await + .expect("TODO: Handle IO error"); + + let replica_shard = IngesterShard::new_replica( + replica_shard.leader_id.into(), + ShardState::Open, + Position::Beginning, + Position::Beginning, + ); + state_guard.shards.insert(queue_id, replica_shard); + + let init_replica_response = InitReplicaResponse { + replication_seqno: init_replica_request.replication_seqno, + }; + Ok(init_replica_response) + } + async fn replicate( &mut self, replicate_request: ReplicateRequest, @@ -357,32 +527,20 @@ impl ReplicationTask { let from_position_exclusive = subrequest.from_position_exclusive(); let to_position_inclusive = subrequest.to_position_inclusive(); - let replica_shard: &IngesterShard = if from_position_exclusive == Position::Beginning { - // Initialize the replica shard and corresponding mrecordlog queue. 
- state_guard - .mrecordlog - .create_queue(&queue_id) - .await - .expect("TODO"); - let leader_id: NodeId = replicate_request.leader_id.clone().into(); - let shard = IngesterShard::new_replica( - leader_id, - ShardState::Open, - Position::Beginning, - Position::Beginning, - ); - state_guard.shards.entry(queue_id.clone()).or_insert(shard) - } else { - state_guard - .shards - .get(&queue_id) - .expect("replica shard should be initialized") + let Some(shard) = state_guard.shards.get(&queue_id) else { + let replicate_failure = ReplicateFailure { + subrequest_id: subrequest.subrequest_id, + index_uid: subrequest.index_uid, + source_id: subrequest.source_id, + shard_id: subrequest.shard_id, + reason: ReplicateFailureReason::ShardNotFound as i32, + }; + replicate_failures.push(replicate_failure); + continue; }; - assert!(matches!( - replica_shard.shard_type, - IngesterShardType::Replica { .. } - )); - if replica_shard.shard_state.is_closed() { + assert!(shard.is_replica()); + + if shard.shard_state.is_closed() { let replicate_failure = ReplicateFailure { subrequest_id: subrequest.subrequest_id, index_uid: subrequest.index_uid, @@ -393,7 +551,7 @@ impl ReplicationTask { replicate_failures.push(replicate_failure); continue; } - if replica_shard.replication_position_inclusive != from_position_exclusive { + if shard.replication_position_inclusive != from_position_exclusive { // TODO } let doc_batch = match subrequest.doc_batch { @@ -407,7 +565,7 @@ impl ReplicationTask { source_id: subrequest.source_id, shard_id: subrequest.shard_id, replication_position_inclusive: Some( - replica_shard.replication_position_inclusive.clone(), + shard.replication_position_inclusive.clone(), ), }; replicate_successes.push(replicate_success); @@ -497,11 +655,23 @@ impl ReplicationTask { async fn run(&mut self) -> IngestV2Result<()> { while let Some(syn_replication_message) = self.syn_replication_stream.next().await { - let replicate_request = into_replicate_request(syn_replication_message); - let ack_replication_message = self - .replicate(replicate_request) - .await - .map(AckReplicationMessage::new_replicate_response); + let ack_replication_message = match syn_replication_message.message { + Some(syn_replication_message::Message::OpenRequest(_)) => { + panic!("TODO: this should not happen, internal error"); + } + Some(syn_replication_message::Message::InitRequest(init_replica_request)) => self + .init_replica(init_replica_request) + .await + .map(AckReplicationMessage::new_init_replica_response), + Some(syn_replication_message::Message::ReplicateRequest(replicate_request)) => self + .replicate(replicate_request) + .await + .map(AckReplicationMessage::new_replicate_response), + None => { + warn!("received empty SYN replication message"); + continue; + } + }; if self .ack_replication_stream_tx .send(ack_replication_message) @@ -524,24 +694,6 @@ impl Drop for ReplicationTaskHandle { } } -fn into_replicate_request(syn_replication_message: SynReplicationMessage) -> ReplicateRequest { - if let Some(syn_replication_message::Message::ReplicateRequest(replicate_request)) = - syn_replication_message.message - { - return replicate_request; - }; - panic!("SYN replication message should be a replicate request") -} - -fn into_replicate_response(ack_replication_message: AckReplicationMessage) -> ReplicateResponse { - if let Some(ack_replication_message::Message::ReplicateResponse(replicate_response)) = - ack_replication_message.message - { - return replicate_response; - }; - panic!("ACK replication message should be a replicate 
response") -} - #[cfg(test)] mod tests { use std::collections::HashMap; @@ -550,15 +702,69 @@ mod tests { use quickwit_proto::ingest::ingester::{ ObservationMessage, ReplicateSubrequest, ReplicateSuccess, }; - use quickwit_proto::ingest::DocBatchV2; + use quickwit_proto::ingest::{DocBatchV2, Shard}; use quickwit_proto::types::queue_id; use tokio::sync::watch; use super::*; use crate::ingest_v2::test_utils::MultiRecordLogTestExt; + fn into_init_replica_request( + syn_replication_message: SynReplicationMessage, + ) -> InitReplicaRequest { + let Some(syn_replication_message::Message::InitRequest(init_replica_request)) = + syn_replication_message.message + else { + panic!( + "expected init replica SYN message, got `{:?}`", + syn_replication_message.message + ); + }; + init_replica_request + } + + fn into_replicate_request(syn_replication_message: SynReplicationMessage) -> ReplicateRequest { + let Some(syn_replication_message::Message::ReplicateRequest(replicate_request)) = + syn_replication_message.message + else { + panic!( + "expected replicate SYN message, got `{:?}`", + syn_replication_message.message + ); + }; + replicate_request + } + + fn into_init_replica_response( + ack_replication_message: AckReplicationMessage, + ) -> InitReplicaResponse { + let Some(ack_replication_message::Message::InitResponse(init_replica_response)) = + ack_replication_message.message + else { + panic!( + "expected init replica ACK message, got `{:?}`", + ack_replication_message.message + ); + }; + init_replica_response + } + + fn into_replicate_response( + ack_replication_message: AckReplicationMessage, + ) -> ReplicateResponse { + let Some(ack_replication_message::Message::ReplicateResponse(replicate_response)) = + ack_replication_message.message + else { + panic!( + "expected replicate ACK message, got `{:?}`", + ack_replication_message.message + ); + }; + replicate_response + } + #[tokio::test] - async fn test_replication_stream_task() { + async fn test_replication_stream_task_init() { let leader_id: NodeId = "test-leader".into(); let follower_id: NodeId = "test-follower".into(); let (syn_replication_stream_tx, mut syn_replication_stream_rx) = mpsc::channel(5); @@ -571,8 +777,54 @@ mod tests { ack_replication_stream, ); let dummy_replication_task_future = async move { - while let Some(sync_replication_message) = syn_replication_stream_rx.recv().await { - let replicate_request = sync_replication_message.into_replicate_request().unwrap(); + while let Some(syn_replication_message) = syn_replication_stream_rx.recv().await { + let init_replica_request = into_init_replica_request(syn_replication_message); + let init_replica_response = InitReplicaResponse { + replication_seqno: init_replica_request.replication_seqno, + }; + let ack_replication_message = + AckReplicationMessage::new_init_replica_response(init_replica_response); + ack_replication_stream_tx + .send(Ok(ack_replication_message)) + .await + .unwrap(); + } + }; + tokio::spawn(dummy_replication_task_future); + + let replica_shard = Shard { + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + shard_state: ShardState::Open as i32, + leader_id: "test-leader".to_string(), + follower_id: Some("test-follower".to_string()), + ..Default::default() + }; + let init_replica_response = replication_stream_task_handle + .replication_client() + .init_replica(replica_shard) + .await + .unwrap(); + assert_eq!(init_replica_response.replication_seqno, 0); + } + + #[tokio::test] + async fn test_replication_stream_task_replicate() { 
+ let leader_id: NodeId = "test-leader".into(); + let follower_id: NodeId = "test-follower".into(); + let (syn_replication_stream_tx, mut syn_replication_stream_rx) = mpsc::channel(5); + let (ack_replication_stream_tx, ack_replication_stream) = + ServiceStream::new_bounded(SYN_REPLICATION_STREAM_CAPACITY); + let replication_stream_task_handle = ReplicationStreamTask::spawn( + leader_id.clone(), + follower_id.clone(), + syn_replication_stream_tx, + ack_replication_stream, + ); + let dummy_replication_task_future = async move { + while let Some(syn_replication_message) = syn_replication_stream_rx.recv().await { + let replicate_request = into_replicate_request(syn_replication_message); let replicate_successes = replicate_request .subrequests .iter() @@ -601,48 +853,49 @@ mod tests { }; tokio::spawn(dummy_replication_task_future); - let replicate_request = ReplicateRequest { - leader_id: "test-leader".to_string(), - follower_id: "test-follower".to_string(), - commit_type: CommitTypeV2::Auto as i32, - subrequests: vec![ - ReplicateSubrequest { - subrequest_id: 0, - index_uid: "test-index:0".to_string(), - source_id: "test-source".to_string(), - shard_id: 1, - doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])), - from_position_exclusive: None, - to_position_inclusive: Some(Position::from(0u64)), - }, - ReplicateSubrequest { - subrequest_id: 1, - index_uid: "test-index:0".to_string(), - source_id: "test-source".to_string(), - shard_id: 2, - doc_batch: Some(DocBatchV2::for_test(["test-doc-bar", "test-doc-baz"])), - from_position_exclusive: None, - to_position_inclusive: Some(Position::from(1u64)), - }, - ReplicateSubrequest { - subrequest_id: 2, - index_uid: "test-index:1".to_string(), - source_id: "test-source".to_string(), - shard_id: 1, - doc_batch: Some(DocBatchV2::for_test(["test-qux", "test-doc-tux"])), - from_position_exclusive: Some(Position::from(0u64)), - to_position_inclusive: Some(Position::from(2u64)), - }, - ], - replication_seqno: replication_stream_task_handle.next_replication_seqno(), - }; + let subrequests = vec![ + ReplicateSubrequest { + subrequest_id: 0, + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])), + from_position_exclusive: None, + to_position_inclusive: Some(Position::from(0u64)), + }, + ReplicateSubrequest { + subrequest_id: 1, + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 2, + doc_batch: Some(DocBatchV2::for_test(["test-doc-bar", "test-doc-baz"])), + from_position_exclusive: None, + to_position_inclusive: Some(Position::from(1u64)), + }, + ReplicateSubrequest { + subrequest_id: 2, + index_uid: "test-index:1".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + doc_batch: Some(DocBatchV2::for_test(["test-qux", "test-doc-tux"])), + from_position_exclusive: Some(Position::from(0u64)), + to_position_inclusive: Some(Position::from(2u64)), + }, + ]; let replicate_response = replication_stream_task_handle - .replicate(replicate_request) + .replication_client() + .replicate( + leader_id.clone(), + follower_id.clone(), + subrequests, + CommitTypeV2::Auto, + ) .await .unwrap(); assert_eq!(replicate_response.follower_id, "test-follower"); assert_eq!(replicate_response.successes.len(), 3); assert_eq!(replicate_response.failures.len(), 0); + assert_eq!(replicate_response.replication_seqno, 0); let replicate_success_0 = &replicate_response.successes[0]; assert_eq!(replicate_success_0.index_uid, "test-index:0"); 
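// A small, self-contained sketch of the single-use client pattern exercised in the test above:
// because `init_replica` and `replicate` take `self` rather than `&self`, the returned future owns
// everything it needs (making it `Send + 'static`), and each client can send at most one request
// with its sequence number. `SketchClient` is a toy type made up for this example, not the real
// `ReplicationClient`; tokio is assumed as the async runtime, as elsewhere in the crate.

use std::future::Future;

struct SketchClient {
    seqno: u64,
}

impl SketchClient {
    fn send(self, payload: String) -> impl Future<Output = (u64, String)> + Send + 'static {
        // `self` and `payload` are moved into the async block, so the future borrows nothing.
        async move { (self.seqno, payload) }
    }
}

#[tokio::main]
async fn main() {
    let client = SketchClient { seqno: 0 };
    let response = client.send("init-replica".to_string()).await;
    // `client` was consumed: calling `client.send(...)` a second time would not compile, which is
    // the property the replication stream relies on to keep sequence numbers unique per request.
    assert_eq!(response, (0, "init-replica".to_string()));
}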
@@ -671,18 +924,19 @@ mod tests { let (_ack_replication_stream_tx, ack_replication_stream) = ServiceStream::new_bounded(SYN_REPLICATION_STREAM_CAPACITY); let replication_stream_task_handle = ReplicationStreamTask::spawn( - leader_id, - follower_id, + leader_id.clone(), + follower_id.clone(), syn_replication_stream_tx, ack_replication_stream, ); - let replicate_request = ReplicateRequest { - leader_id: "test-leader".to_string(), - follower_id: "test-follower".to_string(), - ..Default::default() - }; let timeout_error = replication_stream_task_handle - .replicate(replicate_request.clone()) + .replication_client() + .replicate( + leader_id.clone(), + follower_id.clone(), + Vec::new(), + CommitTypeV2::Auto, + ) .await .unwrap_err(); assert!(matches!(timeout_error, ReplicationError::Timeout)); @@ -692,7 +946,8 @@ mod tests { .abort(); let closed_error = replication_stream_task_handle - .replicate(replicate_request) + .replication_client() + .replicate(leader_id, follower_id, Vec::new(), CommitTypeV2::Auto) .await .unwrap_err(); @@ -732,6 +987,106 @@ mod tests { disk_capacity, memory_capacity, ); + + // Init shard 01. + let init_replica_request = InitReplicaRequest { + replica_shard: Some(Shard { + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + shard_state: ShardState::Open as i32, + leader_id: "test-leader".to_string(), + follower_id: Some("test-follower".to_string()), + ..Default::default() + }), + replication_seqno: 0, + }; + let syn_replication_message = + SynReplicationMessage::new_init_replica_request(init_replica_request); + syn_replication_stream_tx + .send(syn_replication_message) + .await + .unwrap(); + let ack_replication_message = ack_replication_stream.next().await.unwrap().unwrap(); + let init_replica_response = into_init_replica_response(ack_replication_message); + assert_eq!(init_replica_response.replication_seqno, 0); + + // Init shard 02. + let init_replica_request = InitReplicaRequest { + replica_shard: Some(Shard { + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 2, + shard_state: ShardState::Open as i32, + leader_id: "test-leader".to_string(), + follower_id: Some("test-follower".to_string()), + ..Default::default() + }), + replication_seqno: 1, + }; + let syn_replication_message = + SynReplicationMessage::new_init_replica_request(init_replica_request); + syn_replication_stream_tx + .send(syn_replication_message) + .await + .unwrap(); + let ack_replication_message = ack_replication_stream.next().await.unwrap().unwrap(); + let init_replica_response = into_init_replica_response(ack_replication_message); + assert_eq!(init_replica_response.replication_seqno, 1); + + // Init shard 11. 
+ let init_replica_request = InitReplicaRequest { + replica_shard: Some(Shard { + index_uid: "test-index:1".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + shard_state: ShardState::Open as i32, + leader_id: "test-leader".to_string(), + follower_id: Some("test-follower".to_string()), + ..Default::default() + }), + replication_seqno: 2, + }; + let syn_replication_message = + SynReplicationMessage::new_init_replica_request(init_replica_request); + syn_replication_stream_tx + .send(syn_replication_message) + .await + .unwrap(); + let ack_replication_message = ack_replication_stream.next().await.unwrap().unwrap(); + let init_replica_response = into_init_replica_response(ack_replication_message); + assert_eq!(init_replica_response.replication_seqno, 2); + + let state_guard = state.read().await; + + let queue_id_01 = queue_id("test-index:0", "test-source", 1); + + let replica_shard_01 = state_guard.shards.get(&queue_id_01).unwrap(); + replica_shard_01.assert_is_replica(); + replica_shard_01.assert_is_open(); + replica_shard_01.assert_replication_position(Position::Beginning); + replica_shard_01.assert_truncation_position(Position::Beginning); + + assert!(state_guard.mrecordlog.queue_exists(&queue_id_01)); + + let queue_id_02 = queue_id("test-index:0", "test-source", 2); + + let replica_shard_02 = state_guard.shards.get(&queue_id_02).unwrap(); + replica_shard_02.assert_is_replica(); + replica_shard_02.assert_is_open(); + replica_shard_02.assert_replication_position(Position::Beginning); + replica_shard_02.assert_truncation_position(Position::Beginning); + + let queue_id_11 = queue_id("test-index:1", "test-source", 1); + + let replica_shard_11 = state_guard.shards.get(&queue_id_11).unwrap(); + replica_shard_11.assert_is_replica(); + replica_shard_11.assert_is_open(); + replica_shard_11.assert_replication_position(Position::Beginning); + replica_shard_11.assert_truncation_position(Position::Beginning); + + drop(state_guard); + let replicate_request = ReplicateRequest { leader_id: "test-leader".to_string(), follower_id: "test-follower".to_string(), @@ -765,7 +1120,7 @@ mod tests { to_position_inclusive: Some(Position::from(1u64)), }, ], - replication_seqno: 0, + replication_seqno: 3, }; let syn_replication_message = SynReplicationMessage::new_replicate_request(replicate_request); @@ -779,6 +1134,7 @@ mod tests { assert_eq!(replicate_response.follower_id, "test-follower"); assert_eq!(replicate_response.successes.len(), 3); assert_eq!(replicate_response.failures.len(), 0); + assert_eq!(replicate_response.replication_seqno, 3); let replicate_success_0 = &replicate_response.successes[0]; assert_eq!(replicate_success_0.index_uid, "test-index:0"); @@ -800,22 +1156,16 @@ mod tests { let state_guard = state.read().await; - let queue_id_01 = queue_id("test-index:0", "test-source", 1); - state_guard .mrecordlog .assert_records_eq(&queue_id_01, .., &[(0, "\0\0test-doc-foo")]); - let queue_id_02 = queue_id("test-index:0", "test-source", 2); - state_guard.mrecordlog.assert_records_eq( &queue_id_02, .., &[(0, "\0\0test-doc-bar"), (1, "\0\0test-doc-baz")], ); - let queue_id_11 = queue_id("test-index:1", "test-source", 1); - state_guard.mrecordlog.assert_records_eq( &queue_id_11, .., @@ -836,7 +1186,7 @@ mod tests { from_position_exclusive: Some(Position::from(0u64)), to_position_inclusive: Some(Position::from(1u64)), }], - replication_seqno: 1, + replication_seqno: 4, }; let syn_replication_message = SynReplicationMessage::new_replicate_request(replicate_request); @@ -850,6 +1200,7 @@ mod 
         assert_eq!(replicate_response.follower_id, "test-follower");
         assert_eq!(replicate_response.successes.len(), 1);
         assert_eq!(replicate_response.failures.len(), 0);
+        assert_eq!(replicate_response.replication_seqno, 4);

         let replicate_success_0 = &replicate_response.successes[0];
         assert_eq!(replicate_success_0.index_uid, "test-index:0");
@@ -976,7 +1327,7 @@ mod tests {
         let memory_capacity = ByteSize(0);

         let _replication_task_handle = ReplicationTask::spawn(
-            leader_id,
+            leader_id.clone(),
             follower_id,
             state.clone(),
             syn_replication_stream,
@@ -984,6 +1335,20 @@ mod tests {
             disk_capacity,
             memory_capacity,
         );
+
+        let queue_id_01 = queue_id("test-index:0", "test-source", 1);
+        let replica_shard = IngesterShard::new_replica(
+            leader_id,
+            ShardState::Open,
+            Position::Beginning,
+            Position::Beginning,
+        );
+        state
+            .write()
+            .await
+            .shards
+            .insert(queue_id_01.clone(), replica_shard);
+
         let replicate_request = ReplicateRequest {
             leader_id: "test-leader".to_string(),
             follower_id: "test-follower".to_string(),
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs
index 7cef5ddb03f..ac0e11c69a4 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs
@@ -22,7 +22,7 @@ use std::collections::HashMap;
 use quickwit_proto::control_plane::{
     GetOrCreateOpenShardsFailure, GetOrCreateOpenShardsFailureReason,
 };
-use quickwit_proto::ingest::ingester::{PersistFailure, PersistFailureReason, PersistSuccess};
+use quickwit_proto::ingest::ingester::{PersistFailure, PersistSuccess};
 use quickwit_proto::ingest::router::{
     IngestFailure, IngestFailureReason, IngestResponseV2, IngestSubrequest, IngestSuccess,
 };
@@ -233,12 +233,7 @@ impl SubworkbenchFailure {
             Self::SourceNotFound => IngestFailureReason::SourceNotFound,
             Self::Internal(_) => IngestFailureReason::Internal,
             Self::NoShardsAvailable => IngestFailureReason::NoShardsAvailable,
-            Self::Persist(persist_failure) => match persist_failure.reason() {
-                PersistFailureReason::RateLimited => IngestFailureReason::RateLimited,
-                PersistFailureReason::ResourceExhausted => IngestFailureReason::ResourceExhausted,
-                PersistFailureReason::ShardClosed => IngestFailureReason::NoShardsAvailable,
-                PersistFailureReason::Unspecified => IngestFailureReason::Unspecified,
-            },
+            Self::Persist(persist_failure) => persist_failure.reason().into(),
         }
     }
 }
@@ -278,6 +273,8 @@ impl IngestSubworkbench {

 #[cfg(test)]
 mod tests {
+    use quickwit_proto::ingest::ingester::PersistFailureReason;
+
     use super::*;

     #[test]
diff --git a/quickwit/quickwit-proto/protos/quickwit/ingester.proto b/quickwit/quickwit-proto/protos/quickwit/ingester.proto
index cf546107a9c..1d0c8eec430 100644
--- a/quickwit/quickwit-proto/protos/quickwit/ingester.proto
+++ b/quickwit/quickwit-proto/protos/quickwit/ingester.proto
@@ -37,6 +37,9 @@ service IngesterService {
   // Streams status updates, called "observations", from an ingester.
   rpc OpenObservationStream(OpenObservationStreamRequest) returns (stream ObservationMessage);

+  // Creates and initializes a set of newly opened shards. This RPC is called by the control plane on leaders.
+  rpc InitShards(InitShardsRequest) returns (InitShardsResponse);
+
   // Truncates a set of shards at the given positions. This RPC is called by indexers on leaders AND followers.
   rpc TruncateShards(TruncateShardsRequest) returns (TruncateShardsResponse);
@@ -46,7 +49,6 @@ service IngesterService {

   // Pings an ingester to check if it is ready to host shards and serve requests.
   rpc Ping(PingRequest) returns (PingResponse);
-
   // Decommissions the ingester.
   rpc Decommission(DecommissionRequest) returns (DecommissionResponse);
@@ -83,9 +85,10 @@ message PersistSuccess {

 enum PersistFailureReason {
   PERSIST_FAILURE_REASON_UNSPECIFIED = 0;
-  PERSIST_FAILURE_REASON_SHARD_CLOSED = 1;
-  PERSIST_FAILURE_REASON_RATE_LIMITED = 2;
-  PERSIST_FAILURE_REASON_RESOURCE_EXHAUSTED = 3;
+  PERSIST_FAILURE_REASON_SHARD_NOT_FOUND = 1;
+  PERSIST_FAILURE_REASON_SHARD_CLOSED = 2;
+  PERSIST_FAILURE_REASON_RATE_LIMITED = 3;
+  PERSIST_FAILURE_REASON_RESOURCE_EXHAUSTED = 4;
 }

 message PersistFailure {
@@ -99,23 +102,38 @@ message PersistFailure {

 message SynReplicationMessage {
   oneof message {
     OpenReplicationStreamRequest open_request = 1;
-    ReplicateRequest replicate_request = 2;
+    InitReplicaRequest init_request = 2;
+    ReplicateRequest replicate_request = 3;
   }
 }

 message AckReplicationMessage {
   oneof message {
     OpenReplicationStreamResponse open_response = 1;
-    ReplicateResponse replicate_response = 2;
+    InitReplicaResponse init_response = 2;
+    ReplicateResponse replicate_response = 3;
   }
 }

 message OpenReplicationStreamRequest {
   string leader_id = 1;
   string follower_id = 2;
+  // Position of the request in the replication stream.
+  uint64 replication_seqno = 3;
 }

 message OpenReplicationStreamResponse {
+  // Position of the response in the replication stream. It should match the position of the request.
+  uint64 replication_seqno = 1;
+}
+
+message InitReplicaRequest {
+  Shard replica_shard = 1;
+  uint64 replication_seqno = 2;
+}
+
+message InitReplicaResponse {
+  uint64 replication_seqno = 1;
 }

 message ReplicateRequest {
@@ -155,9 +173,10 @@ message ReplicateSuccess {

 enum ReplicateFailureReason {
   REPLICATE_FAILURE_REASON_UNSPECIFIED = 0;
-  REPLICATE_FAILURE_REASON_SHARD_CLOSED = 1;
-  reserved 2; // REPLICATE_FAILURE_REASON_RATE_LIMITED = 2;
-  REPLICATE_FAILURE_REASON_RESOURCE_EXHAUSTED = 3;
+  REPLICATE_FAILURE_REASON_SHARD_NOT_FOUND = 1;
+  REPLICATE_FAILURE_REASON_SHARD_CLOSED = 2;
+  reserved 3; // REPLICATE_FAILURE_REASON_RATE_LIMITED = 3;
+  REPLICATE_FAILURE_REASON_RESOURCE_EXHAUSTED = 4;
 }

 message ReplicateFailure {
@@ -201,6 +220,13 @@ message FetchResponseV2 {
   quickwit.ingest.Position to_position_inclusive = 6;
 }

+message InitShardsRequest {
+  repeated quickwit.ingest.Shard shards = 1;
+}
+
+message InitShardsResponse {
+}
+
 message CloseShardsRequest {
   repeated quickwit.ingest.ShardIds shards = 1;
 }
diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs
index f65bae225a4..e8afa95ff53 100644
--- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs
+++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs
@@ -71,7 +71,7 @@ pub struct PersistFailure {
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct SynReplicationMessage {
-    #[prost(oneof = "syn_replication_message::Message", tags = "1, 2")]
+    #[prost(oneof = "syn_replication_message::Message", tags = "1, 2, 3")]
     pub message: ::core::option::Option<syn_replication_message::Message>,
 }
 /// Nested message and enum types in `SynReplicationMessage`.
@@ -84,6 +84,8 @@ pub mod syn_replication_message {
         #[prost(message, tag = "1")]
         OpenRequest(super::OpenReplicationStreamRequest),
         #[prost(message, tag = "2")]
+        InitRequest(super::InitReplicaRequest),
+        #[prost(message, tag = "3")]
         ReplicateRequest(super::ReplicateRequest),
     }
 }
@@ -91,7 +93,7 @@ pub mod syn_replication_message {
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct AckReplicationMessage {
-    #[prost(oneof = "ack_replication_message::Message", tags = "1, 2")]
+    #[prost(oneof = "ack_replication_message::Message", tags = "1, 2, 3")]
     pub message: ::core::option::Option<ack_replication_message::Message>,
 }
 /// Nested message and enum types in `AckReplicationMessage`.
@@ -104,6 +106,8 @@ pub mod ack_replication_message {
         #[prost(message, tag = "1")]
         OpenResponse(super::OpenReplicationStreamResponse),
         #[prost(message, tag = "2")]
+        InitResponse(super::InitReplicaResponse),
+        #[prost(message, tag = "3")]
         ReplicateResponse(super::ReplicateResponse),
     }
 }
@@ -115,11 +119,34 @@ pub struct OpenReplicationStreamRequest {
     pub leader_id: ::prost::alloc::string::String,
     #[prost(string, tag = "2")]
     pub follower_id: ::prost::alloc::string::String,
+    /// Position of the request in the replication stream.
+    #[prost(uint64, tag = "3")]
+    pub replication_seqno: u64,
+}
+#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct OpenReplicationStreamResponse {
+    /// Position of the response in the replication stream. It should match the position of the request.
+    #[prost(uint64, tag = "1")]
+    pub replication_seqno: u64,
+}
+#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct InitReplicaRequest {
+    #[prost(message, optional, tag = "1")]
+    pub replica_shard: ::core::option::Option<super::Shard>,
+    #[prost(uint64, tag = "2")]
+    pub replication_seqno: u64,
 }
 #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
-pub struct OpenReplicationStreamResponse {}
+pub struct InitReplicaResponse {
+    #[prost(uint64, tag = "1")]
+    pub replication_seqno: u64,
+}
 #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
@@ -261,6 +288,17 @@ pub struct FetchResponseV2 {
 #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
+pub struct InitShardsRequest {
+    #[prost(message, repeated, tag = "1")]
+    pub shards: ::prost::alloc::vec::Vec<super::Shard>,
+}
+#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct InitShardsResponse {}
+#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CloseShardsRequest {
     #[prost(message, repeated, tag = "1")]
     pub shards: ::prost::alloc::vec::Vec<super::ShardIds>,
@@ -309,9 +347,10 @@ pub struct ObservationMessage {
 #[repr(i32)]
 pub enum PersistFailureReason {
     Unspecified = 0,
-    ShardClosed = 1,
-    RateLimited = 2,
-    ResourceExhausted = 3,
+    ShardNotFound = 1,
+    ShardClosed = 2,
+    RateLimited = 3,
+    ResourceExhausted = 4,
 }
 impl PersistFailureReason {
     /// String value of the enum field names used in the ProtoBuf definition.
@@ -321,6 +360,9 @@ impl PersistFailureReason {
     pub fn as_str_name(&self) -> &'static str {
         match self {
             PersistFailureReason::Unspecified => "PERSIST_FAILURE_REASON_UNSPECIFIED",
+            PersistFailureReason::ShardNotFound => {
+                "PERSIST_FAILURE_REASON_SHARD_NOT_FOUND"
+            }
             PersistFailureReason::ShardClosed => "PERSIST_FAILURE_REASON_SHARD_CLOSED",
             PersistFailureReason::RateLimited => "PERSIST_FAILURE_REASON_RATE_LIMITED",
             PersistFailureReason::ResourceExhausted => {
@@ -332,6 +374,7 @@ impl PersistFailureReason {
     pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
         match value {
             "PERSIST_FAILURE_REASON_UNSPECIFIED" => Some(Self::Unspecified),
+            "PERSIST_FAILURE_REASON_SHARD_NOT_FOUND" => Some(Self::ShardNotFound),
             "PERSIST_FAILURE_REASON_SHARD_CLOSED" => Some(Self::ShardClosed),
             "PERSIST_FAILURE_REASON_RATE_LIMITED" => Some(Self::RateLimited),
             "PERSIST_FAILURE_REASON_RESOURCE_EXHAUSTED" => Some(Self::ResourceExhausted),
@@ -345,8 +388,9 @@ impl PersistFailureReason {
 #[repr(i32)]
 pub enum ReplicateFailureReason {
     Unspecified = 0,
-    ShardClosed = 1,
-    ResourceExhausted = 3,
+    ShardNotFound = 1,
+    ShardClosed = 2,
+    ResourceExhausted = 4,
 }
 impl ReplicateFailureReason {
     /// String value of the enum field names used in the ProtoBuf definition.
@@ -356,6 +400,9 @@ impl ReplicateFailureReason {
     pub fn as_str_name(&self) -> &'static str {
         match self {
             ReplicateFailureReason::Unspecified => "REPLICATE_FAILURE_REASON_UNSPECIFIED",
+            ReplicateFailureReason::ShardNotFound => {
+                "REPLICATE_FAILURE_REASON_SHARD_NOT_FOUND"
+            }
             ReplicateFailureReason::ShardClosed => {
                 "REPLICATE_FAILURE_REASON_SHARD_CLOSED"
             }
@@ -368,6 +415,7 @@ impl ReplicateFailureReason {
     pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
         match value {
             "REPLICATE_FAILURE_REASON_UNSPECIFIED" => Some(Self::Unspecified),
+            "REPLICATE_FAILURE_REASON_SHARD_NOT_FOUND" => Some(Self::ShardNotFound),
             "REPLICATE_FAILURE_REASON_SHARD_CLOSED" => Some(Self::ShardClosed),
             "REPLICATE_FAILURE_REASON_RESOURCE_EXHAUSTED" => {
                 Some(Self::ResourceExhausted)
@@ -447,6 +495,11 @@ pub trait IngesterService: std::fmt::Debug + dyn_clone::DynClone + Send + Sync
         &mut self,
         request: OpenObservationStreamRequest,
     ) -> crate::ingest::IngestV2Result<IngesterServiceStream<ObservationMessage>>;
+    /// Creates and initializes a set of newly opened shards. This RPC is called by the control plane on leaders.
+    async fn init_shards(
+        &mut self,
+        request: InitShardsRequest,
+    ) -> crate::ingest::IngestV2Result<InitShardsResponse>;
     /// Truncates a set of shards at the given positions. This RPC is called by indexers on leaders AND followers.
     async fn truncate_shards(
         &mut self,
@@ -571,6 +624,12 @@ impl IngesterService for IngesterServiceClient {
     ) -> crate::ingest::IngestV2Result<IngesterServiceStream<ObservationMessage>> {
         self.inner.open_observation_stream(request).await
     }
+    async fn init_shards(
+        &mut self,
+        request: InitShardsRequest,
+    ) -> crate::ingest::IngestV2Result<InitShardsResponse> {
+        self.inner.init_shards(request).await
+    }
     async fn truncate_shards(
         &mut self,
         request: TruncateShardsRequest,
@@ -635,6 +694,12 @@ pub mod ingester_service_mock {
         > {
             self.inner.lock().await.open_observation_stream(request).await
         }
+        async fn init_shards(
+            &mut self,
+            request: super::InitShardsRequest,
+        ) -> crate::ingest::IngestV2Result<super::InitShardsResponse> {
+            self.inner.lock().await.init_shards(request).await
+        }
         async fn truncate_shards(
             &mut self,
             request: super::TruncateShardsRequest,
@@ -740,6 +805,22 @@ impl tower::Service<OpenObservationStreamRequest> for Box<dyn IngesterService> {
         Box::pin(fut)
     }
 }
+impl tower::Service<InitShardsRequest> for Box<dyn IngesterService> {
+    type Response = InitShardsResponse;
+    type Error = crate::ingest::IngestV2Error;
+    type Future = BoxFuture<Self::Response, Self::Error>;
+    fn poll_ready(
+        &mut self,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), Self::Error>> {
+        std::task::Poll::Ready(Ok(()))
+    }
+    fn call(&mut self, request: InitShardsRequest) -> Self::Future {
+        let mut svc = self.clone();
+        let fut = async move { svc.init_shards(request).await };
+        Box::pin(fut)
+    }
+}
 impl tower::Service<TruncateShardsRequest> for Box<dyn IngesterService> {
     type Response = TruncateShardsResponse;
     type Error = crate::ingest::IngestV2Error;
@@ -828,6 +909,11 @@ struct IngesterServiceTowerBlock {
         IngesterServiceStream<ObservationMessage>,
         crate::ingest::IngestV2Error,
     >,
+    init_shards_svc: quickwit_common::tower::BoxService<
+        InitShardsRequest,
+        InitShardsResponse,
+        crate::ingest::IngestV2Error,
+    >,
     truncate_shards_svc: quickwit_common::tower::BoxService<
         TruncateShardsRequest,
         TruncateShardsResponse,
         crate::ingest::IngestV2Error,
@@ -857,6 +943,7 @@ impl Clone for IngesterServiceTowerBlock {
             open_replication_stream_svc: self.open_replication_stream_svc.clone(),
             open_fetch_stream_svc: self.open_fetch_stream_svc.clone(),
             open_observation_stream_svc: self.open_observation_stream_svc.clone(),
+            init_shards_svc: self.init_shards_svc.clone(),
             truncate_shards_svc: self.truncate_shards_svc.clone(),
             close_shards_svc: self.close_shards_svc.clone(),
             ping_svc: self.ping_svc.clone(),
@@ -890,6 +977,12 @@ impl IngesterService for IngesterServiceTowerBlock {
     ) -> crate::ingest::IngestV2Result<IngesterServiceStream<ObservationMessage>> {
         self.open_observation_stream_svc.ready().await?.call(request).await
     }
+    async fn init_shards(
+        &mut self,
+        request: InitShardsRequest,
+    ) -> crate::ingest::IngestV2Result<InitShardsResponse> {
+        self.init_shards_svc.ready().await?.call(request).await
+    }
     async fn truncate_shards(
         &mut self,
         request: TruncateShardsRequest,
@@ -954,6 +1047,15 @@ pub struct IngesterServiceTowerBlockBuilder {
         >,
     >,
     #[allow(clippy::type_complexity)]
+    init_shards_layer: Option<
+        quickwit_common::tower::BoxLayer<
+            Box<dyn IngesterService>,
+            InitShardsRequest,
+            InitShardsResponse,
+            crate::ingest::IngestV2Error,
+        >,
+    >,
+    #[allow(clippy::type_complexity)]
     truncate_shards_layer: Option<
         quickwit_common::tower::BoxLayer<
             Box<dyn IngesterService>,
@@ -1022,6 +1124,12 @@ impl IngesterServiceTowerBlockBuilder {
         <L::Service as tower::Service<OpenObservationStreamRequest>>::Future: Send + 'static,
+        L::Service: tower::Service<
+                InitShardsRequest,
+                Response = InitShardsResponse,
+                Error = crate::ingest::IngestV2Error,
+            > + Clone + Send + Sync + 'static,
+        <L::Service as tower::Service<InitShardsRequest>>::Future: Send + 'static,
         L::Service: tower::Service<
                 TruncateShardsRequest,
                 Response = TruncateShardsResponse,
                 Error = crate::ingest::IngestV2Error,
@@ -1060,6 +1168,10 @@ impl IngesterServiceTowerBlockBuilder {
             .open_observation_stream_layer = Some(
                 quickwit_common::tower::BoxLayer::new(layer.clone()),
             );
+        self
+            .init_shards_layer = Some(
+                quickwit_common::tower::BoxLayer::new(layer.clone()),
+            );
         self
             .truncate_shards_layer = Some(
                 quickwit_common::tower::BoxLayer::new(layer.clone()),
@@ -1137,6 +1249,19 @@ impl IngesterServiceTowerBlockBuilder {
         );
         self
     }
+    pub fn init_shards_layer<L>(mut self, layer: L) -> Self
+    where
+        L: tower::Layer<Box<dyn IngesterService>> + Send + Sync + 'static,
+        L::Service: tower::Service<
+                InitShardsRequest,
+                Response = InitShardsResponse,
+                Error = crate::ingest::IngestV2Error,
+            > + Clone + Send + Sync + 'static,
+        <L::Service as tower::Service<InitShardsRequest>>::Future: Send + 'static,
+    {
+        self.init_shards_layer = Some(quickwit_common::tower::BoxLayer::new(layer));
+        self
+    }
     pub fn truncate_shards_layer<L>(mut self, layer: L) -> Self
     where
         L: tower::Layer<Box<dyn IngesterService>> + Send + Sync + 'static,
@@ -1250,6 +1375,11 @@ impl IngesterServiceTowerBlockBuilder {
         } else {
             quickwit_common::tower::BoxService::new(boxed_instance.clone())
         };
+        let init_shards_svc = if let Some(layer) = self.init_shards_layer {
+            layer.layer(boxed_instance.clone())
+        } else {
+            quickwit_common::tower::BoxService::new(boxed_instance.clone())
+        };
         let truncate_shards_svc = if let Some(layer) = self.truncate_shards_layer {
             layer.layer(boxed_instance.clone())
         } else {
@@ -1276,6 +1406,7 @@ impl IngesterServiceTowerBlockBuilder {
             open_replication_stream_svc,
             open_fetch_stream_svc,
             open_observation_stream_svc,
+            init_shards_svc,
             truncate_shards_svc,
             close_shards_svc,
             ping_svc,
@@ -1389,6 +1520,12 @@ where
             crate::ingest::IngestV2Error,
         >,
     >
+    + tower::Service<
+        InitShardsRequest,
+        Response = InitShardsResponse,
+        Error = crate::ingest::IngestV2Error,
+        Future = BoxFuture<InitShardsResponse, crate::ingest::IngestV2Error>,
+    >
     + tower::Service<
         TruncateShardsRequest,
         Response = TruncateShardsResponse,
         Error = crate::ingest::IngestV2Error,
@@ -1438,6 +1575,12 @@ where
     ) -> crate::ingest::IngestV2Result<IngesterServiceStream<ObservationMessage>> {
         self.call(request).await
     }
+    async fn init_shards(
+        &mut self,
+        request: InitShardsRequest,
+    ) -> crate::ingest::IngestV2Result<InitShardsResponse> {
+        self.call(request).await
+    }
     async fn truncate_shards(
         &mut self,
         request: TruncateShardsRequest,
@@ -1549,6 +1692,16 @@ where
             })
             .map_err(|error| error.into())
     }
+    async fn init_shards(
+        &mut self,
+        request: InitShardsRequest,
+    ) -> crate::ingest::IngestV2Result<InitShardsResponse> {
+        self.inner
+            .init_shards(request)
+            .await
+            .map(|response| response.into_inner())
+            .map_err(|error| error.into())
+    }
     async fn truncate_shards(
         &mut self,
         request: TruncateShardsRequest,
@@ -1661,6 +1814,17 @@ for IngesterServiceGrpcServerAdapter {
             .map(|stream| tonic::Response::new(stream.map_err(|error| error.into())))
             .map_err(|error| error.into())
     }
+    async fn init_shards(
+        &self,
+        request: tonic::Request<InitShardsRequest>,
+    ) -> Result<tonic::Response<InitShardsResponse>, tonic::Status> {
+        self.inner
+            .clone()
+            .init_shards(request.into_inner())
+            .await
+            .map(tonic::Response::new)
+            .map_err(|error| error.into())
+    }
     async fn truncate_shards(
         &self,
         request: tonic::Request<TruncateShardsRequest>,
@@ -1918,6 +2082,37 @@ pub mod ingester_service_grpc_client {
                 );
             self.inner.server_streaming(req, path, codec).await
         }
+        /// Creates and initializes a set of newly opened shards. This RPC is called by the control plane on leaders.
+        pub async fn init_shards(
+            &mut self,
+            request: impl tonic::IntoRequest<super::InitShardsRequest>,
+        ) -> std::result::Result<
+            tonic::Response<super::InitShardsResponse>,
+            tonic::Status,
+        > {
+            self.inner
+                .ready()
+                .await
+                .map_err(|e| {
+                    tonic::Status::new(
+                        tonic::Code::Unknown,
+                        format!("Service was not ready: {}", e.into()),
+                    )
+                })?;
+            let codec = tonic::codec::ProstCodec::default();
+            let path = http::uri::PathAndQuery::from_static(
+                "/quickwit.ingest.ingester.IngesterService/InitShards",
+            );
+            let mut req = request.into_request();
+            req.extensions_mut()
+                .insert(
+                    GrpcMethod::new(
+                        "quickwit.ingest.ingester.IngesterService",
+                        "InitShards",
+                    ),
+                );
+            self.inner.unary(req, path, codec).await
+        }
         /// Truncates a set of shards at the given positions. This RPC is called by indexers on leaders AND followers.
         pub async fn truncate_shards(
             &mut self,
@@ -2093,6 +2288,14 @@ pub mod ingester_service_grpc_server {
             tonic::Response<Self::OpenObservationStreamStream>,
             tonic::Status,
         >;
+        /// Creates and initializes a set of newly opened shards. This RPC is called by the control plane on leaders.
+        async fn init_shards(
+            &self,
+            request: tonic::Request<super::InitShardsRequest>,
+        ) -> std::result::Result<
+            tonic::Response<super::InitShardsResponse>,
+            tonic::Status,
+        >;
         /// Truncates a set of shards at the given positions. This RPC is called by indexers on leaders AND followers.
         async fn truncate_shards(
             &self,
@@ -2391,6 +2594,50 @@ pub mod ingester_service_grpc_server {
                     };
                     Box::pin(fut)
                 }
+                "/quickwit.ingest.ingester.IngesterService/InitShards" => {
+                    #[allow(non_camel_case_types)]
+                    struct InitShardsSvc<T: IngesterServiceGrpc>(pub Arc<T>);
+                    impl<
+                        T: IngesterServiceGrpc,
+                    > tonic::server::UnaryService<super::InitShardsRequest>
+                    for InitShardsSvc<T> {
+                        type Response = super::InitShardsResponse;
+                        type Future = BoxFuture<
+                            tonic::Response<Self::Response>,
+                            tonic::Status,
+                        >;
+                        fn call(
+                            &mut self,
+                            request: tonic::Request<super::InitShardsRequest>,
+                        ) -> Self::Future {
+                            let inner = Arc::clone(&self.0);
+                            let fut = async move { (*inner).init_shards(request).await };
+                            Box::pin(fut)
+                        }
+                    }
+                    let accept_compression_encodings = self.accept_compression_encodings;
+                    let send_compression_encodings = self.send_compression_encodings;
+                    let max_decoding_message_size = self.max_decoding_message_size;
+                    let max_encoding_message_size = self.max_encoding_message_size;
+                    let inner = self.inner.clone();
+                    let fut = async move {
+                        let inner = inner.0;
+                        let method = InitShardsSvc(inner);
+                        let codec = tonic::codec::ProstCodec::default();
+                        let mut grpc = tonic::server::Grpc::new(codec)
+                            .apply_compression_config(
+                                accept_compression_encodings,
+                                send_compression_encodings,
+                            )
+                            .apply_max_message_size_config(
+                                max_decoding_message_size,
+                                max_encoding_message_size,
+                            );
+                        let res = grpc.unary(method, req).await;
+                        Ok(res)
+                    };
+                    Box::pin(fut)
+                }
                 "/quickwit.ingest.ingester.IngesterService/TruncateShards" => {
                     #[allow(non_camel_case_types)]
                     struct TruncateShardsSvc<T: IngesterServiceGrpc>(pub Arc<T>);
diff --git a/quickwit/quickwit-proto/src/ingest/ingester.rs b/quickwit/quickwit-proto/src/ingest/ingester.rs
index f24a9de9578..b743124f77a 100644
--- a/quickwit/quickwit-proto/src/ingest/ingester.rs
+++ b/quickwit/quickwit-proto/src/ingest/ingester.rs
@@ -67,18 +67,17 @@ impl SynReplicationMessage {
         }
     }

-    pub fn into_replicate_request(self) -> Option<ReplicateRequest> {
-        match self.message {
-            Some(syn_replication_message::Message::ReplicateRequest(replicate_request)) => {
-                Some(replicate_request)
-            }
-            _ => None,
+    pub fn new_open_request(open_request: OpenReplicationStreamRequest) -> Self {
+        Self {
+            message: Some(syn_replication_message::Message::OpenRequest(open_request)),
         }
     }

-    pub fn new_open_request(open_request: OpenReplicationStreamRequest) -> Self {
+    pub fn new_init_replica_request(init_replica_request: InitReplicaRequest) -> Self {
         Self {
-            message: Some(syn_replication_message::Message::OpenRequest(open_request)),
+            message: Some(syn_replication_message::Message::InitRequest(
+                init_replica_request,
+            )),
         }
     }
@@ -109,6 +108,14 @@ impl AckReplicationMessage {
         }
     }

+    pub fn new_init_replica_response(init_replica_response: InitReplicaResponse) -> Self {
+        Self {
+            message: Some(ack_replication_message::Message::InitResponse(
+                init_replica_response,
+            )),
+        }
+    }
+
     pub fn new_replicate_response(replicate_response: ReplicateResponse) -> Self {
         Self {
             message: Some(ack_replication_message::Message::ReplicateResponse(
diff --git a/quickwit/quickwit-proto/src/ingest/mod.rs b/quickwit/quickwit-proto/src/ingest/mod.rs
index ed4cf5f886c..8636ecc3bc2 100644
--- a/quickwit/quickwit-proto/src/ingest/mod.rs
+++ b/quickwit/quickwit-proto/src/ingest/mod.rs
@@ -19,7 +19,8 @@
 use bytes::Bytes;

-use self::ingester::FetchResponseV2;
+use self::ingester::{FetchResponseV2, PersistFailureReason, ReplicateFailureReason};
+use self::router::IngestFailureReason;
 use super::types::NodeId;
 use super::{ServiceError, ServiceErrorCode};
 use crate::control_plane::ControlPlaneError;
@@ -220,6 +221,29 @@ impl ShardIds {
     }
 }

+impl From<PersistFailureReason> for IngestFailureReason {
+    fn from(reason: PersistFailureReason) -> Self {
+        match reason {
+            PersistFailureReason::Unspecified => IngestFailureReason::Unspecified,
+            PersistFailureReason::ShardNotFound => IngestFailureReason::NoShardsAvailable,
+            PersistFailureReason::ShardClosed => IngestFailureReason::NoShardsAvailable,
+            PersistFailureReason::ResourceExhausted => IngestFailureReason::ResourceExhausted,
+            PersistFailureReason::RateLimited => IngestFailureReason::RateLimited,
+        }
+    }
+}
+
+impl From<ReplicateFailureReason> for PersistFailureReason {
+    fn from(reason: ReplicateFailureReason) -> Self {
+        match reason {
+            ReplicateFailureReason::Unspecified => PersistFailureReason::Unspecified,
+            ReplicateFailureReason::ShardNotFound => PersistFailureReason::ShardNotFound,
+            ReplicateFailureReason::ShardClosed => PersistFailureReason::ShardClosed,
+            ReplicateFailureReason::ResourceExhausted => PersistFailureReason::ResourceExhausted,
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
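
Side note on the failure-reason plumbing: the two `From` impls added to `quickwit-proto/src/ingest/mod.rs` compose, which is what lets `workbench.rs` collapse its old `match` into `persist_failure.reason().into()`. Below is a minimal sketch of that chain; it is not part of the diff, and it assumes a crate depending on the modified `quickwit-proto` (the helper function name is illustrative only).

use quickwit_proto::ingest::ingester::{PersistFailureReason, ReplicateFailureReason};
use quickwit_proto::ingest::router::IngestFailureReason;

// Chain the two conversions: ReplicateFailureReason -> PersistFailureReason -> IngestFailureReason.
fn reason_reported_to_router(replicate_reason: ReplicateFailureReason) -> IngestFailureReason {
    let persist_reason: PersistFailureReason = replicate_reason.into();
    persist_reason.into()
}

fn main() {
    // Per the mappings in this diff, a missing shard ends up reported as "no shards available".
    assert_eq!(
        reason_reported_to_router(ReplicateFailureReason::ShardNotFound),
        IngestFailureReason::NoShardsAvailable
    );
}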