
Improve performance of ClickBench Q18, Q35, #13449

Open
3 tasks
alamb opened this issue Nov 16, 2024 · 16 comments
Labels
enhancement New feature or request

Comments

@alamb
Contributor

alamb commented Nov 16, 2024

Is your feature request related to a problem or challenge?

While looking at the results of the most recent ClickBench run (here is the ClickBench page (link)), I see there are a few queries where DataFusion is significantly slower.

(Screenshot: ClickBench results, 2024-11-16, 7:56 AM)

The queries are:

Q18:

SELECT "UserID", extract(minute FROM to_timestamp_seconds("EventTime")) AS m, "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", m, "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10;

Q35:

SELECT "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3, COUNT(*) AS c FROM hits GROUP BY "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3 ORDER BY c DESC LIMIT 10;

Describe the solution you'd like

I would like these queries to go faster.

Describe alternatives you've considered

Both queries look like

SELECT COUNT(...) cnt ... ORDER BY cnt DESC LIMIT 10

In other words, they are "top 10 count"-style queries.

By default, DataFusion will compute the counts for all groups, and then pick only the top 10.

I suspect there is a fancier way to do this, perhaps by finding the top 10 values of the count while emitting from the group operator. It would be interesting to see what other engines such as DuckDB do with this query.
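For illustration, here is a minimal Rust sketch of the idea (standalone toy code with hypothetical names, not DataFusion's actual API): run the full hash aggregation as today, but select the top k groups with a size-k min-heap while emitting, instead of sorting all groups afterwards.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap};

/// Count occurrences of each key, then emit only the k keys with the
/// largest counts, using a size-k min-heap instead of sorting all groups.
fn top_k_counts(keys: &[&str], k: usize) -> Vec<(String, usize)> {
    // Phase 1: full hash aggregation (what DataFusion does today).
    let mut counts: HashMap<&str, usize> = HashMap::new();
    for &key in keys {
        *counts.entry(key).or_insert(0) += 1;
    }
    // Phase 2: top-k selection while emitting. The heap never holds more
    // than k entries, so this is O(groups * log k) rather than a full sort.
    let mut heap: BinaryHeap<Reverse<(usize, &str)>> = BinaryHeap::new();
    for (key, count) in counts {
        heap.push(Reverse((count, key)));
        if heap.len() > k {
            heap.pop(); // evict the smallest count seen so far
        }
    }
    let mut out: Vec<(String, usize)> = heap
        .into_iter()
        .map(|Reverse((count, key))| (key.to_string(), count))
        .collect();
    out.sort_by(|a, b| b.1.cmp(&a.1)); // largest count first
    out
}

fn main() {
    let keys = ["a", "b", "a", "c", "a", "b"];
    assert_eq!(
        top_k_counts(&keys, 2),
        vec![("a".to_string(), 3), ("b".to_string(), 2)]
    );
}
```

A real implementation would additionally have to merge partial results across partitions; this sketch only shows the single-partition emission step.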

Additional context

No response

alamb added the enhancement (New feature or request) label on Nov 16, 2024
@alamb
Contributor Author

alamb commented Nov 16, 2024

Perhaps we can use some variant of the topk grouping: https://github.com/apache/datafusion/tree/main/datafusion/physical-plan/src/aggregates/topk

@alamb
Contributor Author

alamb commented Nov 16, 2024

First step for this project would be to profile the queries and see what we can improve

@jayzhan211
Contributor

I ran them and stored the results here. You can download them for analysis.

@jayzhan211
Contributor

It seems GroupedTopKAggregateStream is not used to handle the top k; instead, a SortExec with fetch is used 🤔

query TT
explain SELECT "UserID", extract(minute FROM to_timestamp_seconds("EventTime")) AS m, "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", m, "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10;
----
logical_plan
01)Sort: count(*) DESC NULLS FIRST, fetch=10
02)--Projection: hits.UserID, date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime)) AS m, hits.SearchPhrase, count(*)
03)----Aggregate: groupBy=[[hits.UserID, date_part(Utf8("MINUTE"), to_timestamp_seconds(hits.EventTime)), hits.SearchPhrase]], aggr=[[count(Int64(1)) AS count(*)]]
04)------TableScan: hits projection=[EventTime, UserID, SearchPhrase]
physical_plan
01)SortPreservingMergeExec: [count(*)@3 DESC], fetch=10
02)--SortExec: TopK(fetch=10), expr=[count(*)@3 DESC], preserve_partitioning=[true]
03)----ProjectionExec: expr=[UserID@0 as UserID, date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime))@1 as m, SearchPhrase@2 as SearchPhrase, count(*)@3 as count(*)]
04)------AggregateExec: mode=FinalPartitioned, gby=[UserID@0 as UserID, date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime))@1 as date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime)), SearchPhrase@2 as SearchPhrase], aggr=[count(*)]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([UserID@0, date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime))@1, SearchPhrase@2], 4), input_partitions=4
07)------------AggregateExec: mode=Partial, gby=[UserID@1 as UserID, date_part(MINUTE, to_timestamp_seconds(EventTime@0)) as date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime)), SearchPhrase@2 as SearchPhrase], aggr=[count(*)]
08)--------------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/benchmarks/data/hits.parquet:0..3694994112], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:3694994112..7389988224], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:7389988224..11084982336], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:11084982336..14779976446]]}, projection=[EventTime, UserID, SearchPhrase]

query TT
explain SELECT "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3, COUNT(*) AS c FROM hits GROUP BY "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3 ORDER BY c DESC LIMIT 10;
----
logical_plan
01)Sort: c DESC NULLS FIRST, fetch=10
02)--Projection: hits.ClientIP, hits.ClientIP - Int64(1), hits.ClientIP - Int64(2), hits.ClientIP - Int64(3), count(*) AS c
03)----Aggregate: groupBy=[[hits.ClientIP, __common_expr_1 AS hits.ClientIP - Int64(1), __common_expr_1 AS hits.ClientIP - Int64(2), __common_expr_1 AS hits.ClientIP - Int64(3)]], aggr=[[count(Int64(1)) AS count(*)]]
04)------Projection: CAST(hits.ClientIP AS Int64) AS __common_expr_1, hits.ClientIP
05)--------TableScan: hits projection=[ClientIP]
physical_plan
01)SortPreservingMergeExec: [c@4 DESC], fetch=10
02)--SortExec: TopK(fetch=10), expr=[c@4 DESC], preserve_partitioning=[true]
03)----ProjectionExec: expr=[ClientIP@0 as ClientIP, hits.ClientIP - Int64(1)@1 as hits.ClientIP - Int64(1), hits.ClientIP - Int64(2)@2 as hits.ClientIP - Int64(2), hits.ClientIP - Int64(3)@3 as hits.ClientIP - Int64(3), count(*)@4 as c]
04)------AggregateExec: mode=FinalPartitioned, gby=[ClientIP@0 as ClientIP, hits.ClientIP - Int64(1)@1 as hits.ClientIP - Int64(1), hits.ClientIP - Int64(2)@2 as hits.ClientIP - Int64(2), hits.ClientIP - Int64(3)@3 as hits.ClientIP - Int64(3)], aggr=[count(*)]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([ClientIP@0, hits.ClientIP - Int64(1)@1, hits.ClientIP - Int64(2)@2, hits.ClientIP - Int64(3)@3], 4), input_partitions=4
07)------------AggregateExec: mode=Partial, gby=[ClientIP@1 as ClientIP, __common_expr_1@0 - 1 as hits.ClientIP - Int64(1), __common_expr_1@0 - 2 as hits.ClientIP - Int64(2), __common_expr_1@0 - 3 as hits.ClientIP - Int64(3)], aggr=[count(*)]
08)--------------ProjectionExec: expr=[CAST(ClientIP@0 AS Int64) as __common_expr_1, ClientIP@0 as ClientIP]
09)----------------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/benchmarks/data/hits.parquet:0..3694994112], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:3694994112..7389988224], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:7389988224..11084982336], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:11084982336..14779976446]]}, projection=[ClientIP]

@Dandandan
Contributor

Dandandan commented Nov 17, 2024

Note that q18 and q35 got some speedup in #12996

@Rachelint
Contributor

Rachelint commented Nov 17, 2024

For q18, I found that string view leads to some regression on my local machine:


[[email protected]] 16:10:45 ~/arrow-datafusion
$ /home/db/arrow-datafusion/target/release/dfbench clickbench --iterations 5 --path /home/db/arrow-datafusion/benchmarks/data/hits.parquet --queries-path /home/db/arrow-datafusion/benchmarks/queries/clickbench/queries.sql --query 18 --force-view-types
Running benchmarks with the following options: RunOpt { query: Some(18), common: CommonOpt { iterations: 5, partitions: None, batch_size: 8192, debug: false, force_view_types: true }, path: "/home/db/arrow-datafusion/benchmarks/data/hits.parquet", queries_path: "/home/db/arrow-datafusion/benchmarks/queries/clickbench/queries.sql", output_path: None }
Q18: SELECT "UserID", extract(minute FROM to_timestamp_seconds("EventTime")) AS m, "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", m, "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10;
Query 18 iteration 0 took 8186.2 ms and returned 10 rows
Query 18 iteration 1 took 8292.9 ms and returned 10 rows
Query 18 iteration 2 took 8276.1 ms and returned 10 rows
Query 18 iteration 3 took 8327.8 ms and returned 10 rows
Query 18 iteration 4 took 8165.0 ms and returned 10 rows
[ perf record: Woken up 762 times to write data ]
[ perf record: Captured and wrote 190.757 MB perf.data (24029 samples) ]

[[email protected]] 16:11:38 ~/arrow-datafusion
$ /home/db/arrow-datafusion/target/release/dfbench clickbench --iterations 5 --path /home/db/arrow-datafusion/benchmarks/data/hits.parquet --queries-path /home/db/arrow-datafusion/benchmarks/queries/clickbench/queries.sql --query 18
Running benchmarks with the following options: RunOpt { query: Some(18), common: CommonOpt { iterations: 5, partitions: None, batch_size: 8192, debug: false, force_view_types: false }, path: "/home/db/arrow-datafusion/benchmarks/data/hits.parquet", queries_path: "/home/db/arrow-datafusion/benchmarks/queries/clickbench/queries.sql", output_path: None }
Q18: SELECT "UserID", extract(minute FROM to_timestamp_seconds("EventTime")) AS m, "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", m, "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10;
Query 18 iteration 0 took 7820.3 ms and returned 10 rows
Query 18 iteration 1 took 7648.6 ms and returned 10 rows
Query 18 iteration 2 took 7847.3 ms and returned 10 rows
Query 18 iteration 3 took 7629.2 ms and returned 10 rows
Query 18 iteration 4 took 7709.3 ms and returned 10 rows

@Dandandan
Contributor

Dandandan commented Nov 17, 2024

Yes, the regression is tracked in #13188.

Also, #13275 tracks some further improvements in vectorized_append, which seems to be pretty hot in the flamegraphs.

@Rachelint
Contributor

Rachelint commented Nov 17, 2024

> Yes, the regression is tracked in #13188.
>
> Also, #13275 tracks some further improvements in vectorized_append, which seems to be pretty hot in the flamegraphs.

Yes, I profiled a verbose flamegraph for q18 and found that Vec::push in vectorized_append is very hot.

I am working on improving vectorized_equal_to and vectorized_append for primitives, which seems to help q18.


@Dandandan
Contributor

Also, the date_part from extract(minute FROM to_timestamp_seconds("EventTime")) seems to be taking some time.

I added the following items to query 18 in the issue description.
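As a side note on why that date_part could be specialized for this query: extracting the minute from an epoch-seconds value needs no calendar logic at all, only modular arithmetic. A minimal sketch of the shortcut (an illustration only, not necessarily how DataFusion or chrono implements it):

```rust
/// Minute-of-hour for a Unix timestamp in seconds (UTC).
/// Minutes repeat every 3600 seconds, so no calendar conversion is needed;
/// rem_euclid keeps the result correct for pre-1970 (negative) timestamps.
fn minute_from_epoch_seconds(secs: i64) -> u32 {
    (secs.rem_euclid(3600) / 60) as u32
}

fn main() {
    assert_eq!(minute_from_epoch_seconds(90), 1); // 1970-01-01 00:01:30
    assert_eq!(minute_from_epoch_seconds(-30), 59); // 1969-12-31 23:59:30
}
```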

@Dandandan
Contributor

For these queries it also seems possible (though tricky), if the cardinality is high enough (i.e. copying into aggregation columns doesn't reduce memory usage very much), to execute the aggregation first, keep the original data in RecordBatches, and delay the materialization of the other columns until the end.

@jayzhan211
Contributor

A possible improvement for date_part (requires upstreaming a change to the chrono crate): apache/arrow-rs#6746

@Rachelint
Contributor

Rachelint commented Dec 1, 2024

I think we can optimize the plan to improve q35.

I found that ClickHouse applies the following optimization:

// Before
SELECT "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3, COUNT(*) AS c FROM hits GROUP BY "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3 ORDER BY c DESC LIMIT 10;

// After
SELECT "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3, COUNT(*) AS c FROM hits GROUP BY "ClientIP" ORDER BY c DESC LIMIT 10;

I am trying it in #13617
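The rewrite is valid because "ClientIP" - 1, "ClientIP" - 2, and "ClientIP" - 3 are deterministic functions of "ClientIP", so including them in the group keys cannot split any group. A toy Rust sketch (hypothetical code, not the actual optimizer rule) showing that grouping by the reduced key yields the same groups, with the dependent columns derived only at emission time:

```rust
use std::collections::HashMap;

/// Group by the full key tuple (ip, ip-1, ip-2, ip-3), as the current plan does.
fn count_full_key(ips: &[i64]) -> HashMap<(i64, i64, i64, i64), usize> {
    let mut counts = HashMap::new();
    for &ip in ips {
        *counts.entry((ip, ip - 1, ip - 2, ip - 3)).or_insert(0) += 1;
    }
    counts
}

/// Group by ip alone; derive the dependent columns only when emitting rows.
fn count_reduced_key(ips: &[i64]) -> HashMap<(i64, i64, i64, i64), usize> {
    let mut counts: HashMap<i64, usize> = HashMap::new();
    for &ip in ips {
        *counts.entry(ip).or_insert(0) += 1;
    }
    // The derived columns are pure functions of ip, so the groups are identical,
    // but the hash table keys are a quarter of the size during aggregation.
    counts
        .into_iter()
        .map(|(ip, c)| ((ip, ip - 1, ip - 2, ip - 3), c))
        .collect()
}

fn main() {
    let ips = [10, 20, 10, 30, 10, 20];
    assert_eq!(count_full_key(&ips), count_reduced_key(&ips));
}
```

The win comes from hashing and storing one column instead of four during the aggregation itself.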

@Rachelint
Contributor

Rachelint commented Dec 1, 2024

Amazing! q35 gets 1.35x faster on my local machine when this optimization rule is added!

// Before applying the rule
Q35: SELECT "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3, COUNT(*) AS c FROM hits GROUP BY "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3 ORDER BY c DESC LIMIT 10;
Query 35 iteration 0 took 2371.1 ms and returned 10 rows

// After applying the rule
Q35: SELECT "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3, COUNT(*) AS c FROM hits GROUP BY "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3 ORDER BY c DESC LIMIT 10;
Query 35 iteration 0 took 1491.4 ms and returned 10 rows

But it still confuses me why DuckDB runs q35 so fast. As far as I know, DuckDB does not optimize the plan the way ClickHouse does.

@jayzhan211
Contributor

It seems like a hack for a specific query 😆 but still great 👍🏻!

@waruto210
Contributor

> Perhaps we can use some variant of the topk grouping: https://github.com/apache/datafusion/tree/main/datafusion/physical-plan/src/aggregates/topk

Similar optimization proposals were found in ClickHouse:
ClickHouse/ClickHouse#72610

@waruto210
Contributor

waruto210 commented Dec 11, 2024

> It seems GroupedTopKAggregateStream is not used to handle the top k; instead, a SortExec with fetch is used 🤔
count(*) needs to record the count for every key, so top-k aggregation is not applicable here.
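To illustrate the point: unlike a top-k over already-final values, a group's running count can keep growing, so pruning low-count groups mid-stream can discard a group that belongs in the final top k. A toy sketch (hypothetical code, k = 1 for brevity) of why such pruning is unsafe for count(*):

```rust
use std::collections::HashMap;

/// Incorrect "pruning" strategy: after each batch, discard every group except
/// the current leader (k = 1). Unsafe, because evicted partial counts are lost.
fn pruned_top1(batches: &[&[&str]]) -> String {
    let mut counts: HashMap<String, usize> = HashMap::new();
    for batch in batches {
        for &key in *batch {
            *counts.entry(key.to_string()).or_insert(0) += 1;
        }
        let best = counts
            .iter()
            .max_by_key(|(_, &c)| c)
            .map(|(k, _)| k.clone())
            .unwrap();
        counts.retain(|k, _| *k == best); // drops the other groups' counts
    }
    counts.into_keys().next().unwrap()
}

/// Correct strategy: keep every group's count until the end (what DataFusion does).
fn full_top1(batches: &[&[&str]]) -> String {
    let mut counts: HashMap<String, usize> = HashMap::new();
    for batch in batches {
        for &key in *batch {
            *counts.entry(key.to_string()).or_insert(0) += 1;
        }
    }
    counts.into_iter().max_by_key(|&(_, c)| c).unwrap().0
}

fn main() {
    // "b" trails in the first batch but dominates overall (4 vs 3).
    let batches: [&[&str]; 2] = [&["a", "a", "a", "b", "b"], &["b", "b"]];
    assert_eq!(full_top1(&batches), "b"); // correct answer
    assert_eq!(pruned_top1(&batches), "a"); // pruning lost b's early counts
}
```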
