[VL][1.2] Port 6432 6657 for Celeborn bug fix in branch 1.2 (apache#7922)

* [CELEBORN] CHCelebornColumnarShuffleWriter supports celeborn.client.spark.shuffle.writer to use memory sort shuffle in ClickHouse backend (apache#6432)

* [GLUTEN-6656][CELEBORN] Fix CelebornColumnarShuffleWriter assertion failed (apache#6657)
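
The first change above threads the Celeborn client option celeborn.client.spark.shuffle.writer through to the ClickHouse backend's native shuffle writer: when the writer mode resolves to SORT, the CH writer now passes the memory-sort flag to makeForRSS (see the CHCelebornHashBasedColumnarShuffleWriter diff below). As a rough usage sketch only — the "spark." property prefix and the session setup are assumptions, not taken from this commit — enabling the sort-based writer might look like:

    import org.apache.spark.sql.SparkSession

    // Sketch only, not verified against this commit: the option name comes from
    // the commit message above; the "spark." prefix is an assumption about how
    // Celeborn client options are usually passed through Spark configuration.
    val spark = SparkSession
      .builder()
      .appName("celeborn-sort-writer-example")
      .config("spark.celeborn.client.spark.shuffle.writer", "sort")
      .getOrCreate()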

---------

Co-authored-by: Nicholas Jiang <[email protected]>
Co-authored-by: exmy <[email protected]>
3 people authored Nov 18, 2024
1 parent 73b0fb6 commit 40685c3
Showing 3 changed files with 111 additions and 106 deletions.
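
All three files receive the same restructuring: native-writer construction moves out of internalWrite into a backend-specific createShuffleWriter(columnarBatch), the shared base class creates the writer lazily through initShuffleWriter on the first non-empty batch, and empty input — an empty record iterator or all-empty batches — now falls back to handleEmptyIterator instead of hitting the removed assert(nativeShuffleWriter != -1L) (GLUTEN-6656). A minimal standalone sketch of that pattern follows; the class and method shapes below are illustrative only, not the actual Gluten API.

    // Illustrative sketch of the lazy-creation pattern; not the real Gluten classes.
    abstract class LazyNativeWriterSketch[T] {
      protected var nativeShuffleWriter: Long = -1L

      // Backend-specific: allocate the native writer from the first real batch.
      protected def createShuffleWriter(firstBatch: T): Unit
      protected def writeBatch(batch: T): Unit
      protected def handleEmptyIterator(): Unit = {}

      // Create the native writer at most once, only when a non-empty batch arrives.
      protected final def initShuffleWriter(batch: T): Unit =
        if (nativeShuffleWriter == -1L) createShuffleWriter(batch)

      final def write(records: Iterator[T], isEmpty: T => Boolean): Unit = {
        if (!records.hasNext) {
          handleEmptyIterator()
          return
        }
        records.filterNot(isEmpty).foreach {
          batch =>
            initShuffleWriter(batch)
            writeBatch(batch)
        }
        // Every batch may have been empty: fall back instead of asserting.
        if (nativeShuffleWriter == -1L) handleEmptyIterator()
      }
    }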
Changes to CHCelebornHashBasedColumnarShuffleWriter (ClickHouse backend):
@@ -29,6 +29,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.celeborn.client.ShuffleClient
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.protocol.ShuffleMode

import java.io.IOException
import java.util.Locale
@@ -55,68 +56,29 @@ class CHCelebornHashBasedColumnarShuffleWriter[K, V](

private var splitResult: CHSplitResult = _

private val nativeBufferSize: Int = GlutenConfig.getConf.shuffleWriterBufferSize

@throws[IOException]
override def internalWrite(records: Iterator[Product2[K, V]]): Unit = {
if (!records.hasNext) {
handleEmptyIterator()
return
}

if (nativeShuffleWriter == -1L) {
nativeShuffleWriter = jniWrapper.makeForRSS(
dep.nativePartitioning,
shuffleId,
mapId,
nativeBufferSize,
customizedCompressCodec,
GlutenConfig.getConf.chColumnarShuffleSpillThreshold,
CHBackendSettings.shuffleHashAlgorithm,
celebornPartitionPusher,
GlutenConfig.getConf.chColumnarThrowIfMemoryExceed,
GlutenConfig.getConf.chColumnarFlushBlockBufferBeforeEvict,
GlutenConfig.getConf.chColumnarForceExternalSortShuffle,
GlutenConfig.getConf.chColumnarForceMemorySortShuffle
)
CHNativeMemoryAllocators.createSpillable(
"CelebornShuffleWriter",
new Spiller() {
override def spill(self: MemoryTarget, phase: Spiller.Phase, size: Long): Long = {
if (!Spillers.PHASE_SET_SPILL_ONLY.contains(phase)) {
return 0L
}
if (nativeShuffleWriter == -1L) {
throw new IllegalStateException(
"Fatal: spill() called before a celeborn shuffle writer " +
"is created. This behavior should be" +
"optimized by moving memory " +
"allocations from make() to split()")
}
logInfo(s"Gluten shuffle writer: Trying to push $size bytes of data")
val spilled = jniWrapper.evict(nativeShuffleWriter)
logInfo(s"Gluten shuffle writer: Spilled $spilled / $size bytes of data")
spilled
}
}
)
}
while (records.hasNext) {
val cb = records.next()._2.asInstanceOf[ColumnarBatch]
if (cb.numRows == 0 || cb.numCols == 0) {
logInfo(s"Skip ColumnarBatch of ${cb.numRows} rows, ${cb.numCols} cols")
} else {
initShuffleWriter(cb)
val col = cb.column(0).asInstanceOf[CHColumnVector]
val block = col.getBlockAddress
jniWrapper
.split(nativeShuffleWriter, block)
jniWrapper.split(nativeShuffleWriter, col.getBlockAddress)
dep.metrics("numInputRows").add(cb.numRows)
dep.metrics("inputBatches").add(1)
// This metric is important: AQE uses it to decide whether to apply EliminateLimit
writeMetrics.incRecordsWritten(cb.numRows())
}
}

// If every ColumnarBatch was empty (and therefore skipped), nativeShuffleWriter still equals -1
if (nativeShuffleWriter == -1L) {
handleEmptyIterator()
return
}

splitResult = jniWrapper.stop(nativeShuffleWriter)

dep.metrics("splitTime").add(splitResult.getSplitTime)
@@ -135,6 +97,43 @@ class CHCelebornHashBasedColumnarShuffleWriter[K, V](
mapStatus = MapStatus(blockManager.shuffleServerId, splitResult.getRawPartitionLengths, mapId)
}

override def createShuffleWriter(columnarBatch: ColumnarBatch): Unit = {
nativeShuffleWriter = jniWrapper.makeForRSS(
dep.nativePartitioning,
shuffleId,
mapId,
nativeBufferSize,
customizedCompressCodec,
GlutenConfig.getConf.chColumnarShuffleSpillThreshold,
CHBackendSettings.shuffleHashAlgorithm,
celebornPartitionPusher,
GlutenConfig.getConf.chColumnarThrowIfMemoryExceed,
GlutenConfig.getConf.chColumnarFlushBlockBufferBeforeEvict,
GlutenConfig.getConf.chColumnarForceExternalSortShuffle,
GlutenConfig.getConf.chColumnarForceMemorySortShuffle
|| ShuffleMode.SORT.name.equalsIgnoreCase(shuffleWriterType)
)
CHNativeMemoryAllocators.createSpillable(
"CelebornShuffleWriter",
new Spiller() {
override def spill(self: MemoryTarget, phase: Spiller.Phase, size: Long): Long = {
if (!Spillers.PHASE_SET_SPILL_ONLY.contains(phase)) {
return 0L
}
if (nativeShuffleWriter == -1L) {
throw new IllegalStateException(
"Fatal: spill() called before a celeborn shuffle writer is created. " +
"This behavior should be optimized by moving memory allocations from make() to split()")
}
logInfo(s"Gluten shuffle writer: Trying to push $size bytes of data")
val spilled = jniWrapper.evict(nativeShuffleWriter)
logInfo(s"Gluten shuffle writer: Spilled $spilled / $size bytes of data")
spilled
}
}
)
}

override def closeShuffleWriter(): Unit = {
jniWrapper.close(nativeShuffleWriter)
}
Changes to CelebornHashBasedColumnarShuffleWriter (common abstract base):
@@ -23,6 +23,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.SHUFFLE_COMPRESS
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.celeborn.CelebornShuffleHandle
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.storage.BlockManager

import org.apache.celeborn.client.ShuffleClient
@@ -52,12 +53,23 @@ abstract class CelebornHashBasedColumnarShuffleWriter[K, V](

protected val mapId: Int = context.partitionId()

protected lazy val nativeBufferSize: Int = {
val bufferSize = GlutenConfig.getConf.shuffleWriterBufferSize
val maxBatchSize = GlutenConfig.getConf.maxBatchSize
if (bufferSize > maxBatchSize) {
logInfo(
s"${GlutenConfig.SHUFFLE_WRITER_BUFFER_SIZE.key} ($bufferSize) exceeds max " +
s" batch size. Limited to ${GlutenConfig.COLUMNAR_MAX_BATCH_SIZE.key} ($maxBatchSize).")
maxBatchSize
} else {
bufferSize
}
}

protected val clientPushBufferMaxSize: Int = celebornConf.clientPushBufferMaxSize

protected val clientPushSortMemoryThreshold: Long = celebornConf.clientPushSortMemoryThreshold

protected val clientSortMemoryMaxSize: Long = celebornConf.clientPushSortMemoryThreshold

protected val shuffleWriterType: String =
celebornConf.shuffleWriterMode.name.toLowerCase(Locale.ROOT)

@@ -96,6 +108,10 @@ abstract class CelebornHashBasedColumnarShuffleWriter[K, V](

@throws[IOException]
final override def write(records: Iterator[Product2[K, V]]): Unit = {
if (!records.hasNext) {
handleEmptyIterator()
return
}
internalWrite(records)
}

@@ -122,10 +138,18 @@ }
}
}

def createShuffleWriter(columnarBatch: ColumnarBatch): Unit = {}

def closeShuffleWriter(): Unit = {}

def getPartitionLengths: Array[Long] = partitionLengths

def initShuffleWriter(columnarBatch: ColumnarBatch): Unit = {
if (nativeShuffleWriter == -1L) {
createShuffleWriter(columnarBatch)
}
}

def pushMergedDataToCeleborn(): Unit = {
val pushMergedDataTime = System.nanoTime
client.prepareForMergeData(shuffleId, mapId, context.attemptNumber())
Changes to VeloxCelebornHashBasedColumnarShuffleWriter (Velox backend):
@@ -55,25 +55,6 @@ class VeloxCelebornHashBasedColumnarShuffleWriter[K, V](

private var splitResult: SplitResult = _

private lazy val nativeBufferSize = {
val bufferSize = GlutenConfig.getConf.shuffleWriterBufferSize
val maxBatchSize = GlutenConfig.getConf.maxBatchSize
if (bufferSize > maxBatchSize) {
logInfo(
s"${GlutenConfig.SHUFFLE_WRITER_BUFFER_SIZE.key} ($bufferSize) exceeds max " +
s" batch size. Limited to ${GlutenConfig.COLUMNAR_MAX_BATCH_SIZE.key} ($maxBatchSize).")
maxBatchSize
} else {
bufferSize
}
}

private val memoryLimit: Long = if ("sort".equals(shuffleWriterType)) {
Math.min(clientSortMemoryMaxSize, clientPushBufferMaxSize * numPartitions)
} else {
availableOffHeapPerTask()
}

private def availableOffHeapPerTask(): Long = {
val perTask =
SparkMemoryUtil.getCurrentAvailableOffHeapMemory / SparkResourceUtil.getTaskSlots(conf)
@@ -82,49 +63,13 @@

@throws[IOException]
override def internalWrite(records: Iterator[Product2[K, V]]): Unit = {
if (!records.hasNext) {
handleEmptyIterator()
return
}

while (records.hasNext) {
val cb = records.next()._2.asInstanceOf[ColumnarBatch]
if (cb.numRows == 0 || cb.numCols == 0) {
logInfo(s"Skip ColumnarBatch of ${cb.numRows} rows, ${cb.numCols} cols")
} else {
initShuffleWriter(cb)
val handle = ColumnarBatches.getNativeHandle(cb)
if (nativeShuffleWriter == -1L) {
nativeShuffleWriter = jniWrapper.makeForRSS(
dep.nativePartitioning,
nativeBufferSize,
customizedCompressionCodec,
compressionLevel,
bufferCompressThreshold,
GlutenConfig.getConf.columnarShuffleCompressionMode,
clientPushBufferMaxSize,
clientPushSortMemoryThreshold,
celebornPartitionPusher,
handle,
context.taskAttemptId(),
GlutenShuffleUtils.getStartPartitionId(dep.nativePartitioning, context.partitionId),
"celeborn",
shuffleWriterType,
GlutenConfig.getConf.columnarShuffleReallocThreshold
)
runtime.addSpiller(new Spiller() {
override def spill(self: MemoryTarget, phase: Spiller.Phase, size: Long): Long = {
if (!Spillers.PHASE_SET_SPILL_ONLY.contains(phase)) {
return 0L
}
logInfo(s"Gluten shuffle writer: Trying to push $size bytes of data")
// fixme pass true when being called by self
val pushed =
jniWrapper.nativeEvict(nativeShuffleWriter, size, false)
logInfo(s"Gluten shuffle writer: Pushed $pushed / $size bytes of data")
pushed
}
})
}
val startTime = System.nanoTime()
jniWrapper.write(nativeShuffleWriter, cb.numRows, handle, availableOffHeapPerTask())
dep.metrics("splitTime").add(System.nanoTime() - startTime)
@@ -135,8 +80,13 @@ }
}
}

// If every ColumnarBatch was empty (and therefore skipped), nativeShuffleWriter still equals -1
if (nativeShuffleWriter == -1L) {
handleEmptyIterator()
return
}

val startTime = System.nanoTime()
assert(nativeShuffleWriter != -1L)
splitResult = jniWrapper.stop(nativeShuffleWriter)

dep
@@ -155,6 +105,38 @@
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
}

override def createShuffleWriter(columnarBatch: ColumnarBatch): Unit = {
nativeShuffleWriter = jniWrapper.makeForRSS(
dep.nativePartitioning,
nativeBufferSize,
customizedCompressionCodec,
compressionLevel,
bufferCompressThreshold,
GlutenConfig.getConf.columnarShuffleCompressionMode,
clientPushBufferMaxSize,
clientPushSortMemoryThreshold,
celebornPartitionPusher,
ColumnarBatches.getNativeHandle(columnarBatch),
context.taskAttemptId(),
GlutenShuffleUtils.getStartPartitionId(dep.nativePartitioning, context.partitionId),
"celeborn",
shuffleWriterType,
GlutenConfig.getConf.columnarShuffleReallocThreshold
)
runtime.addSpiller(new Spiller() {
override def spill(self: MemoryTarget, phase: Spiller.Phase, size: Long): Long = {
if (!Spillers.PHASE_SET_SPILL_ONLY.contains(phase)) {
return 0L
}
logInfo(s"Gluten shuffle writer: Trying to push $size bytes of data")
// fixme pass true when being called by self
val pushed = jniWrapper.nativeEvict(nativeShuffleWriter, size, false)
logInfo(s"Gluten shuffle writer: Pushed $pushed / $size bytes of data")
pushed
}
})
}

override def closeShuffleWriter(): Unit = {
jniWrapper.close(nativeShuffleWriter)
}
