Commit

fixup
zhztheplayer committed Oct 9, 2024
1 parent 3354763 commit fc29183
Showing 3 changed files with 95 additions and 74 deletions.
@@ -151,92 +151,97 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
// scalastyle:off
/**
 * Given an input physical plan, performs the following tasks.
* 1. Generates the explain output for the input plan excluding the subquery plans.
* 2. Generates the explain output for each subquery referenced in the plan.
* 1. Generates the explain output for the input plan excluding the subquery plans. 2. Generates
* the explain output for each subquery referenced in the plan.
*/
def processPlan[T <: QueryPlan[T]](
plan: T,
append: String => Unit,
collectFallbackFunc: Option[QueryPlan[_] => FallbackInfo] = None): FallbackInfo = synchronized {
try {
// Initialize a reference-unique set of Operators to avoid accidental overwrites and to allow
// intentional overwriting of IDs generated in previous AQE iteration
val operators = newSetFromMap[QueryPlan[_]](new util.IdentityHashMap())
// Initialize an array of ReusedExchanges to help find Adaptively Optimized Out
// Exchanges as part of SPARK-42753
val reusedExchanges = ArrayBuffer.empty[ReusedExchangeExec]
collectFallbackFunc: Option[QueryPlan[_] => FallbackInfo] = None): FallbackInfo =
synchronized {
SparkShimLoader.getSparkShims.withOperatorIdMap(
new java.util.IdentityHashMap[QueryPlan[_], Int]()) {
try {
// Initialize a reference-unique set of Operators to avoid accidental overwrites and to allow
// intentional overwriting of IDs generated in previous AQE iteration
val operators = newSetFromMap[QueryPlan[_]](new util.IdentityHashMap())
// Initialize an array of ReusedExchanges to help find Adaptively Optimized Out
// Exchanges as part of SPARK-42753
val reusedExchanges = ArrayBuffer.empty[ReusedExchangeExec]

var currentOperatorID = 0
currentOperatorID =
generateOperatorIDs(plan, currentOperatorID, operators, reusedExchanges, true)
var currentOperatorID = 0
currentOperatorID =
generateOperatorIDs(plan, currentOperatorID, operators, reusedExchanges, true)

val subqueries = ArrayBuffer.empty[(SparkPlan, Expression, BaseSubqueryExec)]
getSubqueries(plan, subqueries)
val subqueries = ArrayBuffer.empty[(SparkPlan, Expression, BaseSubqueryExec)]
getSubqueries(plan, subqueries)

currentOperatorID = subqueries.foldLeft(currentOperatorID) {
(curId, plan) => generateOperatorIDs(plan._3.child, curId, operators, reusedExchanges, true)
}
currentOperatorID = subqueries.foldLeft(currentOperatorID) {
(curId, plan) =>
generateOperatorIDs(plan._3.child, curId, operators, reusedExchanges, true)
}

// SPARK-42753: Process subtree for a ReusedExchange with unknown child
val optimizedOutExchanges = ArrayBuffer.empty[Exchange]
reusedExchanges.foreach {
reused =>
val child = reused.child
if (!operators.contains(child)) {
optimizedOutExchanges.append(child)
currentOperatorID =
generateOperatorIDs(child, currentOperatorID, operators, reusedExchanges, false)
// SPARK-42753: Process subtree for a ReusedExchange with unknown child
val optimizedOutExchanges = ArrayBuffer.empty[Exchange]
reusedExchanges.foreach {
reused =>
val child = reused.child
if (!operators.contains(child)) {
optimizedOutExchanges.append(child)
currentOperatorID =
generateOperatorIDs(child, currentOperatorID, operators, reusedExchanges, false)
}
}
}

val collectedOperators = BitSet.empty
processPlanSkippingSubqueries(plan, append, collectedOperators)
val collectedOperators = BitSet.empty
processPlanSkippingSubqueries(plan, append, collectedOperators)

var i = 0
for (sub <- subqueries) {
if (i == 0) {
append("\n===== Subqueries =====\n\n")
}
i = i + 1
append(
s"Subquery:$i Hosting operator id = " +
s"${getOpId(sub._1)} Hosting Expression = ${sub._2}\n")
var i = 0
for (sub <- subqueries) {
if (i == 0) {
append("\n===== Subqueries =====\n\n")
}
i = i + 1
append(
s"Subquery:$i Hosting operator id = " +
s"${getOpId(sub._1)} Hosting Expression = ${sub._2}\n")

// For each subquery expression in the parent plan, process its child plan to compute
// the explain output. In case of subquery reuse, we don't print subquery plan more
// than once. So we skip [[ReusedSubqueryExec]] here.
if (!sub._3.isInstanceOf[ReusedSubqueryExec]) {
processPlanSkippingSubqueries(sub._3.child, append, collectedOperators)
}
append("\n")
}
// For each subquery expression in the parent plan, process its child plan to compute
// the explain output. In case of subquery reuse, we don't print subquery plan more
// than once. So we skip [[ReusedSubqueryExec]] here.
if (!sub._3.isInstanceOf[ReusedSubqueryExec]) {
processPlanSkippingSubqueries(sub._3.child, append, collectedOperators)
}
append("\n")
}

i = 0
optimizedOutExchanges.foreach {
exchange =>
if (i == 0) {
append("\n===== Adaptively Optimized Out Exchanges =====\n\n")
i = 0
optimizedOutExchanges.foreach {
exchange =>
if (i == 0) {
append("\n===== Adaptively Optimized Out Exchanges =====\n\n")
}
i = i + 1
append(s"Subplan:$i\n")
processPlanSkippingSubqueries[SparkPlan](exchange, append, collectedOperators)
append("\n")
}
i = i + 1
append(s"Subplan:$i\n")
processPlanSkippingSubqueries[SparkPlan](exchange, append, collectedOperators)
append("\n")
}

(subqueries.filter(!_._3.isInstanceOf[ReusedSubqueryExec]).map(_._3.child) :+ plan)
.map {
plan =>
if (collectFallbackFunc.isEmpty) {
collectFallbackNodes(plan)
} else {
collectFallbackFunc.get.apply(plan)
(subqueries.filter(!_._3.isInstanceOf[ReusedSubqueryExec]).map(_._3.child) :+ plan)
.map {
plan =>
if (collectFallbackFunc.isEmpty) {
collectFallbackNodes(plan)
} else {
collectFallbackFunc.get.apply(plan)
}
}
.reduce((a, b) => (a._1 + b._1, a._2 ++ b._2))
} finally {
removeTags(plan)
}
.reduce((a, b) => (a._1 + b._1, a._2 ++ b._2))
} finally {
removeTags(plan)
}
}
}
// scalastyle:on
// spotless:on
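
As context for the GlutenExplainUtils change above: the public signature of processPlan is unchanged, only its body is now wrapped in the shimmed operator-ID map. A minimal, hypothetical call site could look like the sketch below (the SparkPlan value, the method name, and the StringBuilder sink are assumptions for illustration; the import of GlutenExplainUtils itself is omitted):

import org.apache.spark.sql.execution.SparkPlan

// Hypothetical usage sketch: render the explain output for a physical plan.
// processPlan appends the formatted plan (and any subqueries) through the callback
// and returns the fallback information gathered while walking the plan; the return
// value is ignored here for brevity.
def renderExplain(sparkPlan: SparkPlan): String = {
  val out = new StringBuilder
  GlutenExplainUtils.processPlan(sparkPlan, s => out.append(s))
  out.toString
}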

@@ -272,12 +272,17 @@ trait SparkShims {
throw new UnsupportedOperationException("ArrayInsert not supported.")
}

/** Shim method for GlutenExplainUtils.scala. */
/** Shim method for usages from GlutenExplainUtils.scala. */
def withOperatorIdMap[T](idMap: java.util.Map[QueryPlan[_], Int])(body: => T): T = {
body
}

/** Shim method for usages from GlutenExplainUtils.scala. */
def getOperatorId(plan: QueryPlan[_]): Option[Int]

/** Shim method for GlutenExplainUtils.scala. */
/** Shim method for usages from GlutenExplainUtils.scala. */
def setOperatorId(plan: QueryPlan[_], opId: Int): Unit

/** Shim method for GlutenExplainUtils.scala. */
/** Shim method for usages from GlutenExplainUtils.scala. */
def unsetOperatorId(plan: QueryPlan[_]): Unit
}
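
For contrast with the Spark 3.5 implementation further down, a shim that keeps the older tag-based bookkeeping needs no withOperatorIdMap override at all; the excerpt below mirrors the lines this commit removes from Spark35Shims and is illustrative only (the enclosing shim class and the remaining SparkShims members are omitted):

// Tag-based variant (the approach the Spark 3.5 shim moves away from): operator IDs
// live as tags on the plan nodes, so the trait's pass-through withOperatorIdMap above
// is already sufficient.
override def getOperatorId(plan: QueryPlan[_]): Option[Int] =
  plan.getTagValue(QueryPlan.OP_ID_TAG)

override def setOperatorId(plan: QueryPlan[_], opId: Int): Unit =
  plan.setTagValue(QueryPlan.OP_ID_TAG, opId)

override def unsetOperatorId(plan: QueryPlan[_]): Unit =
  plan.unsetTagValue(QueryPlan.OP_ID_TAG)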
@@ -526,15 +526,26 @@ class Spark35Shims extends SparkShims {
Seq(expr.srcArrayExpr, expr.posExpr, expr.itemExpr, Literal(expr.legacyNegativeIndex))
}

override def withOperatorIdMap[T](idMap: java.util.Map[QueryPlan[_], Int])(body: => T): T = {
val prevIdMap = QueryPlan.localIdMap.get()
try {
QueryPlan.localIdMap.set(idMap)
body
} finally {
QueryPlan.localIdMap.set(prevIdMap)
}
}

override def getOperatorId(plan: QueryPlan[_]): Option[Int] = {
plan.getTagValue(QueryPlan.OP_ID_TAG)
Option(QueryPlan.localIdMap.get().get(plan))
}

override def setOperatorId(plan: QueryPlan[_], opId: Int): Unit = {
plan.setTagValue(QueryPlan.OP_ID_TAG, opId)
val prev: Integer = QueryPlan.localIdMap.get().put(plan, opId)
assert(prev == null)
}

override def unsetOperatorId(plan: QueryPlan[_]): Unit = {
plan.unsetTagValue(QueryPlan.OP_ID_TAG)
QueryPlan.localIdMap.get().remove(plan)
}
}
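
One property of the Spark 3.5 implementation above worth calling out: withOperatorIdMap restores the previously installed thread-local map on exit, so IDs assigned during one explain run stay scoped to that run. A small hypothetical check (SparkShimLoader and the plan value are assumed to be in scope, as in the GlutenExplainUtils diff; the method name is illustrative):

import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.execution.SparkPlan

// Illustrative only, not part of this commit. With the Spark 3.5 shim, IDs written
// via setOperatorId land in the map installed by withOperatorIdMap, and the prior
// map is put back afterwards, so nested or repeated explain runs do not interfere.
def demoScopedOperatorIds(plan: SparkPlan): Unit = {
  val shims = SparkShimLoader.getSparkShims
  shims.withOperatorIdMap(new java.util.IdentityHashMap[QueryPlan[_], Int]()) {
    shims.setOperatorId(plan, 1)
    assert(shims.getOperatorId(plan).contains(1))
  }
  // Back here, the previously installed map (if any) is in effect again.
}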
