diff --git a/quickwit/quickwit-common/src/pubsub.rs b/quickwit/quickwit-common/src/pubsub.rs
index 29fab1101cb..70683167169 100644
--- a/quickwit/quickwit-common/src/pubsub.rs
+++ b/quickwit/quickwit-common/src/pubsub.rs
@@ -42,7 +42,7 @@ pub trait EventSubscriber<E: Event>: Send + Sync + 'static {
 impl<E, F> EventSubscriber<E> for F
 where
     E: Event,
-    F: Fn(E) + Send + Sync + 'static,
+    F: FnMut(E) + Send + Sync + 'static,
 {
     async fn handle_event(&mut self, event: E) {
         (self)(event);
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs
index 9ff314919e8..381be88535e 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs
@@ -27,6 +27,7 @@ mod metrics;
 mod models;
 mod mrecord;
 mod mrecordlog_utils;
+mod publish_tracker;
 mod rate_meter;
 mod replication;
 mod router;
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/publish_tracker.rs b/quickwit/quickwit-ingest/src/ingest_v2/publish_tracker.rs
new file mode 100644
index 00000000000..64f9c907722
--- /dev/null
+++ b/quickwit/quickwit-ingest/src/ingest_v2/publish_tracker.rs
@@ -0,0 +1,341 @@
+// Copyright (C) 2024 Quickwit, Inc.
+//
+// Quickwit is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at hello@quickwit.io.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+use quickwit_common::pubsub::{EventBroker, EventSubscriptionHandle};
+use quickwit_proto::indexing::ShardPositionsUpdate;
+use quickwit_proto::types::{Position, ShardId};
+use tokio::sync::Notify;
+use tracing::error;
+
+/// A helper for awaiting shard publish events when running in `wait_for` and
+/// `force` commit mode.
+///
+/// Registers a set of shard positions and listens to [`ShardPositionsUpdate`]
+/// events to determine when all the persisted positions have been published. To
+/// ensure that no events are missed:
+/// - create the tracker before any persist request is sent
+/// - call `register_requested_shards` before each persist request to ensure that
+///   the associated publish events are recorded
+/// - call `track_persisted_shard_position` after each successful persist subrequest
+pub struct PublishTracker {
+    state: Arc<Mutex<ShardPublishStates>>,
+    // sync::Notify instead of sync::oneshot because we don't want to store the permit
+    publish_complete: Arc<Notify>,
+    _publish_listen_handle: EventSubscriptionHandle,
+}
+
+impl PublishTracker {
+    pub fn new(event_tracker: EventBroker) -> Self {
+        let state = Arc::new(Mutex::new(ShardPublishStates::default()));
+        let state_clone = state.clone();
+        let publish_complete = Arc::new(Notify::new());
+        let publish_complete_notifier = publish_complete.clone();
+        let _publish_listen_handle =
+            event_tracker.subscribe(move |update: ShardPositionsUpdate| {
+                let mut publish_states = state_clone.lock().unwrap();
+                for (updated_shard_id, updated_position) in &update.updated_shard_positions {
+                    publish_states.position_published(
+                        updated_shard_id,
+                        updated_position,
+                        &publish_complete_notifier,
+                    );
+                }
+            });
+        Self {
+            state,
+            _publish_listen_handle,
+            publish_complete,
+        }
+    }
+
+    pub fn register_requested_shards<'a>(
+        &'a self,
+        shard_ids: impl IntoIterator<Item = &'a ShardId>,
+    ) {
+        let mut publish_states = self.state.lock().unwrap();
+        for shard_id in shard_ids {
+            publish_states.shard_tracked(shard_id.clone());
+        }
+    }
+
+    pub fn track_persisted_shard_position(&self, shard_id: ShardId, new_position: Position) {
+        let mut publish_states = self.state.lock().unwrap();
+        publish_states.position_persisted(&shard_id, &new_position)
+    }
+
+    pub async fn wait_publish_complete(self) {
+        // correctness: `awaiting_count` cannot be increased after this point
+        // because `self` is consumed. By subscribing to `publish_complete`
+        // before checking `awaiting_count`, we make sure we don't miss the
+        // moment when it becomes 0.
+        let notified = self.publish_complete.notified();
+        if self.state.lock().unwrap().awaiting_count == 0 {
+            return;
+        }
+        notified.await;
+    }
+}
+
+enum PublishState {
+    /// The persist request for this shard has been sent
+    Tracked,
+    /// The success response for this shard's persist request has been received,
+    /// but the position has not yet been published
+    AwaitingPublish(Position),
+    /// The shard has been published up to this position (might happen before
+    /// the persist success is received)
+    Published(Position),
+}
+
+#[derive(Default)]
+struct ShardPublishStates {
+    states: HashMap<ShardId, PublishState>,
+    awaiting_count: usize,
+}
+
+impl ShardPublishStates {
+    fn shard_tracked(&mut self, shard_id: ShardId) {
+        self.states.entry(shard_id).or_insert(PublishState::Tracked);
+    }
+
+    fn position_published(
+        &mut self,
+        shard_id: &ShardId,
+        new_position: &Position,
+        publish_complete_notifier: &Notify,
+    ) {
+        if let Some(publish_state) = self.states.get_mut(shard_id) {
+            match publish_state {
+                PublishState::AwaitingPublish(shard_position) if new_position >= shard_position => {
+                    *publish_state = PublishState::Published(new_position.clone());
+                    self.awaiting_count -= 1;
+                    if self.awaiting_count == 0 {
+                        // The notification is only relevant once
+                        // `self.wait_publish_complete()` is called.
+                        // Before that, `awaiting_count` might
+                        // still be incremented again.
+ publish_complete_notifier.notify_waiters(); + } + } + PublishState::Published(current_position) if new_position > current_position => { + *current_position = new_position.clone(); + } + PublishState::Tracked => { + *publish_state = PublishState::Published(new_position.clone()); + } + PublishState::Published(_) => { + // looks like a duplicate or out-of-order event + } + PublishState::AwaitingPublish(_) => { + // the shard made some progress but we are waiting for more + } + } + } + // else: this shard is not being tracked here + } + + fn position_persisted(&mut self, shard_id: &ShardId, new_position: &Position) { + if let Some(publish_state) = self.states.get_mut(shard_id) { + match publish_state { + PublishState::Published(current_position) if new_position <= current_position => { + // new position already published, no need to track it + } + PublishState::AwaitingPublish(old_position) => { + error!( + %old_position, + %new_position, + %shard_id, + "shard persisted positions should not be tracked multiple times" + ); + } + PublishState::Tracked | PublishState::Published(_) => { + *publish_state = PublishState::AwaitingPublish(new_position.clone()); + self.awaiting_count += 1; + } + } + } else { + error!(%shard_id, "requested shards should be registered before their position is tracked") + } + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use quickwit_proto::types::{IndexUid, ShardId, SourceUid}; + + use super::*; + + #[tokio::test] + async fn test_shard_publish_states() { + let mut shard_publish_states = ShardPublishStates::default(); + let notifier = Arc::new(Notify::new()); + + let shard_id_1 = ShardId::from("test-shard-1"); + let shard_id_2 = ShardId::from("test-shard-2"); + let shard_id_3 = ShardId::from("test-shard-3"); + let shard_id_4 = ShardId::from("test-shard-4"); // not tracked + + shard_publish_states.shard_tracked(shard_id_1.clone()); + shard_publish_states.shard_tracked(shard_id_2.clone()); + shard_publish_states.shard_tracked(shard_id_3.clone()); + + let notifier_receiver = notifier.clone(); + let notified_subscription = notifier_receiver.notified(); + + shard_publish_states.position_persisted(&shard_id_1, &Position::offset(10usize)); + assert_eq!(shard_publish_states.awaiting_count, 1); + shard_publish_states.position_persisted(&shard_id_2, &Position::offset(20usize)); + assert_eq!(shard_publish_states.awaiting_count, 2); + shard_publish_states.position_published(&shard_id_1, &Position::offset(15usize), ¬ifier); + assert_eq!(shard_publish_states.awaiting_count, 1); + shard_publish_states.position_published(&shard_id_2, &Position::offset(20usize), ¬ifier); + assert_eq!(shard_publish_states.awaiting_count, 0); + + // check that only the notification that was subscribed before holds a permit + tokio::time::timeout(Duration::from_millis(100), notifier.notified()) + .await + .unwrap_err(); + tokio::time::timeout(Duration::from_millis(100), notified_subscription) + .await + .unwrap(); + + let notified_subscription = notifier_receiver.notified(); + shard_publish_states.position_published(&shard_id_3, &Position::offset(10usize), ¬ifier); + assert_eq!(shard_publish_states.awaiting_count, 0); + shard_publish_states.position_persisted(&shard_id_3, &Position::offset(10usize)); + assert_eq!(shard_publish_states.awaiting_count, 0); + // no notification expected here as the shard never becomes AwaitingPublish + tokio::time::timeout(Duration::from_millis(100), notified_subscription) + .await + .unwrap_err(); + // shard 4 is not tracked + 
shard_publish_states.position_published(&shard_id_4, &Position::offset(10usize), ¬ifier); + assert_eq!(shard_publish_states.awaiting_count, 0); + assert!(!shard_publish_states.states.contains_key(&shard_id_4)); + } + + #[tokio::test] + async fn test_publish_tracker() { + let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0); + let event_broker = EventBroker::default(); + let tracker = PublishTracker::new(event_broker.clone()); + let shard_id_1 = ShardId::from("test-shard-1"); + let shard_id_2 = ShardId::from("test-shard-2"); + let shard_id_3 = ShardId::from("test-shard-3"); + let shard_id_4 = ShardId::from("test-shard-4"); + let shard_id_5 = ShardId::from("test-shard-5"); // not tracked + + tracker.register_requested_shards([&shard_id_1, &shard_id_2, &shard_id_3, &shard_id_4]); + + tracker.track_persisted_shard_position(shard_id_1.clone(), Position::offset(42usize)); + tracker.track_persisted_shard_position(shard_id_2.clone(), Position::offset(42usize)); + tracker.track_persisted_shard_position(shard_id_3.clone(), Position::offset(42usize)); + + event_broker.publish(ShardPositionsUpdate { + source_uid: SourceUid { + index_uid: index_uid.clone(), + source_id: "test-source".to_string(), + }, + updated_shard_positions: vec![ + (shard_id_1.clone(), Position::offset(42usize)), + (shard_id_2.clone(), Position::offset(666usize)), + (shard_id_5.clone(), Position::offset(888usize)), + ] + .into_iter() + .collect(), + }); + + event_broker.publish(ShardPositionsUpdate { + source_uid: SourceUid { + index_uid: index_uid.clone(), + source_id: "test-source".to_string(), + }, + updated_shard_positions: vec![ + (shard_id_3.clone(), Position::eof(42usize)), + (shard_id_4.clone(), Position::offset(42usize)), + ] + .into_iter() + .collect(), + }); + + // persist response received after the publish event + tracker.track_persisted_shard_position(shard_id_4.clone(), Position::offset(42usize)); + + tokio::time::timeout(Duration::from_millis(200), tracker.wait_publish_complete()) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_publish_tracker_waits() { + let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0); + let shard_id_1 = ShardId::from("test-shard-1"); + let shard_id_2 = ShardId::from("test-shard-2"); + let position = Position::offset(42usize); + + { + let event_broker = EventBroker::default(); + let tracker = PublishTracker::new(event_broker.clone()); + tracker.register_requested_shards([&shard_id_1, &shard_id_2]); + tracker.track_persisted_shard_position(shard_id_1.clone(), position.clone()); + tracker.track_persisted_shard_position(shard_id_2.clone(), position.clone()); + + event_broker.publish(ShardPositionsUpdate { + source_uid: SourceUid { + index_uid: index_uid.clone(), + source_id: "test-source".to_string(), + }, + updated_shard_positions: vec![(shard_id_1.clone(), position.clone())] + .into_iter() + .collect(), + }); + + tokio::time::timeout(Duration::from_millis(200), tracker.wait_publish_complete()) + .await + .unwrap_err(); + } + { + let event_broker = EventBroker::default(); + let tracker = PublishTracker::new(event_broker.clone()); + tracker.register_requested_shards([&shard_id_1, &shard_id_2]); + tracker.track_persisted_shard_position(shard_id_1.clone(), position.clone()); + event_broker.publish(ShardPositionsUpdate { + source_uid: SourceUid { + index_uid: index_uid.clone(), + source_id: "test-source".to_string(), + }, + updated_shard_positions: vec![(shard_id_1.clone(), position.clone())] + .into_iter() + .collect(), + }); + // sleep to make sure the event is 
processed + tokio::time::sleep(Duration::from_millis(50)).await; + tracker.track_persisted_shard_position(shard_id_2.clone(), position.clone()); + + tokio::time::timeout(Duration::from_millis(200), tracker.wait_publish_complete()) + .await + .unwrap_err(); + } + } +} diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index 4f46ed5ea92..d20d5c2e74c 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -101,6 +101,7 @@ pub struct IngestRouter { replication_factor: usize, // Limits the number of ingest requests in-flight to some capacity in bytes. ingest_semaphore: Arc, + event_broker: EventBroker, } struct RouterState { @@ -125,6 +126,7 @@ impl IngestRouter { control_plane: ControlPlaneServiceClient, ingester_pool: IngesterPool, replication_factor: usize, + event_broker: EventBroker, ) -> Self { let state = Arc::new(Mutex::new(RouterState { debouncer: GetOrCreateOpenShardsRequestDebouncer::default(), @@ -143,15 +145,16 @@ impl IngestRouter { state, replication_factor, ingest_semaphore, + event_broker, } } - pub fn subscribe(&self, event_broker: &EventBroker) { + pub fn subscribe(&self) { let weak_router_state = WeakRouterState(Arc::downgrade(&self.state)); - event_broker + self.event_broker .subscribe::(weak_router_state.clone()) .forever(); - event_broker + self.event_broker .subscribe::(weak_router_state) .forever(); } @@ -422,6 +425,7 @@ impl IngestRouter { subrequests, commit_type: commit_type as i32, }; + workbench.record_persist_request(&persist_request); let persist_future = async move { let persist_result = tokio::time::timeout( PERSIST_REQUEST_TIMEOUT, @@ -454,12 +458,20 @@ impl IngestRouter { max_num_attempts: usize, ) -> IngestResponseV2 { let commit_type = ingest_request.commit_type(); - let mut workbench = IngestWorkbench::new(ingest_request.subrequests, max_num_attempts); + let mut workbench = if matches!(commit_type, CommitTypeV2::Force | CommitTypeV2::WaitFor) { + IngestWorkbench::new_with_publish_tracking( + ingest_request.subrequests, + max_num_attempts, + self.event_broker.clone(), + ) + } else { + IngestWorkbench::new(ingest_request.subrequests, max_num_attempts) + }; while !workbench.is_complete() { workbench.new_attempt(); self.batch_persist(&mut workbench, commit_type).await; } - workbench.into_ingest_result() + workbench.into_ingest_result().await } async fn ingest_timeout( @@ -595,9 +607,14 @@ impl IngestRouterService for IngestRouter { .try_acquire_many_owned(request_size_bytes as u32) .map_err(|_| IngestV2Error::TooManyRequests(RateLimitingCause::RouterLoadShedding))?; - let ingest_res = self - .ingest_timeout(ingest_request, ingest_request_timeout()) - .await; + let ingest_res = if ingest_request.commit_type() == CommitTypeV2::Auto { + self.ingest_timeout(ingest_request, ingest_request_timeout()) + .await + } else { + Ok(self + .retry_batch_persist(ingest_request, MAX_PERSIST_ATTEMPTS) + .await) + }; update_ingest_metrics(&ingest_res, num_subrequests); @@ -712,6 +729,7 @@ mod tests { control_plane, ingester_pool.clone(), replication_factor, + EventBroker::default(), ); let mut workbench = IngestWorkbench::default(); let (get_or_create_open_shard_request_opt, rendezvous) = router @@ -948,6 +966,7 @@ mod tests { control_plane, ingester_pool.clone(), replication_factor, + EventBroker::default(), ); let ingest_subrequests = vec![ IngestSubrequest { @@ -1062,6 +1081,7 @@ mod tests { control_plane, ingester_pool.clone(), replication_factor, + 
EventBroker::default(), ); let ingest_subrequests = vec![IngestSubrequest { subrequest_id: 0, @@ -1120,6 +1140,7 @@ mod tests { control_plane, ingester_pool.clone(), replication_factor, + EventBroker::default(), ); let ingest_subrequests = vec![IngestSubrequest { subrequest_id: 0, @@ -1149,6 +1170,7 @@ mod tests { control_plane, ingester_pool.clone(), replication_factor, + EventBroker::default(), ); let ingest_subrequests = vec![IngestSubrequest { subrequest_id: 0, @@ -1200,6 +1222,7 @@ mod tests { control_plane, ingester_pool.clone(), replication_factor, + EventBroker::default(), ); let ingest_subrequests = vec![IngestSubrequest { subrequest_id: 0, @@ -1251,6 +1274,7 @@ mod tests { control_plane, ingester_pool.clone(), replication_factor, + EventBroker::default(), ); let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0); let mut state_guard = router.state.lock().await; @@ -1337,6 +1361,7 @@ mod tests { control_plane, ingester_pool.clone(), replication_factor, + EventBroker::default(), ); let ingest_subrequests = vec![ IngestSubrequest { @@ -1416,6 +1441,7 @@ mod tests { control_plane, ingester_pool.clone(), replication_factor, + EventBroker::default(), ); let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0); let index_uid2: IndexUid = IndexUid::for_test("test-index-1", 0); @@ -1653,6 +1679,7 @@ mod tests { control_plane, ingester_pool.clone(), replication_factor, + EventBroker::default(), ); let mut state_guard = router.state.lock().await; let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0); @@ -1757,14 +1784,15 @@ mod tests { let control_plane = ControlPlaneServiceClient::from_mock(MockControlPlaneService::new()); let ingester_pool = IngesterPool::default(); let replication_factor = 1; + let event_broker = EventBroker::default(); let router = IngestRouter::new( self_node_id, control_plane, ingester_pool.clone(), replication_factor, + event_broker.clone(), ); - let event_broker = EventBroker::default(); - router.subscribe(&event_broker); + router.subscribe(); let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0); let mut state_guard = router.state.lock().await; @@ -1854,6 +1882,7 @@ mod tests { control_plane, ingester_pool.clone(), replication_factor, + EventBroker::default(), ); let index_uid_0: IndexUid = IndexUid::for_test("test-index-0", 0); let index_uid_1: IndexUid = IndexUid::for_test("test-index-1", 0); @@ -1903,6 +1932,7 @@ mod tests { control_plane, ingester_pool.clone(), replication_factor, + EventBroker::default(), ); let mut state_guard = router.state.lock().await; let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs index a8bc0700270..7dab68c5485 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs @@ -19,11 +19,14 @@ use std::collections::{BTreeMap, HashSet}; +use quickwit_common::pubsub::EventBroker; use quickwit_common::rate_limited_error; use quickwit_proto::control_plane::{ GetOrCreateOpenShardsFailure, GetOrCreateOpenShardsFailureReason, }; -use quickwit_proto::ingest::ingester::{PersistFailure, PersistFailureReason, PersistSuccess}; +use quickwit_proto::ingest::ingester::{ + PersistFailure, PersistFailureReason, PersistRequest, PersistSuccess, +}; use quickwit_proto::ingest::router::{ IngestFailure, IngestFailureReason, IngestResponseV2, IngestSubrequest, IngestSuccess, }; @@ -31,6 +34,7 @@ use 
quickwit_proto::ingest::{IngestV2Error, RateLimitingCause};
 use quickwit_proto::types::{NodeId, ShardId, SubrequestId};
 use tracing::warn;
 
+use super::publish_tracker::PublishTracker;
 use super::router::PersistRequestSummary;
 
 /// A helper struct for managing the state of the subrequests of an ingest request during multiple
@@ -44,13 +48,14 @@ pub(super) struct IngestWorkbench {
     /// subrequest.
     pub num_attempts: usize,
     pub max_num_attempts: usize,
-    // List of leaders that have been marked as temporarily unavailable.
-    // These leaders have encountered a transport error during an attempt and will be treated as if
-    // they were out of the pool for subsequent attempts.
-    //
-    // (The point here is to make sure we do not wait for the failure detection to kick the node
-    // out of the ingest node.)
+    /// List of leaders that have been marked as temporarily unavailable.
+    /// These leaders have encountered a transport error during an attempt and will be treated as
+    /// if they were out of the pool for subsequent attempts.
+    ///
+    /// (The point here is to make sure we do not wait for the failure detection to kick the node
+    /// out of the ingest node.)
     pub unavailable_leaders: HashSet<NodeId>,
+    publish_tracker: Option<PublishTracker>,
 }
 
 /// Returns an iterator of pending of subrequests, sorted by sub request id.
@@ -67,7 +72,11 @@ pub(super) fn pending_subrequests(
 }
 
 impl IngestWorkbench {
-    pub fn new(ingest_subrequests: Vec<IngestSubrequest>, max_num_attempts: usize) -> Self {
+    fn new_inner(
+        ingest_subrequests: Vec<IngestSubrequest>,
+        max_num_attempts: usize,
+        publish_tracker: Option<PublishTracker>,
+    ) -> Self {
         let subworkbenches: BTreeMap<SubrequestId, IngestSubworkbench> = ingest_subrequests
             .into_iter()
             .map(|subrequest| {
@@ -81,16 +90,33 @@
         Self {
             subworkbenches,
             max_num_attempts,
+            publish_tracker,
             ..Default::default()
         }
     }
 
+    pub fn new(ingest_subrequests: Vec<IngestSubrequest>, max_num_attempts: usize) -> Self {
+        Self::new_inner(ingest_subrequests, max_num_attempts, None)
+    }
+
+    pub fn new_with_publish_tracking(
+        ingest_subrequests: Vec<IngestSubrequest>,
+        max_num_attempts: usize,
+        event_broker: EventBroker,
+    ) -> Self {
+        Self::new_inner(
+            ingest_subrequests,
+            max_num_attempts,
+            Some(PublishTracker::new(event_broker)),
+        )
+    }
+
     pub fn new_attempt(&mut self) {
         self.num_attempts += 1;
     }
 
-    /// Returns true if all subrequests were successful or if the
-    /// number of attempts has been exhausted.
+    /// Returns true if all subrequests were successfully persisted or if the
+    /// number of attempts has been exhausted.
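+    ///
+    /// Note: "complete" only covers persistence. When a publish tracker is
+    /// attached (`wait_for`/`force` commit), the published positions are
+    /// awaited later, in `into_ingest_result`.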
pub fn is_complete(&self) -> bool { self.num_successes >= self.subworkbenches.len() || self.num_attempts >= self.max_num_attempts @@ -107,6 +133,16 @@ impl IngestWorkbench { .all(|subworbench| !subworbench.is_pending()) } + pub fn record_persist_request(&self, persist_request: &PersistRequest) { + if let Some(publish_tracker) = &self.publish_tracker { + let shards = persist_request + .subrequests + .iter() + .map(|subrequest| subrequest.shard_id()); + publish_tracker.register_requested_shards(shards); + } + } + pub fn record_get_or_create_open_shards_failure( &mut self, open_shards_failure: GetOrCreateOpenShardsFailure, @@ -138,6 +174,14 @@ impl IngestWorkbench { ); return; }; + if let Some(publish_tracker) = &mut self.publish_tracker { + if let Some(position) = &persist_success.replication_position_inclusive { + publish_tracker.track_persisted_shard_position( + persist_success.shard_id().clone(), + position.clone(), + ); + } + } self.num_successes += 1; subworkbench.num_attempts += 1; subworkbench.persist_success_opt = Some(persist_success); @@ -223,7 +267,7 @@ impl IngestWorkbench { ); } - pub fn into_ingest_result(self) -> IngestResponseV2 { + pub async fn into_ingest_result(self) -> IngestResponseV2 { let num_subworkbenches = self.subworkbenches.len(); let mut successes = Vec::with_capacity(self.num_successes); let mut failures = Vec::with_capacity(num_subworkbenches - self.num_successes); @@ -255,6 +299,10 @@ impl IngestWorkbench { let num_failures = failures.len(); assert_eq!(num_successes + num_failures, num_subworkbenches); + if let Some(publish_tracker) = self.publish_tracker { + publish_tracker.wait_publish_complete().await; + } + // For tests, we sort the successes and failures by subrequest_id #[cfg(test)] { @@ -358,8 +406,11 @@ impl IngestSubworkbench { #[cfg(test)] mod tests { - use quickwit_proto::ingest::ingester::PersistFailureReason; - use quickwit_proto::types::ShardId; + use std::time::Duration; + + use quickwit_proto::indexing::ShardPositionsUpdate; + use quickwit_proto::ingest::ingester::{PersistFailureReason, PersistSubrequest}; + use quickwit_proto::types::{IndexUid, Position, ShardId, SourceUid}; use super::*; @@ -485,6 +536,182 @@ mod tests { assert_eq!(pending_subrequests(&workbench.subworkbenches).count(), 0); } + #[tokio::test] + async fn test_workbench_publish_tracking_empty() { + let workbench = + IngestWorkbench::new_with_publish_tracking(Vec::new(), 1, EventBroker::default()); + assert!(workbench.is_complete()); + assert_eq!( + workbench.into_ingest_result().await, + IngestResponseV2::default() + ); + } + + #[tokio::test] + async fn test_workbench_publish_tracking_happy_path() { + let event_broker = EventBroker::default(); + let shard_id_1 = ShardId::from("test-shard-1"); + let shard_id_2 = ShardId::from("test-shard-2"); + let ingest_subrequests = vec![ + IngestSubrequest { + subrequest_id: 0, + ..Default::default() + }, + IngestSubrequest { + subrequest_id: 1, + ..Default::default() + }, + ]; + let mut workbench = + IngestWorkbench::new_with_publish_tracking(ingest_subrequests, 1, event_broker.clone()); + assert_eq!(pending_subrequests(&workbench.subworkbenches).count(), 2); + assert!(!workbench.is_complete()); + + let persist_request = PersistRequest { + subrequests: vec![ + PersistSubrequest { + subrequest_id: 0, + shard_id: Some(shard_id_1.clone()), + ..Default::default() + }, + PersistSubrequest { + subrequest_id: 1, + shard_id: Some(shard_id_2.clone()), + ..Default::default() + }, + ], + ..Default::default() + }; + 
workbench.record_persist_request(&persist_request); + + let persist_success = PersistSuccess { + subrequest_id: 0, + shard_id: Some(shard_id_1.clone()), + replication_position_inclusive: Some(Position::offset(42usize)), + ..Default::default() + }; + workbench.record_persist_success(persist_success); + + let persist_failure = PersistFailure { + subrequest_id: 1, + shard_id: Some(shard_id_2.clone()), + ..Default::default() + }; + workbench.record_persist_failure(&persist_failure); + + let persist_success = PersistSuccess { + subrequest_id: 1, + shard_id: Some(shard_id_2.clone()), + replication_position_inclusive: Some(Position::offset(66usize)), + ..Default::default() + }; + + // retry to persist shard 2 + + let persist_request = PersistRequest { + subrequests: vec![PersistSubrequest { + subrequest_id: 1, + shard_id: Some(shard_id_2.clone()), + ..Default::default() + }], + ..Default::default() + }; + workbench.record_persist_request(&persist_request); + + workbench.record_persist_success(persist_success); + + assert!(workbench.is_complete()); + assert_eq!(workbench.num_successes, 2); + assert_eq!(pending_subrequests(&workbench.subworkbenches).count(), 0); + + event_broker.publish(ShardPositionsUpdate { + source_uid: SourceUid { + index_uid: IndexUid::for_test("test-index", 0), + source_id: "test-source".to_string(), + }, + updated_shard_positions: vec![ + (shard_id_1, Position::offset(42usize)), + (shard_id_2, Position::offset(66usize)), + ] + .into_iter() + .collect(), + }); + + let ingest_response = workbench.into_ingest_result().await; + assert_eq!(ingest_response.successes.len(), 2); + assert_eq!(ingest_response.failures.len(), 0); + } + + #[tokio::test] + async fn test_workbench_publish_tracking_waits() { + let event_broker = EventBroker::default(); + let shard_id_1 = ShardId::from("test-shard-1"); + let shard_id_2 = ShardId::from("test-shard-2"); + let ingest_subrequests = vec![ + IngestSubrequest { + subrequest_id: 0, + ..Default::default() + }, + IngestSubrequest { + subrequest_id: 1, + ..Default::default() + }, + ]; + let mut workbench = + IngestWorkbench::new_with_publish_tracking(ingest_subrequests, 1, event_broker.clone()); + + let persist_request = PersistRequest { + subrequests: vec![ + PersistSubrequest { + subrequest_id: 0, + shard_id: Some(shard_id_1.clone()), + ..Default::default() + }, + PersistSubrequest { + subrequest_id: 1, + shard_id: Some(shard_id_2.clone()), + ..Default::default() + }, + ], + ..Default::default() + }; + workbench.record_persist_request(&persist_request); + + let persist_success = PersistSuccess { + subrequest_id: 0, + shard_id: Some(shard_id_1.clone()), + replication_position_inclusive: Some(Position::offset(42usize)), + ..Default::default() + }; + workbench.record_persist_success(persist_success); + + let persist_success = PersistSuccess { + subrequest_id: 1, + shard_id: Some(shard_id_2.clone()), + replication_position_inclusive: Some(Position::offset(66usize)), + ..Default::default() + }; + workbench.record_persist_success(persist_success); + + assert!(workbench.is_complete()); + assert_eq!(workbench.num_successes, 2); + assert_eq!(pending_subrequests(&workbench.subworkbenches).count(), 0); + + event_broker.publish(ShardPositionsUpdate { + source_uid: SourceUid { + index_uid: IndexUid::for_test("test-index", 0), + source_id: "test-source".to_string(), + }, + updated_shard_positions: vec![(shard_id_2, Position::offset(66usize))] + .into_iter() + .collect(), + }); + // still waits for shard 1 to be published + 
tokio::time::timeout(Duration::from_millis(200), workbench.into_ingest_result()) + .await + .unwrap_err(); + } + #[test] fn test_ingest_workbench_record_get_or_create_open_shards_failure() { let ingest_subrequests = vec![IngestSubrequest { @@ -680,10 +907,10 @@ mod tests { assert_eq!(subworkbench.num_attempts, 1); } - #[test] - fn test_ingest_workbench_into_ingest_result() { + #[tokio::test] + async fn test_ingest_workbench_into_ingest_result() { let workbench = IngestWorkbench::new(Vec::new(), 0); - let response = workbench.into_ingest_result(); + let response = workbench.into_ingest_result().await; assert!(response.successes.is_empty()); assert!(response.failures.is_empty()); @@ -706,7 +933,7 @@ mod tests { workbench.record_no_shards_available(1); - let response = workbench.into_ingest_result(); + let response = workbench.into_ingest_result().await; assert_eq!(response.successes.len(), 1); assert_eq!(response.successes[0].subrequest_id, 0); @@ -725,7 +952,7 @@ mod tests { let failure = SubworkbenchFailure::Persist(PersistFailureReason::Timeout); workbench.record_failure(0, failure); - let ingest_response = workbench.into_ingest_result(); + let ingest_response = workbench.into_ingest_result().await; assert_eq!(ingest_response.successes.len(), 0); assert_eq!( ingest_response.failures[0].reason(), diff --git a/quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs b/quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs index e109facf105..915a24dd7a4 100644 --- a/quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs @@ -19,12 +19,14 @@ use std::time::Duration; +use futures_util::FutureExt; use hyper::StatusCode; use quickwit_config::service::QuickwitService; use quickwit_config::ConfigFormat; use quickwit_metastore::SplitState; use quickwit_rest_client::error::{ApiError, Error}; use quickwit_rest_client::rest_client::CommitType; +use quickwit_serve::ListSplitsQueryParams; use serde_json::json; use crate::ingest_json; @@ -219,7 +221,7 @@ async fn test_ingest_v2_index_not_found() { ingest_json!({"body": "doc1"}), None, None, - CommitType::WaitFor, + CommitType::Auto, ) .await .unwrap_err(); @@ -281,7 +283,7 @@ async fn test_ingest_v2_happy_path() { ingest_json!({"body": "doc1"}), None, None, - CommitType::WaitFor, + CommitType::Auto, ) .await; let Some(ingest_error) = ingest_res.err() else { @@ -317,10 +319,10 @@ async fn test_ingest_v2_happy_path() { } #[tokio::test] -async fn test_commit_modes() { +async fn test_commit_force() { initialize_tests(); - let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; - let index_id = "test_commit_modes"; + let mut sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; + let index_id = "test_commit_force"; let index_config = format!( r#" version: 0.8 @@ -330,7 +332,7 @@ async fn test_commit_modes() { - name: body type: text indexing_settings: - commit_timeout_secs: 2 + commit_timeout_secs: 60 "# ); @@ -342,37 +344,133 @@ async fn test_commit_modes() { .await .unwrap(); - // TODO: make this test work with ingest v2 (#4438) - // sandbox.enable_ingest_v2(); + sandbox.enable_ingest_v2(); - // Test force commit - ingest_with_retry( - &sandbox.indexer_rest_client, - index_id, - ingest_json!({"body": "force"}), - CommitType::Force, + // commit_timeout_secs is set to a large value, so this would timeout if + // the commit isn't forced + tokio::time::timeout( + Duration::from_secs(20), + ingest_with_retry( + &sandbox.indexer_rest_client, + 
index_id, + ingest_json!({"body": "force"}), + CommitType::Force, + ), ) .await + .unwrap() .unwrap(); sandbox.assert_hit_count(index_id, "body:force", 1).await; - // Test wait_for commit + sandbox.shutdown().await.unwrap(); +} + +#[tokio::test] +async fn test_commit_wait_for() { + initialize_tests(); + let mut sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; + let index_id = "test_commit_wait_for"; + let index_config = format!( + r#" + version: 0.8 + index_id: {index_id} + doc_mapping: + field_mappings: + - name: body + type: text + indexing_settings: + commit_timeout_secs: 3 + "# + ); + + // Create index sandbox + .indexer_rest_client + .indexes() + .create(index_config, ConfigFormat::Yaml, false) + .await + .unwrap(); + + sandbox.enable_ingest_v2(); + + // run 2 ingest requests at the same time on the same index + // wait_for shouldn't force the commit so expect only 1 published split + + let ingest_1_fut = sandbox + .indexer_rest_client + .ingest( + index_id, + ingest_json!({"body": "wait for"}), + None, + None, + CommitType::WaitFor, + ) + .then(|res| async { + res.unwrap(); + sandbox.assert_hit_count(index_id, "body:for", 1).await; + }); + + let ingest_2_fut = sandbox .indexer_rest_client .ingest( index_id, - ingest_json!({"body": "wait"}), + ingest_json!({"body": "wait again"}), None, None, CommitType::WaitFor, ) + .then(|res| async { + res.unwrap(); + sandbox.assert_hit_count(index_id, "body:again", 1).await; + }); + + tokio::join!(ingest_1_fut, ingest_2_fut); + + sandbox.assert_hit_count(index_id, "body:wait", 2).await; + + let splits_query_params = ListSplitsQueryParams { + split_states: Some(vec![SplitState::Published]), + ..Default::default() + }; + let published_splits = sandbox + .indexer_rest_client + .splits(index_id) + .list(splits_query_params) + .await + .unwrap(); + assert_eq!(published_splits.len(), 1); + + sandbox.shutdown().await.unwrap(); +} + +#[tokio::test] +async fn test_commit_auto() { + initialize_tests(); + let mut sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; + let index_id = "test_commit_auto"; + let index_config = format!( + r#" + version: 0.8 + index_id: {index_id} + doc_mapping: + field_mappings: + - name: body + type: text + indexing_settings: + commit_timeout_secs: 2 + "# + ); + + sandbox + .indexer_rest_client + .indexes() + .create(index_config, ConfigFormat::Yaml, false) .await .unwrap(); - sandbox.assert_hit_count(index_id, "body:wait", 1).await; + sandbox.enable_ingest_v2(); - // Test auto commit sandbox .indexer_rest_client .ingest( @@ -388,13 +486,12 @@ async fn test_commit_modes() { sandbox.assert_hit_count(index_id, "body:auto", 0).await; sandbox - .wait_for_splits(index_id, Some(vec![SplitState::Published]), 3) + .wait_for_splits(index_id, Some(vec![SplitState::Published]), 1) .await .unwrap(); sandbox.assert_hit_count(index_id, "body:auto", 1).await; - // Clean up sandbox.shutdown().await.unwrap(); } @@ -445,7 +542,7 @@ async fn test_very_large_index_name() { &sandbox.indexer_rest_client, index_id, ingest_json!({"body": "not too long"}), - CommitType::WaitFor, + CommitType::Auto, ) .await .unwrap(); @@ -605,7 +702,7 @@ async fn test_shutdown_control_plane_early_shutdown() { &sandbox.indexer_rest_client, index_id, ingest_json!({"body": "one"}), - CommitType::WaitFor, + CommitType::Force, ) .await .unwrap(); @@ -664,7 +761,7 @@ async fn test_shutdown_separate_indexer() { &sandbox.indexer_rest_client, index_id, ingest_json!({"body": "one"}), - CommitType::WaitFor, + CommitType::Force, ) .await 
.unwrap(); diff --git a/quickwit/quickwit-proto/protos/quickwit/ingest.proto b/quickwit/quickwit-proto/protos/quickwit/ingest.proto index 404bbf7c660..0eb000b675f 100644 --- a/quickwit/quickwit-proto/protos/quickwit/ingest.proto +++ b/quickwit/quickwit-proto/protos/quickwit/ingest.proto @@ -49,7 +49,7 @@ message ShardPKey { enum CommitTypeV2 { COMMIT_TYPE_V2_UNSPECIFIED = 0; COMMIT_TYPE_V2_AUTO = 1; - COMMIT_TYPE_V2_WAIT = 2; + COMMIT_TYPE_V2_WAIT_FOR = 2; COMMIT_TYPE_V2_FORCE = 3; } diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs index 9b071d20db4..0790c8a7416 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs @@ -122,7 +122,7 @@ pub struct ParseFailure { pub enum CommitTypeV2 { Unspecified = 0, Auto = 1, - Wait = 2, + WaitFor = 2, Force = 3, } impl CommitTypeV2 { @@ -134,7 +134,7 @@ impl CommitTypeV2 { match self { CommitTypeV2::Unspecified => "COMMIT_TYPE_V2_UNSPECIFIED", CommitTypeV2::Auto => "COMMIT_TYPE_V2_AUTO", - CommitTypeV2::Wait => "COMMIT_TYPE_V2_WAIT", + CommitTypeV2::WaitFor => "COMMIT_TYPE_V2_WAIT_FOR", CommitTypeV2::Force => "COMMIT_TYPE_V2_FORCE", } } @@ -143,7 +143,7 @@ impl CommitTypeV2 { match value { "COMMIT_TYPE_V2_UNSPECIFIED" => Some(Self::Unspecified), "COMMIT_TYPE_V2_AUTO" => Some(Self::Auto), - "COMMIT_TYPE_V2_WAIT" => Some(Self::Wait), + "COMMIT_TYPE_V2_WAIT_FOR" => Some(Self::WaitFor), "COMMIT_TYPE_V2_FORCE" => Some(Self::Force), _ => None, } diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs b/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs index e068483730a..1220fcc415c 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs @@ -30,7 +30,6 @@ use quickwit_proto::ingest::router::{ use quickwit_proto::ingest::CommitTypeV2; use quickwit_proto::types::{DocUid, IndexId}; use serde::{Deserialize, Serialize}; -use tracing::warn; use super::model::ElasticException; use crate::elasticsearch_api::model::{BulkAction, ElasticBulkOptions, ElasticsearchError}; @@ -140,9 +139,6 @@ pub(crate) async fn elastic_bulk_ingest_v2( } let commit_type: CommitTypeV2 = bulk_options.refresh.into(); - if commit_type != CommitTypeV2::Auto { - warn!("ingest API v2 does not support the `refresh` parameter (yet)"); - } let ingest_request_opt = ingest_request_builder.build(INGEST_V2_SOURCE_ID, commit_type); let Some(ingest_request) = ingest_request_opt else { @@ -781,4 +777,40 @@ mod tests { assert_eq!(error.exception, ElasticException::IndexNotFound); assert_eq!(error.reason, "no such index [test-index-bar]"); } + + #[tokio::test] + async fn test_refresh_param() { + let mut mock_ingest_router = MockIngestRouterService::new(); + mock_ingest_router + .expect_ingest() + .once() + .returning(|ingest_request| { + assert_eq!(ingest_request.commit_type(), CommitTypeV2::WaitFor); + Ok(IngestResponseV2 { + successes: vec![IngestSuccess { + subrequest_id: 0, + index_uid: Some(IndexUid::for_test("my-index-1", 0)), + source_id: INGEST_V2_SOURCE_ID.to_string(), + shard_id: Some(ShardId::from(1)), + replication_position_inclusive: Some(Position::offset(1u64)), + num_ingested_docs: 2, + parse_failures: Vec::new(), + }], + failures: Vec::new(), + }) + }); + let ingest_router = IngestRouterServiceClient::from_mock(mock_ingest_router); + let handler = es_compat_bulk_handler_v2(ingest_router); + + let payload = r#" + 
{"create": {"_index": "my-index-1", "_id" : "1"}} + {"ts": 1, "message": "my-message-1"} + "#; + warp::test::request() + .path("/_elastic/_bulk?refresh=wait_for") + .method("POST") + .body(payload) + .reply(&handler) + .await; + } } diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/model/bulk_query_params.rs b/quickwit/quickwit-serve/src/elasticsearch_api/model/bulk_query_params.rs index a1d982b4e25..e9b415c8248 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/model/bulk_query_params.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/model/bulk_query_params.rs @@ -68,7 +68,7 @@ impl From for CommitTypeV2 { match val { ElasticRefresh::False => Self::Auto, ElasticRefresh::True => Self::Force, - ElasticRefresh::WaitFor => Self::Wait, + ElasticRefresh::WaitFor => Self::WaitFor, } } } diff --git a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs index ee086902522..713b361bbeb 100644 --- a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs @@ -27,6 +27,7 @@ use quickwit_proto::ingest::router::{ IngestRequestV2, IngestResponseV2, IngestRouterService, IngestRouterServiceClient, IngestSubrequest, }; +use quickwit_proto::ingest::CommitTypeV2; use quickwit_proto::types::{DocUidGenerator, IndexId}; use serde::Deserialize; use warp::{Filter, Rejection}; @@ -56,6 +57,13 @@ struct IngestOptions { commit_type: CommitType, } +#[derive(Clone, Debug, Default, Deserialize, PartialEq)] +struct IngestV2Options { + #[serde(alias = "commit")] + #[serde(default)] + commit_type: CommitTypeV2, +} + pub(crate) fn ingest_api_handlers( ingest_router: IngestRouterServiceClient, ingest_service: IngestServiceClient, @@ -92,14 +100,14 @@ fn ingest_handler( fn ingest_v2_filter( config: IngestApiConfig, -) -> impl Filter + Clone { +) -> impl Filter + Clone { warp::path!(String / "ingest-v2") .and(warp::post()) .and(warp::body::content_length_limit( config.content_length_limit.as_u64(), )) .and(get_body_bytes()) - .and(serde_qs::warp::query::( + .and(serde_qs::warp::query::( serde_qs::Config::default(), )) } @@ -118,7 +126,7 @@ fn ingest_v2_handler( async fn ingest_v2( index_id: IndexId, body: Body, - ingest_options: IngestOptions, + ingest_options: IngestV2Options, ingest_router: IngestRouterServiceClient, ) -> Result { let mut doc_batch_builder = DocBatchV2Builder::default(); diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 4de79fe5d1b..9b816333528 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -864,8 +864,9 @@ async fn setup_ingest_v2( control_plane.clone(), ingester_pool.clone(), replication_factor, + event_broker.clone(), ); - ingest_router.subscribe(event_broker); + ingest_router.subscribe(); let ingest_router_service = IngestRouterServiceClient::tower() .stack_layer(INGEST_GRPC_SERVER_METRICS_LAYER.clone()) diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.quickwit.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.quickwit.yaml index ec6e9f81a3d..b6d2df1053f 100644 --- a/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.quickwit.yaml +++ b/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.quickwit.yaml @@ -66,4 +66,3 @@ params: refresh: "true" headers: {"Content-Type": "application/json", "content-encoding": "gzip"} body_from_file: gharchive-bulk.json.gz -sleep_after: 3