From 76edd6db93a30faec875c9eca0ad3ac1076ae0e5 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 26 Nov 2024 10:03:52 +0100 Subject: [PATCH 01/12] Measure and log the amount of memory taken by a split search, and log this. --- quickwit/Cargo.lock | 19 ++++-- .../src/caching_directory.rs | 22 ++++--- .../protos/quickwit/search.proto | 10 +++ .../src/codegen/quickwit/quickwit.search.rs | 17 +++++ .../quickwit-search/src/cluster_client.rs | 10 ++- quickwit/quickwit-search/src/collector.rs | 23 ++++++- quickwit/quickwit-search/src/fetch_docs.rs | 2 +- quickwit/quickwit-search/src/leaf.rs | 63 ++++++++++++------- quickwit/quickwit-search/src/lib.rs | 22 ++++++- quickwit/quickwit-search/src/list_terms.rs | 7 ++- quickwit/quickwit-search/src/root.rs | 54 ++++++++++++++-- .../quickwit-search/src/search_stream/leaf.rs | 7 ++- .../src/cache/byte_range_cache.rs | 52 ++++++++++----- 13 files changed, 245 insertions(+), 63 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 1662803ed0b..11cbae2c968 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -3502,12 +3502,23 @@ checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" [[package]] name = "idna" -version = "0.5.0" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" +checksum = "686f825264d630750a544639377bae737628043f20d38bbc029e8f29ea968a7e" dependencies = [ - "unicode-bidi", - "unicode-normalization", + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daca1df1c957320b2cf139ac61e7bd64fed304c5040df000a745aa1de3b4ef71" +dependencies = [ + "icu_normalizer", + "icu_properties", ] [[package]] diff --git a/quickwit/quickwit-directories/src/caching_directory.rs b/quickwit/quickwit-directories/src/caching_directory.rs index 
b90f444d062..e55c49f7fad 100644 --- a/quickwit/quickwit-directories/src/caching_directory.rs +++ b/quickwit/quickwit-directories/src/caching_directory.rs @@ -33,7 +33,7 @@ use tantivy::{Directory, HasLen}; pub struct CachingDirectory { underlying: Arc, // TODO fixme: that's a pretty ugly cache we have here. - cache: Arc, + cache: ByteRangeCache, } impl CachingDirectory { @@ -42,12 +42,18 @@ impl CachingDirectory { /// Warning: The resulting CachingDirectory will cache all information without ever /// removing any item from the cache. pub fn new_unbounded(underlying: Arc) -> CachingDirectory { - CachingDirectory { - underlying, - cache: Arc::new(ByteRangeCache::with_infinite_capacity( - &quickwit_storage::STORAGE_METRICS.shortlived_cache, - )), - } + let byte_range_cache = ByteRangeCache::with_infinite_capacity( + &quickwit_storage::STORAGE_METRICS.shortlived_cache, + ); + CachingDirectory::new(underlying, byte_range_cache) + } + + /// Creates a new CachingDirectory. + /// + /// Warning: The resulting CachingDirectory will cache all information without ever + /// removing any item from the cache.
+ pub fn new(underlying: Arc, cache: ByteRangeCache) -> CachingDirectory { + CachingDirectory { underlying, cache } } } @@ -59,7 +65,7 @@ impl fmt::Debug for CachingDirectory { struct CachingFileHandle { path: PathBuf, - cache: Arc, + cache: ByteRangeCache, underlying_filehandle: Arc, } diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index 60671239ecc..1213ce2040e 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -347,6 +347,14 @@ message LeafSearchRequest { repeated string index_uris = 9; } +message ResourceStats { + uint64 short_lived_cache_num_bytes = 1; + uint64 split_num_docs = 2; + uint64 warmup_microsecs = 3; + uint64 cpu_thread_pool_wait_microsecs = 4; + uint64 cpu_microsecs = 5; +} + /// LeafRequestRef references data in LeafSearchRequest to deduplicate data. message LeafRequestRef { // The ordinal of the doc_mapper in `LeafSearchRequest.doc_mappers` @@ -479,6 +487,8 @@ message LeafSearchResponse { // postcard serialized intermediate aggregation_result. 
optional bytes intermediate_aggregation_result = 6; + + ResourceStats resource_stats = 8; } message SnippetRequest { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index 3fc4d5bdcaa..e29cae37fec 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -286,6 +286,21 @@ pub struct LeafSearchRequest { #[prost(string, repeated, tag = "9")] pub index_uris: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ResourceStats { + #[prost(uint64, tag = "1")] + pub short_lived_cache_num_bytes: u64, + #[prost(uint64, tag = "2")] + pub split_num_docs: u64, + #[prost(uint64, tag = "3")] + pub warmup_microsecs: u64, + #[prost(uint64, tag = "4")] + pub cpu_thread_pool_wait_microsecs: u64, + #[prost(uint64, tag = "5")] + pub cpu_microsecs: u64, +} /// / LeafRequestRef references data in LeafSearchRequest to deduplicate data. 
#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] @@ -457,6 +472,8 @@ pub struct LeafSearchResponse { pub intermediate_aggregation_result: ::core::option::Option< ::prost::alloc::vec::Vec, >, + #[prost(message, optional, tag = "8")] + pub resource_stats: ::core::option::Option, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/quickwit/quickwit-search/src/cluster_client.rs b/quickwit/quickwit-search/src/cluster_client.rs index b8042f03fb7..ec5a093f25e 100644 --- a/quickwit/quickwit-search/src/cluster_client.rs +++ b/quickwit/quickwit-search/src/cluster_client.rs @@ -36,7 +36,7 @@ use tracing::{debug, error, info, warn}; use crate::retry::search::LeafSearchRetryPolicy; use crate::retry::search_stream::{LeafSearchStreamRetryPolicy, SuccessfulSplitIds}; use crate::retry::{retry_client, DefaultRetryPolicy, RetryPolicy}; -use crate::{SearchError, SearchJobPlacer, SearchServiceClient}; +use crate::{merge_resource_stats_it, SearchError, SearchJobPlacer, SearchServiceClient}; /// Maximum number of put requests emitted to perform a replicated given PUT KV. 
const MAX_PUT_KV_ATTEMPTS: usize = 6; @@ -317,6 +317,13 @@ fn merge_original_with_retry_leaf_search_response( (Some(left), None) => Some(left), (None, None) => None, }; + let mut stats = [ + original_response.resource_stats.as_ref(), + retry_response.resource_stats.as_ref(), + ] + .into_iter() + .flat_map(|el_opt| el_opt); + let resource_stats = merge_resource_stats_it(&mut stats); Ok(LeafSearchResponse { intermediate_aggregation_result, num_hits: original_response.num_hits + retry_response.num_hits, @@ -326,6 +333,7 @@ fn merge_original_with_retry_leaf_search_response( partial_hits: original_response.partial_hits, num_successful_splits: original_response.num_successful_splits + retry_response.num_successful_splits, + resource_stats, }) } diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs index 4b69348ecde..cee95f62c26 100644 --- a/quickwit/quickwit-search/src/collector.rs +++ b/quickwit/quickwit-search/src/collector.rs @@ -25,8 +25,8 @@ use itertools::Itertools; use quickwit_common::binary_heap::{SortKeyMapper, TopK}; use quickwit_doc_mapper::WarmupInfo; use quickwit_proto::search::{ - LeafSearchResponse, PartialHit, SearchRequest, SortByValue, SortOrder, SortValue, - SplitSearchError, + LeafSearchResponse, PartialHit, ResourceStats, SearchRequest, SortByValue, SortOrder, + SortValue, SplitSearchError, }; use quickwit_proto::types::SplitId; use serde::Deserialize; @@ -40,7 +40,7 @@ use tantivy::{DateTime, DocId, Score, SegmentOrdinal, SegmentReader, TantivyErro use crate::find_trace_ids_collector::{FindTraceIdsCollector, FindTraceIdsSegmentCollector, Span}; use crate::top_k_collector::{specialized_top_k_segment_collector, QuickwitSegmentTopKCollector}; -use crate::GlobalDocAddress; +use crate::{merge_resource_stats, merge_resource_stats_it, GlobalDocAddress}; #[derive(Clone, Debug)] pub(crate) enum SortByComponent { @@ -587,6 +587,7 @@ impl SegmentCollector for QuickwitSegmentCollector { } None => None, }; + 
Ok(LeafSearchResponse { intermediate_aggregation_result, num_hits: self.num_hits, @@ -594,6 +595,7 @@ impl SegmentCollector for QuickwitSegmentCollector { failed_splits: Vec::new(), num_attempted_splits: 1, num_successful_splits: 1, + resource_stats: None, }) } } @@ -919,6 +921,11 @@ fn merge_leaf_responses( return Ok(leaf_responses.pop().unwrap()); } + let mut resource_stats_it = leaf_responses + .iter() + .flat_map(|leaf_response| leaf_response.resource_stats.as_ref()); + let merged_resource_stats = merge_resource_stats_it(&mut resource_stats_it); + let merged_intermediate_aggregation_result: Option> = merge_intermediate_aggregation_result( aggregations_opt, @@ -960,6 +967,7 @@ fn merge_leaf_responses( failed_splits, num_attempted_splits, num_successful_splits, + resource_stats: merged_resource_stats, }) } @@ -1183,6 +1191,7 @@ pub(crate) struct IncrementalCollector { num_attempted_splits: u64, num_successful_splits: u64, start_offset: usize, + resource_stats: ResourceStats, } impl IncrementalCollector { @@ -1203,6 +1212,7 @@ impl IncrementalCollector { failed_splits: Vec::new(), num_attempted_splits: 0, num_successful_splits: 0, + resource_stats: ResourceStats::default(), } } @@ -1215,8 +1225,13 @@ impl IncrementalCollector { num_attempted_splits, intermediate_aggregation_result, num_successful_splits, + resource_stats, } = leaf_response; + if let Some(leaf_resource_stats) = &resource_stats { + merge_resource_stats(leaf_resource_stats, &mut self.resource_stats); + } + self.num_hits += num_hits; self.top_k_hits.add_entries(partial_hits.into_iter()); self.failed_splits.extend(failed_splits); @@ -1266,6 +1281,7 @@ impl IncrementalCollector { num_attempted_splits: self.num_attempted_splits, num_successful_splits: self.num_successful_splits, intermediate_aggregation_result, + resource_stats: Some(self.resource_stats), }) } } @@ -1772,6 +1788,7 @@ mod tests { num_attempted_splits: 3, num_successful_splits: 3, intermediate_aggregation_result: None, + resource_stats: 
None, }], ); diff --git a/quickwit/quickwit-search/src/fetch_docs.rs b/quickwit/quickwit-search/src/fetch_docs.rs index 9c326764539..e8cf5c3a1fb 100644 --- a/quickwit/quickwit-search/src/fetch_docs.rs +++ b/quickwit/quickwit-search/src/fetch_docs.rs @@ -179,7 +179,7 @@ async fn fetch_docs_in_split( index_storage, split, Some(doc_mapper.tokenizer_manager()), - false, + None, ) .await .context("open-index-for-split")?; diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 5ad92f63aa2..94d6bd0ccb1 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -22,6 +22,7 @@ use std::ops::Bound; use std::path::PathBuf; use std::str::FromStr; use std::sync::{Arc, Mutex, RwLock}; +use std::time::{Duration, Instant}; use anyhow::Context; use futures::future::try_join_all; @@ -29,21 +30,20 @@ use quickwit_common::pretty::PrettySample; use quickwit_directories::{CachingDirectory, HotDirectory, StorageDirectory}; use quickwit_doc_mapper::{DocMapper, TermRange, WarmupInfo}; use quickwit_proto::search::{ - CountHits, LeafSearchRequest, LeafSearchResponse, PartialHit, SearchRequest, SortOrder, - SortValue, SplitIdAndFooterOffsets, SplitSearchError, + CountHits, LeafSearchRequest, LeafSearchResponse, PartialHit, SearchRequest, SortOrder, SortValue, SplitIdAndFooterOffsets, SplitSearchError }; use quickwit_query::query_ast::{BoolQuery, QueryAst, QueryAstTransformer, RangeQuery, TermQuery}; use quickwit_query::tokenizers::TokenizerManager; use quickwit_storage::{ - wrap_storage_with_cache, BundleStorage, MemorySizedCache, OwnedBytes, SplitCache, Storage, - StorageResolver, TimeoutAndRetryStorage, + wrap_storage_with_cache, BundleStorage, ByteRangeCache, MemorySizedCache, OwnedBytes, + SplitCache, Storage, StorageResolver, TimeoutAndRetryStorage, }; use tantivy::aggregation::agg_req::{AggregationVariants, Aggregations}; use tantivy::aggregation::AggregationLimitsGuard; use tantivy::directory::FileSlice; use 
tantivy::fastfield::FastFieldReaders; use tantivy::schema::Field; -use tantivy::{DateTime, Index, ReloadPolicy, Searcher, Term}; +use tantivy::{DateTime, Index, ReloadPolicy, Searcher, TantivyError, Term}; use tokio::task::JoinError; use tracing::*; @@ -134,7 +134,7 @@ pub(crate) async fn open_index_with_caches( index_storage: Arc, split_and_footer_offsets: &SplitIdAndFooterOffsets, tokenizer_manager: Option<&TokenizerManager>, - ephemeral_unbounded_cache: bool, + ephemeral_unbounded_cache: Option, ) -> anyhow::Result { // Let's add a storage proxy to retry `get_slice` requests if they are taking too long, // if configured in the searcher config. @@ -166,8 +166,8 @@ pub(crate) async fn open_index_with_caches( let directory = StorageDirectory::new(bundle_storage_with_cache); - let hot_directory = if ephemeral_unbounded_cache { - let caching_directory = CachingDirectory::new_unbounded(Arc::new(directory)); + let hot_directory = if let Some(cache) = ephemeral_unbounded_cache { + let caching_directory = CachingDirectory::new(Arc::new(directory), cache); HotDirectory::open(caching_directory, hotcache_bytes.read_bytes()?)? } else { HotDirectory::open(directory, hotcache_bytes.read_bytes()?)? 
@@ -363,6 +363,7 @@ fn get_leaf_resp_from_count(count: u64) -> LeafSearchResponse { num_attempted_splits: 1, num_successful_splits: 1, intermediate_aggregation_result: None, + resource_stats: None, } } @@ -400,12 +401,14 @@ async fn leaf_search_single_split( } let split_id = split.split_id.to_string(); + let byte_range_cache = + ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); let index = open_index_with_caches( searcher_context, storage, &split, Some(doc_mapper.tokenizer_manager()), - true, + Some(byte_range_cache.clone()), ) .await?; let split_schema = index.schema(); @@ -425,7 +428,15 @@ async fn leaf_search_single_split( warmup_info.merge(collector_warmup_info); warmup_info.simplify(); + let warmup_start = Instant::now(); warmup(&searcher, &warmup_info).await?; + let warmup_end = Instant::now(); + let warmup_duration: Duration = warmup_end.duration_since(warmup_start); + + let short_lived_cache_num_bytes: u64 = byte_range_cache.get_num_bytes(); + let split_num_docs = split.num_docs; + + let span = info_span!("tantivy_search"); let (search_request, leaf_search_response) = { @@ -433,25 +444,31 @@ async fn leaf_search_single_split( crate::search_thread_pool() .run_cpu_intensive(move || { + let cpu_start = Instant::now(); + let cpu_thread_pool_wait_microsecs = cpu_start.duration_since(warmup_end); let _span_guard = span.enter(); // Our search execution has been scheduled, let's check if we can improve the // request based on the results of the preceding searches check_optimize_search_request(&mut search_request, &split, &split_filter); collector.update_search_param(&search_request); - if is_metadata_count_request_with_ast(&query_ast, &search_request) { - return Ok(( - search_request, - get_leaf_resp_from_count(searcher.num_docs() as u64), - )); - } - if collector.is_count_only() { - let count = query.count(&searcher)? 
as u64; - Ok((search_request, get_leaf_resp_from_count(count))) - } else { - searcher - .search(&query, &collector) - .map(|resp| (search_request, resp)) - } + let mut leaf_search_response: LeafSearchResponse = + if is_metadata_count_request_with_ast(&query_ast, &search_request) { + get_leaf_resp_from_count(searcher.num_docs() as u64) + } else if collector.is_count_only() { + let count = query.count(&searcher)? as u64; + get_leaf_resp_from_count(count) + } else { + searcher.search(&query, &collector)? + }; + leaf_search_response.resource_stats = Some(quickwit_proto::search::ResourceStats { + cpu_microsecs: cpu_start.elapsed().as_micros() as u64, + short_lived_cache_num_bytes, + split_num_docs, + warmup_microsecs: warmup_duration.as_micros() as u64, + cpu_thread_pool_wait_microsecs: cpu_thread_pool_wait_microsecs.as_micros() + as u64, + }); + Result::<_, TantivyError>::Ok((search_request, leaf_search_response)) }) .await .map_err(|_| { diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index a81a974d75d..181879b91a6 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -72,7 +72,9 @@ use quickwit_metastore::{ IndexMetadata, ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitMetadata, SplitState, }; -use quickwit_proto::search::{PartialHit, SearchRequest, SearchResponse, SplitIdAndFooterOffsets}; +use quickwit_proto::search::{ + PartialHit, ResourceStats, SearchRequest, SearchResponse, SplitIdAndFooterOffsets, +}; use quickwit_proto::types::IndexUid; use quickwit_storage::StorageResolver; pub use service::SearcherContext; @@ -340,3 +342,21 @@ pub fn searcher_pool_for_test( }), ) } + +pub(crate) fn merge_resource_stats_it( + stats_it: &mut dyn Iterator, +) -> Option { + let mut acc_stats: ResourceStats = stats_it.next()?.clone(); + for new_stats in stats_it { + merge_resource_stats(new_stats, &mut acc_stats); + } + Some(acc_stats) +} + 
+fn merge_resource_stats(new_stats: &ResourceStats, stat_accs: &mut ResourceStats) { + stat_accs.short_lived_cache_num_bytes += new_stats.short_lived_cache_num_bytes; + stat_accs.split_num_docs += new_stats.split_num_docs; + stat_accs.warmup_microsecs += new_stats.warmup_microsecs; + stat_accs.cpu_thread_pool_wait_microsecs += new_stats.cpu_thread_pool_wait_microsecs; + stat_accs.cpu_microsecs += new_stats.cpu_microsecs; +} diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs index 765203438d1..43faf0c95a9 100644 --- a/quickwit/quickwit-search/src/list_terms.rs +++ b/quickwit/quickwit-search/src/list_terms.rs @@ -33,7 +33,7 @@ use quickwit_proto::search::{ SplitIdAndFooterOffsets, SplitSearchError, }; use quickwit_proto::types::IndexUid; -use quickwit_storage::Storage; +use quickwit_storage::{ByteRangeCache, Storage}; use tantivy::schema::{Field, FieldType}; use tantivy::{ReloadPolicy, Term}; use tracing::{debug, error, info, instrument}; @@ -216,7 +216,10 @@ async fn leaf_list_terms_single_split( storage: Arc, split: SplitIdAndFooterOffsets, ) -> crate::Result { - let index = open_index_with_caches(searcher_context, storage, &split, None, true).await?; + let cache = + ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); + let index = + open_index_with_caches(searcher_context, storage, &split, None, Some(cache)).await?; let split_schema = index.schema(); let reader = index .reader_builder() diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 608bc87e479..5e4e1e5804e 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -18,6 +18,7 @@ // along with this program. If not, see . 
use std::collections::{HashMap, HashSet}; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::OnceLock; use std::time::Duration; @@ -49,7 +50,7 @@ use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResult use tantivy::collector::Collector; use tantivy::schema::{Field, FieldEntry, FieldType, Schema}; use tantivy::TantivyError; -use tracing::{debug, info, info_span, instrument}; +use tracing::{debug, info_span, instrument}; use crate::cluster_client::ClusterClient; use crate::collector::{make_merge_collector, QuickwitAggregations}; @@ -683,10 +684,44 @@ pub fn get_count_from_metadata(split_metadatas: &[SplitMetadata]) -> Vec bool { + // It is not worth considering small splits for this. + if split_num_docs < 100_000 { + return false; + } + // We multiply those figure by 1_000 for accuracy. + const PERCENTILE: u64 = 95; + const PRIOR_NUM_BYTES_PER_DOC: u64 = 3 * 1_000; + static NUM_BYTES_PER_DOC_95_PERCENTILE_ESTIMATOR: AtomicU64 = AtomicU64::new(PRIOR_NUM_BYTES_PER_DOC); + let num_bits_per_docs = num_bytes * 1_000 / split_num_docs; + let current_estimator = NUM_BYTES_PER_DOC_95_PERCENTILE_ESTIMATOR.load(Ordering::Relaxed); + let is_memory_intensive = num_bits_per_docs > current_estimator; + let new_estimator: u64 = if is_memory_intensive { + current_estimator.saturating_add(PRIOR_NUM_BYTES_PER_DOC * PERCENTILE / 100) + } else { + current_estimator.saturating_sub(PRIOR_NUM_BYTES_PER_DOC * (100 - PERCENTILE) / 100) + }; + // We do not use fetch_add / fetch_sub directly as they wrap around. + // Concurrency could lead to different results here, but really we don't care. + // + // This is just ignoring some gradient updates. + NUM_BYTES_PER_DOC_95_PERCENTILE_ESTIMATOR.store(new_estimator, Ordering::Relaxed); + is_memory_intensive +} + /// If this method fails for some splits, a partial search response is returned, with the list of /// faulty splits in the failed_splits field. 
#[instrument(level = "debug", skip_all)] @@ -744,9 +779,18 @@ pub(crate) async fn search_partial_hits_phase( has_intermediate_aggregation_result = leaf_search_response.intermediate_aggregation_result.is_some(), "Merged leaf search response." ); + + if let Some(resource_stats) = &leaf_search_response.resource_stats { + if is_top_1pct_memory_intensive(resource_stats.short_lived_cache_num_bytes, resource_stats.split_num_docs) { + // We log at most 5 times per minute. + quickwit_common::rate_limited_info!(limit_per_min=5, split_num_docs=resource_stats.split_num_docs, %search_request.query_ast, short_lived_cached_num_bytes=resource_stats.short_lived_cache_num_bytes, query=%search_request.query_ast, "memory intensive query"); + } + } + if !leaf_search_response.failed_splits.is_empty() { quickwit_common::rate_limited_error!(limit_per_min=6, failed_splits = ?leaf_search_response.failed_splits, "leaf search response contains at least one failed split"); } + Ok(leaf_search_response) } @@ -1107,14 +1151,13 @@ async fn refine_and_list_matches( /// 2. Merges the search results. /// 3. Sends fetch docs requests to multiple leaf nodes. /// 4. Builds the response with docs and returns. 
-#[instrument(skip_all)] +#[instrument(skip_all, fields(request_id))] pub async fn root_search( searcher_context: &SearcherContext, mut search_request: SearchRequest, mut metastore: MetastoreServiceClient, cluster_client: &ClusterClient, ) -> crate::Result { - info!(searcher_context = ?searcher_context, search_request = ?search_request); let start_instant = tokio::time::Instant::now(); let list_indexes_metadatas_request = ListIndexesMetadataRequest { index_id_patterns: search_request.index_id_patterns.clone(), @@ -1169,9 +1212,12 @@ pub async fn root_search( ) .await; + let elapsed = start_instant.elapsed(); + if let Ok(search_response) = &mut search_response_result { - search_response.elapsed_time_micros = start_instant.elapsed().as_micros() as u64; + search_response.elapsed_time_micros = elapsed.as_micros() as u64; } + let label_values = if search_response_result.is_ok() { ["success"] } else { diff --git a/quickwit/quickwit-search/src/search_stream/leaf.rs b/quickwit/quickwit-search/src/search_stream/leaf.rs index 941e0d12612..ed7d39965a9 100644 --- a/quickwit/quickwit-search/src/search_stream/leaf.rs +++ b/quickwit/quickwit-search/src/search_stream/leaf.rs @@ -29,7 +29,7 @@ use quickwit_proto::search::{ LeafSearchStreamResponse, OutputFormat, SearchRequest, SearchStreamRequest, SplitIdAndFooterOffsets, }; -use quickwit_storage::Storage; +use quickwit_storage::{ByteRangeCache, Storage}; use tantivy::columnar::{DynamicColumn, HasAssociatedColumnType}; use tantivy::fastfield::Column; use tantivy::query::Query; @@ -127,12 +127,15 @@ async fn leaf_search_stream_single_split( &split, ); + let cache = + ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); + let index = open_index_with_caches( &searcher_context, storage, &split, Some(doc_mapper.tokenizer_manager()), - true, + Some(cache), ) .await?; let split_schema = index.schema(); diff --git a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs 
b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs index 9ef3b7f523f..c9f261c347b 100644 --- a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs +++ b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs @@ -21,7 +21,8 @@ use std::borrow::{Borrow, Cow}; use std::collections::BTreeMap; use std::ops::Range; use std::path::{Path, PathBuf}; -use std::sync::Mutex; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; use tantivy::directory::OwnedBytes; @@ -344,31 +345,54 @@ impl Drop for NeedMutByteRangeCache { /// cached data, the changes may or may not get recorded. /// /// At the moment this is hardly a cache as it features no eviction policy. +#[derive(Clone)] pub struct ByteRangeCache { - inner: Mutex>, + inner_arc: Arc, +} + +struct Inner { + num_stored_bytes: AtomicU64, + need_mut_byte_range_cache: Mutex>, } impl ByteRangeCache { /// Creates a slice cache that never removes any entry. pub fn with_infinite_capacity(cache_counters: &'static CacheMetrics) -> Self { + let need_mut_byte_range_cache = + NeedMutByteRangeCache::with_infinite_capacity(cache_counters); + let inner = Inner { + num_stored_bytes: AtomicU64::default(), + need_mut_byte_range_cache: Mutex::new(need_mut_byte_range_cache), + }; ByteRangeCache { - inner: Mutex::new(NeedMutByteRangeCache::with_infinite_capacity( - cache_counters, - )), + inner_arc: Arc::new(inner), } } + /// Overall amount of bytes stored in the cache. + pub fn get_num_bytes(&self) -> u64 { + self.inner_arc.num_stored_bytes.load(Ordering::Relaxed) + } + /// If available, returns the cached view of the slice. pub fn get_slice(&self, path: &Path, byte_range: Range) -> Option { - self.inner.lock().unwrap().get_slice(path, byte_range) + self.inner_arc + .need_mut_byte_range_cache + .lock() + .unwrap() + .get_slice(path, byte_range) } /// Put the given amount of data in the cache. 
pub fn put_slice(&self, path: PathBuf, byte_range: Range, bytes: OwnedBytes) { - self.inner - .lock() - .unwrap() - .put_slice(path, byte_range, bytes) + let mut need_mut_byte_range_cache_locked = + self.inner_arc.need_mut_byte_range_cache.lock().unwrap(); + need_mut_byte_range_cache_locked.put_slice(path, byte_range, bytes); + let num_bytes = need_mut_byte_range_cache_locked.num_bytes; + drop(need_mut_byte_range_cache_locked); + self.inner_arc + .num_stored_bytes + .store(num_bytes as u64, Ordering::Relaxed); } } @@ -446,13 +470,13 @@ mod tests { .sum(); // in some case we have ranges touching each other, count_items count them // as only one, but cache count them as 2. - assert!(cache.inner.lock().unwrap().num_items >= expected_item_count as u64); + assert!(cache.inner_arc.need_mut_byte_range_cache.lock().unwrap().num_items >= expected_item_count as u64); let expected_byte_count = state.values() .flatten() .filter(|stored| **stored) .count(); - assert_eq!(cache.inner.lock().unwrap().num_bytes, expected_byte_count as u64); + assert_eq!(cache.inner_arc.need_mut_byte_range_cache.lock().unwrap().num_bytes, expected_byte_count as u64); } Operation::Get { range, @@ -519,7 +543,7 @@ mod tests { ); { - let mutable_cache = cache.inner.lock().unwrap(); + let mutable_cache = cache.inner_arc.need_mut_byte_range_cache.lock().unwrap(); assert_eq!(mutable_cache.cache.len(), 4); assert_eq!(mutable_cache.num_items, 4); assert_eq!(mutable_cache.cache_counters.in_cache_count.get(), 4); @@ -531,7 +555,7 @@ mod tests { { // now they should've been merged, except the last one - let mutable_cache = cache.inner.lock().unwrap(); + let mutable_cache = cache.inner_arc.need_mut_byte_range_cache.lock().unwrap(); assert_eq!(mutable_cache.cache.len(), 2); assert_eq!(mutable_cache.num_items, 2); assert_eq!(mutable_cache.cache_counters.in_cache_count.get(), 2); From 18489fc47966fc59176460bd6cde2f1f3fedcaca Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 26 Nov 2024 10:03:54 +0100 Subject: 
[PATCH 02/12] Limit search memory usage associated with warmup. Due to tantivy limitations, searching a split requires downloading all of the required data, and keeping it in memory. We call this phase warmup. Before this PR, the only thing that curbed memory usage was the search permits: only N split searches may happen concurrently. Unfortunately, the amount of data required here varies vastly. We need a mechanism to measure and avoid running more split searches when memory is tight. Just using a semaphore is however not an option. We do not know beforehand how much memory will be required by a split search, so it could easily lead to a deadlock. Instead, this commit builds upon the search permit provider. The search permit provider is in charge of managing a configurable memory budget for this warmup memory. We introduce here a configurable "warmup_single_split_initial_allocation". A new leaf split search cannot be started if this memory is not available. This initial allocation is meant to be greater than what will be actually needed most of the time. The split search then holds this allocation until the end of warmup. After warmup, we can get the actual memory usage by interrogating the warmup cache. We can then update the amount of memory held. (most of the time, this should mean releasing some memory) In addition, in this PR, at this point, we also release the warmup search permit: We still have to perform the actual task of searching, but the thread pool will take care of limiting the number of concurrent tasks.
Closes #5355 --- .../quickwit-config/src/node_config/mod.rs | 12 +++- quickwit/quickwit-search/src/leaf.rs | 6 +- .../src/search_permit_provider.rs | 66 +++++++++++++++---- quickwit/quickwit-search/src/service.rs | 2 +- 4 files changed, 68 insertions(+), 18 deletions(-) diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index 3eef1f10428..190e5b44295 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -226,6 +226,12 @@ pub struct SearcherConfig { #[serde(default)] #[serde(skip_serializing_if = "Option::is_none")] pub storage_timeout_policy: Option, + + // TODO validate that `warmup_memory_budget` is greater than `warmup_single_split_initial_allocation` + // TODO set serde default + pub warmup_memory_budget: ByteSize, + // TODO set serde default + pub warmup_single_split_initial_allocation: ByteSize, } /// Configuration controlling how fast a searcher should timeout a `get_slice` @@ -263,7 +269,7 @@ impl StorageTimeoutPolicy { impl Default for SearcherConfig { fn default() -> Self { - Self { + SearcherConfig { fast_field_cache_capacity: ByteSize::gb(1), split_footer_cache_capacity: ByteSize::mb(500), partial_request_cache_capacity: ByteSize::mb(64), @@ -274,6 +280,10 @@ impl Default for SearcherConfig { split_cache: None, request_timeout_secs: Self::default_request_timeout_secs(), storage_timeout_policy: None, + // TODO change this to the method used for serde default. + warmup_memory_budget: ByteSize::gb(1), + // TODO change this to the method used for serde default. 
+ warmup_single_split_initial_allocation: ByteSize::mb(50), } } } diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 94d6bd0ccb1..78dfc706b7c 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -376,6 +376,7 @@ async fn leaf_search_single_split( doc_mapper: Arc, split_filter: Arc>, aggregations_limits: AggregationLimitsGuard, + search_permit: &mut SearchPermit, ) -> crate::Result { rewrite_request( &mut search_request, @@ -434,9 +435,9 @@ async fn leaf_search_single_split( let warmup_duration: Duration = warmup_end.duration_since(warmup_start); let short_lived_cache_num_bytes: u64 = byte_range_cache.get_num_bytes(); + search_permit.set_actual_memory_usage_and_release_permit_after(short_lived_cache_num_bytes); let split_num_docs = split.num_docs; - let span = info_span!("tantivy_search"); let (search_request, leaf_search_response) = { @@ -1378,7 +1379,7 @@ async fn leaf_search_single_split_wrapper( split: SplitIdAndFooterOffsets, split_filter: Arc>, incremental_merge_collector: Arc>, - search_permit: SearchPermit, + mut search_permit: SearchPermit, aggregations_limits: AggregationLimitsGuard, ) { crate::SEARCH_METRICS.leaf_searches_splits_total.inc(); @@ -1393,6 +1394,7 @@ async fn leaf_search_single_split_wrapper( doc_mapper, split_filter.clone(), aggregations_limits, + &mut search_permit, ) .await; diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs index f6883efb34b..07aad825eb1 100644 --- a/quickwit/quickwit-search/src/search_permit_provider.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -20,8 +20,10 @@ use std::collections::VecDeque; use std::sync::{Arc, Mutex}; +use bytesize::ByteSize; use quickwit_common::metrics::GaugeGuard; use tokio::sync::oneshot; +use tracing::warn; /// `SearchPermitProvider` is a distributor of permits to perform single split /// search operation. 
@@ -33,11 +35,14 @@ pub struct SearchPermitProvider { } impl SearchPermitProvider { - pub fn new(num_permits: usize) -> SearchPermitProvider { + pub fn new(num_permits: usize, memory_budget: ByteSize, initial_allocation: ByteSize) -> SearchPermitProvider { SearchPermitProvider { inner_arc: Arc::new(Mutex::new(InnerSearchPermitProvider { num_permits_available: num_permits, + memory_budget: memory_budget.as_u64(), permits_requests: VecDeque::new(), + memory_allocated: 0u64, + initial_allocation: initial_allocation.as_u64(), })), } } @@ -65,6 +70,14 @@ impl SearchPermitProvider { struct InnerSearchPermitProvider { num_permits_available: usize, + + // Note it is possible for memory_allocated to exceed memory_budget temporarily, + // if and only if a split leaf search task ended up using more than `initial_allocation`. + // + // When it happens, new permits will not be assigned until the memory is freed. + memory_budget: u64, + memory_allocated: u64, + initial_allocation: u64, permits_requests: VecDeque>, } @@ -94,30 +107,29 @@ impl InnerSearchPermitProvider { permits } - fn recycle_permit(&mut self, inner_arc: &Arc>) { - self.num_permits_available += 1; - self.assign_available_permits(inner_arc); - } - fn assign_available_permits(&mut self, inner_arc: &Arc>) { - while self.num_permits_available > 0 { - let Some(sender) = self.permits_requests.pop_front() else { + while self.num_permits_available > 0 && self.memory_allocated + self.initial_allocation <= self.memory_budget { + let Some(permit_requester_tx) = self.permits_requests.pop_front() else { break; }; let mut ongoing_gauge_guard = GaugeGuard::from_gauge( &crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing, ); ongoing_gauge_guard.add(1); - let send_res = sender.send(SearchPermit { + let send_res = permit_requester_tx.send(SearchPermit { _ongoing_gauge_guard: ongoing_gauge_guard, inner_arc: inner_arc.clone(), - recycle_on_drop: true, + warmup_permit_held: true, + memory_allocation: self.initial_allocation, 
}); match send_res { Ok(()) => { self.num_permits_available -= 1; + self.memory_allocated += self.initial_allocation; } Err(search_permit) => { + // We cannot just decrease the num_permits_available in all case and rely on + // the drop logic here: it would cause a dead lock on the inner_arc Mutex. search_permit.drop_without_recycling_permit(); } } @@ -131,23 +143,49 @@ impl InnerSearchPermitProvider { pub struct SearchPermit { _ongoing_gauge_guard: GaugeGuard<'static>, inner_arc: Arc>, - recycle_on_drop: bool, + warmup_permit_held: bool, + memory_allocation: u64, } impl SearchPermit { + /// After warm up, we have a proper estimate of the memory usage of a single split leaf search. + /// + /// We can then set the actual memory usage. + pub fn set_actual_memory_usage_and_release_permit_after(&mut self, new_memory_usage: u64) { + if new_memory_usage > self.memory_allocation { + warn!(memory_usage=new_memory_usage, memory_allocation=self.memory_allocation, "current leaf search is consuming more memory than the initial allocation"); + } + let mut inner_guard = self.inner_arc.lock().unwrap(); + let delta = new_memory_usage as i64 - inner_guard.initial_allocation as i64; + inner_guard.memory_allocated += delta as u64; + inner_guard.num_permits_available += 1; + if inner_guard.memory_allocated > inner_guard.memory_budget { + warn!(memory_allocated=inner_guard.memory_allocated, memory_budget=inner_guard.memory_budget, "memory allocated exceeds memory budget"); + } + self.memory_allocation = new_memory_usage; + inner_guard.assign_available_permits(&self.inner_arc); + } + fn drop_without_recycling_permit(mut self) { - self.recycle_on_drop = false; + self.warmup_permit_held = false; + self.memory_allocation = 0u64; drop(self); } } impl Drop for SearchPermit { fn drop(&mut self) { - if !self.recycle_on_drop { + // This is not just an optimization. This is necessary to avoid a dead lock when the + // permit requester dropped its receiver channel. 
+ if !self.warmup_permit_held && self.memory_allocation == 0 { return; } let mut inner_guard = self.inner_arc.lock().unwrap(); - inner_guard.recycle_permit(&self.inner_arc.clone()); + if self.warmup_permit_held { + inner_guard.num_permits_available += 1; + } + inner_guard.memory_allocated -= self.memory_allocation; + inner_guard.assign_available_permits(&self.inner_arc); } } diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index 0029f4dd3a7..734344f2c74 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -489,7 +489,7 @@ impl SearcherContext { &quickwit_storage::STORAGE_METRICS.split_footer_cache, ); let leaf_search_split_semaphore = - SearchPermitProvider::new(searcher_config.max_num_concurrent_split_searches); + SearchPermitProvider::new(searcher_config.max_num_concurrent_split_searches, searcher_config.warmup_memory_budget, searcher_config.warmup_single_split_initial_allocation); let split_stream_semaphore = Semaphore::new(searcher_config.max_num_concurrent_split_streams); let fast_field_cache_capacity = searcher_config.fast_field_cache_capacity.as_u64() as usize; From 96b39c22a307ebb70b1bf46da353b080c540c54d Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Tue, 26 Nov 2024 10:03:54 +0100 Subject: [PATCH 03/12] Bring some clarifications and remove single permit getter --- quickwit/quickwit-search/src/list_terms.rs | 13 +- .../src/search_permit_provider.rs | 204 ++++++++++-------- 2 files changed, 117 insertions(+), 100 deletions(-) diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs index 43faf0c95a9..4b80b3aed83 100644 --- a/quickwit/quickwit-search/src/list_terms.rs +++ b/quickwit/quickwit-search/src/list_terms.rs @@ -328,18 +328,17 @@ pub async fn leaf_list_terms( splits: &[SplitIdAndFooterOffsets], ) -> Result { info!(split_offsets = ?PrettySample::new(splits, 5)); + let permits = searcher_context + 
.search_permit_provider + .get_permits(splits.len()); let leaf_search_single_split_futures: Vec<_> = splits .iter() - .map(|split| { + .zip(permits.into_iter()) + .map(|(split, search_permit_recv)| { let index_storage_clone = index_storage.clone(); let searcher_context_clone = searcher_context.clone(); async move { - let _leaf_split_search_permit = searcher_context_clone - .search_permit_provider - .get_permit() - .await - .expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."); - + let _leaf_split_search_permit = search_permit_recv.await; // TODO dedicated counter and timer? crate::SEARCH_METRICS.leaf_searches_splits_total.inc(); let timer = crate::SEARCH_METRICS diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs index 07aad825eb1..15980acf4a8 100644 --- a/quickwit/quickwit-search/src/search_permit_provider.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -18,7 +18,10 @@ // along with this program. If not, see . use std::collections::VecDeque; +use std::future::Future; +use std::pin::Pin; use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; use bytesize::ByteSize; use quickwit_common::metrics::GaugeGuard; @@ -28,17 +31,22 @@ use tracing::warn; /// `SearchPermitProvider` is a distributor of permits to perform single split /// search operation. /// -/// Requests are served in order. +/// - Two types of resources are managed: memory allocations and download slots. +/// - Requests are served in order. 
#[derive(Clone)] pub struct SearchPermitProvider { inner_arc: Arc>, } impl SearchPermitProvider { - pub fn new(num_permits: usize, memory_budget: ByteSize, initial_allocation: ByteSize) -> SearchPermitProvider { + pub fn new( + num_download_slots: usize, + memory_budget: ByteSize, + initial_allocation: ByteSize, + ) -> SearchPermitProvider { SearchPermitProvider { inner_arc: Arc::new(Mutex::new(InnerSearchPermitProvider { - num_permits_available: num_permits, + num_download_slots_available: num_download_slots, memory_budget: memory_budget.as_u64(), permits_requests: VecDeque::new(), memory_allocated: 0u64, @@ -47,29 +55,20 @@ impl SearchPermitProvider { } } - /// Returns a future permit in the form of a oneshot Receiver channel. - /// - /// At this point the permit is not acquired yet. - #[must_use] - pub fn get_permit(&self) -> oneshot::Receiver { - let mut permits_lock = self.inner_arc.lock().unwrap(); - permits_lock.get_permit(&self.inner_arc) - } - - /// Returns a list of future permits in the form of oneshot Receiver channels. + /// Returns a list of future permits in the form of awaitable futures. /// /// The permits returned are guaranteed to be resolved in order. /// In addition, the permits are guaranteed to be resolved before permits returned by /// subsequent calls to this function (or `get_permit`). #[must_use] - pub fn get_permits(&self, num_permits: usize) -> Vec> { + pub fn get_permits(&self, num_permits: usize) -> Vec { let mut permits_lock = self.inner_arc.lock().unwrap(); permits_lock.get_permits(num_permits, &self.inner_arc) } } struct InnerSearchPermitProvider { - num_permits_available: usize, + num_download_slots_available: usize, // Note it is possible for memory_allocated to exceed memory_budget temporarily, // if and only if a split leaf search task ended up using more than `initial_allocation`. 
@@ -82,33 +81,29 @@ struct InnerSearchPermitProvider { } impl InnerSearchPermitProvider { - fn get_permit( - &mut self, - inner_arc: &Arc>, - ) -> oneshot::Receiver { - let (tx, rx) = oneshot::channel(); - self.permits_requests.push_back(tx); - self.assign_available_permits(inner_arc); - rx - } - fn get_permits( &mut self, num_permits: usize, inner_arc: &Arc>, - ) -> Vec> { + ) -> Vec { let mut permits = Vec::with_capacity(num_permits); for _ in 0..num_permits { let (tx, rx) = oneshot::channel(); self.permits_requests.push_back(tx); - permits.push(rx); + permits.push(SearchPermitFuture(rx)); } self.assign_available_permits(inner_arc); permits } + /// Called each time a permit is requested or released + /// + /// Calling lock on `inner_arc` inside this method will cause a deadlock as + /// `&mut self` and `inner_arc` reference the same instance. fn assign_available_permits(&mut self, inner_arc: &Arc>) { - while self.num_permits_available > 0 && self.memory_allocated + self.initial_allocation <= self.memory_budget { + while self.num_download_slots_available > 0 + && self.memory_allocated + self.initial_allocation <= self.memory_budget + { let Some(permit_requester_tx) = self.permits_requests.pop_front() else { break; }; @@ -124,7 +119,7 @@ impl InnerSearchPermitProvider { }); match send_res { Ok(()) => { - self.num_permits_available -= 1; + self.num_download_slots_available -= 1; self.memory_allocated += self.initial_allocation; } Err(search_permit) => { @@ -153,14 +148,22 @@ impl SearchPermit { /// We can then set the actual memory usage. 
pub fn set_actual_memory_usage_and_release_permit_after(&mut self, new_memory_usage: u64) { if new_memory_usage > self.memory_allocation { - warn!(memory_usage=new_memory_usage, memory_allocation=self.memory_allocation, "current leaf search is consuming more memory than the initial allocation"); + warn!( + memory_usage = new_memory_usage, + memory_allocation = self.memory_allocation, + "current leaf search is consuming more memory than the initial allocation" + ); } let mut inner_guard = self.inner_arc.lock().unwrap(); let delta = new_memory_usage as i64 - inner_guard.initial_allocation as i64; inner_guard.memory_allocated += delta as u64; - inner_guard.num_permits_available += 1; + inner_guard.num_download_slots_available += 1; if inner_guard.memory_allocated > inner_guard.memory_budget { - warn!(memory_allocated=inner_guard.memory_allocated, memory_budget=inner_guard.memory_budget, "memory allocated exceeds memory budget"); + warn!( + memory_allocated = inner_guard.memory_allocated, + memory_budget = inner_guard.memory_budget, + "memory allocated exceeds memory budget" + ); } self.memory_allocation = new_memory_usage; inner_guard.assign_available_permits(&self.inner_arc); @@ -182,77 +185,92 @@ impl Drop for SearchPermit { } let mut inner_guard = self.inner_arc.lock().unwrap(); if self.warmup_permit_held { - inner_guard.num_permits_available += 1; + inner_guard.num_download_slots_available += 1; } inner_guard.memory_allocated -= self.memory_allocation; inner_guard.assign_available_permits(&self.inner_arc); } } -#[cfg(test)] -mod tests { - use tokio::task::JoinSet; +pub struct SearchPermitFuture(oneshot::Receiver); - use super::*; +impl Future for SearchPermitFuture { + type Output = Option; - #[tokio::test] - async fn test_search_permits_get_permits_future() { - // We test here that `get_permits_futures` does not interleave - let search_permits = SearchPermitProvider::new(1); - let mut all_futures = Vec::new(); - let first_batch_of_permits = 
search_permits.get_permits(10); - assert_eq!(first_batch_of_permits.len(), 10); - all_futures.extend( - first_batch_of_permits - .into_iter() - .enumerate() - .map(move |(i, fut)| ((1, i), fut)), - ); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let receiver = Pin::new(&mut self.get_mut().0); + match receiver.poll(cx) { + Poll::Ready(Ok(search_permit)) => Poll::Ready(Some(search_permit)), + Poll::Ready(Err(_)) => panic!("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."), + Poll::Pending => Poll::Pending, + } + } +} - let second_batch_of_permits = search_permits.get_permits(10); - assert_eq!(second_batch_of_permits.len(), 10); - all_futures.extend( - second_batch_of_permits - .into_iter() - .enumerate() - .map(move |(i, fut)| ((2, i), fut)), - ); +// #[cfg(test)] +// mod tests { +// use tokio::task::JoinSet; - use rand::seq::SliceRandom; - // not super useful, considering what join set does, but still a tiny bit more sound. 
- all_futures.shuffle(&mut rand::thread_rng()); +// use super::*; - let mut join_set = JoinSet::new(); - for (res, fut) in all_futures { - join_set.spawn(async move { - let permit = fut.await; - (res, permit) - }); - } - let mut ordered_result: Vec<(usize, usize)> = Vec::with_capacity(20); - while let Some(Ok(((batch_id, order), _permit))) = join_set.join_next().await { - ordered_result.push((batch_id, order)); - } +// #[tokio::test] +// async fn test_search_permits_get_permits_future() { +// // We test here that `get_permits_futures` does not interleave +// let search_permits = SearchPermitProvider::new(1); +// let mut all_futures = Vec::new(); +// let first_batch_of_permits = search_permits.get_permits(10); +// assert_eq!(first_batch_of_permits.len(), 10); +// all_futures.extend( +// first_batch_of_permits +// .into_iter() +// .enumerate() +// .map(move |(i, fut)| ((1, i), fut)), +// ); - assert_eq!(ordered_result.len(), 20); - for (i, res) in ordered_result[0..10].iter().enumerate() { - assert_eq!(res, &(1, i)); - } - for (i, res) in ordered_result[10..20].iter().enumerate() { - assert_eq!(res, &(2, i)); - } - } +// let second_batch_of_permits = search_permits.get_permits(10); +// assert_eq!(second_batch_of_permits.len(), 10); +// all_futures.extend( +// second_batch_of_permits +// .into_iter() +// .enumerate() +// .map(move |(i, fut)| ((2, i), fut)), +// ); - #[tokio::test] - async fn test_search_permits_receiver_race_condition() { - // Here we test that we don't have a problem if the Receiver is dropped. - // In particular, we want to check that there is not a race condition where drop attempts to - // lock the mutex. 
- let search_permits = SearchPermitProvider::new(1); - let permit_rx = search_permits.get_permit(); - let permit_rx2 = search_permits.get_permit(); - drop(permit_rx2); - drop(permit_rx); - let _permit_rx = search_permits.get_permit(); - } -} +// use rand::seq::SliceRandom; +// // not super useful, considering what join set does, but still a tiny bit more sound. +// all_futures.shuffle(&mut rand::thread_rng()); + +// let mut join_set = JoinSet::new(); +// for (res, fut) in all_futures { +// join_set.spawn(async move { +// let permit = fut.await; +// (res, permit) +// }); +// } +// let mut ordered_result: Vec<(usize, usize)> = Vec::with_capacity(20); +// while let Some(Ok(((batch_id, order), _permit))) = join_set.join_next().await { +// ordered_result.push((batch_id, order)); +// } + +// assert_eq!(ordered_result.len(), 20); +// for (i, res) in ordered_result[0..10].iter().enumerate() { +// assert_eq!(res, &(1, i)); +// } +// for (i, res) in ordered_result[10..20].iter().enumerate() { +// assert_eq!(res, &(2, i)); +// } +// } + +// #[tokio::test] +// async fn test_search_permits_receiver_race_condition() { +// // Here we test that we don't have a problem if the Receiver is dropped. +// // In particular, we want to check that there is not a race condition where drop attempts +// to // lock the mutex. +// let search_permits = SearchPermitProvider::new(1); +// let permit_rx = search_permits.get_permit(); +// let permit_rx2 = search_permits.get_permit(); +// drop(permit_rx2); +// drop(permit_rx); +// let _permit_rx = search_permits.get_permit(); +// } +// } From 21ea04f4662bfa4be538b35a7bac7ca2bad89468 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Tue, 26 Nov 2024 10:03:54 +0100 Subject: [PATCH 04/12] Make search permit provider into an actor. Also attach the permit to the actual memory cache to ensure memory is freed at the right moment. 
--- .../quickwit-config/src/node_config/mod.rs | 14 +- .../src/node_config/serialize.rs | 4 +- .../src/caching_directory.rs | 33 ++- quickwit/quickwit-search/src/collector.rs | 13 +- quickwit/quickwit-search/src/fetch_docs.rs | 2 +- quickwit/quickwit-search/src/leaf.rs | 40 +-- quickwit/quickwit-search/src/leaf_cache.rs | 2 + quickwit/quickwit-search/src/list_terms.rs | 14 +- .../src/search_permit_provider.rs | 256 ++++++++++-------- .../quickwit-search/src/search_stream/leaf.rs | 7 +- quickwit/quickwit-search/src/service.rs | 7 +- .../src/cache/byte_range_cache.rs | 70 +++-- 12 files changed, 278 insertions(+), 184 deletions(-) diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index 190e5b44295..bd855ec0395 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -226,11 +226,7 @@ pub struct SearcherConfig { #[serde(default)] #[serde(skip_serializing_if = "Option::is_none")] pub storage_timeout_policy: Option, - - // TODO validate that `warmup_memory_budget` is greater than `warmup_single_split_initial_allocation` - // TODO set serde default pub warmup_memory_budget: ByteSize, - // TODO set serde default pub warmup_single_split_initial_allocation: ByteSize, } @@ -280,9 +276,7 @@ impl Default for SearcherConfig { split_cache: None, request_timeout_secs: Self::default_request_timeout_secs(), storage_timeout_policy: None, - // TODO change this to the method used for serde default. warmup_memory_budget: ByteSize::gb(1), - // TODO change this to the method used for serde default. 
warmup_single_split_initial_allocation: ByteSize::mb(50), } } @@ -318,6 +312,14 @@ impl SearcherConfig { split_cache_limits.max_file_descriptors ); } + if self.warmup_single_split_initial_allocation > self.warmup_memory_budget { + anyhow::bail!( + "warmup_single_split_initial_allocation ({}) must be lower or equal to \ + warmup_memory_budget ({})", + self.warmup_single_split_initial_allocation, + self.warmup_memory_budget + ); + } } Ok(()) } diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index 8a1337636cf..457cf5c5fae 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -616,7 +616,9 @@ mod tests { min_throughtput_bytes_per_secs: 100_000, timeout_millis: 2_000, max_num_retries: 2 - }) + }), + warmup_memory_budget: ByteSize::gb(1), + warmup_single_split_initial_allocation: ByteSize::mb(50), } ); assert_eq!( diff --git a/quickwit/quickwit-directories/src/caching_directory.rs b/quickwit/quickwit-directories/src/caching_directory.rs index e55c49f7fad..b5093680458 100644 --- a/quickwit/quickwit-directories/src/caching_directory.rs +++ b/quickwit/quickwit-directories/src/caching_directory.rs @@ -29,11 +29,19 @@ use tantivy::directory::{FileHandle, OwnedBytes}; use tantivy::{Directory, HasLen}; /// The caching directory is a simple cache that wraps another directory. -#[derive(Clone)] -pub struct CachingDirectory { +pub struct CachingDirectory { underlying: Arc, // TODO fixme: that's a pretty ugly cache we have here. 
- cache: ByteRangeCache, + cache: ByteRangeCache, +} + +impl Clone for CachingDirectory { + fn clone(&self) -> Self { + CachingDirectory { + underlying: self.underlying.clone(), + cache: self.cache.clone(), + } + } } impl CachingDirectory { @@ -44,32 +52,35 @@ impl CachingDirectory { pub fn new_unbounded(underlying: Arc) -> CachingDirectory { let byte_range_cache = ByteRangeCache::with_infinite_capacity( &quickwit_storage::STORAGE_METRICS.shortlived_cache, + (), ); CachingDirectory::new(underlying, byte_range_cache) } +} +impl CachingDirectory { /// Creates a new CachingDirectory. /// /// Warming: The resulting CacheDirectory will cache all information without ever /// removing any item from the cache. - pub fn new(underlying: Arc, cache: ByteRangeCache) -> CachingDirectory { + pub fn new(underlying: Arc, cache: ByteRangeCache) -> CachingDirectory { CachingDirectory { underlying, cache } } } -impl fmt::Debug for CachingDirectory { +impl fmt::Debug for CachingDirectory { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "CachingDirectory({:?})", self.underlying) } } -struct CachingFileHandle { +struct CachingFileHandle { path: PathBuf, - cache: ByteRangeCache, + cache: ByteRangeCache, underlying_filehandle: Arc, } -impl fmt::Debug for CachingFileHandle { +impl fmt::Debug for CachingFileHandle { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( f, @@ -81,7 +92,7 @@ impl fmt::Debug for CachingFileHandle { } #[async_trait] -impl FileHandle for CachingFileHandle { +impl FileHandle for CachingFileHandle { fn read_bytes(&self, byte_range: Range) -> io::Result { if let Some(bytes) = self.cache.get_slice(&self.path, byte_range.clone()) { return Ok(bytes); @@ -106,13 +117,13 @@ impl FileHandle for CachingFileHandle { } } -impl HasLen for CachingFileHandle { +impl HasLen for CachingFileHandle { fn len(&self) -> usize { self.underlying_filehandle.len() } } -impl Directory for CachingDirectory { +impl Directory for CachingDirectory { fn exists(&self, 
path: &Path) -> std::result::Result { self.underlying.exists(path) } diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs index cee95f62c26..1cba2b95a32 100644 --- a/quickwit/quickwit-search/src/collector.rs +++ b/quickwit/quickwit-search/src/collector.rs @@ -1806,7 +1806,8 @@ mod tests { failed_splits: Vec::new(), num_attempted_splits: 3, num_successful_splits: 3, - intermediate_aggregation_result: None + intermediate_aggregation_result: None, + resource_stats: None, } ); @@ -1845,6 +1846,7 @@ mod tests { num_attempted_splits: 3, num_successful_splits: 3, intermediate_aggregation_result: None, + resource_stats: None, }, LeafSearchResponse { num_hits: 10, @@ -1863,6 +1865,7 @@ mod tests { num_attempted_splits: 2, num_successful_splits: 1, intermediate_aggregation_result: None, + resource_stats: None, }, ], ); @@ -1894,7 +1897,8 @@ mod tests { }], num_attempted_splits: 5, num_successful_splits: 4, - intermediate_aggregation_result: None + intermediate_aggregation_result: None, + resource_stats: None, } ); @@ -1934,6 +1938,7 @@ mod tests { num_attempted_splits: 3, num_successful_splits: 3, intermediate_aggregation_result: None, + resource_stats: None, }, LeafSearchResponse { num_hits: 10, @@ -1952,6 +1957,7 @@ mod tests { num_attempted_splits: 2, num_successful_splits: 1, intermediate_aggregation_result: None, + resource_stats: None, }, ], ); @@ -1983,7 +1989,8 @@ mod tests { }], num_attempted_splits: 5, num_successful_splits: 4, - intermediate_aggregation_result: None + intermediate_aggregation_result: None, + resource_stats: None, } ); // TODO would be nice to test aggregation too. 
diff --git a/quickwit/quickwit-search/src/fetch_docs.rs b/quickwit/quickwit-search/src/fetch_docs.rs index e8cf5c3a1fb..070109d1114 100644 --- a/quickwit/quickwit-search/src/fetch_docs.rs +++ b/quickwit/quickwit-search/src/fetch_docs.rs @@ -174,7 +174,7 @@ async fn fetch_docs_in_split( global_doc_addrs.sort_by_key(|doc| doc.doc_addr); // Opens the index without the ephemeral unbounded cache, this cache is indeed not useful // when fetching docs as we will fetch them only once. - let mut index = open_index_with_caches( + let mut index = open_index_with_caches::<()>( &searcher_context, index_storage, split, diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 78dfc706b7c..41a1fad0465 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -30,7 +30,8 @@ use quickwit_common::pretty::PrettySample; use quickwit_directories::{CachingDirectory, HotDirectory, StorageDirectory}; use quickwit_doc_mapper::{DocMapper, TermRange, WarmupInfo}; use quickwit_proto::search::{ - CountHits, LeafSearchRequest, LeafSearchResponse, PartialHit, SearchRequest, SortOrder, SortValue, SplitIdAndFooterOffsets, SplitSearchError + CountHits, LeafSearchRequest, LeafSearchResponse, PartialHit, SearchRequest, SortOrder, + SortValue, SplitIdAndFooterOffsets, SplitSearchError, }; use quickwit_query::query_ast::{BoolQuery, QueryAst, QueryAstTransformer, RangeQuery, TermQuery}; use quickwit_query::tokenizers::TokenizerManager; @@ -127,14 +128,18 @@ pub(crate) async fn open_split_bundle( /// Opens a `tantivy::Index` for the given split with several cache layers: /// - A split footer cache given by `SearcherContext.split_footer_cache`. /// - A fast fields cache given by `SearcherContext.storage_long_term_cache`. -/// - An ephemeral unbounded cache directory whose lifetime is tied to the returned `Index`. +/// - An ephemeral unbounded cache directory whose lifetime is tied to the +/// returned `Index`. 
+/// +/// TODO: generic T should be forced to SearcherPermit, but this requires the +/// search stream to also request permits. #[instrument(skip_all, fields(split_footer_start=split_and_footer_offsets.split_footer_start, split_footer_end=split_and_footer_offsets.split_footer_end))] -pub(crate) async fn open_index_with_caches( +pub(crate) async fn open_index_with_caches( searcher_context: &SearcherContext, index_storage: Arc, split_and_footer_offsets: &SplitIdAndFooterOffsets, tokenizer_manager: Option<&TokenizerManager>, - ephemeral_unbounded_cache: Option, + ephemeral_unbounded_cache: Option>, ) -> anyhow::Result { // Let's add a storage proxy to retry `get_slice` requests if they are taking too long, // if configured in the searcher config. @@ -368,6 +373,7 @@ fn get_leaf_resp_from_count(count: u64) -> LeafSearchResponse { } /// Apply a leaf search on a single split. +#[allow(clippy::too_many_arguments)] async fn leaf_search_single_split( searcher_context: &SearcherContext, mut search_request: SearchRequest, @@ -376,7 +382,7 @@ async fn leaf_search_single_split( doc_mapper: Arc, split_filter: Arc>, aggregations_limits: AggregationLimitsGuard, - search_permit: &mut SearchPermit, + search_permit: SearchPermit, ) -> crate::Result { rewrite_request( &mut search_request, @@ -402,8 +408,10 @@ async fn leaf_search_single_split( } let split_id = split.split_id.to_string(); - let byte_range_cache = - ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); + let byte_range_cache = ByteRangeCache::with_infinite_capacity( + &quickwit_storage::STORAGE_METRICS.shortlived_cache, + search_permit, + ); let index = open_index_with_caches( searcher_context, storage, @@ -433,9 +441,8 @@ async fn leaf_search_single_split( warmup(&searcher, &warmup_info).await?; let warmup_end = Instant::now(); let warmup_duration: Duration = warmup_end.duration_since(warmup_start); - - let short_lived_cache_num_bytes: u64 = byte_range_cache.get_num_bytes(); - 
search_permit.set_actual_memory_usage_and_release_permit_after(short_lived_cache_num_bytes); + let short_lived_cache_num_bytes = byte_range_cache.get_num_bytes(); + byte_range_cache.track(|permit| permit.warmup_completed(short_lived_cache_num_bytes)); let split_num_docs = split.num_docs; let span = info_span!("tantivy_search"); @@ -1281,15 +1288,15 @@ pub async fn leaf_search( // do no interleave with other leaf search requests. let permit_futures = searcher_context .search_permit_provider - .get_permits(split_with_req.len()); + .get_permits(split_with_req.len()) + .await; for ((split, mut request), permit_fut) in split_with_req.into_iter().zip(permit_futures.into_iter()) { let leaf_split_search_permit = permit_fut .instrument(info_span!("waiting_for_leaf_search_split_semaphore")) - .await - .expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."); + .await; let can_be_better = check_optimize_search_request(&mut request, &split, &split_filter); if !can_be_better && !run_all_splits { @@ -1379,7 +1386,7 @@ async fn leaf_search_single_split_wrapper( split: SplitIdAndFooterOffsets, split_filter: Arc>, incremental_merge_collector: Arc>, - mut search_permit: SearchPermit, + search_permit: SearchPermit, aggregations_limits: AggregationLimitsGuard, ) { crate::SEARCH_METRICS.leaf_searches_splits_total.inc(); @@ -1394,13 +1401,10 @@ async fn leaf_search_single_split_wrapper( doc_mapper, split_filter.clone(), aggregations_limits, - &mut search_permit, + search_permit, ) .await; - // We explicitly drop it, to highlight it to the reader - std::mem::drop(search_permit); - if leaf_search_single_split_res.is_ok() { timer.observe_duration(); } diff --git a/quickwit/quickwit-search/src/leaf_cache.rs b/quickwit/quickwit-search/src/leaf_cache.rs index 491f66f3aee..e2601081cd5 100644 --- a/quickwit/quickwit-search/src/leaf_cache.rs +++ b/quickwit/quickwit-search/src/leaf_cache.rs @@ -252,6 +252,7 @@ mod tests { 
sort_value2: None, split_id: "split_1".to_string(), }], + resource_stats: None, }; assert!(cache.get(split_1.clone(), query_1.clone()).is_none()); @@ -342,6 +343,7 @@ mod tests { sort_value2: None, split_id: "split_1".to_string(), }], + resource_stats: None, }; // for split_1, 1 and 1bis cover different timestamp ranges diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs index 4b80b3aed83..5bb03168120 100644 --- a/quickwit/quickwit-search/src/list_terms.rs +++ b/quickwit/quickwit-search/src/list_terms.rs @@ -40,6 +40,7 @@ use tracing::{debug, error, info, instrument}; use crate::leaf::open_index_with_caches; use crate::search_job_placer::group_jobs_by_index_id; +use crate::search_permit_provider::SearchPermit; use crate::{resolve_index_patterns, ClusterClient, SearchError, SearchJob, SearcherContext}; /// Performs a distributed list terms. @@ -215,9 +216,12 @@ async fn leaf_list_terms_single_split( search_request: &ListTermsRequest, storage: Arc, split: SplitIdAndFooterOffsets, + search_permit: SearchPermit, ) -> crate::Result { - let cache = - ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); + let cache = ByteRangeCache::with_infinite_capacity( + &quickwit_storage::STORAGE_METRICS.shortlived_cache, + search_permit, + ); let index = open_index_with_caches(searcher_context, storage, &split, None, Some(cache)).await?; let split_schema = index.schema(); @@ -330,7 +334,8 @@ pub async fn leaf_list_terms( info!(split_offsets = ?PrettySample::new(splits, 5)); let permits = searcher_context .search_permit_provider - .get_permits(splits.len()); + .get_permits(splits.len()) + .await; let leaf_search_single_split_futures: Vec<_> = splits .iter() .zip(permits.into_iter()) @@ -338,7 +343,7 @@ pub async fn leaf_list_terms( let index_storage_clone = index_storage.clone(); let searcher_context_clone = searcher_context.clone(); async move { - let _leaf_split_search_permit = 
search_permit_recv.await; + let leaf_split_search_permit = search_permit_recv.await; // TODO dedicated counter and timer? crate::SEARCH_METRICS.leaf_searches_splits_total.inc(); let timer = crate::SEARCH_METRICS @@ -349,6 +354,7 @@ pub async fn leaf_list_terms( request, index_storage_clone, split.clone(), + leaf_split_search_permit, ) .await; timer.observe_duration(); diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs index 15980acf4a8..2b86b5191b5 100644 --- a/quickwit/quickwit-search/src/search_permit_provider.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -20,22 +20,37 @@ use std::collections::VecDeque; use std::future::Future; use std::pin::Pin; -use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use bytesize::ByteSize; use quickwit_common::metrics::GaugeGuard; -use tokio::sync::oneshot; +use tokio::sync::{mpsc, oneshot}; use tracing::warn; -/// `SearchPermitProvider` is a distributor of permits to perform single split -/// search operation. +/// Distributor of permits to perform split search operation. /// -/// - Two types of resources are managed: memory allocations and download slots. -/// - Requests are served in order. +/// Requests are served in order. Each permit initially reserves a slot for the +/// warmup (limit concurrent downloads) and a pessimistic amount of memory. Once +/// the warmup is completed, the actual memory usage is set and the warmup slot +/// is released. Once the search is completed and the permit is dropped, the +/// remaining memory is also released. 
#[derive(Clone)] pub struct SearchPermitProvider { - inner_arc: Arc>, + sender: mpsc::UnboundedSender, +} + +pub enum SearchPermitMessage { + Request { + permit_sender: oneshot::Sender>, + num_permits: usize, + }, + WarmupCompleted { + memory_delta: i64, + }, + Drop { + memory_size: u64, + warmup_permit_held: bool, + }, } impl SearchPermitProvider { @@ -43,66 +58,110 @@ impl SearchPermitProvider { num_download_slots: usize, memory_budget: ByteSize, initial_allocation: ByteSize, - ) -> SearchPermitProvider { - SearchPermitProvider { - inner_arc: Arc::new(Mutex::new(InnerSearchPermitProvider { - num_download_slots_available: num_download_slots, - memory_budget: memory_budget.as_u64(), - permits_requests: VecDeque::new(), - memory_allocated: 0u64, - initial_allocation: initial_allocation.as_u64(), - })), - } + ) -> Self { + let (sender, receiver) = mpsc::unbounded_channel(); + let mut actor = SearchPermitActor { + msg_receiver: receiver, + msg_sender: sender.downgrade(), + num_warmup_slots_available: num_download_slots, + total_memory_budget: memory_budget.as_u64(), + permits_requests: VecDeque::new(), + total_memory_allocated: 0u64, + per_permit_initial_memory_allocation: initial_allocation.as_u64(), + }; + tokio::spawn(async move { actor.run().await }); + Self { sender } } - /// Returns a list of future permits in the form of awaitable futures. + /// Returns `num_permits` futures that complete once enough resources are + /// available. /// - /// The permits returned are guaranteed to be resolved in order. - /// In addition, the permits are guaranteed to be resolved before permits returned by - /// subsequent calls to this function (or `get_permit`). - #[must_use] - pub fn get_permits(&self, num_permits: usize) -> Vec { - let mut permits_lock = self.inner_arc.lock().unwrap(); - permits_lock.get_permits(num_permits, &self.inner_arc) + /// The permits returned are guaranteed to be resolved in order. 
In + /// addition, the permits are guaranteed to be resolved before permits + /// returned by subsequent calls to this function. + pub async fn get_permits(&self, num_permits: usize) -> Vec { + let (permit_sender, permit_receiver) = oneshot::channel(); + self.sender + .send(SearchPermitMessage::Request { + permit_sender, + num_permits, + }) + .expect("Receiver lives longer than sender"); + permit_receiver + .await + .expect("Receiver lives longer than sender") } } -struct InnerSearchPermitProvider { - num_download_slots_available: usize, - - // Note it is possible for memory_allocated to exceed memory_budget temporarily, - // if and only if a split leaf search task ended up using more than `initial_allocation`. - // - // When it happens, new permits will not be assigned until the memory is freed. - memory_budget: u64, - memory_allocated: u64, - initial_allocation: u64, +struct SearchPermitActor { + msg_receiver: mpsc::UnboundedReceiver, + msg_sender: mpsc::WeakUnboundedSender, + num_warmup_slots_available: usize, + /// Note it is possible for memory_allocated to exceed memory_budget temporarily, + /// if and only if a split leaf search task ended up using more than `initial_allocation`. + /// When it happens, new permits will not be assigned until the memory is freed. + total_memory_budget: u64, + total_memory_allocated: u64, + per_permit_initial_memory_allocation: u64, permits_requests: VecDeque>, } -impl InnerSearchPermitProvider { - fn get_permits( - &mut self, - num_permits: usize, - inner_arc: &Arc>, - ) -> Vec { - let mut permits = Vec::with_capacity(num_permits); - for _ in 0..num_permits { - let (tx, rx) = oneshot::channel(); - self.permits_requests.push_back(tx); - permits.push(SearchPermitFuture(rx)); +impl SearchPermitActor { + async fn run(&mut self) { + // Stops when the last clone of SearchPermitProvider is dropped. 
+ while let Some(msg) = self.msg_receiver.recv().await { + self.handle_message(msg); } - self.assign_available_permits(inner_arc); - permits } - /// Called each time a permit is requested or released - /// - /// Calling lock on `inner_arc` inside this method will cause a deadlock as - /// `&mut self` and `inner_arc` reference the same instance. - fn assign_available_permits(&mut self, inner_arc: &Arc>) { - while self.num_download_slots_available > 0 - && self.memory_allocated + self.initial_allocation <= self.memory_budget + fn handle_message(&mut self, msg: SearchPermitMessage) { + match msg { + SearchPermitMessage::Request { + num_permits, + permit_sender, + } => { + let mut permits = Vec::with_capacity(num_permits); + for _ in 0..num_permits { + let (tx, rx) = oneshot::channel(); + self.permits_requests.push_back(tx); + permits.push(SearchPermitFuture(rx)); + } + self.assign_available_permits(); + permit_sender + .send(permits) + .ok() + // This is a request response pattern, so we can safely ignore the error. 
+ .expect("Receiver lives longer than sender"); + } + SearchPermitMessage::WarmupCompleted { memory_delta } => { + self.num_warmup_slots_available += 1; + if self.total_memory_allocated as i64 + memory_delta < 0 { + panic!("More memory released than allocated, should never happen.") + } + self.total_memory_allocated = + (self.total_memory_allocated as i64 + memory_delta) as u64; + self.assign_available_permits(); + } + SearchPermitMessage::Drop { + memory_size, + warmup_permit_held, + } => { + if warmup_permit_held { + self.num_warmup_slots_available += 1; + } + self.total_memory_allocated = self + .total_memory_allocated + .checked_sub(memory_size) + .expect("More memory released than allocated, should never happen."); + self.assign_available_permits(); + } + } + } + + fn assign_available_permits(&mut self) { + while self.num_warmup_slots_available > 0 + && self.total_memory_allocated + self.per_permit_initial_memory_allocation + <= self.total_memory_budget { let Some(permit_requester_tx) = self.permits_requests.pop_front() else { break; @@ -111,23 +170,17 @@ impl InnerSearchPermitProvider { &crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing, ); ongoing_gauge_guard.add(1); - let send_res = permit_requester_tx.send(SearchPermit { - _ongoing_gauge_guard: ongoing_gauge_guard, - inner_arc: inner_arc.clone(), - warmup_permit_held: true, - memory_allocation: self.initial_allocation, - }); - match send_res { - Ok(()) => { - self.num_download_slots_available -= 1; - self.memory_allocated += self.initial_allocation; - } - Err(search_permit) => { - // We cannot just decrease the num_permits_available in all case and rely on - // the drop logic here: it would cause a dead lock on the inner_arc Mutex. 
- search_permit.drop_without_recycling_permit(); - } - } + self.total_memory_allocated += self.per_permit_initial_memory_allocation; + permit_requester_tx + .send(SearchPermit { + _ongoing_gauge_guard: ongoing_gauge_guard, + msg_sender: self.msg_sender.clone(), + memory_allocation: self.per_permit_initial_memory_allocation, + warmup_permit_held: true, + }) + // if the requester dropped its receiver, we drop the newly + // created SearchPermit which releases the resources + .ok(); } crate::SEARCH_METRICS .leaf_search_single_split_tasks_pending @@ -137,16 +190,16 @@ impl InnerSearchPermitProvider { pub struct SearchPermit { _ongoing_gauge_guard: GaugeGuard<'static>, - inner_arc: Arc>, - warmup_permit_held: bool, + msg_sender: mpsc::WeakUnboundedSender, memory_allocation: u64, + warmup_permit_held: bool, } impl SearchPermit { - /// After warm up, we have a proper estimate of the memory usage of a single split leaf search. - /// - /// We can then set the actual memory usage. - pub fn set_actual_memory_usage_and_release_permit_after(&mut self, new_memory_usage: u64) { + /// After warm up, we have a proper estimate of the memory usage of a single + /// split leaf search. We can thus set the actual memory usage and release + /// the warmup slot. 
+ pub fn warmup_completed(&mut self, new_memory_usage: u64) { if new_memory_usage > self.memory_allocation { warn!( memory_usage = new_memory_usage, @@ -154,53 +207,40 @@ impl SearchPermit { "current leaf search is consuming more memory than the initial allocation" ); } - let mut inner_guard = self.inner_arc.lock().unwrap(); - let delta = new_memory_usage as i64 - inner_guard.initial_allocation as i64; - inner_guard.memory_allocated += delta as u64; - inner_guard.num_download_slots_available += 1; - if inner_guard.memory_allocated > inner_guard.memory_budget { - warn!( - memory_allocated = inner_guard.memory_allocated, - memory_budget = inner_guard.memory_budget, - "memory allocated exceeds memory budget" - ); - } - self.memory_allocation = new_memory_usage; - inner_guard.assign_available_permits(&self.inner_arc); + let memory_delta = new_memory_usage as i64 - self.memory_allocation as i64; + self.warmup_permit_held = false; + self.send_if_still_running(SearchPermitMessage::WarmupCompleted { memory_delta }); } - fn drop_without_recycling_permit(mut self) { - self.warmup_permit_held = false; - self.memory_allocation = 0u64; - drop(self); + fn send_if_still_running(&self, msg: SearchPermitMessage) { + if let Some(sender) = self.msg_sender.upgrade() { + sender + .send(msg) + // Receiver instance in the event loop is never dropped or + // closed as long as there is a strong sender reference. + .expect("Receiver should live longer than sender"); + } } } impl Drop for SearchPermit { fn drop(&mut self) { - // This is not just an optimization. This is necessary to avoid a dead lock when the - // permit requester dropped its receiver channel. 
- if !self.warmup_permit_held && self.memory_allocation == 0 { - return; - } - let mut inner_guard = self.inner_arc.lock().unwrap(); - if self.warmup_permit_held { - inner_guard.num_download_slots_available += 1; - } - inner_guard.memory_allocated -= self.memory_allocation; - inner_guard.assign_available_permits(&self.inner_arc); + self.send_if_still_running(SearchPermitMessage::Drop { + memory_size: self.memory_allocation, + warmup_permit_held: self.warmup_permit_held, + }); } } pub struct SearchPermitFuture(oneshot::Receiver); impl Future for SearchPermitFuture { - type Output = Option; + type Output = SearchPermit; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let receiver = Pin::new(&mut self.get_mut().0); match receiver.poll(cx) { - Poll::Ready(Ok(search_permit)) => Poll::Ready(Some(search_permit)), + Poll::Ready(Ok(search_permit)) => Poll::Ready(search_permit), Poll::Ready(Err(_)) => panic!("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."), Poll::Pending => Poll::Pending, } diff --git a/quickwit/quickwit-search/src/search_stream/leaf.rs b/quickwit/quickwit-search/src/search_stream/leaf.rs index ed7d39965a9..22d63853b4e 100644 --- a/quickwit/quickwit-search/src/search_stream/leaf.rs +++ b/quickwit/quickwit-search/src/search_stream/leaf.rs @@ -127,8 +127,11 @@ async fn leaf_search_stream_single_split( &split, ); - let cache = - ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); + let cache = ByteRangeCache::with_infinite_capacity( + &quickwit_storage::STORAGE_METRICS.shortlived_cache, + // should we not track the memory with a SearcherPermit? 
+ (), + ); let index = open_index_with_caches( &searcher_context, diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index 734344f2c74..e6d1238e644 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -488,8 +488,11 @@ impl SearcherContext { capacity_in_bytes, &quickwit_storage::STORAGE_METRICS.split_footer_cache, ); - let leaf_search_split_semaphore = - SearchPermitProvider::new(searcher_config.max_num_concurrent_split_searches, searcher_config.warmup_memory_budget, searcher_config.warmup_single_split_initial_allocation); + let leaf_search_split_semaphore = SearchPermitProvider::new( + searcher_config.max_num_concurrent_split_searches, + searcher_config.warmup_memory_budget, + searcher_config.warmup_single_split_initial_allocation, + ); let split_stream_semaphore = Semaphore::new(searcher_config.max_num_concurrent_split_streams); let fast_field_cache_capacity = searcher_config.fast_field_cache_capacity.as_u64() as usize; diff --git a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs index c9f261c347b..076d64a22d1 100644 --- a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs +++ b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs @@ -21,7 +21,6 @@ use std::borrow::{Borrow, Cow}; use std::collections::BTreeMap; use std::ops::Range; use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use tantivy::directory::OwnedBytes; @@ -345,54 +344,69 @@ impl Drop for NeedMutByteRangeCache { /// cached data, the changes may or may not get recorded. /// /// At the moment this is hardly a cache as it features no eviction policy. 
-#[derive(Clone)] -pub struct ByteRangeCache { - inner_arc: Arc, +pub struct ByteRangeCache { + inner_arc: Arc>>, } -struct Inner { - num_stored_bytes: AtomicU64, - need_mut_byte_range_cache: Mutex>, +struct Inner { + need_mut_byte_range_cache: NeedMutByteRangeCache, + tracker: T, } -impl ByteRangeCache { +impl Clone for ByteRangeCache { + fn clone(&self) -> Self { + ByteRangeCache { + inner_arc: self.inner_arc.clone(), + } + } +} + +impl ByteRangeCache { /// Creates a slice cache that never removes any entry. - pub fn with_infinite_capacity(cache_counters: &'static CacheMetrics) -> Self { + pub fn with_infinite_capacity(cache_counters: &'static CacheMetrics, tracker: T) -> Self { let need_mut_byte_range_cache = NeedMutByteRangeCache::with_infinite_capacity(cache_counters); let inner = Inner { - num_stored_bytes: AtomicU64::default(), - need_mut_byte_range_cache: Mutex::new(need_mut_byte_range_cache), + need_mut_byte_range_cache, + tracker, }; ByteRangeCache { - inner_arc: Arc::new(inner), + inner_arc: Arc::new(Mutex::new(inner)), } } /// Overall amount of bytes stored in the cache. pub fn get_num_bytes(&self) -> u64 { - self.inner_arc.num_stored_bytes.load(Ordering::Relaxed) + self.inner_arc + .lock() + .unwrap() + .need_mut_byte_range_cache + .num_bytes } /// If available, returns the cached view of the slice. pub fn get_slice(&self, path: &Path, byte_range: Range) -> Option { self.inner_arc - .need_mut_byte_range_cache .lock() .unwrap() + .need_mut_byte_range_cache .get_slice(path, byte_range) } /// Put the given amount of data in the cache. 
pub fn put_slice(&self, path: PathBuf, byte_range: Range, bytes: OwnedBytes) { - let mut need_mut_byte_range_cache_locked = - self.inner_arc.need_mut_byte_range_cache.lock().unwrap(); - need_mut_byte_range_cache_locked.put_slice(path, byte_range, bytes); - let num_bytes = need_mut_byte_range_cache_locked.num_bytes; - drop(need_mut_byte_range_cache_locked); - self.inner_arc - .num_stored_bytes - .store(num_bytes as u64, Ordering::Relaxed); + let mut inner = self.inner_arc.lock().unwrap(); + inner + .need_mut_byte_range_cache + .put_slice(path, byte_range, bytes); + } + + /// Apply the provided action on the tracker. + /// + /// The action should not block for long. + pub fn track(&self, action: impl FnOnce(&mut T)) { + let mut inner = self.inner_arc.lock().unwrap(); + action(&mut inner.tracker); } } @@ -449,7 +463,7 @@ mod tests { state.insert("path1", vec![false; 12]); state.insert("path2", vec![false; 12]); - let cache = ByteRangeCache::with_infinite_capacity(&CACHE_METRICS_FOR_TESTS); + let cache = ByteRangeCache::with_infinite_capacity(&CACHE_METRICS_FOR_TESTS, ()); for op in ops { match op { @@ -470,13 +484,13 @@ mod tests { .sum(); // in some case we have ranges touching each other, count_items count them // as only one, but cache count them as 2. 
- assert!(cache.inner_arc.need_mut_byte_range_cache.lock().unwrap().num_items >= expected_item_count as u64); + assert!(cache.inner_arc.lock().unwrap().need_mut_byte_range_cache.num_items >= expected_item_count as u64); let expected_byte_count = state.values() .flatten() .filter(|stored| **stored) .count(); - assert_eq!(cache.inner_arc.need_mut_byte_range_cache.lock().unwrap().num_bytes, expected_byte_count as u64); + assert_eq!(cache.inner_arc.lock().unwrap().need_mut_byte_range_cache.num_bytes, expected_byte_count as u64); } Operation::Get { range, @@ -517,7 +531,7 @@ mod tests { static METRICS: Lazy = Lazy::new(|| CacheMetrics::for_component("byterange_cache_test")); - let cache = ByteRangeCache::with_infinite_capacity(&METRICS); + let cache = ByteRangeCache::with_infinite_capacity(&METRICS, ()); let key: std::path::PathBuf = "key".into(); @@ -543,7 +557,7 @@ mod tests { ); { - let mutable_cache = cache.inner_arc.need_mut_byte_range_cache.lock().unwrap(); + let mutable_cache = &cache.inner_arc.lock().unwrap().need_mut_byte_range_cache; assert_eq!(mutable_cache.cache.len(), 4); assert_eq!(mutable_cache.num_items, 4); assert_eq!(mutable_cache.cache_counters.in_cache_count.get(), 4); @@ -555,7 +569,7 @@ mod tests { { // now they should've been merged, except the last one - let mutable_cache = cache.inner_arc.need_mut_byte_range_cache.lock().unwrap(); + let mutable_cache = &cache.inner_arc.lock().unwrap().need_mut_byte_range_cache; assert_eq!(mutable_cache.cache.len(), 2); assert_eq!(mutable_cache.num_items, 2); assert_eq!(mutable_cache.cache_counters.in_cache_count.get(), 2); From ba3759f6103fafc39dbbe9079b2b50da19ae7afc Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Tue, 26 Nov 2024 10:09:41 +0100 Subject: [PATCH 05/12] Revert weird cargo lock update --- quickwit/Cargo.lock | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 11cbae2c968..1662803ed0b 100644 --- 
a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -3502,23 +3502,12 @@ checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" [[package]] name = "idna" -version = "1.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "686f825264d630750a544639377bae737628043f20d38bbc029e8f29ea968a7e" -dependencies = [ - "idna_adapter", - "smallvec", - "utf8_iter", -] - -[[package]] -name = "idna_adapter" -version = "1.2.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "daca1df1c957320b2cf139ac61e7bd64fed304c5040df000a745aa1de3b4ef71" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" dependencies = [ - "icu_normalizer", - "icu_properties", + "unicode-bidi", + "unicode-normalization", ] [[package]] From 23a7aefff4ac96d5806c38157ab2273073327853 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Tue, 26 Nov 2024 12:30:13 +0100 Subject: [PATCH 06/12] Improve separation of concern by using wrapping instead of nesting Adding an extra generic field into the cache to optionally allow permit tracking is weird. Instead, we make the directory generic on the type of cache and use a wrapped cache when tracking is necessary. 
--- .../src/caching_directory.rs | 62 ++++++++------- quickwit/quickwit-directories/src/lib.rs | 2 +- quickwit/quickwit-search/src/fetch_docs.rs | 6 +- quickwit/quickwit-search/src/leaf.rs | 20 ++--- quickwit/quickwit-search/src/lib.rs | 1 + quickwit/quickwit-search/src/list_terms.rs | 11 +-- .../src/search_permit_provider.rs | 1 + .../quickwit-search/src/search_stream/leaf.rs | 11 +-- quickwit/quickwit-search/src/tracked_cache.rs | 77 +++++++++++++++++++ .../src/cache/byte_range_cache.rs | 74 ++++++------------ 10 files changed, 160 insertions(+), 105 deletions(-) create mode 100644 quickwit/quickwit-search/src/tracked_cache.rs diff --git a/quickwit/quickwit-directories/src/caching_directory.rs b/quickwit/quickwit-directories/src/caching_directory.rs index b5093680458..5ee8b63777a 100644 --- a/quickwit/quickwit-directories/src/caching_directory.rs +++ b/quickwit/quickwit-directories/src/caching_directory.rs @@ -29,41 +29,28 @@ use tantivy::directory::{FileHandle, OwnedBytes}; use tantivy::{Directory, HasLen}; /// The caching directory is a simple cache that wraps another directory. -pub struct CachingDirectory { +#[derive(Clone)] +pub struct CachingDirectory { underlying: Arc, - // TODO fixme: that's a pretty ugly cache we have here. - cache: ByteRangeCache, + cache: C, } -impl Clone for CachingDirectory { - fn clone(&self) -> Self { - CachingDirectory { - underlying: self.underlying.clone(), - cache: self.cache.clone(), - } - } -} - -impl CachingDirectory { - /// Creates a new CachingDirectory. +impl CachingDirectory> { + /// Creates a new CachingDirectory with a default cache. /// /// Warming: The resulting CacheDirectory will cache all information without ever /// removing any item from the cache. 
- pub fn new_unbounded(underlying: Arc) -> CachingDirectory { + pub fn new_unbounded(underlying: Arc) -> CachingDirectory> { let byte_range_cache = ByteRangeCache::with_infinite_capacity( &quickwit_storage::STORAGE_METRICS.shortlived_cache, - (), ); - CachingDirectory::new(underlying, byte_range_cache) + CachingDirectory::new(underlying, Arc::new(byte_range_cache)) } } -impl CachingDirectory { - /// Creates a new CachingDirectory. - /// - /// Warming: The resulting CacheDirectory will cache all information without ever - /// removing any item from the cache. - pub fn new(underlying: Arc, cache: ByteRangeCache) -> CachingDirectory { +impl CachingDirectory { + /// Creates a new CachingDirectory with an existing cache. + pub fn new(underlying: Arc, cache: C) -> CachingDirectory { CachingDirectory { underlying, cache } } } @@ -74,9 +61,9 @@ impl fmt::Debug for CachingDirectory { } } -struct CachingFileHandle { +struct CachingFileHandle { path: PathBuf, - cache: ByteRangeCache, + cache: C, underlying_filehandle: Arc, } @@ -92,7 +79,7 @@ impl fmt::Debug for CachingFileHandle { } #[async_trait] -impl FileHandle for CachingFileHandle { +impl FileHandle for CachingFileHandle { fn read_bytes(&self, byte_range: Range) -> io::Result { if let Some(bytes) = self.cache.get_slice(&self.path, byte_range.clone()) { return Ok(bytes); @@ -117,13 +104,13 @@ impl FileHandle for CachingFileHandle { } } -impl HasLen for CachingFileHandle { +impl HasLen for CachingFileHandle { fn len(&self) -> usize { self.underlying_filehandle.len() } } -impl Directory for CachingDirectory { +impl Directory for CachingDirectory { fn exists(&self, path: &Path) -> std::result::Result { self.underlying.exists(path) } @@ -153,6 +140,25 @@ impl Directory for CachingDirectory { crate::read_only_directory!(); } +/// A byte range cache that to be used in front of the directory. +pub trait DirectoryCache: Clone + Send + Sync + 'static { + /// If available, returns the cached view of the slice. 
+ fn get_slice(&self, path: &Path, byte_range: Range) -> Option; + + /// Put the given amount of data in the cache. + fn put_slice(&self, path: PathBuf, byte_range: Range, bytes: OwnedBytes); +} + +impl DirectoryCache for Arc { + fn get_slice(&self, path: &Path, byte_range: Range) -> Option { + ByteRangeCache::get_slice(self, path, byte_range) + } + + fn put_slice(&self, path: PathBuf, byte_range: Range, bytes: OwnedBytes) { + ByteRangeCache::put_slice(self, path, byte_range, bytes) + } +} + #[cfg(test)] mod tests { diff --git a/quickwit/quickwit-directories/src/lib.rs b/quickwit/quickwit-directories/src/lib.rs index 4df4f2799ec..d8cd9e02fa3 100644 --- a/quickwit/quickwit-directories/src/lib.rs +++ b/quickwit/quickwit-directories/src/lib.rs @@ -37,7 +37,7 @@ mod storage_directory; mod union_directory; pub use self::bundle_directory::{get_hotcache_from_split, read_split_footer, BundleDirectory}; -pub use self::caching_directory::CachingDirectory; +pub use self::caching_directory::{CachingDirectory, DirectoryCache}; pub use self::debug_proxy_directory::{DebugProxyDirectory, ReadOperation}; pub use self::hot_directory::{write_hotcache, HotDirectory}; pub use self::storage_directory::StorageDirectory; diff --git a/quickwit/quickwit-search/src/fetch_docs.rs b/quickwit/quickwit-search/src/fetch_docs.rs index 070109d1114..9240d5399ae 100644 --- a/quickwit/quickwit-search/src/fetch_docs.rs +++ b/quickwit/quickwit-search/src/fetch_docs.rs @@ -27,7 +27,7 @@ use quickwit_doc_mapper::DocMapper; use quickwit_proto::search::{ FetchDocsResponse, PartialHit, SnippetRequest, SplitIdAndFooterOffsets, }; -use quickwit_storage::Storage; +use quickwit_storage::{ByteRangeCache, Storage}; use tantivy::query::Query; use tantivy::schema::document::CompactDocValue; use tantivy::schema::{Document as DocumentTrait, Field, TantivyDocument, Value}; @@ -174,12 +174,12 @@ async fn fetch_docs_in_split( global_doc_addrs.sort_by_key(|doc| doc.doc_addr); // Opens the index without the ephemeral 
unbounded cache, this cache is indeed not useful // when fetching docs as we will fetch them only once. - let mut index = open_index_with_caches::<()>( + let mut index = open_index_with_caches( &searcher_context, index_storage, split, Some(doc_mapper.tokenizer_manager()), - None, + Option::>::None, ) .await .context("open-index-for-split")?; diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 41a1fad0465..cf83c8e4a15 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -27,7 +27,7 @@ use std::time::{Duration, Instant}; use anyhow::Context; use futures::future::try_join_all; use quickwit_common::pretty::PrettySample; -use quickwit_directories::{CachingDirectory, HotDirectory, StorageDirectory}; +use quickwit_directories::{CachingDirectory, DirectoryCache, HotDirectory, StorageDirectory}; use quickwit_doc_mapper::{DocMapper, TermRange, WarmupInfo}; use quickwit_proto::search::{ CountHits, LeafSearchRequest, LeafSearchResponse, PartialHit, SearchRequest, SortOrder, @@ -53,6 +53,7 @@ use crate::metrics::SEARCH_METRICS; use crate::root::is_metadata_count_request_with_ast; use crate::search_permit_provider::SearchPermit; use crate::service::{deserialize_doc_mapper, SearcherContext}; +use crate::tracked_cache::TrackedByteRangeCache; use crate::{QuickwitAggregations, SearchError}; #[instrument(skip_all)] @@ -134,12 +135,12 @@ pub(crate) async fn open_split_bundle( /// TODO: generic T should be forced to SearcherPermit, but this requires the /// search stream to also request permits. 
#[instrument(skip_all, fields(split_footer_start=split_and_footer_offsets.split_footer_start, split_footer_end=split_and_footer_offsets.split_footer_end))] -pub(crate) async fn open_index_with_caches( +pub(crate) async fn open_index_with_caches( searcher_context: &SearcherContext, index_storage: Arc, split_and_footer_offsets: &SplitIdAndFooterOffsets, tokenizer_manager: Option<&TokenizerManager>, - ephemeral_unbounded_cache: Option>, + ephemeral_unbounded_cache: Option, ) -> anyhow::Result { // Let's add a storage proxy to retry `get_slice` requests if they are taking too long, // if configured in the searcher config. @@ -408,16 +409,15 @@ async fn leaf_search_single_split( } let split_id = split.split_id.to_string(); - let byte_range_cache = ByteRangeCache::with_infinite_capacity( - &quickwit_storage::STORAGE_METRICS.shortlived_cache, - search_permit, - ); + let byte_range_cache = + ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); + let tracked_cache = TrackedByteRangeCache::new(byte_range_cache, search_permit); let index = open_index_with_caches( searcher_context, storage, &split, Some(doc_mapper.tokenizer_manager()), - Some(byte_range_cache.clone()), + Some(tracked_cache.clone()), ) .await?; let split_schema = index.schema(); @@ -441,8 +441,8 @@ async fn leaf_search_single_split( warmup(&searcher, &warmup_info).await?; let warmup_end = Instant::now(); let warmup_duration: Duration = warmup_end.duration_since(warmup_start); - let short_lived_cache_num_bytes = byte_range_cache.get_num_bytes(); - byte_range_cache.track(|permit| permit.warmup_completed(short_lived_cache_num_bytes)); + tracked_cache.warmup_completed(); + let short_lived_cache_num_bytes = tracked_cache.get_num_bytes(); let split_num_docs = split.num_docs; let span = info_span!("tantivy_search"); diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index 181879b91a6..3685d49dbb4 100644 --- 
a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -42,6 +42,7 @@ mod search_response_rest; mod search_stream; mod service; pub(crate) mod top_k_collector; +mod tracked_cache; mod metrics; mod search_permit_provider; diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs index 5bb03168120..06136b0fefb 100644 --- a/quickwit/quickwit-search/src/list_terms.rs +++ b/quickwit/quickwit-search/src/list_terms.rs @@ -41,6 +41,7 @@ use tracing::{debug, error, info, instrument}; use crate::leaf::open_index_with_caches; use crate::search_job_placer::group_jobs_by_index_id; use crate::search_permit_provider::SearchPermit; +use crate::tracked_cache::TrackedByteRangeCache; use crate::{resolve_index_patterns, ClusterClient, SearchError, SearchJob, SearcherContext}; /// Performs a distributed list terms. @@ -218,12 +219,12 @@ async fn leaf_list_terms_single_split( split: SplitIdAndFooterOffsets, search_permit: SearchPermit, ) -> crate::Result { - let cache = ByteRangeCache::with_infinite_capacity( - &quickwit_storage::STORAGE_METRICS.shortlived_cache, - search_permit, - ); + let cache = + ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); + let tracked_cache = TrackedByteRangeCache::new(cache, search_permit); let index = - open_index_with_caches(searcher_context, storage, &split, None, Some(cache)).await?; + open_index_with_caches(searcher_context, storage, &split, None, Some(tracked_cache)) + .await?; let split_schema = index.schema(); let reader = index .reader_builder() diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs index 2b86b5191b5..f6965574169 100644 --- a/quickwit/quickwit-search/src/search_permit_provider.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -209,6 +209,7 @@ impl SearchPermit { } let memory_delta = new_memory_usage as i64 - self.memory_allocation as 
i64; self.warmup_permit_held = false; + self.memory_allocation = new_memory_usage; self.send_if_still_running(SearchPermitMessage::WarmupCompleted { memory_delta }); } diff --git a/quickwit/quickwit-search/src/search_stream/leaf.rs b/quickwit/quickwit-search/src/search_stream/leaf.rs index 22d63853b4e..0932e827a71 100644 --- a/quickwit/quickwit-search/src/search_stream/leaf.rs +++ b/quickwit/quickwit-search/src/search_stream/leaf.rs @@ -127,18 +127,15 @@ async fn leaf_search_stream_single_split( &split, ); - let cache = ByteRangeCache::with_infinite_capacity( - &quickwit_storage::STORAGE_METRICS.shortlived_cache, - // should we not track the memory with a SearcherPermit? - (), - ); - + let cache = + ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); + // TODO should create a SearchPermit we wrap with TrackedByteRangeCache here? let index = open_index_with_caches( &searcher_context, storage, &split, Some(doc_mapper.tokenizer_manager()), - Some(cache), + Some(Arc::new(cache)), ) .await?; let split_schema = index.schema(); diff --git a/quickwit/quickwit-search/src/tracked_cache.rs b/quickwit/quickwit-search/src/tracked_cache.rs new file mode 100644 index 00000000000..2eb947dc385 --- /dev/null +++ b/quickwit/quickwit-search/src/tracked_cache.rs @@ -0,0 +1,77 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::sync::{Arc, Mutex}; + +use quickwit_directories::DirectoryCache; +use quickwit_storage::ByteRangeCache; + +use crate::search_permit_provider::SearchPermit; + +/// A [`ByteRangeCache`] tied to a [`SearchPermit`]. +#[derive(Clone)] +pub struct TrackedByteRangeCache { + inner: Arc, +} + +struct Inner { + cache: ByteRangeCache, + search_permit: Mutex, +} + +impl TrackedByteRangeCache { + pub fn new(cache: ByteRangeCache, search_permit: SearchPermit) -> TrackedByteRangeCache { + TrackedByteRangeCache { + inner: Arc::new(Inner { + cache, + search_permit: Mutex::new(search_permit), + }), + } + } + + pub fn warmup_completed(&self) { + self.inner + .search_permit + .lock() + .unwrap() + .warmup_completed(self.get_num_bytes()); + } + + pub fn get_num_bytes(&self) -> u64 { + self.inner.cache.get_num_bytes() + } +} + +impl DirectoryCache for TrackedByteRangeCache { + fn get_slice( + &self, + path: &std::path::Path, + byte_range: std::ops::Range, + ) -> Option { + self.inner.cache.get_slice(path, byte_range) + } + fn put_slice( + &self, + path: std::path::PathBuf, + byte_range: std::ops::Range, + bytes: quickwit_storage::OwnedBytes, + ) { + self.inner.cache.put_slice(path, byte_range, bytes) + } +} diff --git a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs index 076d64a22d1..9f5c51ac033 100644 --- a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs +++ b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs @@ -21,7 +21,8 @@ use std::borrow::{Borrow, Cow}; use std::collections::BTreeMap; use std::ops::Range; use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Mutex; use tantivy::directory::OwnedBytes; @@ -344,69 +345,40 @@ impl Drop 
for NeedMutByteRangeCache { /// cached data, the changes may or may not get recorded. /// /// At the moment this is hardly a cache as it features no eviction policy. -pub struct ByteRangeCache { - inner_arc: Arc>>, +pub struct ByteRangeCache { + num_stored_bytes: AtomicU64, + inner: Mutex>, } -struct Inner { - need_mut_byte_range_cache: NeedMutByteRangeCache, - tracker: T, -} - -impl Clone for ByteRangeCache { - fn clone(&self) -> Self { - ByteRangeCache { - inner_arc: self.inner_arc.clone(), - } - } -} - -impl ByteRangeCache { +impl ByteRangeCache { /// Creates a slice cache that never removes any entry. - pub fn with_infinite_capacity(cache_counters: &'static CacheMetrics, tracker: T) -> Self { + pub fn with_infinite_capacity(cache_counters: &'static CacheMetrics) -> Self { let need_mut_byte_range_cache = NeedMutByteRangeCache::with_infinite_capacity(cache_counters); - let inner = Inner { - need_mut_byte_range_cache, - tracker, - }; ByteRangeCache { - inner_arc: Arc::new(Mutex::new(inner)), + num_stored_bytes: AtomicU64::default(), + inner: Mutex::new(need_mut_byte_range_cache), } } /// Overall amount of bytes stored in the cache. pub fn get_num_bytes(&self) -> u64 { - self.inner_arc - .lock() - .unwrap() - .need_mut_byte_range_cache - .num_bytes + self.num_stored_bytes.load(Ordering::Relaxed) } /// If available, returns the cached view of the slice. pub fn get_slice(&self, path: &Path, byte_range: Range) -> Option { - self.inner_arc - .lock() - .unwrap() - .need_mut_byte_range_cache - .get_slice(path, byte_range) + self.inner.lock().unwrap().get_slice(path, byte_range) } /// Put the given amount of data in the cache. pub fn put_slice(&self, path: PathBuf, byte_range: Range, bytes: OwnedBytes) { - let mut inner = self.inner_arc.lock().unwrap(); - inner - .need_mut_byte_range_cache - .put_slice(path, byte_range, bytes); - } - - /// Apply the provided action on the tracker. - /// - /// The action should not block for long. 
- pub fn track(&self, action: impl FnOnce(&mut T)) { - let mut inner = self.inner_arc.lock().unwrap(); - action(&mut inner.tracker); + let mut need_mut_byte_range_cache_locked = self.inner.lock().unwrap(); + need_mut_byte_range_cache_locked.put_slice(path, byte_range, bytes); + let num_bytes = need_mut_byte_range_cache_locked.num_bytes; + drop(need_mut_byte_range_cache_locked); + self.num_stored_bytes + .store(num_bytes as u64, Ordering::Relaxed); } } @@ -463,7 +435,7 @@ mod tests { state.insert("path1", vec![false; 12]); state.insert("path2", vec![false; 12]); - let cache = ByteRangeCache::with_infinite_capacity(&CACHE_METRICS_FOR_TESTS, ()); + let cache = ByteRangeCache::with_infinite_capacity(&CACHE_METRICS_FOR_TESTS); for op in ops { match op { @@ -484,13 +456,13 @@ mod tests { .sum(); // in some case we have ranges touching each other, count_items count them // as only one, but cache count them as 2. - assert!(cache.inner_arc.lock().unwrap().need_mut_byte_range_cache.num_items >= expected_item_count as u64); + assert!(cache.inner.lock().unwrap().num_items >= expected_item_count as u64); let expected_byte_count = state.values() .flatten() .filter(|stored| **stored) .count(); - assert_eq!(cache.inner_arc.lock().unwrap().need_mut_byte_range_cache.num_bytes, expected_byte_count as u64); + assert_eq!(cache.inner.lock().unwrap().num_bytes, expected_byte_count as u64); } Operation::Get { range, @@ -531,7 +503,7 @@ mod tests { static METRICS: Lazy = Lazy::new(|| CacheMetrics::for_component("byterange_cache_test")); - let cache = ByteRangeCache::with_infinite_capacity(&METRICS, ()); + let cache = ByteRangeCache::with_infinite_capacity(&METRICS); let key: std::path::PathBuf = "key".into(); @@ -557,7 +529,7 @@ mod tests { ); { - let mutable_cache = &cache.inner_arc.lock().unwrap().need_mut_byte_range_cache; + let mutable_cache = &cache.inner.lock().unwrap(); assert_eq!(mutable_cache.cache.len(), 4); assert_eq!(mutable_cache.num_items, 4); 
assert_eq!(mutable_cache.cache_counters.in_cache_count.get(), 4); @@ -569,7 +541,7 @@ mod tests { { // now they should've been merged, except the last one - let mutable_cache = &cache.inner_arc.lock().unwrap().need_mut_byte_range_cache; + let mutable_cache = &cache.inner.lock().unwrap(); assert_eq!(mutable_cache.cache.len(), 2); assert_eq!(mutable_cache.num_items, 2); assert_eq!(mutable_cache.cache_counters.in_cache_count.get(), 2); From 08ddc9fdb4fa6abd92ad431e0b5081422a2291c9 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Tue, 26 Nov 2024 13:45:19 +0100 Subject: [PATCH 07/12] Fix clippy --- quickwit/quickwit-search/src/cluster_client.rs | 2 +- quickwit/quickwit-search/src/root.rs | 13 +++++++++---- .../quickwit-storage/src/cache/byte_range_cache.rs | 3 +-- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/quickwit/quickwit-search/src/cluster_client.rs b/quickwit/quickwit-search/src/cluster_client.rs index ec5a093f25e..0f18283a9f0 100644 --- a/quickwit/quickwit-search/src/cluster_client.rs +++ b/quickwit/quickwit-search/src/cluster_client.rs @@ -322,7 +322,7 @@ fn merge_original_with_retry_leaf_search_response( retry_response.resource_stats.as_ref(), ] .into_iter() - .flat_map(|el_opt| el_opt); + .flatten(); let resource_stats = merge_resource_stats_it(&mut stats); Ok(LeafSearchResponse { intermediate_aggregation_result, diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 5e4e1e5804e..ea057be4ff7 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -694,7 +694,8 @@ pub fn get_count_from_metadata(split_metadatas: &[SplitMetadata]) -> Vec bool { @@ -705,10 +706,11 @@ fn is_top_1pct_memory_intensive(num_bytes: u64, split_num_docs: u64) -> bool { // We multiply those figure by 1_000 for accuracy. 
const PERCENTILE: u64 = 95; const PRIOR_NUM_BYTES_PER_DOC: u64 = 3 * 1_000; - static NUM_BYTES_PER_DOC_95_PERCENTILE_ESTIMATOR: AtomicU64 = AtomicU64::new(PRIOR_NUM_BYTES_PER_DOC); + static NUM_BYTES_PER_DOC_95_PERCENTILE_ESTIMATOR: AtomicU64 = + AtomicU64::new(PRIOR_NUM_BYTES_PER_DOC); let num_bits_per_docs = num_bytes * 1_000 / split_num_docs; let current_estimator = NUM_BYTES_PER_DOC_95_PERCENTILE_ESTIMATOR.load(Ordering::Relaxed); - let is_memory_intensive = num_bits_per_docs > current_estimator; + let is_memory_intensive = num_bits_per_docs > current_estimator; let new_estimator: u64 = if is_memory_intensive { current_estimator.saturating_add(PRIOR_NUM_BYTES_PER_DOC * PERCENTILE / 100) } else { @@ -781,7 +783,10 @@ pub(crate) async fn search_partial_hits_phase( ); if let Some(resource_stats) = &leaf_search_response.resource_stats { - if is_top_1pct_memory_intensive(resource_stats.short_lived_cache_num_bytes, resource_stats.split_num_docs) { + if is_top_1pct_memory_intensive( + resource_stats.short_lived_cache_num_bytes, + resource_stats.split_num_docs, + ) { // We log at most 5 times per minute. 
quickwit_common::rate_limited_info!(limit_per_min=5, split_num_docs=resource_stats.split_num_docs, %search_request.query_ast, short_lived_cached_num_bytes=resource_stats.short_lived_cache_num_bytes, query=%search_request.query_ast, "memory intensive query"); } diff --git a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs index 9f5c51ac033..fbe0031f291 100644 --- a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs +++ b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs @@ -377,8 +377,7 @@ impl ByteRangeCache { need_mut_byte_range_cache_locked.put_slice(path, byte_range, bytes); let num_bytes = need_mut_byte_range_cache_locked.num_bytes; drop(need_mut_byte_range_cache_locked); - self.num_stored_bytes - .store(num_bytes as u64, Ordering::Relaxed); + self.num_stored_bytes.store(num_bytes, Ordering::Relaxed); } } From 37c07bcf777d97979c0284f0f5a1ede1cc637287 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Tue, 26 Nov 2024 14:21:24 +0100 Subject: [PATCH 08/12] Fix undefined incremental resource stat --- quickwit/quickwit-search/src/collector.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs index 1cba2b95a32..4f43dc8ba3c 100644 --- a/quickwit/quickwit-search/src/collector.rs +++ b/quickwit/quickwit-search/src/collector.rs @@ -1191,7 +1191,7 @@ pub(crate) struct IncrementalCollector { num_attempted_splits: u64, num_successful_splits: u64, start_offset: usize, - resource_stats: ResourceStats, + resource_stats: Option, } impl IncrementalCollector { @@ -1212,7 +1212,7 @@ impl IncrementalCollector { failed_splits: Vec::new(), num_attempted_splits: 0, num_successful_splits: 0, - resource_stats: ResourceStats::default(), + resource_stats: None, } } @@ -1228,8 +1228,12 @@ impl IncrementalCollector { resource_stats, } = leaf_response; - if let Some(leaf_resource_stats) = 
&resource_stats { - merge_resource_stats(leaf_resource_stats, &mut self.resource_stats); + if let Some(leaf_resource_stats) = resource_stats { + if let Some(current_stats) = self.resource_stats.as_mut() { + merge_resource_stats(&leaf_resource_stats, current_stats); + } else { + self.resource_stats = Some(leaf_resource_stats); + } } self.num_hits += num_hits; @@ -1281,7 +1285,7 @@ impl IncrementalCollector { num_attempted_splits: self.num_attempted_splits, num_successful_splits: self.num_successful_splits, intermediate_aggregation_result, - resource_stats: Some(self.resource_stats), + resource_stats: self.resource_stats, }) } } From 2f90a0cd3c9a1344848bef337b970c29942acfe7 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Tue, 26 Nov 2024 17:06:41 +0100 Subject: [PATCH 09/12] Add tests to permit provider --- .../src/search_permit_provider.rs | 199 ++++++++++++------ 1 file changed, 130 insertions(+), 69 deletions(-) diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs index f6965574169..a9d8d9bb777 100644 --- a/quickwit/quickwit-search/src/search_permit_provider.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -24,6 +24,8 @@ use std::task::{Context, Poll}; use bytesize::ByteSize; use quickwit_common::metrics::GaugeGuard; +#[cfg(test)] +use tokio::sync::watch; use tokio::sync::{mpsc, oneshot}; use tracing::warn; @@ -36,7 +38,9 @@ use tracing::warn; /// remaining memory is also released. 
#[derive(Clone)] pub struct SearchPermitProvider { - sender: mpsc::UnboundedSender, + message_sender: mpsc::UnboundedSender, + #[cfg(test)] + actor_stopped: watch::Receiver, } pub enum SearchPermitMessage { @@ -59,18 +63,26 @@ impl SearchPermitProvider { memory_budget: ByteSize, initial_allocation: ByteSize, ) -> Self { - let (sender, receiver) = mpsc::unbounded_channel(); - let mut actor = SearchPermitActor { - msg_receiver: receiver, - msg_sender: sender.downgrade(), + let (message_sender, message_receiver) = mpsc::unbounded_channel(); + #[cfg(test)] + let (state_sender, state_receiver) = watch::channel(false); + let actor = SearchPermitActor { + msg_receiver: message_receiver, + msg_sender: message_sender.downgrade(), num_warmup_slots_available: num_download_slots, total_memory_budget: memory_budget.as_u64(), permits_requests: VecDeque::new(), total_memory_allocated: 0u64, per_permit_initial_memory_allocation: initial_allocation.as_u64(), + #[cfg(test)] + stopped: state_sender, }; - tokio::spawn(async move { actor.run().await }); - Self { sender } + tokio::spawn(actor.run()); + Self { + message_sender, + #[cfg(test)] + actor_stopped: state_receiver, + } } /// Returns `num_permits` futures that complete once enough resources are @@ -81,7 +93,7 @@ impl SearchPermitProvider { /// returned by subsequent calls to this function. pub async fn get_permits(&self, num_permits: usize) -> Vec { let (permit_sender, permit_receiver) = oneshot::channel(); - self.sender + self.message_sender .send(SearchPermitMessage::Request { permit_sender, num_permits, @@ -104,14 +116,18 @@ struct SearchPermitActor { total_memory_allocated: u64, per_permit_initial_memory_allocation: u64, permits_requests: VecDeque>, + #[cfg(test)] + stopped: watch::Sender, } impl SearchPermitActor { - async fn run(&mut self) { + async fn run(mut self) { // Stops when the last clone of SearchPermitProvider is dropped. 
while let Some(msg) = self.msg_receiver.recv().await { self.handle_message(msg); } + #[cfg(test)] + self.stopped.send(true).ok(); } fn handle_message(&mut self, msg: SearchPermitMessage) { @@ -171,6 +187,7 @@ impl SearchPermitActor { ); ongoing_gauge_guard.add(1); self.total_memory_allocated += self.per_permit_initial_memory_allocation; + self.num_warmup_slots_available -= 1; permit_requester_tx .send(SearchPermit { _ongoing_gauge_guard: ongoing_gauge_guard, @@ -188,6 +205,7 @@ impl SearchPermitActor { } } +#[derive(Debug)] pub struct SearchPermit { _ongoing_gauge_guard: GaugeGuard<'static>, msg_sender: mpsc::WeakUnboundedSender, @@ -248,70 +266,113 @@ impl Future for SearchPermitFuture { } } -// #[cfg(test)] -// mod tests { -// use tokio::task::JoinSet; +#[cfg(test)] +mod tests { + use std::time::Duration; + + use futures::StreamExt; + use rand::seq::SliceRandom; + use tokio::task::JoinSet; + + use super::*; -// use super::*; + #[tokio::test] + async fn test_search_permit_order() { + let permit_provider = SearchPermitProvider::new(1, ByteSize::mb(100), ByteSize::mb(10)); + let mut all_futures = Vec::new(); + let first_batch_of_permits = permit_provider.get_permits(10).await; + assert_eq!(first_batch_of_permits.len(), 10); + all_futures.extend( + first_batch_of_permits + .into_iter() + .enumerate() + .map(move |(i, fut)| ((1, i), fut)), + ); -// #[tokio::test] -// async fn test_search_permits_get_permits_future() { -// // We test here that `get_permits_futures` does not interleave -// let search_permits = SearchPermitProvider::new(1); -// let mut all_futures = Vec::new(); -// let first_batch_of_permits = search_permits.get_permits(10); -// assert_eq!(first_batch_of_permits.len(), 10); -// all_futures.extend( -// first_batch_of_permits -// .into_iter() -// .enumerate() -// .map(move |(i, fut)| ((1, i), fut)), -// ); + let second_batch_of_permits = permit_provider.get_permits(10).await; + assert_eq!(second_batch_of_permits.len(), 10); + all_futures.extend( + 
second_batch_of_permits + .into_iter() + .enumerate() + .map(move |(i, fut)| ((2, i), fut)), + ); -// let second_batch_of_permits = search_permits.get_permits(10); -// assert_eq!(second_batch_of_permits.len(), 10); -// all_futures.extend( -// second_batch_of_permits -// .into_iter() -// .enumerate() -// .map(move |(i, fut)| ((2, i), fut)), -// ); + // not super useful, considering what join set does, but still a tiny bit more sound. + all_futures.shuffle(&mut rand::thread_rng()); -// use rand::seq::SliceRandom; -// // not super useful, considering what join set does, but still a tiny bit more sound. -// all_futures.shuffle(&mut rand::thread_rng()); + let mut join_set = JoinSet::new(); + for (res, fut) in all_futures { + join_set.spawn(async move { + let permit = fut.await; + (res, permit) + }); + } + let mut ordered_result: Vec<(usize, usize)> = Vec::with_capacity(20); + while let Some(Ok(((batch_id, order), _permit))) = join_set.join_next().await { + ordered_result.push((batch_id, order)); + } -// let mut join_set = JoinSet::new(); -// for (res, fut) in all_futures { -// join_set.spawn(async move { -// let permit = fut.await; -// (res, permit) -// }); -// } -// let mut ordered_result: Vec<(usize, usize)> = Vec::with_capacity(20); -// while let Some(Ok(((batch_id, order), _permit))) = join_set.join_next().await { -// ordered_result.push((batch_id, order)); -// } + assert_eq!(ordered_result.len(), 20); + for (i, res) in ordered_result[0..10].iter().enumerate() { + assert_eq!(res, &(1, i)); + } + for (i, res) in ordered_result[10..20].iter().enumerate() { + assert_eq!(res, &(2, i)); + } + } -// assert_eq!(ordered_result.len(), 20); -// for (i, res) in ordered_result[0..10].iter().enumerate() { -// assert_eq!(res, &(1, i)); -// } -// for (i, res) in ordered_result[10..20].iter().enumerate() { -// assert_eq!(res, &(2, i)); -// } -// } + #[tokio::test] + async fn test_search_permit_early_drops() { + let permit_provider = SearchPermitProvider::new(1, ByteSize::mb(100), 
ByteSize::mb(10)); + let permit_fut1 = permit_provider + .get_permits(1) + .await + .into_iter() + .next() + .unwrap(); + let permit_fut2 = permit_provider + .get_permits(1) + .await + .into_iter() + .next() + .unwrap(); + drop(permit_fut1); + let permit = permit_fut2.await; + assert_eq!(permit.memory_allocation, ByteSize::mb(10).as_u64()); + assert_eq!(*permit_provider.actor_stopped.borrow(), false); -// #[tokio::test] -// async fn test_search_permits_receiver_race_condition() { -// // Here we test that we don't have a problem if the Receiver is dropped. -// // In particular, we want to check that there is not a race condition where drop attempts -// to // lock the mutex. -// let search_permits = SearchPermitProvider::new(1); -// let permit_rx = search_permits.get_permit(); -// let permit_rx2 = search_permits.get_permit(); -// drop(permit_rx2); -// drop(permit_rx); -// let _permit_rx = search_permits.get_permit(); -// } -// } + let _permit_fut3 = permit_provider + .get_permits(1) + .await + .into_iter() + .next() + .unwrap(); + let mut actor_stopped = permit_provider.actor_stopped.clone(); + drop(permit_provider); + { + actor_stopped.changed().await.unwrap(); + assert!(*actor_stopped.borrow()); + } + } + + #[tokio::test] + async fn test_memory_permit() { + let permit_provider = SearchPermitProvider::new(100, ByteSize::mb(100), ByteSize::mb(10)); + let mut permit_futs = permit_provider.get_permits(12).await; + let mut blocked_permits = permit_futs.split_off(10).into_iter(); + let mut permits: Vec = futures::stream::iter(permit_futs.into_iter()) + .buffered(1) + .collect() + .await; + let next_blocked_permit = blocked_permits.next().unwrap(); + tokio::time::timeout(Duration::from_millis(20), next_blocked_permit) + .await + .unwrap_err(); + permits.drain(0..1); + let next_blocked_permit = blocked_permits.next().unwrap(); + tokio::time::timeout(Duration::from_millis(20), next_blocked_permit) + .await + .unwrap(); + } +} From 0946bc5695ce26e3bf15a897dd67e2212a2d7096 
Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Thu, 28 Nov 2024 11:13:43 +0100 Subject: [PATCH 10/12] Improve and test stats merging utils --- .../quickwit-search/src/cluster_client.rs | 11 +- quickwit/quickwit-search/src/collector.rs | 33 ++--- quickwit/quickwit-search/src/lib.rs | 125 ++++++++++++++++-- 3 files changed, 137 insertions(+), 32 deletions(-) diff --git a/quickwit/quickwit-search/src/cluster_client.rs b/quickwit/quickwit-search/src/cluster_client.rs index 0f18283a9f0..8f2b08487fc 100644 --- a/quickwit/quickwit-search/src/cluster_client.rs +++ b/quickwit/quickwit-search/src/cluster_client.rs @@ -317,13 +317,10 @@ fn merge_original_with_retry_leaf_search_response( (Some(left), None) => Some(left), (None, None) => None, }; - let mut stats = [ - original_response.resource_stats.as_ref(), - retry_response.resource_stats.as_ref(), - ] - .into_iter() - .flatten(); - let resource_stats = merge_resource_stats_it(&mut stats); + let resource_stats = merge_resource_stats_it([ + &original_response.resource_stats, + &retry_response.resource_stats, + ]); Ok(LeafSearchResponse { intermediate_aggregation_result, num_hits: original_response.num_hits + retry_response.num_hits, diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs index 4f43dc8ba3c..67beb8090cb 100644 --- a/quickwit/quickwit-search/src/collector.rs +++ b/quickwit/quickwit-search/src/collector.rs @@ -921,10 +921,10 @@ fn merge_leaf_responses( return Ok(leaf_responses.pop().unwrap()); } - let mut resource_stats_it = leaf_responses + let resource_stats_it = leaf_responses .iter() - .flat_map(|leaf_response| leaf_response.resource_stats.as_ref()); - let merged_resource_stats = merge_resource_stats_it(&mut resource_stats_it); + .map(|leaf_response| &leaf_response.resource_stats); + let merged_resource_stats = merge_resource_stats_it(resource_stats_it); let merged_intermediate_aggregation_result: Option> = merge_intermediate_aggregation_result( @@ -1228,13 +1228,7 
@@ impl IncrementalCollector { resource_stats, } = leaf_response; - if let Some(leaf_resource_stats) = resource_stats { - if let Some(current_stats) = self.resource_stats.as_mut() { - merge_resource_stats(&leaf_resource_stats, current_stats); - } else { - self.resource_stats = Some(leaf_resource_stats); - } - } + merge_resource_stats(&resource_stats, &mut self.resource_stats); self.num_hits += num_hits; self.top_k_hits.add_entries(partial_hits.into_iter()); @@ -1295,8 +1289,8 @@ mod tests { use std::cmp::Ordering; use quickwit_proto::search::{ - LeafSearchResponse, PartialHit, SearchRequest, SortByValue, SortField, SortOrder, - SortValue, SplitSearchError, + LeafSearchResponse, PartialHit, ResourceStats, SearchRequest, SortByValue, SortField, + SortOrder, SortValue, SplitSearchError, }; use tantivy::collector::Collector; use tantivy::TantivyDocument; @@ -1942,7 +1936,10 @@ mod tests { num_attempted_splits: 3, num_successful_splits: 3, intermediate_aggregation_result: None, - resource_stats: None, + resource_stats: Some(ResourceStats { + cpu_microsecs: 100, + ..Default::default() + }), }, LeafSearchResponse { num_hits: 10, @@ -1961,7 +1958,10 @@ mod tests { num_attempted_splits: 2, num_successful_splits: 1, intermediate_aggregation_result: None, - resource_stats: None, + resource_stats: Some(ResourceStats { + cpu_microsecs: 50, + ..Default::default() + }), }, ], ); @@ -1994,7 +1994,10 @@ mod tests { num_attempted_splits: 5, num_successful_splits: 4, intermediate_aggregation_result: None, - resource_stats: None, + resource_stats: Some(ResourceStats { + cpu_microsecs: 150, + ..Default::default() + }), } ); // TODO would be nice to test aggregation too. 
diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index 3685d49dbb4..c9031865c52 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -344,20 +344,125 @@ pub fn searcher_pool_for_test( ) } -pub(crate) fn merge_resource_stats_it( - stats_it: &mut dyn Iterator, +pub(crate) fn merge_resource_stats_it<'a>( + stats_it: impl IntoIterator>, ) -> Option { - let mut acc_stats: ResourceStats = stats_it.next()?.clone(); + let mut acc_stats: Option = None; for new_stats in stats_it { merge_resource_stats(new_stats, &mut acc_stats); } - Some(acc_stats) + acc_stats } -fn merge_resource_stats(new_stats: &ResourceStats, stat_accs: &mut ResourceStats) { - stat_accs.short_lived_cache_num_bytes += new_stats.short_lived_cache_num_bytes; - stat_accs.split_num_docs += new_stats.split_num_docs; - stat_accs.warmup_microsecs += new_stats.warmup_microsecs; - stat_accs.cpu_thread_pool_wait_microsecs += new_stats.cpu_thread_pool_wait_microsecs; - stat_accs.cpu_microsecs += new_stats.cpu_microsecs; +fn merge_resource_stats( + new_stats_opt: &Option, + stat_accs_opt: &mut Option, +) { + if let Some(new_stats) = new_stats_opt { + if let Some(stat_accs) = stat_accs_opt { + stat_accs.short_lived_cache_num_bytes += new_stats.short_lived_cache_num_bytes; + stat_accs.split_num_docs += new_stats.split_num_docs; + stat_accs.warmup_microsecs += new_stats.warmup_microsecs; + stat_accs.cpu_thread_pool_wait_microsecs += new_stats.cpu_thread_pool_wait_microsecs; + stat_accs.cpu_microsecs += new_stats.cpu_microsecs; + } else { + *stat_accs_opt = Some(new_stats.clone()); + } + } +} +#[cfg(test)] +mod stats_merge_tests { + use super::*; + + #[test] + fn test_merge_resource_stats() { + let mut acc_stats = None; + + merge_resource_stats(&None, &mut acc_stats); + + assert_eq!(acc_stats, None); + + let stats = Some(ResourceStats { + short_lived_cache_num_bytes: 100, + split_num_docs: 200, + warmup_microsecs: 300, + 
cpu_thread_pool_wait_microsecs: 400, + cpu_microsecs: 500, + }); + + merge_resource_stats(&stats, &mut acc_stats); + + assert_eq!(acc_stats, stats); + + let new_stats = Some(ResourceStats { + short_lived_cache_num_bytes: 50, + split_num_docs: 100, + warmup_microsecs: 150, + cpu_thread_pool_wait_microsecs: 200, + cpu_microsecs: 250, + }); + + merge_resource_stats(&new_stats, &mut acc_stats); + + let stats_plus_new_stats = Some(ResourceStats { + short_lived_cache_num_bytes: 150, + split_num_docs: 300, + warmup_microsecs: 450, + cpu_thread_pool_wait_microsecs: 600, + cpu_microsecs: 750, + }); + + assert_eq!(acc_stats, stats_plus_new_stats); + + merge_resource_stats(&None, &mut acc_stats); + + assert_eq!(acc_stats, stats_plus_new_stats); + } + + #[test] + fn test_merge_resource_stats_it() { + let merged_stats = merge_resource_stats_it(Vec::<&Option>::new()); + assert_eq!(merged_stats, None); + + let stats1 = Some(ResourceStats { + short_lived_cache_num_bytes: 100, + split_num_docs: 200, + warmup_microsecs: 300, + cpu_thread_pool_wait_microsecs: 400, + cpu_microsecs: 500, + }); + + let merged_stats = merge_resource_stats_it(vec![&None, &stats1, &None]); + + assert_eq!(merged_stats, stats1); + + let stats2 = Some(ResourceStats { + short_lived_cache_num_bytes: 50, + split_num_docs: 100, + warmup_microsecs: 150, + cpu_thread_pool_wait_microsecs: 200, + cpu_microsecs: 250, + }); + + let stats3 = Some(ResourceStats { + short_lived_cache_num_bytes: 25, + split_num_docs: 50, + warmup_microsecs: 75, + cpu_thread_pool_wait_microsecs: 100, + cpu_microsecs: 125, + }); + + let merged_stats = merge_resource_stats_it(vec![&stats1, &stats2, &stats3]); + + assert_eq!( + merged_stats, + Some(ResourceStats { + short_lived_cache_num_bytes: 175, + split_num_docs: 350, + warmup_microsecs: 525, + cpu_thread_pool_wait_microsecs: 700, + cpu_microsecs: 875, + }) + ); + } } From 244b35df36c8e193e0ad902e63ea1b9a37b96ec4 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Thu, 28 Nov 2024 12:34:13 
+0100 Subject: [PATCH 11/12] Fix minor typos --- quickwit/quickwit-search/src/leaf.rs | 6 +++--- quickwit/quickwit-search/src/leaf_cache.rs | 5 +++-- quickwit/quickwit-search/src/root.rs | 4 ++-- quickwit/quickwit-search/src/search_stream/leaf.rs | 2 +- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index cf83c8e4a15..9d67445f174 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -30,8 +30,8 @@ use quickwit_common::pretty::PrettySample; use quickwit_directories::{CachingDirectory, DirectoryCache, HotDirectory, StorageDirectory}; use quickwit_doc_mapper::{DocMapper, TermRange, WarmupInfo}; use quickwit_proto::search::{ - CountHits, LeafSearchRequest, LeafSearchResponse, PartialHit, SearchRequest, SortOrder, - SortValue, SplitIdAndFooterOffsets, SplitSearchError, + CountHits, LeafSearchRequest, LeafSearchResponse, PartialHit, ResourceStats, SearchRequest, + SortOrder, SortValue, SplitIdAndFooterOffsets, SplitSearchError, }; use quickwit_query::query_ast::{BoolQuery, QueryAst, QueryAstTransformer, RangeQuery, TermQuery}; use quickwit_query::tokenizers::TokenizerManager; @@ -468,7 +468,7 @@ async fn leaf_search_single_split( } else { searcher.search(&query, &collector)? 
}; - leaf_search_response.resource_stats = Some(quickwit_proto::search::ResourceStats { + leaf_search_response.resource_stats = Some(ResourceStats { cpu_microsecs: cpu_start.elapsed().as_micros() as u64, short_lived_cache_num_bytes, split_num_docs, diff --git a/quickwit/quickwit-search/src/leaf_cache.rs b/quickwit/quickwit-search/src/leaf_cache.rs index e2601081cd5..016cdd5b00f 100644 --- a/quickwit/quickwit-search/src/leaf_cache.rs +++ b/quickwit/quickwit-search/src/leaf_cache.rs @@ -192,7 +192,8 @@ impl std::ops::RangeBounds for Range { #[cfg(test)] mod tests { use quickwit_proto::search::{ - LeafSearchResponse, PartialHit, SearchRequest, SortValue, SplitIdAndFooterOffsets, + LeafSearchResponse, PartialHit, ResourceStats, SearchRequest, SortValue, + SplitIdAndFooterOffsets, }; use super::LeafSearchCache; @@ -343,7 +344,7 @@ mod tests { sort_value2: None, split_id: "split_1".to_string(), }], - resource_stats: None, + resource_stats: Some(ResourceStats::default()), }; // for split_1, 1 and 1bis cover different timestamp ranges diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index ea057be4ff7..7cf9fe16bbd 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -698,7 +698,7 @@ pub fn get_count_from_metadata(split_metadatas: &[SplitMetadata]) -> Vec bool { +fn is_top_5pct_memory_intensive(num_bytes: u64, split_num_docs: u64) -> bool { // It is not worth considering small splits for this. 
if split_num_docs < 100_000 { return false; @@ -783,7 +783,7 @@ pub(crate) async fn search_partial_hits_phase( ); if let Some(resource_stats) = &leaf_search_response.resource_stats { - if is_top_1pct_memory_intensive( + if is_top_5pct_memory_intensive( resource_stats.short_lived_cache_num_bytes, resource_stats.split_num_docs, ) { diff --git a/quickwit/quickwit-search/src/search_stream/leaf.rs b/quickwit/quickwit-search/src/search_stream/leaf.rs index 0932e827a71..80a3509ea0e 100644 --- a/quickwit/quickwit-search/src/search_stream/leaf.rs +++ b/quickwit/quickwit-search/src/search_stream/leaf.rs @@ -129,7 +129,7 @@ async fn leaf_search_stream_single_split( let cache = ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); - // TODO should create a SearchPermit we wrap with TrackedByteRangeCache here? + // TODO should create a SearchPermit and wrap ByteRangeCache with TrackedByteRangeCache here? let index = open_index_with_caches( &searcher_context, storage, From c3de281064deb42dc19554291500957b314baf0c Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Thu, 28 Nov 2024 14:40:29 +0100 Subject: [PATCH 12/12] Add test for permit resizing --- .../src/search_permit_provider.rs | 48 +++++++++++++------ quickwit/quickwit-search/src/tracked_cache.rs | 3 +- 2 files changed, 35 insertions(+), 16 deletions(-) diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs index a9d8d9bb777..f396c42dd01 100644 --- a/quickwit/quickwit-search/src/search_permit_provider.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -43,6 +43,7 @@ pub struct SearchPermitProvider { actor_stopped: watch::Receiver, } +#[derive(Debug)] pub enum SearchPermitMessage { Request { permit_sender: oneshot::Sender>, @@ -217,17 +218,18 @@ impl SearchPermit { /// After warm up, we have a proper estimate of the memory usage of a single /// split leaf search. 
We can thus set the actual memory usage and release /// the warmup slot. - pub fn warmup_completed(&mut self, new_memory_usage: u64) { - if new_memory_usage > self.memory_allocation { + pub fn warmup_completed(&mut self, new_memory_usage: ByteSize) { + let new_usage_bytes = new_memory_usage.as_u64(); + if new_usage_bytes > self.memory_allocation { warn!( - memory_usage = new_memory_usage, + memory_usage = new_usage_bytes, memory_allocation = self.memory_allocation, "current leaf search is consuming more memory than the initial allocation" ); } - let memory_delta = new_memory_usage as i64 - self.memory_allocation as i64; + let memory_delta = new_usage_bytes as i64 - self.memory_allocation as i64; self.warmup_permit_held = false; - self.memory_allocation = new_memory_usage; + self.memory_allocation = new_usage_bytes; self.send_if_still_running(SearchPermitMessage::WarmupCompleted { memory_delta }); } @@ -251,6 +253,7 @@ impl Drop for SearchPermit { } } +#[derive(Debug)] pub struct SearchPermitFuture(oneshot::Receiver); impl Future for SearchPermitFuture { @@ -356,23 +359,38 @@ mod tests { } } + /// Tries to wait for a permit + async fn try_get(permit_fut: SearchPermitFuture) -> anyhow::Result { + // using a short timeout is a bit flaky, but it should be enough for these tests + let permit = tokio::time::timeout(Duration::from_millis(20), permit_fut).await?; + Ok(permit) + } + #[tokio::test] async fn test_memory_permit() { let permit_provider = SearchPermitProvider::new(100, ByteSize::mb(100), ByteSize::mb(10)); - let mut permit_futs = permit_provider.get_permits(12).await; - let mut blocked_permits = permit_futs.split_off(10).into_iter(); + let mut permit_futs = permit_provider.get_permits(14).await; + let mut remaining_permit_futs = permit_futs.split_off(10).into_iter(); + assert_eq!(remaining_permit_futs.len(), 4); + // we should be able to obtain 10 permits right away (100MB / 10MB) let mut permits: Vec = futures::stream::iter(permit_futs.into_iter()) .buffered(1) 
.collect() .await; - let next_blocked_permit = blocked_permits.next().unwrap(); - tokio::time::timeout(Duration::from_millis(20), next_blocked_permit) - .await - .unwrap_err(); + // the next permit is blocked by the memory budget + let next_blocked_permit_fut = remaining_permit_futs.next().unwrap(); + try_get(next_blocked_permit_fut).await.unwrap_err(); + // if we drop one of the permits, we can get a new one permits.drain(0..1); - let next_blocked_permit = blocked_permits.next().unwrap(); - tokio::time::timeout(Duration::from_millis(20), next_blocked_permit) - .await - .unwrap(); + let next_permit_fut = remaining_permit_futs.next().unwrap(); + let _new_permit = try_get(next_permit_fut).await.unwrap(); + // the next permit is blocked again by the memory budget + let next_blocked_permit_fut = remaining_permit_futs.next().unwrap(); + try_get(next_blocked_permit_fut).await.unwrap_err(); + // by setting a more accurate memory usage after a completed warmup, we can get more permits + permits[0].warmup_completed(ByteSize::mb(4)); + permits[1].warmup_completed(ByteSize::mb(6)); + let next_permit_fut = remaining_permit_futs.next().unwrap(); + try_get(next_permit_fut).await.unwrap(); } } diff --git a/quickwit/quickwit-search/src/tracked_cache.rs b/quickwit/quickwit-search/src/tracked_cache.rs index 2eb947dc385..6529f38332b 100644 --- a/quickwit/quickwit-search/src/tracked_cache.rs +++ b/quickwit/quickwit-search/src/tracked_cache.rs @@ -19,6 +19,7 @@ use std::sync::{Arc, Mutex}; +use bytesize::ByteSize; use quickwit_directories::DirectoryCache; use quickwit_storage::ByteRangeCache; @@ -50,7 +51,7 @@ impl TrackedByteRangeCache { .search_permit .lock() .unwrap() - .warmup_completed(self.get_num_bytes()); + .warmup_completed(ByteSize(self.get_num_bytes())); } pub fn get_num_bytes(&self) -> u64 {