Skip to content

Commit

Permalink
feat(7181): build multiple-level cascade tree.
Browse files Browse the repository at this point in the history
  • Loading branch information
wiedld committed Aug 22, 2023
1 parent 7d0d2fa commit 813a693
Showing 1 changed file with 32 additions and 15 deletions.
47 changes: 32 additions & 15 deletions datafusion/core/src/physical_plan/sorts/cascade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::memory_pool::MemoryReservation;
use futures::Stream;
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -51,10 +52,11 @@ impl<C: Cursor + Unpin + Send + 'static> SortPreservingCascadeStream<C> {
// Refer to YieldedCursorStream for where the concat would happen (TODO).
let streams = Arc::new(parking_lot::Mutex::new(streams));

let max_streams_per_merge = 2; // TODO: change this to 10, once we have tested with 2 (to force more leaf nodes)
let mut divided_streams: Vec<MergeStream<C>> =
Vec::with_capacity(stream_count / max_streams_per_merge + 1);
let max_streams_per_merge = 2; // TODO: change this to 10, once we have tested with 2 (to force more cascade levels)
let mut divided_streams: VecDeque<MergeStream<C>> =
VecDeque::with_capacity(stream_count / max_streams_per_merge + 1);

// build leaves
for stream_offset in (0..stream_count).step_by(max_streams_per_merge) {
let limit =
std::cmp::min(stream_offset + max_streams_per_merge, stream_count);
Expand All @@ -63,7 +65,7 @@ impl<C: Cursor + Unpin + Send + 'static> SortPreservingCascadeStream<C> {
let streams =
OffsetCursorStream::new(Arc::clone(&streams), stream_offset, limit);

divided_streams.push(Box::pin(SortPreservingMergeStream::new(
divided_streams.push_back(Box::pin(SortPreservingMergeStream::new(
Box::new(streams),
metrics.clone(),
batch_size,
Expand All @@ -72,21 +74,36 @@ impl<C: Cursor + Unpin + Send + 'static> SortPreservingCascadeStream<C> {
)));
}

let next_level: CursorStream<C> =
Box::new(YieldedCursorStream::new(divided_streams));
// build rest of tree
let mut next_level: VecDeque<MergeStream<C>> =
VecDeque::with_capacity(divided_streams.len() / max_streams_per_merge + 1);
while divided_streams.len() > 1 || !next_level.is_empty() {
let fan_in: Vec<MergeStream<C>> = divided_streams
.drain(0..std::cmp::min(max_streams_per_merge, divided_streams.len()))
.collect();

let root: MergeStream<C> = Box::pin(SortPreservingMergeStream::new(
next_level,
metrics.clone(),
batch_size,
fetch,
reservation,
));
next_level.push_back(Box::pin(SortPreservingMergeStream::new(
Box::new(YieldedCursorStream::new(fan_in)),
metrics.clone(),
batch_size,
if divided_streams.is_empty() && next_level.is_empty() {
fetch
} else {
None
}, // fetch, the LIMIT, is applied to the final merge
reservation.new_empty(),
)));
// in order to maintain sort-preserving streams, don't mix the merge tree levels.
if divided_streams.is_empty() {
divided_streams = next_level.drain(..).collect();
}
}

let cascade = root;
Self {
aborted: false,
cascade,
cascade: divided_streams
.remove(0)
.expect("must have a root merge stream"),
schema,
metrics,
}
Expand Down

0 comments on commit 813a693

Please sign in to comment.