Skip to content

Commit

Permalink
Set taskIdMapsForShuffle only in columnar case
Browse files Browse the repository at this point in the history
  • Loading branch information
acvictor committed Jun 11, 2024
1 parent c120a02 commit 15571b8
Showing 1 changed file with 30 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ class ColumnarShuffleManager(conf: SparkConf) extends ShuffleManager with Loggin
mapId: Long,
context: TaskContext,
metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
val mapTaskIds =
taskIdMapsForShuffle.computeIfAbsent(handle.shuffleId, _ => new OpenHashSet[Long](16))
mapTaskIds.synchronized {
mapTaskIds.add(context.taskAttemptId())
}
handle match {
case columnarShuffleHandle: ColumnarShuffleHandle[K @unchecked, V @unchecked] =>
val mapTaskIds =
taskIdMapsForShuffle.computeIfAbsent(handle.shuffleId, _ => new OpenHashSet[Long](16))
mapTaskIds.synchronized {
mapTaskIds.add(context.taskAttemptId())
}
GlutenShuffleWriterWrapper.genColumnarShuffleWriter(
shuffleBlockResolver,
columnarShuffleHandle,
Expand All @@ -87,17 +87,17 @@ class ColumnarShuffleManager(conf: SparkConf) extends ShuffleManager with Loggin
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
val (blocksByAddress, canEnableBatchFetch) = {
GlutenShuffleUtils.getReaderParam(
handle,
startMapIndex,
endMapIndex,
startPartition,
endPartition)
}
val shouldBatchFetch =
canEnableBatchFetch && canUseBatchFetch(startPartition, endPartition, context)
if (handle.isInstanceOf[ColumnarShuffleHandle[_, _]]) {
val (blocksByAddress, canEnableBatchFetch) = {
GlutenShuffleUtils.getReaderParam(
handle,
startMapIndex,
endMapIndex,
startPartition,
endPartition)
}
val shouldBatchFetch =
canEnableBatchFetch && canUseBatchFetch(startPartition, endPartition, context)
new BlockStoreShuffleReader(
handle.asInstanceOf[BaseShuffleHandle[K, _, C]],
blocksByAddress,
Expand All @@ -120,18 +120,26 @@ class ColumnarShuffleManager(conf: SparkConf) extends ShuffleManager with Loggin

/** Remove a shuffle's metadata from the ShuffleManager. */
override def unregisterShuffle(shuffleId: Int): Boolean = {
Option(taskIdMapsForShuffle.remove(shuffleId)).foreach {
mapTaskIds =>
mapTaskIds.iterator.foreach {
mapId => shuffleBlockResolver.removeDataByMap(shuffleId, mapId)
}
if (taskIdMapsForShuffle.contains(shuffleId)) {
Option(taskIdMapsForShuffle.remove(shuffleId)).foreach {
mapTaskIds =>
mapTaskIds.iterator.foreach {
mapId => shuffleBlockResolver.removeDataByMap(shuffleId, mapId)
}
}
true
} else {
sortShuffleManager.unregisterShuffle(shuffleId)
}
true
}

/** Shut down this ShuffleManager. */
override def stop(): Unit = {
shuffleBlockResolver.stop()
if (!taskIdMapsForShuffle.isEmpty) {
shuffleBlockResolver.stop()
} else {
sortShuffleManager.stop
}
}
}

Expand Down

0 comments on commit 15571b8

Please sign in to comment.