
[GLUTEN-7750][VL] Move ColumnarBuildSideRelation's memory occupation to Spark off-heap #8127

Merged: 3 commits into apache:main on Jan 2, 2025

Conversation

@zjuwangg (Contributor) commented Dec 3, 2024

What changes were proposed in this pull request?

Implement feature: #7750

How was this patch tested?

Added a unit test and manually ran TPC-DS on my dev box.

@zjuwangg zjuwangg marked this pull request as draft December 3, 2024 02:28
@github-actions github-actions bot added CORE works for Gluten Core VELOX labels Dec 3, 2024

github-actions bot commented Dec 3, 2024

Thanks for opening a pull request!

Could you open an issue for this pull request on GitHub Issues?

https://github.com/apache/incubator-gluten/issues

Then could you also rename the commit message and the pull request title in the following format?

[GLUTEN-${ISSUES_ID}][COMPONENT]feat/fix: ${detailed message}

See also:


github-actions bot commented Dec 3, 2024

Run Gluten Clickhouse CI on x86



@zjuwangg zjuwangg force-pushed the m_move_broadcast_2_offheap branch from 6a0e31f to 17c8b5a Compare December 11, 2024 10:16

@zjuwangg zjuwangg force-pushed the m_move_broadcast_2_offheap branch from 640cf4e to d23b542 Compare December 12, 2024 12:07
@zjuwangg zjuwangg marked this pull request as ready for review December 12, 2024 12:07
@zjuwangg zjuwangg changed the title [Draft][Gluten-7750][VL] Move ColumnarBuildSideRelation's memory occupation to Spark off-heap [Gluten-7750][VL] Move ColumnarBuildSideRelation's memory occupation to Spark off-heap Dec 12, 2024

@zjuwangg zjuwangg force-pushed the m_move_broadcast_2_offheap branch from d23b542 to b3e118f Compare December 12, 2024 13:09

@zhztheplayer zhztheplayer self-requested a review December 17, 2024 02:29
@zhztheplayer zhztheplayer changed the title [Gluten-7750][VL] Move ColumnarBuildSideRelation's memory occupation to Spark off-heap [GLUTEN-7750][VL] Move ColumnarBuildSideRelation's memory occupation to Spark off-heap Dec 18, 2024

@zhouyuan (Contributor):

CC @surnaik

@zhztheplayer (Member) left a comment:

Just took a rough pass through the PR. It seems to be in good shape already.

Saw the CI is failing, by the way.

Comment on lines 127 to 141
val taskMemoryManager = new TaskMemoryManager(
new UnifiedMemoryManager(SparkEnv.get.conf, Long.MaxValue, Long.MaxValue / 2, 1),
0)
@zhztheplayer (Member):

Would you like to explain this code a bit? Thanks!

@zjuwangg (Contributor, Author):

This relation is used in a broadcast and is shared by multiple tasks, so a new standalone TaskMemoryManager is introduced here instead of using the task-bound TaskMemoryManager. This is similar to UnsafeHashedRelation; see https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L389-L410

@zhztheplayer (Member) commented Dec 23, 2024:

Thanks for the explanation. And I need to apologize: my previous comment, #7750 (comment), was wrong, because we are apparently facing the same issue the Gazelle project hit, here again.

I see finalize() is used, and I assume it works in your test, right? Given that invocations of finalize() rely on JVM GC, how do we make sure the off-heap memory is released, to avoid off-heap overhead, even when JVM GC is never triggered?

This code (lines 127-129) looks feasible, but the allocation is still not managed by the default memory manager. Do we have a better approach? Otherwise it will lead to YARN-kill issues.

Let's revisit the topic and the code a little, and hopefully we can come up with an optimal solution. My description in #7750 was indeed a bit misleading; this issue doesn't look to have any kind of "simple" solution so far.

@yikf (Contributor) commented Dec 26, 2024:

@zhztheplayer Spark uses the Cleaner GC mechanism to clean up broadcasts, so finalize() should also be a feasible solution, but it cannot clean up proactively when off-heap memory is insufficient. As for needing a unified memory manager to manage this memory, we can directly use SparkEnv.get.memoryManager; see https://github.com/apache/incubator-gluten/pull/8349/files#diff-f4bdaf17dd276a8e42e2a1d9b2b865ea8d68c23b05523fae56e900fd78f972e5R151
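The trade-off discussed above, GC-driven release versus an explicit release path, can be sketched with plain JVM facilities. The snippet below is a hypothetical stand-in, not Gluten code: java.lang.ref.Cleaner behaves like finalize() in that the registered action runs only after GC makes the object unreachable, but its Cleanable handle also allows an eager, idempotent clean() call when off-heap memory must be freed immediately.

```java
import java.lang.ref.Cleaner;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for an off-heap-backed relation; a direct
// ByteBuffer plays the role of the native memory that the real
// UnsafeColumnarBuildSideRelation manages.
class OffHeapRelation implements AutoCloseable {
    private static final Cleaner CLEANER = Cleaner.create();

    // The release action must NOT reference the owning OffHeapRelation,
    // or the object would never become phantom-reachable.
    static final class ReleaseAction implements Runnable {
        final AtomicBoolean released = new AtomicBoolean(false);
        @Override public void run() {
            // Idempotent release: runs when close() is called explicitly,
            // or eventually after GC if the owner forgot to close.
            released.compareAndSet(false, true);
        }
    }

    private final ByteBuffer offHeap;          // the resource being guarded
    final ReleaseAction action = new ReleaseAction();
    private final Cleaner.Cleanable cleanable;

    OffHeapRelation(int bytes) {
        this.offHeap = ByteBuffer.allocateDirect(bytes);
        this.cleanable = CLEANER.register(this, action);
    }

    // Explicit, proactive release path: callable when off-heap memory is
    // insufficient, no GC required. Safe to call more than once.
    @Override public void close() { cleanable.clean(); }

    boolean isReleased() { return action.released.get(); }
}
```

With finalize() alone, only the GC-driven path exists; Cleanable.clean() adds the eager one, which is what proactive cleanup under off-heap pressure needs.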

@yikf (Contributor):

Edit:

Broadcast variables can usually be assumed to be small enough, so the finalize() approach is roughly acceptable (maybe I'm wrong). Later we can look for a more suitable solution, such as implementing spill support for UnsafeBytesBufferArray.

@zhztheplayer (Member):

@yikf Thank you for continuing the work. I was on vacation today as well, so sorry for the late reply.

> @zhztheplayer Hi, Terry is on vacation recently. What's your opinion on this idea? I can continue to follow up.

I was thinking about using storage memory, but the implementation is complicated? If so, let's put that idea on hold.

> we can directly use SparkEnv.get.memoryManager.

👍 Sounds good to me if it can utilize Spark's off-heap memory, since at least the memory is then tracked. The release could still be a problem, though, because Spark doesn't call System.gc() on off-heap OOM.

If the issue does apply, maybe we can consider one of the following:

Option 1. Hook into Spark to make sure System.gc() is triggered on OOM (if there's an approach that doesn't require modifying vanilla Spark's code).
Option 2. Set a fixed capacity (e.g., 15% of off-heap memory) for the standalone task memory manager used in this PR. When that part of memory runs out, trigger System.gc(); if the allocation is still unsatisfied, throw an OOM.

The above is just based on my assumptions. Let me know if there are other possible solutions.
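Option 2 can be sketched as a small quota tracker. Everything below is hypothetical and invented for illustration (the class name, the injectable GC hook); it only demonstrates the acquire / System.gc() / retry / OOM control flow, not any real Spark or Gluten API.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of Option 2: a fixed slice of off-heap capacity
// reserved for broadcast relations. When an allocation doesn't fit, we
// trigger GC once (hoping Cleaner/finalize releases run), then retry;
// if it still doesn't fit, fail with an OOM instead of overcommitting.
class BroadcastOffHeapQuota {
    private final long capacity;      // e.g. 15% of spark.memory.offHeap.size
    private final AtomicLong used = new AtomicLong(0);
    private final Runnable gcHook;    // injectable for testing; System::gc in production

    BroadcastOffHeapQuota(long capacity, Runnable gcHook) {
        this.capacity = capacity;
        this.gcHook = gcHook;
    }

    // Lock-free reservation: succeed only if the quota is not exceeded.
    private boolean tryReserve(long bytes) {
        while (true) {
            long cur = used.get();
            if (cur + bytes > capacity) return false;
            if (used.compareAndSet(cur, cur + bytes)) return true;
        }
    }

    long acquire(long bytes) {
        if (tryReserve(bytes)) return bytes;
        gcHook.run();                 // last-ditch attempt to free garbage-held memory
        if (tryReserve(bytes)) return bytes;
        throw new OutOfMemoryError("Broadcast off-heap quota exhausted: requested "
            + bytes + ", used " + used.get() + " of " + capacity);
    }

    void release(long bytes) { used.addAndGet(-bytes); }

    long used() { return used.get(); }
}
```

The GC hook is a parameter so the OOM path can be exercised deterministically in tests; the production wiring would pass System::gc.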

@yikf (Contributor) commented Dec 27, 2024:

@zhztheplayer There is another idea, though its feasibility is uncertain.

Spark uses the GC mechanism on the driver side to clean broadcast variables through ContextCleaner, and ContextCleaner supports attaching listeners (attachListener). We could fully utilize this mechanism.

Prerequisites:

  1. Implement our own set of endpoints for off-heap broadcast cleanup.
  2. Set up the endpoint when off-heap broadcast is used.
  3. Have UnsafeColumnarBuildSideRelation record the broadcast id.
  4. Implement a ContextCleaner listener.
  5. When deserializing UnsafeColumnarBuildSideRelation on the executor side, use a singleton class to record the broadcast id and the UnsafeColumnarBuildSideRelation (readExternal).

Then, when the driver triggers broadcast cleanup, the listener catches the event and sends a cleanup request to the executor side through the endpoint. The executor's endpoint uses the broadcastId to find the corresponding relation and actively invokes its release method to free the off-heap memory, with no finalize() involved.

The idea is similar to how broadcast blocks are cleaned on the executor side.

Not sure if it will work, but if it does, this idea seems to be a final solution that fits the current Spark mechanics.
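The executor-side bookkeeping in steps 3 and 5 can be sketched as a tiny registry. All names below are invented for illustration; the RPC endpoint and the ContextCleaner listener that would call onBroadcastCleaned are omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical executor-side registry: deserialization registers each
// off-heap relation under its broadcast id, and the cleanup endpoint
// releases it explicitly when the driver reports the broadcast as dead.
final class OffHeapBroadcastRegistry {
    interface Releasable { void release(); }

    private static final Map<Long, Releasable> RELATIONS = new ConcurrentHashMap<>();

    private OffHeapBroadcastRegistry() {}

    // Called from readExternal() on the executor: remember which
    // broadcast id owns this off-heap relation.
    static void register(long broadcastId, Releasable relation) {
        RELATIONS.put(broadcastId, relation);
    }

    // Called by the executor-side cleanup endpoint when the driver's
    // listener forwards a broadcast-cleaned event: free the off-heap
    // memory explicitly instead of waiting for finalize().
    static boolean onBroadcastCleaned(long broadcastId) {
        Releasable r = RELATIONS.remove(broadcastId);
        if (r == null) return false; // already cleaned, or never registered here
        r.release();
        return true;
    }

    static int size() { return RELATIONS.size(); }
}
```

remove() before release() makes the cleanup idempotent under concurrent events, which matters because the driver may broadcast the cleaning request to every executor.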

@zhztheplayer (Member) commented Dec 27, 2024:

Thank you for the thoughtful insights.

More or less, the idea looks similar to the solution the CH backend is using. In case you happened to miss it, you could refer to the code starting from here.

It looks feasible to me, though two limitations come to mind with this solution:

  1. The GC timing of the driver-side variable is still uncertain.
  2. What if the driver-side variable is GC-ed while the executor-side variable is still in use? (Perhaps set the driver-side variable to null manually, immediately after broadcasting, to test this.)

(Contributor):

#2 does not seem to happen: at least until the end of the current stage, functions on the driver side will keep referring to the broadcast variable, though I'm not sure whether that is always the case.

The GC timing on the driver side is also not controllable. But let's revisit the topic: we need to free off-heap memory when the broadcast variable is no longer in use, yet there doesn't seem to be any definite signal on the executor side that recognizes the broadcast is no longer in use, so we would have to trigger the release via GC, which is somewhat strange in nature, whether the GC is triggered actively or by the JVM. And actively triggering GC is not a good solution in my opinion; it affects a lot of things.

@zjuwangg (Contributor, Author):

> I was thinking about using storage memory but the implementation is complicated? If so let's put that idea on hold.

Using storage memory is a bit more complex, and Spark's broadcast mechanism is itself basically implemented on top of Spark's storage memory. We can try that route later to see if it works.

@yikf (Contributor) left a comment:

LGTM overall, with a few minor comments. I think it is in good shape to merge once CI passes.

@zjuwangg zjuwangg force-pushed the m_move_broadcast_2_offheap branch from b34bc8c to 18f1c01 Compare December 23, 2024 09:56

@zjuwangg zjuwangg force-pushed the m_move_broadcast_2_offheap branch from 3845bf4 to d05429b Compare December 25, 2024 17:24

@zjuwangg zjuwangg force-pushed the m_move_broadcast_2_offheap branch from d05429b to 1a3d339 Compare December 25, 2024 17:43

@zjuwangg zjuwangg force-pushed the m_move_broadcast_2_offheap branch from 1a3d339 to 1811c06 Compare December 25, 2024 17:54

@zjuwangg zjuwangg force-pushed the m_move_broadcast_2_offheap branch from 1811c06 to e526095 Compare December 27, 2024 06:01
@lwz9103 (Contributor) commented Dec 27, 2024:

Run Gluten Clickhouse CI on x86

@zjuwangg zjuwangg force-pushed the m_move_broadcast_2_offheap branch from e526095 to c69940f Compare December 27, 2024 10:23

@zhztheplayer (Member) left a comment:

Basically only the release issue remains, from my perspective. Perhaps we can find a temporary solution and then merge this next week. Thanks, everyone.


@zjuwangg (Contributor, Author):

Just made offHeapBroadcastBuildRelation default to false and addressed the comments!
Thanks for your review @zhztheplayer @yikf

@zhztheplayer (Member) left a comment:

This looks feasible to merge, since the feature is gated behind experimental flags.

Thank you for working on this, @zjuwangg. Also, would you like to open an issue ticket in advance to track the OOM risk we mentioned? Thanks.

@zjuwangg (Contributor, Author) commented Jan 1, 2025 via email


@zjuwangg zjuwangg force-pushed the m_move_broadcast_2_offheap branch from 1c534ce to efff600 Compare January 2, 2025 10:31

@zhztheplayer zhztheplayer merged commit dda601b into apache:main Jan 2, 2025
48 checks passed
@zjuwangg zjuwangg deleted the m_move_broadcast_2_offheap branch January 10, 2025 06:19