From f6ff304feb9a699d4ef34b3ec6d25df2e9335257 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 1 Aug 2024 00:00:47 +0900 Subject: [PATCH] Avoid deserializing QueryAST for every split. --- quickwit/quickwit-search/src/leaf.rs | 82 ++++++++++++++++++---------- 1 file changed, 52 insertions(+), 30 deletions(-) diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index b225a53c13d..f997c17e81b 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -44,6 +44,7 @@ use tantivy::directory::FileSlice; use tantivy::fastfield::FastFieldReaders; use tantivy::schema::Field; use tantivy::{DateTime, Index, ReloadPolicy, Searcher, Term}; +use tokio::task::JoinError; use tracing::*; use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector}; @@ -342,9 +343,11 @@ fn get_leaf_resp_from_count(count: u64) -> LeafSearchResponse { } /// Apply a leaf search on a single split. +#[allow(clippy::too_many_arguments)] async fn leaf_search_single_split( searcher_context: &SearcherContext, mut search_request: SearchRequest, + query_ast: Arc<QueryAst>, storage: Arc, split: SplitIdAndFooterOffsets, doc_mapper: Arc, @@ -363,9 +366,6 @@ async fn leaf_search_single_split( return Ok(cached_answer); } - let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str()) - .map_err(|err| SearchError::InvalidQuery(err.to_string()))?; - // CanSplitDoBetter or rewrite_request may have changed the request to be a count only request // This may be the case for AllQuery with a sort by date and time filter, where the current // split can't have better results.
@@ -1217,12 +1217,17 @@ pub async fn leaf_search( let split_filter = Arc::new(RwLock::new(split_filter)); - let mut leaf_search_single_split_futures: Vec<_> = Vec::with_capacity(split_with_req.len()); + let mut leaf_search_single_split_join_handles: Vec<(String, tokio::task::JoinHandle<()>)> = + Vec::with_capacity(split_with_req.len()); let merge_collector = make_merge_collector(&request, &aggregations_limits)?; let incremental_merge_collector = IncrementalCollector::new(merge_collector); let incremental_merge_collector = Arc::new(Mutex::new(incremental_merge_collector)); + let query_ast: Arc<QueryAst> = serde_json::from_str::<QueryAst>(&request.query_ast) + .map_err(|err| SearchError::InvalidQuery(err.to_string()))? + .into(); + for (split, mut request) in split_with_req { let leaf_split_search_permit = searcher_context.leaf_search_split_semaphore .clone() @@ -1236,26 +1241,46 @@ pub async fn leaf_search( continue; } - leaf_search_single_split_futures.push(tokio::spawn( - leaf_search_single_split_wrapper( - request, - searcher_context.clone(), - index_storage.clone(), - doc_mapper.clone(), - split, - split_filter.clone(), - incremental_merge_collector.clone(), - leaf_split_search_permit, - aggregations_limits.clone(), - ) - .in_current_span(), + leaf_search_single_split_join_handles.push(( + split.split_id.clone(), + tokio::spawn( + leaf_search_single_split_wrapper( + request, + query_ast.clone(), + searcher_context.clone(), + index_storage.clone(), + doc_mapper.clone(), + split, + split_filter.clone(), + incremental_merge_collector.clone(), + leaf_split_search_permit, + aggregations_limits.clone(), + ) + .in_current_span(), + ), )); } // TODO we could cancel running splits when !run_all_splits and the running split can no // longer give better results after some other split answered.
- let split_search_results: Vec> = - futures::future::join_all(leaf_search_single_split_futures).await; + let mut split_search_join_errors: Vec<(String, JoinError)> = Vec::new(); + + // There is no need to use `join_all`, as these are spawned tasks. + for (split, leaf_search_join_handle) in leaf_search_single_split_join_handles { + // splits that did not panic were already added to the collector + if let Err(join_error) = leaf_search_join_handle.await { + if join_error.is_cancelled() { + // An explicit task cancellation is not an error. + continue; + } + if join_error.is_panic() { + error!(split=%split, "leaf search task panicked"); + } else { + error!(split=%split, "please report: leaf search was not cancelled, and could not extract panic. this should never happen"); + } + split_search_join_errors.push((split, join_error)); + } + } // we can't use unwrap_or_clone because mutexes aren't Clone let mut incremental_merge_collector = match Arc::try_unwrap(incremental_merge_collector) { @@ -1263,17 +1288,12 @@ pub async fn leaf_search( Err(filter_merger) => filter_merger.lock().unwrap().clone(), }; - for result in split_search_results { - // splits that did not panic were already added to the collector - if let Err(e) = result { - incremental_merge_collector.add_failed_split(SplitSearchError { - // we could reasonably add a wrapper to the JoinHandle to give us the - // split_id anyway - split_id: "unknown".to_string(), - error: format!("{}", SearchError::from(e)), - retryable_error: true, - }) - } + for (split_id, split_search_join_error) in split_search_join_errors { + incremental_merge_collector.add_failed_split(SplitSearchError { + split_id, + error: format!("{}", SearchError::from(split_search_join_error)), + retryable_error: true, + }); } let result = crate::search_thread_pool() @@ -1289,6 +1309,7 @@ pub async fn leaf_search( #[instrument(skip_all, fields(split_id = split.split_id))] async fn leaf_search_single_split_wrapper( request: SearchRequest, + query_ast: 
Arc<QueryAst>, searcher_context: Arc<SearcherContext>, index_storage: Arc, doc_mapper: Arc, @@ -1305,6 +1326,7 @@ async fn leaf_search_single_split_wrapper( let leaf_search_single_split_res = leaf_search_single_split( &searcher_context, request, + query_ast, index_storage, split.clone(), doc_mapper,