
Clickbench q32 not working in 32GB RAM #5969

Closed
jychen7 opened this issue Apr 11, 2023 · 15 comments
Labels
bug Something isn't working

Comments

jychen7 (Contributor) commented Apr 11, 2023

Describe the bug

q32 does not work on c6a.4xlarge (32GB RAM); the process gets killed.

SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10

To Reproduce

https://github.com/ClickHouse/ClickBench/tree/main/datafusion

Expected behavior

q32 should work

PS: DuckDB takes 5.28s on the same instance type.

Additional context

I haven't tried in a larger instance, e.g. c6a.8xlarge (64GB RAM) or up.
Update: it works in a larger instance; refer to #5969 (comment)

Exec Plan

explain SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Limit: skip=0, fetch=10                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|               |   Sort: c DESC NULLS FIRST, fetch=10                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
|               |     Projection: hits.WatchID, hits.ClientIP, COUNT(UInt8(1)) AS c, SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
|               |       Aggregate: groupBy=[[hits.WatchID, hits.ClientIP]], aggr=[[COUNT(UInt8(1)), SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)]]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|               |         TableScan: hits projection=[WatchID, ClientIP, IsRefresh, ResolutionWidth]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
| physical_plan | GlobalLimitExec: skip=0, fetch=10                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
|               |   SortPreservingMergeExec: [c@2 DESC]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|               |     SortExec: fetch=10, expr=[c@2 DESC]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|               |       ProjectionExec: expr=[WatchID@0 as WatchID, ClientIP@1 as ClientIP, COUNT(UInt8(1))@2 as c, SUM(hits.IsRefresh)@3 as SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)@4 as AVG(hits.ResolutionWidth)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|               |         AggregateExec: mode=FinalPartitioned, gby=[WatchID@0 as WatchID, ClientIP@1 as ClientIP], aggr=[COUNT(UInt8(1)), SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
|               |           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|               |             RepartitionExec: partitioning=Hash([Column { name: "WatchID", index: 0 }, Column { name: "ClientIP", index: 1 }], 12), input_partitions=12                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
|               |               AggregateExec: mode=Partial, gby=[WatchID@0 as WatchID, ClientIP@1 as ClientIP], aggr=[COUNT(UInt8(1)), SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|               |                 ParquetExec: limit=None, partitions={12 groups: [[hits.parquet:0..1231664704], [hits.parquet:1231664704..2463329408], [hits.parquet:2463329408..3694994112], [hits.parquet:3694994112..4926658816], [hits.parquet:4926658816..6158323520], [hits.parquet:6158323520..7389988224], [hits.parquet:7389988224..8621652928], [hits.parquet:8621652928..9853317632], [hits.parquet:9853317632..11084982336], [hits.parquet:11084982336..12316647040], [hits.parquet:12316647040..13548311744], [hits.parquet:13548311744..14779976446]]}, projection=[WatchID, ClientIP, IsRefresh, ResolutionWidth] |
|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
jychen7 added the bug (Something isn't working) label on Apr 11, 2023
comphead (Contributor) commented:
Looking into this

comphead (Contributor) commented with an image: df-distinct-graph

jychen7 (Contributor, Author) commented Apr 12, 2023

I am wondering whether a real TopK operator could reduce the memory usage for q32 to run
#3516 (comment)
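The intuition behind a TopK operator can be shown with a stdlib min-heap: instead of materializing and fully sorting every group before applying LIMIT, only k candidates are kept at any time, so memory for the ORDER BY c DESC LIMIT k step stays O(k). This is an illustrative sketch only, not DataFusion's operator; the top_k function and the plain u64 counts are simplifications for the example.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Keep only the k largest counts using a size-k min-heap.
/// Memory stays O(k) regardless of how many groups stream through.
fn top_k(counts: impl IntoIterator<Item = u64>, k: usize) -> Vec<u64> {
    let mut heap: BinaryHeap<Reverse<u64>> = BinaryHeap::with_capacity(k + 1);
    for c in counts {
        heap.push(Reverse(c));
        if heap.len() > k {
            heap.pop(); // evict the current minimum
        }
    }
    let mut result: Vec<u64> = heap.into_iter().map(|Reverse(c)| c).collect();
    result.sort_unstable_by(|a, b| b.cmp(a)); // descending, like ORDER BY c DESC
    result
}

fn main() {
    let counts = vec![5, 1, 9, 3, 7, 2, 8];
    println!("{:?}", top_k(counts, 3)); // prints [9, 8, 7]
}
```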

comphead (Contributor) commented:

It spends significant CPU time in datafusion::physical_plan::aggregates::row_hash::slice_and_maybe_filter.

jychen7 changed the title from "Clickbench q32 not working" to "Clickbench q32 not working in 32GB RAM" on Apr 12, 2023
jychen7 (Contributor, Author) commented Apr 12, 2023

> I haven't tried in a larger instance, e.g. c6a.8xlarge (64GB RAM) or up

Confirmed: it works in c6a.8xlarge (32 vCPU, 64GB RAM).

Query took 19.378 seconds

From htop, it uses all 32 cores and up to 34 GB RAM.

jychen7 (Contributor, Author) commented Apr 13, 2023

Before digging deeper into reducing memory usage, we could improve datafusion-cli (or add a new option in DataFusion core) to set the memory pool from the machine's memory limit. This crate could help: https://github.com/GuillaumeGomez/sysinfo

(PS: it already uses the number of CPU cores for parallel reading/sorting, but not the memory size)

https://github.com/apache/arrow-datafusion/blob/4c7833ebfdb2d022830bb97862e0ce36b0b3d6b1/datafusion/execution/src/runtime_env.rs#L152-L161


Unfortunately, my local run peaks at 34GB in Activity Monitor (macOS); it sometimes passes and sometimes returns the following error.

Before

unlimited, Query took 115.120 seconds
https://github.com/apache/arrow-datafusion/blob/4c7833ebfdb2d022830bb97862e0ce36b0b3d6b1/datafusion-cli/src/main.rs#L149-L152

After

limit 16*10^9 bytes (~15GB), 100% fraction
Resources exhausted: Failed to allocate additional 541794 bytes for GroupedHashAggregateStream[2] with 471971648 bytes already allocated - maximum available is 265860

fn create_runtime_env() -> Result<RuntimeEnv> {
    // Cap the memory pool at 16 GB; fraction 1.0 makes the whole limit usable
    let rn_config = RuntimeConfig::new().with_memory_limit(16_000_000_000, 1.0);
    RuntimeEnv::new(rn_config)
}

I tried hardcoding a 32,000,000,000-byte (29.8GB) limit and a 1.0 fraction on c6a.4xlarge; q32 still got killed.

jychen7 (Contributor, Author) commented Apr 14, 2023

When running with RUST_LOG=debug datafusion-cli, I found it is slow during do_sort, but I am not sure which part is slow: insert_batch or the final sort.

So I added debug logs in https://github.com/apache/arrow-datafusion/compare/main...jychen7:arrow-datafusion:debug-clickbench-q32?expand=1.
😮 Surprisingly, most of the time is spent between "Start do_sort" and "Start insert_batch".

Update: the log probably does not reflect the true timing, due to the async call of insert_batch.

[2023-04-14T02:26:51Z DEBUG datafusion::physical_plan::sorts::sort] Start do_sort for partition 11 of context session_id 039882c2-6b74-4d6a-9739-a50c4fbe1ac7 and task_id None
[2023-04-14T02:39:01Z DEBUG datafusion::physical_plan::sorts::sort] Start insert_batch for partition 11 with input size 295992
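Since interleaved debug-log timestamps can mislead across async boundaries, an alternative is to time a phase directly around the call. This is a generic sketch using std::time::Instant, not code from DataFusion; timed is a hypothetical helper introduced for illustration.

```rust
use std::time::Instant;

/// Run a closure and report how long it took. Measuring around the call
/// avoids inferring duration from two log lines that async scheduling
/// may have separated arbitrarily.
fn timed<T>(label: &str, f: impl FnOnce() -> T) -> T {
    let start = Instant::now();
    let out = f();
    eprintln!("{label} took {:?}", start.elapsed());
    out
}

fn main() {
    let sum = timed("sum", || (0u64..1_000_000).sum::<u64>());
    println!("{sum}");
}
```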

comphead (Contributor) commented:

According to the flamegraph, datafusion::physical_plan::aggregates::row_hash::slice_and_maybe_filter is expensive, because of excessive vector allocations. I will try to rewrite this method tomorrow and share the results.

jychen7 (Contributor, Author) commented Apr 14, 2023

> According to the flamegraph, datafusion::physical_plan::aggregates::row_hash::slice_and_maybe_filter is expensive,
> because of excessive vector allocations

That makes sense. From my debugging above, I think SortExec is not the slow part; it looks like one of its child plans, such as RepartitionExec or AggregateExec, is slow.

comphead (Contributor) commented Apr 19, 2023

With an optimized datafusion::physical_plan::aggregates::row_hash::slice_and_maybe_filter there is a ~25% gain: the query went from 68s to 50s.

2.4 GHz 8-Core Intel Core i9
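Fixes of this kind usually follow a standard Rust pattern: hoist the Vec out of the hot loop and reuse its capacity, so the per-row allocation disappears. The following is a generic sketch of that pattern, not the actual patch filed below in #6064; sum_of_row_sums and the doubling step are made up for illustration.

```rust
/// Process many rows while allocating the scratch buffer only once.
/// The naive version would build a fresh Vec inside the loop on every
/// row, which is exactly the "excessive vector allocations" symptom.
fn sum_of_row_sums(rows: &[Vec<u64>]) -> u64 {
    let mut scratch: Vec<u64> = Vec::new(); // allocated once, reused per row
    let mut total = 0;
    for row in rows {
        scratch.clear(); // keeps the capacity; no new allocation
        scratch.extend(row.iter().map(|v| v * 2));
        total += scratch.iter().sum::<u64>();
    }
    total
}

fn main() {
    let rows = vec![vec![1, 2], vec![3]];
    println!("{}", sum_of_row_sums(&rows)); // prints 12
}
```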

comphead (Contributor) commented:

Filed #6064

ozankabak (Contributor) commented:

I wonder if this query still fails with OOM after the recent improvements. Also, do you think #6657 would help with this query?

alamb (Contributor) commented Jun 23, 2023

> Also, do you guys think #6657 would help with this query?

I do think so

alamb (Contributor) commented Jul 17, 2023

After #6904 was merged, I have been able to run all ClickBench queries successfully with 32GB of RAM, whereas in DataFusion 27.0.0 q32 caused an OOM for me. I think this is done.

alamb closed this as completed on Jul 17, 2023