Subject: [PATCH] Introduce GlutenAutoAdjustStageResourceProfile to auto adjust
 stage resource profile

 .../org/apache/gluten/GlutenConfig.scala      |  34 ++++
     injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
     injector.injectFinal(c => GlutenFallbackReporter(c.glutenConf, c.session))
     injector.injectFinal(_ => RemoveFallbackTagRule())
+    injector.injectFinal(c => GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session))
     injector.injectPostTransform(c => RemoveGlutenTableCacheColumnarToRow(c.session))
     injector.injectPostTransform(c => GlutenFallbackReporter(c.glutenConf, c.session))
     injector.injectPostTransform(_ => RemoveFallbackTagRule())
+    injector.injectPostTransform(c => GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session))
+package org.apache.spark.sql.execution
+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.execution.{ColumnarToRowExecBase, GlutenPlan}
+import org.apache.gluten.logging.LogLevelUtil
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.internal.Logging
+import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile, ResourceProfileManager, TaskResourceRequest}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.GlutenAutoAdjustStageResourceProfile.{applyNewResourceProfileIfPossible, collectStagePlan}
+import org.apache.spark.sql.execution.adaptive.QueryStageExec
+import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec}
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+ * This rule is used to dynamic adjust stage resource profile for following purposes:
+ *   1. swap offheap and onheap memory size when whole stage fallback happened 2. increase executor
+ *      heap memory if stage contains gluten operator and spark operator at the same time. Note: we
+ *      don't support set resource profile for final stage now. Todo: support set resource profile
+ *      for final stage.
+ */
+case class GlutenAutoAdjustStageResourceProfile(glutenConf: GlutenConfig, spark: SparkSession)
+  extends Rule[SparkPlan]
+  with LogLevelUtil {
+  override def apply(plan: SparkPlan): SparkPlan = {
+    if (!glutenConf.enableAutoAdjustStageResourceProfile) {
+      return plan
+    }
+    if (!plan.isInstanceOf[Exchange]) {
+      // todo: support set resource profile for final stage
+      return plan
+    }
+    val planNodes = collectStagePlan(plan)
+    if (planNodes.isEmpty) {
+      return plan
+    }
+"detailPlanNodes ${"Array(", ", ", ")")}")
+    // one stage is considered as fallback if all node is not GlutenPlan
+    // or all GlutenPlan node is C2R node.
+    val wholeStageFallback = planNodes
+      .filter(_.isInstanceOf[GlutenPlan])
+      .count(!_.isInstanceOf[ColumnarToRowExecBase]) == 0
+    val rpManager = spark.sparkContext.resourceProfileManager
+    val defaultRP = rpManager.defaultResourceProfile
+    // initial resource profile config as default resource profile
+    val taskResource = mutable.Map.empty[String, TaskResourceRequest] ++= defaultRP.taskResources
+    val executorResource =
+      mutable.Map.empty[String, ExecutorResourceRequest] ++= defaultRP.executorResources
+    val memoryRequest = executorResource.get(ResourceProfile.MEMORY)
+    val offheapRequest = executorResource.get(ResourceProfile.OFFHEAP_MEM)
+    logInfo(s"default memory request $memoryRequest")
+    logInfo(s"default offheap request $offheapRequest")
+    // case 1: whole stage fallback to vanilla spark in such case we swap the heap
+    // and offheap amount.
+    if (wholeStageFallback) {
+      val newMemoryAmount = offheapRequest.get.amount
+      val newOffheapAmount = memoryRequest.get.amount
+      val newExecutorMemory =
+        new ExecutorResourceRequest(ResourceProfile.MEMORY, newMemoryAmount)
+      val newExecutorOffheap =
+        new ExecutorResourceRequest(ResourceProfile.OFFHEAP_MEM, newOffheapAmount)
+      executorResource.put(ResourceProfile.MEMORY, newExecutorMemory)
+      executorResource.put(ResourceProfile.OFFHEAP_MEM, newExecutorOffheap)
+      val newRP = new ResourceProfile(executorResource.toMap, taskResource.toMap)
+      return applyNewResourceProfileIfPossible(plan, newRP, rpManager)
+    }
+    // case 1: check whether fallback exists and decide whether increase heap memory
+    val c2RorR2CCnt = planNodes.count(
+      p => p.isInstanceOf[ColumnarToRowTransition] || p.isInstanceOf[RowToColumnarTransition])
+    if (c2RorR2CCnt >= glutenConf.autoAdjustStageC2RorR2CThreshold) {
+      val newMemoryAmount = memoryRequest.get.amount * glutenConf.autoAdjustStageRPHeapRatio;
+      val newExecutorMemory =
+        new ExecutorResourceRequest(ResourceProfile.MEMORY, newMemoryAmount.toLong)
+      executorResource.put(ResourceProfile.MEMORY, newExecutorMemory)
+      val newRP = new ResourceProfile(executorResource.toMap, taskResource.toMap)
+      return applyNewResourceProfileIfPossible(plan, newRP, rpManager)
+    }
+    plan
+  }
+object GlutenAutoAdjustStageResourceProfile extends Logging {
+  // collect all plan nodes belong to this stage including child query stage
+  // but exclude query stage child
+  def collectStagePlan(plan: SparkPlan): ArrayBuffer[SparkPlan] = {
+    def collectStagePlan(plan: SparkPlan, planNodes: ArrayBuffer[SparkPlan]): Unit = {
+      if (plan.isInstanceOf[DataWritingCommandExec] || plan.isInstanceOf[ExecutedCommandExec]) {
+        // todo: support set final stage's resource profile
+        return
+      }
+      planNodes += plan
+      if (plan.isInstanceOf[QueryStageExec]) {
+        return
+      }
+      plan.children.foreach(collectStagePlan(_, planNodes))
+    }
+    val planNodes = new ArrayBuffer[SparkPlan]()
+    collectStagePlan(plan, planNodes)
+    planNodes
+  }
+  private def getFinalResourceProfile(
+      rpManager: ResourceProfileManager,
+      newRP: ResourceProfile): ResourceProfile = {
+    val maybeEqProfile = rpManager.getEquivalentProfile(newRP)
+    if (maybeEqProfile.isDefined) {
+      maybeEqProfile.get
+    } else {
+      // register new resource profile here
+      rpManager.addResourceProfile(newRP)
+      newRP
+    }
+  }
+  def applyNewResourceProfileIfPossible(
+      plan: SparkPlan,
+      rp: ResourceProfile,
+      rpManager: ResourceProfileManager): SparkPlan = {
+    val finalRP = getFinalResourceProfile(rpManager, rp)
+    if (plan.isInstanceOf[Exchange]) {
+      // Wrap the plan with ApplyResourceProfileExec so that we can apply new ResourceProfile
+      val wrapperPlan = ApplyResourceProfileExec(plan.children.head, finalRP)
+      logInfo(s"Apply resource profile $finalRP for plan ${wrapperPlan.nodeName}")
+      plan.withNewChildren(IndexedSeq(wrapperPlan))
+    } else {
+      logInfo(s"Ignore apply resource profile for plan ${plan.nodeName}")
+      // todo: support set InsertInto stage's resource profile
+      plan
+    }
+  }
   def enableCelebornFallback: Boolean = conf.getConf(CELEBORN_FALLBACK_ENABLED)
   def enableHdfsViewfs: Boolean = conf.getConf(HDFS_VIEWFS_ENABLED)
+  def enableAutoAdjustStageResourceProfile: Boolean =
+    conf.getConf(AUTO_ADJUST_STAGE_RESOURCE_PROFILE_ENABLED) && conf.adaptiveExecutionEnabled
+  def autoAdjustStageRPHeapRatio: Double = {
+  }
+  def autoAdjustStageC2RorR2CThreshold: Int = {
+  }
 object GlutenConfig {
@@ -2250,4 +2262,26 @@ object GlutenConfig {
       .doc("If enabled, gluten will convert the viewfs path to hdfs path in scala side")
+    buildStaticConf("")
+      .internal()
+      .doc("If enabled, gluten will try to set the stage resource according " +
+        "to stage execution plan. Only worked when aqe is enabled at the same time!!")
+      .booleanConf
+      .createWithDefault(false)
+    buildConf("")
+      .internal()
+      .doc("Increase executor heap memory when match adjust stage resource rule.")
+      .doubleConf
+      .createWithDefault(2.0d)
+    buildConf("")
+      .internal()
+      .doc("Increase executor heap memory when match c2r and r2c exceeds the threshold.")
+      .intConf
+      .createWithDefault(4)