Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize topn requests #5075

Merged
merged 3 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,8 @@ message SplitIdAndFooterOffsets {
optional int64 timestamp_start = 4;
// The highest timestamp appearing in the split, in seconds since epoch
optional int64 timestamp_end = 5;
// The number of docs in the split
uint64 num_docs = 6;
}

// Hits returned by a FetchDocRequest.
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions quickwit/quickwit-search/src/cluster_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ mod tests {
split_footer_start: 0,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
}],
..Default::default()
}
Expand All @@ -406,13 +407,15 @@ mod tests {
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
},
SplitIdAndFooterOffsets {
split_id: "split_2".to_string(),
split_footer_start: 0,
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
},
],
}],
Expand Down Expand Up @@ -441,13 +444,15 @@ mod tests {
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
},
SplitIdAndFooterOffsets {
split_id: "split_2".to_string(),
split_footer_start: 0,
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
},
],
}
Expand Down
165 changes: 146 additions & 19 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,17 @@ async fn leaf_search_single_split(
return Ok(cached_answer);
}

let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str())
.map_err(|err| SearchError::InvalidQuery(err.to_string()))?;

// CanSplitDoBetter or rewrite_request may have changed the request to be a count only request
// This may be the case for AllQuery with a sort by date and time filter, where the current
// split can't have better results.
//
if is_metadata_count_request_with_ast(&query_ast, &search_request) {
return Ok(get_leaf_resp_from_count(split.num_docs));
}

let split_id = split.split_id.to_string();
let index = open_index_with_caches(
searcher_context,
Expand All @@ -382,19 +393,6 @@ async fn leaf_search_single_split(

let mut collector =
make_collector_for_split(split_id.clone(), &search_request, aggregations_limits)?;
let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str())
.map_err(|err| SearchError::InvalidQuery(err.to_string()))?;

// CanSplitDoBetter or rewrite_request may have changed the request to be a count only request
// This may be the case for AllQuery with a sort by date, where the current split can't have
// better results.
//
// TODO: SplitIdAndFooterOffsets could carry the number of docs in a split, so we could save
// opening the index and execute this earlier. Opening splits is typically served from the
// cache, so there may be no gain adding that info to SplitIdAndFooterOffsets.
if is_metadata_count_request_with_ast(&query_ast, &search_request) {
return Ok(get_leaf_resp_from_count(searcher.num_docs() as u64));
}

let (query, mut warmup_info) = doc_mapper.query(split_schema.clone(), &query_ast, false)?;

Expand Down Expand Up @@ -808,6 +806,29 @@ pub(crate) fn rewrite_start_end_time_bounds(
}
}

/// Checks if request is a simple all query.
/// Simple in this case would still including sorting
fn is_simple_all_query(search_request: &SearchRequest) -> bool {
if search_request.aggregation_request.is_some() {
return false;
}

if search_request.search_after.is_some() {
return false;
}

// TODO: Update the logic to handle start_timestamp end_timestamp ranges
if search_request.start_timestamp.is_some() || search_request.end_timestamp.is_some() {
return false;
}

let Ok(query_ast) = serde_json::from_str(&search_request.query_ast) else {
return false;
};

matches!(query_ast, QueryAst::MatchAll)
}

#[derive(Debug, Clone)]
enum CanSplitDoBetter {
Uninformative,
Expand Down Expand Up @@ -879,6 +900,114 @@ impl CanSplitDoBetter {
}
}

