[GLUTEN-6995][Core] Limit soft affinity duplicate reading detection max cache items (#7003)

* [Core] Limit soft affinity duplicate reading detection max cache items

* disable duplicate_reading by default
zhli1142015 authored Aug 28, 2024
1 parent 01b334b commit edaf88a
Showing 4 changed files with 76 additions and 22 deletions.
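
In essence, the change swaps the unbounded ConcurrentHashMaps used for duplicate-reading bookkeeping for size-bounded Guava LoadingCaches, so a stalled listener bus cannot make those intermediate maps grow without limit. A minimal standalone sketch of that bounding technique (names such as BoundedCacheSketch and maxCacheItems are illustrative, not part of the commit):

```scala
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object BoundedCacheSketch {
  // Stand-in for the new duplicateReadingMaxCacheItems setting.
  val maxCacheItems: Long = 10000L

  // maximumSize makes Guava evict older entries once the limit is reached,
  // unlike the previous unbounded ConcurrentHashMap bookkeeping.
  val boundedCache: LoadingCache[Integer, Array[Int]] =
    CacheBuilder
      .newBuilder()
      .maximumSize(maxCacheItems)
      .build(new CacheLoader[Integer, Array[Int]] {
        override def load(id: Integer): Array[Int] = Array.empty[Int]
      })

  def main(args: Array[String]): Unit = {
    boundedCache.put(1, Array(42))
    println(boundedCache.getIfPresent(1).mkString(",")) // "42"
    println(boundedCache.getIfPresent(2) == null) // true: absent keys just return null
  }
}
```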
@@ -35,6 +35,6 @@ object CHUTSoftAffinityManager extends AffinityManager {

override lazy val detectDuplicateReading = true

-override lazy val maxDuplicateReadingRecords =
-  GlutenConfig.GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS_DEFAULT_VALUE
+override lazy val duplicateReadingMaxCacheItems =
+  GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS_DEFAULT_VALUE
}
@@ -28,7 +28,6 @@ import org.apache.spark.sql.execution.datasources.FilePartition

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

-import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantReadWriteLock

@@ -56,18 +55,34 @@ abstract class AffinityManager extends LogLevelUtil with Logging {

lazy val detectDuplicateReading = true

-lazy val maxDuplicateReadingRecords =
-  GlutenConfig.GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS_DEFAULT_VALUE
+lazy val duplicateReadingMaxCacheItems =
+  GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS_DEFAULT_VALUE

// rdd id -> partition id, file path, start, length
-val rddPartitionInfoMap = new ConcurrentHashMap[Int, Array[(Int, String, Long, Long)]]()
+val rddPartitionInfoMap: LoadingCache[Integer, Array[(Int, String, Long, Long)]] =
+  CacheBuilder
+    .newBuilder()
+    .maximumSize(duplicateReadingMaxCacheItems)
+    .build(new CacheLoader[Integer, Array[(Int, String, Long, Long)]] {
+      override def load(id: Integer): Array[(Int, String, Long, Long)] = {
+        Array.empty[(Int, String, Long, Long)]
+      }
+    })
// stage id -> execution id + rdd ids: job start / execution end
-val stageInfoMap = new ConcurrentHashMap[Int, Array[Int]]()
+val stageInfoMap: LoadingCache[Integer, Array[Int]] =
+  CacheBuilder
+    .newBuilder()
+    .maximumSize(duplicateReadingMaxCacheItems)
+    .build(new CacheLoader[Integer, Array[Int]] {
+      override def load(id: Integer): Array[Int] = {
+        Array.empty[Int]
+      }
+    })
// final result: partition composed key("path1_start_length,path2_start_length") --> array_host
val duplicateReadingInfos: LoadingCache[String, Array[(String, String)]] =
CacheBuilder
.newBuilder()
-.maximumSize(maxDuplicateReadingRecords)
+.maximumSize(duplicateReadingMaxCacheItems)
.build(new CacheLoader[String, Array[(String, String)]] {
override def load(name: String): Array[(String, String)] = {
Array.empty[(String, String)]
@@ -162,11 +177,11 @@ abstract class AffinityManager extends LogLevelUtil with Logging {
event.reason match {
case org.apache.spark.Success =>
val stageId = event.stageId
-val rddInfo = stageInfoMap.get(stageId)
+val rddInfo = stageInfoMap.getIfPresent(stageId)
if (rddInfo != null) {
rddInfo.foreach {
rddId =>
-val partitions = rddPartitionInfoMap.get(rddId)
+val partitions = rddPartitionInfoMap.getIfPresent(rddId)
if (partitions != null) {
val key = partitions
.filter(p => p._1 == SparkShimLoader.getSparkShims.getPartitionId(event.taskInfo))
@@ -195,11 +210,11 @@ abstract class AffinityManager extends LogLevelUtil with Logging {
}

def clearPartitionMap(rddIds: Seq[Int]): Unit = {
-rddIds.foreach(id => rddPartitionInfoMap.remove(id))
+rddIds.foreach(id => rddPartitionInfoMap.invalidate(id))
}

def clearStageMap(id: Int): Unit = {
-stageInfoMap.remove(id)
+stageInfoMap.invalidate(id)
}

def checkTargetHosts(hosts: Array[String]): Boolean = {
@@ -274,8 +289,9 @@ abstract class AffinityManager extends LogLevelUtil with Logging {
val paths =
f.files.map(file => (f.index, file.filePath.toString, file.start, file.length)).toArray
val key = rddId
-val values = if (rddPartitionInfoMap.containsKey(key)) {
-  rddPartitionInfoMap.get(key) ++ paths
+var values = rddPartitionInfoMap.getIfPresent(key)
+values = if (values != null) {
+  values ++ paths
} else {
paths
}
@@ -300,8 +316,8 @@ object SoftAffinityManager extends AffinityManager {
) &&
SparkShimLoader.getSparkShims.supportDuplicateReadingTracking

-override lazy val maxDuplicateReadingRecords = SparkEnv.get.conf.getInt(
-  GlutenConfig.GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS,
-  GlutenConfig.GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS_DEFAULT_VALUE
+override lazy val duplicateReadingMaxCacheItems = SparkEnv.get.conf.getInt(
+  GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS,
+  GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS_DEFAULT_VALUE
)
}
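
For reference, the ConcurrentHashMap operations in AffinityManager map onto Guava Cache calls roughly as follows. This standalone sketch (CacheMigrationSketch is a hypothetical name) shows the equivalences the diff relies on: get becomes getIfPresent (returning null when absent), remove becomes invalidate, and put is unchanged.

```scala
import com.google.common.cache.{Cache, CacheBuilder}

import java.util.concurrent.ConcurrentHashMap

object CacheMigrationSketch {
  def main(args: Array[String]): Unit = {
    // Before: an unbounded map that only ever grows until explicitly cleared.
    val map = new ConcurrentHashMap[Int, String]()
    map.put(1, "a")
    println(map.get(1)) // "a"
    map.remove(1)

    // After: a size-bounded cache with the equivalent operations:
    //   get(k)         -> getIfPresent(k)  (returns null when absent)
    //   remove(k)      -> invalidate(k)
    //   containsKey(k) -> getIfPresent(k) != null
    val cache: Cache[Integer, String] =
      CacheBuilder.newBuilder().maximumSize(100).build[Integer, String]()
    cache.put(1, "a")
    println(cache.getIfPresent(1)) // "a"
    cache.invalidate(1)
    println(cache.getIfPresent(1) == null) // true
  }
}
```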
@@ -18,7 +18,7 @@ package org.apache.spark.softaffinity

import org.apache.gluten.GlutenConfig
import org.apache.gluten.execution.GlutenPartition
-import org.apache.gluten.softaffinity.SoftAffinityManager
+import org.apache.gluten.softaffinity.{AffinityManager, SoftAffinityManager}
import org.apache.gluten.softaffinity.scheduler.SoftAffinityListener
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.plan.PlanBuilder
@@ -33,6 +33,16 @@ import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.storage.{RDDInfo, StorageLevel}

+object FakeSoftAffinityManager extends AffinityManager {
+  override lazy val usingSoftAffinity: Boolean = true
+
+  override lazy val minOnTargetHosts: Int = 1
+
+  override lazy val detectDuplicateReading = true
+
+  override lazy val duplicateReadingMaxCacheItems = 1
+}

class SoftAffinityWithRDDInfoSuite extends QueryTest with SharedSparkSession with PredicateHelper {

override protected def sparkConf: SparkConf = super.sparkConf
@@ -110,4 +120,32 @@ class SoftAffinityWithRDDInfoSuite extends QueryTest with SharedSparkSession wit
assert(SoftAffinityManager.askExecutors(filePartition).isEmpty)
}
}

test("Duplicate reading detection limits middle states count") {
// This test simulate the case listener bus stucks. We need to make sure the middle states
// count would not exceed the configed threshold.
if (SparkShimLoader.getSparkShims.supportDuplicateReadingTracking) {
val files = Seq(
SparkShimLoader.getSparkShims.generatePartitionedFile(
InternalRow.empty,
"fakePath0",
0,
100,
Array("host-3")),
SparkShimLoader.getSparkShims.generatePartitionedFile(
InternalRow.empty,
"fakePath0",
100,
200,
Array("host-3"))
).toArray
val filePartition = FilePartition(-1, files)
FakeSoftAffinityManager.updatePartitionMap(filePartition, 1)
assert(FakeSoftAffinityManager.rddPartitionInfoMap.size == 1)
val filePartition1 = FilePartition(-1, files)
FakeSoftAffinityManager.updatePartitionMap(filePartition1, 2)
assert(FakeSoftAffinityManager.rddPartitionInfoMap.size == 1)
assert(FakeSoftAffinityManager.stageInfoMap.size <= 1)
}
}
}
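
The new test leans on Guava's size-based eviction: with duplicateReadingMaxCacheItems = 1, recording a second RDD evicts the entry for the first, so the bookkeeping caches stay at a single entry even if no stage- or job-end events ever arrive. A tiny sketch of that behavior (EvictionSketch is a hypothetical name; Guava documents size-based eviction as approximate, but at this scale it matches the assertions above):

```scala
import com.google.common.cache.{Cache, CacheBuilder}

object EvictionSketch {
  def main(args: Array[String]): Unit = {
    // A cache bounded to a single entry, mirroring duplicateReadingMaxCacheItems = 1
    // in the FakeSoftAffinityManager used by the test above.
    val cache: Cache[Integer, String] =
      CacheBuilder.newBuilder().maximumSize(1).build[Integer, String]()

    cache.put(1, "rdd-1 partitions")
    cache.put(2, "rdd-2 partitions")

    // The size stays at the configured bound; the older entry has been evicted.
    println(cache.size()) // 1
  }
}
```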
@@ -557,11 +557,11 @@ object GlutenConfig {
// Enable Soft Affinity duplicate reading detection, default value is true
val GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_DETECT_ENABLED =
"spark.gluten.soft-affinity.duplicateReadingDetect.enabled"
-val GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_DETECT_ENABLED_DEFAULT_VALUE = true
+val GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_DETECT_ENABLED_DEFAULT_VALUE = false
// Max cache items for Soft Affinity duplicate reading detection, default value is 10000
-val GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS =
-  "spark.gluten.soft-affinity.maxDuplicateReading.records"
-val GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS_DEFAULT_VALUE = 10000
+val GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS =
+  "spark.gluten.soft-affinity.duplicateReading.maxCacheItems"
+val GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS_DEFAULT_VALUE = 10000

// Pass through to native conf
val GLUTEN_SAVE_DIR = "spark.gluten.saveDir"
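Assuming the renamed keys above, duplicate reading detection now has to be enabled explicitly and its cache bound can be tuned. A hypothetical SparkConf snippet (SoftAffinityConfSketch is an illustrative name, not part of the commit):

```scala
import org.apache.spark.SparkConf

object SoftAffinityConfSketch {
  def main(args: Array[String]): Unit = {
    // Duplicate reading detection is now off by default; opt in explicitly and
    // cap the bookkeeping caches. Both keys are defined in GlutenConfig above.
    val conf = new SparkConf()
      .set("spark.gluten.soft-affinity.duplicateReadingDetect.enabled", "true")
      .set("spark.gluten.soft-affinity.duplicateReading.maxCacheItems", "10000")
    println(conf.get("spark.gluten.soft-affinity.duplicateReading.maxCacheItems"))
  }
}
```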
