Skipping partial aggregation when it is not helping for high cardinality aggregates #11627
Thank you @korowa -- I think this is the right approach. The challenge when I tried it before was that it slowed down some queries. We should run some benchmarks (I can help maybe tomorrow) |
|
||
match opt_filter { | ||
Some(filter) => { | ||
values |
Can use filter kernel here instead of zipping?
Not sure about filter, but kernels sound like a good idea, I'll try to switch to using them.
I think @Dandandan is suggesting using https://docs.rs/arrow/latest/arrow/compute/kernels/filter/fn.filter.html
However, that would likely require a second copy of the values (apply/collect filter result and then apply prim_fn)
Yes, but the example shows that it filters out values from the source array, and conversion to state must produce the same number of elements, just placing nulls/zeros instead of filtered values, so I'm planning to look for something like "apply null mask".
I've started with some benchmarks (criterion based ones) and they show that current code for nullable columns (at least for count) is significantly slower than for non nullable ones (~15 times 😞 ), probably some part of this time can be recovered.
I see, we can't use filter here as we need to produce the values as is.
I think we should be able to build the values based on the values buffer and handle nulls separately:
- no filter: just pass null mask of values
- filter present: bitwise_and both null masks
this should also be beneficial for the non-null case, as it avoids the iterator/builder
For "no filter" -- casting values.logical_nulls() to i64 helps a bit. Regarding bitwise_and -- I'll try (the problem with all logical functions is that filter may also contain nulls)
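The COUNT case discussed above can be sketched without any Arrow types. This is a minimal std-only illustration, not the code from this PR: `count_to_state` is a hypothetical name, the validity mask is modelled as a plain `&[bool]`, and the filter as `&[Option<bool>]` so that null filter entries are representable. The assumption is that a null filter entry means "row not selected", same as `false`.

```rust
/// Per-row COUNT state: 1 if the row is valid and selected by the filter, else 0.
/// The output always has `len` elements, one per input row.
///
/// * `validity`: `None` means the input has no nulls.
/// * `filter`: `None` means no filter; a `None` entry is a null filter value
///   and is treated as "not selected" (assumption of this sketch).
pub fn count_to_state(
    len: usize,
    validity: Option<&[bool]>,
    filter: Option<&[Option<bool>]>,
) -> Vec<i64> {
    (0..len)
        .map(|i| {
            let is_valid = validity.map_or(true, |v| v[i]);
            let selected = filter.map_or(true, |f| f[i] == Some(true));
            // bool -> i64 cast, analogous to casting a combined mask to Int64
            (is_valid && selected) as i64
        })
        .collect()
}
```

With no nulls and no filter every row maps to 1, which corresponds to the `(None, None)` arm in the diff.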
I am starting to run clickbench and tpch benchmarks on this PR. Will report results shortly. It is a really neat idea to have the thresholds configurable
Here are my benchmark results - they look quite good. Other than ClickBench Q32 and TPCH Q17 they all look faster 😍 Details
I am going to rerun the numbers to make sure they are reproducible and then give this PR a closer look
The subsequent runs look good (I don't think there is any slowdown in TPCH Q17, but there is still a slowdown in ClickBench Q32) Details
|
This is really cool @korowa . Thank you so much
Not only is it cool that it improves performance in many cases, it is cool that it has an incremental approach (can implement convert_to_state for GroupsAccumulators over time)
I have two concerns:
- That this approach may overfit the problem (aka that it isn't generalizable outside the context of the benchmark runs)
- That this approach might preclude making some larger changes (like simply turning off the intermediate generation)
and produce batched "as-is", replacing aggregate accumulators inputs with corresponding intermediate aggregate states (in order not to break record batch schema for downstream operators -- specifically, for CoalesceBatches)
I wonder if you have thought about some way to disable aggregation entirely in the partial aggregation phase (as in avoid having to convert it into the state)? The challenge as you have pointed out is that the state types may be different than the input, so it would likely be a larger/more involved change 🤔
I want to think about this PR some more, but I think it is really nice and I am inclined to say we should proceed with this approach
I think to merge it I would like to see:
- Some more background comments on why this approach (the existing code in this PR is already very well commented about what it does 🥇 ) -- I plan to help with this
- Look into why the clickbench queries got slower (I am worried there is some tuning now required which will be hard to get totally optimal)
} | ||
|
||
// Transforms input batch to intermediate aggregate state, without grouping it | ||
fn transform_to_states(&self, batch: RecordBatch) -> Result<RecordBatch> { |
This is quite clever
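To make the idea behind `transform_to_states` concrete, here is a small std-only sketch. It is an illustration under stated assumptions, not the PR's implementation: rows are modelled as `(group key, nullable value)` tuples, the aggregates are COUNT(value) and SUM(value), and both function names below are hypothetical.

```rust
use std::collections::BTreeMap;

/// Converts each input row into an intermediate state row without grouping:
/// (group key, COUNT state, SUM state). Output length equals input length,
/// so the same group key may appear more than once.
pub fn transform_to_states(rows: &[(u64, Option<i64>)]) -> Vec<(u64, i64, Option<i64>)> {
    rows.iter()
        .map(|(key, value)| (*key, value.is_some() as i64, *value))
        .collect()
}

/// Final aggregation: merges state rows by key. It does not care whether the
/// states came from a real partial aggregation or from the row-wise conversion.
pub fn merge_states(states: &[(u64, i64, Option<i64>)]) -> BTreeMap<u64, (i64, Option<i64>)> {
    let mut out: BTreeMap<u64, (i64, Option<i64>)> = BTreeMap::new();
    for (key, count, sum) in states {
        let entry = out.entry(*key).or_insert((0, None));
        entry.0 += *count;
        if let Some(s) = sum {
            // SUM stays null until the first non-null state arrives
            entry.1 = Some(entry.1.unwrap_or(0) + *s);
        }
    }
    out
}
```

Because the final phase merges states by key anyway, duplicate keys in the partial output change only how much work the final phase does, not the result.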
}); | ||
builder.finish() | ||
} | ||
(None, None) => Int64Array::from_value(1, values.len()), |
it is unfortunate that we need to create this over and over again 🤔
@alamb thank you for sharing benchmark results -- I'll check out if any of them benefited from this feature (I suppose it shouldn't be triggered in many of them) and will look for the possible reasons of q32 (and other queries) slowdown (actually this one -- q32, should benefit most, when producing state will be implemented for
Regarding your comments:
Probably, but I supposed this idea to be opposite to overfitting, since it relies more on the input data, rather than fixed settings (I may be wrong here however).
Initially I've been considering to make partial aggregation just propagate input batches as is, and adding some internal flag into their schema metadata (pointing that final aggregation to use |
let mut builder = Int64Builder::with_capacity(values.len()); | ||
nulls | ||
.into_iter() | ||
.for_each(|is_valid| builder.append_value(is_valid as i64)); |
I believe .collect() should be slightly faster and less verbose than a builder here.
even better, we should be able to cast the null array to int64
FWIW: into_iter().map().collect::<Int64Array>() seems to be slower than appending values to the builder 🤔
Awesome -- I am planning to look into them as well
The more I think about it, the more I agree with you. While there are tuning knobs (e.g. the fraction of tuples aggregates) I do think they are general.
I think this makes sense and I agree with your conclusion |
My plan here is to spend time tomorrow morning doing some additional investigation / testing on the branch and unless I find any blockers I think we should proceed with it. What I am thinking is that between this PR and the StringView PR #11667 we are going to be in pretty sweet shape. The improvements with this change are so compelling in my opinion that I think we can document any potential performance regressions that this PR causes, and then work on them as a follow on before the release. |
FWIW: regarding benchmarks -- running with Regarding Q32 -- I've run it separately and got equal runtimes for both branches (due to AVG it's not able to skip partial aggregation yet)
|
I spent some time this morning playing around with ClickBench query 32 locally and I agree any slowdown does not look significant or a blocker.
Q32
SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10;
Running from datafusion-cli:
./datafusion-cli-skip-partial -c "SELECT \"WatchID\", \"ClientIP\", COUNT(*) AS c, SUM(\"IsRefresh\"), AVG(\"ResolutionWidth\") FROM 'hits.parquet' GROUP BY \"WatchID\", \"ClientIP\" ORDER BY c DESC LIMIT 10;"
datafusion-cli -c "SELECT \"WatchID\", \"ClientIP\", COUNT(*) AS c, SUM(\"IsRefresh\"), AVG(\"ResolutionWidth\") FROM 'hits.parquet' GROUP BY \"WatchID\", \"ClientIP\" ORDER BY c DESC LIMIT 10;"
Here are the timings I got:
|
I also tried out Q32 (that has AVG so can't use this optimization yet) but removed the
1000 partitions, this PR
andrewlamb@Andrews-MacBook-Pro-2:~/Downloads$ ./datafusion-cli-skip-partial -c "set datafusion.execution.target_partitions = 1000; SELECT \"WatchID\", \"ClientIP\", COUNT(*) AS c, SUM(\"IsRefresh\") FROM 'hits.parquet' GROUP BY \"WatchID\", \"ClientIP\" ORDER BY c DESC LIMIT 10;"
Elapsed 0.001 seconds.
+---------------------+-------------+---+-----------------------------+
| WatchID | ClientIP | c | sum(hits.parquet.IsRefresh) |
+---------------------+-------------+---+-----------------------------+
| 7904046282518428963 | 1509330109 | 2 | 0 |
| 8566928176839891583 | -1402644643 | 2 | 0 |
| 6655575552203051303 | 1611957945 | 2 | 0 |
| 7224410078130478461 | -776509581 | 2 | 0 |
| 9102894172721185728 | 1489622498 | 1 | 1 |
| 8964981845434484863 | 1822336830 | 1 | 0 |
| 6991883311913569583 | -745122562 | 1 | 0 |
| 6787783378461221127 | -506600142 | 1 | 0 |
| 6042898921955304644 | 2054220936 | 1 | 0 |
| 5581365862985039198 | 104944290 | 1 | 0 |
+---------------------+-------------+---+-----------------------------+
10 row(s) fetched.
Elapsed 6.378 seconds.
1000 partitions, main
andrewlamb@Andrews-MacBook-Pro-2:~/Downloads$ datafusion-cli -c "set datafusion.execution.target_partitions = 1000; SELECT \"WatchID\", \"ClientIP\", COUNT(*) AS c, SUM(\"IsRefresh\") FROM 'hits.parquet' GROUP BY \"WatchID\", \"ClientIP\" ORDER BY c DESC LIMIT 10;"
DataFusion CLI v40.0.0
0 row(s) fetched.
Elapsed 0.002 seconds.
+---------------------+-------------+---+-----------------------------+
| WatchID | ClientIP | c | sum(hits.parquet.IsRefresh) |
+---------------------+-------------+---+-----------------------------+
| 7904046282518428963 | 1509330109 | 2 | 0 |
| 8566928176839891583 | -1402644643 | 2 | 0 |
| 6655575552203051303 | 1611957945 | 2 | 0 |
| 7224410078130478461 | -776509581 | 2 | 0 |
| 6780795588237729988 | 1894276368 | 1 | 1 |
| 6158430646513894356 | -1557291761 | 1 | 0 |
| 8433113762047612962 | 1214823432 | 1 | 0 |
| 8783130976633619349 | 1072197582 | 1 | 0 |
| 4959259883895284379 | 2023656393 | 1 | 0 |
| 6328586531975293675 | 1549952556 | 1 | 1 |
+---------------------+-------------+---+-----------------------------+
10 row(s) fetched.
Elapsed 7.771 seconds. |
I spent a bunch more time reviewing this PR today and I think it is good and could be merged as is. Thank you so much @korowa and @Dandandan
Before merging this PR I think we need
- Run the benchmarks one more time
- Give it a few more days to gather any more review comments
Here are the follow up items I suggest (and I can file tickets):
- More documentation (I started here Improve aggregation documentation for multi-phase aggregation #11695)
- Add a metric to record when group by switches to skip partial aggregate mode, so we can see when this happens in EXPLAIN ANALYZE plans
- File tickets to support convert_to_state for other GroupsAccumulators (like AVG for example) -- I think this could be done by the larger community easier after the additional documentation (and they can follow the test pattern you have in this PR)
FYI @kazuyukitanimura -- I wonder if you have time to review this change in the context of hash aggregate spilling as you originally contributed #7400
Context:
- Describes this issue: Improve Memory usage + performance with large numbers of groups / High Cardinality Aggregates #6937
- Additional background documentation: Improve aggregation documentation for multi-phase aggregation #11695
@@ -90,6 +94,69 @@ struct SpillState { | |||
merging_group_by: PhysicalGroupBy, | |||
} | |||
|
|||
struct SkipAggregationProbe { |
FYI @kazuyukitanimura -- I wonder if you have time to review this change to hash aggregate spilling as you originally contributed #7400
Context:
- Describes this issue: Improve Memory usage + performance with large numbers of groups / High Cardinality Aggregates #6937
- Additional background documentation: Improve aggregation documentation for multi-phase aggregation #11695
@@ -484,6 +612,12 @@ impl Stream for GroupedHashAggregateStream { | |||
( | |||
if self.input_done { | |||
ExecutionState::Done | |||
} else if self |
nit is that putting this into a function (like self.should_skip_aggregation()) would make this logic easier to follow
Filed #11821 with a proposal for this change
@alamb this is also a bit unexpected, since the default number of rows after which the check fires is 100_000 and it's applied per partition (each partition is going to process at least 100k rows normally, without skipping aggregation), and the total number of rows in the file is ~100kk, i.e. ~100 million (if I'm not mistaken). So this optimization should not help in this case, since with 1000 partitions each partition will read ~100_000 rows anyway 🤔
We will also take a look today or tomorrow |
let mut builder = Int64Builder::with_capacity(values.len()); | ||
nulls.into_iter().zip(filter.iter()).for_each( | ||
|(is_valid, filter_value)| { | ||
builder.append_value( |
bitwise_and + cast?
Maybe we can use the nullif kernel here
Something like
let nulls = and(nulls, not(filter));
let output = nullif(values);
Update: or maybe we could just and the nulls from the input and the filter (as nulls is the validity mask) 🤔
I came up with this in #11734:
/// Converts a `BooleanBuffer` representing a filter to a `NullBuffer`
/// where the NullBuffer is true for all values that were true
/// in the filter and `null` for any values that were false or null
fn filter_to_nulls(filter: &BooleanArray) -> Option<NullBuffer> {
    let (filter_bools, filter_nulls) = filter.clone().into_parts();
    // Only keep values where the filter was true
    // convert all false to null
    let filter_bools = NullBuffer::from(filter_bools);
    NullBuffer::union(Some(&filter_bools), filter_nulls.as_ref())
}
/// Compute the final null mask for an array
///
/// The output null mask :
/// * is true (non null) for all values that were true in the filter and non null in the input
/// * is false (null) for all values that were false in the filter or null in the input
fn filtered_null_mask(
    opt_filter: Option<&BooleanArray>,
    input: &dyn Array,
) -> Option<NullBuffer> {
    let opt_filter = opt_filter.and_then(filter_to_nulls);
    NullBuffer::union(opt_filter.as_ref(), input.nulls())
}
And then you compute the final null mask without messing with the input:
let nulls = filtered_null_mask(opt_filter, sums);
let sums = PrimitiveArray::<T>::new(sums.values().clone(), nulls)
.with_data_type(self.sum_data_type.clone());
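The semantics of the two helpers above can be checked with a std-only model that uses no Arrow types. This is a sketch under assumptions: masks are plain `bool` slices where `true` means "valid", the filter is `&[Option<bool>]` so it can carry nulls, and `sum_to_state` is a hypothetical name for the step that reuses the values and swaps only the mask.

```rust
/// Output validity for row i = filter[i] is Some(true) AND input is valid.
/// Returns `None` when there is neither a filter nor an input null mask.
pub fn filtered_null_mask(
    filter: Option<&[Option<bool>]>,
    input_validity: Option<&[bool]>,
) -> Option<Vec<bool>> {
    // A false and a null filter entry both become "null" in the output mask.
    let filter_mask: Option<Vec<bool>> =
        filter.map(|f| f.iter().map(|v| *v == Some(true)).collect());
    match (filter_mask, input_validity) {
        (None, None) => None,
        (Some(f), None) => Some(f),
        (None, Some(v)) => Some(v.to_vec()),
        // element-wise AND of the two masks
        (Some(f), Some(v)) => Some(f.iter().zip(v).map(|(a, b)| *a && *b).collect()),
    }
}

/// SUM state conversion in this model: values are kept as-is, only the mask changes.
pub fn sum_to_state(
    values: &[i64],
    filter: Option<&[Option<bool>]>,
    input_validity: Option<&[bool]>,
) -> Vec<Option<i64>> {
    let mask = filtered_null_mask(filter, input_validity);
    values
        .iter()
        .enumerate()
        .map(|(i, v)| match &mask {
            Some(m) if !m[i] => None,
            _ => Some(*v),
        })
        .collect()
}
```

The point of the design is that the values buffer is never rebuilt; only one boolean mask is computed per batch.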
Nice, using NullBuffer::union is much better for the readability
That was the missing link for me (thank you!) -- we can operate directly on underlying buffers.
I've rewritten state conversion for count on bitand on buffers + cast to Int64 in the end, and according to benchmarks from the commit it got 20-25% faster.
Just a suggestion -- won't it be better to use BooleanBuffer + & (bitand operator) instead of NullBuffer + union? NullBuffer is a bit confusing, so I've "pulled" the logic from union right into state conversion function.
Additionally, I plan to prepare benches and minimize ArrayBuilder usage for min / max / sum during tomorrow.
I've rewritten state conversion for count on bitand on buffers + cast to Int64 in the end, and according to benchmarks from the commit it got 20-25% faster.
🎉
Just a suggestion -- won't it be better to use BooleanBuffer + & (bitand operator) instead of NullBuffer + union? NullBuffer is a bit confusing, so I've "pulled" the logic from union right into state conversion function.
I think they are equivalent: NullBuffer just wraps BooleanBuffer and NullBuffer::union just calls & underneath: https://docs.rs/arrow-buffer/52.2.0/src/arrow_buffer/buffer/null.rs.html#76 (after replicating the match(nulls, filter) logic)
I don't have a strong opinion about which is more/less confusing
What I suggest we do is pull the logic to compute the output null mask based on the optional input null mask and the optional filter into a function (like fn filtered_null_mask) as it will be used in basically all of the convert_to_state implementations. As long as it is well documented, I think either implementation will work well
Additionally, I plan to prepare benches and minimize ArrayBuilder usage for min / max / sum during tomorrow.
Sounds good -- would you like to keep updating this PR or shall we merge this PR and continue improvements with additional PRs on main?
I'd like to make these few changes in this PR (along with merging docs update and review suggestions) -- don't think it'll take long enough to accumulate any significant conflicts.
Sounds good. We will wait for you to let us know when it is ready to merge
filter.into_iter().for_each(|filter_value| { | ||
builder.append_value(filter_value.is_some_and(|val| val) as i64) | ||
}); | ||
builder.finish() |
cast?
I added a couple of suggestions for performance |
@korowa If we added a metric that tracks when this mode switched in, I think it would be easier to diagnose what is going on. I will make a PR to do so. |
@ozankabak if I may toot my own horn a bit, I would personally suggest checking out the docs I wrote korowa#172 (and #11695) before the code of this PR as I tried to explain more at a high level what it is doing. |
@alamb I'm totally fine with that -- taking into account, that there are already some followups/improvements for this feature, it's not worth blocking them (since making state conversion for Please let me know if there are any changes/fixes that have to be done in order to make this PR ready for merging. |
I took another look at this PR and I think it is looking very nice. Thank you again @korowa and all reviewers
I will plan to merge it tomorrow (Monday) and file follow on tickets to track additional work.
🚀 |
Thank you again everyone for all your work. I am hoping this is the first step towards some significantly improved TPCH / ClickBench performance I filed the following follow on tickets / PRs: |
Which issue does this PR close?
Related to #6937. Closes #6937
Rationale for this change
Currently DF plans (almost always) two aggregation operators -- Partial and Final, executing one after another with Partial output being input for Final. In case aggregate input is almost/close to unique, Partial aggregation doesn't group data well (output row count is roughly the same as input row count), and DF ends up doing the same work twice.
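The effect described above can be reproduced with a toy two-phase COUNT(*) over plain hash maps. This is only a std-only sketch of the Partial/Final split, with hypothetical function names and `u64` group keys; it is not how DataFusion's operators are implemented.

```rust
use std::collections::HashMap;

/// Partial phase: COUNT(*) grouped by key within a single partition.
pub fn partial_count(partition: &[u64]) -> HashMap<u64, i64> {
    let mut groups = HashMap::new();
    for key in partition {
        *groups.entry(*key).or_insert(0) += 1;
    }
    groups
}

/// Final phase: merges the partial states of all partitions by summing counts.
pub fn final_count(partials: &[HashMap<u64, i64>]) -> HashMap<u64, i64> {
    let mut merged = HashMap::new();
    for partial in partials {
        for (key, count) in partial {
            *merged.entry(*key).or_insert(0) += *count;
        }
    }
    merged
}
```

For a low-cardinality input such as `[1, 1, 1, 2, 2, 2]` the partial phase emits 2 rows for 6 inputs. For an all-unique input it emits as many rows as it consumed, so the final phase repeats the same hashing work on the same number of rows.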
Suggestion is to start skipping partial aggregation after some fixed number of input rows, in case at that moment the ratio of accumulated unique groups to input rows exceeds some fixed threshold value (which by default is somewhere between 0.5 and 1, but closer to 1), and to produce batches "as-is", replacing aggregate accumulator inputs with the corresponding intermediate aggregate states (in order not to break the record batch schema for downstream operators -- specifically, for CoalesceBatches).
What changes are included in this PR?
- skip_partial_aggregation_probe_rows_threshold and skip_partial_aggregation_probe_ratio_threshold -- the first is responsible for the number of input rows to aggregate before checking the aggregation ratio, the second -- for the ratio threshold
- GroupedHashAggregateStream.skip_aggregation_probe and related methods for updating state / obtaining information on whether further input aggregation may be skipped
- GroupsAccumulator.convert_to_state, and its implementations for PrimitiveGroupsAccumulator (sum / min / max) and Count accumulators -- the method responsible for converting a RecordBatch to intermediate aggregate state, without grouping input data, and GroupsAccumulator.convert_to_state_supported, which indicates that an accumulator is able to perform the conversion described above.
Are these changes tested?
Added tests for switching to the SkippingAggregation state for the aggregate stream, and sqllogictests to validate correctness of accumulators in skipping aggregation mode.
Are there any user-facing changes?
Partial aggregation results may now contain records with duplicate values of GROUP BY expressions
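As a rough illustration of the probe described in this PR description, here is a std-only sketch. The field and method names are illustrative and not copied from the PR, and two behaviours are assumptions of the sketch: the decision is taken once, at the first update where the row threshold is reached, and the comparison against the ratio threshold is inclusive.

```rust
/// Decides whether partial aggregation should be skipped, based on the ratio
/// of distinct groups to input rows observed so far.
pub struct SkipAggregationProbe {
    probe_rows_threshold: usize, // rows to observe before deciding
    probe_ratio_threshold: f64,  // groups / rows ratio that triggers skipping
    input_rows: usize,
    should_skip: bool,
    is_locked: bool, // set once the decision has been made (assumption)
}

impl SkipAggregationProbe {
    pub fn new(probe_rows_threshold: usize, probe_ratio_threshold: f64) -> Self {
        Self {
            probe_rows_threshold,
            probe_ratio_threshold,
            input_rows: 0,
            should_skip: false,
            is_locked: false,
        }
    }

    /// Called after each aggregated batch with the batch's row count and the
    /// total number of distinct groups accumulated so far.
    pub fn update_state(&mut self, batch_rows: usize, total_groups: usize) {
        if self.is_locked {
            return;
        }
        self.input_rows += batch_rows;
        if self.input_rows >= self.probe_rows_threshold {
            let ratio = total_groups as f64 / self.input_rows as f64;
            self.should_skip = ratio >= self.probe_ratio_threshold;
            self.is_locked = true;
        }
    }

    pub fn should_skip(&self) -> bool {
        self.should_skip
    }
}
```

With a rows threshold of 100 and a ratio threshold of 0.8 (values chosen only for the example), seeing 95 groups in the first 100 rows switches the stream to skipping, while 10 groups in 100 rows keeps normal partial aggregation for the rest of the input.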