From e6ccea3d8619d324e2b6f38ed4531d0e25eed097 Mon Sep 17 00:00:00 2001
From: Yuan Zhou
Date: Fri, 6 Sep 2024 17:27:29 +0800
Subject: [PATCH] fix format

Signed-off-by: Yuan Zhou
---
 .../sql/execution/GlutenExplainUtils.scala    | 212 +++++++++-----------
 1 file changed, 111 insertions(+), 101 deletions(-)

diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
index e9d00d540ec1..c7efbbbb5b07 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
@@ -120,19 +120,19 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
 
   /**
    * Given a input physical plan, performs the following tasks.
-   *   1. Computes the whole stage codegen id for current operator and records it in the
-   *      operator by setting a tag.
-   *   2. Generate the two part explain output for this plan.
+   *   1. Computes the whole stage codegen id for current operator and records it in the operator by
+   *      setting a tag. 2. Generate the two part explain output for this plan.
    *      1. First part explains the operator tree with each operator tagged with an unique
-   *         identifier.
-   *      2. Second part explains each operator in a verbose manner.
+   *         identifier. 2. Second part explains each operator in a verbose manner.
    *
    * Note : This function skips over subqueries. They are handled by its caller.
    *
-   * @param plan Input query plan to process
-   * @param append function used to append the explain output
-   * @param collectedOperators The IDs of the operators that are already collected and we shouldn't
-   *                           collect again.
+   * @param plan
+   *   Input query plan to process
+   * @param append
+   *   function used to append the explain output
+   * @param collectedOperators
+   *   The IDs of the operators that are already collected and we shouldn't collect again.
    */
   private def processPlanSkippingSubqueries[T <: QueryPlan[T]](
       plan: T,
@@ -141,12 +141,7 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
     try {
       generateWholeStageCodegenIds(plan)
 
-      QueryPlan.append(
-        plan,
-        append,
-        verbose = false,
-        addSuffix = false,
-        printOperatorId = true)
+      QueryPlan.append(plan, append, verbose = false, addSuffix = false, printOperatorId = true)
 
       append("\n")
 
@@ -161,8 +156,8 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
 
   /**
    * Given a input physical plan, performs the following tasks.
-   *   1. Generates the explain output for the input plan excluding the subquery plans.
-   *   2. Generates the explain output for each subquery referenced in the plan.
+   *   1. Generates the explain output for the input plan excluding the subquery plans. 2. Generates
+   *      the explain output for each subquery referenced in the plan.
    *
    * Note that, ideally this is a no-op as different explain actions operate on different plan,
    * instances but cached plan is an exception. The `InMemoryRelation#innerChildren` use a shared
@@ -180,26 +175,25 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
       val reusedExchanges = ArrayBuffer.empty[ReusedExchangeExec]
 
       var currentOperatorID = 0
-      currentOperatorID = generateOperatorIDs(plan, currentOperatorID, idMap, reusedExchanges,
-        true)
+      currentOperatorID = generateOperatorIDs(plan, currentOperatorID, idMap, reusedExchanges, true)
 
       val subqueries = ArrayBuffer.empty[(SparkPlan, Expression, BaseSubqueryExec)]
       getSubqueries(plan, subqueries)
 
       currentOperatorID = subqueries.foldLeft(currentOperatorID) {
-        (curId, plan) => generateOperatorIDs(plan._3.child, curId, idMap, reusedExchanges,
-          true)
+        (curId, plan) => generateOperatorIDs(plan._3.child, curId, idMap, reusedExchanges, true)
       }
 
       // SPARK-42753: Process subtree for a ReusedExchange with unknown child
       val optimizedOutExchanges = ArrayBuffer.empty[Exchange]
-      reusedExchanges.foreach{ reused =>
-        val child = reused.child
-        if (!idMap.containsKey(child)) {
-          optimizedOutExchanges.append(child)
-          currentOperatorID = generateOperatorIDs(child, currentOperatorID, idMap,
-            reusedExchanges, false)
-        }
+      reusedExchanges.foreach {
+        reused =>
+          val child = reused.child
+          if (!idMap.containsKey(child)) {
+            optimizedOutExchanges.append(child)
+            currentOperatorID =
+              generateOperatorIDs(child, currentOperatorID, idMap, reusedExchanges, false)
+          }
       }
 
       val collectedOperators = BitSet.empty
@@ -211,8 +205,9 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
             append("\n===== Subqueries =====\n\n")
           }
           i = i + 1
-          append(s"Subquery:$i Hosting operator id = " +
-            s"${getOpId(sub._1)} Hosting Expression = ${sub._2}\n")
+          append(
+            s"Subquery:$i Hosting operator id = " +
+              s"${getOpId(sub._1)} Hosting Expression = ${sub._2}\n")
 
           // For each subquery expression in the parent plan, process its child plan to compute
           // the explain output. In case of subquery reuse, we don't print subquery plan more
@@ -224,14 +219,15 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
       }
 
       i = 0
-      optimizedOutExchanges.foreach{ exchange =>
-        if (i == 0) {
-          append("\n===== Adaptively Optimized Out Exchanges =====\n\n")
-        }
-        i = i + 1
-        append(s"Subplan:$i\n")
-        processPlanSkippingSubqueries[SparkPlan](exchange, append, collectedOperators)
-        append("\n")
+      optimizedOutExchanges.foreach {
+        exchange =>
+          if (i == 0) {
+            append("\n===== Adaptively Optimized Out Exchanges =====\n\n")
+          }
+          i = i + 1
+          append(s"Subplan:$i\n")
+          processPlanSkippingSubqueries[SparkPlan](exchange, append, collectedOperators)
+          append("\n")
       }
     } finally {
       localIdMap.set(prevIdMap)
@@ -240,31 +236,33 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
 
   /**
    * Traverses the supplied input plan in a bottom-up fashion and records the operator id via
-   * setting a tag in the operator.
-   * Note :
-   *   - Operator such as WholeStageCodegenExec and InputAdapter are skipped as they don't
-   *     appear in the explain output.
-   *   - Operator identifier starts at startOperatorID + 1
+   * setting a tag in the operator. Note :
+   *   - Operator such as WholeStageCodegenExec and InputAdapter are skipped as they don't appear in
+   *     the explain output.
+   *   - Operator identifier starts at startOperatorID + 1
    *
-   * @param plan Input query plan to process
-   * @param startOperatorID The start value of operation id. The subsequent operations will be
-   *                        assigned higher value.
-   * @param idMap A reference-unique map store operators visited by generateOperatorIds and its
-   *              id. This Map is scoped at the callsite function processPlan. It serves three
-   *              purpose:
-   *              Firstly, it stores the QueryPlan - generated ID mapping. Secondly, it is used to
-   *              avoid accidentally overwriting existing IDs that were generated in the same
-   *              processPlan call. Thirdly, it is used to allow for intentional ID overwriting as
-   *              part of SPARK-42753 where an Adaptively Optimized Out Exchange and its subtree
-   *              may contain IDs that were generated in a previous AQE iteration's processPlan
-   *              call which would result in incorrect IDs.
-   * @param reusedExchanges A unique set of ReusedExchange nodes visited which will be used to
-   *                        idenitfy adaptively optimized out exchanges in SPARK-42753.
-   * @param addReusedExchanges Whether to add ReusedExchange nodes to reusedExchanges set. We set it
-   *                           to false to avoid processing more nested ReusedExchanges nodes in the
-   *                           subtree of an Adpatively Optimized Out Exchange.
-   * @return The last generated operation id for this input plan. This is to ensure we always
-   *         assign incrementing unique id to each operator.
+   * @param plan
+   *   Input query plan to process
+   * @param startOperatorID
+   *   The start value of operation id. The subsequent operations will be assigned higher value.
+   * @param idMap
+   *   A reference-unique map store operators visited by generateOperatorIds and its id. This Map is
+   *   scoped at the callsite function processPlan. It serves three purposes: Firstly, it stores the
+   *   QueryPlan - generated ID mapping. Secondly, it is used to avoid accidentally overwriting
+   *   existing IDs that were generated in the same processPlan call. Thirdly, it is used to allow
+   *   for intentional ID overwriting as part of SPARK-42753 where an Adaptively Optimized Out
+   *   Exchange and its subtree may contain IDs that were generated in a previous AQE iteration's
+   *   processPlan call which would result in incorrect IDs.
+   * @param reusedExchanges
+   *   A unique set of ReusedExchange nodes visited which will be used to identify adaptively
+   *   optimized out exchanges in SPARK-42753.
+   * @param addReusedExchanges
+   *   Whether to add ReusedExchange nodes to reusedExchanges set. We set it to false to avoid
+   *   processing more nested ReusedExchanges nodes in the subtree of an Adaptively Optimized Out
+   *   Exchange.
+   * @return
+   *   The last generated operation id for this input plan. This is to ensure we always assign
+   *   incrementing unique id to each operator.
   */
  private def generateOperatorIDs(
      plan: QueryPlan[_],
@@ -278,36 +276,50 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
       return currentOperationID
     }
 
-    def setOpId(plan: QueryPlan[_]): Unit = idMap.computeIfAbsent(plan, plan => {
-      plan match {
-        case r: ReusedExchangeExec if addReusedExchanges =>
-          reusedExchanges.append(r)
-        case _ =>
-      }
-      currentOperationID += 1
-      currentOperationID
-    })
+    def setOpId(plan: QueryPlan[_]): Unit = idMap.computeIfAbsent(
+      plan,
+      plan => {
+        plan match {
+          case r: ReusedExchangeExec if addReusedExchanges =>
+            reusedExchanges.append(r)
+          case _ =>
+        }
+        currentOperationID += 1
+        currentOperationID
+      })
 
     plan.foreachUp {
       case _: WholeStageCodegenExec =>
       case _: InputAdapter =>
       case p: AdaptiveSparkPlanExec =>
-        currentOperationID = generateOperatorIDs(p.executedPlan, currentOperationID, idMap,
-          reusedExchanges, addReusedExchanges)
+        currentOperationID = generateOperatorIDs(
+          p.executedPlan,
+          currentOperationID,
+          idMap,
+          reusedExchanges,
+          addReusedExchanges)
         if (!p.executedPlan.fastEquals(p.initialPlan)) {
-          currentOperationID = generateOperatorIDs(p.initialPlan, currentOperationID, idMap,
-            reusedExchanges, addReusedExchanges)
+          currentOperationID = generateOperatorIDs(
+            p.initialPlan,
+            currentOperationID,
+            idMap,
+            reusedExchanges,
+            addReusedExchanges)
         }
         setOpId(p)
       case p: QueryStageExec =>
-        currentOperationID = generateOperatorIDs(p.plan, currentOperationID, idMap,
-          reusedExchanges, addReusedExchanges)
+        currentOperationID = generateOperatorIDs(
+          p.plan,
+          currentOperationID,
+          idMap,
+          reusedExchanges,
+          addReusedExchanges)
         setOpId(p)
       case other: QueryPlan[_] =>
         setOpId(other)
         currentOperationID = other.innerChildren.foldLeft(currentOperationID) {
-          (curId, plan) => generateOperatorIDs(plan, curId, idMap, reusedExchanges,
-            addReusedExchanges)
+          (curId, plan) =>
+            generateOperatorIDs(plan, curId, idMap, reusedExchanges, addReusedExchanges)
         }
     }
     currentOperationID
@@ -317,10 +329,12 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
    * Traverses the supplied input plan in a bottom-up fashion and collects operators with assigned
    * ids.
    *
-   * @param plan Input query plan to process
-   * @param operators An output parameter that contains the operators.
-   * @param collectedOperators The IDs of the operators that are already collected and we shouldn't
-   *                           collect again.
+   * @param plan
+   *   Input query plan to process
+   * @param operators
+   *   An output parameter that contains the operators.
+   * @param collectedOperators
+   *   The IDs of the operators that are already collected and we shouldn't collect again.
    */
   private def collectOperatorsWithID(
       plan: QueryPlan[_],
@@ -332,8 +346,8 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
     }
 
     def collectOperatorWithID(plan: QueryPlan[_]): Unit = {
-      Option(ExplainUtils.localIdMap.get().get(plan)).foreach { id =>
-        if (collectedOperators.add(id)) operators += plan
+      Option(ExplainUtils.localIdMap.get().get(plan)).foreach {
+        id => if (collectedOperators.add(id)) operators += plan
       }
     }
 
@@ -356,8 +370,8 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
   }
 
   /**
-   * Traverses the supplied input plan in a top-down fashion and records the
-   * whole stage code gen id in the plan via setting a tag.
+   * Traverses the supplied input plan in a top-down fashion and records the whole stage code gen id
+   * in the plan via setting a tag.
   */
   private def generateWholeStageCodegenIds(plan: QueryPlan[_]): Unit = {
     var currentCodegenId = -1
@@ -382,22 +396,18 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
     }
   }
 
-  /**
-   * Generate detailed field string with different format based on type of input value
-   */
+  /** Generate detailed field string with different format based on type of input value */
   def generateFieldString(fieldName: String, values: Any): String = values match {
-    case iter: Iterable[_] if (iter.size == 0) => s"${fieldName}: []"
-    case iter: Iterable[_] => s"${fieldName} [${iter.size}]: ${iter.mkString("[", ", ", "]")}"
-    case str: String if (str == null || str.isEmpty) => s"${fieldName}: None"
-    case str: String => s"${fieldName}: ${str}"
+    case iter: Iterable[_] if (iter.size == 0) => s"$fieldName: []"
+    case iter: Iterable[_] => s"$fieldName [${iter.size}]: ${iter.mkString("[", ", ", "]")}"
+    case str: String if (str == null || str.isEmpty) => s"$fieldName: None"
+    case str: String => s"$fieldName: $str"
     case _ => throw new IllegalArgumentException(s"Unsupported type for argument values: $values")
   }
 
   /**
    * Given a input plan, returns an array of tuples comprising of :
-   *   1. Hosting operator id.
-   *   2. Hosting expression
-   *   3. Subquery plan
+   *   1. Hosting operator id. 2. Hosting expression 3. Subquery plan
    */
   private def getSubqueries(
@@ -408,7 +418,7 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
       case q: QueryStageExec =>
         getSubqueries(q.plan, subqueries)
       case p: SparkPlan =>
-        p.expressions.foreach (_.collect {
+        p.expressions.foreach(_.collect {
           case e: PlanExpression[_] =>
             e.plan match {
               case s: BaseSubqueryExec =>
@@ -421,10 +431,10 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
   }
 
   /**
-   * Returns the operator identifier for the supplied plan by retrieving the
-   * `operationId` tag value.
+   * Returns the operator identifier for the supplied plan by retrieving the `operationId` tag
+   * value.
    */
   def getOpId(plan: QueryPlan[_]): String = {
     Option(ExplainUtils.localIdMap.get().get(plan)).map(v => s"$v").getOrElse("unknown")
   }
-}
\ No newline at end of file
+}
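
A note on the reformatted setOpId: operator IDs are assigned through a reference-keyed map (ExplainUtils.localIdMap holds a java.util.IdentityHashMap), so two structurally equal operators still receive distinct IDs, and revisiting a node never reassigns its ID. Below is a minimal, self-contained sketch of that assign-once pattern; the Node class and the values are illustrative only, not part of the patch.

import java.util.IdentityHashMap

object OperatorIdSketch {
  // Stand-in for a QueryPlan node; only reference identity matters here.
  final class Node(val name: String)

  def main(args: Array[String]): Unit = {
    // Reference-keyed map, as used for operator IDs: keys compare by `eq`,
    // not by structural equality.
    val idMap = new IdentityHashMap[Node, java.lang.Integer]()
    var currentOperationID = 0

    def setOpId(n: Node): Unit = idMap.computeIfAbsent(
      n,
      (_: Node) => {
        currentOperationID += 1
        Int.box(currentOperationID)
      })

    val a = new Node("Filter")
    val b = new Node("Filter") // same shape, different object
    Seq(a, b, a).foreach(setOpId) // revisiting `a` keeps its original ID

    println(s"a -> ${idMap.get(a)}, b -> ${idMap.get(b)}") // a -> 1, b -> 2
  }
}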
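Similarly, the reformatted collectOperatorWithID only collects an operator when BitSet.add reports its ID as unseen, which is what keeps a reused subtree from being emitted twice. A small sketch of that guard, with made-up ID/operator pairs:

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

object CollectOnceSketch {
  def main(args: Array[String]): Unit = {
    val collectedOperators = mutable.BitSet.empty
    val operators = ArrayBuffer.empty[String]
    // add returns false when the ID is already present, so an operator
    // whose ID was seen before is not collected a second time.
    val visits = Seq(1 -> "Scan parquet", 2 -> "FilterExec", 1 -> "Scan parquet")
    visits.foreach { case (id, op) => if (collectedOperators.add(id)) operators += op }
    println(operators.mkString(", ")) // Scan parquet, FilterExec
  }
}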
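Finally, the generateFieldString match reformatted above renders each input type differently. The expected outputs below are shown with a local copy of the function and hypothetical field values:

object FieldStringSketch {
  // Local copy of the patch's generateFieldString, for illustration only.
  def generateFieldString(fieldName: String, values: Any): String = values match {
    case iter: Iterable[_] if (iter.size == 0) => s"$fieldName: []"
    case iter: Iterable[_] => s"$fieldName [${iter.size}]: ${iter.mkString("[", ", ", "]")}"
    case str: String if (str == null || str.isEmpty) => s"$fieldName: None"
    case str: String => s"$fieldName: $str"
    case _ => throw new IllegalArgumentException(s"Unsupported type for argument values: $values")
  }

  def main(args: Array[String]): Unit = {
    println(generateFieldString("Arguments", Seq.empty))        // Arguments: []
    println(generateFieldString("Output", Seq("a#1", "b#2")))   // Output [2]: [a#1, b#2]
    println(generateFieldString("Condition", ""))               // Condition: None
    println(generateFieldString("Condition", "isnotnull(a#1)")) // Condition: isnotnull(a#1)
  }
}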