
feat(7181): cascading loser tree merges #7379

Closed
wants to merge 30 commits into from

Conversation


@wiedld wiedld commented Aug 22, 2023

Which issue does this PR close?

External sorting (cascading merges) of the internal-sorted (in-memory) SortPreservingMergeStream.

Closes #7181

Rationale for this change

The current loser tree sort is O(n log k), handling all incoming record batches in a single loser tree merge.
The planned change is a cascaded merge with a fan-in of up to 10 streams per merge, which does not change the overall asymptotic upper bound -- but does open the door to additional performance improvements (such as multi-core execution).
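As a back-of-envelope check (a sketch, not code from this PR): with a fan-in of F, a cascade over k input streams needs ceil(log_F(k)) merge levels, and since each level passes over all n rows once, the total work stays O(n log k).

```rust
// Back-of-envelope sketch (not the PR's code): count how many merge
// levels a cascade needs for `num_streams` inputs with a given fan-in.
fn cascade_levels(num_streams: usize, fan_in: usize) -> usize {
    assert!(fan_in >= 2, "a merge needs at least two inputs");
    let mut streams = num_streams;
    let mut levels = 0;
    while streams > 1 {
        // Each level merges groups of `fan_in` streams into one stream each.
        streams = (streams + fan_in - 1) / fan_in; // ceiling division
        levels += 1;
    }
    levels
}
```

For example, 100 input streams with fan-in 10 collapse in two levels (100 → 10 → 1), and 1000 streams in three.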

Performance change

UPDATED
(caveat: on local machine)

Benchmarking -- both branches with 10 threads on M1 Pro chip.
[Screenshot: benchmark comparison, 2023-09-02]

What changes are included in this PR?

How it works:

  • Each merge uses the same code (in-memory sorting using the sort-preserving-merge loser tree).
  • The sort_order from previous merges is yielded/streamed into the next merge in the cascade tree.
  • The root node of the cascade tree (the final merge) constructs and yields the output record batches.

Performance considerations:

  • Focused on not adding too much code to the inner loop of the merge.
  • Each merge sorts only the normalized keys (accessed via the cursors).
  • The distribution function (i.e., how streams are distributed between merges) is order preserving.
    • At each merge, the loser tree (a min-heap) holds at most one cursor per input stream at any given moment.
    • Merge stream outputs are inputs to the next merge stream.
    • Therefore, any downstream (cascaded) merges are also ordered and cannot advance beyond each other.
  • Record batch slicing is expensive. Therefore, only the cursors are sliced (and passed around in merges).
    • Cursors have the normalized key for comparison.
    • Batches are tracked (write once, read multiple).
  • Streams are buffered and multithreaded (when possible).
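To make the order-preserving cascade concrete, here is a minimal toy sketch (not the PR's code): sorted vectors of integers stand in for cursor streams, and a binary min-heap stands in for the loser tree. At any moment the heap holds at most one entry per input run, mirroring the invariant described above.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Toy k-way merge: a binary min-heap stands in for the loser tree used by
// the real SortPreservingMergeStream. The heap holds at most one entry
// per input run at any given moment.
fn merge_runs(runs: Vec<Vec<i32>>) -> Vec<i32> {
    let mut heap = BinaryHeap::new();
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some(&first) = run.first() {
            heap.push(Reverse((first, run_idx, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(runs.iter().map(Vec::len).sum());
    while let Some(Reverse((val, run_idx, pos))) = heap.pop() {
        out.push(val);
        if let Some(&next) = runs[run_idx].get(pos + 1) {
            heap.push(Reverse((next, run_idx, pos + 1)));
        }
    }
    out
}

// The cascade: merge groups of at most `fan_in` runs per level, feeding
// each level's outputs into the next, until a single sorted run remains.
fn cascade_merge(mut runs: Vec<Vec<i32>>, fan_in: usize) -> Vec<i32> {
    while runs.len() > 1 {
        runs = runs
            .chunks(fan_in)
            .map(|group| merge_runs(group.to_vec()))
            .collect();
    }
    runs.pop().unwrap_or_default()
}
```

Because each intermediate merge emits its rows in sorted order, feeding those outputs into the next level preserves order end-to-end, which is the property the real cascade relies on.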

Are these changes tested?

Current tests pass if, and only if, the corresponding change in arrow-rs is also linked.
See the temporary commit 50c8636.

Updated: removed the need for additional changes in arrow-rs.

Are there any user-facing changes?

Changes are to internal APIs only.

@github-actions github-actions bot added the core Core DataFusion crate label Aug 22, 2023
//! Merge that deals with an arbitrary size of streaming inputs.
//! This is an order-preserving merge.

use crate::physical_plan::metrics::BaselineMetrics;
@wiedld wiedld Aug 22, 2023

This mod is moved code; the only change is to use the cascaded SortPreservingCascadeStream.


/// Perform a streaming merge of [`SendableRecordBatchStream`] based on provided sort expressions
/// while preserving order.
pub fn streaming_merge(
@wiedld (Contributor Author)

Since we added a layer between streaming_merge() and SortPreservingMergeStream (a.k.a. the cascading stream layer), we decided to move this function to its own mod.

@Dandandan

@wiedld are you sure of the benchmark results? TPC-H 17 doesn't contain any sorting, and other queries do not have very expensive sorts.

@alamb

alamb commented Aug 23, 2023

BTW I think the TPCH benchmark is not one that will likely show the power of this improvement, because as @Dandandan says the TPCH queries don't actually have any large sorts that I know of. I think we need to try the Sort benchmarks. I can help with this.

@alamb

alamb commented Aug 23, 2023

In addition to the sort benchmark (which might have its own issues) here is a suggested testing methodology:

  1. Use this dataset: traces_nd_random.zip (220MB):

  2. Build a release build of datafusion-cli (both on main and on this branch, via `cargo build --release`)

  3. Compare performance of this command (which will resort the input data randomly and write it to an output file)

datafusion-cli -c "copy (select * from 'traces_nd_random.parquet' order by time desc) to '/tmp/test.parquet'"

@alamb

alamb commented Aug 23, 2023

@wiedld and I spoke a bit this afternoon and I think the next steps for this PR are to get a query that shows significant performance improvements. I think the one in #7379 (comment) is a good candidate

I don't really understand the code in this PR yet, but the way I suggest adding more parallelism is by "buffering" the streams: rather than computing everything on demand with poll_next, spawn an explicit tokio::task for each input stream that will try to pull the next input while the current task is merging.

Maybe @crepererum or @tustvold can help with a suggestion on how to do the "add buffering/new tasks" in a reasonable rust way
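The buffering idea described above can be sketched with std threads and a bounded channel standing in for tokio tasks (illustrative only, not what the PR would literally use): each producer pre-fetches its next batch while the consumer is busy merging the previous one.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

// Illustrative sketch of the buffering idea: a producer thread fetches
// batches into a bounded channel, so the next batch is being prepared
// while the consumer is merging the current one. (std threads and
// channels stand in for tokio tasks here.)
fn buffered_stream(batches: Vec<Vec<i32>>) -> Receiver<Vec<i32>> {
    // Capacity 1: one batch can be produced ahead of the consumer.
    let (tx, rx) = sync_channel(1);
    thread::spawn(move || {
        for batch in batches {
            // In the real PR this would poll the underlying input stream.
            if tx.send(batch).is_err() {
                break; // consumer hung up
            }
        }
    });
    rx
}
```

A merge loop would then iterate over one receiver per input stream, consuming batches that were fetched concurrently instead of pulled on demand.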

@wiedld wiedld force-pushed the 7181/cascading-loser-tree-merges branch from 601d321 to 76fee2d Compare August 29, 2023 22:00
wiedld added 11 commits August 29, 2023 15:11
…o the same structure.

During the later stages of the cascade merge, we will no longer be sorting based on each streaming batch (one cursor at a time). Instead, merges of previous steps in the cascade will reference a previous sort_order per [batch_idx][row_idx]. Therefore, in order to keep the same set of Cursors, we are moving the input and output structures more closely together.

Later optimizations may be able to decouple these again.
…ge mod.

The merge mod has the SortPreservingMergeStream, containing the loser tree. This SortPreservingMergeStream struct will be used repeatedly as part of the cascading merge; in turn, the cascading merge will be implemented for the streaming_merge() method.
SortPreservingCascadeStream currently has a single root node of SortPreservingMergeStream.
TODO: build out the tree of SortPreservingMergeStream.
…ortPreservingCascadeStream doing the final interleave().

This commit knowingly fails for tests which are utilizing multiple polls to return all record batches. Specifically:
* dataframe::tests::with_column_renamed_join
* physical_plan::sorts::sort_preserving_merge::tests::test_partition_sort_streaming_input_output

TODO: splicing the RecordBatch and Cursor per merge yield.
This requires slicing the batches and cursors, when yielded in parts.

These two tests, reliant on multiple-polls of streaming data, now pass:
* dataframe::tests::with_column_renamed_join
* physical_plan::sorts::sort_preserving_merge::tests::test_partition_sort_streaming_input_output
* OffsetCursorStream enables the same RowCursorStream (with the same RowConverter) to be used across multiple leaf nodes.
* Each tree node is a merge (a.k.a. SortPreservingMergeStream).
* YieldedCursorStream enables the output from the previous merge, to be provided as input to the next merge.
…tches.

This also enabled simplification of code and cursor handling in the SortOrderBuilder.
@wiedld wiedld force-pushed the 7181/cascading-loser-tree-merges branch from 76fee2d to 173577b Compare August 29, 2023 22:16
@wiedld

wiedld commented Aug 29, 2023

Pushed another 2 commits to handle the planned improvements (reducing known expensive operations). Updated the PR description to show that the current sort benchmark results are (roughly) unchanged -- which was our hope.

Moving on to the next step, which is moving towards buffering and multi-core.

@wiedld

wiedld commented Sep 13, 2023

Updated branch and resolved conflicts.

Note that CI will fail because we have a temporary commit pointing to the arrow-rs branch changes, and it therefore fails the cargo package audit. But at least it's easier to pull the branch and run it locally.

Note: we are intentionally delaying code review, due to higher priorities.


@alamb alamb left a comment


Wow @wiedld -- I am very impressed by this PR. I found the code easy to read, which is saying something given how complicated the entire area is. That being said, I don't yet fully understand the changes here and am still working through them.

I also locally tested using this #7379 (comment) but that workload was dominated by parquet reading / writing so I couldn't see a difference

I am in the process of trying to reproduce your reported benchmark results

Thank you very much.

datafusion/core/src/physical_plan/sorts/builder.rs Outdated Show resolved Hide resolved
datafusion/core/src/physical_plan/sorts/cascade.rs Outdated Show resolved Hide resolved
@alamb

alamb commented Sep 14, 2023

Here are the results of one of my benchmark runs. Very impressive @wiedld

--------------------
Benchmark sort.json
--------------------
┏━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┓
┃ Query               ┃  main_base ┃ 7181_cascading-loser-tree-merges ┃         Change ┃
┡━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━┩
│ Qsort utf8          │ 72379.31ms │                        5969.19ms │ +12.13x faster │
│ Qsort int           │ 89964.04ms │                        6578.05ms │ +13.68x faster │
│ Qsort decimal       │ 73612.54ms │                       10893.99ms │  +6.76x faster │
│ Qsort integer tuple │ 98054.80ms │                       98251.46ms │      no change │
│ Qsort utf8 tuple    │ 73188.81ms │                        6900.60ms │ +10.61x faster │
│ Qsort mixed tuple   │ 81693.99ms │                       30029.59ms │  +2.72x faster │
└─────────────────────┴────────────┴──────────────────────────────────┴────────────────┘

@alamb

alamb commented Sep 14, 2023

Given what I have seen with this PR, I think we should proceed with this PR -- @tustvold do you have time to give it a review as well?

There is likely to be an arrow release in the next few days -- @wiedld can you prepare a PR with whatever changes you need in arrow, so we can make this PR mergeable (by relying on a released version of arrow)?

@wiedld wiedld force-pushed the 7181/cascading-loser-tree-merges branch from 4720f19 to 3786021 Compare September 15, 2023 07:17
/// 1. [`BatchCursorStream`] yields the initial cursors and batches. (e.g. a RowCursorStream)
/// 2. [`BatchTrackingStream`] collects the batches, to avoid passing those around. Yields a [`CursorStream`](super::stream::CursorStream).
/// 3. This initial CursorStream is for a number of partitions (e.g. 100).
/// 4. The initial single CursorStream is shared across multiple leaf nodes, using [`OffsetCursorStream`].

@wiedld wiedld Sep 15, 2023


Not in the notes: the reason a single input CursorStream is shared across leaves is so that they share the same RowConverter. See the updated comment.

After the arrow-rs version bump, I will try to slightly change this design. The goal is to remove the mutex around the BatchTrackingStream, and instead have the lock only on a BatchTracker consumed by the OffsetCursorStream (and of course also consumed in the final interleave at the cascade stream root).
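The batch-tracker idea can be sketched as follows (type and method names are hypothetical stand-ins, not the PR's actual code; plain vectors stand in for RecordBatch): the lock lives only on the tracker, which is written once per batch and read by id during the final interleave.

```rust
use std::sync::Mutex;

// Hypothetical sketch of a batch tracker with interior mutability:
// batches are stored once under a lock and later fetched by id, so the
// lock is scoped to the tracker rather than wrapping the whole stream.
#[derive(Default)]
struct BatchTracker {
    batches: Mutex<Vec<Vec<i32>>>, // stand-in for Vec<RecordBatch>
}

impl BatchTracker {
    // Write once: store a batch and hand back its id.
    fn add(&self, batch: Vec<i32>) -> usize {
        let mut guard = self.batches.lock().unwrap();
        guard.push(batch);
        guard.len() - 1
    }

    // Read many: fetch the batch (here by clone) for the final interleave.
    fn get(&self, id: usize) -> Vec<i32> {
        self.batches.lock().unwrap()[id].clone()
    }
}
```

Because `add` and `get` take `&self`, the tracker can be shared (e.g. behind an `Arc`) between the cursor streams and the cascade root without a lock around the streams themselves.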

@wiedld (Contributor Author)

Mutex removed. This explainer is also updated to reflect the latest design.

@wiedld wiedld marked this pull request as ready for review September 15, 2023 17:59
///
/// Unique representation of sliced cursor is denoted by the [`SlicedBatchCursorIdentifier`].
#[derive(Debug)]
pub struct BatchCursor<C: Cursor> {

@wiedld wiedld Sep 19, 2023


This is used in the CursorStream (not the BatchCursorStream, which includes the actual record batches). Therefore, I think this could have a better name.

It wraps the cursor, and maps it to the original (tracked) batch -- as well as tracking the sliced offset. Naming ideas?

wiedld added 2 commits October 5, 2023 14:28
  * Move record batch tracking into its own abstraction with interior mutability
  * Split streams instead of locking, which removes the need to poll per offset subset.
  * As a reflection of this reduced responsibility, rename OffsetCursorStream to BatchTrackerStream.
@wiedld wiedld force-pushed the 7181/cascading-loser-tree-merges branch from 79e80a6 to f97cc4d Compare October 6, 2023 05:33
RecordBatchReceiverStream::builder(self.schema(), input_partitions);
ReceiverStream::builder(self.schema(), input_partitions);
let input =
Arc::new(RecordBatchReceiverStreamAdaptor::new(self.input.clone()));

@wiedld wiedld Oct 10, 2023


RecordBatchReceiverStream was made generic in this commit, such that it can handle either a buffered stream of record batches or the sort_orders (yielded per merge node).

To make it generic, the following was done:

  • create a StreamAdapter trait, whose StreamAdapter::call() is used by ReceiverStream::run_input().
  • impl a RecordBatchReceiverStreamAdaptor that is used for record batches.

Please let me know if I should have structured this differently.
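A rough sketch of the adapter pattern described above (the trait shape and all names here are assumptions for illustration, not the PR's exact API; plain vectors stand in for record batches and std threads for tokio tasks):

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

// Hypothetical sketch: a generic receiver stream buffers items of any
// type, and an adapter trait supplies the logic that drives one input
// and feeds the buffer.
trait StreamAdapter: Send + 'static {
    type Item: Send + 'static;

    // Drive the input to completion, sending each item into the buffer.
    fn call(self, tx: SyncSender<Self::Item>);
}

// Adapter for plain vectors, standing in for record batches (or the
// sort_orders yielded per merge node).
struct VecAdapter(Vec<Vec<i32>>);

impl StreamAdapter for VecAdapter {
    type Item = Vec<i32>;

    fn call(self, tx: SyncSender<Self::Item>) {
        for item in self.0 {
            if tx.send(item).is_err() {
                break; // receiver dropped
            }
        }
    }
}

// Generic "receiver stream": spawn the adapter and return the buffered end.
fn receiver_stream<A: StreamAdapter>(adapter: A) -> Receiver<A::Item> {
    let (tx, rx) = sync_channel(2);
    thread::spawn(move || adapter.call(tx));
    rx
}
```

The point of the indirection is that the receiver/buffering machinery is written once, while each item type only needs its own small adapter impl.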

@alamb

alamb commented Oct 30, 2023

Since we are working this PR through in pieces, marking this PR as draft so it is clear it is not waiting on feedback

@alamb alamb marked this pull request as draft October 30, 2023 20:38

github-actions bot commented May 2, 2024

Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.

@github-actions github-actions bot added the Stale PR has not had any activity for some time label May 2, 2024
@wiedld

wiedld commented May 2, 2024

Working on other things. If/when we circle back, we'll be recreating differently.

@wiedld wiedld closed this May 2, 2024
Labels
core Core DataFusion crate Stale PR has not had any activity for some time

Successfully merging this pull request may close these issues.

Improve performance of large sorts with Cascaded merge / tree
3 participants