fix format
Signed-off-by: Yuan Zhou <[email protected]>
zhouyuan committed Sep 6, 2024
1 parent 7ce863d commit e6ccea3
Showing 1 changed file with 111 additions and 101 deletions.
@@ -120,19 +120,19 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {

  /**
   * Given an input physical plan, performs the following tasks.
-   *   1. Computes the whole stage codegen id for current operator and records it in the
-   *      operator by setting a tag.
-   *   2. Generates the two-part explain output for this plan.
+   *   1. Computes the whole stage codegen id for current operator and records it in the operator by
+   *      setting a tag. 2. Generates the two-part explain output for this plan.
   *      1. First part explains the operator tree with each operator tagged with a unique
-   *         identifier.
-   *      2. Second part explains each operator in a verbose manner.
+   *         identifier. 2. Second part explains each operator in a verbose manner.
   *
   * Note: This function skips over subqueries. They are handled by its caller.
   *
-   * @param plan Input query plan to process
-   * @param append function used to append the explain output
-   * @param collectedOperators The IDs of the operators that are already collected and we shouldn't
-   *                           collect again.
+   * @param plan
+   *   Input query plan to process
+   * @param append
+   *   function used to append the explain output
+   * @param collectedOperators
+   *   The IDs of the operators that are already collected and we shouldn't collect again.
   */
  private def processPlanSkippingSubqueries[T <: QueryPlan[T]](
      plan: T,
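An aside for orientation, since the diff never shows the output itself: this utility mirrors Spark's `ExplainUtils`, the machinery behind `EXPLAIN FORMATTED`, so the two-part layout the Scaladoc describes (operator tree tagged with `(id)`, then one verbose block per operator) can be reproduced with a plain formatted explain. A minimal sketch, assuming only a local Spark session; the query and view name are illustrative:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.range(100).createOrReplaceTempView("t")

// First part: the operator tree, each node tagged with a unique "(n)" id.
// Second part: one verbose "(n) OperatorName" block per operator.
spark.sql("SELECT id % 10 AS k, count(*) FROM t GROUP BY id % 10").explain("formatted")
```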
@@ -141,12 +141,7 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
    try {
      generateWholeStageCodegenIds(plan)

-      QueryPlan.append(
-        plan,
-        append,
-        verbose = false,
-        addSuffix = false,
-        printOperatorId = true)
+      QueryPlan.append(plan, append, verbose = false, addSuffix = false, printOperatorId = true)

      append("\n")

@@ -161,8 +156,8 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {

  /**
   * Given an input physical plan, performs the following tasks.
-   *   1. Generates the explain output for the input plan excluding the subquery plans.
-   *   2. Generates the explain output for each subquery referenced in the plan.
+   *   1. Generates the explain output for the input plan excluding the subquery plans. 2. Generates
+   *      the explain output for each subquery referenced in the plan.
   *
   * Note that, ideally this is a no-op as different explain actions operate on different plan
   * instances, but a cached plan is an exception. The `InMemoryRelation#innerChildren` uses a shared
@@ -180,26 +175,25 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
      val reusedExchanges = ArrayBuffer.empty[ReusedExchangeExec]

      var currentOperatorID = 0
-      currentOperatorID = generateOperatorIDs(plan, currentOperatorID, idMap, reusedExchanges,
-        true)
+      currentOperatorID = generateOperatorIDs(plan, currentOperatorID, idMap, reusedExchanges, true)

      val subqueries = ArrayBuffer.empty[(SparkPlan, Expression, BaseSubqueryExec)]
      getSubqueries(plan, subqueries)

      currentOperatorID = subqueries.foldLeft(currentOperatorID) {
-        (curId, plan) => generateOperatorIDs(plan._3.child, curId, idMap, reusedExchanges,
-          true)
+        (curId, plan) => generateOperatorIDs(plan._3.child, curId, idMap, reusedExchanges, true)
      }

      // SPARK-42753: Process subtree for a ReusedExchange with unknown child
      val optimizedOutExchanges = ArrayBuffer.empty[Exchange]
-      reusedExchanges.foreach{ reused =>
-        val child = reused.child
-        if (!idMap.containsKey(child)) {
-          optimizedOutExchanges.append(child)
-          currentOperatorID = generateOperatorIDs(child, currentOperatorID, idMap,
-            reusedExchanges, false)
-        }
+      reusedExchanges.foreach {
+        reused =>
+          val child = reused.child
+          if (!idMap.containsKey(child)) {
+            optimizedOutExchanges.append(child)
+            currentOperatorID =
+              generateOperatorIDs(child, currentOperatorID, idMap, reusedExchanges, false)
+          }
      }

      val collectedOperators = BitSet.empty
@@ -211,8 +205,9 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
          append("\n===== Subqueries =====\n\n")
        }
        i = i + 1
-        append(s"Subquery:$i Hosting operator id = " +
-          s"${getOpId(sub._1)} Hosting Expression = ${sub._2}\n")
+        append(
+          s"Subquery:$i Hosting operator id = " +
+            s"${getOpId(sub._1)} Hosting Expression = ${sub._2}\n")

        // For each subquery expression in the parent plan, process its child plan to compute
        // the explain output. In case of subquery reuse, we don't print subquery plan more
@@ -224,14 +219,15 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
      }

      i = 0
-      optimizedOutExchanges.foreach{ exchange =>
-        if (i == 0) {
-          append("\n===== Adaptively Optimized Out Exchanges =====\n\n")
-        }
-        i = i + 1
-        append(s"Subplan:$i\n")
-        processPlanSkippingSubqueries[SparkPlan](exchange, append, collectedOperators)
-        append("\n")
+      optimizedOutExchanges.foreach {
+        exchange =>
+          if (i == 0) {
+            append("\n===== Adaptively Optimized Out Exchanges =====\n\n")
+          }
+          i = i + 1
+          append(s"Subplan:$i\n")
+          processPlanSkippingSubqueries[SparkPlan](exchange, append, collectedOperators)
+          append("\n")
      }
    } finally {
      localIdMap.set(prevIdMap)
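Reconstructed from the `append` calls in this method, the tail of the explain output gains one section per feature. A sketch reusing `spark` and the view `t` from the earlier snippet; the ids and the expression shown in the comments are illustrative, not taken from a real run:

```scala
// A non-correlated scalar subquery makes processPlan emit the subquery
// section after the main plan.
spark.sql("SELECT id FROM t WHERE id > (SELECT avg(id) FROM t)").explain("formatted")

// Illustrative tail of the output (ids and expression will differ):
//
//   ===== Subqueries =====
//
//   Subquery:1 Hosting operator id = 2 Hosting Expression = scalar-subquery#7 []
//   ...
//
// and, when AQE optimized an exchange out (SPARK-42753):
//
//   ===== Adaptively Optimized Out Exchanges =====
//
//   Subplan:1
```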
@@ -240,31 +236,33 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {

  /**
   * Traverses the supplied input plan in a bottom-up fashion and records the operator id via
-   * setting a tag in the operator.
-   * Note:
-   *   - Operators such as WholeStageCodegenExec and InputAdapter are skipped as they don't
-   *     appear in the explain output.
-   *   - Operator identifier starts at startOperatorID + 1
+   * setting a tag in the operator. Note:
+   *   - Operators such as WholeStageCodegenExec and InputAdapter are skipped as they don't appear
+   *     in the explain output.
+   *   - Operator identifier starts at startOperatorID + 1
   *
-   * @param plan Input query plan to process
-   * @param startOperatorID The start value of operation id. The subsequent operations will be
-   *                        assigned a higher value.
-   * @param idMap A reference-unique map that stores operators visited by generateOperatorIDs and
-   *              their ids. This Map is scoped at the callsite function processPlan. It serves
-   *              three purposes:
-   *              Firstly, it stores the QueryPlan - generated ID mapping. Secondly, it is used to
-   *              avoid accidentally overwriting existing IDs that were generated in the same
-   *              processPlan call. Thirdly, it is used to allow for intentional ID overwriting as
-   *              part of SPARK-42753 where an Adaptively Optimized Out Exchange and its subtree
-   *              may contain IDs that were generated in a previous AQE iteration's processPlan
-   *              call which would result in incorrect IDs.
-   * @param reusedExchanges A unique set of ReusedExchange nodes visited which will be used to
-   *                        identify adaptively optimized out exchanges in SPARK-42753.
-   * @param addReusedExchanges Whether to add ReusedExchange nodes to the reusedExchanges set. We
-   *                           set it to false to avoid processing more nested ReusedExchange nodes
-   *                           in the subtree of an Adaptively Optimized Out Exchange.
-   * @return The last generated operation id for this input plan. This is to ensure we always
-   *         assign an incrementing unique id to each operator.
+   * @param plan
+   *   Input query plan to process
+   * @param startOperatorID
+   *   The start value of operation id. The subsequent operations will be assigned a higher value.
+   * @param idMap
+   *   A reference-unique map that stores operators visited by generateOperatorIDs and their ids.
+   *   This Map is scoped at the callsite function processPlan. It serves three purposes: Firstly,
+   *   it stores the QueryPlan - generated ID mapping. Secondly, it is used to avoid accidentally
+   *   overwriting existing IDs that were generated in the same processPlan call. Thirdly, it is
+   *   used to allow for intentional ID overwriting as part of SPARK-42753 where an Adaptively
+   *   Optimized Out Exchange and its subtree may contain IDs that were generated in a previous AQE
+   *   iteration's processPlan call which would result in incorrect IDs.
+   * @param reusedExchanges
+   *   A unique set of ReusedExchange nodes visited which will be used to identify adaptively
+   *   optimized out exchanges in SPARK-42753.
+   * @param addReusedExchanges
+   *   Whether to add ReusedExchange nodes to the reusedExchanges set. We set it to false to avoid
+   *   processing more nested ReusedExchange nodes in the subtree of an Adaptively Optimized Out
+   *   Exchange.
+   * @return
+   *   The last generated operation id for this input plan. This is to ensure we always assign an
+   *   incrementing unique id to each operator.
   */
  private def generateOperatorIDs(
      plan: QueryPlan[_],
@@ -278,36 +276,50 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
      return currentOperationID
    }

-    def setOpId(plan: QueryPlan[_]): Unit = idMap.computeIfAbsent(plan, plan => {
-      plan match {
-        case r: ReusedExchangeExec if addReusedExchanges =>
-          reusedExchanges.append(r)
-        case _ =>
-      }
-      currentOperationID += 1
-      currentOperationID
-    })
+    def setOpId(plan: QueryPlan[_]): Unit = idMap.computeIfAbsent(
+      plan,
+      plan => {
+        plan match {
+          case r: ReusedExchangeExec if addReusedExchanges =>
+            reusedExchanges.append(r)
+          case _ =>
+        }
+        currentOperationID += 1
+        currentOperationID
+      })

    plan.foreachUp {
      case _: WholeStageCodegenExec =>
      case _: InputAdapter =>
      case p: AdaptiveSparkPlanExec =>
-        currentOperationID = generateOperatorIDs(p.executedPlan, currentOperationID, idMap,
-          reusedExchanges, addReusedExchanges)
+        currentOperationID = generateOperatorIDs(
+          p.executedPlan,
+          currentOperationID,
+          idMap,
+          reusedExchanges,
+          addReusedExchanges)
        if (!p.executedPlan.fastEquals(p.initialPlan)) {
-          currentOperationID = generateOperatorIDs(p.initialPlan, currentOperationID, idMap,
-            reusedExchanges, addReusedExchanges)
+          currentOperationID = generateOperatorIDs(
+            p.initialPlan,
+            currentOperationID,
+            idMap,
+            reusedExchanges,
+            addReusedExchanges)
        }
        setOpId(p)
      case p: QueryStageExec =>
-        currentOperationID = generateOperatorIDs(p.plan, currentOperationID, idMap,
-          reusedExchanges, addReusedExchanges)
+        currentOperationID = generateOperatorIDs(
+          p.plan,
+          currentOperationID,
+          idMap,
+          reusedExchanges,
+          addReusedExchanges)
        setOpId(p)
      case other: QueryPlan[_] =>
        setOpId(other)
        currentOperationID = other.innerChildren.foldLeft(currentOperationID) {
-          (curId, plan) => generateOperatorIDs(plan, curId, idMap, reusedExchanges,
-            addReusedExchanges)
+          (curId, plan) =>
+            generateOperatorIDs(plan, curId, idMap, reusedExchanges, addReusedExchanges)
        }
    }
    currentOperationID
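The id bookkeeping above only works because `idMap` is keyed by reference (in the Spark versions this targets, `ExplainUtils.localIdMap` holds a `java.util.IdentityHashMap`): two structurally equal plan nodes still receive their own ids, while revisiting the same node is a no-op. A standalone sketch of that `computeIfAbsent` pattern, with strings standing in for plan nodes:

```scala
import java.util.IdentityHashMap

object IdAssignmentSketch extends App {
  val idMap = new IdentityHashMap[String, Integer]()
  var currentOperationID = 0

  // Mirrors setOpId: assign the next id only on the first visit of this exact object.
  def setOpId(node: String): Unit = idMap.computeIfAbsent(
    node,
    _ => { currentOperationID += 1; Integer.valueOf(currentOperationID) })

  val a = new String("Exchange") // equal by value ...
  val b = new String("Exchange") // ... but a distinct object
  setOpId(a); setOpId(b); setOpId(a) // the repeat visit of `a` is a no-op

  println(idMap.get(a)) // 1
  println(idMap.get(b)) // 2
}
```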
@@ -317,10 +329,12 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
   * Traverses the supplied input plan in a bottom-up fashion and collects operators with assigned
   * ids.
   *
-   * @param plan Input query plan to process
-   * @param operators An output parameter that contains the operators.
-   * @param collectedOperators The IDs of the operators that are already collected and we shouldn't
-   *                           collect again.
+   * @param plan
+   *   Input query plan to process
+   * @param operators
+   *   An output parameter that contains the operators.
+   * @param collectedOperators
+   *   The IDs of the operators that are already collected and we shouldn't collect again.
   */
  private def collectOperatorsWithID(
      plan: QueryPlan[_],
@@ -332,8 +346,8 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
    }

    def collectOperatorWithID(plan: QueryPlan[_]): Unit = {
-      Option(ExplainUtils.localIdMap.get().get(plan)).foreach { id =>
-        if (collectedOperators.add(id)) operators += plan
+      Option(ExplainUtils.localIdMap.get().get(plan)).foreach {
+        id => if (collectedOperators.add(id)) operators += plan
      }
    }

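A side note on the dedup above: it relies on the set's `add` returning true only when the id was not already present. Assuming `BitSet` here is `scala.collection.mutable.BitSet`, as in Spark's `ExplainUtils`, a tiny sketch:

```scala
import scala.collection.mutable

val collectedOperators = mutable.BitSet.empty
println(collectedOperators.add(3)) // true: id 3 seen for the first time, operator collected
println(collectedOperators.add(3)) // false: duplicate id, operator skipped
```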
@@ -356,8 +370,8 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
  }

  /**
-   * Traverses the supplied input plan in a top-down fashion and records the
-   * whole stage code gen id in the plan via setting a tag.
+   * Traverses the supplied input plan in a top-down fashion and records the whole stage code gen id
+   * in the plan via setting a tag.
   */
  private def generateWholeStageCodegenIds(plan: QueryPlan[_]): Unit = {
    var currentCodegenId = -1
@@ -382,22 +396,18 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
    }
  }

-  /**
-   * Generates a detailed field string with a format based on the type of the input value
-   */
+  /** Generates a detailed field string with a format based on the type of the input value */
  def generateFieldString(fieldName: String, values: Any): String = values match {
-    case iter: Iterable[_] if (iter.size == 0) => s"${fieldName}: []"
-    case iter: Iterable[_] => s"${fieldName} [${iter.size}]: ${iter.mkString("[", ", ", "]")}"
-    case str: String if (str == null || str.isEmpty) => s"${fieldName}: None"
-    case str: String => s"${fieldName}: ${str}"
+    case iter: Iterable[_] if (iter.size == 0) => s"$fieldName: []"
+    case iter: Iterable[_] => s"$fieldName [${iter.size}]: ${iter.mkString("[", ", ", "]")}"
+    case str: String if (str == null || str.isEmpty) => s"$fieldName: None"
+    case str: String => s"$fieldName: $str"
    case _ => throw new IllegalArgumentException(s"Unsupported type for argument values: $values")
  }

  /**
   * Given an input plan, returns an array of tuples comprising:
-   *   1. Hosting operator id.
-   *   2. Hosting expression
-   *   3. Subquery plan
+   *   1. Hosting operator id. 2. Hosting expression 3. Subquery plan
   */
  private def getSubqueries(
      plan: => QueryPlan[_],
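Read straight off the match arms above, `generateFieldString` behaves as follows. A small usage sketch; the field names and values are made up, and note the `str == null` guard appears unreachable since a Scala type pattern like `case str: String` never matches null:

```scala
generateFieldString("Input", Seq("a#1", "b#2")) // "Input [2]: [a#1, b#2]"
generateFieldString("Arguments", Nil)           // "Arguments: []"
generateFieldString("Condition", "")            // "Condition: None"
generateFieldString("Condition", "(a#1 > 5)")   // "Condition: (a#1 > 5)"
generateFieldString("Other", 42)                // throws IllegalArgumentException
```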
@@ -408,7 +418,7 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
      case q: QueryStageExec =>
        getSubqueries(q.plan, subqueries)
      case p: SparkPlan =>
-        p.expressions.foreach (_.collect {
+        p.expressions.foreach(_.collect {
          case e: PlanExpression[_] =>
            e.plan match {
              case s: BaseSubqueryExec =>
@@ -421,10 +431,10 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
  }

  /**
-   * Returns the operator identifier for the supplied plan by retrieving the
-   * `operationId` tag value.
+   * Returns the operator identifier for the supplied plan by retrieving the `operationId` tag
+   * value.
   */
  def getOpId(plan: QueryPlan[_]): String = {
    Option(ExplainUtils.localIdMap.get().get(plan)).map(v => s"$v").getOrElse("unknown")
  }
}
