[SPARK-32518][CORE] CoarseGrainedSchedulerBackend.maxNumConcurrentTasks should consider all kinds of resources

### What changes were proposed in this pull request?

1. Make `CoarseGrainedSchedulerBackend.maxNumConcurrentTasks()` consider all kinds of resources when calculating the max concurrent tasks.

2. Refactor `calculateAvailableSlots()` so that it can be used by both `CoarseGrainedSchedulerBackend` and `TaskSchedulerImpl`.

### Why are the changes needed?

Currently, `CoarseGrainedSchedulerBackend.maxNumConcurrentTasks()` only considers CPUs when calculating the max concurrent tasks. This can cause the application to hang when a barrier stage requires extra custom resources but the cluster doesn't have enough of them: without a check for the other custom resources in `maxNumConcurrentTasks`, the barrier stage can be submitted to `TaskSchedulerImpl`, but `TaskSchedulerImpl` will never launch its tasks because `TaskSchedulerImpl.calculateAvailableSlots` (which does check all kinds of resources) reports insufficient task slots.

The application hang issue can be reproduced by the added unit test.
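
For illustration, here is a minimal, self-contained sketch (hypothetical names and numbers, not Spark's API) of why a CPU-only count admits a barrier stage that can never actually launch:

```scala
object CpuOnlyVsAllResources {
  // One executor with 2 CPUs and 1 GPU; each task needs 1 CPU and 1 GPU.
  val executor = Map("cpus" -> 2, "gpu" -> 1)
  val perTask  = Map("cpus" -> 1, "gpu" -> 1)

  // CPU-only counting (the old check): 2 / 1 = 2 slots, so a 2-task barrier stage is admitted.
  val cpuOnlySlots: Int = executor("cpus") / perTask("cpus")

  // Counting every requested resource: the scarcest one (1 GPU) allows only 1 slot,
  // so the same 2-task barrier stage can never get all of its tasks running at once.
  val allResourceSlots: Int =
    perTask.map { case (name, need) => executor.getOrElse(name, 0) / need }.min

  def main(args: Array[String]): Unit =
    println(s"cpu-only slots = $cpuOnlySlots, all-resource slots = $allResourceSlots")
}
```

Before this PR the admission check used the first number while the launch path effectively used the second, which is where the hang came from.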

### Does this PR introduce _any_ user-facing change?

Yes. When a barrier stage requires more custom resources than the cluster has, the application could hang before this PR; after this PR, it fails with an insufficient-resources error instead of hanging.

### How was this patch tested?

Added a unit test.

Closes apache#29332 from Ngone51/fix-slots.

Authored-by: yi.wu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Ngone51 authored and cloud-fan committed Aug 6, 2020
1 parent 375d348 commit 7f275ee
Showing 8 changed files with 133 additions and 59 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1603,7 +1603,7 @@ class SparkContext(config: SparkConf) extends Logging {

/**
* Get the max number of tasks that can be concurrent launched based on the ResourceProfile
* being used.
* could be used, even if some of them are being used at the moment.
* Note that please don't cache the value returned by this method, because the number can change
* due to add/remove executors.
*
core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
@@ -60,6 +60,6 @@ private[spark] object BarrierJobAllocationFailed {
val ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =
"[SPARK-24819]: Barrier execution mode does not allow run a barrier stage that requires " +
"more slots than the total number of slots in the cluster currently. Please init a new " +
"cluster with more CPU cores or repartition the input RDD(s) to reduce the number of " +
"slots required to run this barrier stage."
"cluster with more resources(e.g. CPU, GPU) or repartition the input RDD(s) to reduce " +
"the number of slots required to run this barrier stage."
}
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -480,10 +480,12 @@ private[spark] class DAGScheduler(
* submission.
*/
private def checkBarrierStageWithNumSlots(rdd: RDD[_], rp: ResourceProfile): Unit = {
val numPartitions = rdd.getNumPartitions
val maxNumConcurrentTasks = sc.maxNumConcurrentTasks(rp)
if (rdd.isBarrier() && numPartitions > maxNumConcurrentTasks) {
throw new BarrierJobSlotsNumberCheckFailed(numPartitions, maxNumConcurrentTasks)
if (rdd.isBarrier()) {
val numPartitions = rdd.getNumPartitions
val maxNumConcurrentTasks = sc.maxNumConcurrentTasks(rp)
if (numPartitions > maxNumConcurrentTasks) {
throw new BarrierJobSlotsNumberCheckFailed(numPartitions, maxNumConcurrentTasks)
}
}
}

core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
@@ -36,4 +36,5 @@ private[spark] class ExecutorResourceInfo(
override protected def resourceName = this.name
override protected def resourceAddresses = this.addresses
override protected def slotsPerAddress: Int = numParts
def totalAddressAmount: Int = resourceAddresses.length * slotsPerAddress
}
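
For context, the new `totalAddressAmount` is just the number of addresses multiplied by how many slots each address is split into (more than 1 only when tasks request a fractional amount of the resource). A quick REPL-style calculation with hypothetical numbers, not Spark's classes:

```scala
// An executor exposing GPU addresses "0" and "1", with tasks requesting 0.5 GPU each,
// so every address is split into 2 schedulable slots.
val resourceAddresses = Seq("0", "1")
val slotsPerAddress = 2
val totalAddressAmount = resourceAddresses.length * slotsPerAddress // 2 * 2 = 4 slots
```
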
core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -83,7 +83,7 @@ private[spark] trait SchedulerBackend {

/**
* Get the max number of tasks that can be concurrent launched based on the ResourceProfile
* being used.
* could be used, even if some of them are being used at the moment.
* Note that please don't cache the value returned by this method, because the number can change
* due to add/remove executors.
*
113 changes: 65 additions & 48 deletions core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -468,51 +468,6 @@ private[spark] class TaskSchedulerImpl(
Some(localTaskReqAssign.toMap)
}

// Use the resource that the resourceProfile has as the limiting resource to calculate the
// total number of slots available based on the current offers.
private def calculateAvailableSlots(
resourceProfileIds: Array[Int],
availableCpus: Array[Int],
availableResources: Array[Map[String, Buffer[String]]],
taskSet: TaskSetManager): Int = {
val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(
taskSet.taskSet.resourceProfileId)
val offersForResourceProfile = resourceProfileIds.zipWithIndex.filter { case (id, _) =>
(id == resourceProfile.id)
}
val coresKnown = resourceProfile.isCoresLimitKnown
var limitingResource = resourceProfile.limitingResource(conf)
val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(resourceProfile, conf)

offersForResourceProfile.map { case (o, index) =>
val numTasksPerExecCores = availableCpus(index) / taskCpus
// if limiting resource is empty then we have no other resources, so it has to be CPU
if (limitingResource == ResourceProfile.CPUS || limitingResource.isEmpty) {
numTasksPerExecCores
} else {
val taskLimit = resourceProfile.taskResources.get(limitingResource).map(_.amount)
.getOrElse {
val errorMsg = "limitingResource returns from ResourceProfile " +
s"$resourceProfile doesn't actually contain that task resource!"
taskSet.abort(errorMsg)
throw new SparkException(errorMsg)
}
// available addresses already takes into account if there are fractional
// task resource requests
val availAddrs = availableResources(index).get(limitingResource).map(_.size).getOrElse(0)
val resourceLimit = (availAddrs / taskLimit).toInt
if (!coresKnown) {
// when executor cores config isn't set, we can't calculate the real limiting resource
// and number of tasks per executor ahead of time, so calculate it now based on what
// is available.
if (numTasksPerExecCores <= resourceLimit) numTasksPerExecCores else resourceLimit
} else {
resourceLimit
}
}
}.sum
}

private def minTaskLocality(
l1: Option[TaskLocality],
l2: Option[TaskLocality]) : Option[TaskLocality] = {
@@ -591,9 +546,14 @@ private[spark] class TaskSchedulerImpl(
// we only need to calculate available slots if using barrier scheduling, otherwise the
// value is -1
val numBarrierSlotsAvailable = if (taskSet.isBarrier) {
val slots = calculateAvailableSlots(resourceProfileIds, availableCpus, availableResources,
taskSet)
slots
val rpId = taskSet.taskSet.resourceProfileId
val availableResourcesAmount = availableResources.map { resourceMap =>
// available addresses already takes into account if there are fractional
// task resource requests
resourceMap.map { case (name, addresses) => (name, addresses.length) }
}
calculateAvailableSlots(this, conf, rpId, resourceProfileIds, availableCpus,
availableResourcesAmount)
} else {
-1
}
@@ -1166,6 +1126,63 @@ private[spark] object TaskSchedulerImpl {

val SCHEDULER_MODE_PROPERTY = SCHEDULER_MODE.key

/**
* Calculate the max available task slots given the `availableCpus` and `availableResources`
* from a collection of ResourceProfiles. And only those ResourceProfiles who has the
* same id with the `rpId` can be used to calculate the task slots.
*
* @param scheduler the TaskSchedulerImpl instance
* @param conf SparkConf used to calculate the limiting resource and get the cpu amount per task
* @param rpId the target ResourceProfile id. Only those ResourceProfiles who has the same id
* with it can be used to calculate the task slots.
* @param availableRPIds an Array of ids of the available ResourceProfiles from the executors.
* @param availableCpus an Array of the amount of available cpus from the executors.
* @param availableResources an Array of the resources map from the executors. In the resource
* map, it maps from the resource name to its amount.
* @return the number of max task slots
*/
def calculateAvailableSlots(
scheduler: TaskSchedulerImpl,
conf: SparkConf,
rpId: Int,
availableRPIds: Array[Int],
availableCpus: Array[Int],
availableResources: Array[Map[String, Int]]): Int = {
val resourceProfile = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
val coresKnown = resourceProfile.isCoresLimitKnown
val (limitingResource, limitedByCpu) = {
val limiting = resourceProfile.limitingResource(conf)
// if limiting resource is empty then we have no other resources, so it has to be CPU
if (limiting == ResourceProfile.CPUS || limiting.isEmpty) {
(ResourceProfile.CPUS, true)
} else {
(limiting, false)
}
}
val cpusPerTask = ResourceProfile.getTaskCpusOrDefaultForProfile(resourceProfile, conf)
val taskLimit = resourceProfile.taskResources.get(limitingResource).map(_.amount).get

availableCpus.zip(availableResources).zip(availableRPIds)
.filter { case (_, id) => id == rpId }
.map { case ((cpu, resources), _) =>
val numTasksPerExecCores = cpu / cpusPerTask
if (limitedByCpu) {
numTasksPerExecCores
} else {
val availAddrs = resources.getOrElse(limitingResource, 0)
val resourceLimit = (availAddrs / taskLimit).toInt
// when executor cores config isn't set, we can't calculate the real limiting resource
// and number of tasks per executor ahead of time, so calculate it now based on what
// is available.
if (!coresKnown && numTasksPerExecCores <= resourceLimit) {
numTasksPerExecCores
} else {
resourceLimit
}
}
}.sum
}

/**
* Used to balance containers across hosts.
*
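
To make the arithmetic of the new static `calculateAvailableSlots` concrete, here is a standalone sketch with hypothetical executors and numbers; it simply takes a per-executor minimum across the CPU and GPU limits rather than pre-computing the profile's limiting resource the way the method above does:

```scala
object SlotArithmeticSketch {
  // Offers as (resource profile id, available cpus, available resource amounts).
  val targetRpId = 0
  val offers = Seq(
    (0, 8, Map("gpu" -> 2)),  // 8 cpus, 2 gpu slots
    (0, 8, Map("gpu" -> 1)),  // 8 cpus, 1 gpu slot
    (1, 16, Map("gpu" -> 4))  // different resource profile id, excluded from this count
  )
  val cpusPerTask = 1
  val gpusPerTask = 1

  // Only offers with the target profile id count; each executor contributes as many slots
  // as its scarcest resource allows, and the per-executor counts are summed:
  // min(8, 2) + min(8, 1) = 3, even though CPUs alone would have allowed 16.
  val slots: Int = offers
    .filter { case (rpId, _, _) => rpId == targetRpId }
    .map { case (_, cpus, resources) =>
      math.min(cpus / cpusPerTask, resources.getOrElse("gpu", 0) / gpusPerTask)
    }
    .sum

  def main(args: Array[String]): Unit = println(s"available slots = $slots")
}
```
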
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
Expand Up @@ -642,10 +642,28 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

}

/**
* Get the max number of tasks that can be concurrent launched based on the ResourceProfile
* could be used, even if some of them are being used at the moment.
* Note that please don't cache the value returned by this method, because the number can change
* due to add/remove executors.
*
* @param rp ResourceProfile which to use to calculate max concurrent tasks.
* @return The max number of tasks that can be concurrent launched currently.
*/
override def maxNumConcurrentTasks(rp: ResourceProfile): Int = synchronized {
val cpusPerTask = ResourceProfile.getTaskCpusOrDefaultForProfile(rp, conf)
val executorsWithResourceProfile = executorDataMap.values.filter(_.resourceProfileId == rp.id)
executorsWithResourceProfile.map(_.totalCores / cpusPerTask).sum
val (rpIds, cpus, resources) = {
executorDataMap
.filter { case (id, _) => isExecutorActive(id) }
.values.toArray.map { executor =>
(
executor.resourceProfileId,
executor.totalCores,
executor.resourcesInfo.map { case (name, rInfo) => (name, rInfo.totalAddressAmount) }
)
}.unzip3
}
TaskSchedulerImpl.calculateAvailableSlots(scheduler, conf, rp.id, rpIds, cpus, resources)
}

// this function is for testing only
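
Most of the new override above is reshaping per-executor bookkeeping into the three parallel collections the shared helper expects, while skipping executors that are no longer active. A rough sketch with a hypothetical `ExecutorInfo` record (not Spark's `ExecutorData`):

```scala
final case class ExecutorInfo(
    active: Boolean, rpId: Int, totalCores: Int, resources: Map[String, Int])

val executors = Seq(
  ExecutorInfo(active = true,  rpId = 0, totalCores = 4, resources = Map("gpu" -> 1)),
  ExecutorInfo(active = false, rpId = 0, totalCores = 4, resources = Map("gpu" -> 1)) // lost executor, skipped
)

// Keep only live executors, then unzip their (profile id, cores, resources) tuples into
// the three collections that TaskSchedulerImpl.calculateAvailableSlots consumes.
val (rpIds, cpus, resources) = executors
  .filter(_.active)
  .map(e => (e.rpId, e.totalCores, e.resources))
  .unzip3
```
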
core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
@@ -19,9 +19,12 @@ package org.apache.spark

import scala.concurrent.duration._

import org.apache.spark.TestUtils.createTempScriptWithExpectedOutput
import org.apache.spark.internal.config._
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
import org.apache.spark.resource.TestResourceIDs.{EXECUTOR_GPU_ID, TASK_GPU_ID, WORKER_GPU_ID}
import org.apache.spark.scheduler.BarrierJobAllocationFailed._
import org.apache.spark.scheduler.BarrierJobSlotsNumberCheckFailed
import org.apache.spark.util.ThreadUtils

/**
@@ -259,4 +262,37 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
testSubmitJob(sc, rdd,
message = ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
}

test("SPARK-32518: CoarseGrainedSchedulerBackend.maxNumConcurrentTasks should " +
"consider all kinds of resources for the barrier stage") {
withTempDir { dir =>
val discoveryScript = createTempScriptWithExpectedOutput(
dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0"]}""")

val conf = new SparkConf()
// Setup a local cluster which would only has one executor with 2 CPUs and 1 GPU.
.setMaster("local-cluster[1, 2, 1024]")
.setAppName("test-cluster")
.set(WORKER_GPU_ID.amountConf, "1")
.set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
.set(EXECUTOR_GPU_ID.amountConf, "1")
.set(TASK_GPU_ID.amountConf, "1")
// disable barrier stage retry to fail the application as soon as possible
.set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 1)
sc = new SparkContext(conf)
TestUtils.waitUntilExecutorsUp(sc, 1, 60000)

val exception = intercept[BarrierJobSlotsNumberCheckFailed] {
// Setup a barrier stage which contains 2 tasks and each task requires 1 CPU and 1 GPU.
// Therefore, the total resources requirement (2 CPUs and 2 GPUs) of this barrier stage
// can not be satisfied since the cluster only has 2 CPUs and 1 GPU in total.
sc.parallelize(Range(1, 10), 2)
.barrier()
.mapPartitions { iter => iter }
.collect()
}
assert(exception.getMessage.contains("[SPARK-24819]: Barrier execution " +
"mode does not allow run a barrier stage that requires more slots"))
}
}
}
