Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VL][1.2] Port #7121 #7448 #7988

Merged
merged 2 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backends-clickhouse/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.11.0</version>
<version>2.14.0</version>
<scope>provided</scope>
</dependency>
<!-- Fasterxml -->
Expand Down
2 changes: 1 addition & 1 deletion backends-velox/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.11.0</version>
<version>2.14.0</version>
<scope>provided</scope>
</dependency>
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion gluten-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.11.0</version>
<version>2.14.0</version>
<scope>provided</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,7 @@ class EnumeratedApplier(session: SparkSession)
extends ColumnarRuleApplier
with Logging
with LogLevelUtil {
// An empirical value.
private val aqeStackTraceIndex =
if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.12"))) {
16
} else {
14
}
private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex)
private val adaptiveContext = AdaptiveContext(session)

override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan =
PhysicalPlanSelector.maybe(session, plan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,7 @@ class HeuristicApplier(session: SparkSession)
extends ColumnarRuleApplier
with Logging
with LogLevelUtil {
// This is an empirical value, may need to be changed for supporting other versions of spark.
private val aqeStackTraceIndex =
if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.12"))) {
19
} else {
17
}
private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex)
private val adaptiveContext = AdaptiveContext(session)

override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
withTransformRules(transformRules(outputsColumnar)).apply(plan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ sealed trait AdaptiveContext {
}

object AdaptiveContext {
def apply(session: SparkSession, aqeStackTraceIndex: Int): AdaptiveContext =
new AdaptiveContextImpl(session, aqeStackTraceIndex)
def apply(session: SparkSession): AdaptiveContext =
new AdaptiveContextImpl(session)

private val GLUTEN_IS_ADAPTIVE_CONTEXT = "gluten.isAdaptiveContext"

Expand All @@ -44,8 +44,7 @@ object AdaptiveContext {
private val localIsAdaptiveContextFlags: ThreadLocal[ListBuffer[Boolean]] =
ThreadLocal.withInitial(() => ListBuffer.empty[Boolean])

private class AdaptiveContextImpl(session: SparkSession, aqeStackTraceIndex: Int)
extends AdaptiveContext {
private class AdaptiveContextImpl(session: SparkSession) extends AdaptiveContext {
// Just for test use.
override def enableAdaptiveContext(): Unit = {
session.sparkContext.setLocalProperty(GLUTEN_IS_ADAPTIVE_CONTEXT, "true")
Expand All @@ -59,19 +58,13 @@ object AdaptiveContext {

override def setAdaptiveContext(): Unit = {
val traceElements = Thread.currentThread.getStackTrace
assert(
traceElements.length > aqeStackTraceIndex,
s"The number of stack trace elements is expected to be more than $aqeStackTraceIndex")
// ApplyColumnarRulesAndInsertTransitions is called by either QueryExecution or
// AdaptiveSparkPlanExec. So by checking the stack trace, we can know whether
// columnar rule will be applied in adaptive execution context. This part of code
// needs to be carefully checked when supporting higher versions of spark to make
// sure the calling stack has not been changed.
// columnar rule will be applied in adaptive execution context.
localIsAdaptiveContextFlags
.get()
.prepend(
traceElements(aqeStackTraceIndex).getClassName
.equals(AdaptiveSparkPlanExec.getClass.getName))
traceElements.exists(_.getClassName.equals(AdaptiveSparkPlanExec.getClass.getName)))
}

override def resetAdaptiveContext(): Unit =
Expand Down
2 changes: 1 addition & 1 deletion gluten-ut/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.11.0</version>
<version>2.14.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
Expand Down
Loading