feat: Use unified allocator for execution iterators #613

Merged on Jul 10, 2024 (12 commits)

viirya (Member) commented Jun 29, 2024

Which issue does this PR close?

Closes #648.
Relates to #387.

Rationale for this change

What changes are included in this PR?

How are these changes tested?

@viirya marked this pull request as draft June 29, 2024 23:00

codecov-commenter commented Jun 29, 2024

Codecov Report

Attention: Patch coverage is 0% with 5 lines in your changes missing coverage. Please review.

Project coverage is 33.58%. Comparing base (eff2897) to head (49995dd).
Report is 4 commits behind head on main.

| Files | Patch % | Lines |
|---|---|---|
| ...n/scala/org/apache/comet/vector/StreamReader.scala | 0.00% | 3 Missing ⚠️ |
| ...mmon/src/main/scala/org/apache/comet/package.scala | 0.00% | 1 Missing ⚠️ |
| ...ain/scala/org/apache/comet/vector/NativeUtil.scala | 0.00% | 1 Missing ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##               main     #613      +/-   ##
============================================
+ Coverage     33.42%   33.58%   +0.16%     
- Complexity      805      828      +23     
============================================
  Files           109      109              
  Lines         42462    42531      +69     
  Branches       9342     9344       +2     
============================================
+ Hits          14191    14286      +95     
+ Misses        25322    25296      -26     
  Partials       2949     2949              

@viirya force-pushed the unify_allocator branch 3 times, most recently from 7d4899a to dd09e1a, July 3, 2024 06:13
viirya commented Jul 7, 2024

The OOM issue in some TPCDS queries in CI will be fixed by #639.

viirya commented Jul 9, 2024

This now fails only on CometTPCDSQuerySuite with the sort merge join config (the broadcast and hash join configs pass).

But I don't see any details about the failure in the CI logs. I only got:

2024-07-09T15:55:27.4236260Z [ERROR] Failed to execute goal org.scalatest:scalatest-maven-plugin:2.0.2:test (test) on project comet-spark-spark3.4_2.12: There are test failures -> [Help 1]
2024-07-09T15:55:27.4283847Z org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.scalatest:scalatest-maven-plugin:2.0.2:test (test) on project comet-spark-spark3.4_2.12: There are test failures
2024-07-09T15:55:27.4621881Z     at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute2 (MojoExecutor.java:333)
2024-07-09T15:55:27.4630194Z     at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute (MojoExecutor.java:316)
2024-07-09T15:55:27.4634523Z     at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:212)
2024-07-09T15:55:27.4635530Z     at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:174)
2024-07-09T15:55:27.4667105Z     at org.apache.maven.lifecycle.internal.MojoExecutor.access$000 (MojoExecutor.java:75)
2024-07-09T15:55:27.4756898Z     at org.apache.maven.lifecycle.internal.MojoExecutor$1.run (MojoExecutor.java:162)
2024-07-09T15:55:27.4764057Z     at org.apache.maven.plugin.DefaultMojosExecutionStrategy.execute (DefaultMojosExecutionStrategy.java:39)
2024-07-09T15:55:27.4785783Z     at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:159)
2024-07-09T15:55:27.4796446Z     at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:105)
2024-07-09T15:55:27.4807376Z     at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:73)
2024-07-09T15:55:27.4814525Z     at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:53)
2024-07-09T15:55:27.4817912Z     at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:118)
2024-07-09T15:55:27.4821849Z     at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:261)
2024-07-09T15:55:27.4828407Z     at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:173)
2024-07-09T15:55:27.4829379Z     at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:101)
2024-07-09T15:55:27.4832730Z     at org.apache.maven.cli.MavenCli.execute (MavenCli.java:906)
2024-07-09T15:55:27.4835266Z     at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:283)
2024-07-09T15:55:27.4836081Z     at org.apache.maven.cli.MavenCli.main (MavenCli.java:206)
2024-07-09T15:55:27.4839506Z     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
2024-07-09T15:55:27.4842763Z     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
2024-07-09T15:55:27.4849012Z     at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
2024-07-09T15:55:27.4861001Z     at java.lang.reflect.Method.invoke (Method.java:566)
2024-07-09T15:55:27.4870461Z     at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:283)
2024-07-09T15:55:27.4885612Z     at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:226)
2024-07-09T15:55:27.4890602Z     at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:407)
2024-07-09T15:55:27.4899835Z     at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:348)
2024-07-09T15:55:27.4904594Z     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
2024-07-09T15:55:27.4911451Z     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
2024-07-09T15:55:27.4915859Z     at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
2024-07-09T15:55:27.4919199Z     at java.lang.reflect.Method.invoke (Method.java:566)
2024-07-09T15:55:27.4932920Z     at org.apache.maven.wrapper.BootstrapMainStarter.start (BootstrapMainStarter.java:52)
2024-07-09T15:55:27.4936029Z     at org.apache.maven.wrapper.WrapperExecutor.execute (WrapperExecutor.java:161)
2024-07-09T15:55:27.4946395Z     at org.apache.maven.wrapper.MavenWrapperMain.main (MavenWrapperMain.java:73)
2024-07-09T15:55:27.4963372Z Caused by: org.apache.maven.plugin.MojoFailureException: There are test failures
2024-07-09T15:55:27.4981790Z     at org.scalatest.tools.maven.TestMojo.execute (TestMojo.java:109)
2024-07-09T15:55:27.4997308Z     at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo (DefaultBuildPluginManager.java:126)
2024-07-09T15:55:27.5010381Z     at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute2 (MojoExecutor.java:328)
2024-07-09T15:55:27.5024796Z     at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute (MojoExecutor.java:316)
2024-07-09T15:55:27.5030990Z     at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:212)
2024-07-09T15:55:27.5041603Z     at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:174)
2024-07-09T15:55:27.5047002Z     at org.apache.maven.lifecycle.internal.MojoExecutor.access$000 (MojoExecutor.java:75)
2024-07-09T15:55:27.5052368Z     at org.apache.maven.lifecycle.internal.MojoExecutor$1.run (MojoExecutor.java:162)
2024-07-09T15:55:27.5058972Z     at org.apache.maven.plugin.DefaultMojosExecutionStrategy.execute (DefaultMojosExecutionStrategy.java:39)
2024-07-09T15:55:27.5060896Z     at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:159)
2024-07-09T15:55:27.5066526Z     at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:105)
2024-07-09T15:55:27.5071471Z     at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:73)
2024-07-09T15:55:27.5079711Z     at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:53)
2024-07-09T15:55:27.5147147Z     at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:118)
2024-07-09T15:55:27.5153540Z     at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:261)
2024-07-09T15:55:27.5163314Z     at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:173)
2024-07-09T15:55:27.5174125Z     at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:101)
2024-07-09T15:55:27.5180318Z     at org.apache.maven.cli.MavenCli.execute (MavenCli.java:906)
2024-07-09T15:55:27.5186235Z     at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:283)
2024-07-09T15:55:27.5191786Z     at org.apache.maven.cli.MavenCli.main (MavenCli.java:206)
2024-07-09T15:55:27.5196494Z     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
2024-07-09T15:55:27.5202334Z     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
2024-07-09T15:55:27.5209329Z     at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
2024-07-09T15:55:27.5215915Z     at java.lang.reflect.Method.invoke (Method.java:566)
2024-07-09T15:55:27.5222526Z     at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:283)
2024-07-09T15:55:27.5228160Z     at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:226)
2024-07-09T15:55:27.5234243Z     at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:407)
2024-07-09T15:55:27.5242323Z     at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:348)
2024-07-09T15:55:27.5254924Z     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
2024-07-09T15:55:27.5261708Z     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
2024-07-09T15:55:27.5265063Z     at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
2024-07-09T15:55:27.5272249Z     at java.lang.reflect.Method.invoke (Method.java:566)
2024-07-09T15:55:27.5278462Z     at org.apache.maven.wrapper.BootstrapMainStarter.start (BootstrapMainStarter.java:52)
2024-07-09T15:55:27.5288016Z     at org.apache.maven.wrapper.WrapperExecutor.execute (WrapperExecutor.java:161)
2024-07-09T15:55:27.5293407Z     at org.apache.maven.wrapper.MavenWrapperMain.main (MavenWrapperMain.java:73)
2024-07-09T15:55:27.5298503Z [ERROR] 
2024-07-09T15:55:27.5303484Z [ERROR] 
2024-07-09T15:55:27.5307253Z [ERROR] For more information about the errors and possible solutions, please read the following articles:
2024-07-09T15:55:27.5317294Z [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
2024-07-09T15:55:27.5323976Z [ERROR] 
2024-07-09T15:55:27.5340567Z [ERROR] After correcting the problems, you can resume the build with the command
2024-07-09T15:55:27.5353491Z [ERROR]   mvn <args> -rf :comet-spark-spark3.4_2.12

I also cannot reproduce it locally.

@viirya force-pushed the unify_allocator branch 3 times, most recently from 2f64c7a to f5cac20, July 9, 2024 21:18
"q70a",
// TODO: unknown failure (seems memory usage over Github action runner) in CI with q72-v2.7
// in https://github.com/apache/datafusion-comet/pull/613.
// "q72",
Member Author

In the latest run, I saw "Error: Process completed with exit code 143." It seems the memory usage exceeds what the GitHub Actions runner provides.
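
For readers unfamiliar with the number: by shell convention, an exit status above 128 usually means the process was ended by signal (status - 128), so 143 corresponds to signal 15, SIGTERM. That is consistent with the runner terminating the job, although it does not prove that memory was the cause. A minimal sketch of the arithmetic follows; the `ExitCodes` object and its methods are hypothetical names for illustration and are not part of Comet or the CI workflow.

```scala
// Hypothetical helper, not part of Comet: interpret a shell exit status.
object ExitCodes {
  // Shells report "terminated by signal N" as exit status 128 + N.
  def signalOf(exitCode: Int): Option[Int] =
    if (exitCode > 128 && exitCode < 256) Some(exitCode - 128) else None

  // A couple of common Linux signal numbers, for readable output.
  private val names = Map(9 -> "SIGKILL", 15 -> "SIGTERM")

  def describe(exitCode: Int): String =
    signalOf(exitCode) match {
      case Some(sig) =>
        val name = names.getOrElse(sig, "signal " + sig)
        "terminated by " + name
      case None =>
        s"exited with status $exitCode"
    }
}
```

Under these assumptions, `ExitCodes.describe(143)` yields "terminated by SIGTERM".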

Member Author

I found that a few particular queries (q72, q16) seem to use more memory than others. q72 currently cannot run with the sort merge join config in the CI runner because of its resource limits, but I can run it locally.

Member Author

I will investigate the two queries further, but they do not seem related to the changes here.

@viirya marked this pull request as ready for review July 10, 2024 13:49
@viirya requested a review from andygrove July 10, 2024 13:53
Comment on lines +111 to +113
// TODO: unknown failure (seems memory usage over Github action runner) in CI with q72 in
// https://github.com/apache/datafusion-comet/pull/613.
// "q72",
Member

I am +1 on skipping running the official q72 query by default (because it is so ridiculous), especially in CI. However, maybe we should consider running an optimized version where the join order is sensible, which makes it at least 10x faster and uses far less memory. I will file a follow-on issue to discuss this.

Member

The purpose of q72 is to test vendors' join reordering rules, and that isn't really very relevant to Spark or Comet since Spark queries typically don't have access to statistics.

Member

This is the version I have been using locally. Since we are not aiming to run the official TPC-DS benchmarks, but just our derived benchmarks, and also given that we are comparing Spark to Comet for the same queries, I think this would be fine to use by default as long as it is well documented in our benchmarking guide.

I do think we should still test with the original q72 as a separate exercise though, because if Spark can run it then Comet should be able to as well (with the same memory configuration).

select  i_item_desc
      ,w_warehouse_name
      ,d1.d_week_seq
      ,sum(case when p_promo_sk is null then 1 else 0 end) no_promo
      ,sum(case when p_promo_sk is not null then 1 else 0 end) promo
      ,count(*) total_cnt
from catalog_sales
join date_dim d1 on (cs_sold_date_sk = d1.d_date_sk)
join customer_demographics on (cs_bill_cdemo_sk = cd_demo_sk)
join household_demographics on (cs_bill_hdemo_sk = hd_demo_sk)
join item on (i_item_sk = cs_item_sk)
join inventory on (cs_item_sk = inv_item_sk)
join warehouse on (w_warehouse_sk=inv_warehouse_sk)
join date_dim d2 on (inv_date_sk = d2.d_date_sk)
join date_dim d3 on (cs_ship_date_sk = d3.d_date_sk)
left outer join promotion on (cs_promo_sk=p_promo_sk)
left outer join catalog_returns on (cr_item_sk = cs_item_sk and cr_order_number = cs_order_number)
where d1.d_week_seq = d2.d_week_seq
  and inv_quantity_on_hand < cs_quantity 
  and d3.d_date > d1.d_date + 5
  and hd_buy_potential = '501-1000'
  and d1.d_year = 1999
  and cd_marital_status = 'S'
group by i_item_desc,w_warehouse_name,d1.d_week_seq
order by total_cnt desc, i_item_desc, w_warehouse_name, d_week_seq
 LIMIT 100;

Member Author

> The purpose of q72 is to test vendors' join reordering rules, and that isn't really very relevant to Spark or Comet since Spark queries typically don't have access to statistics.

Btw, Spark can do join reordering if statistics are available, but that relies on enabling the CBO features, which are disabled by default.
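
As a rough illustration of what enabling those features involves, the sketch below turns on Spark's cost-based optimizer and its join reordering, then collects statistics for one of the q72 tables. It assumes a spark-shell session (where `spark` is the predefined SparkSession) and that `catalog_sales` is already registered in the catalog; neither assumption comes from this PR, and each table in the query would need the same ANALYZE step.

```scala
// Run inside spark-shell, where `spark` is the predefined SparkSession.
// Both settings default to false in Spark.
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")

// CBO relies on table and column statistics, so collect them first.
// Assumes a `catalog_sales` table exists in the current catalog.
spark.sql("ANALYZE TABLE catalog_sales COMPUTE STATISTICS FOR ALL COLUMNS")
```

Whether reordering actually helps q72 in Comet is not established anywhere in this thread; the snippet only shows the switches being discussed.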

Member Author

> I am +1 on skipping running the official q72 query by default (because it is so ridiculous), especially in CI. However, maybe we should consider running an optimized version where the join order is sensible, which makes it at least 10x faster and uses far less memory. I will file a follow-on issue to discuss this.

Sounds good to me.

Member Author

> I do think we should still test with the original q72 as a separate exercise though, because if Spark can run it then Comet should be able to as well (with the same memory configuration).

Yeah. As I mentioned earlier, I will investigate q72 further to see why it requires extra memory in Comet. For now I have just disabled it to unblock this PR.

Comment on lines +195 to +196
conf.set(CometConf.COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key, "true")
conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
Member

Before we can close #387 we should either change the default for COMET_SHUFFLE_ENFORCE_MODE_ENABLED or remove it completely.

Member

This can be a separate PR, but we should not close the issue when we merge this PR.

Member Author

Let me create another issue for this PR.

Member Author

Created #648 for this PR.

andygrove (Member) left a comment

LGTM. Thanks @viirya

Removing the debugging flags I added.
@viirya merged commit 3370612 into apache:main Jul 10, 2024
73 checks passed
viirya commented Jul 10, 2024

Merged. Thanks @andygrove

himadripal pushed a commit to himadripal/datafusion-comet that referenced this pull request Sep 7, 2024
* feat: Use unified allocator for execution iterators

* Disable CometTakeOrderedAndProjectExec

* Add comment

* Increase heap memory

* Enable CometTakeOrderedAndProjectExec

* More

* More

* Reduce heap memory

* Run sort merge join TPCDS with -e for debugging

* Add -X flag

* Disable q72 and q72-v2.7

* Update .github/workflows/benchmark.yml
Successfully merging this pull request may close these issues.

Arrow Java reports memory leak when Comet shuffle is enabled with AQE coalesce partition
3 participants