-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Vectorized hash grouping #6904
Vectorized hash grouping #6904
Conversation
@@ -16,100 +16,181 @@ | |||
// under the License. | |||
|
|||
//! Hash aggregation through row format | |||
//! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This diff is going to be hard to read -- I recommend anyone interested look at the code directly here: https://github.com/alamb/arrow-datafusion/blob/alamb/fast_gby_hash/datafusion/core/src/physical_plan/aggregates/row_hash.rs
self.sums | ||
.resize_with(total_num_groups, || T::default_value()); | ||
|
||
// NullState dispatches / handles tracking nulls and groups that saw no values |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am quite pleased that most of the update logic for handling nulls / filters is encapsulated (and tested) in NullState::accumulate
group_indices, | ||
values.nulls(), // ignore values | ||
opt_filter, | ||
|group_index| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this use wrapping add as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know what the implications are -- would that be faster (but not check overflow)? I am happy to change it but I honestly don't know
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggested it mainly for consistency.
AFAIK wrapping_add
doesn't check for overflow but wraps, which is the default already, but in debug mode it causes an exception.
Maybe we should remove the wrapping_add
in the other places and "just" use +
etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I tried to use wrapping_add
then rustc
told me I needed to check the return type. Given I never expect an overflow and this is on the hot path, I think we should not use wrapping_add
warning: unused return value of `core::num::<impl i64>::wrapping_add` that must be used
--> datafusion/physical-expr/src/aggregate/count.rs:130:17
|
130 | self.counts[group_index].wrapping_add(1);
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
= note: this returns the result of the operation, without modifying the original
= note: `#[warn(unused_must_use)]` on by default
help: use `let _ = ...` to ignore the resulting value
|
130 | let _ = self.counts[group_index].wrapping_add(1);
| +++++++
Maybe we should remove the wrapping_add in the other places and "just" use + etc.
What other places are you referring to? I didn't see any in this PR but maybe I am misunderstanding
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're using add_wrapping
in sum, average:
sum.add_wrapping(new_value)
etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I was confused -- I used add_wrapping
(part of std::ops
) rather than wrapping_add
(part of Arrow NativeType) 🤦
I played around with it -- it turns out anything with an actual rust type (like i64) I can use +=
but for templated code on ArrowNativeType
I can't use +=
I have to use wrapping_add
This is how the code would look like if I use add_wrapping
, which I think looks a bit more tortured (though it is consistent). Do you think I should make the change?
diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs
index 287970de29..cfb1713d8e 100644
--- a/datafusion/physical-expr/src/aggregate/count.rs
+++ b/datafusion/physical-expr/src/aggregate/count.rs
@@ -156,9 +156,10 @@ impl GroupsAccumulator for CountGroupsAccumulator {
.iter()
.zip(group_indices.iter())
.zip(partial_counts.iter())
- .for_each(|((filter_value, &group_index), partial_count)| {
+ .for_each(|((filter_value, &group_index), &partial_count)| {
if let Some(true) = filter_value {
- self.counts[group_index] += partial_count;
+ let count = &mut self.counts[group_index];
+ *count = count.add_wrapping(partial_count);
}
}),
None => group_indices.iter().zip(partial_counts.iter()).for_each(
.zip(partial_counts.iter()) | ||
.for_each(|((filter_value, &group_index), partial_count)| { | ||
if let Some(true) = filter_value { | ||
self.counts[group_index] += partial_count; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wrapping_add
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see above: #6904 (comment)
@@ -44,6 +49,9 @@ impl<T> VecAllocExt for Vec<T> { | |||
|
|||
self.push(x); | |||
} | |||
fn allocated_size(&self) -> usize { | |||
std::mem::size_of::<T>() * self.capacity() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I refactored this calculation into its own function as leaving it inline in the group code made it harder to follow in my opinion
@@ -1022,6 +1112,224 @@ impl RowAccumulator for MinRowAccumulator { | |||
} | |||
} | |||
|
|||
trait MinMax { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tustvold do you know of any pre-existing trait that will provide a min based on type?
@@ -0,0 +1,879 @@ | |||
// Licensed to the Apache Software Foundation (ASF) under one |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file contains the templated, inner loops for the accumulators that handles filtering and nulls with specialized loops. I think it could be made faster with some more specialization but I think the complexity might have diminishing returns.
.map(|array| array.slice(offset, length)) | ||
.collect(); | ||
|
||
if let Some(f) = filter_opt { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unrelated to this PR, but seems weird/inefficient to me the filter
works on sliced arrays rather than just on the original arrays.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔 I agree -- that might make a nice follow on PR if someone wants to optimize aggregates with filters. They aren't used in TPCH or ClickBench so this isn't likely to show up in any profiling we are doing
I think this PR is ready to run. I am still working to gather some clickbench numbers, but otherwise I think this is ready for review |
Given the size and scope of this PR I plan to leave it open for at least a few days to allow anyone who wants to a chance to review or comment. If you need more time please let me know Also, thank you so much @Dandandan for all the help. It has been great working on this PR with you |
(added clickbench results to the PR summary: TLDR they look quite promising ) Update: q32 now runs too where previously it failed due to OOM Details
|
* parallel csv scan * add max line length * Update according to review comments * Update Configuration doc --------- Co-authored-by: Andrew Lamb <[email protected]>
…ps/decimals (apache#6939) * Add additional test coverage for aggregaes using dates/times/timestamps/decimals * Add coverage for date32/date64
I wrote some additional tests and actually found another bug (related to bitand): #6952 |
I am going to run one final performance measurement and if that looks good merge this in and keep working on |
My most recent changes have slowed things down a little, but I have an idea of how to get the performance back. Let's merge this and proceed with development on main. Thank you everyone for your help and encouragement
|
amazing 🚀 |
@alamb @Dandandan @ozankabak |
Awesome -- thank you @mingmwang cc @ozankabak who I think has been thinking about join ordering as well |
Which issue does this PR close?
Part of #6889
Closes #6800
Closes #4973
Closes #956
Closes #846
Closes #418
Rationale for this change
Much faster grouping performance and lower memory usage for large numbers of groups
TPCH (SF1)
ClickBench results
Methodology
1. Run `bash.run.sh` this script: https://github.com/ClickHouse/ClickBench/tree/main/datafusion 2. Save the resulting `result.csv`: [result-fast_gby_hash.csv](https://github.com/apache/arrow-datafusion/files/12014914/result-fast_gby_hash.csv) [result-main.csv](https://github.com/apache/arrow-datafusion/files/12014915/result-main.csv) 3. Run report: `datafusion-cli -f report.sql`
report.sql
What changes are included in this PR?
The code in this PR was written by myself and @Dandandan
GroupedHashAggregateStream
to use vectorized / multi-group updatesGroupsAccumulator
trait with the new vectorized API for managing and updating group stateGroupsAccumulator
for all aggregators that haveRowAccumulator
variantsaccumulate
functionGroupsAccumulator
in terms ofAccumulator
(for slower, but simpler accumulators)Here is the list of
RowAccumulator
s (aka accumulators that have specialized implementations).CountRowAccumulator
MaxRowAccumulator
MinRowAccumulator
AvgRowAccumulator
SumRowAccumulator
BitAndRowAccumulator
BitOrRowAccumulator
BitXorRowAccumulator
BoolAndRowAccumulator
BoolOrRowAccumulator
Follow on work
I plan to remove the GroupedAggregateStream (and all RowAccumulators) as a follow on PR
Are these changes tested?
Yes -- both new and existing tests
Are there any user-facing changes?
Much faster performance -- see above