/// This function tries to detect upfront which splits contain the top n hits and convert other
/// split searches to count only searches. It also optimizes split order.
///
/// Returns the search_requests with their split.
fn optimize(
&self,
request: Arc<SearchRequest>,
mut splits: Vec<SplitIdAndFooterOffsets>,
) -> Result<Vec<(SplitIdAndFooterOffsets, SearchRequest)>, SearchError> {
PSeitz marked this conversation as resolved.
Show resolved Hide resolved
self.optimize_split_order(&mut splits);

if !is_simple_all_query(&request) {
// no optimization opportunity here.
return Ok(splits
.into_iter()
.map(|split| (split, (*request).clone()))
.collect::<Vec<_>>());
}

let num_requested_docs = request.start_offset + request.max_hits;

// Calculate the number of splits which are guaranteed to deliver enough documents.
let min_required_splits = splits
.iter()
.map(|split| split.num_docs)
// computing the partial sum
.scan(0u64, |partial_sum: &mut u64, num_docs_in_split: u64| {
*partial_sum += num_docs_in_split;
Some(*partial_sum)
})
.take_while(|partial_sum| *partial_sum < num_requested_docs)
.count()
+ 1;

// TODO: we maybe want here some deduplication + Cow logic
let mut split_with_req = splits
.into_iter()
.map(|split| (split, (*request).clone()))
.collect::<Vec<_>>();

// reuse the detected sort order in split_filter
// we want to detect cases where we can convert some split queries to count only queries
match self {
CanSplitDoBetter::SplitIdHigher(_) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we apply the logic on CanSplitDoBetter instead of the actual sort order here?
Can't we have CanSplitDoBetter set to informative even though docs are requested to be ordered by timestamp?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah answering my own question. if sorted by timestmap, but we don't have a bound, then CanSplitDoBetter is set to Higher or Lower but with None

// In this case there is no sort order, we order by split id.
// If the the first split has enough documents, we can convert the other queries to
// count only queries
for (_split, ref mut request) in split_with_req.iter_mut().skip(min_required_splits)
{
disable_search_request_hits(request);
}
}
CanSplitDoBetter::Uninformative => {}
CanSplitDoBetter::SplitTimestampLower(_) => {
// We order by timestamp asc. split_with_req is sorted by timestamp_start.
//
// If we know that some splits will deliver enough documents, we can convert the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great explanation

// others to count only queries.
// Since we only have start and end ranges and don't know the distribution we make
// sure the splits dont' overlap, since the distribution of two
// splits could be like this (dot is a timestamp doc on a x axis), for top 2
// queries.
// ```
// [. .] Split1 has enough docs, but last doc is not in top 2
// [.. .] Split2 first doc is in top2
// ```
// Let's get the biggest timestamp_end of the first num_splits splits
let biggest_end_timestamp = split_with_req
.iter()
.take(min_required_splits)
.map(|(split, _)| split.timestamp_end())
.max()
// if min_required_splits is 0, we choose a value that disables all splits
.unwrap_or(i64::MIN);
for (split, ref mut request) in split_with_req.iter_mut().skip(min_required_splits)
{
if split.timestamp_start() > biggest_end_timestamp {
disable_search_request_hits(request);
}
}
}
CanSplitDoBetter::SplitTimestampHigher(_) => {
// We order by timestamp desc. split_with_req is sorted by timestamp_end desc.
//
// We have the number of splits we need to search to get enough docs, now we need to
// find the splits that don't overlap.
//
// Let's get the smallest timestamp_start of the first num_splits splits
let smallest_start_timestamp = split_with_req
.iter()
.take(min_required_splits)
.map(|(split, _)| split.timestamp_start())
.min()
// if min_required_splits is 0, we choose a value that disables all splits
.unwrap_or(i64::MAX);
for (split, ref mut request) in split_with_req.iter_mut().skip(min_required_splits)
{
if split.timestamp_end() < smallest_start_timestamp {
disable_search_request_hits(request);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we don't request count, the resulting disable_search_request does nothing.

Do we have a pass after that to entirely remove this split / request?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

strangely currently we don't have a no count parameter

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You already have an CountHits enum. We could add that or use the Option?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we don't request count, the resulting disable_search_request does nothing.

No, it disables the returning of hits from that split. That leaves only the count, and in this simple query case the count can be served from metadata and is basically free.
Initially we don't know which split may contain the best splits so we return hits from all splits.

You already have an CountHits enum. We could add that or use the Option?

Yes, but that would be relevant for cases where we can't serve counts from metadata. But in that case we have already the run_all_splits optimization, which means we probably run num_cpu splits before early exiting.

}
}
}
CanSplitDoBetter::FindTraceIdsAggregation(_) => {}
}

Ok(split_with_req)
}

