Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VL] Support StreamingAggregate if child output ordering is satisfied #3828

Merged
merged 3 commits into from
Nov 27, 2023

Conversation

ulysses-you
Copy link
Contributor

What changes were proposed in this pull request?

If the child output ordering satisfies the grouping keys then the grouping keys are pregrouped, we can choose to use StreamingAggregate. Velox supports StreamingAggregate which requires the input is pregrouped, so StreamingAggregate could use less memory to do aggregate as it does not need to hold all groups in memory and it could avoid spill.

For example, the query can hit this optimization if the join is SMJ:

SELECT c1, count(*), sum(c2) FROM (
SELECT t1.c1, t2.c2 FROM t1 t1 JOIN t1 t2 ON t1.c1 = t2.c1
)
GROUP BY c1;

How was this patch tested?

manually benchmark

OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 5.10.0-21-amd64
Intel(R) Core(TM) i5-10500 CPU @ 3.10GHz
streaming aggregate:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Enable streaming aggregate                         8074           8101          24          1.0        1009.2       1.0X
Disable streaming aggregate                       12338          12352          20          0.6        1542.2       0.7X

Copy link

Thanks for opening a pull request!

Could you open an issue for this pull request on Github Issues?

https://github.com/oap-project/gluten/issues

Then could you also rename commit message and pull request title in the following format?

[GLUTEN-${ISSUES_ID}][COMPONENT]feat/fix: ${detailed message}

See also:

Copy link

Run Gluten Clickhouse CI

@ulysses-you ulysses-you force-pushed the streaming-aggregation branch from 1f11c45 to 68dd54d Compare November 24, 2023 01:39
Copy link

Run Gluten Clickhouse CI

.saveAsTable("t")

try {
doBenchmark()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to me that we don't check query result in the benchmark code. So should we add some simple UT code with result check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I will add a test later

@ulysses-you ulysses-you force-pushed the streaming-aggregation branch from 68dd54d to db8201f Compare November 24, 2023 07:12
Copy link

Run Gluten Clickhouse CI

@ulysses-you
Copy link
Contributor Author

cc @zhztheplayer @rui-mo @PHILO-HE it's ready to review, thank you

@ulysses-you
Copy link
Contributor Author

/Benchmark Velox

@GlutenPerfBot
Copy link
Contributor

===== Performance report for TPCH SF2000 with Velox backend, for reference only ====

query log/native_3828_time.csv log/native_master_11_25_2023_f456a7e86_time.csv difference percentage
q1 36.75 33.92 -2.830 92.30%
q2 24.62 24.63 0.014 100.06%
q3 37.41 37.97 0.555 101.48%
q4 36.49 36.67 0.179 100.49%
q5 70.86 70.09 -0.773 98.91%
q6 5.46 6.75 1.297 123.78%
q7 84.63 83.98 -0.647 99.24%
q8 88.13 86.87 -1.263 98.57%
q9 125.75 123.67 -2.084 98.34%
q10 46.22 44.37 -1.845 96.01%
q11 19.72 19.78 0.062 100.32%
q12 24.07 28.12 4.051 116.83%
q13 44.83 45.86 1.028 102.29%
q14 16.19 18.47 2.277 114.06%
q15 27.38 29.09 1.714 106.26%
q16 15.76 15.40 -0.366 97.68%
q17 100.53 99.59 -0.935 99.07%
q18 145.25 147.30 2.053 101.41%
q19 13.06 12.94 -0.116 99.11%
q20 26.76 27.33 0.574 102.15%
q21 219.88 219.66 -0.223 99.90%
q22 13.14 12.88 -0.265 97.98%
total 1222.87 1225.33 2.457 100.20%

@ulysses-you
Copy link
Contributor Author

/Benchmark Velox

@GlutenPerfBot
Copy link
Contributor

===== Performance report for TPCH SF2000 with Velox backend, for reference only ====

query log/native_3828_time.csv log/native_master_11_25_2023_f456a7e86_time.csv difference percentage
q1 33.43 33.92 0.491 101.47%
q2 24.65 24.63 -0.016 99.93%
q3 37.19 37.97 0.781 102.10%
q4 37.34 36.67 -0.670 98.21%
q5 69.20 70.09 0.885 101.28%
q6 7.31 6.75 -0.559 92.35%
q7 85.90 83.98 -1.922 97.76%
q8 87.16 86.87 -0.291 99.67%
q9 125.12 123.67 -1.447 98.84%
q10 45.53 44.37 -1.152 97.47%
q11 20.22 19.78 -0.436 97.84%
q12 25.83 28.12 2.285 108.84%
q13 46.62 45.86 -0.766 98.36%
q14 14.89 18.47 3.572 123.99%
q15 28.70 29.09 0.393 101.37%
q16 15.61 15.40 -0.216 98.62%
q17 100.74 99.59 -1.147 98.86%
q18 147.88 147.30 -0.581 99.61%
q19 12.78 12.94 0.163 101.27%
q20 28.31 27.33 -0.983 96.53%
q21 222.67 219.66 -3.012 98.65%
q22 13.50 12.88 -0.618 95.42%
total 1230.57 1225.33 -5.246 99.57%

Copy link
Contributor

@rui-mo rui-mo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

agg.child.outputOrdering
case _ => child.outputOrdering
}
val requiredOrdering = groupingExpressions.map(expr => SortOrder.apply(expr, Ascending))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why ascending by default?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does that mean it only fits with SMJ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC Ascending is the Spark default ordering, all the bulit-in SQL operators follow it. e.g., SMJ, Window, SortAggregate.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it by Spark's limitation that a plan node can't require "asc or desc" child ordering at the same time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it's a kind of hard code, but so far, I did not see any requirements about changing it. Will Velox operators require or introduce Descending ordering ?

