From fea65eef9b4859d5a287c8f3dcd8fff2e59bf628 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Fri, 19 Jan 2024 09:55:48 +0800 Subject: [PATCH] [Core] Move batch size limit to Velox backend (#4445) --- .../backendsapi/velox/VeloxBackend.scala | 14 ++++++++++++++ .../CHCelebornHashBasedColumnarShuffleWriter.scala | 2 ++ .../CelebornHashBasedColumnarShuffleWriter.scala | 2 -- ...loxCelebornHashBasedColumnarShuffleWriter.scala | 13 +++++++++++++ .../spark/shuffle/ColumnarShuffleWriter.scala | 14 ++++++++++++-- .../main/scala/io/glutenproject/GlutenConfig.scala | 8 ++------ 6 files changed, 43 insertions(+), 10 deletions(-) diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala index 85ac4a2c5da2..5278277b63d2 100644 --- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala @@ -66,6 +66,8 @@ object BackendSettings extends BackendSettingsApi { val GLUTEN_VELOX_UDF_LIB_PATHS = getBackendConfigPrefix() + ".udfLibraryPaths" val GLUTEN_VELOX_DRIVER_UDF_LIB_PATHS = getBackendConfigPrefix() + ".driver.udfLibraryPaths" + val MAXIMUM_BATCH_SIZE: Int = 32768 + override def supportFileFormatRead( format: ReadFileFormat, fields: Array[StructField], @@ -379,6 +381,7 @@ object BackendSettings extends BackendSettingsApi { override def shuffleSupportedCodec(): Set[String] = SHUFFLE_SUPPORTED_CODEC override def resolveNativeConf(nativeConf: java.util.Map[String, String]): Unit = { + checkMaxBatchSize(nativeConf) UDFResolver.resolveUdfConf(nativeConf) } @@ -407,4 +410,15 @@ object BackendSettings extends BackendSettingsApi { SparkShimLoader.getSparkShims.enableNativeWriteFilesByDefault() ) } + + private def checkMaxBatchSize(nativeConf: java.util.Map[String, String]): Unit = { + if (nativeConf.containsKey(GlutenConfig.GLUTEN_MAX_BATCH_SIZE_KEY)) { + val maxBatchSize = nativeConf.get(GlutenConfig.GLUTEN_MAX_BATCH_SIZE_KEY).toInt + if (maxBatchSize > MAXIMUM_BATCH_SIZE) { + throw new IllegalArgumentException( + s"The maximum value of ${GlutenConfig.GLUTEN_MAX_BATCH_SIZE_KEY}" + + s" is $MAXIMUM_BATCH_SIZE for Velox backend.") + } + } + } } diff --git a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala index 795c50ce47df..46c77ab64361 100644 --- a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala +++ b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala @@ -56,6 +56,8 @@ class CHCelebornHashBasedColumnarShuffleWriter[K, V]( private var splitResult: CHSplitResult = _ + private val nativeBufferSize: Int = GlutenConfig.getConf.shuffleWriterBufferSize + @throws[IOException] override def internalWrite(records: Iterator[Product2[K, V]]): Unit = { if (!records.hasNext) { diff --git a/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedColumnarShuffleWriter.scala b/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedColumnarShuffleWriter.scala index eb04cfc63fc4..620b637a4abc 100644 --- a/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedColumnarShuffleWriter.scala +++ b/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedColumnarShuffleWriter.scala @@ -64,8 +64,6 @@ abstract class CelebornHashBasedColumnarShuffleWriter[K, V]( protected val blockManager: BlockManager = SparkEnv.get.blockManager - protected val nativeBufferSize: Int = - math.min(GlutenConfig.getConf.shuffleWriterBufferSize, GlutenConfig.getConf.maxBatchSize) protected val customizedCompressionCodec: String = GlutenShuffleUtils.getCompressionCodec(conf) protected val bufferCompressThreshold: Int = diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala index c94b5e1b51c8..c007c49e1588 100644 --- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala +++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala @@ -54,6 +54,19 @@ class VeloxCelebornHashBasedColumnarShuffleWriter[K, V]( private var splitResult: SplitResult = _ + private lazy val nativeBufferSize = { + val bufferSize = GlutenConfig.getConf.shuffleWriterBufferSize + val maxBatchSize = GlutenConfig.getConf.maxBatchSize + if (bufferSize > maxBatchSize) { + logInfo( + s"${GlutenConfig.SHUFFLE_WRITER_BUFFER_SIZE.key} ($bufferSize) exceeds max " + + s" batch size. Limited to ${GlutenConfig.COLUMNAR_MAX_BATCH_SIZE.key} ($maxBatchSize).") + maxBatchSize + } else { + bufferSize + } + } + private def availableOffHeapPerTask(): Long = { val perTask = SparkMemoryUtil.getCurrentAvailableOffHeapMemory / SparkResourceUtil.getTaskSlots(conf) diff --git a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala index b4e5fae3dc4d..8509a2f1af5e 100644 --- a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala +++ b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala @@ -61,8 +61,18 @@ class ColumnarShuffleWriter[K, V]( .map(_.getAbsolutePath) .mkString(",") - private val nativeBufferSize = - math.min(GlutenConfig.getConf.shuffleWriterBufferSize, GlutenConfig.getConf.maxBatchSize) + private lazy val nativeBufferSize = { + val bufferSize = GlutenConfig.getConf.shuffleWriterBufferSize + val maxBatchSize = GlutenConfig.getConf.maxBatchSize + if (bufferSize > maxBatchSize) { + logInfo( + s"${GlutenConfig.SHUFFLE_WRITER_BUFFER_SIZE.key} ($bufferSize) exceeds max " + + s" batch size. Limited to ${GlutenConfig.COLUMNAR_MAX_BATCH_SIZE.key} ($maxBatchSize).") + maxBatchSize + } else { + bufferSize + } + } private val nativeMergeBufferSize = GlutenConfig.getConf.maxBatchSize diff --git a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala index b74acaca34a7..eca815b2af58 100644 --- a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala +++ b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala @@ -932,9 +932,7 @@ object GlutenConfig { buildConf(GLUTEN_MAX_BATCH_SIZE_KEY) .internal() .intConf - .checkValue( - v => v > 0 && v <= 32768, - s"$GLUTEN_MAX_BATCH_SIZE_KEY must in the range of [0, 32768].") + .checkValue(_ > 0, s"$GLUTEN_MAX_BATCH_SIZE_KEY must be positive.") .createWithDefault(4096) // if not set, use COLUMNAR_MAX_BATCH_SIZE instead @@ -942,9 +940,7 @@ object GlutenConfig { buildConf(GLUTEN_SHUFFLE_WRITER_BUFFER_SIZE) .internal() .intConf - .checkValue( - v => v > 0 && v <= 32768, - s"$GLUTEN_SHUFFLE_WRITER_BUFFER_SIZE must in the range of [0, 32768].") + .checkValue(_ > 0, s"$GLUTEN_SHUFFLE_WRITER_BUFFER_SIZE must be positive.") .createOptional val COLUMNAR_LIMIT_ENABLED =