/// Returns whether the given split can possibly give documents better than the one already
/// known to match.
fn can_be_better(&self, split: &SplitIdAndFooterOffsets) -> bool {
Expand Down Expand Up @@ -1071,14 +1200,14 @@ pub async fn leaf_search(
searcher_context: Arc<SearcherContext>,
request: Arc<SearchRequest>,
index_storage: Arc<dyn Storage>,
mut splits: Vec<SplitIdAndFooterOffsets>,
splits: Vec<SplitIdAndFooterOffsets>,
doc_mapper: Arc<dyn DocMapper>,
aggregations_limits: AggregationLimits,
) -> Result<LeafSearchResponse, SearchError> {
info!(splits_num = splits.len(), split_offsets = ?PrettySample::new(&splits, 5));

let split_filter = CanSplitDoBetter::from_request(&request, doc_mapper.timestamp_field_name());
split_filter.optimize_split_order(&mut splits);
let split_with_req = split_filter.optimize(request.clone(), splits)?;

// if client wants full count, or we are doing an aggregation, we want to run every splits.
// However if the aggregation is the tracing aggregation, we don't actually need all splits.
Expand All @@ -1088,22 +1217,20 @@ pub async fn leaf_search(

let split_filter = Arc::new(RwLock::new(split_filter));

let mut leaf_search_single_split_futures: Vec<_> = Vec::with_capacity(splits.len());
let mut leaf_search_single_split_futures: Vec<_> = Vec::with_capacity(split_with_req.len());

let merge_collector = make_merge_collector(&request, &aggregations_limits)?;
let incremental_merge_collector = IncrementalCollector::new(merge_collector);
let incremental_merge_collector = Arc::new(Mutex::new(incremental_merge_collector));

for split in splits {
for (split, mut request) in split_with_req {
let leaf_split_search_permit = searcher_context.leaf_search_split_semaphore
.clone()
.acquire_owned()
.instrument(info_span!("waiting_for_leaf_search_split_semaphore"))
.await
.expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.");

let mut request = (*request).clone();

let can_be_better = check_optimize_search_request(&mut request, &split, &split_filter);
if !can_be_better && !run_all_splits {
continue;
Expand Down
5 changes: 5 additions & 0 deletions quickwit/quickwit-search/src/leaf_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ mod tests {
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
};

let split_2 = SplitIdAndFooterOffsets {
Expand All @@ -215,6 +216,7 @@ mod tests {
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
};

let query_1 = SearchRequest {
Expand Down Expand Up @@ -269,20 +271,23 @@ mod tests {
split_footer_end: 100,
timestamp_start: Some(100),
timestamp_end: Some(199),
num_docs: 0,
};
let split_2 = SplitIdAndFooterOffsets {
split_id: "split_2".to_string(),
split_footer_start: 0,
split_footer_end: 100,
timestamp_start: Some(150),
timestamp_end: Some(249),
num_docs: 0,
};
let split_3 = SplitIdAndFooterOffsets {
split_id: "split_3".to_string(),
split_footer_start: 0,
split_footer_end: 100,
timestamp_start: Some(150),
timestamp_end: Some(249),
num_docs: 0,
};

let query_1 = SearchRequest {
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-search/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ fn extract_split_and_footer_offsets(split_metadata: &SplitMetadata) -> SplitIdAn
.time_range
.as_ref()
.map(|time_range| *time_range.end()),
num_docs: split_metadata.num_docs as u64,
}
}

Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-search/src/list_fields_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ mod tests {
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
};

let split_2 = SplitIdAndFooterOffsets {
Expand All @@ -95,6 +96,7 @@ mod tests {
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
};

let result = ListFieldsEntryResponse {
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-search/src/retry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ mod tests {
split_footer_start: 0,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
};
let client_for_retry = retry_client(
&search_job_placer,
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-search/src/retry/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,15 @@ mod tests {
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
},
SplitIdAndFooterOffsets {
split_id: "split_2".to_string(),
split_footer_start: 0,
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
},
],
}],
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-search/src/retry/search_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,15 @@ mod tests {
split_footer_start: 0,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
};
let split_2 = SplitIdAndFooterOffsets {
split_id: "split_2".to_string(),
split_footer_end: 100,
split_footer_start: 0,
timestamp_start: None,
timestamp_end: None,
num_docs: 0,
};
let retry_policy = LeafSearchStreamRetryPolicy {};
let request = LeafSearchStreamRequest {
Expand Down
Loading
Loading