diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs
index 3eef1f10428..822fe86cb91 100644
--- a/quickwit/quickwit-config/src/node_config/mod.rs
+++ b/quickwit/quickwit-config/src/node_config/mod.rs
@@ -226,6 +226,8 @@ pub struct SearcherConfig {
     #[serde(default)]
     #[serde(skip_serializing_if = "Option::is_none")]
     pub storage_timeout_policy: Option<StorageTimeoutPolicy>,
+    pub warmup_memory_budget: ByteSize,
+    pub warmup_single_split_initial_allocation: ByteSize,
 }
 
 /// Configuration controlling how fast a searcher should timeout a `get_slice`
@@ -263,7 +265,7 @@ impl StorageTimeoutPolicy {
 impl Default for SearcherConfig {
     fn default() -> Self {
-        Self {
+        SearcherConfig {
             fast_field_cache_capacity: ByteSize::gb(1),
             split_footer_cache_capacity: ByteSize::mb(500),
             partial_request_cache_capacity: ByteSize::mb(64),
@@ -274,6 +276,8 @@ impl Default for SearcherConfig {
             split_cache: None,
             request_timeout_secs: Self::default_request_timeout_secs(),
             storage_timeout_policy: None,
+            warmup_memory_budget: ByteSize::gb(100),
+            warmup_single_split_initial_allocation: ByteSize::gb(1),
         }
     }
 }
@@ -308,6 +312,14 @@ impl SearcherConfig {
                 split_cache_limits.max_file_descriptors
             );
         }
+        if self.warmup_single_split_initial_allocation > self.warmup_memory_budget {
+            anyhow::bail!(
+                "warmup_single_split_initial_allocation ({}) must be lower than or equal to \
+                 warmup_memory_budget ({})",
+                self.warmup_single_split_initial_allocation,
+                self.warmup_memory_budget
+            );
+        }
     }
     Ok(())
 }
diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs
index 8a1337636cf..b208309af4c 100644
--- a/quickwit/quickwit-config/src/node_config/serialize.rs
+++ b/quickwit/quickwit-config/src/node_config/serialize.rs
@@ -616,7 +616,9 @@ mod tests {
                     min_throughtput_bytes_per_secs: 100_000,
                     timeout_millis: 2_000,
                     max_num_retries: 2
-                })
+                }),
+                warmup_memory_budget: ByteSize::gb(100),
+                warmup_single_split_initial_allocation: ByteSize::gb(1),
             }
         );
         assert_eq!(
diff --git a/quickwit/quickwit-directories/src/caching_directory.rs b/quickwit/quickwit-directories/src/caching_directory.rs
index b90f444d062..58d5ffd8028 100644
--- a/quickwit/quickwit-directories/src/caching_directory.rs
+++ b/quickwit/quickwit-directories/src/caching_directory.rs
@@ -33,21 +33,27 @@ use tantivy::{Directory, HasLen};
 pub struct CachingDirectory {
     underlying: Arc<dyn Directory>,
     // TODO fixme: that's a pretty ugly cache we have here.
-    cache: Arc<ByteRangeCache>,
+    cache: ByteRangeCache,
 }
 
 impl CachingDirectory {
     /// Creates a new CachingDirectory.
     ///
-    /// Warming: The resulting CacheDirectory will cache all information without ever
+    /// Warning: The resulting CachingDirectory will cache all information without ever
     /// removing any item from the cache.
     pub fn new_unbounded(underlying: Arc<dyn Directory>) -> CachingDirectory {
-        CachingDirectory {
-            underlying,
-            cache: Arc::new(ByteRangeCache::with_infinite_capacity(
-                &quickwit_storage::STORAGE_METRICS.shortlived_cache,
-            )),
-        }
+        let byte_range_cache = ByteRangeCache::with_infinite_capacity(
+            &quickwit_storage::STORAGE_METRICS.shortlived_cache,
+        );
+        CachingDirectory::new(underlying, byte_range_cache)
+    }
+
+    /// Creates a new CachingDirectory.
+    ///
+    /// Warning: The resulting CachingDirectory will cache all information without ever
+    /// removing any item from the cache.
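+    ///
+    /// A usage sketch (hypothetical caller code, assuming an `underlying`
+    /// directory built elsewhere); sharing the cache handle lets the caller
+    /// observe how many bytes were loaded through the directory:
+    ///
+    /// ```ignore
+    /// let cache = ByteRangeCache::with_infinite_capacity(
+    ///     &quickwit_storage::STORAGE_METRICS.shortlived_cache,
+    /// );
+    /// let caching_directory = CachingDirectory::new(underlying, cache.clone());
+    /// // after some reads, cache.get_num_bytes() reflects what was downloaded
+    /// ```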
+    pub fn new(underlying: Arc<dyn Directory>, cache: ByteRangeCache) -> CachingDirectory {
+        CachingDirectory { underlying, cache }
+    }
 }
 
@@ -59,7 +65,7 @@ impl fmt::Debug for CachingDirectory {
 
 struct CachingFileHandle {
     path: PathBuf,
-    cache: Arc<ByteRangeCache>,
+    cache: ByteRangeCache,
     underlying_filehandle: Arc<dyn FileHandle>,
 }
 
diff --git a/quickwit/quickwit-directories/src/hot_directory.rs b/quickwit/quickwit-directories/src/hot_directory.rs
index d217ac29851..a388ea75b51 100644
--- a/quickwit/quickwit-directories/src/hot_directory.rs
+++ b/quickwit/quickwit-directories/src/hot_directory.rs
@@ -205,14 +205,12 @@ impl StaticDirectoryCache {
         self.file_lengths.get(path).copied()
     }
 
-    /// return the files and their cached lengths
-    pub fn get_stats(&self) -> Vec<(PathBuf, usize)> {
+    pub fn get_file_lengths(&self) -> Vec<(PathBuf, u64)> {
         let mut entries = self
-            .slices
+            .file_lengths
             .iter()
-            .map(|(path, cache)| (path.to_owned(), cache.len()))
+            .map(|(path, len)| (path.clone(), *len))
             .collect::<Vec<_>>();
-        entries.sort_by_key(|el| el.0.to_owned());
+        entries.sort_by_key(|el| el.0.to_owned());
         entries
     }
 
@@ -265,10 +263,6 @@ impl StaticSliceCache {
         }
         None
     }
-
-    pub fn len(&self) -> usize {
-        self.bytes.len()
-    }
 }
 
 struct StaticSliceCacheBuilder {
@@ -376,12 +370,12 @@ impl HotDirectory {
             }),
         })
     }
-    /// Get files and their cached sizes.
-    pub fn get_stats_per_file(
-        hot_cache_bytes: OwnedBytes,
-    ) -> anyhow::Result<Vec<(PathBuf, usize)>> {
-        let static_cache = StaticDirectoryCache::open(hot_cache_bytes)?;
-        Ok(static_cache.get_stats())
+
+    /// Get all the files in the directory and their sizes.
+    ///
+    /// The actual cached data is a very small fraction of this length.
+    pub fn get_file_lengths(&self) -> Vec<(PathBuf, u64)> {
+        self.inner.cache.get_file_lengths()
     }
 }
 
@@ -704,10 +698,10 @@ mod tests {
         assert_eq!(directory_cache.get_file_length(three_path), Some(300));
         assert_eq!(directory_cache.get_file_length(four_path), None);
 
-        let stats = directory_cache.get_stats();
-        assert_eq!(stats[0], (one_path.to_owned(), 8));
-        assert_eq!(stats[1], (three_path.to_owned(), 0));
-        assert_eq!(stats[2], (two_path.to_owned(), 7));
+        let file_lengths = directory_cache.get_file_lengths();
+        assert_eq!(file_lengths[0], (one_path.to_owned(), 100));
+        assert_eq!(file_lengths[1], (three_path.to_owned(), 300));
+        assert_eq!(file_lengths[2], (two_path.to_owned(), 200));
 
         assert_eq!(
             directory_cache
diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto
index 60671239ecc..1213ce2040e 100644
--- a/quickwit/quickwit-proto/protos/quickwit/search.proto
+++ b/quickwit/quickwit-proto/protos/quickwit/search.proto
@@ -347,6 +347,14 @@ message LeafSearchRequest {
   repeated string index_uris = 9;
 }
 
+message ResourceStats {
+  uint64 short_lived_cache_num_bytes = 1;
+  uint64 split_num_docs = 2;
+  uint64 warmup_microsecs = 3;
+  uint64 cpu_thread_pool_wait_microsecs = 4;
+  uint64 cpu_microsecs = 5;
+}
+
 /// LeafRequestRef references data in LeafSearchRequest to deduplicate data.
 message LeafRequestRef {
   // The ordinal of the doc_mapper in `LeafSearchRequest.doc_mappers`
@@ -479,6 +487,8 @@ message LeafSearchResponse {
 
   // postcard serialized intermediate aggregation_result.
  optional bytes intermediate_aggregation_result = 6;
+
+  ResourceStats resource_stats = 8;
 }
 
 message SnippetRequest {
diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs
index 3fc4d5bdcaa..e29cae37fec 100644
--- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs
+++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs
@@ -286,6 +286,21 @@ pub struct LeafSearchRequest {
     #[prost(string, repeated, tag = "9")]
     pub index_uris: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
 }
+#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ResourceStats {
+    #[prost(uint64, tag = "1")]
+    pub short_lived_cache_num_bytes: u64,
+    #[prost(uint64, tag = "2")]
+    pub split_num_docs: u64,
+    #[prost(uint64, tag = "3")]
+    pub warmup_microsecs: u64,
+    #[prost(uint64, tag = "4")]
+    pub cpu_thread_pool_wait_microsecs: u64,
+    #[prost(uint64, tag = "5")]
+    pub cpu_microsecs: u64,
+}
 /// / LeafRequestRef references data in LeafSearchRequest to deduplicate data.
 #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
 #[allow(clippy::derive_partial_eq_without_eq)]
@@ -457,6 +472,8 @@ pub struct LeafSearchResponse {
     pub intermediate_aggregation_result: ::core::option::Option<
         ::prost::alloc::vec::Vec<u8>,
     >,
+    #[prost(message, optional, tag = "8")]
+    pub resource_stats: ::core::option::Option<ResourceStats>,
 }
 #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
 #[allow(clippy::derive_partial_eq_without_eq)]
diff --git a/quickwit/quickwit-search/src/cluster_client.rs b/quickwit/quickwit-search/src/cluster_client.rs
index d32ad92327c..32f375ca06c 100644
--- a/quickwit/quickwit-search/src/cluster_client.rs
+++ b/quickwit/quickwit-search/src/cluster_client.rs
@@ -36,7 +36,7 @@ use tracing::{debug, error, info, warn};
 use crate::retry::search::LeafSearchRetryPolicy;
 use crate::retry::search_stream::{LeafSearchStreamRetryPolicy, SuccessfulSplitIds};
 use crate::retry::{retry_client, DefaultRetryPolicy, RetryPolicy};
-use crate::{SearchError, SearchJobPlacer, SearchServiceClient};
+use crate::{merge_resource_stats_it, SearchError, SearchJobPlacer, SearchServiceClient};
 
 /// Maximum number of put requests emitted to perform a replicated given PUT KV.
 const MAX_PUT_KV_ATTEMPTS: usize = 6;
 
@@ -317,6 +317,10 @@ fn merge_original_with_retry_leaf_search_response(
         (Some(left), None) => Some(left),
         (None, None) => None,
     };
+    let resource_stats = merge_resource_stats_it([
+        &original_response.resource_stats,
+        &retry_response.resource_stats,
+    ]);
     Ok(LeafSearchResponse {
         intermediate_aggregation_result,
         num_hits: original_response.num_hits + retry_response.num_hits,
@@ -326,6 +330,7 @@ fn merge_original_with_retry_leaf_search_response(
         partial_hits: original_response.partial_hits,
         num_successful_splits: original_response.num_successful_splits
             + retry_response.num_successful_splits,
+        resource_stats,
     })
 }
 
diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs
index 4b69348ecde..67beb8090cb 100644
--- a/quickwit/quickwit-search/src/collector.rs
+++ b/quickwit/quickwit-search/src/collector.rs
@@ -25,8 +25,8 @@ use itertools::Itertools;
 use quickwit_common::binary_heap::{SortKeyMapper, TopK};
 use quickwit_doc_mapper::WarmupInfo;
 use quickwit_proto::search::{
-    LeafSearchResponse, PartialHit, SearchRequest, SortByValue, SortOrder, SortValue,
-    SplitSearchError,
+    LeafSearchResponse, PartialHit, ResourceStats, SearchRequest, SortByValue, SortOrder,
+    SortValue, SplitSearchError,
 };
 use quickwit_proto::types::SplitId;
 use serde::Deserialize;
@@ -40,7 +40,7 @@ use tantivy::{DateTime, DocId, Score, SegmentOrdinal, SegmentReader, TantivyErro
 use crate::find_trace_ids_collector::{FindTraceIdsCollector, FindTraceIdsSegmentCollector, Span};
 use crate::top_k_collector::{specialized_top_k_segment_collector, QuickwitSegmentTopKCollector};
-use crate::GlobalDocAddress;
+use crate::{merge_resource_stats, merge_resource_stats_it, GlobalDocAddress};
 
 #[derive(Clone, Debug)]
 pub(crate) enum SortByComponent {
@@ -587,6 +587,7 @@ impl SegmentCollector for QuickwitSegmentCollector {
             }
             None => None,
         };
+
         Ok(LeafSearchResponse {
             intermediate_aggregation_result,
             num_hits: self.num_hits,
@@ -594,6 +595,7 @@ impl SegmentCollector for QuickwitSegmentCollector {
             failed_splits: Vec::new(),
             num_attempted_splits: 1,
             num_successful_splits: 1,
+            resource_stats: None,
         })
     }
 }
@@ -919,6 +921,11 @@ fn merge_leaf_responses(
         return Ok(leaf_responses.pop().unwrap());
     }
 
+    let resource_stats_it = leaf_responses
+        .iter()
+        .map(|leaf_response| &leaf_response.resource_stats);
+    let merged_resource_stats = merge_resource_stats_it(resource_stats_it);
+
     let merged_intermediate_aggregation_result: Option<Vec<u8>> =
         merge_intermediate_aggregation_result(
             aggregations_opt,
@@ -960,6 +967,7 @@ fn merge_leaf_responses(
         failed_splits,
         num_attempted_splits,
         num_successful_splits,
+        resource_stats: merged_resource_stats,
     })
 }
 
@@ -1183,6 +1191,7 @@ pub(crate) struct IncrementalCollector {
     num_attempted_splits: u64,
     num_successful_splits: u64,
     start_offset: usize,
+    resource_stats: Option<ResourceStats>,
 }
 
 impl IncrementalCollector {
@@ -1203,6 +1212,7 @@ impl IncrementalCollector {
             failed_splits: Vec::new(),
             num_attempted_splits: 0,
             num_successful_splits: 0,
+            resource_stats: None,
        }
     }
 
@@ -1215,8 +1225,11 @@ impl IncrementalCollector {
             num_attempted_splits,
             intermediate_aggregation_result,
             num_successful_splits,
+            resource_stats,
         } = leaf_response;
 
+        merge_resource_stats(&resource_stats, &mut self.resource_stats);
+
         self.num_hits += num_hits;
         self.top_k_hits.add_entries(partial_hits.into_iter());
         self.failed_splits.extend(failed_splits);
@@ -1266,6 +1279,7 @@ impl IncrementalCollector {
             num_attempted_splits: self.num_attempted_splits,
             num_successful_splits: self.num_successful_splits,
             intermediate_aggregation_result,
+            resource_stats: self.resource_stats,
         })
     }
 }
 
@@ -1275,8 +1289,8 @@ mod tests {
     use std::cmp::Ordering;
 
     use quickwit_proto::search::{
-        LeafSearchResponse, PartialHit, SearchRequest, SortByValue, SortField, SortOrder,
-        SortValue, SplitSearchError,
+        LeafSearchResponse, PartialHit, ResourceStats, SearchRequest, SortByValue, SortField,
+        SortOrder, SortValue, SplitSearchError,
     };
     use tantivy::collector::Collector;
     use tantivy::TantivyDocument;
@@ -1772,6 +1786,7 @@ mod tests {
                 num_attempted_splits: 3,
                 num_successful_splits: 3,
                 intermediate_aggregation_result: None,
+                resource_stats: None,
             }],
         );
 
@@ -1789,7 +1804,8 @@ mod tests {
                 failed_splits: Vec::new(),
                 num_attempted_splits: 3,
                 num_successful_splits: 3,
-                intermediate_aggregation_result: None
+                intermediate_aggregation_result: None,
+                resource_stats: None,
             }
         );
 
@@ -1828,6 +1844,7 @@ mod tests {
                     num_attempted_splits: 3,
                     num_successful_splits: 3,
                     intermediate_aggregation_result: None,
+                    resource_stats: None,
                 },
                 LeafSearchResponse {
                     num_hits: 10,
@@ -1846,6 +1863,7 @@ mod tests {
                     num_attempted_splits: 2,
                     num_successful_splits: 1,
                     intermediate_aggregation_result: None,
+                    resource_stats: None,
                 },
             ],
         );
@@ -1877,7 +1895,8 @@ mod tests {
                 }],
                 num_attempted_splits: 5,
                 num_successful_splits: 4,
-                intermediate_aggregation_result: None
+                intermediate_aggregation_result: None,
+                resource_stats: None,
             }
         );
 
@@ -1917,6 +1936,10 @@ mod tests {
                     num_attempted_splits: 3,
                     num_successful_splits: 3,
                     intermediate_aggregation_result: None,
+                    resource_stats: Some(ResourceStats {
+                        cpu_microsecs: 100,
+                        ..Default::default()
+                    }),
                 },
                 LeafSearchResponse {
                     num_hits: 10,
@@ -1935,6 +1958,10 @@ mod tests {
                     num_attempted_splits: 2,
                     num_successful_splits: 1,
                     intermediate_aggregation_result: None,
+                    resource_stats: Some(ResourceStats {
+                        cpu_microsecs: 50,
+                        ..Default::default()
+                    }),
                 },
             ],
         );
@@ -1966,7 +1993,11 @@ mod tests {
                 }],
                 num_attempted_splits: 5,
                 num_successful_splits: 4,
-                intermediate_aggregation_result: None
+                intermediate_aggregation_result: None,
+                resource_stats: Some(ResourceStats {
+                    cpu_microsecs: 150,
+                    ..Default::default()
+                }),
             }
         );
         // TODO would be nice to test aggregation too.
diff --git a/quickwit/quickwit-search/src/fetch_docs.rs b/quickwit/quickwit-search/src/fetch_docs.rs
index 9c326764539..d75f7efff0c 100644
--- a/quickwit/quickwit-search/src/fetch_docs.rs
+++ b/quickwit/quickwit-search/src/fetch_docs.rs
@@ -174,12 +174,12 @@ async fn fetch_docs_in_split(
     global_doc_addrs.sort_by_key(|doc| doc.doc_addr);
     // Opens the index without the ephemeral unbounded cache, this cache is indeed not useful
     // when fetching docs as we will fetch them only once.
-    let mut index = open_index_with_caches(
+    let (mut index, _) = open_index_with_caches(
         &searcher_context,
         index_storage,
         split,
         Some(doc_mapper.tokenizer_manager()),
-        false,
+        None,
     )
     .await
     .context("open-index-for-split")?;
diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs
index 5ad92f63aa2..236149ca038 100644
--- a/quickwit/quickwit-search/src/leaf.rs
+++ b/quickwit/quickwit-search/src/leaf.rs
@@ -22,35 +22,37 @@ use std::ops::Bound;
 use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::{Arc, Mutex, RwLock};
+use std::time::{Duration, Instant};
 
 use anyhow::Context;
+use bytesize::ByteSize;
 use futures::future::try_join_all;
 use quickwit_common::pretty::PrettySample;
 use quickwit_directories::{CachingDirectory, HotDirectory, StorageDirectory};
 use quickwit_doc_mapper::{DocMapper, TermRange, WarmupInfo};
 use quickwit_proto::search::{
-    CountHits, LeafSearchRequest, LeafSearchResponse, PartialHit, SearchRequest, SortOrder,
-    SortValue, SplitIdAndFooterOffsets, SplitSearchError,
+    CountHits, LeafSearchRequest, LeafSearchResponse, PartialHit, ResourceStats, SearchRequest,
+    SortOrder, SortValue, SplitIdAndFooterOffsets, SplitSearchError,
 };
 use quickwit_query::query_ast::{BoolQuery, QueryAst, QueryAstTransformer, RangeQuery, TermQuery};
 use quickwit_query::tokenizers::TokenizerManager;
 use quickwit_storage::{
-    wrap_storage_with_cache, BundleStorage, MemorySizedCache, OwnedBytes, SplitCache, Storage,
-    StorageResolver, TimeoutAndRetryStorage,
+    wrap_storage_with_cache, BundleStorage, ByteRangeCache, MemorySizedCache, OwnedBytes,
+    SplitCache, Storage, StorageResolver, TimeoutAndRetryStorage,
 };
 use tantivy::aggregation::agg_req::{AggregationVariants, Aggregations};
 use tantivy::aggregation::AggregationLimitsGuard;
 use tantivy::directory::FileSlice;
 use tantivy::fastfield::FastFieldReaders;
 use tantivy::schema::Field;
-use tantivy::{DateTime, Index, ReloadPolicy, Searcher, Term};
+use tantivy::{DateTime, Index, ReloadPolicy, Searcher, TantivyError, Term};
 use tokio::task::JoinError;
 use tracing::*;
 
 use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector};
 use crate::metrics::SEARCH_METRICS;
 use crate::root::is_metadata_count_request_with_ast;
-use crate::search_permit_provider::SearchPermit;
+use crate::search_permit_provider::{compute_initial_memory_allocation, SearchPermit};
 use crate::service::{deserialize_doc_mapper, SearcherContext};
 use crate::{QuickwitAggregations, SearchError};
 
@@ -124,33 +126,39 @@ pub(crate) async fn open_split_bundle(
     Ok((hotcache_bytes, bundle_storage))
 }
 
+/// Add a storage proxy to retry `get_slice` requests if they are taking too long,
+/// if configured in the searcher config.
+///
+/// The goal here is to ensure a low latency.
+fn configure_storage_retries(
+    searcher_context: &SearcherContext,
+    index_storage: Arc<dyn Storage>,
+) -> Arc<dyn Storage> {
+    if let Some(storage_timeout_policy) =
+        &searcher_context.searcher_config.storage_timeout_policy
+    {
+        Arc::new(TimeoutAndRetryStorage::new(
+            index_storage,
+            storage_timeout_policy.clone(),
+        ))
+    } else {
+        index_storage
+    }
+}
+
 /// Opens a `tantivy::Index` for the given split with several cache layers:
 /// - A split footer cache given by `SearcherContext.split_footer_cache`.
 /// - A fast fields cache given by `SearcherContext.storage_long_term_cache`.
-/// - An ephemeral unbounded cache directory whose lifetime is tied to the returned `Index`.
+/// - An ephemeral unbounded cache directory (whose lifetime is tied to the
+///   returned `Index` if no `ByteRangeCache` is provided).
 #[instrument(skip_all, fields(split_footer_start=split_and_footer_offsets.split_footer_start, split_footer_end=split_and_footer_offsets.split_footer_end))]
 pub(crate) async fn open_index_with_caches(
     searcher_context: &SearcherContext,
     index_storage: Arc<dyn Storage>,
     split_and_footer_offsets: &SplitIdAndFooterOffsets,
     tokenizer_manager: Option<&TokenizerManager>,
-    ephemeral_unbounded_cache: bool,
-) -> anyhow::Result<Index> {
-    // Let's add a storage proxy to retry `get_slice` requests if they are taking too long,
-    // if configured in the searcher config.
-    //
-    // The goal here is too ensure a low latency.
-
-    let index_storage_with_retry_on_timeout = if let Some(storage_timeout_policy) =
-        &searcher_context.searcher_config.storage_timeout_policy
-    {
-        Arc::new(TimeoutAndRetryStorage::new(
-            index_storage,
-            storage_timeout_policy.clone(),
-        ))
-    } else {
-        index_storage
-    };
+    ephemeral_unbounded_cache: Option<ByteRangeCache>,
+) -> anyhow::Result<(Index, HotDirectory)> {
+    let index_storage_with_retry_on_timeout =
+        configure_storage_retries(searcher_context, index_storage);
 
     let (hotcache_bytes, bundle_storage) = open_split_bundle(
         searcher_context,
@@ -166,14 +174,14 @@ pub(crate) async fn open_index_with_caches(
 
     let directory = StorageDirectory::new(bundle_storage_with_cache);
 
-    let hot_directory = if ephemeral_unbounded_cache {
-        let caching_directory = CachingDirectory::new_unbounded(Arc::new(directory));
+    let hot_directory = if let Some(cache) = ephemeral_unbounded_cache {
+        let caching_directory = CachingDirectory::new(Arc::new(directory), cache);
         HotDirectory::open(caching_directory, hotcache_bytes.read_bytes()?)?
     } else {
         HotDirectory::open(directory, hotcache_bytes.read_bytes()?)?
     };
 
-    let mut index = Index::open(hot_directory)?;
+    let mut index = Index::open(hot_directory.clone())?;
     if let Some(tokenizer_manager) = tokenizer_manager {
         index.set_tokenizers(tokenizer_manager.tantivy_manager().clone());
     }
@@ -182,7 +190,7 @@ pub(crate) async fn open_index_with_caches(
             .tantivy_manager()
             .clone(),
     );
-    Ok(index)
+    Ok((index, hot_directory))
 }
 
 /// Tantivy search does not make it possible to fetch data asynchronously during
@@ -363,10 +371,23 @@ fn get_leaf_resp_from_count(count: u64) -> LeafSearchResponse {
         num_attempted_splits: 1,
         num_successful_splits: 1,
         intermediate_aggregation_result: None,
+        resource_stats: None,
     }
 }
 
+/// Compute the size of the index, store excluded.
+fn compute_index_size(hot_directory: &HotDirectory) -> ByteSize {
+    let size_bytes = hot_directory
+        .get_file_lengths()
+        .iter()
+        .filter(|(path, _)| !path.to_string_lossy().ends_with("store"))
+        .map(|(_, size)| *size)
+        .sum();
+    ByteSize(size_bytes)
+}
+
 /// Apply a leaf search on a single split.
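+///
+/// The provided `search_permit` is refined as better estimates become
+/// available: once the index is open (via `compute_index_size`) and again
+/// once the warmup completes, at which point the warmup slot is released.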
+#[allow(clippy::too_many_arguments)]
 async fn leaf_search_single_split(
     searcher_context: &SearcherContext,
     mut search_request: SearchRequest,
     storage: Arc<dyn Storage>,
     split: SplitIdAndFooterOffsets,
     doc_mapper: Arc<DocMapper>,
     split_filter: Arc<RwLock<CanSplitDoBetter>>,
     aggregations_limits: AggregationLimitsGuard,
+    search_permit: &mut SearchPermit,
 ) -> crate::Result<LeafSearchResponse> {
     rewrite_request(
         &mut search_request,
@@ -400,15 +422,21 @@ async fn leaf_search_single_split(
     }
 
     let split_id = split.split_id.to_string();
-    let index = open_index_with_caches(
+    let byte_range_cache =
+        ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);
+    let (index, hot_directory) = open_index_with_caches(
         searcher_context,
         storage,
         &split,
         Some(doc_mapper.tokenizer_manager()),
-        true,
+        Some(byte_range_cache.clone()),
     )
     .await?;
-    let split_schema = index.schema();
+
+    let index_size = compute_index_size(&hot_directory);
+    if index_size < search_permit.memory_allocation() {
+        search_permit.update_memory_usage(index_size);
+    }
 
     let reader = index
         .reader_builder()
@@ -419,13 +447,33 @@ async fn leaf_search_single_split(
     let mut collector =
         make_collector_for_split(split_id.clone(), &search_request, aggregations_limits)?;
+    let split_schema = index.schema();
     let (query, mut warmup_info) = doc_mapper.query(split_schema.clone(), &query_ast, false)?;
     let collector_warmup_info = collector.warmup_info();
     warmup_info.merge(collector_warmup_info);
     warmup_info.simplify();
 
+    let warmup_start = Instant::now();
     warmup(&searcher, &warmup_info).await?;
+    let warmup_end = Instant::now();
+    let warmup_duration: Duration = warmup_end.duration_since(warmup_start);
+
+    let warmup_size = ByteSize(byte_range_cache.get_num_bytes());
+    if warmup_size > search_permit.memory_allocation() {
+        warn!(
+            memory_usage = ?warmup_size,
+            memory_allocation = ?search_permit.memory_allocation(),
+            "current leaf search is consuming more memory than the initial allocation"
+        );
+    }
+    crate::SEARCH_METRICS
+        .leaf_search_single_split_warmup_num_bytes
+        .observe(warmup_size.as_u64() as f64);
+    search_permit.update_memory_usage(warmup_size);
+    search_permit.free_warmup_slot();
+
+    let split_num_docs = split.num_docs;
+
     let span = info_span!("tantivy_search");
 
     let (search_request, leaf_search_response) = {
@@ -433,25 +481,31 @@ async fn leaf_search_single_split(
 
         crate::search_thread_pool()
             .run_cpu_intensive(move || {
+                let cpu_start = Instant::now();
+                let cpu_thread_pool_wait_microsecs = cpu_start.duration_since(warmup_end);
                 let _span_guard = span.enter();
                 // Our search execution has been scheduled, let's check if we can improve the
                 // request based on the results of the preceding searches
                 check_optimize_search_request(&mut search_request, &split, &split_filter);
                 collector.update_search_param(&search_request);
-                if is_metadata_count_request_with_ast(&query_ast, &search_request) {
-                    return Ok((
-                        search_request,
-                        get_leaf_resp_from_count(searcher.num_docs() as u64),
-                    ));
-                }
-                if collector.is_count_only() {
-                    let count = query.count(&searcher)? as u64;
-                    Ok((search_request, get_leaf_resp_from_count(count)))
-                } else {
-                    searcher
-                        .search(&query, &collector)
-                        .map(|resp| (search_request, resp))
-                }
+                let mut leaf_search_response: LeafSearchResponse =
+                    if is_metadata_count_request_with_ast(&query_ast, &search_request) {
+                        get_leaf_resp_from_count(searcher.num_docs())
+                    } else if collector.is_count_only() {
+                        let count = query.count(&searcher)? as u64;
+                        get_leaf_resp_from_count(count)
+                    } else {
+                        searcher.search(&query, &collector)?
+                    };
+                leaf_search_response.resource_stats = Some(ResourceStats {
+                    cpu_microsecs: cpu_start.elapsed().as_micros() as u64,
+                    short_lived_cache_num_bytes: warmup_size.as_u64(),
+                    split_num_docs,
+                    warmup_microsecs: warmup_duration.as_micros() as u64,
+                    cpu_thread_pool_wait_microsecs: cpu_thread_pool_wait_microsecs.as_micros()
+                        as u64,
+                });
+                Result::<_, TantivyError>::Ok((search_request, leaf_search_response))
             })
             .await
             .map_err(|_| {
@@ -1261,17 +1315,25 @@ pub async fn leaf_search(
 
     // We acquire all of the leaf search permits to make sure our single split search tasks
     // do no interleave with other leaf search requests.
+    let permit_sizes = split_with_req.iter().map(|(split, _)| {
+        compute_initial_memory_allocation(
+            split,
+            searcher_context
+                .searcher_config
+                .warmup_single_split_initial_allocation,
+        )
+    });
     let permit_futures = searcher_context
         .search_permit_provider
-        .get_permits(split_with_req.len());
+        .get_permits(permit_sizes)
+        .await;
 
     for ((split, mut request), permit_fut) in
         split_with_req.into_iter().zip(permit_futures.into_iter())
     {
         let leaf_split_search_permit = permit_fut
             .instrument(info_span!("waiting_for_leaf_search_split_semaphore"))
-            .await
-            .expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.");
+            .await;
 
         let can_be_better = check_optimize_search_request(&mut request, &split, &split_filter);
         if !can_be_better && !run_all_splits {
@@ -1361,7 +1423,7 @@ async fn leaf_search_single_split_wrapper(
     split: SplitIdAndFooterOffsets,
     split_filter: Arc<RwLock<CanSplitDoBetter>>,
     incremental_merge_collector: Arc<Mutex<IncrementalCollector>>,
-    search_permit: SearchPermit,
+    mut search_permit: SearchPermit,
     aggregations_limits: AggregationLimitsGuard,
 ) {
     crate::SEARCH_METRICS.leaf_searches_splits_total.inc();
@@ -1376,10 +1438,12 @@ async fn leaf_search_single_split_wrapper(
         doc_mapper,
         split_filter.clone(),
         aggregations_limits,
+        &mut search_permit,
     )
     .await;
 
-    // We explicitly drop it, to highlight it to the reader
+    // Explicitly drop the permit for readability.
+    // This should always happen after the ephemeral search cache is dropped.
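+    // The permit accounts for the memory held by that cache, so dropping the
+    // permit first would release budget while the bytes are still alive.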
     std::mem::drop(search_permit);
 
     if leaf_search_single_split_res.is_ok() {
@@ -1417,6 +1481,15 @@ mod tests {
 mod tests {
     use std::ops::Bound;
 
+    use bytes::BufMut;
+    use quickwit_directories::write_hotcache;
+    use rand::{thread_rng, Rng};
+    use tantivy::directory::RamDirectory;
+    use tantivy::schema::{
+        BytesOptions, FieldEntry, Schema, TextFieldIndexing, TextOptions, Value,
+    };
+    use tantivy::TantivyDocument;
+
     use super::*;
 
     fn bool_filter(ast: impl Into<QueryAst>) -> QueryAst {
@@ -1852,4 +1925,97 @@ mod tests {
             assert_eq!(rewrote_bounds_agg, no_bounds_agg);
         }
     }
+
+    fn create_tantivy_dir_with_hotcache<'a, V>(
+        field_entry: FieldEntry,
+        field_value: V,
+    ) -> (HotDirectory, usize)
+    where
+        V: Value<'a>,
+    {
+        let field_name = field_entry.name().to_string();
+        let mut schema_builder = Schema::builder();
+        schema_builder.add_field(field_entry);
+        let schema = schema_builder.build();
+
+        let ram_directory = RamDirectory::create();
+        let index = Index::open_or_create(ram_directory.clone(), schema.clone()).unwrap();
+
+        let mut index_writer = index.writer(15_000_000).unwrap();
+        let field = schema.get_field(&field_name).unwrap();
+        let mut new_doc = TantivyDocument::default();
+        new_doc.add_field_value(field, field_value);
+        index_writer.add_document(new_doc).unwrap();
+        index_writer.commit().unwrap();
+
+        let mut hotcache_bytes_writer = Vec::new().writer();
+        write_hotcache(ram_directory.clone(), &mut hotcache_bytes_writer).unwrap();
+        let hotcache_bytes = OwnedBytes::new(hotcache_bytes_writer.into_inner());
+        let hot_directory = HotDirectory::open(ram_directory.clone(), hotcache_bytes).unwrap();
+        (hot_directory, ram_directory.total_mem_usage())
+    }
+
+    #[test]
+    fn test_compute_index_size_without_store() {
+        // We don't want to make assertions on absolute index sizes (it might
+        // change in future Tantivy versions), but rather verify that the store
+        // is properly excluded from the computed size.
+
+        // We use random bytes so that the store can't compress them
+        let mut payload = vec![0u8; 1024];
+        thread_rng().fill(&mut payload[..]);
+
+        let (hotcache_directory_stored_payload, directory_size_stored_payload) =
+            create_tantivy_dir_with_hotcache(
+                FieldEntry::new_bytes("payload".to_string(), BytesOptions::default().set_stored()),
+                &payload,
+            );
+        let size_with_stored_payload =
+            compute_index_size(&hotcache_directory_stored_payload).as_u64();
+
+        let (hotcache_directory_index_only, directory_size_index_only) =
+            create_tantivy_dir_with_hotcache(
+                FieldEntry::new_bytes("payload".to_string(), BytesOptions::default()),
+                &payload,
+            );
+        let size_index_only = compute_index_size(&hotcache_directory_index_only).as_u64();
+
+        assert!(directory_size_stored_payload > directory_size_index_only + 1000);
+        assert!(size_with_stored_payload.abs_diff(size_index_only) < 10);
+    }
+
+    #[test]
+    fn test_compute_index_size_varies_with_data() {
+        // We don't want to make assertions on absolute index sizes (it might
+        // change in future Tantivy versions), but rather verify that an index
+        // with more data is indeed bigger.
+
+        let indexing_options =
+            TextOptions::default().set_indexing_options(TextFieldIndexing::default());
+
+        let (hotcache_directory_larger, directory_size_larger) = create_tantivy_dir_with_hotcache(
+            FieldEntry::new_text("text".to_string(), indexing_options.clone()),
+            "Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium \
+             doloremque laudantium, totam rem aperiam, eaque ipsa quae ab illo inventore \
+             veritatis et quasi architecto beatae vitae dicta sunt explicabo. Nemo enim ipsam \
+             voluptatem quia voluptas sit aspernatur aut odit aut fugit, sed quia consequuntur \
+             magni dolores eos qui ratione voluptatem sequi nesciunt. Neque porro quisquam est, \
+             qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit, sed quia non \
+             numquam eius modi tempora incidunt ut labore et dolore magnam aliquam quaerat \
+             voluptatem. Ut enim ad minima veniam, quis nostrum exercitationem ullam corporis \
+             suscipit laboriosam, nisi ut aliquid ex ea commodi consequatur? Quis autem vel eum \
+             iure reprehenderit qui in ea voluptate velit esse quam nihil molestiae consequatur, \
+             vel illum qui dolorem eum fugiat quo voluptas nulla pariatur?",
+        );
+        let larger_size = compute_index_size(&hotcache_directory_larger).as_u64();
+
+        let (hotcache_directory_smaller, directory_size_smaller) = create_tantivy_dir_with_hotcache(
+            FieldEntry::new_text("text".to_string(), indexing_options),
+            "hi",
+        );
+        let smaller_size = compute_index_size(&hotcache_directory_smaller).as_u64();
+
+        assert!(directory_size_larger > directory_size_smaller + 100);
+        assert!(larger_size > smaller_size + 100);
+    }
 }
diff --git a/quickwit/quickwit-search/src/leaf_cache.rs b/quickwit/quickwit-search/src/leaf_cache.rs
index 491f66f3aee..016cdd5b00f 100644
--- a/quickwit/quickwit-search/src/leaf_cache.rs
+++ b/quickwit/quickwit-search/src/leaf_cache.rs
@@ -192,7 +192,8 @@ impl std::ops::RangeBounds<u64> for Range {
 #[cfg(test)]
 mod tests {
     use quickwit_proto::search::{
-        LeafSearchResponse, PartialHit, SearchRequest, SortValue, SplitIdAndFooterOffsets,
+        LeafSearchResponse, PartialHit, ResourceStats, SearchRequest, SortValue,
+        SplitIdAndFooterOffsets,
     };
 
     use super::LeafSearchCache;
@@ -252,6 +253,7 @@ mod tests {
                 sort_value2: None,
                 split_id: "split_1".to_string(),
             }],
+            resource_stats: None,
         };
 
         assert!(cache.get(split_1.clone(), query_1.clone()).is_none());
@@ -342,6 +344,7 @@ mod tests {
                 sort_value2: None,
                 split_id: "split_1".to_string(),
             }],
+            resource_stats: Some(ResourceStats::default()),
        };
 
         // for split_1, 1 and 1bis cover different timestamp ranges
diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs
index a81a974d75d..b7c03a0c5ea 100644
--- a/quickwit/quickwit-search/src/lib.rs
+++ b/quickwit/quickwit-search/src/lib.rs
@@ -72,7 +72,9 @@ use quickwit_metastore::{
     IndexMetadata, ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt,
     MetastoreServiceStreamSplitsExt, SplitMetadata, SplitState,
 };
-use quickwit_proto::search::{PartialHit, SearchRequest, SearchResponse, SplitIdAndFooterOffsets};
+use quickwit_proto::search::{
+    PartialHit, ResourceStats, SearchRequest, SearchResponse, SplitIdAndFooterOffsets,
+};
 use quickwit_proto::types::IndexUid;
 use quickwit_storage::StorageResolver;
 pub use service::SearcherContext;
@@ -340,3 +342,126 @@ pub fn searcher_pool_for_test(
         }),
     )
 }
+
+pub(crate) fn merge_resource_stats_it<'a>(
+    stats_it: impl IntoIterator<Item = &'a Option<ResourceStats>>,
+) -> Option<ResourceStats> {
+    let mut acc_stats: Option<ResourceStats> = None;
+    for new_stats in stats_it {
+        merge_resource_stats(new_stats, &mut acc_stats);
+    }
+    acc_stats
+}
+
+fn merge_resource_stats(
+    new_stats_opt: &Option<ResourceStats>,
+    stat_accs_opt: &mut Option<ResourceStats>,
+) {
+    if let Some(new_stats) = new_stats_opt {
+        if let Some(stat_accs) = stat_accs_opt {
+            stat_accs.short_lived_cache_num_bytes += new_stats.short_lived_cache_num_bytes;
+            stat_accs.split_num_docs += new_stats.split_num_docs;
+            stat_accs.warmup_microsecs += new_stats.warmup_microsecs;
+            stat_accs.cpu_thread_pool_wait_microsecs += new_stats.cpu_thread_pool_wait_microsecs;
+            stat_accs.cpu_microsecs += new_stats.cpu_microsecs;
+        } else {
+            *stat_accs_opt = Some(new_stats.clone());
+        }
+    }
+}
+#[cfg(test)]
+mod stats_merge_tests {
+    use super::*;
+
+    #[test]
+    fn test_merge_resource_stats() {
+        let mut acc_stats = None;
+
+        merge_resource_stats(&None, &mut acc_stats);
+
+        assert_eq!(acc_stats, None);
+
+        let stats = Some(ResourceStats {
+            short_lived_cache_num_bytes: 100,
+            split_num_docs: 200,
+            warmup_microsecs: 300,
+            cpu_thread_pool_wait_microsecs: 400,
+            cpu_microsecs: 500,
+        });
+
+        merge_resource_stats(&stats, &mut acc_stats);
+
+        assert_eq!(acc_stats, stats);
+
+        let new_stats = Some(ResourceStats {
+            short_lived_cache_num_bytes: 50,
+            split_num_docs: 100,
+            warmup_microsecs: 150,
+            cpu_thread_pool_wait_microsecs: 200,
+            cpu_microsecs: 250,
+        });
+
+        merge_resource_stats(&new_stats, &mut acc_stats);
+
+        let stats_plus_new_stats = Some(ResourceStats {
+            short_lived_cache_num_bytes: 150,
+            split_num_docs: 300,
+            warmup_microsecs: 450,
+            cpu_thread_pool_wait_microsecs: 600,
+            cpu_microsecs: 750,
+        });
+
+        assert_eq!(acc_stats, stats_plus_new_stats);
+
+        merge_resource_stats(&None, &mut acc_stats);
+
+        assert_eq!(acc_stats, stats_plus_new_stats);
+    }
+
+    #[test]
+    fn test_merge_resource_stats_it() {
+        let merged_stats = merge_resource_stats_it(Vec::<&Option<ResourceStats>>::new());
+        assert_eq!(merged_stats, None);
+
+        let stats1 = Some(ResourceStats {
+            short_lived_cache_num_bytes: 100,
+            split_num_docs: 200,
+            warmup_microsecs: 300,
+            cpu_thread_pool_wait_microsecs: 400,
+            cpu_microsecs: 500,
+        });
+
+        let merged_stats = merge_resource_stats_it(vec![&None, &stats1, &None]);
+
+        assert_eq!(merged_stats, stats1);
+
+        let stats2 = Some(ResourceStats {
+            short_lived_cache_num_bytes: 50,
+            split_num_docs: 100,
+            warmup_microsecs: 150,
+            cpu_thread_pool_wait_microsecs: 200,
+            cpu_microsecs: 250,
+        });
+
+        let stats3 = Some(ResourceStats {
+            short_lived_cache_num_bytes: 25,
+            split_num_docs: 50,
+            warmup_microsecs: 75,
+            cpu_thread_pool_wait_microsecs: 100,
+            cpu_microsecs: 125,
+        });
+
+        let merged_stats = merge_resource_stats_it(vec![&stats1, &stats2, &stats3]);
+
+        assert_eq!(
+            merged_stats,
+            Some(ResourceStats {
+                short_lived_cache_num_bytes: 175,
+                split_num_docs: 350,
+                warmup_microsecs: 525,
+                cpu_thread_pool_wait_microsecs: 700,
+                cpu_microsecs: 875,
+            })
+        );
+    }
+}
diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs
index 765203438d1..f796252c125 100644
--- a/quickwit/quickwit-search/src/list_terms.rs
+++ b/quickwit/quickwit-search/src/list_terms.rs
@@ -33,13 +33,14 @@ use quickwit_proto::search::{
     SplitIdAndFooterOffsets, SplitSearchError,
 };
 use quickwit_proto::types::IndexUid;
-use quickwit_storage::Storage;
+use quickwit_storage::{ByteRangeCache, Storage};
 use tantivy::schema::{Field, FieldType};
 use tantivy::{ReloadPolicy, Term};
 use tracing::{debug, error, info, instrument};
 
 use crate::leaf::open_index_with_caches;
 use crate::search_job_placer::group_jobs_by_index_id;
+use crate::search_permit_provider::compute_initial_memory_allocation;
 use crate::{resolve_index_patterns, ClusterClient, SearchError, SearchJob, SearcherContext};
 
 /// Performs a distributed list terms.
@@ -216,7 +217,10 @@ async fn leaf_list_terms_single_split(
     searcher_context: &SearcherContext,
     storage: Arc<dyn Storage>,
     split: SplitIdAndFooterOffsets,
 ) -> crate::Result<LeafListTermsResponse> {
-    let index = open_index_with_caches(searcher_context, storage, &split, None, true).await?;
+    let cache =
+        ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);
+    let (index, _) =
+        open_index_with_caches(searcher_context, storage, &split, None, Some(cache)).await?;
     let split_schema = index.schema();
     let reader = index
         .reader_builder()
@@ -325,18 +329,26 @@ pub async fn leaf_list_terms(
     splits: &[SplitIdAndFooterOffsets],
 ) -> Result<LeafListTermsResponse, SearchError> {
     info!(split_offsets = ?PrettySample::new(splits, 5));
+    let permit_sizes = splits.iter().map(|split| {
+        compute_initial_memory_allocation(
+            split,
+            searcher_context
+                .searcher_config
+                .warmup_single_split_initial_allocation,
+        )
+    });
+    let permits = searcher_context
+        .search_permit_provider
+        .get_permits(permit_sizes)
+        .await;
     let leaf_search_single_split_futures: Vec<_> = splits
         .iter()
-        .map(|split| {
+        .zip(permits.into_iter())
+        .map(|(split, search_permit_recv)| {
             let index_storage_clone = index_storage.clone();
             let searcher_context_clone = searcher_context.clone();
             async move {
-                let _leaf_split_search_permit = searcher_context_clone
-                    .search_permit_provider
-                    .get_permit()
-                    .await
-                    .expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.");
-
+                let leaf_split_search_permit = search_permit_recv.await;
                 // TODO dedicated counter and timer?
                 crate::SEARCH_METRICS.leaf_searches_splits_total.inc();
                 let timer = crate::SEARCH_METRICS
@@ -350,6 +362,11 @@ pub async fn leaf_list_terms(
                 )
                 .await;
                 timer.observe_duration();
+
+                // Explicitly drop the permit for readability.
+                // This should always happen after the ephemeral search cache is dropped.
+                std::mem::drop(leaf_split_search_permit);
+
                 leaf_search_single_split_res.map_err(|err| (split.split_id.clone(), err))
             }
         })
diff --git a/quickwit/quickwit-search/src/metrics.rs b/quickwit/quickwit-search/src/metrics.rs
index 35b7d3115c5..55bff88a565 100644
--- a/quickwit/quickwit-search/src/metrics.rs
+++ b/quickwit/quickwit-search/src/metrics.rs
@@ -19,6 +19,7 @@
 
 // See https://prometheus.io/docs/practices/naming/
 
+use bytesize::ByteSize;
 use once_cell::sync::Lazy;
 use quickwit_common::metrics::{
     exponential_buckets, linear_buckets, new_counter, new_counter_vec, new_gauge_vec,
@@ -37,6 +38,7 @@ pub struct SearchMetrics {
     pub job_assigned_total: IntCounterVec<1>,
     pub leaf_search_single_split_tasks_pending: IntGauge,
     pub leaf_search_single_split_tasks_ongoing: IntGauge,
+    pub leaf_search_single_split_warmup_num_bytes: Histogram,
 }
 
 impl Default for SearchMetrics {
@@ -52,6 +54,18 @@ impl Default for SearchMetrics {
             .copied()
             .collect();
 
+        let pseudo_exponential_bytes_buckets = vec![
+            ByteSize::mb(10).as_u64() as f64,
+            ByteSize::mb(20).as_u64() as f64,
+            ByteSize::mb(50).as_u64() as f64,
+            ByteSize::mb(100).as_u64() as f64,
+            ByteSize::mb(200).as_u64() as f64,
+            ByteSize::mb(500).as_u64() as f64,
+            ByteSize::gb(1).as_u64() as f64,
+            ByteSize::gb(2).as_u64() as f64,
+            ByteSize::gb(5).as_u64() as f64,
+        ];
+
         let leaf_search_single_split_tasks = new_gauge_vec::<1>(
             "leaf_search_single_split_tasks",
             "Number of single split search tasks pending or ongoing",
@@ -124,6 +138,12 @@ impl Default for SearchMetrics {
                 .with_label_values(["ongoing"]),
             leaf_search_single_split_tasks_pending: leaf_search_single_split_tasks
                 .with_label_values(["pending"]),
+            leaf_search_single_split_warmup_num_bytes: new_histogram(
+                "leaf_search_single_split_warmup_num_bytes",
+                "Size of the short lived cache for a single split once the warmup is done.",
+                "search",
+                pseudo_exponential_bytes_buckets,
+            ),
             job_assigned_total: new_counter_vec(
                 "job_assigned_total",
                 "Number of job assigned to searchers, per affinity rank.",
diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs
index 608bc87e479..724687148f2 100644
--- a/quickwit/quickwit-search/src/root.rs
+++ b/quickwit/quickwit-search/src/root.rs
@@ -18,6 +18,7 @@
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
 use std::collections::{HashMap, HashSet};
+use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::OnceLock;
 use std::time::Duration;
 
@@ -49,7 +50,7 @@ use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResult
 use tantivy::collector::Collector;
 use tantivy::schema::{Field, FieldEntry, FieldType, Schema};
 use tantivy::TantivyError;
-use tracing::{debug, info, info_span, instrument};
+use tracing::{debug, info_span, instrument};
 
 use crate::cluster_client::ClusterClient;
 use crate::collector::{make_merge_collector, QuickwitAggregations};
@@ -683,10 +684,46 @@ pub fn get_count_from_metadata(split_metadatas: &[SplitMetadata]) -> Vec<LeafSearchResponse>
+fn is_top_5pct_memory_intensive(num_bytes: u64, split_num_docs: u64) -> bool {
+    // It is not worth considering small splits for this.
+    if split_num_docs < 100_000 {
+        return false;
+    }
+    // We multiply those figures by 1_000 for accuracy.
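+    // The estimator drifts up by 95% of a step when a sample exceeds it and
+    // down by 5% of a step otherwise; the two drifts cancel out when ~5% of
+    // the samples are above, so it converges towards the 95th percentile.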
+    const PERCENTILE: u64 = 95;
+    const PRIOR_NUM_BYTES_PER_DOC: u64 = 3 * 1_000;
+    static NUM_BYTES_PER_DOC_95_PERCENTILE_ESTIMATOR: AtomicU64 =
+        AtomicU64::new(PRIOR_NUM_BYTES_PER_DOC);
+    let num_bits_per_docs = num_bytes * 1_000 / split_num_docs;
+    let current_estimator = NUM_BYTES_PER_DOC_95_PERCENTILE_ESTIMATOR.load(Ordering::Relaxed);
+    let is_memory_intensive = num_bits_per_docs > current_estimator;
+    let new_estimator: u64 = if is_memory_intensive {
+        current_estimator.saturating_add(PRIOR_NUM_BYTES_PER_DOC * PERCENTILE / 100)
+    } else {
+        current_estimator.saturating_sub(PRIOR_NUM_BYTES_PER_DOC * (100 - PERCENTILE) / 100)
+    };
+    // We do not use fetch_add / fetch_sub directly as they wrap around.
+    // Concurrency could lead to different results here, but really we don't care.
+    //
+    // This is just ignoring some gradient updates.
+    NUM_BYTES_PER_DOC_95_PERCENTILE_ESTIMATOR.store(new_estimator, Ordering::Relaxed);
+    is_memory_intensive
+}
+
 /// If this method fails for some splits, a partial search response is returned, with the list of
 /// faulty splits in the failed_splits field.
 #[instrument(level = "debug", skip_all)]
@@ -744,9 +781,21 @@ pub(crate) async fn search_partial_hits_phase(
         has_intermediate_aggregation_result = leaf_search_response.intermediate_aggregation_result.is_some(),
         "Merged leaf search response."
     );
+
+    if let Some(resource_stats) = &leaf_search_response.resource_stats {
+        if is_top_5pct_memory_intensive(
+            resource_stats.short_lived_cache_num_bytes,
+            resource_stats.split_num_docs,
+        ) {
+            // We log at most 5 times per minute.
+            quickwit_common::rate_limited_info!(limit_per_min=5, split_num_docs=resource_stats.split_num_docs, short_lived_cached_num_bytes=resource_stats.short_lived_cache_num_bytes, query=%search_request.query_ast, "memory intensive query");
+        }
+    }
+
     if !leaf_search_response.failed_splits.is_empty() {
         quickwit_common::rate_limited_error!(limit_per_min=6, failed_splits = ?leaf_search_response.failed_splits, "leaf search response contains at least one failed split");
     }
+
     Ok(leaf_search_response)
 }
 
@@ -1114,7 +1163,6 @@ pub async fn root_search(
     mut metastore: MetastoreServiceClient,
     cluster_client: &ClusterClient,
 ) -> crate::Result<SearchResponse> {
-    info!(searcher_context = ?searcher_context, search_request = ?search_request);
     let start_instant = tokio::time::Instant::now();
     let list_indexes_metadatas_request = ListIndexesMetadataRequest {
         index_id_patterns: search_request.index_id_patterns.clone(),
@@ -1169,9 +1217,12 @@ pub async fn root_search(
     )
     .await;
 
+    let elapsed = start_instant.elapsed();
+
     if let Ok(search_response) = &mut search_response_result {
-        search_response.elapsed_time_micros = start_instant.elapsed().as_micros() as u64;
+        search_response.elapsed_time_micros = elapsed.as_micros() as u64;
     }
+
     let label_values = if search_response_result.is_ok() {
         ["success"]
     } else {
diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs
index f6883efb34b..64bc36ff3a6 100644
--- a/quickwit/quickwit-search/src/search_permit_provider.rs
+++ b/quickwit/quickwit-search/src/search_permit_provider.rs
@@ -18,109 +18,221 @@
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
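+
+//! A minimal sketch of the expected caller sequence (hypothetical caller
+//! code; names like `num_slots` and `actual_warmup_size` are illustrative):
+//!
+//! ```ignore
+//! let provider = SearchPermitProvider::new(num_slots, ByteSize::gb(100));
+//! let mut permit = provider.get_permits(vec![ByteSize::mb(50)]).await.pop().unwrap().await;
+//! // ... download and warm up the split ...
+//! permit.update_memory_usage(actual_warmup_size);
+//! permit.free_warmup_slot();
+//! // dropping the permit releases the remaining memory budget
+//! drop(permit);
+//! ```
+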
 use std::collections::VecDeque;
-use std::sync::{Arc, Mutex};
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
 
+use bytesize::ByteSize;
 use quickwit_common::metrics::GaugeGuard;
-use tokio::sync::oneshot;
+use quickwit_proto::search::SplitIdAndFooterOffsets;
+#[cfg(test)]
+use tokio::sync::watch;
+use tokio::sync::{mpsc, oneshot};
 
-/// `SearchPermitProvider` is a distributor of permits to perform single split
-/// search operation.
+/// Distributor of permits to perform split search operations.
 ///
-/// Requests are served in order.
+/// Requests are served in order. Each permit initially reserves a slot for the
+/// warmup (limiting concurrent downloads) and a pessimistic amount of memory.
+/// Once the warmup is completed, the actual memory usage is set and the warmup
+/// slot is released. Once the search is completed and the permit is dropped,
+/// the remaining memory is also released.
 #[derive(Clone)]
 pub struct SearchPermitProvider {
-    inner_arc: Arc<Mutex<InnerSearchPermitProvider>>,
+    message_sender: mpsc::UnboundedSender<SearchPermitMessage>,
+    #[cfg(test)]
+    actor_stopped: watch::Receiver<bool>,
+}
+
+#[derive(Debug)]
+pub enum SearchPermitMessage {
+    Request {
+        permit_sender: oneshot::Sender<Vec<SearchPermitFuture>>,
+        permit_sizes: Vec<u64>,
+    },
+    UpdateMemory {
+        memory_delta: i64,
+    },
+    FreeWarmupSlot,
+    Drop {
+        memory_size: u64,
+        warmup_slot_freed: bool,
+    },
+}
+
+/// Makes a very pessimistic estimate of the memory allocation required for a split search.
+///
+/// This is refined later on when more data is available about the split.
+pub fn compute_initial_memory_allocation(
+    split: &SplitIdAndFooterOffsets,
+    warmup_single_split_initial_allocation: ByteSize,
+) -> ByteSize {
+    let split_size = split.split_footer_start;
+    // we consider the configured initial allocation to be set for a large split with 10M docs
+    const LARGE_SPLIT_NUM_DOCS: u64 = 10_000_000;
+    let proportional_allocation =
+        warmup_single_split_initial_allocation.as_u64() * split.num_docs / LARGE_SPLIT_NUM_DOCS;
+    let size_bytes = [
+        split_size,
+        proportional_allocation,
+        warmup_single_split_initial_allocation.as_u64(),
+    ]
+    .into_iter()
+    .min()
+    .unwrap();
+    const MINIMUM_ALLOCATION_BYTES: u64 = 10_000_000;
+    ByteSize(size_bytes.max(MINIMUM_ALLOCATION_BYTES))
 }
 
 impl SearchPermitProvider {
-    pub fn new(num_permits: usize) -> SearchPermitProvider {
-        SearchPermitProvider {
-            inner_arc: Arc::new(Mutex::new(InnerSearchPermitProvider {
-                num_permits_available: num_permits,
-                permits_requests: VecDeque::new(),
-            })),
+    pub fn new(num_download_slots: usize, memory_budget: ByteSize) -> Self {
+        let (message_sender, message_receiver) = mpsc::unbounded_channel();
+        #[cfg(test)]
+        let (state_sender, state_receiver) = watch::channel(false);
+        let actor = SearchPermitActor {
+            msg_receiver: message_receiver,
+            msg_sender: message_sender.downgrade(),
+            num_warmup_slots_available: num_download_slots,
+            total_memory_budget: memory_budget.as_u64(),
+            permits_requests: VecDeque::new(),
+            total_memory_allocated: 0u64,
+            #[cfg(test)]
+            stopped: state_sender,
+        };
+        tokio::spawn(actor.run());
+        Self {
+            message_sender,
+            #[cfg(test)]
+            actor_stopped: state_receiver,
         }
     }
 
-    /// Returns a future permit in the form of a oneshot Receiver channel.
+    /// Returns one permit future for each provided split metadata.
     ///
-    /// At this point the permit is not acquired yet.
-    #[must_use]
-    pub fn get_permit(&self) -> oneshot::Receiver<SearchPermit> {
-        let mut permits_lock = self.inner_arc.lock().unwrap();
-        permits_lock.get_permit(&self.inner_arc)
-    }
-
-    /// Returns a list of future permits in the form of oneshot Receiver channels.
+    /// The permits returned are guaranteed to be resolved in order. In
+    /// addition, the permits are guaranteed to be resolved before permits
+    /// returned by subsequent calls to this function.
     ///
-    /// The permits returned are guaranteed to be resolved in order.
-    /// In addition, the permits are guaranteed to be resolved before permits returned by
-    /// subsequent calls to this function (or `get_permit`).
-    #[must_use]
-    pub fn get_permits(&self, num_permits: usize) -> Vec<oneshot::Receiver<SearchPermit>> {
-        let mut permits_lock = self.inner_arc.lock().unwrap();
-        permits_lock.get_permits(num_permits, &self.inner_arc)
+    /// The permit memory size is capped by the per-split initial allocation.
+    pub async fn get_permits(
+        &self,
+        splits: impl IntoIterator<Item = ByteSize>,
+    ) -> Vec<SearchPermitFuture> {
+        let (permit_sender, permit_receiver) = oneshot::channel();
+        let permit_sizes = splits.into_iter().map(|size| size.as_u64()).collect();
+        self.message_sender
+            .send(SearchPermitMessage::Request {
+                permit_sender,
+                permit_sizes,
+            })
+            .expect("Receiver lives longer than sender");
+        permit_receiver
+            .await
+            .expect("Receiver lives longer than sender")
     }
 }
 
-struct InnerSearchPermitProvider {
-    num_permits_available: usize,
-    permits_requests: VecDeque<oneshot::Sender<SearchPermit>>,
+struct SearchPermitActor {
+    msg_receiver: mpsc::UnboundedReceiver<SearchPermitMessage>,
+    msg_sender: mpsc::WeakUnboundedSender<SearchPermitMessage>,
+    num_warmup_slots_available: usize,
+    /// Note it is possible for memory_allocated to exceed memory_budget temporarily,
+    /// if and only if a split leaf search task ended up using more than `initial_allocation`.
+    /// When it happens, new permits will not be assigned until the memory is freed.
+    total_memory_budget: u64,
+    total_memory_allocated: u64,
+    permits_requests: VecDeque<(oneshot::Sender<SearchPermit>, u64)>,
+    #[cfg(test)]
+    stopped: watch::Sender<bool>,
 }
 
-impl InnerSearchPermitProvider {
-    fn get_permit(
-        &mut self,
-        inner_arc: &Arc<Mutex<InnerSearchPermitProvider>>,
-    ) -> oneshot::Receiver<SearchPermit> {
-        let (tx, rx) = oneshot::channel();
-        self.permits_requests.push_back(tx);
-        self.assign_available_permits(inner_arc);
-        rx
-    }
-
-    fn get_permits(
-        &mut self,
-        num_permits: usize,
-        inner_arc: &Arc<Mutex<InnerSearchPermitProvider>>,
-    ) -> Vec<oneshot::Receiver<SearchPermit>> {
-        let mut permits = Vec::with_capacity(num_permits);
-        for _ in 0..num_permits {
-            let (tx, rx) = oneshot::channel();
-            self.permits_requests.push_back(tx);
-            permits.push(rx);
+impl SearchPermitActor {
+    async fn run(mut self) {
+        // Stops when the last clone of SearchPermitProvider is dropped.
+        while let Some(msg) = self.msg_receiver.recv().await {
+            self.handle_message(msg);
+        }
+        #[cfg(test)]
+        self.stopped.send(true).ok();
+    }
+
+    fn handle_message(&mut self, msg: SearchPermitMessage) {
+        match msg {
+            SearchPermitMessage::Request {
+                permit_sizes,
+                permit_sender,
+            } => {
+                let mut permits = Vec::with_capacity(permit_sizes.len());
+                for permit_size in permit_sizes {
+                    let (tx, rx) = oneshot::channel();
+                    self.permits_requests.push_back((tx, permit_size));
+                    permits.push(SearchPermitFuture(rx));
+                }
+                self.assign_available_permits();
+                permit_sender
+                    .send(permits)
+                    // This is a request-response pattern: the caller is awaiting
+                    // the receiver, so this send should not fail.
+                    .expect("Receiver lives longer than sender");
+            }
+            SearchPermitMessage::UpdateMemory { memory_delta } => {
+                if self.total_memory_allocated as i64 + memory_delta < 0 {
+                    panic!("More memory released than allocated, should never happen.")
+                }
+                self.total_memory_allocated =
+                    (self.total_memory_allocated as i64 + memory_delta) as u64;
+                self.assign_available_permits();
+            }
+            SearchPermitMessage::FreeWarmupSlot => {
+                self.num_warmup_slots_available += 1;
+                self.assign_available_permits();
+            }
+            SearchPermitMessage::Drop {
+                memory_size,
+                warmup_slot_freed,
+            } => {
+                if !warmup_slot_freed {
+                    self.num_warmup_slots_available += 1;
+                }
+                self.total_memory_allocated = self
+                    .total_memory_allocated
+                    .checked_sub(memory_size)
+                    .expect("More memory released than allocated, should never happen.");
+                self.assign_available_permits();
+            }
         }
-        self.assign_available_permits(inner_arc);
-        permits
     }
 
-    fn recycle_permit(&mut self, inner_arc: &Arc<Mutex<InnerSearchPermitProvider>>) {
-        self.num_permits_available += 1;
-        self.assign_available_permits(inner_arc);
+    fn pop_next_request_if_serviceable(&mut self) -> Option<(oneshot::Sender<SearchPermit>, u64)> {
+        if self.num_warmup_slots_available == 0 {
+            return None;
+        }
+        if let Some((_, next_permit_size)) = self.permits_requests.front() {
+            if self.total_memory_allocated + next_permit_size <= self.total_memory_budget {
+                return self.permits_requests.pop_front();
+            }
+        }
+        None
     }
 
-    fn assign_available_permits(&mut self, inner_arc: &Arc<Mutex<InnerSearchPermitProvider>>) {
-        while self.num_permits_available > 0 {
-            let Some(sender) = self.permits_requests.pop_front() else {
-                break;
-            };
+    fn assign_available_permits(&mut self) {
+        while let Some((permit_requester_tx, next_permit_size)) =
+            self.pop_next_request_if_serviceable()
+        {
             let mut ongoing_gauge_guard = GaugeGuard::from_gauge(
                 &crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing,
             );
             ongoing_gauge_guard.add(1);
-            let send_res = sender.send(SearchPermit {
-                _ongoing_gauge_guard: ongoing_gauge_guard,
-                inner_arc: inner_arc.clone(),
-                recycle_on_drop: true,
-            });
-            match send_res {
-                Ok(()) => {
-                    self.num_permits_available -= 1;
-                }
-                Err(search_permit) => {
-                    search_permit.drop_without_recycling_permit();
-                }
-            }
+            self.total_memory_allocated += next_permit_size;
+            self.num_warmup_slots_available -= 1;
+            permit_requester_tx
+                .send(SearchPermit {
+                    _ongoing_gauge_guard: ongoing_gauge_guard,
+                    msg_sender: self.msg_sender.clone(),
+                    memory_allocation: next_permit_size,
+                    warmup_slot_freed: false,
+                })
+                // if the requester dropped its receiver, we drop the newly
+                // created SearchPermit which releases the resources
+                .ok();
         }
         crate::SEARCH_METRICS
             .leaf_search_single_split_tasks_pending
@@ -128,41 +240,93 @@ impl InnerSearchPermitProvider {
     }
 }
 
+#[derive(Debug)]
 pub struct SearchPermit {
     _ongoing_gauge_guard: GaugeGuard<'static>,
-    inner_arc: Arc<Mutex<InnerSearchPermitProvider>>,
-    recycle_on_drop: bool,
+    msg_sender: mpsc::WeakUnboundedSender<SearchPermitMessage>,
+    memory_allocation: u64,
+    warmup_slot_freed: bool,
 }
 
 impl SearchPermit {
-    fn drop_without_recycling_permit(mut self) {
-        self.recycle_on_drop = false;
-        drop(self);
+    /// Update the memory usage attached to this permit.
+    ///
+    /// This will increase or decrease the available memory in the [`SearchPermitProvider`].
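+    ///
+    /// For example (sketch): a permit created with a pessimistic 1 GB
+    /// allocation can be shrunk to the measured warmup size, immediately
+    /// returning the difference to the provider for queued permits.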
+    pub fn update_memory_usage(&mut self, new_memory_usage: ByteSize) {
+        let new_usage_bytes = new_memory_usage.as_u64();
+        let memory_delta = new_usage_bytes as i64 - self.memory_allocation as i64;
+        self.memory_allocation = new_usage_bytes;
+        self.send_if_still_running(SearchPermitMessage::UpdateMemory { memory_delta });
+    }
+
+    /// Drop the warmup permit, allowing more downloads to be started. Only one
+    /// slot is attached to each permit so calling this again has no effect.
+    pub fn free_warmup_slot(&mut self) {
+        if self.warmup_slot_freed {
+            return;
+        }
+        self.warmup_slot_freed = true;
+        self.send_if_still_running(SearchPermitMessage::FreeWarmupSlot);
+    }
+
+    pub fn memory_allocation(&self) -> ByteSize {
+        ByteSize(self.memory_allocation)
+    }
+
+    fn send_if_still_running(&self, msg: SearchPermitMessage) {
+        if let Some(sender) = self.msg_sender.upgrade() {
+            sender
+                .send(msg)
+                // Receiver instance in the event loop is never dropped or
+                // closed as long as there is a strong sender reference.
+                .expect("Receiver should live longer than sender");
+        }
     }
 }
 
 impl Drop for SearchPermit {
     fn drop(&mut self) {
-        if !self.recycle_on_drop {
-            return;
+        self.send_if_still_running(SearchPermitMessage::Drop {
+            memory_size: self.memory_allocation,
+            warmup_slot_freed: self.warmup_slot_freed,
+        });
+    }
+}
+
+#[derive(Debug)]
+pub struct SearchPermitFuture(oneshot::Receiver<SearchPermit>);
+
+impl Future for SearchPermitFuture {
+    type Output = SearchPermit;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let receiver = Pin::new(&mut self.get_mut().0);
+        match receiver.poll(cx) {
+            Poll::Ready(Ok(search_permit)) => Poll::Ready(search_permit),
+            Poll::Ready(Err(_)) => panic!("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."),
+            Poll::Pending => Poll::Pending,
         }
-        let mut inner_guard = self.inner_arc.lock().unwrap();
-        inner_guard.recycle_permit(&self.inner_arc.clone());
     }
 }
 
 #[cfg(test)]
 mod tests {
+    use std::iter::repeat;
+    use std::time::Duration;
+
+    use futures::StreamExt;
+    use rand::seq::SliceRandom;
     use tokio::task::JoinSet;
 
     use super::*;
 
     #[tokio::test]
-    async fn test_search_permits_get_permits_future() {
-        // We test here that `get_permits_futures` does not interleave
-        let search_permits = SearchPermitProvider::new(1);
+    async fn test_search_permit_order() {
+        let permit_provider = SearchPermitProvider::new(1, ByteSize::mb(100));
         let mut all_futures = Vec::new();
-        let first_batch_of_permits = search_permits.get_permits(10);
+        let first_batch_of_permits = permit_provider
+            .get_permits(repeat(ByteSize::mb(10)).take(10))
+            .await;
         assert_eq!(first_batch_of_permits.len(), 10);
         all_futures.extend(
             first_batch_of_permits
@@ -171,7 +335,9 @@ mod tests {
                 .map(move |(i, fut)| ((1, i), fut)),
         );
 
-        let second_batch_of_permits = search_permits.get_permits(10);
+        let second_batch_of_permits = permit_provider
+            .get_permits(repeat(ByteSize::mb(10)).take(10))
+            .await;
         assert_eq!(second_batch_of_permits.len(), 10);
         all_futures.extend(
             second_batch_of_permits
@@ -180,7 +346,6 @@ mod tests {
                 .map(move |(i, fut)| ((2, i), fut)),
         );
 
-        use rand::seq::SliceRandom;
         // not super useful, considering what join set does, but still a tiny bit more sound.
         all_futures.shuffle(&mut rand::thread_rng());
 
@@ -206,15 +371,110 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_search_permits_receiver_race_condition() {
-        // Here we test that we don't have a problem if the Receiver is dropped.
-        // In particular, we want to check that there is not a race condition where drop attempts to
-        // lock the mutex.
-        let search_permits = SearchPermitProvider::new(1);
-        let permit_rx = search_permits.get_permit();
-        let permit_rx2 = search_permits.get_permit();
-        drop(permit_rx2);
-        drop(permit_rx);
-        let _permit_rx = search_permits.get_permit();
+    async fn test_search_permit_early_drops() {
+        let permit_provider = SearchPermitProvider::new(1, ByteSize::mb(100));
+        let permit_fut1 = permit_provider
+            .get_permits(vec![ByteSize::mb(10)])
+            .await
+            .into_iter()
+            .next()
+            .unwrap();
+        let permit_fut2 = permit_provider
+            .get_permits([ByteSize::mb(10)])
+            .await
+            .into_iter()
+            .next()
+            .unwrap();
+        drop(permit_fut1);
+        let permit = permit_fut2.await;
+        assert_eq!(permit.memory_allocation, ByteSize::mb(10).as_u64());
+        assert_eq!(*permit_provider.actor_stopped.borrow(), false);
+
+        let _permit_fut3 = permit_provider
+            .get_permits([ByteSize::mb(10)])
+            .await
+            .into_iter()
+            .next()
+            .unwrap();
+        let mut actor_stopped = permit_provider.actor_stopped.clone();
+        drop(permit_provider);
+        {
+            actor_stopped.changed().await.unwrap();
+            assert!(*actor_stopped.borrow());
+        }
+    }
+
+    /// Tries to wait for a permit.
+    async fn try_get(permit_fut: SearchPermitFuture) -> anyhow::Result<SearchPermit> {
+        // using a short timeout is a bit flaky, but it should be enough for these tests
+        let permit = tokio::time::timeout(Duration::from_millis(20), permit_fut).await?;
+        Ok(permit)
+    }
+
+    #[tokio::test]
+    async fn test_memory_budget() {
+        let permit_provider = SearchPermitProvider::new(100, ByteSize::mb(100));
+        let mut permit_futs = permit_provider
+            .get_permits(repeat(ByteSize::mb(10)).take(14))
+            .await;
+        let mut remaining_permit_futs = permit_futs.split_off(10).into_iter();
+        assert_eq!(remaining_permit_futs.len(), 4);
+        // we should be able to obtain 10 permits right away (100MB / 10MB)
+        let mut permits: Vec<SearchPermit> = futures::stream::iter(permit_futs.into_iter())
+            .buffered(1)
+            .collect()
+            .await;
+        // the next permit is blocked by the memory budget
+        let next_blocked_permit_fut = remaining_permit_futs.next().unwrap();
+        try_get(next_blocked_permit_fut).await.unwrap_err();
+        // if we drop one of the permits, we can get a new one
+        permits.drain(0..1);
+        let next_permit_fut = remaining_permit_futs.next().unwrap();
+        let _new_permit = try_get(next_permit_fut).await.unwrap();
+        // the next permit is blocked again by the memory budget
+        let next_blocked_permit_fut = remaining_permit_futs.next().unwrap();
+        try_get(next_blocked_permit_fut).await.unwrap_err();
+        // by setting a more accurate memory usage after a completed warmup, we can get more permits
+        permits[0].update_memory_usage(ByteSize::mb(4));
+        permits[1].update_memory_usage(ByteSize::mb(6));
+        let next_permit_fut = remaining_permit_futs.next().unwrap();
+        try_get(next_permit_fut).await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_warmup_slot() {
+        let permit_provider = SearchPermitProvider::new(10, ByteSize::mb(100));
+        let mut permit_futs = permit_provider
+            .get_permits(repeat(ByteSize::mb(1)).take(16))
+            .await;
+        let mut remaining_permit_futs = permit_futs.split_off(10).into_iter();
+        assert_eq!(remaining_permit_futs.len(), 6);
+        // we should be able to obtain 10 permits right away
+        let mut permits: Vec<SearchPermit> = futures::stream::iter(permit_futs.into_iter())
+            .buffered(1)
+            .collect()
+            .await;
+        // the next permit is blocked by the warmup slots
+        let next_blocked_permit_fut = remaining_permit_futs.next().unwrap();
+        try_get(next_blocked_permit_fut).await.unwrap_err();
+        // if we drop one of the permits, we can get a new one
+        permits.drain(0..1);
+        let next_permit_fut = remaining_permit_futs.next().unwrap();
+        permits.push(try_get(next_permit_fut).await.unwrap());
+        // the next permit is blocked again by the warmup slots
+        let next_blocked_permit_fut = remaining_permit_futs.next().unwrap();
+        try_get(next_blocked_permit_fut).await.unwrap_err();
+        // we can explicitly free the warmup slot on a permit
+        permits[0].free_warmup_slot();
+        let next_permit_fut = remaining_permit_futs.next().unwrap();
+        permits.push(try_get(next_permit_fut).await.unwrap());
+        // dropping that same permit does not free up another slot
+        permits.drain(0..1);
+        let next_blocked_permit_fut = remaining_permit_futs.next().unwrap();
+        try_get(next_blocked_permit_fut).await.unwrap_err();
+        // but dropping a permit whose slot wasn't explicitly freed does free up a slot
+        permits.drain(0..1);
+        let next_blocked_permit_fut = remaining_permit_futs.next().unwrap();
+        permits.push(try_get(next_blocked_permit_fut).await.unwrap());
     }
 }
diff --git a/quickwit/quickwit-search/src/search_stream/leaf.rs b/quickwit/quickwit-search/src/search_stream/leaf.rs
index 941e0d12612..0659965b40d 100644
--- a/quickwit/quickwit-search/src/search_stream/leaf.rs
+++ b/quickwit/quickwit-search/src/search_stream/leaf.rs
@@ -29,7 +29,7 @@ use quickwit_proto::search::{
     LeafSearchStreamResponse, OutputFormat, SearchRequest, SearchStreamRequest,
     SplitIdAndFooterOffsets,
 };
-use quickwit_storage::Storage;
+use quickwit_storage::{ByteRangeCache, Storage};
 use tantivy::columnar::{DynamicColumn, HasAssociatedColumnType};
 use tantivy::fastfield::Column;
 use tantivy::query::Query;
@@ -116,6 +116,7 @@ async fn leaf_search_stream_single_split(
     mut stream_request: SearchStreamRequest,
     storage: Arc<dyn Storage>,
 ) -> crate::Result<LeafSearchStreamResponse> {
+    // TODO: Should we track the memory here using the SearchPermitProvider?
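+    // Note: for now the stream path is only bounded by `split_stream_semaphore`;
+    // the ByteRangeCache created below grows unbounded for the duration of this
+    // request.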
     let _leaf_split_stream_permit = searcher_context
         .split_stream_semaphore
         .acquire()
         .await
@@ -127,12 +128,14 @@ async fn leaf_search_stream_single_split(
         &split,
     );
 
-    let index = open_index_with_caches(
+    let cache =
+        ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);
+    let (index, _) = open_index_with_caches(
         &searcher_context,
         storage,
         &split,
         Some(doc_mapper.tokenizer_manager()),
-        true,
+        Some(cache),
     )
     .await?;
     let split_schema = index.schema();
diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs
index 0029f4dd3a7..d566463b42e 100644
--- a/quickwit/quickwit-search/src/service.rs
+++ b/quickwit/quickwit-search/src/service.rs
@@ -488,8 +488,10 @@ impl SearcherContext {
             capacity_in_bytes,
             &quickwit_storage::STORAGE_METRICS.split_footer_cache,
         );
-        let leaf_search_split_semaphore =
-            SearchPermitProvider::new(searcher_config.max_num_concurrent_split_searches);
+        let leaf_search_split_semaphore = SearchPermitProvider::new(
+            searcher_config.max_num_concurrent_split_searches,
+            searcher_config.warmup_memory_budget,
+        );
         let split_stream_semaphore =
             Semaphore::new(searcher_config.max_num_concurrent_split_streams);
         let fast_field_cache_capacity = searcher_config.fast_field_cache_capacity.as_u64() as usize;
diff --git a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs
index 9ef3b7f523f..425e4f9a043 100644
--- a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs
+++ b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs
@@ -21,7 +21,8 @@ use std::borrow::{Borrow, Cow};
 use std::collections::BTreeMap;
 use std::ops::Range;
 use std::path::{Path, PathBuf};
-use std::sync::Mutex;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::{Arc, Mutex};
 
 use tantivy::directory::OwnedBytes;
 
@@ -344,31 +345,54 @@ impl Drop for NeedMutByteRangeCache {
 /// cached data, the changes may or may not get recorded.
 ///
 /// At the moment this is hardly a cache as it features no eviction policy.
+#[derive(Clone)]
 pub struct ByteRangeCache {
-    inner: Mutex<NeedMutByteRangeCache<Path>>,
+    inner_arc: Arc<Inner>,
+}
+
+struct Inner {
+    num_stored_bytes: AtomicU64,
+    need_mut_byte_range_cache: Mutex<NeedMutByteRangeCache<Path>>,
 }
 
 impl ByteRangeCache {
     /// Creates a slice cache that never removes any entry.
     pub fn with_infinite_capacity(cache_counters: &'static CacheMetrics) -> Self {
+        let need_mut_byte_range_cache =
+            NeedMutByteRangeCache::with_infinite_capacity(cache_counters);
+        let inner = Inner {
+            num_stored_bytes: AtomicU64::default(),
+            need_mut_byte_range_cache: Mutex::new(need_mut_byte_range_cache),
+        };
         ByteRangeCache {
-            inner: Mutex::new(NeedMutByteRangeCache::with_infinite_capacity(
-                cache_counters,
-            )),
+            inner_arc: Arc::new(inner),
         }
     }
 
+    /// Overall amount of bytes stored in the cache.
+    pub fn get_num_bytes(&self) -> u64 {
+        self.inner_arc.num_stored_bytes.load(Ordering::Relaxed)
+    }
+
     /// If available, returns the cached view of the slice.
     pub fn get_slice(&self, path: &Path, byte_range: Range<usize>) -> Option<OwnedBytes> {
-        self.inner.lock().unwrap().get_slice(path, byte_range)
+        self.inner_arc
+            .need_mut_byte_range_cache
+            .lock()
+            .unwrap()
+            .get_slice(path, byte_range)
     }
 
     /// Put the given amount of data in the cache.
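+    ///
+    /// A minimal sketch of the byte accounting (illustrative, not from this diff;
+    /// the path name is made up):
+    ///
+    /// ```ignore
+    /// let cache = ByteRangeCache::with_infinite_capacity(&STORAGE_METRICS.shortlived_cache);
+    /// cache.put_slice(PathBuf::from("split.fast"), 0..3, OwnedBytes::new(vec![1u8, 2, 3]));
+    /// assert_eq!(cache.get_num_bytes(), 3);
+    /// ```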
     pub fn put_slice(&self, path: PathBuf, byte_range: Range<usize>, bytes: OwnedBytes) {
-        self.inner
-            .lock()
-            .unwrap()
-            .put_slice(path, byte_range, bytes)
+        let mut need_mut_byte_range_cache_locked =
+            self.inner_arc.need_mut_byte_range_cache.lock().unwrap();
+        need_mut_byte_range_cache_locked.put_slice(path, byte_range, bytes);
+        let num_bytes = need_mut_byte_range_cache_locked.num_bytes;
+        drop(need_mut_byte_range_cache_locked);
+        self.inner_arc
+            .num_stored_bytes
+            .store(num_bytes, Ordering::Relaxed);
     }
 }
 
@@ -446,13 +470,13 @@ mod tests {
                         .sum();
                     // in some cases, ranges touch each other; count_items counts them
                    // as only one, but the cache counts them as 2.
-                    assert!(cache.inner.lock().unwrap().num_items >= expected_item_count as u64);
+                    assert!(cache.inner_arc.need_mut_byte_range_cache.lock().unwrap().num_items >= expected_item_count as u64);
 
                     let expected_byte_count = state.values()
                         .flatten()
                         .filter(|stored| **stored)
                         .count();
-                    assert_eq!(cache.inner.lock().unwrap().num_bytes, expected_byte_count as u64);
+                    assert_eq!(cache.inner_arc.need_mut_byte_range_cache.lock().unwrap().num_bytes, expected_byte_count as u64);
                 }
                 Operation::Get {
                     range,
@@ -519,7 +543,7 @@ mod tests {
         );
 
         {
-            let mutable_cache = cache.inner.lock().unwrap();
+            let mutable_cache = cache.inner_arc.need_mut_byte_range_cache.lock().unwrap();
             assert_eq!(mutable_cache.cache.len(), 4);
             assert_eq!(mutable_cache.num_items, 4);
             assert_eq!(mutable_cache.cache_counters.in_cache_count.get(), 4);
@@ -531,7 +555,7 @@ mod tests {
 
         {
             // now they should've been merged, except the last one
-            let mutable_cache = cache.inner.lock().unwrap();
+            let mutable_cache = cache.inner_arc.need_mut_byte_range_cache.lock().unwrap();
             assert_eq!(mutable_cache.cache.len(), 2);
             assert_eq!(mutable_cache.num_items, 2);
             assert_eq!(mutable_cache.cache_counters.in_cache_count.get(), 2);
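
How the pieces above are meant to compose on the read path, as a hedged sketch: the wiring below is assumed for illustration only and is not part of this diff (`get_permits` visibility and the exact call sites may differ).

```rust
use bytesize::ByteSize;

// Hypothetical leaf-search flow combining the APIs introduced in this diff.
async fn leaf_search_split_sketch(provider: &SearchPermitProvider) {
    // Ask for a permit sized with the initial allocation; the future resolves
    // once both a warmup slot and enough memory budget are available.
    let permit_fut = provider
        .get_permits(vec![ByteSize::gb(1)]) // warmup_single_split_initial_allocation
        .await
        .into_iter()
        .next()
        .unwrap();
    let mut permit = permit_fut.await;

    // Warmup downloads populate a request-scoped, unbounded ByteRangeCache.
    let cache = ByteRangeCache::with_infinite_capacity(
        &quickwit_storage::STORAGE_METRICS.shortlived_cache,
    );
    // ... warmup runs here, filling `cache` ...

    // Replace the coarse initial allocation with the measured footprint and
    // release the download (warmup) slot for other splits.
    permit.update_memory_usage(ByteSize(cache.get_num_bytes()));
    permit.free_warmup_slot();

    // ... CPU-bound search phase; dropping `permit` releases the memory ...
}
```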