[VL] Move ColumnarBuildSideRelation's memory occupation to Spark off-heap #7750

Open
zhztheplayer opened this issue Oct 31, 2024 · 15 comments · May be fixed by #7885 or #7902
Labels
enhancement New feature or request

Comments

@zhztheplayer
Member

zhztheplayer commented Oct 31, 2024

So far ColumnarBuildSideRelation is allocated on Spark JVM heap memory.

case class ColumnarBuildSideRelation(output: Seq[Attribute], batches: Array[Array[Byte]]) {
  ...
}

It appears that we can replace batches: Array[Array[Byte]] with an off-heap allocated container to move the memory usage off-heap. There should be a simple solution that doesn't require too much refactoring. (See #8127 (comment) about this edit.)

This could avoid some of the heap OOM issues.
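
To make the idea concrete, here is a minimal sketch of such a container. It is not Gluten's implementation: `OffHeapBatches` is a hypothetical name, and a direct `java.nio.ByteBuffer` stands in for memory that a real implementation would obtain from Spark's off-heap pool. It only shows the shape of the change, which is packing the serialized batches into one off-heap region and keeping the batch boundaries as offsets.

```scala
import java.nio.ByteBuffer

// Hypothetical container (not a Gluten class): packs all serialized batches
// into one direct (off-heap) buffer and remembers where each batch starts.
// Assumption: the total size fits in an Int, as required by ByteBuffer.
final class OffHeapBatches(batches: Array[Array[Byte]]) {
  // offsets(i) is the start of batch i; offsets(numBatches) is the total size.
  private val offsets: Array[Int] = batches.scanLeft(0)(_ + _.length)
  private val buffer: ByteBuffer = ByteBuffer.allocateDirect(offsets.last)
  batches.foreach(b => buffer.put(b))

  def numBatches: Int = offsets.length - 1
  def totalBytes: Int = offsets.last
  def isOffHeap: Boolean = buffer.isDirect

  // Copies batch i back into a fresh heap array.
  def batch(i: Int): Array[Byte] = {
    require(i >= 0 && i < numBatches, s"batch index out of range: $i")
    val out = new Array[Byte](offsets(i + 1) - offsets(i))
    val view = buffer.duplicate() // independent position, shared memory
    view.position(offsets(i))
    view.get(out)
    out
  }
}
```

With this layout the JVM heap holds only the small offsets array, while the batch payload lives outside the heap. Copying a batch back with `batch(i)` is kept here only to make the sketch easy to check.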

@zhztheplayer zhztheplayer added the enhancement New feature or request label Oct 31, 2024
@FelixYBW
Contributor

FelixYBW commented Nov 1, 2024

Do you mean we can allocate the batch in Spark's off-heap memory? If so, it's a good solution.

@zhouyuan
Contributor

zhouyuan commented Nov 1, 2024

CC @kecookier @NEUpanning

@zhztheplayer
Member Author

zhztheplayer commented Nov 1, 2024

> Do you mean we can allocate the batch in Spark's off-heap memory? If so, it's a good solution.

Correct. I assume it could be straightforward to simply change the allocation to off-heap. It's unlikely to resemble Gazelle's case, which used to be tricky.

@Zand100
Contributor

Zand100 commented Nov 6, 2024

Hi, I would like to do this ticket to learn more about Gluten. Could you please point me to where an off-heap allocated container is used in the code? Thank you.

@zhztheplayer
Member Author

Thank you in advance for helping, @Zand100.

You can refer to vanilla Spark's code UnsafeHashedRelation where BytesToBytesMap can be allocated from off-heap. Probably we can adopt a similar approach in Gluten.

So I can assign this ticket to you, I suppose?

@FelixYBW
Contributor

FelixYBW commented Nov 6, 2024

@Zand100 You may consider upstreaming the PR to Spark as well. It's another solution to the off-heap/on-heap conflict: we move all of Spark's large memory allocations to off-heap once off-heap is enabled.

@Zand100
Contributor

Zand100 commented Nov 7, 2024

Thank you!

Just to check I'm on the right track: I'm trying to use BytesToBytesMap instead of Array[Array[Byte]] in https://github.com/apache/incubator-gluten/blob/main/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala#L39. Is that right?

Yes, you can assign the ticket to me.

@Zand100
Contributor

Zand100 commented Nov 10, 2024

Hi, how should I handle constructing a ColumnarBuildSideRelation for the HashedRelationBroadcastMode case?

val fromBroadcast = from.asInstanceOf[Broadcast[HashedRelation]]
val fromRelation = fromBroadcast.value.asReadOnlyCopy()

https://github.com/apache/incubator-gluten/blob/main/backends-velox/src/main/scala/org/apache/spark/sql/execution/BroadcastUtils.scala#L97

If fromBroadcast is already an UnsafeHashedRelation, then it's already using a BytesToBytesMap, which is perfect. But if fromBroadcast is a LongHashedRelation or GeneralHashedRelation, should I convert it to UnsafeHashedRelation?

@Zand100
Contributor

Zand100 commented Nov 11, 2024

Could you please review the draft #7885?

@zhztheplayer
Member Author

@Zand100

It's not necessary to use BytesToBytesMap in Gluten if it doesn't fit our use case. We can create another type of binary container that implements Spark's MemoryConsumer to suit our own needs.
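
As a rough illustration of what "a container that accounts for its own memory" could look like, here is a self-contained sketch. It deliberately does not use Spark's MemoryConsumer or TaskMemoryManager: `MemoryBudget` and `AccountedBatches` are hypothetical stand-ins that only mimic the reserve-before-allocate and release-on-close pattern, and the direct `ByteBuffer` again stands in for Spark-managed off-heap memory.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a memory manager: tracks a fixed byte budget.
// Not thread-safe; a real manager would need synchronization.
final class MemoryBudget(val capacity: Long) {
  private var usedBytes = 0L
  def used: Long = usedBytes

  // Grants the request only if it fits in the remaining budget.
  def acquire(bytes: Long): Boolean =
    if (usedBytes + bytes <= capacity) { usedBytes += bytes; true } else false

  def release(bytes: Long): Unit = {
    usedBytes = math.max(0L, usedBytes - bytes)
  }
}

// Hypothetical container: reserves from the budget before each off-heap
// allocation, so its usage is always visible to the budget.
final class AccountedBatches(budget: MemoryBudget) {
  private val buffers = ArrayBuffer.empty[ByteBuffer]

  def append(batch: Array[Byte]): Unit = {
    if (!budget.acquire(batch.length)) {
      throw new IllegalStateException(s"cannot reserve ${batch.length} bytes")
    }
    val buf = ByteBuffer.allocateDirect(batch.length)
    buf.put(batch)
    buf.flip() // make the written bytes readable from position 0
    buffers += buf
  }

  def size: Int = buffers.length

  // Drops the buffer references and returns the reservation to the budget.
  def close(): Unit = {
    buffers.foreach(b => budget.release(b.capacity))
    buffers.clear()
  }
}
```

In a real implementation the budget would be Spark's own memory manager, so the container's usage would count against the configured off-heap size instead of a local counter.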

@Zand100 Zand100 linked a pull request Nov 12, 2024 that will close this issue
@Zand100
Contributor

Zand100 commented Nov 12, 2024

Thank you! Could you please review the draft of the new binary container? #7902

@Zand100
Contributor

Zand100 commented Nov 14, 2024

Could you please review #7944? Should ColumnarBuildSideRelation still use the object handle (for example https://github.com/apache/incubator-gluten/blob/main/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala#L67) and construct the ColumnarBatch? (I think that becomes on-heap again.)

@zjuwangg
Contributor

zjuwangg commented Nov 28, 2024

Move ColumnarBuildSideRelation's memory occupation to Spark off-heap

We are very glad to see the discussion here. In our production environment, we have also long been troubled by out-of-memory (OOM) errors caused by the broadcast build relation using heap memory. We adopted an approach similar to the one proposed by @zhztheplayer and made further optimizations (such as dividing large batches into small batches) for our production scenario.
We would like to share our approach and contribute it back in the coming weeks.

Current Gluten implementation

image-3

  • Currently, when the ColumnarBuildSideRelation is broadcast on the driver, each ColumnarBatch is serialized and stored in on-heap memory in the data structure batches: Array[Array[Byte]].
  • On the executor, when the ColumnarBuildSideRelation is constructed, batches: Array[Array[Byte]] still remains in on-heap memory. In some extreme situations, this consumes a large amount of heap memory. Moreover, each batch in batches: Array[Array[Byte]] needs to be copied to off-heap memory through JNI, which wastes both CPU and memory.

Proposed design

The design went through two rounds of iterative development in our internal environment.

Round 1: use unsafe off-heap memory to store broadcast batches on the executor

image-5

  • Introduce BytesArrayInOffheap to store broadcast data in off-heap memory.
  • On the executor side, the broadcast data is first placed off-heap, and during deserialization one batch is copied/decoded at a time.
  • On the executor side, the broadcast data is read directly from off-heap memory. For each batch, we pass its memory address and size within BytesArrayInOffheap to the underlying deserializer.
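
The "pass an address and a size instead of a copy" idea in the last bullet can be sketched on the JVM side as follows. This is an illustration only: the proposal hands a raw memory address to native code, whereas this sketch uses a read-only `ByteBuffer` window as a stand-in, and `BatchViewSketch.view` is a hypothetical helper.

```scala
import java.nio.ByteBuffer

object BatchViewSketch {
  // Returns a read-only window over bytes [offset, offset + size) of `data`.
  // No bytes are copied; the window shares memory with `data`.
  def view(data: ByteBuffer, offset: Int, size: Int): ByteBuffer = {
    require(offset >= 0 && size >= 0 && offset + size <= data.capacity,
      s"window [$offset, ${offset + size}) is out of bounds")
    val dup = data.duplicate() // own position/limit, same underlying memory
    dup.clear()                // reset to position 0, limit = capacity
    dup.limit(offset + size)
    dup.position(offset)
    dup.slice().asReadOnlyBuffer()
  }
}
```

Because the window shares memory with the backing buffer, a reader sees the batch in place and no second copy of it has to exist, which is the saving the bullets above describe.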

Additionally, another problem emerged: a single batch in batches: Array[Array[Byte]] can be extremely large, which usually leads to an out-of-memory (OOM) error in off-heap memory. Consequently, we carried out a second round of optimization.

Round 2: serialize more, smaller batches to construct batches: Array[Array[Byte]]

  • In BroadcastUtils#serializeStream, support splitting batches: Iterator[ColumnarBatch] into more, smaller batches
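
The splitting step can be sketched as below. A batch is modeled as a plain `Seq[T]` of rows rather than a real ColumnarBatch, and both `SplitBatchesSketch.split` and the `maxRows` threshold are assumptions made for illustration; the thread does not specify how the split size is chosen.

```scala
object SplitBatchesSketch {
  // Re-chunks each incoming batch so that no output batch has more than
  // maxRows rows. Batches are processed lazily, one at a time, and empty
  // input batches produce no output.
  def split[T](batches: Iterator[Seq[T]], maxRows: Int): Iterator[Seq[T]] = {
    require(maxRows > 0, "maxRows must be positive")
    batches.flatMap(_.grouped(maxRows))
  }
}
```

For example, with `maxRows = 2` a five-row batch becomes three batches of 2, 2, and 1 rows, so no single serialized entry grows with the size of the original batch.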

Implementation steps

We will introduce a config in GlutenConfig that controls whether off-heap memory is used to store the broadcast data. Once all related code is merged and everything is done, we can remove the config and make this the default behavior.

Steps in brief:

  • Introduce BytesArrayInOffheap, UnsafeColumnarBuildSideRelation, and spark.gluten.sql.BroadcastBuildRelationUseOffheap.enabled in GlutenConfig to implement what we did in Round 1
    • support reading directly from off-heap memory when deserializing in native code
  • Add more interfaces to BroadcastUtils#serializeStream and refactor the serialize/deserialize process
    • support splitting a single large batch into more, smaller batches
  • Make off-heap broadcast the default behavior.
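
A minimal sketch of how such a switch might gate the two code paths is shown below. The config key is the one named above; everything else (`BroadcastConfigSketch`, the `Storage` type, reading from a plain `Map`, and defaulting to on-heap) is an assumption for illustration and not how GlutenConfig is actually structured.

```scala
object BroadcastConfigSketch {
  // Key as named in the proposal above.
  val UseOffheapKey = "spark.gluten.sql.BroadcastBuildRelationUseOffheap.enabled"

  sealed trait Storage
  case object OnHeap extends Storage
  case object OffHeap extends Storage

  // Assumption: falls back to the existing on-heap behavior when the key is
  // absent or not "true", so the new path is strictly opt-in.
  def chooseStorage(conf: Map[String, String]): Storage =
    if (conf.get(UseOffheapKey).exists(_.trim.equalsIgnoreCase("true"))) OffHeap
    else OnHeap
}
```

Keeping the decision in one place like this would make the final step, flipping the default and later removing the flag, a small change.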

cc @WangGuangxin @weiting-chen

@zhztheplayer
Member Author

Thank you for the comprehensive design, @zjuwangg! Let's move forward with an initial patch.

Would you like to place the content in a Google doc in which everyone can comment? Thanks. You can send a mail including the Google doc link to [email protected].

@zjuwangg
Contributor

zjuwangg commented Dec 3, 2024

> Thank you for the comprehensive design, @zjuwangg! Let's move forward with an initial patch.
>
> Would you like to place the content in a Google doc in which everyone can comment? Thanks. You can send a mail including the Google doc link to [email protected].

The initial patch can be found in #8127, and I have also drafted a design doc in Google Docs: https://docs.google.com/document/d/1eZNWPUEdiz2JPJfhyVn9hrk6SqJFRNzOMZm6u5Yredk/edit?tab=t.0#heading=h.1wu7kc4pvnqd. Eager to hear your thoughts and opinions!
