Skip to content

Commit

Permalink
Measure and log the amount of memory taken by a split search, and log…
Browse files Browse the repository at this point in the history
… this.
  • Loading branch information
fulmicoton committed Nov 15, 2024
1 parent d4ad40d commit 3cf9f38
Show file tree
Hide file tree
Showing 13 changed files with 703 additions and 291 deletions.
705 changes: 473 additions & 232 deletions quickwit/Cargo.lock

Large diffs are not rendered by default.

22 changes: 14 additions & 8 deletions quickwit/quickwit-directories/src/caching_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use tantivy::{Directory, HasLen};
pub struct CachingDirectory {
underlying: Arc<dyn Directory>,
// TODO fixme: that's a pretty ugly cache we have here.
cache: Arc<ByteRangeCache>,
cache: ByteRangeCache,
}

impl CachingDirectory {
Expand All @@ -42,12 +42,18 @@ impl CachingDirectory {
/// Warning: The resulting CachingDirectory will cache all information without ever
/// removing any item from the cache.
pub fn new_unbounded(underlying: Arc<dyn Directory>) -> CachingDirectory {
CachingDirectory {
underlying,
cache: Arc::new(ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
)),
}
let byte_range_cache = ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
);
CachingDirectory::new(underlying, byte_range_cache)
}

/// Creates a new CachingDirectory wrapping `underlying` and using the caller-supplied `cache`.
///
/// Warning: this constructor does not configure any eviction itself. When `cache` was built
/// with `ByteRangeCache::with_infinite_capacity` (as `new_unbounded` does), the resulting
/// CachingDirectory will cache all information without ever removing any item from the cache.
// NOTE(review): callers keep their own clone of `cache` to inspect it later; presumably clones
// of `ByteRangeCache` share the same underlying storage — confirm against `ByteRangeCache`.
pub fn new(underlying: Arc<dyn Directory>, cache: ByteRangeCache) -> CachingDirectory {
CachingDirectory { underlying, cache }
}
}

