diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 78f509c670839..5e0eaa478547c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1603,7 +1603,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
   /**
    * Get the max number of tasks that can be concurrent launched based on the ResourceProfile
-   * being used.
+   * being used, even if some of the slots are currently occupied.
    * Note that please don't cache the value returned by this method, because the number can change
    * due to add/remove executors.
    *
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala b/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
index 2274e6898adf6..043c6b90384b4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
@@ -60,6 +60,6 @@ private[spark] object BarrierJobAllocationFailed {
   val ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =
     "[SPARK-24819]: Barrier execution mode does not allow run a barrier stage that requires " +
       "more slots than the total number of slots in the cluster currently. Please init a new " +
-      "cluster with more CPU cores or repartition the input RDD(s) to reduce the number of " +
-      "slots required to run this barrier stage."
+      "cluster with more resources (e.g. CPU, GPU) or repartition the input RDD(s) to reduce " +
+      "the number of slots required to run this barrier stage."
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 6b376cdadc66b..7641948ed4b30 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -480,10 +480,12 @@ private[spark] class DAGScheduler(
    * submission.
    */
   private def checkBarrierStageWithNumSlots(rdd: RDD[_], rp: ResourceProfile): Unit = {
-    val numPartitions = rdd.getNumPartitions
-    val maxNumConcurrentTasks = sc.maxNumConcurrentTasks(rp)
-    if (rdd.isBarrier() && numPartitions > maxNumConcurrentTasks) {
-      throw new BarrierJobSlotsNumberCheckFailed(numPartitions, maxNumConcurrentTasks)
+    if (rdd.isBarrier()) {
+      val numPartitions = rdd.getNumPartitions
+      val maxNumConcurrentTasks = sc.maxNumConcurrentTasks(rp)
+      if (numPartitions > maxNumConcurrentTasks) {
+        throw new BarrierJobSlotsNumberCheckFailed(numPartitions, maxNumConcurrentTasks)
+      }
     }
   }
 
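The DAGScheduler change above only reorders the check so that non-barrier stages skip the slot computation entirely. As an illustration only, here is a minimal sketch of the resulting shape of the check, using hypothetical stand-in values rather than the real RDD and scheduler types:

// Minimal sketch (not Spark code): the shape of the barrier slot check after this change.
// isBarrier, numPartitions and maxConcurrentTasks stand in for the real RDD/scheduler values.
def checkBarrierSlots(isBarrier: Boolean, numPartitions: Int, maxConcurrentTasks: Int): Unit = {
  if (isBarrier) {
    // Only barrier stages need the slot check; a plain stage can run with fewer slots than
    // partitions because its tasks do not have to be launched together.
    if (numPartitions > maxConcurrentTasks) {
      throw new IllegalStateException(
        s"barrier stage needs $numPartitions slots but only $maxConcurrentTasks are available")
    }
  }
}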
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
index fd04db8c09d76..508c6cebd9fe3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
@@ -36,4 +36,5 @@ private[spark] class ExecutorResourceInfo(
   override protected def resourceName = this.name
   override protected def resourceAddresses = this.addresses
   override protected def slotsPerAddress: Int = numParts
+  def totalAddressAmount: Int = resourceAddresses.length * slotsPerAddress
 }
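The new totalAddressAmount helper simply multiplies the number of resource addresses an executor reports by the number of task slots each address is split into. A minimal sketch of that arithmetic with assumed values (not Spark code):

// Minimal sketch (assumed values): how totalAddressAmount is derived for one executor resource.
val addresses = Seq("0", "1")   // two GPU addresses reported by the executor
val slotsPerAddress = 2         // e.g. a fractional task request that splits each address in two
val totalAddressAmount = addresses.length * slotsPerAddress
assert(totalAddressAmount == 4) // four GPU task slots on this executor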
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index a5bba645be14c..a566d0a04387c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -83,7 +83,7 @@ private[spark] trait SchedulerBackend {
 
   /**
    * Get the max number of tasks that can be concurrent launched based on the ResourceProfile
-   * being used.
+   * being used, even if some of the slots are currently occupied.
    * Note that please don't cache the value returned by this method, because the number can change
    * due to add/remove executors.
    *
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 2551e497a165a..a0c507e7f893b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -468,51 +468,6 @@ private[spark] class TaskSchedulerImpl(
     Some(localTaskReqAssign.toMap)
   }
 
-  // Use the resource that the resourceProfile has as the limiting resource to calculate the
-  // total number of slots available based on the current offers.
-  private def calculateAvailableSlots(
-      resourceProfileIds: Array[Int],
-      availableCpus: Array[Int],
-      availableResources: Array[Map[String, Buffer[String]]],
-      taskSet: TaskSetManager): Int = {
-    val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(
-      taskSet.taskSet.resourceProfileId)
-    val offersForResourceProfile = resourceProfileIds.zipWithIndex.filter { case (id, _) =>
-      (id == resourceProfile.id)
-    }
-    val coresKnown = resourceProfile.isCoresLimitKnown
-    var limitingResource = resourceProfile.limitingResource(conf)
-    val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(resourceProfile, conf)
-
-    offersForResourceProfile.map { case (o, index) =>
-      val numTasksPerExecCores = availableCpus(index) / taskCpus
-      // if limiting resource is empty then we have no other resources, so it has to be CPU
-      if (limitingResource == ResourceProfile.CPUS || limitingResource.isEmpty) {
-        numTasksPerExecCores
-      } else {
-        val taskLimit = resourceProfile.taskResources.get(limitingResource).map(_.amount)
-          .getOrElse {
-            val errorMsg = "limitingResource returns from ResourceProfile " +
-              s"$resourceProfile doesn't actually contain that task resource!"
-            taskSet.abort(errorMsg)
-            throw new SparkException(errorMsg)
-          }
-        // available addresses already takes into account if there are fractional
-        // task resource requests
-        val availAddrs = availableResources(index).get(limitingResource).map(_.size).getOrElse(0)
-        val resourceLimit = (availAddrs / taskLimit).toInt
-        if (!coresKnown) {
-          // when executor cores config isn't set, we can't calculate the real limiting resource
-          // and number of tasks per executor ahead of time, so calculate it now based on what
-          // is available.
-          if (numTasksPerExecCores <= resourceLimit) numTasksPerExecCores else resourceLimit
-        } else {
-          resourceLimit
-        }
-      }
-    }.sum
-  }
-
   private def minTaskLocality(
       l1: Option[TaskLocality],
       l2: Option[TaskLocality]) : Option[TaskLocality] = {
@@ -591,9 +546,14 @@ private[spark] class TaskSchedulerImpl(
       // we only need to calculate available slots if using barrier scheduling, otherwise the
       // value is -1
       val numBarrierSlotsAvailable = if (taskSet.isBarrier) {
-        val slots = calculateAvailableSlots(resourceProfileIds, availableCpus, availableResources,
-          taskSet)
-        slots
+        val rpId = taskSet.taskSet.resourceProfileId
+        val availableResourcesAmount = availableResources.map { resourceMap =>
+          // available addresses already takes into account if there are fractional
+          // task resource requests
+          resourceMap.map { case (name, addresses) => (name, addresses.length) }
+        }
+        calculateAvailableSlots(this, conf, rpId, resourceProfileIds, availableCpus,
+          availableResourcesAmount)
       } else {
         -1
       }
@@ -1166,6 +1126,63 @@ private[spark] object TaskSchedulerImpl {
 
   val SCHEDULER_MODE_PROPERTY = SCHEDULER_MODE.key
 
+  /**
+   * Calculate the max available task slots given the `availableCpus` and `availableResources`
+   * from a collection of ResourceProfiles. Only the ResourceProfiles whose id equals `rpId`
+   * are used to calculate the task slots.
+   *
+   * @param scheduler the TaskSchedulerImpl instance
+   * @param conf SparkConf used to calculate the limiting resource and get the cpu amount per task
+   * @param rpId the target ResourceProfile id. Only the ResourceProfiles with this id are
+   *             used to calculate the task slots.
+   * @param availableRPIds an Array of ids of the available ResourceProfiles from the executors.
+   * @param availableCpus an Array of the number of available cpus from the executors.
+   * @param availableResources an Array of the resource maps from the executors, where each map
+   *                           maps a resource name to its amount.
+   * @return the max number of task slots
+   */
+  def calculateAvailableSlots(
+      scheduler: TaskSchedulerImpl,
+      conf: SparkConf,
+      rpId: Int,
+      availableRPIds: Array[Int],
+      availableCpus: Array[Int],
+      availableResources: Array[Map[String, Int]]): Int = {
+    val resourceProfile = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
+    val coresKnown = resourceProfile.isCoresLimitKnown
+    val (limitingResource, limitedByCpu) = {
+      val limiting = resourceProfile.limitingResource(conf)
+      // if limiting resource is empty then we have no other resources, so it has to be CPU
+      if (limiting == ResourceProfile.CPUS || limiting.isEmpty) {
+        (ResourceProfile.CPUS, true)
+      } else {
+        (limiting, false)
+      }
+    }
+    val cpusPerTask = ResourceProfile.getTaskCpusOrDefaultForProfile(resourceProfile, conf)
+    val taskLimit = resourceProfile.taskResources.get(limitingResource).map(_.amount).get
+
+    availableCpus.zip(availableResources).zip(availableRPIds)
+      .filter { case (_, id) => id == rpId }
+      .map { case ((cpu, resources), _) =>
+        val numTasksPerExecCores = cpu / cpusPerTask
+        if (limitedByCpu) {
+          numTasksPerExecCores
+        } else {
+          val availAddrs = resources.getOrElse(limitingResource, 0)
+          val resourceLimit = (availAddrs / taskLimit).toInt
+          // when executor cores config isn't set, we can't calculate the real limiting resource
+          // and number of tasks per executor ahead of time, so calculate it now based on what
+          // is available.
+          if (!coresKnown && numTasksPerExecCores <= resourceLimit) {
+            numTasksPerExecCores
+          } else {
+            resourceLimit
+          }
+        }
+      }.sum
+  }
+
   /**
    * Used to balance containers across hosts.
    *
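The static calculateAvailableSlots keeps the original per-offer logic: offers are filtered by ResourceProfile id, each offer contributes cpu/cpusPerTask slots when CPU is the limiting resource, otherwise availableAmount/taskLimit of the limiting resource (additionally capped by the CPU bound when the executor cores are not known), and the contributions are summed. A minimal sketch of the per-executor arithmetic, using assumed numbers rather than a real scheduler:

// Minimal sketch (assumed numbers, not a call into Spark): the per-executor arithmetic
// performed when gpu is the limiting resource.
val cpusPerTask = 1
val gpusPerTask = 1.0
// Executor offer: 2 CPUs and 1 GPU address.
val numTasksPerExecCores = 2 / cpusPerTask    // 2 slots allowed by CPU
val resourceLimit = (1 / gpusPerTask).toInt   // 1 slot allowed by GPU
// With executor cores unknown, the smaller of the two bounds wins; otherwise the
// limiting-resource bound is used directly.
val slots = math.min(numTasksPerExecCores, resourceLimit)
assert(slots == 1)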
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index d81a617d0ed7d..200f2d87a8a7a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -642,10 +642,28 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   }
 
+  /**
+   * Get the max number of tasks that can be concurrently launched based on the ResourceProfile
+   * being used, even if some of the slots are currently occupied.
+   * Note that please don't cache the value returned by this method, because the number can change
+   * due to add/remove executors.
+   *
+   * @param rp the ResourceProfile used to calculate the max concurrent tasks.
+   * @return the max number of tasks that can be launched concurrently at the moment.
+   */
   override def maxNumConcurrentTasks(rp: ResourceProfile): Int = synchronized {
-    val cpusPerTask = ResourceProfile.getTaskCpusOrDefaultForProfile(rp, conf)
-    val executorsWithResourceProfile = executorDataMap.values.filter(_.resourceProfileId == rp.id)
-    executorsWithResourceProfile.map(_.totalCores / cpusPerTask).sum
+    val (rpIds, cpus, resources) = {
+      executorDataMap
+        .filter { case (id, _) => isExecutorActive(id) }
+        .values.toArray.map { executor =>
+          (
+            executor.resourceProfileId,
+            executor.totalCores,
+            executor.resourcesInfo.map { case (name, rInfo) => (name, rInfo.totalAddressAmount) }
+          )
+        }.unzip3
+    }
+    TaskSchedulerImpl.calculateAvailableSlots(scheduler, conf, rp.id, rpIds, cpus, resources)
   }
 
   // this function is for testing only
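With this change maxNumConcurrentTasks no longer counts CPU cores alone; it gathers, for every active executor, the ResourceProfile id, the total cores, and the total amount of each custom resource, and lets calculateAvailableSlots pick the limiting one. A minimal sketch of the inputs for an assumed two-executor cluster (illustrative values, not Spark code):

// Minimal sketch (assumed cluster): what the new maxNumConcurrentTasks would see for two
// active executors with 4 cores and 1 GPU each, with 1 CPU and 1 GPU required per task.
val rpIds     = Array(0, 0)                                  // default profile on both executors
val cpus      = Array(4, 4)
val resources = Array(Map("gpu" -> 1), Map("gpu" -> 1))
// CPU alone would allow 4 + 4 = 8 concurrent tasks, but the GPU is the limiting resource,
// so the max is 1 + 1 = 2, which is the value the barrier-stage slot check compares against.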
diff --git a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
index 435b927068e60..1ba13c2ef1897 100644
--- a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
@@ -19,9 +19,12 @@ package org.apache.spark
 
 import scala.concurrent.duration._
 
+import org.apache.spark.TestUtils.createTempScriptWithExpectedOutput
 import org.apache.spark.internal.config._
 import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
+import org.apache.spark.resource.TestResourceIDs.{EXECUTOR_GPU_ID, TASK_GPU_ID, WORKER_GPU_ID}
 import org.apache.spark.scheduler.BarrierJobAllocationFailed._
+import org.apache.spark.scheduler.BarrierJobSlotsNumberCheckFailed
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -259,4 +262,37 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
     testSubmitJob(sc, rdd,
       message = ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
   }
+
+  test("SPARK-32518: CoarseGrainedSchedulerBackend.maxNumConcurrentTasks should " +
+    "consider all kinds of resources for the barrier stage") {
+    withTempDir { dir =>
+      val discoveryScript = createTempScriptWithExpectedOutput(
+        dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0"]}""")
+
+      val conf = new SparkConf()
+        // Set up a local cluster which has only one executor with 2 CPUs and 1 GPU.
+        .setMaster("local-cluster[1, 2, 1024]")
+        .setAppName("test-cluster")
+        .set(WORKER_GPU_ID.amountConf, "1")
+        .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
+        .set(EXECUTOR_GPU_ID.amountConf, "1")
+        .set(TASK_GPU_ID.amountConf, "1")
+        // disable barrier stage retry to fail the application as soon as possible
+        .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 1)
+      sc = new SparkContext(conf)
+      TestUtils.waitUntilExecutorsUp(sc, 1, 60000)
+
+      val exception = intercept[BarrierJobSlotsNumberCheckFailed] {
+        // Set up a barrier stage which contains 2 tasks, each of which requires 1 CPU and 1 GPU.
+        // Therefore, the total resource requirement (2 CPUs and 2 GPUs) of this barrier stage
+        // cannot be satisfied since the cluster only has 2 CPUs and 1 GPU in total.
+        sc.parallelize(Range(1, 10), 2)
+          .barrier()
+          .mapPartitions { iter => iter }
+          .collect()
+      }
+      assert(exception.getMessage.contains("[SPARK-24819]: Barrier execution " +
+        "mode does not allow run a barrier stage that requires more slots"))
+    }
+  }
 }
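For reference, the arithmetic the test above relies on, written out as a minimal sketch with the assumed cluster numbers (2 CPUs, 1 GPU, and 1 CPU plus 1 GPU required per task):

// Minimal sketch (assumed numbers): why the test expects BarrierJobSlotsNumberCheckFailed.
val cpuSlots = 2 / 1          // 2 CPUs in the cluster, 1 CPU per task
val gpuSlots = 1 / 1          // 1 GPU in the cluster, 1 GPU per task
val maxConcurrentTasks = math.min(cpuSlots, gpuSlots)  // gpu is the limiting resource, so 1
val barrierTasks = 2
assert(barrierTasks > maxConcurrentTasks)               // so the slot check must fail fast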