Skip to content

Commit

Permalink
[Velox] soft affinity support placing duplicate reading to same execu…
Browse files Browse the repository at this point in the history
…tors (#4407)

[VL] soft affinity support placing duplicate reading to same executors
  • Loading branch information
zhli1142015 authored Feb 23, 2024
1 parent 541cb54 commit f295887
Show file tree
Hide file tree
Showing 12 changed files with 362 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class IteratorApiImpl extends IteratorApi with Logging {
val (paths, starts, lengths, partitionColumns) =
constructSplitInfo(partitionSchema, f.files)
val preferredLocations =
SoftAffinity.getFilePartitionLocations(paths.asScala.toArray, f.preferredLocations())
SoftAffinity.getFilePartitionLocations(f)
LocalFilesBuilder.makeLocalFiles(
f.index,
paths,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ import io.glutenproject.utils.SubstraitPlanPrinterUtil

import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.softaffinity.SoftAffinity
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.vectorized.ColumnarBatch

Expand Down Expand Up @@ -305,7 +307,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
substraitPlanLogLevel,
s"$nodeName generating the substrait plan took: $t ms."))

new GlutenWholeStageColumnarRDD(
val rdd = new GlutenWholeStageColumnarRDD(
sparkContext,
inputPartitions,
inputRDDs,
Expand All @@ -318,6 +320,19 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
wsCtx.substraitContext.registeredAggregationParams
)
)
val allScanPartitions = basicScanExecTransformers.map(_.getPartitions)
(0 until allScanPartitions.head.size).foreach(
i => {
val currentPartitions = allScanPartitions.map(_(i))
currentPartitions.indices.foreach(
i =>
currentPartitions(i) match {
case f: FilePartition =>
SoftAffinity.updateFilePartitionLocations(f, rdd.id)
case _ =>
})
})
rdd
} else {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,22 @@ package io.glutenproject.softaffinity

import io.glutenproject.GlutenConfig
import io.glutenproject.softaffinity.strategy.SoftAffinityStrategy
import io.glutenproject.sql.shims.SparkShimLoader
import io.glutenproject.utils.LogLevelUtil

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListenerStageCompleted, SparkListenerStageSubmitted, SparkListenerTaskEnd}
import org.apache.spark.sql.execution.datasources.FilePartition

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantReadWriteLock

import scala.collection.mutable
import scala.util.Random