Expand All @@ -59,7 +65,7 @@ impl fmt::Debug for CachingDirectory {

struct CachingFileHandle {
path: PathBuf,
cache: Arc<ByteRangeCache>,
cache: ByteRangeCache,
underlying_filehandle: Arc<dyn FileHandle>,
}

Expand Down
10 changes: 10 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,14 @@ message LeafSearchRequest {
repeated string index_uris = 9;
}

// Resource usage measured while running a leaf search on a split.
// Attached to `LeafSearchResponse.resource_stats`; stats from several responses are
// combined with `merge_resource_stats` / `merge_resource_stats_it` when responses are merged.
message ResourceStats {
// Number of bytes held by the split's short-lived byte range cache, read right after warmup.
uint64 short_lived_cache_num_bytes = 1;
// Number of documents in the split (taken from the split's `num_docs`).
uint64 split_num_docs = 2;
// Elapsed time of the warmup phase, in microseconds.
uint64 warmup_microsecs = 3;
// Time between the end of warmup and the moment the search closure starts running on the
// search thread pool, in microseconds.
uint64 cpu_thread_pool_wait_microsecs = 4;
// Elapsed (wall-clock) time spent inside the search closure on the search thread pool,
// in microseconds.
uint64 cpu_microsecs = 5;
}

/// LeafRequestRef references data in LeafSearchRequest to deduplicate data.
message LeafRequestRef {
// The ordinal of the doc_mapper in `LeafSearchRequest.doc_mappers`
Expand Down Expand Up @@ -479,6 +487,8 @@ message LeafSearchResponse {

// postcard serialized intermediate aggregation_result.
optional bytes intermediate_aggregation_result = 6;

ResourceStats resource_stats = 8;
}

message SnippetRequest {
Expand Down
17 changes: 17 additions & 0 deletions quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion quickwit/quickwit-search/src/cluster_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use tracing::{debug, error, info, warn};
use crate::retry::search::LeafSearchRetryPolicy;
use crate::retry::search_stream::{LeafSearchStreamRetryPolicy, SuccessfulSplitIds};
use crate::retry::{retry_client, DefaultRetryPolicy, RetryPolicy};
use crate::{SearchError, SearchJobPlacer, SearchServiceClient};
use crate::{merge_resource_stats_it, SearchError, SearchJobPlacer, SearchServiceClient};

/// Maximum number of put requests emitted to perform a replicated given PUT KV.
const MAX_PUT_KV_ATTEMPTS: usize = 6;
Expand Down Expand Up @@ -317,6 +317,13 @@ fn merge_original_with_retry_leaf_search_response(
(Some(left), None) => Some(left),
(None, None) => None,
};
let mut stats = [
original_response.resource_stats.as_ref(),
retry_response.resource_stats.as_ref(),
]
.into_iter()
.flat_map(|el_opt| el_opt);
let resource_stats = merge_resource_stats_it(&mut stats);
Ok(LeafSearchResponse {
intermediate_aggregation_result,
num_hits: original_response.num_hits + retry_response.num_hits,
Expand All @@ -326,6 +333,7 @@ fn merge_original_with_retry_leaf_search_response(
partial_hits: original_response.partial_hits,
num_successful_splits: original_response.num_successful_splits
+ retry_response.num_successful_splits,
resource_stats,
})
}

Expand Down
23 changes: 20 additions & 3 deletions quickwit/quickwit-search/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use itertools::Itertools;
use quickwit_common::binary_heap::{SortKeyMapper, TopK};
use quickwit_doc_mapper::WarmupInfo;
use quickwit_proto::search::{
LeafSearchResponse, PartialHit, SearchRequest, SortByValue, SortOrder, SortValue,
SplitSearchError,
LeafSearchResponse, PartialHit, ResourceStats, SearchRequest, SortByValue, SortOrder,
SortValue, SplitSearchError,
};
use quickwit_proto::types::SplitId;
use serde::Deserialize;
Expand All @@ -40,7 +40,7 @@ use tantivy::{DateTime, DocId, Score, SegmentOrdinal, SegmentReader, TantivyErro

use crate::find_trace_ids_collector::{FindTraceIdsCollector, FindTraceIdsSegmentCollector, Span};
use crate::top_k_collector::{specialized_top_k_segment_collector, QuickwitSegmentTopKCollector};
use crate::GlobalDocAddress;
use crate::{merge_resource_stats, merge_resource_stats_it, GlobalDocAddress};

#[derive(Clone, Debug)]
pub(crate) enum SortByComponent {
Expand Down Expand Up @@ -586,13 +586,15 @@ impl SegmentCollector for QuickwitSegmentCollector {
}
None => None,
};

Ok(LeafSearchResponse {
intermediate_aggregation_result,
num_hits: self.num_hits,
partial_hits,
failed_splits: Vec::new(),
num_attempted_splits: 1,
num_successful_splits: 1,
resource_stats: None,
})
}
}
Expand Down Expand Up @@ -918,6 +920,11 @@ fn merge_leaf_responses(
return Ok(leaf_responses.pop().unwrap());
}

let mut resource_stats_it = leaf_responses
.iter()
.flat_map(|leaf_response| leaf_response.resource_stats.as_ref());
let merged_resource_stats = merge_resource_stats_it(&mut resource_stats_it);

let merged_intermediate_aggregation_result: Option<Vec<u8>> =
merge_intermediate_aggregation_result(
aggregations_opt,
Expand Down Expand Up @@ -959,6 +966,7 @@ fn merge_leaf_responses(
failed_splits,
num_attempted_splits,
num_successful_splits,
resource_stats: merged_resource_stats,
})
}

Expand Down Expand Up @@ -1182,6 +1190,7 @@ pub(crate) struct IncrementalCollector {
num_attempted_splits: u64,
num_successful_splits: u64,
start_offset: usize,
resource_stats: ResourceStats,
}

impl IncrementalCollector {
Expand All @@ -1202,6 +1211,7 @@ impl IncrementalCollector {
failed_splits: Vec::new(),
num_attempted_splits: 0,
num_successful_splits: 0,
resource_stats: ResourceStats::default(),
}
}

Expand All @@ -1214,8 +1224,13 @@ impl IncrementalCollector {
num_attempted_splits,
intermediate_aggregation_result,
num_successful_splits,
resource_stats,
} = leaf_response;

if let Some(leaf_resource_stats) = &resource_stats {
merge_resource_stats(leaf_resource_stats, &mut self.resource_stats);
}

self.num_hits += num_hits;
self.top_k_hits.add_entries(partial_hits.into_iter());
self.failed_splits.extend(failed_splits);
Expand Down Expand Up @@ -1265,6 +1280,7 @@ impl IncrementalCollector {
num_attempted_splits: self.num_attempted_splits,
num_successful_splits: self.num_successful_splits,
intermediate_aggregation_result,
resource_stats: Some(self.resource_stats),
})
}
}
Expand Down Expand Up @@ -1771,6 +1787,7 @@ mod tests {
num_attempted_splits: 3,
num_successful_splits: 3,
intermediate_aggregation_result: None,
resource_stats: None,
}],
);

Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/fetch_docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ async fn fetch_docs_in_split(
index_storage,
split,
Some(doc_mapper.tokenizer_manager()),
false,
None,
)
.await
.context("open-index-for-split")?;
Expand Down
63 changes: 40 additions & 23 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,28 @@ use std::ops::Bound;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};