@@ -976,7 +976,9 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::AggregateRel& ag
if (aggRel.has_advanced_extension()) {
std::vector<TypePtr> types;
const auto& extension = aggRel.advanced_extension();
if (!validateInputTypes(extension, types)) {
// Aggregate always has advanced extension for steaming aggregate optimization,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: steaming -> streaming

Copy link

Run Gluten Clickhouse CI

Copy link
Member

@zhztheplayer zhztheplayer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! It's already in good shape in my opinion.

val childOrdering = child match {
case agg: HashAggregateExecBaseTransformer
if agg.groupingExpressions == this.groupingExpressions =>
agg.child.outputOrdering
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why propagating child's child's ordering here? It would be great if we could also leave some comments in code. Thanks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added some comments, hope it is clear now

Comment on lines 77 to 80
protected def isGroupingKeysPreGrouped: Boolean = {
if (!conf.getConf(GlutenConfig.COLUMNAR_PREFER_STREAMING_AGGREGATE)) {
return false
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check doesn't do as the method name isGroupingKeysPreGrouped said?

I would suggest renaming isGroupingKeysPreGrouped to something like isCapableForStreamingAggregation, or just moving this config check out.

Copy link

Run Gluten Clickhouse CI

@ulysses-you ulysses-you merged commit 4b5d0c1 into apache:main Nov 27, 2023
16 checks passed
@ulysses-you ulysses-you deleted the streaming-aggregation branch November 27, 2023 05:18
@GlutenPerfBot
Copy link
Contributor

===== Performance report for TPCH SF2000 with Velox backend, for reference only ====

query log/native_3828_time.csv log/native_master_11_26_2023_f456a7e86_time.csv difference percentage
q1 35.23 34.38 -0.844 97.60%
q2 25.19 24.62 -0.562 97.77%
q3 37.42 37.74 0.322 100.86%
q4 38.06 36.67 -1.390 96.35%
q5 73.59 68.82 -4.772 93.52%
q6 7.29 7.12 -0.170 97.67%
q7 83.14 82.01 -1.125 98.65%
q8 87.89 87.72 -0.162 99.82%
q9 126.58 120.79 -5.792 95.42%
q10 45.38 44.22 -1.162 97.44%
q11 19.73 19.96 0.224 101.14%
q12 22.64 28.56 5.921 126.16%
q13 45.28 46.61 1.323 102.92%
q14 17.68 14.35 -3.335 81.14%
q15 25.91 29.15 3.240 112.50%
q16 15.59 15.41 -0.178 98.86%
q17 103.99 100.98 -3.015 97.10%
q18 151.44 145.61 -5.831 96.15%
q19 14.46 12.88 -1.576 89.10%
q20 33.74 27.79 -5.948 82.37%
q21 222.59 223.56 0.971 100.44%
q22 13.43 13.03 -0.400 97.02%
total 1246.24 1221.98 -24.260 98.05%

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants