[GLUTEN-8018][CORE] Support adjusting stage resource profile dynamically #8209

Open · wants to merge 1 commit into base: main
@@ -109,6 +109,7 @@ object VeloxRuleApi {
injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
injector.injectFinal(c => GlutenFallbackReporter(c.glutenConf, c.session))
injector.injectFinal(_ => RemoveFallbackTagRule())
injector.injectFinal(c => GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session))
Member commented:
Can we move the rule between RemoveGlutenTableCacheColumnarToRow and GlutenFallbackReporter? Thanks.
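For reference, the ordering suggested above would look roughly like this (illustrative only, not part of the diff):

// Illustrative reordering per the review comment above; not part of this change.
injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
injector.injectFinal(c => GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session))
injector.injectFinal(c => GlutenFallbackReporter(c.glutenConf, c.session))
injector.injectFinal(_ => RemoveFallbackTagRule())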

}

private def injectRas(injector: RasInjector): Unit = {
@@ -180,5 +181,6 @@ object VeloxRuleApi {
injector.injectPostTransform(c => RemoveGlutenTableCacheColumnarToRow(c.session))
injector.injectPostTransform(c => GlutenFallbackReporter(c.glutenConf, c.session))
injector.injectPostTransform(_ => RemoveFallbackTagRule())
injector.injectPostTransform(c => GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session))
Member commented:
ditto

}
}
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import org.apache.gluten.GlutenConfig
import org.apache.gluten.execution.{ColumnarToRowExecBase, GlutenPlan}
import org.apache.gluten.logging.LogLevelUtil

import org.apache.spark.annotation.Experimental
import org.apache.spark.internal.Logging
import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile, ResourceProfileManager, TaskResourceRequest}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.GlutenAutoAdjustStageResourceProfile.{applyNewResourceProfileIfPossible, collectStagePlan}
import org.apache.spark.sql.execution.adaptive.QueryStageExec
import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec}
import org.apache.spark.sql.execution.exchange.Exchange

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
 * This rule dynamically adjusts a stage's resource profile for the following purposes:
 *   1. Swap off-heap and on-heap memory sizes when a whole-stage fallback happens.
 *   2. Increase executor heap memory when the stage contains Gluten operators and vanilla Spark
 *      operators at the same time.
 * Note: setting the resource profile for the final stage is not supported yet. Todo: support
 * setting the resource profile for the final stage.
 */
@Experimental
case class GlutenAutoAdjustStageResourceProfile(glutenConf: GlutenConfig, spark: SparkSession)
extends Rule[SparkPlan]
with LogLevelUtil {

override def apply(plan: SparkPlan): SparkPlan = {
if (!glutenConf.enableAutoAdjustStageResourceProfile) {
return plan
}
if (!plan.isInstanceOf[Exchange]) {
// todo: support set resource profile for final stage
return plan
}
val planNodes = collectStagePlan(plan)
if (planNodes.isEmpty) {
return plan
}
log.info(s"detailPlanNodes ${planNodes.map(_.nodeName).mkString("Array(", ", ", ")")}")

// A stage is considered fully fallen back if none of its nodes is a GlutenPlan
// or if every GlutenPlan node is a C2R node.
val wholeStageFallback = planNodes
.filter(_.isInstanceOf[GlutenPlan])
.count(!_.isInstanceOf[ColumnarToRowExecBase]) == 0

val rpManager = spark.sparkContext.resourceProfileManager
val defaultRP = rpManager.defaultResourceProfile

// Initialize the resource requests from the default resource profile.
val taskResource = mutable.Map.empty[String, TaskResourceRequest] ++= defaultRP.taskResources
val executorResource =
mutable.Map.empty[String, ExecutorResourceRequest] ++= defaultRP.executorResources
val memoryRequest = executorResource.get(ResourceProfile.MEMORY)
val offheapRequest = executorResource.get(ResourceProfile.OFFHEAP_MEM)
logInfo(s"default memory request $memoryRequest")
logInfo(s"default offheap request $offheapRequest")

// Case 1: the whole stage falls back to vanilla Spark. In this case we swap the heap
// and off-heap amounts.
if (wholeStageFallback) {
val newMemoryAmount = offheapRequest.get.amount
val newOffheapAmount = memoryRequest.get.amount
val newExecutorMemory =
new ExecutorResourceRequest(ResourceProfile.MEMORY, newMemoryAmount)
val newExecutorOffheap =
new ExecutorResourceRequest(ResourceProfile.OFFHEAP_MEM, newOffheapAmount)
executorResource.put(ResourceProfile.MEMORY, newExecutorMemory)
executorResource.put(ResourceProfile.OFFHEAP_MEM, newExecutorOffheap)
val newRP = new ResourceProfile(executorResource.toMap, taskResource.toMap)
return applyNewResourceProfileIfPossible(plan, newRP, rpManager)
}

// Case 2: partial fallback. Count the transitions and decide whether to increase heap memory.
val c2RorR2CCnt = planNodes.count(
p => p.isInstanceOf[ColumnarToRowTransition] || p.isInstanceOf[RowToColumnarTransition])

if (c2RorR2CCnt >= glutenConf.autoAdjustStageC2RorR2CThreshold) {
val newMemoryAmount = memoryRequest.get.amount * glutenConf.autoAdjustStageRPHeapRatio
val newExecutorMemory =
new ExecutorResourceRequest(ResourceProfile.MEMORY, newMemoryAmount.toLong)
executorResource.put(ResourceProfile.MEMORY, newExecutorMemory)
val newRP = new ResourceProfile(executorResource.toMap, taskResource.toMap)
return applyNewResourceProfileIfPossible(plan, newRP, rpManager)
}
plan
}
}

object GlutenAutoAdjustStageResourceProfile extends Logging {
// Collect all plan nodes belonging to this stage, including child query stages themselves
// but excluding the children of those query stages.
def collectStagePlan(plan: SparkPlan): ArrayBuffer[SparkPlan] = {

def collectStagePlan(plan: SparkPlan, planNodes: ArrayBuffer[SparkPlan]): Unit = {
if (plan.isInstanceOf[DataWritingCommandExec] || plan.isInstanceOf[ExecutedCommandExec]) {
// Todo: support setting the final stage's resource profile.
return
}
planNodes += plan
if (plan.isInstanceOf[QueryStageExec]) {
return
}
plan.children.foreach(collectStagePlan(_, planNodes))
}

val planNodes = new ArrayBuffer[SparkPlan]()
collectStagePlan(plan, planNodes)
planNodes
}

private def getFinalResourceProfile(
rpManager: ResourceProfileManager,
newRP: ResourceProfile): ResourceProfile = {
val maybeEqProfile = rpManager.getEquivalentProfile(newRP)
if (maybeEqProfile.isDefined) {
maybeEqProfile.get
} else {
// register new resource profile here
rpManager.addResourceProfile(newRP)
newRP
}
}

def applyNewResourceProfileIfPossible(
plan: SparkPlan,
rp: ResourceProfile,
rpManager: ResourceProfileManager): SparkPlan = {
val finalRP = getFinalResourceProfile(rpManager, rp)
if (plan.isInstanceOf[Exchange]) {
// Wrap the plan with ApplyResourceProfileExec so that we can apply new ResourceProfile
val wrapperPlan = ApplyResourceProfileExec(plan.children.head, finalRP)
logInfo(s"Apply resource profile $finalRP for plan ${wrapperPlan.nodeName}")
plan.withNewChildren(IndexedSeq(wrapperPlan))
} else {
logInfo(s"Ignore apply resource profile for plan ${plan.nodeName}")
// Todo: support setting the resource profile for InsertInto stages.
plan
}
}
}
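
To make the two adjustment cases concrete, here is a small standalone sketch of the resource math. It is not part of the diff; the 4 GiB heap / 8 GiB off-heap defaults and the MiB units are assumptions for the example, and the ratio 2.0 is the config default.

// Illustrative only; the 4 GiB / 8 GiB default amounts are assumed for this example.
import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile}

val heap    = new ExecutorResourceRequest(ResourceProfile.MEMORY, 4096)      // on-heap, MiB
val offheap = new ExecutorResourceRequest(ResourceProfile.OFFHEAP_MEM, 8192) // off-heap, MiB

// Case 1 (whole-stage fallback): swap the two amounts -> 8192 MiB heap, 4096 MiB off-heap.
val swappedHeap    = new ExecutorResourceRequest(ResourceProfile.MEMORY, offheap.amount)
val swappedOffheap = new ExecutorResourceRequest(ResourceProfile.OFFHEAP_MEM, heap.amount)

// Case 2 (C2R/R2C count reaches the threshold): scale heap by the ratio -> 4096 * 2.0 = 8192 MiB.
val scaledHeap = new ExecutorResourceRequest(ResourceProfile.MEMORY, (heap.amount * 2.0).toLong)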
shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala (34 additions, 0 deletions)
@@ -483,6 +483,18 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def enableCelebornFallback: Boolean = conf.getConf(CELEBORN_FALLBACK_ENABLED)

def enableHdfsViewfs: Boolean = conf.getConf(HDFS_VIEWFS_ENABLED)

def enableAutoAdjustStageResourceProfile: Boolean =
conf.getConf(AUTO_ADJUST_STAGE_RESOURCE_PROFILE_ENABLED) && conf.adaptiveExecutionEnabled

def autoAdjustStageRPHeapRatio: Double = {
conf.getConf(AUTO_ADJUST_STAGE_RESOURCES_HEAP_RATIO)
}

def autoAdjustStageC2RorR2CThreshold: Int = {
conf.getConf(AUTO_ADJUST_STAGE_RESOURCES_C2R_OR_R2C_THRESHOLD)
}

}

object GlutenConfig {
@@ -2250,4 +2262,26 @@ object GlutenConfig {
.doc("If enabled, gluten will convert the viewfs path to hdfs path in scala side")
.booleanConf
.createWithDefault(false)

val AUTO_ADJUST_STAGE_RESOURCE_PROFILE_ENABLED =
buildStaticConf("spark.gluten.auto.adjustStageResource.enabled")
.internal()
.doc("If enabled, gluten will try to set the stage resource according " +
"to stage execution plan. Only worked when aqe is enabled at the same time!!")
.booleanConf
.createWithDefault(false)

val AUTO_ADJUST_STAGE_RESOURCES_HEAP_RATIO =
buildConf("spark.gluten.auto.adjustStageResources.heap.ratio")
.internal()
.doc("Increase executor heap memory when match adjust stage resource rule.")
.doubleConf
.createWithDefault(2.0d)

val AUTO_ADJUST_STAGE_RESOURCES_C2R_OR_R2C_THRESHOLD =
buildConf("spark.gluten.auto.adjustStageResources.c2rORr2c.threshold")
.internal()
.doc("Increase executor heap memory when match c2r and r2c exceeds the threshold.")
.intConf
.createWithDefault(4)
}
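
Not part of the diff: a minimal sketch of how these settings could be enabled, assuming a standard SparkSession builder (the app name is arbitrary). The config keys come from this PR, and the rule only takes effect when AQE is on; with the default ratio of 2.0, a matching stage would request twice the default executor heap.

// Illustrative only: enabling AQE plus the configs introduced in this PR.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("gluten-auto-adjust-stage-resources")
  .config("spark.sql.adaptive.enabled", "true") // required: the rule is gated on AQE
  .config("spark.gluten.auto.adjustStageResource.enabled", "true") // static conf added in this PR
  .config("spark.gluten.auto.adjustStageResources.heap.ratio", "2.0") // heap multiplier, default 2.0
  .config("spark.gluten.auto.adjustStageResources.c2rORr2c.threshold", "4") // transition count threshold, default 4
  .getOrCreate()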