Skip to content

Commit

Permalink
optimize topn requests
Browse files Browse the repository at this point in the history
add logic to detect which splits will deliver the top n results for
requests. This is only supported for match_all requests, with optional
sort_by on timestamp sorting.

start_timestamp, end_timestamp as well as a filter on the timestamp field
is not supported currently but could be.
  • Loading branch information
PSeitz committed Jun 4, 2024
1 parent f8590d5 commit 6cae67f
Show file tree
Hide file tree
Showing 9 changed files with 366 additions and 19 deletions.
2 changes: 2 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,8 @@ message SplitIdAndFooterOffsets {
optional int64 timestamp_start = 4;
// The highest timestamp appearing in the split, in seconds since epoch
optional int64 timestamp_end = 5;
// The number of docs in the split
uint64 num_docs = 6;
}

// Hits returned by a FetchDocRequest.
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

158 changes: 139 additions & 19 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,17 @@ async fn leaf_search_single_split(
return Ok(cached_answer);
}

let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str())
.map_err(|err| SearchError::InvalidQuery(err.to_string()))?;

// CanSplitDoBetter or rewrite_request may have changed the request to be a count only request
// This may be the case for AllQuery with a sort by date and time filter, where the current
// split can't have better results.
//
if is_metadata_count_request_with_ast(&query_ast, &search_request) {
return Ok(get_leaf_resp_from_count(split.num_docs));
}

let split_id = split.split_id.to_string();
let index = open_index_with_caches(
searcher_context,
Expand All @@ -382,19 +393,6 @@ async fn leaf_search_single_split(

let mut collector =
make_collector_for_split(split_id.clone(), &search_request, aggregations_limits)?;
let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str())
.map_err(|err| SearchError::InvalidQuery(err.to_string()))?;

// CanSplitDoBetter or rewrite_request may have changed the request to be a count only request
// This may be the case for AllQuery with a sort by date, where the current split can't have
// better results.
//
// TODO: SplitIdAndFooterOffsets could carry the number of docs in a split, so we could save
// opening the index and execute this earlier. Opening splits is typically served from the
// cache, so there may be no gain adding that info to SplitIdAndFooterOffsets.
if is_metadata_count_request_with_ast(&query_ast, &search_request) {
return Ok(get_leaf_resp_from_count(searcher.num_docs() as u64));
}

let (query, mut warmup_info) = doc_mapper.query(split_schema.clone(), &query_ast, false)?;

Expand Down Expand Up @@ -879,6 +877,130 @@ impl CanSplitDoBetter {
}
}

/// This function tries to detect upfront which splits contain the top n hits and convert other
/// split searches to count only searches. It also optimizes split order.
///
/// Returns the search_requests with their split.
fn optimize(
&self,
request: Arc<SearchRequest>,
mut splits: Vec<SplitIdAndFooterOffsets>,
) -> Result<Vec<(SplitIdAndFooterOffsets, SearchRequest)>, SearchError> {
self.optimize_split_order(&mut splits);
// TODO: we maybe want here some deduplication + Cow logic
let mut split_with_req = splits
.into_iter()
.map(|split| (split, (*request).clone()))
.collect::<Vec<_>>();

if request.aggregation_request.is_some() || request.search_after.is_some() {
return Ok(split_with_req);
}
let query_ast: QueryAst = serde_json::from_str(request.query_ast.as_str())
.map_err(|err| SearchError::InvalidQuery(err.to_string()))?;

// TODO: Update the logic to handle start_timestamp end_timestamp ranges
if request.start_timestamp.is_some() || request.end_timestamp.is_some() {
return Ok(split_with_req);
}

if !matches!(query_ast, QueryAst::MatchAll) {
return Ok(split_with_req);
}

// Count the number of splits which contain enough documents
let count_required_splits =
move |split_with_req: &[(SplitIdAndFooterOffsets, SearchRequest)],
num_requested_docs: u64| {
let mut num_docs = 0;
split_with_req
.iter()
.take_while(|(split, _req)| {
let need_more_docs = num_docs < num_requested_docs;
num_docs += split.num_docs;
need_more_docs
})
.count()
};

// reuse the detected sort order in split_filter
// we want to detect cases where we can convert some split queries to count only queries
let num_requested_docs = request.start_offset + request.max_hits;
match self {
CanSplitDoBetter::SplitIdHigher(_) => {
// In this case there is no sort order, we order by split id.
// If the the first split has enough documents, we can convert the other queries to
// count only queries
let num_splits = count_required_splits(&split_with_req, num_requested_docs);
for (_split, ref mut request) in &mut split_with_req[num_splits..] {
disable_search_request_hits(request);
}
}
CanSplitDoBetter::Uninformative => {}
CanSplitDoBetter::SplitTimestampLower(_) => {
// We order by timestamp asc. split_with_req is sorted by timestamp_start.
//
// Calculate the number of splits which are guaranteed to deliver enough documents.
let num_splits = count_required_splits(&split_with_req, num_requested_docs);
assert!(
num_splits > 0,
"We should always have at least one split to search"
);
//
// If we know that some splits will deliver enough documents, we can convert the
// others to count only queries.
// Since we only have start and end ranges and don't know the distribution we make
// sure the splits dont' overlap, since the distribution of two
// splits could be like this (dot is a timestamp doc on a x axis), for top 2
// queries.
// ```
// [. .] Split1 has enough docs, but last doc is not in top 2
// [.. .] Split2 first doc is in top2
// ```
// Let's get the biggest timestamp_end of the first num_splits splits
let biggest_end_timestamp = split_with_req
.iter()
.take(num_splits)
.map(|(split, _)| split.timestamp_end())
.max()
.unwrap();
for (split, ref mut request) in split_with_req.iter_mut().skip(num_splits) {
if split.timestamp_start() > biggest_end_timestamp {
disable_search_request_hits(request);
}
}
}
CanSplitDoBetter::SplitTimestampHigher(_) => {
// We order by timestamp desc. split_with_req is sorted by timestamp_end desc.
//
// Calculate the number of splits which are guaranteed to deliver enough documents.
let num_splits = count_required_splits(&split_with_req, num_requested_docs);
assert!(
num_splits > 0,
"We should always have at least one split to search"
);
// We have the number of splits we need to search to get enough docs, now we need to
// find the splits that don't overlap.
//
// Let's get the smallest timestamp_start of the first num_splits splits
let smallest_start_timestamp = split_with_req
.iter()
.take(num_splits)
.map(|(split, _)| split.timestamp_start())
.min()
.unwrap();
for (split, ref mut request) in split_with_req.iter_mut().skip(num_splits) {
if split.timestamp_end() < smallest_start_timestamp {
disable_search_request_hits(request);
}
}
}
CanSplitDoBetter::FindTraceIdsAggregation(_) => {}
}

Ok(split_with_req)
}

