From e1b7125147696b449378e98060010afce97300a8 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 15 Nov 2024 11:32:09 +0900 Subject: [PATCH] Limit search memory usage associated with warmup. Due to tantivy limitations, searching a split requires downloading all of the required data, and keep them in memory. We call this phase warmup. Before this PR, the only thing that curbed memory usage was the search permits: only N split search may happen concurrently. Unfortunately, the amount of data required here varies vastly. We need a mechanism to measure and avoid running more split search when memory is tight. Just using a semaphore is however not an option. We do not know beforehands how much memory will be required by a split search, so it could easily lead to a dead lock. Instead, this commit builds upon the search permit provider. The search permit provider is in charge of managing a configurable memory budget for this warmup memory. We introduce here a configurable "warmup_single_split_initial_allocation". A new leaf split search cannot be started if this memory is not available. This initial allocation is meant to be greater than what will be actually needed most of the time. The split search then holds this allocation until the end of warmup. After warmup, we can get the actual memory usage by interrogating the warmup cache. We can then update the amount of memory held. (most of the time, this should mean releasing some memory) In addition, in this PR, at this point, we also release the warmup search permit: We still have to perform the actual task of searching, but the thread pool will take care of limiting the number of concurrent task. Closes #5355 --- .../quickwit-config/src/node_config/mod.rs | 12 +++- quickwit/quickwit-search/src/leaf.rs | 6 +- .../src/search_permit_provider.rs | 66 +++++++++++++++---- quickwit/quickwit-search/src/service.rs | 2 +- 4 files changed, 68 insertions(+), 18 deletions(-) diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index 3eef1f10428..190e5b44295 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -226,6 +226,12 @@ pub struct SearcherConfig { #[serde(default)] #[serde(skip_serializing_if = "Option::is_none")] pub storage_timeout_policy: Option, + + // TODO validate that `warmup_memory_budget` is greater than `warmup_single_split_initial_allocation` + // TODO set serde default + pub warmup_memory_budget: ByteSize, + // TODO set serde default + pub warmup_single_split_initial_allocation: ByteSize, } /// Configuration controlling how fast a searcher should timeout a `get_slice` @@ -263,7 +269,7 @@ impl StorageTimeoutPolicy { impl Default for SearcherConfig { fn default() -> Self { - Self { + SearcherConfig { fast_field_cache_capacity: ByteSize::gb(1), split_footer_cache_capacity: ByteSize::mb(500), partial_request_cache_capacity: ByteSize::mb(64), @@ -274,6 +280,10 @@ impl Default for SearcherConfig { split_cache: None, request_timeout_secs: Self::default_request_timeout_secs(), storage_timeout_policy: None, + // TODO change this to the method used for serde default. + warmup_memory_budget: ByteSize::gb(1), + // TODO change this to the method used for serde default. + warmup_single_split_initial_allocation: ByteSize::mb(50), } } } diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 94d6bd0ccb1..78dfc706b7c 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -376,6 +376,7 @@ async fn leaf_search_single_split( doc_mapper: Arc, split_filter: Arc>, aggregations_limits: AggregationLimitsGuard, + search_permit: &mut SearchPermit, ) -> crate::Result { rewrite_request( &mut search_request, @@ -434,9 +435,9 @@ async fn leaf_search_single_split( let warmup_duration: Duration = warmup_end.duration_since(warmup_start); let short_lived_cache_num_bytes: u64 = byte_range_cache.get_num_bytes(); + search_permit.set_actual_memory_usage_and_release_permit_after(short_lived_cache_num_bytes); let split_num_docs = split.num_docs; - let span = info_span!("tantivy_search"); let (search_request, leaf_search_response) = { @@ -1378,7 +1379,7 @@ async fn leaf_search_single_split_wrapper( split: SplitIdAndFooterOffsets, split_filter: Arc>, incremental_merge_collector: Arc>, - search_permit: SearchPermit, + mut search_permit: SearchPermit, aggregations_limits: AggregationLimitsGuard, ) { crate::SEARCH_METRICS.leaf_searches_splits_total.inc(); @@ -1393,6 +1394,7 @@ async fn leaf_search_single_split_wrapper( doc_mapper, split_filter.clone(), aggregations_limits, + &mut search_permit, ) .await; diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs index f6883efb34b..07aad825eb1 100644 --- a/quickwit/quickwit-search/src/search_permit_provider.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -20,8 +20,10 @@ use std::collections::VecDeque; use std::sync::{Arc, Mutex}; +use bytesize::ByteSize; use quickwit_common::metrics::GaugeGuard; use tokio::sync::oneshot; +use tracing::warn; /// `SearchPermitProvider` is a distributor of permits to perform single split /// search operation. @@ -33,11 +35,14 @@ pub struct SearchPermitProvider { } impl SearchPermitProvider { - pub fn new(num_permits: usize) -> SearchPermitProvider { + pub fn new(num_permits: usize, memory_budget: ByteSize, initial_allocation: ByteSize) -> SearchPermitProvider { SearchPermitProvider { inner_arc: Arc::new(Mutex::new(InnerSearchPermitProvider { num_permits_available: num_permits, + memory_budget: memory_budget.as_u64(), permits_requests: VecDeque::new(), + memory_allocated: 0u64, + initial_allocation: initial_allocation.as_u64(), })), } } @@ -65,6 +70,14 @@ impl SearchPermitProvider { struct InnerSearchPermitProvider { num_permits_available: usize, + + // Note it is possible for memory_allocated to exceed memory_budget temporarily, + // if and only if a split leaf search task ended up using more than `initial_allocation`. + // + // When it happens, new permits will not be assigned until the memory is freed. + memory_budget: u64, + memory_allocated: u64, + initial_allocation: u64, permits_requests: VecDeque>, } @@ -94,30 +107,29 @@ impl InnerSearchPermitProvider { permits } - fn recycle_permit(&mut self, inner_arc: &Arc>) { - self.num_permits_available += 1; - self.assign_available_permits(inner_arc); - } - fn assign_available_permits(&mut self, inner_arc: &Arc>) { - while self.num_permits_available > 0 { - let Some(sender) = self.permits_requests.pop_front() else { + while self.num_permits_available > 0 && self.memory_allocated + self.initial_allocation <= self.memory_budget { + let Some(permit_requester_tx) = self.permits_requests.pop_front() else { break; }; let mut ongoing_gauge_guard = GaugeGuard::from_gauge( &crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing, ); ongoing_gauge_guard.add(1); - let send_res = sender.send(SearchPermit { + let send_res = permit_requester_tx.send(SearchPermit { _ongoing_gauge_guard: ongoing_gauge_guard, inner_arc: inner_arc.clone(), - recycle_on_drop: true, + warmup_permit_held: true, + memory_allocation: self.initial_allocation, }); match send_res { Ok(()) => { self.num_permits_available -= 1; + self.memory_allocated += self.initial_allocation; } Err(search_permit) => { + // We cannot just decrease the num_permits_available in all case and rely on + // the drop logic here: it would cause a dead lock on the inner_arc Mutex. search_permit.drop_without_recycling_permit(); } } @@ -131,23 +143,49 @@ impl InnerSearchPermitProvider { pub struct SearchPermit { _ongoing_gauge_guard: GaugeGuard<'static>, inner_arc: Arc>, - recycle_on_drop: bool, + warmup_permit_held: bool, + memory_allocation: u64, } impl SearchPermit { + /// After warm up, we have a proper estimate of the memory usage of a single split leaf search. + /// + /// We can then set the actual memory usage. + pub fn set_actual_memory_usage_and_release_permit_after(&mut self, new_memory_usage: u64) { + if new_memory_usage > self.memory_allocation { + warn!(memory_usage=new_memory_usage, memory_allocation=self.memory_allocation, "current leaf search is consuming more memory than the initial allocation"); + } + let mut inner_guard = self.inner_arc.lock().unwrap(); + let delta = new_memory_usage as i64 - inner_guard.initial_allocation as i64; + inner_guard.memory_allocated += delta as u64; + inner_guard.num_permits_available += 1; + if inner_guard.memory_allocated > inner_guard.memory_budget { + warn!(memory_allocated=inner_guard.memory_allocated, memory_budget=inner_guard.memory_budget, "memory allocated exceeds memory budget"); + } + self.memory_allocation = new_memory_usage; + inner_guard.assign_available_permits(&self.inner_arc); + } + fn drop_without_recycling_permit(mut self) { - self.recycle_on_drop = false; + self.warmup_permit_held = false; + self.memory_allocation = 0u64; drop(self); } } impl Drop for SearchPermit { fn drop(&mut self) { - if !self.recycle_on_drop { + // This is not just an optimization. This is necessary to avoid a dead lock when the + // permit requester dropped its receiver channel. + if !self.warmup_permit_held && self.memory_allocation == 0 { return; } let mut inner_guard = self.inner_arc.lock().unwrap(); - inner_guard.recycle_permit(&self.inner_arc.clone()); + if self.warmup_permit_held { + inner_guard.num_permits_available += 1; + } + inner_guard.memory_allocated -= self.memory_allocation; + inner_guard.assign_available_permits(&self.inner_arc); } } diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index 0029f4dd3a7..734344f2c74 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -489,7 +489,7 @@ impl SearcherContext { &quickwit_storage::STORAGE_METRICS.split_footer_cache, ); let leaf_search_split_semaphore = - SearchPermitProvider::new(searcher_config.max_num_concurrent_split_searches); + SearchPermitProvider::new(searcher_config.max_num_concurrent_split_searches, searcher_config.warmup_memory_budget, searcher_config.warmup_single_split_initial_allocation); let split_stream_semaphore = Semaphore::new(searcher_config.max_num_concurrent_split_streams); let fast_field_cache_capacity = searcher_config.fast_field_cache_capacity.as_u64() as usize;