diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/affinity/CHUTAffinity.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/affinity/CHUTAffinity.scala index c7d77e550083..d8bd31d6f4d9 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/affinity/CHUTAffinity.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/affinity/CHUTAffinity.scala @@ -35,6 +35,6 @@ object CHUTSoftAffinityManager extends AffinityManager { override lazy val detectDuplicateReading = true - override lazy val maxDuplicateReadingRecords = - GlutenConfig.GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS_DEFAULT_VALUE + override lazy val duplicateReadingMaxCacheItems = + GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS_DEFAULT_VALUE } diff --git a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala index 278e1b550092..dd82807e3454 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala @@ -28,7 +28,6 @@ import org.apache.spark.sql.execution.datasources.FilePartition import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} -import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.locks.ReentrantReadWriteLock @@ -56,18 +55,34 @@ abstract class AffinityManager extends LogLevelUtil with Logging { lazy val detectDuplicateReading = true - lazy val maxDuplicateReadingRecords = - GlutenConfig.GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS_DEFAULT_VALUE + lazy val duplicateReadingMaxCacheItems = + GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS_DEFAULT_VALUE // rdd id -> patition id, file path, start, length - val rddPartitionInfoMap = new ConcurrentHashMap[Int, Array[(Int, String, Long, Long)]]() + val rddPartitionInfoMap: LoadingCache[Integer, Array[(Int, String, Long, Long)]] = + CacheBuilder + .newBuilder() + .maximumSize(duplicateReadingMaxCacheItems) + .build(new CacheLoader[Integer, Array[(Int, String, Long, Long)]] { + override def load(id: Integer): Array[(Int, String, Long, Long)] = { + Array.empty[(Int, String, Long, Long)] + } + }) // stage id -> execution id + rdd ids: job start / execution end - val stageInfoMap = new ConcurrentHashMap[Int, Array[Int]]() + val stageInfoMap: LoadingCache[Integer, Array[Int]] = + CacheBuilder + .newBuilder() + .maximumSize(duplicateReadingMaxCacheItems) + .build(new CacheLoader[Integer, Array[Int]] { + override def load(id: Integer): Array[Int] = { + Array.empty[Int] + } + }) // final result: partition composed key("path1_start_length,path2_start_length") --> array_host val duplicateReadingInfos: LoadingCache[String, Array[(String, String)]] = CacheBuilder .newBuilder() - .maximumSize(maxDuplicateReadingRecords) + .maximumSize(duplicateReadingMaxCacheItems) .build(new CacheLoader[String, Array[(String, String)]] { override def load(name: String): Array[(String, String)] = { Array.empty[(String, String)] @@ -162,11 +177,11 @@ abstract class AffinityManager extends LogLevelUtil with Logging { event.reason match { case org.apache.spark.Success => val stageId = event.stageId - val rddInfo = stageInfoMap.get(stageId) + val rddInfo = stageInfoMap.getIfPresent(stageId) if (rddInfo != null) { rddInfo.foreach { rddId => - val partitions = rddPartitionInfoMap.get(rddId) + val partitions = rddPartitionInfoMap.getIfPresent(rddId) if (partitions != null) { val key = partitions .filter(p => p._1 == SparkShimLoader.getSparkShims.getPartitionId(event.taskInfo)) @@ -195,11 +210,11 @@ abstract class AffinityManager extends LogLevelUtil with Logging { } def clearPartitionMap(rddIds: Seq[Int]): Unit = { - rddIds.foreach(id => rddPartitionInfoMap.remove(id)) + rddIds.foreach(id => rddPartitionInfoMap.invalidate(id)) } def clearStageMap(id: Int): Unit = { - stageInfoMap.remove(id) + stageInfoMap.invalidate(id) } def checkTargetHosts(hosts: Array[String]): Boolean = { @@ -274,8 +289,9 @@ abstract class AffinityManager extends LogLevelUtil with Logging { val paths = f.files.map(file => (f.index, file.filePath.toString, file.start, file.length)).toArray val key = rddId - val values = if (rddPartitionInfoMap.containsKey(key)) { - rddPartitionInfoMap.get(key) ++ paths + var values = rddPartitionInfoMap.getIfPresent(key) + values = if (values != null) { + values ++ paths } else { paths } @@ -300,8 +316,8 @@ object SoftAffinityManager extends AffinityManager { ) && SparkShimLoader.getSparkShims.supportDuplicateReadingTracking - override lazy val maxDuplicateReadingRecords = SparkEnv.get.conf.getInt( - GlutenConfig.GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS, - GlutenConfig.GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS_DEFAULT_VALUE + override lazy val duplicateReadingMaxCacheItems = SparkEnv.get.conf.getInt( + GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS, + GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS_DEFAULT_VALUE ) } diff --git a/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala b/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala index b22eb508001a..2328900da76a 100644 --- a/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala +++ b/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.softaffinity import org.apache.gluten.GlutenConfig import org.apache.gluten.execution.GlutenPartition -import org.apache.gluten.softaffinity.SoftAffinityManager +import org.apache.gluten.softaffinity.{AffinityManager, SoftAffinityManager} import org.apache.gluten.softaffinity.scheduler.SoftAffinityListener import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.plan.PlanBuilder @@ -33,6 +33,16 @@ import org.apache.spark.sql.execution.datasources.FilePartition import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.storage.{RDDInfo, StorageLevel} +object FakeSoftAffinityManager extends AffinityManager { + override lazy val usingSoftAffinity: Boolean = true + + override lazy val minOnTargetHosts: Int = 1 + + override lazy val detectDuplicateReading = true + + override lazy val duplicateReadingMaxCacheItems = 1 +} + class SoftAffinityWithRDDInfoSuite extends QueryTest with SharedSparkSession with PredicateHelper { override protected def sparkConf: SparkConf = super.sparkConf @@ -110,4 +120,32 @@ class SoftAffinityWithRDDInfoSuite extends QueryTest with SharedSparkSession wit assert(SoftAffinityManager.askExecutors(filePartition).isEmpty) } } + + test("Duplicate reading detection limits middle states count") { + // This test simulate the case listener bus stucks. We need to make sure the middle states + // count would not exceed the configed threshold. + if (SparkShimLoader.getSparkShims.supportDuplicateReadingTracking) { + val files = Seq( + SparkShimLoader.getSparkShims.generatePartitionedFile( + InternalRow.empty, + "fakePath0", + 0, + 100, + Array("host-3")), + SparkShimLoader.getSparkShims.generatePartitionedFile( + InternalRow.empty, + "fakePath0", + 100, + 200, + Array("host-3")) + ).toArray + val filePartition = FilePartition(-1, files) + FakeSoftAffinityManager.updatePartitionMap(filePartition, 1) + assert(FakeSoftAffinityManager.rddPartitionInfoMap.size == 1) + val filePartition1 = FilePartition(-1, files) + FakeSoftAffinityManager.updatePartitionMap(filePartition1, 2) + assert(FakeSoftAffinityManager.rddPartitionInfoMap.size == 1) + assert(FakeSoftAffinityManager.stageInfoMap.size <= 1) + } + } } diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index 5c032d4b0ef4..27ce1ec36379 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -557,11 +557,11 @@ object GlutenConfig { // Enable Soft Affinity duplicate reading detection, defalut value is true val GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_DETECT_ENABLED = "spark.gluten.soft-affinity.duplicateReadingDetect.enabled" - val GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_DETECT_ENABLED_DEFAULT_VALUE = true + val GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_DETECT_ENABLED_DEFAULT_VALUE = false // Enable Soft Affinity duplicate reading detection, defalut value is 10000 - val GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS = - "spark.gluten.soft-affinity.maxDuplicateReading.records" - val GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS_DEFAULT_VALUE = 10000 + val GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS = + "spark.gluten.soft-affinity.duplicateReading.maxCacheItems" + val GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS_DEFAULT_VALUE = 10000 // Pass through to native conf val GLUTEN_SAVE_DIR = "spark.gluten.saveDir"