diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala index 22919538ff4d..d96a1bc2565f 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala @@ -109,6 +109,7 @@ object VeloxRuleApi { injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session)) injector.injectFinal(c => GlutenFallbackReporter(c.glutenConf, c.session)) injector.injectFinal(_ => RemoveFallbackTagRule()) + injector.injectFinal(c => GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session)) } private def injectRas(injector: RasInjector): Unit = { @@ -180,5 +181,6 @@ object VeloxRuleApi { injector.injectPostTransform(c => RemoveGlutenTableCacheColumnarToRow(c.session)) injector.injectPostTransform(c => GlutenFallbackReporter(c.glutenConf, c.session)) injector.injectPostTransform(_ => RemoveFallbackTagRule()) + injector.injectPostTransform(c => GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session)) } } diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenAutoAdjustStageResourceProfile.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenAutoAdjustStageResourceProfile.scala new file mode 100644 index 000000000000..afe2a330d119 --- /dev/null +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenAutoAdjustStageResourceProfile.scala @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import org.apache.gluten.GlutenConfig +import org.apache.gluten.execution.{ColumnarToRowExecBase, GlutenPlan} +import org.apache.gluten.logging.LogLevelUtil + +import org.apache.spark.annotation.Experimental +import org.apache.spark.internal.Logging +import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile, ResourceProfileManager, TaskResourceRequest} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.GlutenAutoAdjustStageResourceProfile.{applyNewResourceProfileIfPossible, collectStagePlan} +import org.apache.spark.sql.execution.adaptive.QueryStageExec +import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec} +import org.apache.spark.sql.execution.exchange.Exchange + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +/** + * This rule is used to dynamic adjust stage resource profile for following purposes: + * 1. swap offheap and onheap memory size when whole stage fallback happened 2. increase executor + * heap memory if stage contains gluten operator and spark operator at the same time. Note: we + * don't support set resource profile for final stage now. Todo: support set resource profile + * for final stage. + */ +@Experimental +case class GlutenAutoAdjustStageResourceProfile(glutenConf: GlutenConfig, spark: SparkSession) + extends Rule[SparkPlan] + with LogLevelUtil { + + override def apply(plan: SparkPlan): SparkPlan = { + if (!glutenConf.enableAutoAdjustStageResourceProfile) { + return plan + } + if (!plan.isInstanceOf[Exchange]) { + // todo: support set resource profile for final stage + return plan + } + val planNodes = collectStagePlan(plan) + if (planNodes.isEmpty) { + return plan + } + log.info(s"detailPlanNodes ${planNodes.map(_.nodeName).mkString("Array(", ", ", ")")}") + + // one stage is considered as fallback if all node is not GlutenPlan + // or all GlutenPlan node is C2R node. + val wholeStageFallback = planNodes + .filter(_.isInstanceOf[GlutenPlan]) + .count(!_.isInstanceOf[ColumnarToRowExecBase]) == 0 + + val rpManager = spark.sparkContext.resourceProfileManager + val defaultRP = rpManager.defaultResourceProfile + + // initial resource profile config as default resource profile + val taskResource = mutable.Map.empty[String, TaskResourceRequest] ++= defaultRP.taskResources + val executorResource = + mutable.Map.empty[String, ExecutorResourceRequest] ++= defaultRP.executorResources + val memoryRequest = executorResource.get(ResourceProfile.MEMORY) + val offheapRequest = executorResource.get(ResourceProfile.OFFHEAP_MEM) + logInfo(s"default memory request $memoryRequest") + logInfo(s"default offheap request $offheapRequest") + + // case 1: whole stage fallback to vanilla spark in such case we swap the heap + // and offheap amount. + if (wholeStageFallback) { + val newMemoryAmount = offheapRequest.get.amount + val newOffheapAmount = memoryRequest.get.amount + val newExecutorMemory = + new ExecutorResourceRequest(ResourceProfile.MEMORY, newMemoryAmount) + val newExecutorOffheap = + new ExecutorResourceRequest(ResourceProfile.OFFHEAP_MEM, newOffheapAmount) + executorResource.put(ResourceProfile.MEMORY, newExecutorMemory) + executorResource.put(ResourceProfile.OFFHEAP_MEM, newExecutorOffheap) + val newRP = new ResourceProfile(executorResource.toMap, taskResource.toMap) + return applyNewResourceProfileIfPossible(plan, newRP, rpManager) + } + + // case 1: check whether fallback exists and decide whether increase heap memory + val c2RorR2CCnt = planNodes.count( + p => p.isInstanceOf[ColumnarToRowTransition] || p.isInstanceOf[RowToColumnarTransition]) + + if (c2RorR2CCnt >= glutenConf.autoAdjustStageC2RorR2CThreshold) { + val newMemoryAmount = memoryRequest.get.amount * glutenConf.autoAdjustStageRPHeapRatio; + val newExecutorMemory = + new ExecutorResourceRequest(ResourceProfile.MEMORY, newMemoryAmount.toLong) + executorResource.put(ResourceProfile.MEMORY, newExecutorMemory) + val newRP = new ResourceProfile(executorResource.toMap, taskResource.toMap) + return applyNewResourceProfileIfPossible(plan, newRP, rpManager) + } + plan + } +} + +object GlutenAutoAdjustStageResourceProfile extends Logging { + // collect all plan nodes belong to this stage including child query stage + // but exclude query stage child + def collectStagePlan(plan: SparkPlan): ArrayBuffer[SparkPlan] = { + + def collectStagePlan(plan: SparkPlan, planNodes: ArrayBuffer[SparkPlan]): Unit = { + if (plan.isInstanceOf[DataWritingCommandExec] || plan.isInstanceOf[ExecutedCommandExec]) { + // todo: support set final stage's resource profile + return + } + planNodes += plan + if (plan.isInstanceOf[QueryStageExec]) { + return + } + plan.children.foreach(collectStagePlan(_, planNodes)) + } + + val planNodes = new ArrayBuffer[SparkPlan]() + collectStagePlan(plan, planNodes) + planNodes + } + + private def getFinalResourceProfile( + rpManager: ResourceProfileManager, + newRP: ResourceProfile): ResourceProfile = { + val maybeEqProfile = rpManager.getEquivalentProfile(newRP) + if (maybeEqProfile.isDefined) { + maybeEqProfile.get + } else { + // register new resource profile here + rpManager.addResourceProfile(newRP) + newRP + } + } + + def applyNewResourceProfileIfPossible( + plan: SparkPlan, + rp: ResourceProfile, + rpManager: ResourceProfileManager): SparkPlan = { + val finalRP = getFinalResourceProfile(rpManager, rp) + if (plan.isInstanceOf[Exchange]) { + // Wrap the plan with ApplyResourceProfileExec so that we can apply new ResourceProfile + val wrapperPlan = ApplyResourceProfileExec(plan.children.head, finalRP) + logInfo(s"Apply resource profile $finalRP for plan ${wrapperPlan.nodeName}") + plan.withNewChildren(IndexedSeq(wrapperPlan)) + } else { + logInfo(s"Ignore apply resource profile for plan ${plan.nodeName}") + // todo: support set InsertInto stage's resource profile + plan + } + } +} diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index 296153346ef8..4e44f3822537 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -483,6 +483,18 @@ class GlutenConfig(conf: SQLConf) extends Logging { def enableCelebornFallback: Boolean = conf.getConf(CELEBORN_FALLBACK_ENABLED) def enableHdfsViewfs: Boolean = conf.getConf(HDFS_VIEWFS_ENABLED) + + def enableAutoAdjustStageResourceProfile: Boolean = + conf.getConf(AUTO_ADJUST_STAGE_RESOURCE_PROFILE_ENABLED) && conf.adaptiveExecutionEnabled + + def autoAdjustStageRPHeapRatio: Double = { + conf.getConf(AUTO_ADJUST_STAGE_RESOURCES_HEAP_RATIO) + } + + def autoAdjustStageC2RorR2CThreshold: Int = { + conf.getConf(AUTO_ADJUST_STAGE_RESOURCES_C2R_OR_R2C_THRESHOLD) + } + } object GlutenConfig { @@ -2250,4 +2262,26 @@ object GlutenConfig { .doc("If enabled, gluten will convert the viewfs path to hdfs path in scala side") .booleanConf .createWithDefault(false) + + val AUTO_ADJUST_STAGE_RESOURCE_PROFILE_ENABLED = + buildStaticConf("spark.gluten.auto.adjustStageResource.enabled") + .internal() + .doc("If enabled, gluten will try to set the stage resource according " + + "to stage execution plan. Only worked when aqe is enabled at the same time!!") + .booleanConf + .createWithDefault(false) + + val AUTO_ADJUST_STAGE_RESOURCES_HEAP_RATIO = + buildConf("spark.gluten.auto.adjustStageResources.heap.ratio") + .internal() + .doc("Increase executor heap memory when match adjust stage resource rule.") + .doubleConf + .createWithDefault(2.0d) + + val AUTO_ADJUST_STAGE_RESOURCES_C2R_OR_R2C_THRESHOLD = + buildConf("spark.gluten.auto.adjustStageResources.c2rORr2c.threshold") + .internal() + .doc("Increase executor heap memory when match c2r and r2c exceeds the threshold.") + .intConf + .createWithDefault(4) }