Limit and monitor warmup memory usage (#5568)
* Measure and log the amount of memory taken by a split search.

* Limit search memory usage associated with warmup.

Due to tantivy limitations, searching a split requires downloading all
of the required data and keeping it in memory. We call this phase
warmup.

Before this PR, the only thing that curbed memory usage was the search
permits: only N split searches may happen concurrently.
Unfortunately, the amount of data required per split varies widely.

We need a mechanism to measure memory usage and avoid starting more
split searches when memory is tight.

Just using a semaphore is not an option, however: we do not know
beforehand how much memory a split search will require, so it could
easily lead to a deadlock.

Instead, this commit builds upon the search permit provider.

The search permit provider is in charge of managing a configurable memory budget for warmup.

We introduce here a configurable "warmup_single_split_initial_allocation".
A new leaf split search cannot be started if this much memory is not
available. This initial allocation is meant to be greater than what will
actually be needed most of the time.

The split search then holds this allocation until the end of warmup.
After warmup, we can get the actual memory usage by interrogating the
warmup cache and update the amount of memory held accordingly
(most of the time, this means releasing some memory).

At this point, this PR also releases the warmup search permit:
the actual search still has to be performed, but the thread pool takes
care of limiting the number of concurrent tasks.
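
To make the mechanism concrete, here is a minimal sketch of the idea. The names (`PermitProvider`, `MemoryPermit`, `try_acquire`, `resize`) are illustrative and do not match the actual Quickwit implementation, which is actor-based and asynchronous:

```rust
use std::sync::{Arc, Mutex};

/// Illustrative only: hands out per-split allocations from a fixed warmup
/// memory budget.
struct PermitProvider {
    remaining_budget: Arc<Mutex<u64>>,
    initial_allocation: u64,
}

/// An allocation held by a single split search for the duration of warmup.
struct MemoryPermit {
    remaining_budget: Arc<Mutex<u64>>,
    allocated: u64,
}

impl PermitProvider {
    /// A new leaf split search can only start if the initial allocation
    /// still fits in the remaining budget.
    fn try_acquire(&self) -> Option<MemoryPermit> {
        let mut remaining = self.remaining_budget.lock().unwrap();
        if *remaining < self.initial_allocation {
            return None;
        }
        *remaining -= self.initial_allocation;
        Some(MemoryPermit {
            remaining_budget: self.remaining_budget.clone(),
            allocated: self.initial_allocation,
        })
    }
}

impl MemoryPermit {
    /// After warmup, adjust the allocation to the memory actually used,
    /// usually giving some budget back.
    fn resize(&mut self, actual_usage: u64) {
        let mut remaining = self.remaining_budget.lock().unwrap();
        *remaining = (*remaining + self.allocated).saturating_sub(actual_usage);
        self.allocated = actual_usage;
    }
}

impl Drop for MemoryPermit {
    fn drop(&mut self) {
        // Return whatever is still held to the budget.
        *self.remaining_budget.lock().unwrap() += self.allocated;
    }
}
```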

Closes #5355

* Bring some clarifications and remove single permit getter

* Make search permit provider into an actor.

Also attach the permit to the actual memory cache to ensure memory is freed at the right moment.

* Revert weird cargo lock update

* Improve separation of concerns by using wrapping instead of nesting

Adding an extra generic field to the cache to optionally allow permit tracking is awkward.
Instead, we make the directory generic over the type of cache and use a wrapped cache
when tracking is necessary (see the sketch after this list of changes).

* Fix clippy

* Fix undefined incremental resource stat

* Add tests to permit provider

* Improve and test stats merging utils

* Fix minor typos

* Add test for permit resizing

* Increase default warmup memory

* Increase default warmup memory

* Add warmup cache metric

* Limit permit memory size with split size

* Also use num_docs to estimate init cache size

* Restore sort on HotCache file list

* Minor closure renaming

* Add minimum allocation size

* Increase default warmup memory to limit its effect
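
The sketch below illustrates the "wrapping instead of nesting" idea from the separation-of-concerns item above. The `Cache` trait and `TrackedCache` type are illustrative stand-ins, not the actual Quickwit types:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Illustrative stand-in for a byte-range cache interface.
trait Cache {
    fn put(&self, path: String, bytes: Vec<u8>);
    fn get(&self, path: &str) -> Option<Vec<u8>>;
}

/// Wraps any cache and counts the bytes inserted during warmup, so the
/// memory permit can later be resized to the measured usage.
struct TrackedCache<C: Cache> {
    inner: C,
    tracked_num_bytes: AtomicU64,
}

impl<C: Cache> TrackedCache<C> {
    fn num_bytes(&self) -> u64 {
        self.tracked_num_bytes.load(Ordering::Relaxed)
    }
}

impl<C: Cache> Cache for TrackedCache<C> {
    fn put(&self, path: String, bytes: Vec<u8>) {
        self.tracked_num_bytes
            .fetch_add(bytes.len() as u64, Ordering::Relaxed);
        self.inner.put(path, bytes);
    }

    fn get(&self, path: &str) -> Option<Vec<u8>> {
        self.inner.get(path)
    }
}
```

The directory is made generic over the cache type, so this wrapper only comes into play when permit tracking is needed.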

---------

Co-authored-by: Paul Masurel <[email protected]>
rdettai and fulmicoton authored Dec 12, 2024
1 parent ec95419 commit 3ec6a07
Showing 19 changed files with 972 additions and 224 deletions.
14 changes: 13 additions & 1 deletion quickwit/quickwit-config/src/node_config/mod.rs
@@ -226,6 +226,8 @@ pub struct SearcherConfig {
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub storage_timeout_policy: Option<StorageTimeoutPolicy>,
pub warmup_memory_budget: ByteSize,
pub warmup_single_split_initial_allocation: ByteSize,
}

/// Configuration controlling how fast a searcher should timeout a `get_slice`
@@ -263,7 +265,7 @@ impl StorageTimeoutPolicy {

impl Default for SearcherConfig {
fn default() -> Self {
Self {
SearcherConfig {
fast_field_cache_capacity: ByteSize::gb(1),
split_footer_cache_capacity: ByteSize::mb(500),
partial_request_cache_capacity: ByteSize::mb(64),
@@ -274,6 +276,8 @@ impl Default for SearcherConfig {
split_cache: None,
request_timeout_secs: Self::default_request_timeout_secs(),
storage_timeout_policy: None,
warmup_memory_budget: ByteSize::gb(100),
warmup_single_split_initial_allocation: ByteSize::gb(1),
}
}
}
@@ -308,6 +312,14 @@ impl SearcherConfig {
split_cache_limits.max_file_descriptors
);
}
if self.warmup_single_split_initial_allocation > self.warmup_memory_budget {
anyhow::bail!(
"warmup_single_split_initial_allocation ({}) must be lower or equal to \
warmup_memory_budget ({})",
self.warmup_single_split_initial_allocation,
self.warmup_memory_budget
);
}
}
Ok(())
}
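A quick sketch of how the new validation check behaves, assuming `SearcherConfig::validate` keeps a `pub fn validate(&self) -> anyhow::Result<()>` shape (the signature is not shown in this diff):

```rust
use bytesize::ByteSize;
use quickwit_config::SearcherConfig;

fn main() {
    // A single-split initial allocation larger than the total warmup budget
    // should now be rejected.
    let mut searcher_config = SearcherConfig::default();
    searcher_config.warmup_memory_budget = ByteSize::gb(1);
    searcher_config.warmup_single_split_initial_allocation = ByteSize::gb(2);
    assert!(searcher_config.validate().is_err());
}
```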
4 changes: 3 additions & 1 deletion quickwit/quickwit-config/src/node_config/serialize.rs
@@ -616,7 +616,9 @@ mod tests {
min_throughtput_bytes_per_secs: 100_000,
timeout_millis: 2_000,
max_num_retries: 2
})
}),
warmup_memory_budget: ByteSize::gb(100),
warmup_single_split_initial_allocation: ByteSize::gb(1),
}
);
assert_eq!(
24 changes: 15 additions & 9 deletions quickwit/quickwit-directories/src/caching_directory.rs
@@ -33,21 +33,27 @@ use tantivy::{Directory, HasLen};
pub struct CachingDirectory {
underlying: Arc<dyn Directory>,
// TODO fixme: that's a pretty ugly cache we have here.
cache: Arc<ByteRangeCache>,
cache: ByteRangeCache,
}

impl CachingDirectory {
/// Creates a new CachingDirectory.
///
/// Warming: The resulting CacheDirectory will cache all information without ever
/// Warning: The resulting CacheDirectory will cache all information without ever
/// removing any item from the cache.
pub fn new_unbounded(underlying: Arc<dyn Directory>) -> CachingDirectory {
CachingDirectory {
underlying,
cache: Arc::new(ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
)),
}
let byte_range_cache = ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
);
CachingDirectory::new(underlying, byte_range_cache)
}

/// Creates a new CachingDirectory.
///
/// Warning: The resulting CacheDirectory will cache all information without ever
/// removing any item from the cache.
pub fn new(underlying: Arc<dyn Directory>, cache: ByteRangeCache) -> CachingDirectory {
CachingDirectory { underlying, cache }
}
}

@@ -59,7 +65,7 @@ impl fmt::Debug for CachingDirectory {

struct CachingFileHandle {
path: PathBuf,
cache: Arc<ByteRangeCache>,
cache: ByteRangeCache,
underlying_filehandle: Arc<dyn FileHandle>,
}

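With the new constructor, the caller can keep a handle on the warmup cache and interrogate it after warmup. A sketch, assuming `ByteRangeCache` is now cheaply cloneable (the removal of the `Arc` wrapper suggests it is reference-counted internally):

```rust
use std::sync::Arc;

use quickwit_directories::CachingDirectory;
use quickwit_storage::ByteRangeCache;
use tantivy::Directory;

// Share the warmup cache with the directory so its size can be inspected
// once warmup is done.
fn wrap_with_shared_cache(
    underlying: Arc<dyn Directory>,
) -> (CachingDirectory, ByteRangeCache) {
    let cache = ByteRangeCache::with_infinite_capacity(
        &quickwit_storage::STORAGE_METRICS.shortlived_cache,
    );
    let directory = CachingDirectory::new(underlying, cache.clone());
    (directory, cache)
}
```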
32 changes: 13 additions & 19 deletions quickwit/quickwit-directories/src/hot_directory.rs
@@ -205,14 +205,12 @@ impl StaticDirectoryCache {
self.file_lengths.get(path).copied()
}

/// return the files and their cached lengths
pub fn get_stats(&self) -> Vec<(PathBuf, usize)> {
pub fn get_file_lengths(&self) -> Vec<(PathBuf, u64)> {
let mut entries = self
.slices
.file_lengths
.iter()
.map(|(path, cache)| (path.to_owned(), cache.len()))
.map(|(path, len)| (path.clone(), *len))
.collect::<Vec<_>>();

entries.sort_by_key(|el| el.0.to_owned());
entries
}
@@ -265,10 +263,6 @@ impl StaticSliceCache {
}
None
}

pub fn len(&self) -> usize {
self.bytes.len()
}
}

struct StaticSliceCacheBuilder {
@@ -376,12 +370,12 @@ impl HotDirectory {
}),
})
}
/// Get files and their cached sizes.
pub fn get_stats_per_file(
hot_cache_bytes: OwnedBytes,
) -> anyhow::Result<Vec<(PathBuf, usize)>> {
let static_cache = StaticDirectoryCache::open(hot_cache_bytes)?;
Ok(static_cache.get_stats())

/// Get all the files in the directory and their sizes.
///
/// The actual cached data is a very small fraction of this length.
pub fn get_file_lengths(&self) -> Vec<(PathBuf, u64)> {
self.inner.cache.get_file_lengths()
}
}

@@ -704,10 +698,10 @@ mod tests {
assert_eq!(directory_cache.get_file_length(three_path), Some(300));
assert_eq!(directory_cache.get_file_length(four_path), None);

let stats = directory_cache.get_stats();
assert_eq!(stats[0], (one_path.to_owned(), 8));
assert_eq!(stats[1], (three_path.to_owned(), 0));
assert_eq!(stats[2], (two_path.to_owned(), 7));
let file_lengths = directory_cache.get_file_lengths();
assert_eq!(file_lengths[0], (one_path.to_owned(), 100));
assert_eq!(file_lengths[1], (three_path.to_owned(), 300));
assert_eq!(file_lengths[2], (two_path.to_owned(), 200));

assert_eq!(
directory_cache
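The renamed `get_file_lengths` returns full file sizes rather than cached byte counts, which is what lets the searcher bound the initial allocation by the split size ("Limit permit memory size with split size"). A trivial sketch of that use:

```rust
use quickwit_directories::HotDirectory;

// Sum the file lengths listed in the hotcache to get an upper bound on the
// amount of data a warmup of this split could download.
fn total_split_file_bytes(hot_directory: &HotDirectory) -> u64 {
    hot_directory
        .get_file_lengths()
        .into_iter()
        .map(|(_path, len)| len)
        .sum()
}
```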
10 changes: 10 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
@@ -347,6 +347,14 @@ message LeafSearchRequest {
repeated string index_uris = 9;
}

message ResourceStats {
uint64 short_lived_cache_num_bytes = 1;
uint64 split_num_docs = 2;
uint64 warmup_microsecs = 3;
uint64 cpu_thread_pool_wait_microsecs = 4;
uint64 cpu_microsecs = 5;
}

/// LeafRequestRef references data in LeafSearchRequest to deduplicate data.
message LeafRequestRef {
// The ordinal of the doc_mapper in `LeafSearchRequest.doc_mappers`
Expand Down Expand Up @@ -479,6 +487,8 @@ message LeafSearchResponse {

// postcard serialized intermediate aggregation_result.
optional bytes intermediate_aggregation_result = 6;

ResourceStats resource_stats = 8;
}

message SnippetRequest {
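For reference, prost turns the new message into a plain struct of `u64` fields, and the `resource_stats` message field of `LeafSearchResponse` becomes an `Option<ResourceStats>` on the Rust side. A sketch of the expected generated shape (the real code lives in the non-rendered `quickwit.search.rs`):

```rust
// Expected shape of the prost-generated struct (sketch, not the actual file).
#[derive(Clone, PartialEq, prost::Message)]
pub struct ResourceStats {
    #[prost(uint64, tag = "1")]
    pub short_lived_cache_num_bytes: u64,
    #[prost(uint64, tag = "2")]
    pub split_num_docs: u64,
    #[prost(uint64, tag = "3")]
    pub warmup_microsecs: u64,
    #[prost(uint64, tag = "4")]
    pub cpu_thread_pool_wait_microsecs: u64,
    #[prost(uint64, tag = "5")]
    pub cpu_microsecs: u64,
}
```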
17 changes: 17 additions & 0 deletions quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs

Some generated files are not rendered by default.

7 changes: 6 additions & 1 deletion quickwit/quickwit-search/src/cluster_client.rs
@@ -36,7 +36,7 @@ use tracing::{debug, error, info, warn};
use crate::retry::search::LeafSearchRetryPolicy;
use crate::retry::search_stream::{LeafSearchStreamRetryPolicy, SuccessfulSplitIds};
use crate::retry::{retry_client, DefaultRetryPolicy, RetryPolicy};
use crate::{SearchError, SearchJobPlacer, SearchServiceClient};
use crate::{merge_resource_stats_it, SearchError, SearchJobPlacer, SearchServiceClient};

/// Maximum number of put requests emitted to perform a replicated given PUT KV.
const MAX_PUT_KV_ATTEMPTS: usize = 6;
@@ -317,6 +317,10 @@ fn merge_original_with_retry_leaf_search_response(
(Some(left), None) => Some(left),
(None, None) => None,
};
let resource_stats = merge_resource_stats_it([
&original_response.resource_stats,
&retry_response.resource_stats,
]);
Ok(LeafSearchResponse {
intermediate_aggregation_result,
num_hits: original_response.num_hits + retry_response.num_hits,
@@ -326,6 +330,7 @@ fn merge_original_with_retry_leaf_search_response(
partial_hits: original_response.partial_hits,
num_successful_splits: original_response.num_successful_splits
+ retry_response.num_successful_splits,
resource_stats,
})
}

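The implementation of `merge_resource_stats_it` is not part of the rendered diff. A plausible sketch, assuming the stats are summed field by field and `None` entries are skipped (the actual helper in quickwit-search may differ):

```rust
use quickwit_proto::search::ResourceStats;

/// Sums the stats of every response that reported them; returns `None`
/// if no response carried any stats.
fn merge_resource_stats_it<'a>(
    stats_it: impl IntoIterator<Item = &'a Option<ResourceStats>>,
) -> Option<ResourceStats> {
    let mut merged: Option<ResourceStats> = None;
    for stats in stats_it.into_iter().flatten() {
        let acc = merged.get_or_insert_with(ResourceStats::default);
        acc.short_lived_cache_num_bytes += stats.short_lived_cache_num_bytes;
        acc.split_num_docs += stats.split_num_docs;
        acc.warmup_microsecs += stats.warmup_microsecs;
        acc.cpu_thread_pool_wait_microsecs += stats.cpu_thread_pool_wait_microsecs;
        acc.cpu_microsecs += stats.cpu_microsecs;
    }
    merged
}
```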