Skip to content

Commit

Permalink
address comments and add test suite
Browse files Browse the repository at this point in the history
  • Loading branch information
zjuwangg committed Dec 18, 2024
1 parent b04a82a commit 38dde01
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ object VeloxRuleApi {

// Gluten columnar: Final rules.
injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
injector.injectFinal(c => GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session))
injector.injectFinal(c => GlutenFallbackReporter(c.glutenConf, c.session))
injector.injectFinal(_ => RemoveFallbackTagRule())
injector.injectFinal(c => GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session))
}

private def injectRas(injector: RasInjector): Unit = {
Expand Down Expand Up @@ -179,8 +179,8 @@ object VeloxRuleApi {
.foreach(each => injector.injectPostTransform(c => each(c.session)))
injector.injectPostTransform(c => ColumnarCollapseTransformStages(c.glutenConf))
injector.injectPostTransform(c => RemoveGlutenTableCacheColumnarToRow(c.session))
injector.injectPostTransform(c => GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session))
injector.injectPostTransform(c => GlutenFallbackReporter(c.glutenConf, c.session))
injector.injectPostTransform(_ => RemoveFallbackTagRule())
injector.injectPostTransform(c => GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.gluten.GlutenConfig

import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.{ApplyResourceProfileExec, ColumnarShuffleExchangeExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

/**
 * Test suite for the auto-adjust-stage-resource feature
 * (`GlutenAutoAdjustStageResourceProfile`): verifies that stages containing
 * row-to-columnar / columnar-to-row transitions or whole-stage fallbacks get an
 * `ApplyResourceProfileExec` node injected into the executed plan.
 *
 * NOTE(review): ResourceProfiles are only supported on YARN / Kubernetes with
 * dynamic allocation enabled, so these tests only check that the
 * `ApplyResourceProfileExec` wrapper is present — not the profile's contents.
 */
class AutoAdjustStageResourceProfileSuite
  extends VeloxWholeStageTransformerSuite
  with AdaptiveSparkPlanHelper {

  protected val rootPath: String = getClass.getResource("/").getPath
  override protected val resourcePath: String = "/tpch-data-parquet"
  override protected val fileFormat: String = "parquet"

  override protected def sparkConf: SparkConf = {
    super.sparkConf
      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
      .set("spark.sql.shuffle.partitions", "5")
      .set("spark.memory.offHeap.size", "2g")
      .set("spark.unsafe.exceptionOnMemoryLeak", "true")
      // AQE must be on: resource profiles are applied per query stage.
      .set("spark.sql.adaptive.enabled", "true")
      .set("spark.gluten.auto.adjustStageResource.enabled", "true")
  }

  /** Creates three small parquet tables used as fixtures by the tests below. */
  override def beforeAll(): Unit = {
    super.beforeAll()

    spark
      .range(100)
      .selectExpr("cast(id % 3 as int) as c1", "id as c2")
      .write
      .format("parquet")
      .saveAsTable("tmp1")
    spark
      .range(100)
      .selectExpr("cast(id % 9 as int) as c1")
      .write
      .format("parquet")
      .saveAsTable("tmp2")
    spark
      .range(100)
      .selectExpr("cast(id % 3 as int) as c1", "cast(id % 9 as int) as c2")
      .write
      .format("parquet")
      .saveAsTable("tmp3")
  }

  /** Drops the fixture tables created in [[beforeAll]]. */
  override protected def afterAll(): Unit = {
    // Use IF EXISTS so that a partial failure in beforeAll (e.g. only tmp1
    // was created) does not throw here and mask the original error.
    spark.sql("drop table if exists tmp1")
    spark.sql("drop table if exists tmp2")
    spark.sql("drop table if exists tmp3")

    super.afterAll()
  }

  /** Counts VeloxColumnarToRowExec nodes anywhere in the (adaptive) plan. */
  private def collectColumnarToRow(plan: SparkPlan): Int = {
    collect(plan) { case v: VeloxColumnarToRowExec => v }.size
  }

  /** Counts Gluten columnar shuffle exchange nodes in the plan. */
  private def collectColumnarShuffleExchange(plan: SparkPlan): Int = {
    collect(plan) { case c: ColumnarShuffleExchangeExec => c }.size
  }

  /** Counts vanilla (row-based) shuffle exchange nodes in the plan. */
  private def collectShuffleExchange(plan: SparkPlan): Int = {
    collect(plan) { case c: ShuffleExchangeExec => c }.size
  }

  /** Counts ApplyResourceProfileExec wrappers injected by the rule under test. */
  private def collectApplyResourceProfileExec(plan: SparkPlan): Int = {
    collect(plan) { case c: ApplyResourceProfileExec => c }.size
  }

  test("stage contains r2c and apply new resource profile") {
    withSQLConf(
      // Disable columnar shuffle to force row-based exchanges, which in turn
      // force r2c/c2r transitions around them.
      GlutenConfig.COLUMNAR_SHUFFLE_ENABLED.key -> "false",
      // Threshold of 1 makes a single transition enough to trigger the rule.
      GlutenConfig.AUTO_ADJUST_STAGE_RESOURCES_C2R_OR_R2C_THRESHOLD.key -> "1") {
      runQueryAndCompare("select c1, count(*) from tmp1 group by c1") {
        df =>
          val plan = df.queryExecution.executedPlan

          assert(collectColumnarShuffleExchange(plan) == 0)
          assert(collectShuffleExchange(plan) == 1)

          val wholeQueryColumnarToRow = collectColumnarToRow(plan)
          assert(wholeQueryColumnarToRow == 2)

          val applyResourceProfileExec = collectApplyResourceProfileExec(plan)
          assert(applyResourceProfileExec == 1)
          // here we can't check the applied resource profile since
          // ResourceProfiles are only supported on YARN and Kubernetes
          // with dynamic allocation enabled. In testing mode, we apply
          // default resource profile to make sure ut works.
      }
    }
  }

  test("whole stage fallback") {
    withSQLConf(
      // Threshold of 1 forces the whole stage to fall back to vanilla Spark,
      // which should also trigger a resource-profile adjustment.
      GlutenConfig.COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD.key -> "1",
      GlutenConfig.AUTO_ADJUST_STAGE_RESOURCES_C2R_OR_R2C_THRESHOLD.key -> "1"
    ) {
      runQueryAndCompare(
        "select " +
          "java_method('java.lang.Integer', 'signum', tmp1.c1), count(*) " +
          "from tmp1 group by java_method('java.lang.Integer', 'signum', tmp1.c1)") {
        df =>
          // Partial + final aggregation -> two transformer nodes expected.
          assert(
            collect(df.queryExecution.executedPlan) {
              case h: HashAggregateExecTransformer => h
            }.size == 2,
            df.queryExecution.executedPlan)
          assert(collectApplyResourceProfileExec(df.queryExecution.executedPlan) == 2)
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution
import org.apache.gluten.GlutenConfig
import org.apache.gluten.execution.{ColumnarToRowExecBase, GlutenPlan}
import org.apache.gluten.logging.LogLevelUtil

import org.apache.spark.annotation.Experimental
import org.apache.spark.internal.Logging
import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile, ResourceProfileManager, TaskResourceRequest}
Expand All @@ -29,6 +28,7 @@ import org.apache.spark.sql.execution.GlutenAutoAdjustStageResourceProfile.{appl
import org.apache.spark.sql.execution.adaptive.QueryStageExec
import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec}
import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.util.SparkTestUtil

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -133,6 +133,11 @@ object GlutenAutoAdjustStageResourceProfile extends Logging {
private def getFinalResourceProfile(
rpManager: ResourceProfileManager,
newRP: ResourceProfile): ResourceProfile = {
// Just for test
// ResourceProfiles are only supported on YARN and Kubernetes with dynamic allocation enabled
if (SparkTestUtil.isTesting) {
return rpManager.defaultResourceProfile
}
val maybeEqProfile = rpManager.getEquivalentProfile(newRP)
if (maybeEqProfile.isDefined) {
maybeEqProfile.get
Expand Down

0 comments on commit 38dde01

Please sign in to comment.