Skip to content

Commit

Permalink
Initial
Browse files Browse the repository at this point in the history
  • Loading branch information
PHILO-HE committed Sep 5, 2024
1 parent e620830 commit a2f8a8b
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,7 @@ class EnumeratedApplier(session: SparkSession, ruleBuilders: Seq[ColumnarRuleBui
extends ColumnarRuleApplier
with Logging
with LogLevelUtil {
// An empirical value.
private val aqeStackTraceIndex =
if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.12"))) {
16
} else {
14
}
private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex)
private val adaptiveContext = AdaptiveContext(session)

override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
val call = new ColumnarRuleCall(session, adaptiveContext, outputsColumnar)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,7 @@ class HeuristicApplier(
extends ColumnarRuleApplier
with Logging
with LogLevelUtil {
// This is an empirical value, may need to be changed for supporting other versions of spark.
private val aqeStackTraceIndex =
if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.12"))) {
19
} else {
17
}
private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex)
private val adaptiveContext = AdaptiveContext(session)

override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
val call = new ColumnarRuleCall(session, adaptiveContext, outputsColumnar)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ sealed trait AdaptiveContext {
}

object AdaptiveContext {
def apply(session: SparkSession, aqeStackTraceIndex: Int): AdaptiveContext =
new AdaptiveContextImpl(session, aqeStackTraceIndex)
def apply(session: SparkSession): AdaptiveContext =
new AdaptiveContextImpl(session)

private val GLUTEN_IS_ADAPTIVE_CONTEXT = "gluten.isAdaptiveContext"

Expand All @@ -45,7 +45,7 @@ object AdaptiveContext {
private val localIsAdaptiveContextFlags: ThreadLocal[ListBuffer[Boolean]] =
ThreadLocal.withInitial(() => ListBuffer.empty[Boolean])

private class AdaptiveContextImpl(session: SparkSession, aqeStackTraceIndex: Int)
private class AdaptiveContextImpl(session: SparkSession)
extends AdaptiveContext {
// Just for test use.
override def enableAdaptiveContext(): Unit = {
Expand All @@ -60,19 +60,13 @@ object AdaptiveContext {

override def setAdaptiveContext(): Unit = {
val traceElements = Thread.currentThread.getStackTrace
assert(
traceElements.length > aqeStackTraceIndex,
s"The number of stack trace elements is expected to be more than $aqeStackTraceIndex")
// ApplyColumnarRulesAndInsertTransitions is called by either QueryExecution or
// AdaptiveSparkPlanExec. So by checking the stack trace, we can know whether
// columnar rule will be applied in adaptive execution context. This part of code
// needs to be carefully checked when supporting higher versions of spark to make
// sure the calling stack has not been changed.
// columnar rule will be applied in adaptive execution context.
localIsAdaptiveContextFlags
.get()
.prepend(
traceElements(aqeStackTraceIndex).getClassName
.equals(AdaptiveSparkPlanExec.getClass.getName))
traceElements.exists(_.getClassName.equals(AdaptiveSparkPlanExec.getClass.getName)))
}

override def resetAdaptiveContext(): Unit =
Expand Down

0 comments on commit a2f8a8b

Please sign in to comment.