/// Returns whether the given split can possibly give documents better than the one already
/// known to match.
fn can_be_better(&self, split: &SplitIdAndFooterOffsets) -> bool {
Expand Down Expand Up @@ -1071,14 +1193,14 @@ pub async fn leaf_search(
searcher_context: Arc<SearcherContext>,
request: Arc<SearchRequest>,
index_storage: Arc<dyn Storage>,
mut splits: Vec<SplitIdAndFooterOffsets>,
splits: Vec<SplitIdAndFooterOffsets>,
doc_mapper: Arc<dyn DocMapper>,
aggregations_limits: AggregationLimits,
) -> Result<LeafSearchResponse, SearchError> {
info!(splits_num = splits.len(), split_offsets = ?PrettySample::new(&splits, 5));

let split_filter = CanSplitDoBetter::from_request(&request, doc_mapper.timestamp_field_name());
split_filter.optimize_split_order(&mut splits);
let split_with_req = split_filter.optimize(request.clone(), splits)?;

// if client wants full count, or we are doing an aggregation, we want to run every splits.
// However if the aggregation is the tracing aggregation, we don't actually need all splits.
Expand All @@ -1088,22 +1210,20 @@ pub async fn leaf_search(

let split_filter = Arc::new(RwLock::new(split_filter));

let mut leaf_search_single_split_futures: Vec<_> = Vec::with_capacity(splits.len());
let mut leaf_search_single_split_futures: Vec<_> = Vec::with_capacity(split_with_req.len());

let merge_collector = make_merge_collector(&request, &aggregations_limits)?;
let incremental_merge_collector = IncrementalCollector::new(merge_collector);
let incremental_merge_collector = Arc::new(Mutex::new(incremental_merge_collector));

for split in splits {
for (split, mut request) in split_with_req {
let leaf_split_search_permit = searcher_context.leaf_search_split_semaphore
.clone()
.acquire_owned()
.instrument(info_span!("waiting_for_leaf_search_split_semaphore"))
.await
.expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.");

let mut request = (*request).clone();

let can_be_better = check_optimize_search_request(&mut request, &split, &split_filter);
if !can_be_better && !run_all_splits {
continue;
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-search/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ fn extract_split_and_footer_offsets(split_metadata: &SplitMetadata) -> SplitIdAn
.time_range
.as_ref()
.map(|time_range| *time_range.end()),
num_docs: split_metadata.num_docs as u64,
}
}

Expand Down
33 changes: 33 additions & 0 deletions quickwit/rest-api-tests/run_tests.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#!/usr/bin/env python3

import copy
import glob
import gzip
import http
import json
import os
import requests
import random
import shutil
import subprocess
import sys
Expand Down Expand Up @@ -97,6 +99,16 @@ def run_request_step(method, step, previous_result):
kvargs["data"] = open(body_from_file, 'rb').read()

kvargs = resolve_previous_result(kvargs, previous_result)
shuffle_ndjson = step.get("shuffle_ndjson", None)
if shuffle_ndjson is not None:
docs_per_split = distribute_items(shuffle_ndjson, step.get("min_splits", 1), step.get("max_splits", 5), step.get("seed", None))

for i, bucket in enumerate(docs_per_split):
new_step = copy.deepcopy(step)
del new_step["shuffle_ndjson"]
new_step["ndjson"] = bucket
run_request_step(method, new_step, previous_result)
return;
ndjson = step.get("ndjson", None)
if ndjson is not None:
# Add a newline at the end to please elasticsearch -> "The bulk request must be terminated by a newline [\\n]".
Expand All @@ -116,6 +128,27 @@ def run_request_step(method, step, previous_result):
raise e
return json_resp

def distribute_items(items, min_buckets, max_buckets, seed=None):
if seed is None:
seed = random.randint(0, 10000)
random.seed(seed)

# Determine the number of buckets
num_buckets = random.randint(min_buckets, max_buckets)

# Initialize empty buckets
buckets = [[] for _ in range(num_buckets)]

# Distribute items randomly into buckets
for item in items:
random_bucket = random.randint(0, num_buckets - 1)
buckets[random_bucket].append(item)

# Print the seed for reproducibility
print(f"Seed: {seed}")

return buckets

def check_result(result, expected, context_path = ""):
if type(expected) == dict and "$expect" in expected:
expectations = expected["$expect"]
Expand Down
Loading

0 comments on commit 6cae67f

Please sign in to comment.