Skip to content

Commit

Permalink
[Core] Move batch size limit to Velox backend (apache#4445)
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma authored Jan 19, 2024
1 parent 7c853c6 commit fea65ee
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ object BackendSettings extends BackendSettingsApi {
val GLUTEN_VELOX_UDF_LIB_PATHS = getBackendConfigPrefix() + ".udfLibraryPaths"
val GLUTEN_VELOX_DRIVER_UDF_LIB_PATHS = getBackendConfigPrefix() + ".driver.udfLibraryPaths"

// Largest value that checkMaxBatchSize accepts for GlutenConfig.GLUTEN_MAX_BATCH_SIZE_KEY;
// a larger configured value is rejected with an IllegalArgumentException.
val MAXIMUM_BATCH_SIZE: Int = 32768

override def supportFileFormatRead(
format: ReadFileFormat,
fields: Array[StructField],
Expand Down Expand Up @@ -379,6 +381,7 @@ object BackendSettings extends BackendSettingsApi {
override def shuffleSupportedCodec(): Set[String] = SHUFFLE_SUPPORTED_CODEC

/**
 * Runs the backend-specific checks and resolution steps on the native configuration map.
 *
 * @param nativeConf native configuration key/value pairs; handed to both steps below
 * @throws IllegalArgumentException if the configured max batch size exceeds MAXIMUM_BATCH_SIZE
 */
override def resolveNativeConf(nativeConf: java.util.Map[String, String]): Unit = {
// Validate the batch-size limit first: on an oversized value this throws, so the
// UDF step below is not reached.
checkMaxBatchSize(nativeConf)
// NOTE(review): presumably resolves UDF-related entries of nativeConf — confirm in UDFResolver.
UDFResolver.resolveUdfConf(nativeConf)
}

Expand Down Expand Up @@ -407,4 +410,15 @@ object BackendSettings extends BackendSettingsApi {
SparkShimLoader.getSparkShims.enableNativeWriteFilesByDefault()
)
}

/**
 * Rejects a configured max batch size that is larger than [[MAXIMUM_BATCH_SIZE]].
 *
 * Nothing is checked when the key is absent from `nativeConf` or is mapped to `null`.
 *
 * @param nativeConf native configuration key/value pairs to inspect; it is not modified
 * @throws IllegalArgumentException if the value is not a valid integer, or if it exceeds
 *                                  [[MAXIMUM_BATCH_SIZE]]
 */
private def checkMaxBatchSize(nativeConf: java.util.Map[String, String]): Unit = {
  val key = GlutenConfig.GLUTEN_MAX_BATCH_SIZE_KEY
  // Option(...) treats "key absent" and "key mapped to null" alike, so a null value
  // can no longer surface as a NullPointerException from toInt.
  Option(nativeConf.get(key)).foreach {
    rawValue =>
      val maxBatchSize =
        try rawValue.toInt
        catch {
          // NumberFormatException is itself an IllegalArgumentException, so callers that
          // catch the latter are unaffected; the rethrow only adds the offending key.
          case e: NumberFormatException =>
            throw new IllegalArgumentException(
              s"$key must be an integer, but got '$rawValue'.",
              e)
        }
      if (maxBatchSize > MAXIMUM_BATCH_SIZE) {
        throw new IllegalArgumentException(
          s"The maximum value of $key" +
            s" is $MAXIMUM_BATCH_SIZE for Velox backend.")
      }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class CHCelebornHashBasedColumnarShuffleWriter[K, V](

private var splitResult: CHSplitResult = _

// Native buffer size, taken as-is from GlutenConfig's shuffleWriterBufferSize;
// no cap against maxBatchSize is applied here.
private val nativeBufferSize: Int = GlutenConfig.getConf.shuffleWriterBufferSize

@throws[IOException]
override def internalWrite(records: Iterator[Product2[K, V]]): Unit = {
if (!records.hasNext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ abstract class CelebornHashBasedColumnarShuffleWriter[K, V](

protected val blockManager: BlockManager = SparkEnv.get.blockManager

protected val nativeBufferSize: Int =
math.min(GlutenConfig.getConf.shuffleWriterBufferSize, GlutenConfig.getConf.maxBatchSize)
protected val customizedCompressionCodec: String = GlutenShuffleUtils.getCompressionCodec(conf)

protected val bufferCompressThreshold: Int =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,19 @@ class VeloxCelebornHashBasedColumnarShuffleWriter[K, V](

private var splitResult: SplitResult = _

// Native buffer size: the configured shuffle writer buffer size, capped at the configured
// max batch size. Computed on first access; an info message is logged when the cap applies.
private lazy val nativeBufferSize: Int = {
  val bufferSize = GlutenConfig.getConf.shuffleWriterBufferSize
  val maxBatchSize = GlutenConfig.getConf.maxBatchSize
  if (bufferSize > maxBatchSize) {
    // Only the first fragment carries the separating space, so the message reads
    // "exceeds max batch size" without a doubled blank.
    logInfo(
      s"${GlutenConfig.SHUFFLE_WRITER_BUFFER_SIZE.key} ($bufferSize) exceeds max " +
        s"batch size. Limited to ${GlutenConfig.COLUMNAR_MAX_BATCH_SIZE.key} ($maxBatchSize).")
    maxBatchSize
  } else {
    bufferSize
  }
}

private def availableOffHeapPerTask(): Long = {
val perTask =
SparkMemoryUtil.getCurrentAvailableOffHeapMemory / SparkResourceUtil.getTaskSlots(conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,18 @@ class ColumnarShuffleWriter[K, V](
.map(_.getAbsolutePath)
.mkString(",")

private val nativeBufferSize =
math.min(GlutenConfig.getConf.shuffleWriterBufferSize, GlutenConfig.getConf.maxBatchSize)
// Native buffer size: the configured shuffle writer buffer size, capped at the configured
// max batch size. Computed on first access; an info message is logged when the cap applies.
private lazy val nativeBufferSize: Int = {
  val bufferSize = GlutenConfig.getConf.shuffleWriterBufferSize
  val maxBatchSize = GlutenConfig.getConf.maxBatchSize
  if (bufferSize > maxBatchSize) {
    // Only the first fragment carries the separating space, so the message reads
    // "exceeds max batch size" without a doubled blank.
    logInfo(
      s"${GlutenConfig.SHUFFLE_WRITER_BUFFER_SIZE.key} ($bufferSize) exceeds max " +
        s"batch size. Limited to ${GlutenConfig.COLUMNAR_MAX_BATCH_SIZE.key} ($maxBatchSize).")
    maxBatchSize
  } else {
    bufferSize
  }
}

private val nativeMergeBufferSize = GlutenConfig.getConf.maxBatchSize

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -932,19 +932,15 @@ object GlutenConfig {
buildConf(GLUTEN_MAX_BATCH_SIZE_KEY)
.internal()
.intConf
.checkValue(
v => v > 0 && v <= 32768,
s"$GLUTEN_MAX_BATCH_SIZE_KEY must in the range of [0, 32768].")
.checkValue(_ > 0, s"$GLUTEN_MAX_BATCH_SIZE_KEY must be positive.")
.createWithDefault(4096)

// if not set, use COLUMNAR_MAX_BATCH_SIZE instead
val SHUFFLE_WRITER_BUFFER_SIZE =
buildConf(GLUTEN_SHUFFLE_WRITER_BUFFER_SIZE)
.internal()
.intConf
.checkValue(
v => v > 0 && v <= 32768,
s"$GLUTEN_SHUFFLE_WRITER_BUFFER_SIZE must in the range of [0, 32768].")
.checkValue(_ > 0, s"$GLUTEN_SHUFFLE_WRITER_BUFFER_SIZE must be positive.")
.createOptional

val COLUMNAR_LIMIT_ENABLED =
Expand Down

0 comments on commit fea65ee

Please sign in to comment.