abstract class AffinityManager extends LogLevelUtil with Logging {

Expand All @@ -47,6 +54,28 @@ abstract class AffinityManager extends LogLevelUtil with Logging {

lazy val logLevel: String = GlutenConfig.getConf.softAffinityLogLevel

lazy val detectDuplicateReading = true

lazy val maxDuplicateReadingRecords =
GlutenConfig.GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS_DEFAULT_VALUE

// rdd id -> patition id, file path, start, length
val rddPartitionInfoMap = new ConcurrentHashMap[Int, Array[(Int, String, Long, Long)]]()
// stage id -> execution id + rdd ids: job start / execution end
val stageInfoMap = new ConcurrentHashMap[Int, Array[Int]]()
// final result: partition composed key("path1_start_length,path2_start_length") --> array_host
val duplicateReadingInfos: LoadingCache[String, Array[(String, String)]] =
CacheBuilder
.newBuilder()
.maximumSize(maxDuplicateReadingRecords)
.build(new CacheLoader[String, Array[(String, String)]] {
override def load(name: String): Array[(String, String)] = {
Array.empty[(String, String)]
}
})

private val rand = new Random(System.currentTimeMillis)

def totalExecutors(): Int = totalRegisteredExecutors.intValue()

def handleExecutorAdded(execHostId: (String, String)): Unit = {
Expand Down Expand Up @@ -117,6 +146,62 @@ abstract class AffinityManager extends LogLevelUtil with Logging {
}
}

def updateStageMap(event: SparkListenerStageSubmitted): Unit = {
if (!detectDuplicateReading) {
return
}
val info = event.stageInfo
val rddIds = info.rddInfos.map(_.id).toArray
stageInfoMap.put(info.stageId, rddIds)
}

def updateHostMap(event: SparkListenerTaskEnd): Unit = {
if (!detectDuplicateReading) {
return
}
event.reason match {
case org.apache.spark.Success =>
val stageId = event.stageId
val rddInfo = stageInfoMap.get(stageId)
if (rddInfo != null) {
rddInfo.foreach {
rddId =>
val partitions = rddPartitionInfoMap.get(rddId)
if (partitions != null) {
val key = partitions
.filter(p => p._1 == SparkShimLoader.getSparkShims.getPratitionId(event.taskInfo))
.map(pInfo => s"${pInfo._2}_${pInfo._3}_${pInfo._4}")
.sortBy(p => p)
.mkString(",")
val value = Array(((event.taskInfo.executorId, event.taskInfo.host)))
val originalValues = duplicateReadingInfos.get(key)
val values = if (originalValues.contains(value(0))) {
originalValues
} else {
(originalValues ++ value)
}
logOnLevel(logLevel, s"update host for $key: ${values.mkString(",")}")
duplicateReadingInfos.put(key, values)
}
}
}
case _ =>
}
}

def cleanMiddleStatusMap(event: SparkListenerStageCompleted): Unit = {
clearPartitionMap(event.stageInfo.rddInfos.map(_.id))
clearStageMap(event.stageInfo.stageId)
}

def clearPartitionMap(rddIds: Seq[Int]): Unit = {
rddIds.foreach(id => rddPartitionInfoMap.remove(id))
}

def clearStageMap(id: Int): Unit = {
stageInfoMap.remove(id)
}

def checkTargetHosts(hosts: Array[String]): Boolean = {
resourceRWLock.readLock().lock()
try {
Expand Down Expand Up @@ -148,6 +233,54 @@ abstract class AffinityManager extends LogLevelUtil with Logging {
resourceRWLock.readLock().unlock()
}
}

def askExecutors(f: FilePartition): Array[(String, String)] = {
resourceRWLock.readLock().lock()
try {
if (fixedIdForExecutors.size < 1) {
Array.empty
} else {
val result = getDuplicateReadingLocation(f)
result.filter(r => fixedIdForExecutors.exists(s => s.isDefined && s.get._1 == r._1)).toArray
}
} finally {
resourceRWLock.readLock().unlock()
}
}

def getDuplicateReadingLocation(f: FilePartition): Seq[(String, String)] = {
val hosts = mutable.ListBuffer.empty[(String, String)]
val key = f.files
.map(file => s"${file.filePath}_${file.start}_${file.length}")
.sortBy(p => p)
.mkString(",")
val host = duplicateReadingInfos.get(key)
if (!host.isEmpty) {
hosts ++= host
}

if (!hosts.isEmpty) {
rand.shuffle(hosts)
logOnLevel(logLevel, s"get host for $f: ${hosts.distinct.mkString(",")}")
}
hosts.distinct
}

def updatePartitionMap(f: FilePartition, rddId: Int): Unit = {
if (!detectDuplicateReading) {
return
}

val paths =
f.files.map(file => (f.index, file.filePath.toString, file.start, file.length)).toArray
val key = rddId
val values = if (rddPartitionInfoMap.containsKey(key)) {
rddPartitionInfoMap.get(key) ++ paths
} else {
paths
}
rddPartitionInfoMap.put(key, values)
}
}

object SoftAffinityManager extends AffinityManager {
Expand All @@ -160,4 +293,15 @@ object SoftAffinityManager extends AffinityManager {
GlutenConfig.GLUTEN_SOFT_AFFINITY_MIN_TARGET_HOSTS,
GlutenConfig.GLUTEN_SOFT_AFFINITY_MIN_TARGET_HOSTS_DEFAULT_VALUE
)

override lazy val detectDuplicateReading = SparkEnv.get.conf.getBoolean(
GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_DETECT_ENABLED,
GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_DETECT_ENABLED_DEFAULT_VALUE
) &&
SparkShimLoader.getSparkShims.supportDuplicateReadingTracking

override lazy val maxDuplicateReadingRecords = SparkEnv.get.conf.getInt(
GlutenConfig.GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS,
GlutenConfig.GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS_DEFAULT_VALUE
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,22 @@ package io.glutenproject.softaffinity.scheduler
import io.glutenproject.softaffinity.SoftAffinityManager

import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorRemoved}
import org.apache.spark.scheduler._

class SoftAffinityListener extends SparkListener with Logging {

override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
SoftAffinityManager.updateStageMap(event)
}

override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
SoftAffinityManager.cleanMiddleStatusMap(event)
}

override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
SoftAffinityManager.updateHostMap(taskEnd)
}

override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
val execId = executorAdded.executorId
val host = executorAdded.executorInfo.executorHost
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,23 @@
*/
package org.apache.spark.listener

import io.glutenproject.GlutenConfig
import io.glutenproject.softaffinity.scheduler.SoftAffinityListener

import org.apache.spark.SparkContext
import org.apache.spark.rpc.GlutenDriverEndpoint

object GlutenListenerFactory {
def addToSparkListenerBus(sc: SparkContext): Unit = {
sc.listenerBus.addToStatusQueue(
new GlutenSQLAppStatusListener(GlutenDriverEndpoint.glutenDriverEndpointRef))
if (
sc.getConf.getBoolean(
GlutenConfig.GLUTEN_SOFT_AFFINITY_ENABLED,
GlutenConfig.GLUTEN_SOFT_AFFINITY_ENABLED_DEFAULT_VALUE
)
) {
sc.listenerBus.addToStatusQueue(new SoftAffinityListener())
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import io.glutenproject.utils.LogLevelUtil

import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.sql.execution.datasources.FilePartition

abstract class Affinity(val manager: AffinityManager) extends LogLevelUtil with Logging {

Expand All @@ -44,6 +45,25 @@ abstract class Affinity(val manager: AffinityManager) extends LogLevelUtil with
}
}

def getFilePartitionLocations(filePartition: FilePartition): Array[String] = {
val filePaths = filePartition.files.map(_.filePath.toString)
val preferredLocations = filePartition.preferredLocations()
if (shouldUseSoftAffinity(filePaths, preferredLocations)) {
if (manager.detectDuplicateReading) {
val locations = manager.askExecutors(filePartition)
if (locations.nonEmpty) {
locations.map { case (executor, host) => getCacheTaskLocation(host, executor) }
} else {
Array.empty[String]
}
} else {
getFilePartitionLocations(filePaths, preferredLocations)
}
} else {
preferredLocations
}
}

def getLocations(filePath: String)(toTaskLocation: (String, String) => String): Array[String] = {
val locations = manager.askExecutors(filePath)
if (locations.nonEmpty) {
Expand All @@ -59,6 +79,13 @@ abstract class Affinity(val manager: AffinityManager) extends LogLevelUtil with
def getCacheTaskLocation(host: String, executor: String): String = {
if (host.isEmpty) executor else ExecutorCacheTaskLocation(host, executor).toString
}

/** Update the RDD id to SoftAffinityManager */
def updateFilePartitionLocations(filePartition: FilePartition, rddId: Int): Unit = {
if (SoftAffinityManager.usingSoftAffinity && SoftAffinityManager.detectDuplicateReading) {
SoftAffinityManager.updatePartitionMap(filePartition, rddId)
}
}
}

object SoftAffinity extends Affinity(SoftAffinityManager) {
Expand Down
Loading

0 comments on commit f295887

Please sign in to comment.