use anyhow::Context;
use futures::future::try_join_all;
use quickwit_common::pretty::PrettySample;
use quickwit_directories::{CachingDirectory, HotDirectory, StorageDirectory};
use quickwit_doc_mapper::{DocMapper, TermRange, WarmupInfo};
use quickwit_proto::search::{
CountHits, LeafSearchRequest, LeafSearchResponse, PartialHit, SearchRequest, SortOrder,
SortValue, SplitIdAndFooterOffsets, SplitSearchError,
CountHits, LeafSearchRequest, LeafSearchResponse, PartialHit, SearchRequest, SortOrder, SortValue, SplitIdAndFooterOffsets, SplitSearchError
};
use quickwit_query::query_ast::{BoolQuery, QueryAst, QueryAstTransformer, RangeQuery, TermQuery};
use quickwit_query::tokenizers::TokenizerManager;
use quickwit_storage::{
wrap_storage_with_cache, BundleStorage, MemorySizedCache, OwnedBytes, SplitCache, Storage,
StorageResolver, TimeoutAndRetryStorage,
wrap_storage_with_cache, BundleStorage, ByteRangeCache, MemorySizedCache, OwnedBytes,
SplitCache, Storage, StorageResolver, TimeoutAndRetryStorage,
};
use tantivy::aggregation::agg_req::{AggregationVariants, Aggregations};
use tantivy::aggregation::AggregationLimitsGuard;
use tantivy::directory::FileSlice;
use tantivy::fastfield::FastFieldReaders;
use tantivy::schema::Field;
use tantivy::{DateTime, Index, ReloadPolicy, Searcher, Term};
use tantivy::{DateTime, Index, ReloadPolicy, Searcher, TantivyError, Term};
use tokio::task::JoinError;
use tracing::*;

