Skip to content

Commit

Permalink
Address review comments and remove unnecessary log
Browse files Browse the repository at this point in the history
  • Loading branch information
zjuwangg committed Jan 2, 2025
1 parent 7e20ba0 commit efff600
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution.unsafe

import org.apache.spark.annotation.Experimental
import org.apache.spark.internal.Logging
import org.apache.spark.memory.{MemoryConsumer, MemoryMode, TaskMemoryManager}
import org.apache.spark.unsafe.Platform
Expand All @@ -33,6 +34,7 @@ import org.apache.spark.unsafe.array.LongArray
 * the total length of all bytesBuffers combined
*/
// scalastyle:off no.finalize
@Experimental
case class UnsafeBytesBufferArray(
arraySize: Int,
bytesBufferLengths: Array[Int],
Expand Down Expand Up @@ -99,7 +101,6 @@ case class UnsafeBytesBufferArray(
return new Array[Byte](0)
}
val bytes = new Array[Byte](bytesBufferLengths(index))
log.warn(s"get bytesBuffer at index $index bytesBuffer length ${bytes.length}")
Platform.copyMemory(
longArray.getBaseObject,
longArray.getBaseOffset + bytesBufferOffset(index),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.gluten.utils.ArrowAbiUtil
import org.apache.gluten.vectorized.{ColumnarBatchSerializerJniWrapper, NativeColumnarToRowJniWrapper}

import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.annotation.Experimental
import org.apache.spark.internal.Logging
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.sql.catalyst.InternalRow
Expand Down Expand Up @@ -56,6 +57,7 @@ import scala.collection.JavaConverters.asScalaIteratorConverter
* @param mode
* the broadcast mode.
*/
@Experimental
case class UnsafeColumnarBuildSideRelation(
private var output: Seq[Attribute],
private var batches: UnsafeBytesBufferArray,
Expand Down Expand Up @@ -135,7 +137,6 @@ case class UnsafeColumnarBuildSideRelation(
val length = bytesBufferLengths(i)
val tmpBuffer = new Array[Byte](length)
in.read(tmpBuffer)
log.warn(s"readExternal $i--- $length")
batches.putBytesBuffer(i, tmpBuffer)
}
}
Expand Down Expand Up @@ -169,7 +170,9 @@ case class UnsafeColumnarBuildSideRelation(

override def deserialized: Iterator[ColumnarBatch] = {
val runtime =
Runtimes.contextInstance(BackendsApiManager.getBackendName, "BuildSideRelation#transform")
Runtimes.contextInstance(
BackendsApiManager.getBackendName,
"UnsafeBuildSideRelation#deserialize")
val jniWrapper = ColumnarBatchSerializerJniWrapper.create(runtime)
val serializerHandle: Long = {
val allocator = ArrowBufferAllocators.contextInstance()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,15 +331,15 @@ class VeloxTPCHV1BhjSuite extends VeloxTPCHSuite {
}
}

/** BroadcastBuildSideRelation uses on-heap memory. */
class VeloxTPCHV1BhjOnheapSuite extends VeloxTPCHSuite {
override def subType(): String = "v1-bhj-onheap"
/** BroadcastBuildSideRelation uses off-heap memory. */
class VeloxTPCHV1BhjOffheapSuite extends VeloxTPCHSuite {
override def subType(): String = "v1-bhj-off-heap"

override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.sql.sources.useV1SourceList", "parquet")
.set("spark.sql.autoBroadcastJoinThreshold", "30M")
.set(GlutenConfig.VELOX_BROADCAST_BUILD_RELATION_USE_OFFHEAP.key, "false")
.set(GlutenConfig.VELOX_BROADCAST_BUILD_RELATION_USE_OFFHEAP.key, "true")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2264,8 +2264,8 @@ object GlutenConfig {
val VELOX_BROADCAST_BUILD_RELATION_USE_OFFHEAP =
buildConf("spark.gluten.velox.offHeapBroadcastBuildRelation.enabled")
.internal()
.doc("If enabled, broadcast build relation will use offheap memory. " +
.doc("Experimental: If enabled, broadcast build relation will use offheap memory. " +
"Otherwise, broadcast build relation will use onheap memory.")
.booleanConf
.createWithDefault(true)
.createWithDefault(false)
}

0 comments on commit efff600

Please sign in to comment.