
Commit

blop
fulmicoton committed Nov 13, 2024
1 parent b84a28d commit bc43556
Showing 9 changed files with 105 additions and 41 deletions.
2 changes: 0 additions & 2 deletions quickwit/Cargo.lock

10 changes: 10 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
@@ -347,6 +347,14 @@ message LeafSearchRequest {
repeated string index_uris = 9;
}

message ResourceStats {
uint64 short_lived_cache_num_bytes = 1;
uint64 split_num_docs = 2;
uint64 warmup_microsecs = 3;
uint64 cpu_thread_pool_wait_microsecs = 4;
uint64 cpu_microsecs = 5;
}

/// LeafRequestRef references data in LeafSearchRequest to deduplicate data.
message LeafRequestRef {
// The ordinal of the doc_mapper in `LeafSearchRequest.doc_mappers`
@@ -479,6 +487,8 @@ message LeafSearchResponse {

// postcard serialized intermediate aggregation_result.
optional bytes intermediate_aggregation_result = 6;

ResourceStats resource_stats = 8;
}

message SnippetRequest {
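The new ResourceStats message reports per-split search cost: warmup time, CPU thread-pool wait, CPU time, short-lived cache footprint, and document count. A minimal sketch, assuming the prost-generated types for the message above, of how a caller might surface these fields from a LeafSearchResponse (log_resource_stats is a hypothetical helper, not part of this commit):

use quickwit_proto::search::LeafSearchResponse;

fn log_resource_stats(response: &LeafSearchResponse) {
    // `resource_stats` is optional: a leaf that predates this change reports nothing.
    if let Some(stats) = &response.resource_stats {
        println!(
            "warmup={}us cpu_wait={}us cpu={}us cache_bytes={} docs={}",
            stats.warmup_microsecs,
            stats.cpu_thread_pool_wait_microsecs,
            stats.cpu_microsecs,
            stats.short_lived_cache_num_bytes,
            stats.split_num_docs,
        );
    }
}
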
17 changes: 17 additions & 0 deletions quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs

2 changes: 0 additions & 2 deletions quickwit/quickwit-search/Cargo.toml
@@ -22,7 +22,6 @@ http = { workspace = true }
itertools = { workspace = true }
mockall = { workspace = true }
once_cell = { workspace = true }
opentelemetry = { workspace = true }
postcard = { workspace = true }
prost = { workspace = true }
rayon = { workspace = true }
@@ -34,7 +33,6 @@ tokio = { workspace = true }
tokio-stream = { workspace = true }
tower = { workspace = true }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
ttl_cache = { workspace = true }
ulid = { workspace = true }
utoipa = { workspace = true }
10 changes: 9 additions & 1 deletion quickwit/quickwit-search/src/cluster_client.rs
@@ -36,7 +36,7 @@ use tracing::{debug, error, info, warn};
use crate::retry::search::LeafSearchRetryPolicy;
use crate::retry::search_stream::{LeafSearchStreamRetryPolicy, SuccessfulSplitIds};
use crate::retry::{retry_client, DefaultRetryPolicy, RetryPolicy};
use crate::{SearchError, SearchJobPlacer, SearchServiceClient};
use crate::{merge_resource_stats_it, SearchError, SearchJobPlacer, SearchServiceClient};

/// Maximum number of PUT requests emitted to replicate a given PUT KV.
const MAX_PUT_KV_ATTEMPTS: usize = 6;
@@ -317,6 +317,13 @@ fn merge_original_with_retry_leaf_search_response(
(Some(left), None) => Some(left),
(None, None) => None,
};
let mut stats = [
original_response.resource_stats.as_ref(),
retry_response.resource_stats.as_ref(),
]
.into_iter()
.flat_map(|el_opt| el_opt);
let resource_stats = merge_resource_stats_it(&mut stats);
Ok(LeafSearchResponse {
intermediate_aggregation_result,
num_hits: original_response.num_hits + retry_response.num_hits,
@@ -326,6 +333,7 @@
partial_hits: original_response.partial_hits,
num_successful_splits: original_response.num_successful_splits
+ retry_response.num_successful_splits,
resource_stats,
})
}

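When a retry succeeds on some splits, both the original and the retry responses may carry stats; the two-element array above simply flattens the two Options into an iterator before merging them. A small sketch of the same pattern, assuming the merge_resource_stats_it helper added in lib.rs below (merge_two is a hypothetical wrapper, not part of this commit):

use quickwit_proto::search::ResourceStats;

fn merge_two(
    original: Option<&ResourceStats>,
    retry: Option<&ResourceStats>,
) -> Option<ResourceStats> {
    // `flatten` drops the `None`s, so zero, one, or two stats may get merged.
    let mut stats_it = [original, retry].into_iter().flatten();
    crate::merge_resource_stats_it(&mut stats_it)
}
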
23 changes: 20 additions & 3 deletions quickwit/quickwit-search/src/collector.rs
@@ -25,8 +25,8 @@ use itertools::Itertools;
use quickwit_common::binary_heap::{SortKeyMapper, TopK};
use quickwit_doc_mapper::WarmupInfo;
use quickwit_proto::search::{
LeafSearchResponse, PartialHit, SearchRequest, SortByValue, SortOrder, SortValue,
SplitSearchError,
LeafSearchResponse, PartialHit, ResourceStats, SearchRequest, SortByValue, SortOrder,
SortValue, SplitSearchError,
};
use quickwit_proto::types::SplitId;
use serde::Deserialize;
@@ -40,7 +40,7 @@ use tantivy::{DateTime, DocId, Score, SegmentOrdinal, SegmentReader, TantivyErro

use crate::find_trace_ids_collector::{FindTraceIdsCollector, FindTraceIdsSegmentCollector, Span};
use crate::top_k_collector::{specialized_top_k_segment_collector, QuickwitSegmentTopKCollector};
use crate::GlobalDocAddress;
use crate::{merge_resource_stats, merge_resource_stats_it, GlobalDocAddress};

#[derive(Clone, Debug)]
pub(crate) enum SortByComponent {
@@ -586,13 +586,15 @@ impl SegmentCollector for QuickwitSegmentCollector {
}
None => None,
};

Ok(LeafSearchResponse {
intermediate_aggregation_result,
num_hits: self.num_hits,
partial_hits,
failed_splits: Vec::new(),
num_attempted_splits: 1,
num_successful_splits: 1,
resource_stats: None,
})
}
}
@@ -918,6 +920,11 @@ fn merge_leaf_responses(
return Ok(leaf_responses.pop().unwrap());
}

let mut resource_stats_it = leaf_responses
.iter()
.flat_map(|leaf_response| leaf_response.resource_stats.as_ref());
let merged_resource_stats = merge_resource_stats_it(&mut resource_stats_it);

let merged_intermediate_aggregation_result: Option<Vec<u8>> =
merge_intermediate_aggregation_result(
aggregations_opt,
@@ -959,6 +966,7 @@
failed_splits,
num_attempted_splits,
num_successful_splits,
resource_stats: merged_resource_stats,
})
}

@@ -1182,6 +1190,7 @@ pub(crate) struct IncrementalCollector {
num_attempted_splits: u64,
num_successful_splits: u64,
start_offset: usize,
resource_stats: ResourceStats,
}

impl IncrementalCollector {
@@ -1202,6 +1211,7 @@
failed_splits: Vec::new(),
num_attempted_splits: 0,
num_successful_splits: 0,
resource_stats: ResourceStats::default(),
}
}

@@ -1214,8 +1224,13 @@
num_attempted_splits,
intermediate_aggregation_result,
num_successful_splits,
resource_stats,
} = leaf_response;

if let Some(leaf_resource_stats) = &resource_stats {
merge_resource_stats(leaf_resource_stats, &mut self.resource_stats);
}

self.num_hits += num_hits;
self.top_k_hits.add_entries(partial_hits.into_iter());
self.failed_splits.extend(failed_splits);
@@ -1265,6 +1280,7 @@ impl IncrementalCollector {
num_attempted_splits: self.num_attempted_splits,
num_successful_splits: self.num_successful_splits,
intermediate_aggregation_result,
resource_stats: Some(self.resource_stats),
})
}
}
@@ -1771,6 +1787,7 @@ mod tests {
num_attempted_splits: 3,
num_successful_splits: 3,
intermediate_aggregation_result: None,
resource_stats: None,
}],
);

45 changes: 28 additions & 17 deletions quickwit/quickwit-search/src/leaf.rs
@@ -22,6 +22,7 @@ use std::ops::Bound;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};

use anyhow::Context;
use futures::future::try_join_all;
@@ -43,7 +44,7 @@ use tantivy::aggregation::AggregationLimitsGuard;
use tantivy::directory::FileSlice;
use tantivy::fastfield::FastFieldReaders;
use tantivy::schema::Field;
use tantivy::{DateTime, Index, ReloadPolicy, Searcher, Term};
use tantivy::{DateTime, Index, ReloadPolicy, Searcher, TantivyError, Term};
use tokio::task::JoinError;
use tracing::*;

@@ -363,6 +364,7 @@ fn get_leaf_resp_from_count(count: u64) -> LeafSearchResponse {
num_attempted_splits: 1,
num_successful_splits: 1,
intermediate_aggregation_result: None,
resource_stats: None,
}
}

@@ -427,10 +429,13 @@ async fn leaf_search_single_split(
warmup_info.merge(collector_warmup_info);
warmup_info.simplify();

let warmup_start = Instant::now();
warmup(&searcher, &warmup_info).await?;
let warmup_end = Instant::now();
let warmup_duration: Duration = warmup_end.duration_since(warmup_start);

let trace_id = crate::get_trace_id();
info!(trace_id=%trace_id, split_id=%split.split_id.as_str(), num_docs=split.num_docs, input_data=byte_range_cache.get_num_bytes(), "split search input data memory");
let short_lived_cache_num_bytes: u64 = byte_range_cache.get_num_bytes();
let split_num_docs = split.num_docs;

let span = info_span!("tantivy_search");

@@ -439,25 +444,31 @@

crate::search_thread_pool()
.run_cpu_intensive(move || {
let cpu_start = Instant::now();
let cpu_thread_pool_wait_microsecs = cpu_start.duration_since(warmup_end);
let _span_guard = span.enter();
// Our search execution has been scheduled, let's check if we can improve the
// request based on the results of the preceding searches
check_optimize_search_request(&mut search_request, &split, &split_filter);
collector.update_search_param(&search_request);
if is_metadata_count_request_with_ast(&query_ast, &search_request) {
return Ok((
search_request,
get_leaf_resp_from_count(searcher.num_docs() as u64),
));
}
if collector.is_count_only() {
let count = query.count(&searcher)? as u64;
Ok((search_request, get_leaf_resp_from_count(count)))
} else {
searcher
.search(&query, &collector)
.map(|resp| (search_request, resp))
}
let mut leaf_search_response: LeafSearchResponse =
if is_metadata_count_request_with_ast(&query_ast, &search_request) {
get_leaf_resp_from_count(searcher.num_docs() as u64)
} else if collector.is_count_only() {
let count = query.count(&searcher)? as u64;
get_leaf_resp_from_count(count)
} else {
searcher.search(&query, &collector)?
};
leaf_search_response.resource_stats = Some(quickwit_proto::search::ResourceStats {
cpu_microsecs: cpu_start.elapsed().as_micros() as u64,
short_lived_cache_num_bytes,
split_num_docs,
warmup_microsecs: warmup_duration.as_micros() as u64,
cpu_thread_pool_wait_microsecs: cpu_thread_pool_wait_microsecs.as_micros()
as u64,
});
Result::<_, TantivyError>::Ok((search_request, leaf_search_response))
})
.await
.map_err(|_| {
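The three instants taken above feed the new stats: warmup_start to warmup_end measures warmup, warmup_end to cpu_start (taken inside the CPU thread-pool closure) measures queueing, and cpu_start.elapsed() measures the search itself. A standalone sketch of that measurement pattern, with illustrative names that are not part of the commit:

use std::time::Instant;

fn timing_sketch() -> (u64, u64, u64) {
    let warmup_start = Instant::now();
    // ... download / warm up the data the query needs ...
    let warmup_end = Instant::now();
    let warmup_micros = warmup_end.duration_since(warmup_start).as_micros() as u64;

    // This closure only runs once a CPU thread picks it up, so the gap between
    // `warmup_end` and `cpu_start` is the thread-pool wait.
    let run_search = move || {
        let cpu_start = Instant::now();
        let wait_micros = cpu_start.duration_since(warmup_end).as_micros() as u64;
        // ... run the actual search ...
        let cpu_micros = cpu_start.elapsed().as_micros() as u64;
        (warmup_micros, wait_micros, cpu_micros)
    };
    run_search()
}
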
29 changes: 19 additions & 10 deletions quickwit/quickwit-search/src/lib.rs
@@ -51,7 +51,6 @@ mod tests;

pub use collector::QuickwitAggregations;
use metrics::SEARCH_METRICS;
use opentelemetry::trace::TraceId;
use quickwit_common::thread_pool::ThreadPool;
use quickwit_common::tower::Pool;
use quickwit_doc_mapper::DocMapper;
@@ -73,7 +72,9 @@ use quickwit_metastore::{
IndexMetadata, ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt,
MetastoreServiceStreamSplitsExt, SplitMetadata, SplitState,
};
use quickwit_proto::search::{PartialHit, SearchRequest, SearchResponse, SplitIdAndFooterOffsets};
use quickwit_proto::search::{
PartialHit, ResourceStats, SearchRequest, SearchResponse, SplitIdAndFooterOffsets,
};
use quickwit_proto::types::IndexUid;
use quickwit_storage::StorageResolver;
pub use service::SearcherContext;
@@ -342,12 +343,20 @@ pub fn searcher_pool_for_test(
)
}

pub(crate) fn get_trace_id() -> TraceId {
use opentelemetry::trace::TraceContextExt as _;
use tracing_opentelemetry::OpenTelemetrySpanExt as _;
tracing::Span::current()
.context()
.span()
.span_context()
.trace_id()
pub(crate) fn merge_resource_stats_it(
stats_it: &mut dyn Iterator<Item = &ResourceStats>,
) -> Option<ResourceStats> {
let mut acc_stats: ResourceStats = stats_it.next()?.clone();
for new_stats in stats_it {
merge_resource_stats(new_stats, &mut acc_stats);
}
Some(acc_stats)
}

fn merge_resource_stats(new_stats: &ResourceStats, stat_accs: &mut ResourceStats) {
stat_accs.short_lived_cache_num_bytes += new_stats.short_lived_cache_num_bytes;
stat_accs.split_num_docs += new_stats.split_num_docs;
stat_accs.warmup_microsecs += new_stats.warmup_microsecs;
stat_accs.cpu_thread_pool_wait_microsecs += new_stats.cpu_thread_pool_wait_microsecs;
stat_accs.cpu_microsecs += new_stats.cpu_microsecs;
}
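Both helpers are plain field-wise sums over the accumulator. An illustrative crate-internal usage with made-up numbers, not part of the commit:

use quickwit_proto::search::ResourceStats;

fn merge_example() {
    let stats = vec![
        ResourceStats {
            short_lived_cache_num_bytes: 1_000,
            split_num_docs: 10,
            warmup_microsecs: 500,
            cpu_thread_pool_wait_microsecs: 20,
            cpu_microsecs: 300,
        },
        ResourceStats {
            short_lived_cache_num_bytes: 2_000,
            split_num_docs: 40,
            warmup_microsecs: 700,
            cpu_thread_pool_wait_microsecs: 30,
            cpu_microsecs: 900,
        },
    ];
    let mut stats_it = stats.iter();
    let merged = merge_resource_stats_it(&mut stats_it).expect("at least one element");
    assert_eq!(merged.split_num_docs, 50);
    assert_eq!(merged.cpu_microsecs, 1_200);
}
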
8 changes: 2 additions & 6 deletions quickwit/quickwit-search/src/root.rs
@@ -49,7 +49,7 @@ use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResult
use tantivy::collector::Collector;
use tantivy::schema::{Field, FieldEntry, FieldType, Schema};
use tantivy::TantivyError;
use tracing::{debug, info, info_span, instrument};
use tracing::{debug, info_span, instrument};

use crate::cluster_client::ClusterClient;
use crate::collector::{make_merge_collector, QuickwitAggregations};
@@ -683,6 +683,7 @@ pub fn get_count_from_metadata(split_metadatas: &[SplitMetadata]) -> Vec<LeafSea
num_attempted_splits: 1,
num_successful_splits: 1,
intermediate_aggregation_result: None,
resource_stats: None,
})
.collect()
}
@@ -1114,9 +1115,6 @@ pub async fn root_search(
mut metastore: MetastoreServiceClient,
cluster_client: &ClusterClient,
) -> crate::Result<SearchResponse> {
let trace_id = crate::get_trace_id();

info!(trace_id=%trace_id, search_request = ?search_request);
let start_instant = tokio::time::Instant::now();
let list_indexes_metadatas_request = ListIndexesMetadataRequest {
index_id_patterns: search_request.index_id_patterns.clone(),
@@ -1173,8 +1171,6 @@

let elapsed = start_instant.elapsed();

info!(trace_id=%trace_id, num_docs=num_docs, num_splits=num_splits, elapsed_time_millis=%elapsed.as_millis(), "search completed");

if let Ok(search_response) = &mut search_response_result {
search_response.elapsed_time_micros = elapsed.as_micros() as u64;
}
