From 29390e1e43117275a8bdfe58a12b2670a4e556d0 Mon Sep 17 00:00:00 2001 From: PHILO-HE Date: Thu, 5 Sep 2024 13:18:17 +0800 Subject: [PATCH] [GLUTEN-6748][CORE] Search stack trace to infer adaptive execution context (#7121) Closes #6748 --- .../columnar/enumerated/EnumeratedApplier.scala | 9 +-------- .../columnar/heuristic/HeuristicApplier.scala | 9 +-------- .../columnar/util/AdaptiveContext.scala | 17 +++++------------ 3 files changed, 7 insertions(+), 28 deletions(-) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala index bebce3a61ae8..6ce4e24ed329 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala @@ -40,14 +40,7 @@ class EnumeratedApplier(session: SparkSession, ruleBuilders: Seq[ColumnarRuleBui extends ColumnarRuleApplier with Logging with LogLevelUtil { - // An empirical value. - private val aqeStackTraceIndex = - if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.12"))) { - 16 - } else { - 14 - } - private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex) + private val adaptiveContext = AdaptiveContext(session) override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = { val call = new ColumnarRuleCall(session, adaptiveContext, outputsColumnar) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala index dea9f01df2a5..85f44878f2c1 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala @@ -39,14 +39,7 @@ class HeuristicApplier( extends ColumnarRuleApplier with Logging with LogLevelUtil { - // This is an empirical value, may need to be changed for supporting other versions of spark. - private val aqeStackTraceIndex = - if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.12"))) { - 19 - } else { - 17 - } - private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex) + private val adaptiveContext = AdaptiveContext(session) override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = { val call = new ColumnarRuleCall(session, adaptiveContext, outputsColumnar) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala index e1f594fd36e5..de72bc4bc97b 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala @@ -34,8 +34,8 @@ sealed trait AdaptiveContext { } object AdaptiveContext { - def apply(session: SparkSession, aqeStackTraceIndex: Int): AdaptiveContext = - new AdaptiveContextImpl(session, aqeStackTraceIndex) + def apply(session: SparkSession): AdaptiveContext = + new AdaptiveContextImpl(session) private val GLUTEN_IS_ADAPTIVE_CONTEXT = "gluten.isAdaptiveContext" @@ -45,8 +45,7 @@ object AdaptiveContext { private val localIsAdaptiveContextFlags: ThreadLocal[ListBuffer[Boolean]] = ThreadLocal.withInitial(() => ListBuffer.empty[Boolean]) - private class AdaptiveContextImpl(session: SparkSession, aqeStackTraceIndex: Int) - extends AdaptiveContext { + private class AdaptiveContextImpl(session: SparkSession) extends AdaptiveContext { // Just for test use. override def enableAdaptiveContext(): Unit = { session.sparkContext.setLocalProperty(GLUTEN_IS_ADAPTIVE_CONTEXT, "true") @@ -60,19 +59,13 @@ object AdaptiveContext { override def setAdaptiveContext(): Unit = { val traceElements = Thread.currentThread.getStackTrace - assert( - traceElements.length > aqeStackTraceIndex, - s"The number of stack trace elements is expected to be more than $aqeStackTraceIndex") // ApplyColumnarRulesAndInsertTransitions is called by either QueryExecution or // AdaptiveSparkPlanExec. So by checking the stack trace, we can know whether - // columnar rule will be applied in adaptive execution context. This part of code - // needs to be carefully checked when supporting higher versions of spark to make - // sure the calling stack has not been changed. + // columnar rule will be applied in adaptive execution context. localIsAdaptiveContextFlags .get() .prepend( - traceElements(aqeStackTraceIndex).getClassName - .equals(AdaptiveSparkPlanExec.getClass.getName)) + traceElements.exists(_.getClassName.equals(AdaptiveSparkPlanExec.getClass.getName))) } override def resetAdaptiveContext(): Unit =