-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Extract CoalesceBatchesStream
to a struct
#11610
Conversation
/// Buffered row count | ||
buffered_rows: usize, | ||
/// Buffer for combining batches | ||
coalescer: BatchCoalescer, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The whole point of the PR is to extract these fields and their management into a struct.
We plan to submit fetching support to |
CoalesceBatchesStream
to a struct
@ozankabak Sounds good to me -- I don't think there is any particular rush for this PR. I'll get this PR ready and then I can rebase it after limit is added. Though now that I think about it, perhaps adding limit would be more straightforward if it was encapsulated in a struct like this 🤔 |
117901b
to
8ef0d83
Compare
For reference, here is the PR we are preparing (and will submit to the upstream repo soon). |
I reviewed that PR and I believe it will be straightforward to resolve the conflicts once it is merged |
8ef0d83
to
352d521
Compare
assert_eq!(24, batches[1].num_rows()); | ||
assert_eq!(24, batches[2].num_rows()); | ||
assert_eq!(8, batches[3].num_rows()); | ||
// Handle fetch limit: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just moved this code out of the main CoalesceBatchesExec
poll loop and into a function.
Other than the is_closed()
handling this is literally a copy/paste
#[test] | ||
fn test_coalesce() { | ||
let batch = uint32_batch(0..8); | ||
Test::new() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated the tests to actually check the contents of the coalesced batches, in addition to checking the sizes.
I also factored out much of the test repetition and and made it easier to understand I think
This will likely conflict with #11667 so I would like to merge that one first |
352d521
to
40ac302
Compare
/// 2. The output is a sequence of batches, with all but the last being at least | ||
/// `target_batch_size` rows. | ||
/// | ||
/// 3. Eventually this may also be able to handle other optimizations such as a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is my long term ambition -- to apply filter + coalesce in a single operation (and thus save a copy)
This PR is now ready for review when someone has a chance |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks @alamb
@ozankabak Now that synnada-ai#27 is merged, are we good to merge this PR? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep -- good to go
Thank you @ozankabak and @andygrove for the review. |
Note most of this PR is additional documentation and tests. There is no new functionality
Which issue does this PR close?
Related to #9370
Related to #7957
Rationale for this change
StringViewArray
inCoalesceBatchesStream
#11587)CoalesceBatchesExec
to improve performance #7957) this is the first stepAs an example of what I hope to do with this, today we have a
FilterExec
which copies all rows that pass the filter into a newRecordBatch
followed byCoalesceBatches
which will likely copy the batch again into a larger setWith a struct like this, I think we have the possibility of the filter producing the output batch directly, thus skipping
the intermediate copy. See #7957 and #7957 (comment) for details
What changes are included in this PR?
CoalesceBatchesExec
I also happen to think this change makes the code easier to read as it separates
the coalescing logic from the stream handling logic.
Are these changes tested?
Yes, by existing CI and new unit tests
Are there any user-facing changes?
No changes, this is just an internal code reorganization
Benchmark results show now significant changes
Details