From 24b02d7c63425722738cedf85cc65186e764deb5 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Thu, 13 Jun 2024 16:59:35 +0800
Subject: [PATCH 1/4] [VL] More refactors on ColumnarRuleApplier

---
 .../columnar/ColumnarRuleApplier.scala        | 36 +++++++
 .../enumerated/EnumeratedApplier.scala        | 99 +++++++++----
 .../columnar/heuristic/HeuristicApplier.scala | 42 +++-----
 3 files changed, 94 insertions(+), 83 deletions(-)

diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
index 17bf017305f2..ee5bcd883e7e 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
@@ -16,8 +16,44 @@
  */
 package org.apache.gluten.extension.columnar
 
+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.metrics.GlutenTimeMetric
+import org.apache.gluten.utils.LogLevelUtil
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
 import org.apache.spark.sql.execution.SparkPlan
 
 trait ColumnarRuleApplier {
   def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan
 }
+
+object ColumnarRuleApplier {
+  class Executor(phase: String, rules: Seq[Rule[SparkPlan]]) extends RuleExecutor[SparkPlan] {
+    private val batch: Batch =
+      Batch(s"Columnar (Phase [$phase])", Once, rules.map(r => new LoggedRule(r)): _*)
+
+    // TODO: Remove this exclusion once the rules pass Spark's idempotence check.
+    override protected val excludedOnceBatches: Set[String] = Set(batch.name)
+
+    override protected def batches: Seq[Batch] = List(batch)
+  }
+
+  private class LoggedRule(delegate: Rule[SparkPlan])
+    extends Rule[SparkPlan]
+    with Logging
+    with LogLevelUtil {
+    // Columnar plan change logging added since https://github.com/apache/incubator-gluten/pull/456.
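+    // Wraps the delegate rule so that each application logs the plan before and after
+    // the rule, plus the time the rule took, at the configured log level.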
+    private val transformPlanLogLevel = GlutenConfig.getConf.transformPlanLogLevel
+    override val ruleName: String = delegate.ruleName
+
+    override def apply(plan: SparkPlan): SparkPlan = GlutenTimeMetric.withMillisTime {
+      logOnLevel(
+        transformPlanLogLevel,
+        s"Preparing to apply rule $ruleName on plan:\n${plan.toString}")
+      val out = delegate.apply(plan)
+      logOnLevel(transformPlanLogLevel, s"Plan after applying rule $ruleName:\n${out.toString}")
+      out
+    }(t => logOnLevel(transformPlanLogLevel, s"Applying rule $ruleName took $t ms."))
+  }
+}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
index 26201dc1baa3..85990500869a 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
@@ -22,13 +22,12 @@ import org.apache.gluten.extension.columnar._
 import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast}
 import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
 import org.apache.gluten.extension.columnar.util.AdaptiveContext
-import org.apache.gluten.metrics.GlutenTimeMetric
 import org.apache.gluten.utils.{LogLevelUtil, PhysicalPlanSelector}
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter, SparkPlan}
 import org.apache.spark.util.SparkRuleUtil
@@ -47,41 +46,25 @@ class EnumeratedApplier(session: SparkSession)
     with LogLevelUtil {
   // An empirical value.
   private val aqeStackTraceIndex = 16
-
-  private lazy val transformPlanLogLevel = GlutenConfig.getConf.transformPlanLogLevel
-  private lazy val planChangeLogger = new PlanChangeLogger[SparkPlan]()
-
   private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex)
 
   override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan =
     PhysicalPlanSelector.maybe(session, plan) {
-      val transformed = transformPlan(transformRules(outputsColumnar), plan, "transform")
+      val transformed = transformPlan("transform", transformRules(outputsColumnar), plan)
       val postPlan = maybeAqe {
-        transformPlan(postRules(), transformed, "post")
+        transformPlan("post", postRules(), transformed)
       }
-      val finalPlan = transformPlan(finalRules(), postPlan, "final")
+      val finalPlan = transformPlan("final", finalRules(), postPlan)
       finalPlan
     }
 
   private def transformPlan(
-      getRules: List[SparkSession => Rule[SparkPlan]],
-      plan: SparkPlan,
-      step: String) = GlutenTimeMetric.withMillisTime {
-    logOnLevel(
-      transformPlanLogLevel,
-      s"${step}ColumnarTransitions preOverriden plan:\n${plan.toString}")
-    val overridden = getRules.foldLeft(plan) {
-      (p, getRule) =>
-        val rule = getRule(session)
-        val newPlan = rule(p)
-        planChangeLogger.logRule(rule.ruleName, p, newPlan)
-        newPlan
-    }
-    logOnLevel(
-      transformPlanLogLevel,
-      s"${step}ColumnarTransitions afterOverriden plan:\n${overridden.toString}")
-    overridden
-  }(t => logOnLevel(transformPlanLogLevel, s"${step}Transform SparkPlan took: $t ms."))
+      phase: String,
+      rules: Seq[Rule[SparkPlan]],
+      plan: SparkPlan): SparkPlan = {
+    val executor = new ColumnarRuleApplier.Executor(phase, rules)
+    executor.execute(plan)
+  }
 
   private def maybeAqe[T](f: => T): T = {
     adaptiveContext.setAdaptiveContext()
@@ -96,55 +79,63 @@ class EnumeratedApplier(session: SparkSession)
    * Rules to let planner create a suggested Gluten plan being sent to `fallbackPolicies`, in which
    * the plan will be broken down and a decision made on whether to fall back or not.
   */
-  private def transformRules(outputsColumnar: Boolean): List[SparkSession => Rule[SparkPlan]] = {
+  private def transformRules(outputsColumnar: Boolean): Seq[Rule[SparkPlan]] = {
     List(
-      (_: SparkSession) => RemoveTransitions,
-      (spark: SparkSession) => FallbackOnANSIMode(spark),
-      (spark: SparkSession) => PlanOneRowRelation(spark),
-      (_: SparkSession) => FallbackEmptySchemaRelation(),
-      (_: SparkSession) => RewriteSubqueryBroadcast()
+      RemoveTransitions,
+      FallbackOnANSIMode(session),
+      PlanOneRowRelation(session),
+      FallbackEmptySchemaRelation(),
+      RewriteSubqueryBroadcast()
     ) :::
-      BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarValidationRules() :::
-      List((spark: SparkSession) => MergeTwoPhasesHashBaseAggregate(spark)) :::
+      BackendsApiManager.getSparkPlanExecApiInstance
+        .genExtendedColumnarValidationRules()
+        .map(_(session)) :::
+      List(MergeTwoPhasesHashBaseAggregate(session)) :::
       List(
-        (session: SparkSession) => EnumeratedTransform(session, outputsColumnar),
-        (_: SparkSession) => RemoveTransitions
+        EnumeratedTransform(session, outputsColumnar),
+        RemoveTransitions
      ) :::
      List(
-        (_: SparkSession) => RemoveNativeWriteFilesSortAndProject(),
-        (spark: SparkSession) => RewriteTransformer(spark),
-        (_: SparkSession) => EnsureLocalSortRequirements,
-        (_: SparkSession) => CollapseProjectExecTransformer
+        RemoveNativeWriteFilesSortAndProject(),
+        RewriteTransformer(session),
+        EnsureLocalSortRequirements,
+        CollapseProjectExecTransformer
      ) :::
-      BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarTransformRules() :::
+      BackendsApiManager.getSparkPlanExecApiInstance
+        .genExtendedColumnarTransformRules()
+        .map(_(session)) :::
       SparkRuleUtil
-        .extendedColumnarRules(session, GlutenConfig.getConf.extendedColumnarTransformRules) :::
-      List((_: SparkSession) => InsertTransitions(outputsColumnar))
+        .extendedColumnarRules(session, GlutenConfig.getConf.extendedColumnarTransformRules)
+        .map(_(session)) :::
+      List(InsertTransitions(outputsColumnar))
   }
 
   /**
    * Rules applying to non-fallen-back Gluten plans, doing some post-cleanup work on the plan to
    * make sure it is able to run and is compatible with Spark's execution engine.
    */
-  private def postRules(): List[SparkSession => Rule[SparkPlan]] =
-    List(
-      (s: SparkSession) => RemoveTopmostColumnarToRow(s, adaptiveContext.isAdaptiveContext())) :::
-      BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPostRules() :::
-      List((_: SparkSession) => ColumnarCollapseTransformStages(GlutenConfig.getConf)) :::
-      SparkRuleUtil.extendedColumnarRules(session, GlutenConfig.getConf.extendedColumnarPostRules)
+  private def postRules(): Seq[Rule[SparkPlan]] =
+    List(RemoveTopmostColumnarToRow(session, adaptiveContext.isAdaptiveContext())) :::
+      BackendsApiManager.getSparkPlanExecApiInstance
+        .genExtendedColumnarPostRules()
+        .map(_(session)) :::
+      List(ColumnarCollapseTransformStages(GlutenConfig.getConf)) :::
+      SparkRuleUtil
+        .extendedColumnarRules(session, GlutenConfig.getConf.extendedColumnarPostRules)
+        .map(_(session))
 
   /*
    * Rules consistently applying to all input plans after all other rules have been applied,
    * regardless of whether the input plan is fallen back or not.
    */
-  private def finalRules(): List[SparkSession => Rule[SparkPlan]] = {
+  private def finalRules(): Seq[Rule[SparkPlan]] = {
     List(
       // The rule is required regardless of whether the stage is fallen back or not, since
       // ColumnarCachedBatchSerializer is statically registered to Spark without a columnar rule
      // when columnar table cache is enabled.
-      (s: SparkSession) => RemoveGlutenTableCacheColumnarToRow(s),
-      (s: SparkSession) => GlutenFallbackReporter(GlutenConfig.getConf, s),
-      (_: SparkSession) => RemoveTransformHintRule()
+      RemoveGlutenTableCacheColumnarToRow(session),
+      GlutenFallbackReporter(GlutenConfig.getConf, session),
+      RemoveTransformHintRule()
     )
   }
 }
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
index eb5c561bfa8d..1a36af169a84 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
@@ -23,12 +23,11 @@ import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTable
 import org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager
 import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
 import org.apache.gluten.extension.columnar.util.AdaptiveContext
-import org.apache.gluten.metrics.GlutenTimeMetric
 import org.apache.gluten.utils.{LogLevelUtil, PhysicalPlanSelector}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter, SparkPlan}
 import org.apache.spark.util.SparkRuleUtil
@@ -42,14 +41,11 @@ class HeuristicApplier(session: SparkSession)
     with LogLevelUtil {
   // This is an empirical value; it may need to be changed to support other versions of Spark.
   private val aqeStackTraceIndex = 19
-
-  private lazy val transformPlanLogLevel = GlutenConfig.getConf.transformPlanLogLevel
-  private lazy val planChangeLogger = new PlanChangeLogger[SparkPlan]()
-
   private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex)
 
-  override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan =
+  override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
     withTransformRules(transformRules(outputsColumnar)).apply(plan)
+  }
 
   // Visible for testing.
   def withTransformRules(transformRules: List[SparkSession => Rule[SparkPlan]]): Rule[SparkPlan] =
@@ -57,39 +53,27 @@ class HeuristicApplier(session: SparkSession)
       PhysicalPlanSelector.maybe(session, plan) {
         val finalPlan = prepareFallback(plan) {
           p =>
-            val suggestedPlan = transformPlan(transformRules, p, "transform")
-            transformPlan(fallbackPolicies(), suggestedPlan, "fallback") match {
+            val suggestedPlan = transformPlan("transform", transformRules.map(_(session)), p)
+            transformPlan("fallback", fallbackPolicies().map(_(session)), suggestedPlan) match {
               case FallbackNode(fallbackPlan) =>
                 // We should use vanilla c2r rather than native c2r,
                 // and there should be no `GlutenPlan` anymore,
                 // so skip the `postRules()`.
                fallbackPlan
              case plan =>
-                transformPlan(postRules(), plan, "post")
+                transformPlan("post", postRules().map(_(session)), plan)
            }
        }
-      transformPlan(finalRules(), finalPlan, "final")
+      transformPlan("final", finalRules().map(_(session)), finalPlan)
    }

   private def transformPlan(
-      getRules: List[SparkSession => Rule[SparkPlan]],
-      plan: SparkPlan,
-      step: String) = GlutenTimeMetric.withMillisTime {
-    logOnLevel(
-      transformPlanLogLevel,
-      s"${step}ColumnarTransitions preOverridden plan:\n${plan.toString}")
-    val overridden = getRules.foldLeft(plan) {
-      (p, getRule) =>
-        val rule = getRule(session)
-        val newPlan = rule(p)
-        planChangeLogger.logRule(rule.ruleName, p, newPlan)
-        newPlan
-    }
-    logOnLevel(
-      transformPlanLogLevel,
-      s"${step}ColumnarTransitions afterOverridden plan:\n${overridden.toString}")
-    overridden
-  }(t => logOnLevel(transformPlanLogLevel, s"${step}Transform SparkPlan took: $t ms."))
+      phase: String,
+      rules: Seq[Rule[SparkPlan]],
+      plan: SparkPlan): SparkPlan = {
+    val executor = new ColumnarRuleApplier.Executor(phase, rules)
+    executor.execute(plan)
+  }

   private def prepareFallback[T](plan: SparkPlan)(f: SparkPlan => T): T = {
     adaptiveContext.setAdaptiveContext()

From 5d71060aa2de3e787ac79f69332319d5737c9966 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Fri, 14 Jun 2024 11:04:06 +0800
Subject: [PATCH 2/4] fixup

---
 .../enumerated/EnumeratedApplier.scala        | 69 +++++++----------
 .../columnar/heuristic/HeuristicApplier.scala |  8 +--
 2 files changed, 35 insertions(+), 42 deletions(-)

diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
index 85990500869a..d5260f66adba 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
@@ -50,11 +50,12 @@ class EnumeratedApplier(session: SparkSession)
 
   override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan =
     PhysicalPlanSelector.maybe(session, plan) {
-      val transformed = transformPlan("transform", transformRules(outputsColumnar), plan)
+      val transformed =
+        transformPlan("transform", transformRules(outputsColumnar).map(_(session)), plan)
       val postPlan = maybeAqe {
-        transformPlan("post", postRules(), transformed)
+        transformPlan("post", postRules().map(_(session)), transformed)
       }
-      val finalPlan = transformPlan("final", finalRules(), postPlan)
+      val finalPlan = transformPlan("final", finalRules().map(_(session)), postPlan)
       finalPlan
     }
 
@@ -79,63 +80,55 @@ class EnumeratedApplier(session: SparkSession)
    * Rules to let planner create a suggested Gluten plan being sent to `fallbackPolicies`, in which
    * the plan will be broken down and a decision made on whether to fall back or not.
   */
-  private def transformRules(outputsColumnar: Boolean): Seq[Rule[SparkPlan]] = {
+  private def transformRules(outputsColumnar: Boolean): Seq[SparkSession => Rule[SparkPlan]] = {
     List(
-      RemoveTransitions,
-      FallbackOnANSIMode(session),
-      PlanOneRowRelation(session),
-      FallbackEmptySchemaRelation(),
-      RewriteSubqueryBroadcast()
+      (_: SparkSession) => RemoveTransitions,
+      (spark: SparkSession) => FallbackOnANSIMode(spark),
+      (spark: SparkSession) => PlanOneRowRelation(spark),
+      (_: SparkSession) => FallbackEmptySchemaRelation(),
+      (_: SparkSession) => RewriteSubqueryBroadcast()
    ) :::
-      BackendsApiManager.getSparkPlanExecApiInstance
-        .genExtendedColumnarValidationRules()
-        .map(_(session)) :::
-      List(MergeTwoPhasesHashBaseAggregate(session)) :::
+      BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarValidationRules() :::
+      List((spark: SparkSession) => MergeTwoPhasesHashBaseAggregate(spark)) :::
      List(
-        EnumeratedTransform(session, outputsColumnar),
-        RemoveTransitions
+        (session: SparkSession) => EnumeratedTransform(session, outputsColumnar),
+        (_: SparkSession) => RemoveTransitions
      ) :::
      List(
-        RemoveNativeWriteFilesSortAndProject(),
-        RewriteTransformer(session),
-        EnsureLocalSortRequirements,
-        CollapseProjectExecTransformer
+        (_: SparkSession) => RemoveNativeWriteFilesSortAndProject(),
+        (spark: SparkSession) => RewriteTransformer(spark),
+        (_: SparkSession) => EnsureLocalSortRequirements,
+        (_: SparkSession) => CollapseProjectExecTransformer
      ) :::
-      BackendsApiManager.getSparkPlanExecApiInstance
-        .genExtendedColumnarTransformRules()
-        .map(_(session)) :::
+      BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarTransformRules() :::
       SparkRuleUtil
-        .extendedColumnarRules(session, GlutenConfig.getConf.extendedColumnarTransformRules)
-        .map(_(session)) :::
-      List(InsertTransitions(outputsColumnar))
+        .extendedColumnarRules(session, GlutenConfig.getConf.extendedColumnarTransformRules) :::
+      List((_: SparkSession) => InsertTransitions(outputsColumnar))
   }
 
   /**
    * Rules applying to non-fallen-back Gluten plans, doing some post-cleanup work on the plan to
    * make sure it is able to run and is compatible with Spark's execution engine.
    */
-  private def postRules(): Seq[Rule[SparkPlan]] =
-    List(RemoveTopmostColumnarToRow(session, adaptiveContext.isAdaptiveContext())) :::
-      BackendsApiManager.getSparkPlanExecApiInstance
-        .genExtendedColumnarPostRules()
-        .map(_(session)) :::
-      List(ColumnarCollapseTransformStages(GlutenConfig.getConf)) :::
-      SparkRuleUtil
-        .extendedColumnarRules(session, GlutenConfig.getConf.extendedColumnarPostRules)
-        .map(_(session))
+  private def postRules(): Seq[SparkSession => Rule[SparkPlan]] =
+    List(
+      (s: SparkSession) => RemoveTopmostColumnarToRow(s, adaptiveContext.isAdaptiveContext())) :::
+      BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPostRules() :::
+      List((_: SparkSession) => ColumnarCollapseTransformStages(GlutenConfig.getConf)) :::
+      SparkRuleUtil.extendedColumnarRules(session, GlutenConfig.getConf.extendedColumnarPostRules)
 
   /*
    * Rules consistently applying to all input plans after all other rules have been applied,
    * regardless of whether the input plan is fallen back or not.
   */
-  private def finalRules(): Seq[Rule[SparkPlan]] = {
+  private def finalRules(): Seq[SparkSession => Rule[SparkPlan]] = {
     List(
       // The rule is required regardless of whether the stage is fallen back or not, since
       // ColumnarCachedBatchSerializer is statically registered to Spark without a columnar rule
      // when columnar table cache is enabled.
-      RemoveGlutenTableCacheColumnarToRow(session),
-      GlutenFallbackReporter(GlutenConfig.getConf, session),
-      RemoveTransformHintRule()
+      (s: SparkSession) => RemoveGlutenTableCacheColumnarToRow(s),
+      (s: SparkSession) => GlutenFallbackReporter(GlutenConfig.getConf, s),
+      (_: SparkSession) => RemoveTransformHintRule()
     )
   }
 }
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
index 1a36af169a84..17adf8bbeb4e 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
@@ -90,7 +90,7 @@ class HeuristicApplier(session: SparkSession)
    * Rules to let planner create a suggested Gluten plan being sent to `fallbackPolicies`, in which
    * the plan will be broken down and a decision made on whether to fall back or not.
    */
-  private def transformRules(outputsColumnar: Boolean): List[SparkSession => Rule[SparkPlan]] = {
+  private def transformRules(outputsColumnar: Boolean): Seq[SparkSession => Rule[SparkPlan]] = {
     List(
       (_: SparkSession) => RemoveTransitions,
       (spark: SparkSession) => FallbackOnANSIMode(spark),
@@ -122,7 +122,7 @@ class HeuristicApplier(session: SparkSession)
    * Rules to add wrapper `FallbackNode`s on top of the input plan, as hints to make planner fall
    * back the whole input plan to the original vanilla Spark plan.
    */
-  private def fallbackPolicies(): List[SparkSession => Rule[SparkPlan]] = {
+  private def fallbackPolicies(): Seq[SparkSession => Rule[SparkPlan]] = {
     List(
       (_: SparkSession) =>
         ExpandFallbackPolicy(adaptiveContext.isAdaptiveContext(), adaptiveContext.originalPlan()))
@@ -132,7 +132,7 @@ class HeuristicApplier(session: SparkSession)
    * Rules applying to non-fallen-back Gluten plans, doing some post-cleanup work on the plan to
    * make sure it is able to run and is compatible with Spark's execution engine.
    */
-  private def postRules(): List[SparkSession => Rule[SparkPlan]] =
+  private def postRules(): Seq[SparkSession => Rule[SparkPlan]] =
     List(
       (s: SparkSession) => RemoveTopmostColumnarToRow(s, adaptiveContext.isAdaptiveContext())) :::
       BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPostRules() :::
@@ -143,7 +143,7 @@ class HeuristicApplier(session: SparkSession)
    * Rules consistently applying to all input plans after all other rules have been applied,
    * regardless of whether the input plan is fallen back or not.
    */
-  private def finalRules(): List[SparkSession => Rule[SparkPlan]] = {
+  private def finalRules(): Seq[SparkSession => Rule[SparkPlan]] = {
     List(
       // The rule is required regardless of whether the stage is fallen back or not, since
       // ColumnarCachedBatchSerializer is statically registered to Spark without a columnar rule

From 1f36523c25ee8d0d2088e6a320408c3f0d68d113 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Fri, 14 Jun 2024 11:40:22 +0800
Subject: [PATCH 3/4] fixup

---
 .../gluten/extension/columnar/heuristic/HeuristicApplier.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
index 17adf8bbeb4e..ad68786e6579 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
@@ -48,7 +48,7 @@ class HeuristicApplier(session: SparkSession)
   }
 
   // Visible for testing.
-  def withTransformRules(transformRules: List[SparkSession => Rule[SparkPlan]]): Rule[SparkPlan] =
+  def withTransformRules(transformRules: Seq[SparkSession => Rule[SparkPlan]]): Rule[SparkPlan] =
     plan =>
       PhysicalPlanSelector.maybe(session, plan) {
         val finalPlan = prepareFallback(plan) {

From c2a338045b977a4e1ded7df4e7d2aaab60c95057 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Fri, 14 Jun 2024 12:48:58 +0800
Subject: [PATCH 4/4] fixup

---
 .../apache/spark/sql/SparkQueryRunner.scala   | 39 +++++++++++--------
 1 file changed, 23 insertions(+), 16 deletions(-)

diff --git a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkQueryRunner.scala b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkQueryRunner.scala
index bb11a679f9eb..b68f74c1d5ed 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkQueryRunner.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkQueryRunner.scala
@@ -18,13 +18,9 @@ package org.apache.spark.sql
 
 import org.apache.spark.{SparkContext, Success, TaskKilled}
 import org.apache.spark.executor.ExecutorMetrics
-import org.apache.spark.scheduler.{
-  SparkListener,
-  SparkListenerExecutorMetricsUpdate,
-  SparkListenerTaskEnd,
-  SparkListenerTaskStart
-}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate, SparkListenerTaskEnd, SparkListenerTaskStart}
 import org.apache.spark.sql.KillTaskListener.INIT_WAIT_TIME_MS
+import org.apache.spark.sql.catalyst.QueryPlanningTracker
 
 import com.google.common.base.Preconditions
 import org.apache.commons.lang3.RandomUtils
@@ -50,7 +46,8 @@ object SparkQueryRunner {
     "ProcessTreePythonVMemory",
     "ProcessTreePythonRSSMemory",
     "ProcessTreeOtherVMemory",
-    "ProcessTreeOtherRSSMemory")
+    "ProcessTreeOtherRSSMemory"
+  )
 
   def runQuery(
       spark: SparkSession,
@@ -82,25 +79,33 @@ object SparkQueryRunner {
     println(s"Executing SQL query from resource path $queryPath...")
     try {
+      val tracker = new QueryPlanningTracker
       val sql = resourceToString(queryPath)
       val prev = System.nanoTime()
       val df = spark.sql(sql)
-      val rows = df.collect()
+      val rows = QueryPlanningTracker.withTracker(tracker) {
+        df.collect()
+      }
       if (explain) {
         df.explain(extended = true)
       }
-      val planMillis =
-        df.queryExecution.tracker.phases.values.map(p => p.endTimeMs - p.startTimeMs).sum
+      val sparkTracker = df.queryExecution.tracker
+      val sparkRulesMillis =
+        sparkTracker.rules.map(_._2.totalTimeNs).sum / 1000000L
+      val otherRulesMillis =
+        tracker.rules.map(_._2.totalTimeNs).sum / 1000000L
+      val planMillis = sparkRulesMillis + otherRulesMillis
       val totalMillis = (System.nanoTime() - prev) / 1000000L
       val collectedMetrics = metrics.map(name => (name, em.getMetricValue(name))).toMap
       RunResult(rows, planMillis, totalMillis - planMillis, collectedMetrics)
     } finally {
       sc.removeSparkListener(metricsListener)
-      killTaskListener.foreach(l => {
-        sc.removeSparkListener(l)
-        println(s"Successful kill rate ${"%.2f%%"
-          .format(100 * l.successfulKillRate())} during execution of app: ${sc.applicationId}")
-      })
+      killTaskListener.foreach(
+        l => {
+          sc.removeSparkListener(l)
+          println(s"Successful kill rate ${"%.2f%%"
+            .format(100 * l.successfulKillRate())} during execution of app: ${sc.applicationId}")
+        })
       sc.setJobDescription(null)
     }
   }
@@ -166,7 +171,8 @@ class KillTaskListener(val sc: SparkContext) extends SparkListener {
         val total = Math.min(
           stageKillMaxWaitTimeLookup.computeIfAbsent(taskStart.stageId, _ => Long.MaxValue),
           stageKillWaitTimeLookup
-            .computeIfAbsent(taskStart.stageId, _ => INIT_WAIT_TIME_MS))
+            .computeIfAbsent(taskStart.stageId, _ => INIT_WAIT_TIME_MS)
+        )
         val elapsed = System.currentTimeMillis() - startMs
         val remaining = total - elapsed
         if (remaining <= 0L) {
@@ -180,6 +186,7 @@ class KillTaskListener(val sc: SparkContext) extends SparkListener {
         }
         throw new IllegalStateException()
       }
+
       val elapsed = wait()
 
       // We have a 50% chance to kill the task. FIXME: make it configurable?
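
[Editor's note] The sketch below is illustrative only and is not part of the patch series
above. Assuming the ColumnarRuleApplier.Executor API introduced in PATCH 1, it shows how a
caller runs one named phase of columnar rules over a plan, which is exactly what both
appliers' transformPlan helpers now do; `runPhase`, `myPhaseRules`, and `inputPlan` are
hypothetical placeholders.

  import org.apache.gluten.extension.columnar.ColumnarRuleApplier
  import org.apache.spark.sql.catalyst.rules.Rule
  import org.apache.spark.sql.execution.SparkPlan

  // Rules are bound to a SparkSession up front (PATCH 2 moved the binding to the call
  // site via .map(_(session))), so the Executor needs only the phase name, which is used
  // in the batch name for logging, and the rule list. Internally each rule is wrapped in
  // LoggedRule, which logs the plan before and after the rule as well as its timing.
  def runPhase(phase: String, myPhaseRules: Seq[Rule[SparkPlan]], inputPlan: SparkPlan): SparkPlan = {
    val executor = new ColumnarRuleApplier.Executor(phase, myPhaseRules)
    executor.execute(inputPlan) // runs the rules as a single "Once" batch
  }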