Skip to content

Commit

Permalink
[GLUTEN-6748][CORE] Search stack trace to infer adaptive execution context (apache#7121)
Browse files Browse the repository at this point in the history

Closes apache#6748
  • Loading branch information
PHILO-HE authored and hengzhen.sq committed Sep 11, 2024
1 parent 6edeb99 commit 6d27ac9
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,7 @@ class EnumeratedApplier(session: SparkSession, ruleBuilders: Seq[ColumnarRuleBui
extends ColumnarRuleApplier
with Logging
with LogLevelUtil {
// An empirical value.
private val aqeStackTraceIndex =
if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.12"))) {
16
} else {
14
}
private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex)
private val adaptiveContext = AdaptiveContext(session)

override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
val call = new ColumnarRuleCall(session, adaptiveContext, outputsColumnar)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,7 @@ class HeuristicApplier(
extends ColumnarRuleApplier
with Logging
with LogLevelUtil {
// This is an empirical value, may need to be changed for supporting other versions of spark.
private val aqeStackTraceIndex =
if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.12"))) {
19
} else {
17
}
private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex)
private val adaptiveContext = AdaptiveContext(session)

override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
val call = new ColumnarRuleCall(session, adaptiveContext, outputsColumnar)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ sealed trait AdaptiveContext {
}

object AdaptiveContext {
def apply(session: SparkSession, aqeStackTraceIndex: Int): AdaptiveContext =
new AdaptiveContextImpl(session, aqeStackTraceIndex)
def apply(session: SparkSession): AdaptiveContext =
new AdaptiveContextImpl(session)

private val GLUTEN_IS_ADAPTIVE_CONTEXT = "gluten.isAdaptiveContext"

Expand All @@ -45,8 +45,7 @@ object AdaptiveContext {
private val localIsAdaptiveContextFlags: ThreadLocal[ListBuffer[Boolean]] =
ThreadLocal.withInitial(() => ListBuffer.empty[Boolean])

private class AdaptiveContextImpl(session: SparkSession, aqeStackTraceIndex: Int)
extends AdaptiveContext {
private class AdaptiveContextImpl(session: SparkSession) extends AdaptiveContext {
// Just for test use.
override def enableAdaptiveContext(): Unit = {
session.sparkContext.setLocalProperty(GLUTEN_IS_ADAPTIVE_CONTEXT, "true")
Expand All @@ -60,19 +59,13 @@ object AdaptiveContext {

override def setAdaptiveContext(): Unit = {
val traceElements = Thread.currentThread.getStackTrace
assert(
traceElements.length > aqeStackTraceIndex,
s"The number of stack trace elements is expected to be more than $aqeStackTraceIndex")
// ApplyColumnarRulesAndInsertTransitions is called by either QueryExecution or
// AdaptiveSparkPlanExec. So by checking the stack trace, we can know whether
// columnar rule will be applied in adaptive execution context. This part of code
// needs to be carefully checked when supporting higher versions of spark to make
// sure the calling stack has not been changed.
// columnar rule will be applied in adaptive execution context.
localIsAdaptiveContextFlags
.get()
.prepend(
traceElements(aqeStackTraceIndex).getClassName
.equals(AdaptiveSparkPlanExec.getClass.getName))
traceElements.exists(_.getClassName.equals(AdaptiveSparkPlanExec.getClass.getName)))
}

override def resetAdaptiveContext(): Unit =
Expand Down

0 comments on commit 6d27ac9

Please sign in to comment.