Expand Down Expand Up @@ -134,7 +134,7 @@ pub(crate) async fn open_index_with_caches(
index_storage: Arc<dyn Storage>,
split_and_footer_offsets: &SplitIdAndFooterOffsets,
tokenizer_manager: Option<&TokenizerManager>,
ephemeral_unbounded_cache: bool,
ephemeral_unbounded_cache: Option<ByteRangeCache>,
) -> anyhow::Result<Index> {
// Let's add a storage proxy to retry `get_slice` requests if they are taking too long,
// if configured in the searcher config.
Expand Down Expand Up @@ -166,8 +166,8 @@ pub(crate) async fn open_index_with_caches(

let directory = StorageDirectory::new(bundle_storage_with_cache);

let hot_directory = if ephemeral_unbounded_cache {
let caching_directory = CachingDirectory::new_unbounded(Arc::new(directory));
let hot_directory = if let Some(cache) = ephemeral_unbounded_cache {
let caching_directory = CachingDirectory::new(Arc::new(directory), cache);
HotDirectory::open(caching_directory, hotcache_bytes.read_bytes()?)?
} else {
HotDirectory::open(directory, hotcache_bytes.read_bytes()?)?
Expand Down Expand Up @@ -363,6 +363,7 @@ fn get_leaf_resp_from_count(count: u64) -> LeafSearchResponse {
num_attempted_splits: 1,
num_successful_splits: 1,
intermediate_aggregation_result: None,
resource_stats: None,
}
}

Expand Down Expand Up @@ -400,12 +401,14 @@ async fn leaf_search_single_split(
}

let split_id = split.split_id.to_string();
let byte_range_cache =
ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);
let index = open_index_with_caches(
searcher_context,
storage,
&split,
Some(doc_mapper.tokenizer_manager()),
true,
Some(byte_range_cache.clone()),
)
.await?;
let split_schema = index.schema();
Expand All @@ -425,33 +428,47 @@ async fn leaf_search_single_split(
warmup_info.merge(collector_warmup_info);
warmup_info.simplify();

let warmup_start = Instant::now();
warmup(&searcher, &warmup_info).await?;
let warmup_end = Instant::now();
let warmup_duration: Duration = warmup_end.duration_since(warmup_start);

let short_lived_cache_num_bytes: u64 = byte_range_cache.get_num_bytes();
let split_num_docs = split.num_docs;


let span = info_span!("tantivy_search");

let (search_request, leaf_search_response) = {
let split = split.clone();

crate::search_thread_pool()
.run_cpu_intensive(move || {
let cpu_start = Instant::now();
let cpu_thread_pool_wait_microsecs = cpu_start.duration_since(warmup_end);
let _span_guard = span.enter();
// Our search execution has been scheduled, let's check if we can improve the
// request based on the results of the preceding searches
check_optimize_search_request(&mut search_request, &split, &split_filter);
collector.update_search_param(&search_request);
if is_metadata_count_request_with_ast(&query_ast, &search_request) {
return Ok((
search_request,
get_leaf_resp_from_count(searcher.num_docs() as u64),
));
}
if collector.is_count_only() {
let count = query.count(&searcher)? as u64;
Ok((search_request, get_leaf_resp_from_count(count)))
} else {
searcher
.search(&query, &collector)
.map(|resp| (search_request, resp))
}
let mut leaf_search_response: LeafSearchResponse =
if is_metadata_count_request_with_ast(&query_ast, &search_request) {
get_leaf_resp_from_count(searcher.num_docs() as u64)
} else if collector.is_count_only() {
let count = query.count(&searcher)? as u64;
get_leaf_resp_from_count(count)
} else {
searcher.search(&query, &collector)?
};
leaf_search_response.resource_stats = Some(quickwit_proto::search::ResourceStats {
cpu_microsecs: cpu_start.elapsed().as_micros() as u64,
short_lived_cache_num_bytes,
split_num_docs,
warmup_microsecs: warmup_duration.as_micros() as u64,
cpu_thread_pool_wait_microsecs: cpu_thread_pool_wait_microsecs.as_micros()
as u64,
});
Result::<_, TantivyError>::Ok((search_request, leaf_search_response))
})
.await
.map_err(|_| {
Expand Down
Loading

0 comments on commit 3cf9f38

Please sign in to comment.