From 32a7f8ba20b4a34eab1fbe595f08080d30155763 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Fri, 10 May 2024 17:01:27 +0800
Subject: [PATCH 1/2] Update VeloxColumnarWriteFilesExec.scala

---
 .../VeloxColumnarWriteFilesExec.scala         | 47 +++++++++
 .../ui/GlutenSQLAppStatusListener.scala       |  2 +
 .../ui/adjustment/PlanGraphAdjustment.scala   | 96 +++++++++++++++++++
 3 files changed, 145 insertions(+)
 create mode 100644 gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/adjustment/PlanGraphAdjustment.scala

diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
index 1d3d55afb526..a24b13e06bae 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
@@ -34,6 +34,8 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, GenericInternalRow}
 import org.apache.spark.sql.connector.write.WriterCommitMessage
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.ui.{SparkPlanGraphEdge, SparkPlanGraphNodeWrapper, SparkPlanGraphWrapper}
+import org.apache.spark.sql.execution.ui.adjustment.PlanGraphAdjustment
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.Utils
@@ -273,6 +275,9 @@ case class VeloxColumnarWriteFilesExec private (
 
   val child: SparkPlan = left
 
+  // Make sure we hide the noop leaf from SQL UI.
+  HideNoopLeafFromVeloxColumnarWriteFiles.ensureRegistered()
+
   override lazy val references: AttributeSet = AttributeSet.empty
 
   override def supportsColumnar(): Boolean = true
@@ -379,4 +384,46 @@ object VeloxColumnarWriteFilesExec {
       s" mismatch:\n${this}")
   }
 }
+
+  // Hide the noop leaf from SQL UI.
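+  // A PlanGraphAdjustment rewrites the SparkPlanGraphWrapper stored in the
+  // KV store after a query completes. This adjustment starts from each
+  // NoopLeaf node and walks towards the root, pruning nodes and edges until
+  // it reaches the VeloxColumnarWriteFiles node itself.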
+  private object HideNoopLeafFromVeloxColumnarWriteFiles extends PlanGraphAdjustment {
+    override def apply(graph: SparkPlanGraphWrapper): SparkPlanGraphWrapper = {
+      val noopLeafNodes = graph.nodes
+        .filter(_.node != null)
+        .filter(n => n.node.name == "NoopLeaf")
+      if (noopLeafNodes.isEmpty) {
+        return graph
+      }
+      val nodesToRemove = mutable.ListBuffer[SparkPlanGraphNodeWrapper]()
+      val edgesToRemove = mutable.ListBuffer[SparkPlanGraphEdge]()
+
+      noopLeafNodes.foreach {
+        node =>
+          nodesToRemove += node
+          keepFinding()
+          def keepFinding(): Unit = {
+            var tmp = node
+            // Bounded by the node count to guard against cycles in the graph.
+            for (_ <- graph.nodes.indices) {
+              val parent = findParent(graph, tmp)
+              if (parent.isEmpty) {
+                return
+              }
+              if (parent.get._1.node.name == "VeloxColumnarWriteFiles") {
+                edgesToRemove += parent.get._2
+                return
+              }
+              nodesToRemove += parent.get._1
+              edgesToRemove += parent.get._2
+              tmp = parent.get._1
+            }
+          }
+      }
+
+      new SparkPlanGraphWrapper(
+        graph.executionId,
+        (graph.nodes.toSet -- nodesToRemove).toSeq,
+        (graph.edges.toSet -- edgesToRemove).toSeq
+      )
+    }
+  }
 }
diff --git a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala
index 7c236b4e8881..ee0ac029f097 100644
--- a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala
+++ b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala
@@ -21,6 +21,7 @@ import org.apache.gluten.events.{GlutenBuildInfoEvent, GlutenPlanFallbackEvent}
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.ui.adjustment.PlanGraphAdjustment
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.status.{ElementTrackingStore, KVUtils}
 
@@ -79,6 +80,7 @@ class GlutenSQLAppStatusListener(conf: SparkConf, kvstore: ElementTrackingStore)
   }
 
   private def onSQLExtensionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
+    PlanGraphAdjustment.adjust(kvstore, event.executionId)
     executionIdToDescription.remove(event.executionId)
     executionIdToFallbackEvent.remove(event.executionId)
   }
diff --git a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/adjustment/PlanGraphAdjustment.scala b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/adjustment/PlanGraphAdjustment.scala
new file mode 100644
index 000000000000..2919535d1597
--- /dev/null
+++ b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/adjustment/PlanGraphAdjustment.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.ui.adjustment
+
+import org.apache.spark.sql.execution.ui.{SparkPlanGraphEdge, SparkPlanGraphNodeWrapper, SparkPlanGraphWrapper}
+import org.apache.spark.sql.execution.ui.adjustment.PlanGraphAdjustment.register
+import org.apache.spark.status.ElementTrackingStore
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+trait PlanGraphAdjustment {
+  private val registered: AtomicBoolean = new AtomicBoolean(false)
+
+  final def ensureRegistered(): Unit = {
+    if (!registered.compareAndSet(false, true)) {
+      return
+    }
+    register(this)
+  }
+
+  final protected def findChildren(
+      graph: SparkPlanGraphWrapper,
+      node: SparkPlanGraphNodeWrapper): Seq[(SparkPlanGraphNodeWrapper, SparkPlanGraphEdge)] = {
+    val id = node.node.id
+    val out = graph.edges
+      .filter(_.toId == id)
+      .flatMap {
+        childEdge =>
+          graph.nodes
+            .filter(_.node != null)
+            .find(_.node.id == childEdge.fromId)
+            .map(childNode => (childNode, childEdge))
+      }
+    out
+  }
+
+  final protected def findParent(
+      graph: SparkPlanGraphWrapper,
+      node: SparkPlanGraphNodeWrapper): Option[(SparkPlanGraphNodeWrapper, SparkPlanGraphEdge)] = {
+    val id = node.node.id
+    val out = graph.edges
+      .filter(_.fromId == id)
+      .flatMap {
+        parentEdge =>
+          graph.nodes
+            .filter(_.node != null)
+            .find(_.node.id == parentEdge.toId)
+            .map(parentNode => (parentNode, parentEdge))
+      }
+    if (out.isEmpty) {
+      return None
+    }
+    if (out.size == 1) {
+      return Some(out.head)
+    }
+    throw new IllegalStateException("Node has multiple parents, which should not happen: " + node)
+  }
+
+  def apply(from: SparkPlanGraphWrapper): SparkPlanGraphWrapper
+}
+
+object PlanGraphAdjustment {
+  private val adjustments: ListBuffer[PlanGraphAdjustment] = mutable.ListBuffer()
+
+  private[ui] def register(adjustment: PlanGraphAdjustment): Unit = synchronized {
+    adjustments += adjustment
+  }
+
+  private[ui] def adjust(kvstore: ElementTrackingStore, executionId: Long): Unit = {
+    val graph = kvstore.read(classOf[SparkPlanGraphWrapper], executionId)
+    val out = adjustments.foldLeft(graph) {
+      case (g, a) =>
+        a.apply(g)
+    }
+    assert(out.executionId == executionId)
+    kvstore.delete(classOf[SparkPlanGraphWrapper], executionId)
+    kvstore.write(out)
+  }
+}

From 14cef0fabae1b68903832a1914db1b120fcb860a Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Thu, 30 May 2024 12:56:29 +0800
Subject: [PATCH 2/2] fixup

---
 .../VeloxColumnarWriteFilesExec.scala         | 28 +++++++++++++++----
 .../sql/execution/GlutenExplainUtils.scala    | 18 ++++++++++++
 2 files changed, 40 insertions(+), 6 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
index a24b13e06bae..11ed883aa9cb 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
@@ -272,10 +272,12 @@ case class VeloxColumnarWriteFilesExec private (
   extends BinaryExecNode
   with GlutenPlan
   with VeloxColumnarWriteFilesExec.ExecuteWriteCompatible {
+  import VeloxColumnarWriteFilesExec._
 
   val child: SparkPlan = left
 
-  // Make sure we hide the noop leaf from SQL UI.
+  // Make sure we hide the noop leaf from the fallback report and the SQL UI.
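+  // Both hooks guard registration with an AtomicBoolean, so ensureRegistered()
+  // is idempotent: constructing this operator repeatedly installs each hook
+  // only once.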
+  HideNoopLeafFromFallBackReport.ensureRegistered()
   HideNoopLeafFromVeloxColumnarWriteFiles.ensureRegistered()
 
   override lazy val references: AttributeSet = AttributeSet.empty
@@ -327,6 +329,7 @@ case class VeloxColumnarWriteFilesExec private (
       new VeloxColumnarWriteFilesRDD(rdd, writeFilesSpec, jobTrackerID)
     }
   }
+
   override protected def withNewChildrenInternal(
       newLeft: SparkPlan,
       newRight: SparkPlan): SparkPlan =
@@ -334,7 +337,6 @@ case class VeloxColumnarWriteFilesExec private (
 }
 
 object VeloxColumnarWriteFilesExec {
-
   def apply(
       child: SparkPlan,
       fileFormat: FileFormat,
@@ -378,10 +380,24 @@ object VeloxColumnarWriteFilesExec {
 
   sealed trait ExecuteWriteCompatible {
     // To be compatible with Spark (version < 3.4)
-    protected def doExecuteWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {
-      throw new GlutenException(
-        s"Internal Error ${this.getClass} has write support" +
-          s" mismatch:\n${this}")
-    }
+    protected def doExecuteWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage]
   }
+
+  // Hide the noop leaf from fallback reporting.
+  private object HideNoopLeafFromFallBackReport extends GlutenExplainUtils.HideFallbackReason {
+    override def shouldHide(plan: SparkPlan): Boolean = {
+      hasOnlyOneNoopLeaf(plan)
+    }
+
+    // True if the plan tree contains exactly one leaf, and that leaf is a NoopLeaf.
+    private def hasOnlyOneNoopLeaf(plan: SparkPlan): Boolean = {
+      if (plan.children.size > 1) {
+        return false
+      }
+      if (plan.children.size == 1) {
+        return hasOnlyOneNoopLeaf(plan.children.head)
+      }
+      plan.isInstanceOf[NoopLeaf]
+    }
+  }
 }
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
index 781dc6b6f717..fc4acdbf5953 100644
--- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
 
 import java.util
 import java.util.Collections.newSetFromMap
+import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, BitSet}
@@ -103,6 +104,7 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
           addFallbackNodeWithReason(i, "Columnar table cache is disabled", fallbackNodeToReason)
         }
       case _: AQEShuffleReadExec => // Ignore
+      case p: SparkPlan if exclusions.exists(_.shouldHide(p)) => // Ignore: hidden by a registered exclusion
       case p: SparkPlan =>
         handleVanillaSparkPlan(p, fallbackNodeToReason)
         p.innerChildren.foreach(collect)
@@ -113,6 +115,22 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
     (numGlutenNodes, fallbackNodeToReason.toMap)
   }
 
+  private val exclusions: mutable.ListBuffer[HideFallbackReason] = mutable.ListBuffer()
+
+  trait HideFallbackReason {
+    private val registered: AtomicBoolean = new AtomicBoolean(false)
+
+    final def ensureRegistered(): Unit = {
+      if (!registered.compareAndSet(false, true)) {
+        return
+      }
+      exclusions.synchronized {
+        exclusions += this
+      }
+    }
+    def shouldHide(plan: SparkPlan): Boolean
+  }
+
   /**
    * Given a input physical plan, performs the following tasks.
    *   1. Generate the two part explain output for this plan.
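---

Note for reviewers (not part of the patch): the sketch below shows how the two
extension points introduced here, PlanGraphAdjustment and
GlutenExplainUtils.HideFallbackReason, could be used by another operator. It
assumes the APIs land exactly as written above; the operator name
"MyPlaceholder" and both objects are hypothetical.

import org.apache.spark.sql.execution.{GlutenExplainUtils, SparkPlan}
import org.apache.spark.sql.execution.ui.SparkPlanGraphWrapper
import org.apache.spark.sql.execution.ui.adjustment.PlanGraphAdjustment

// Hypothetical: drop "MyPlaceholder" nodes from the stored plan graph. Unlike
// the NoopLeaf adjustment, this keeps all edges, so it only suits nodes that
// never have edges attached; a node with edges must be pruned together with
// them, as HideNoopLeafFromVeloxColumnarWriteFiles does.
object HideMyPlaceholderFromUi extends PlanGraphAdjustment {
  override def apply(graph: SparkPlanGraphWrapper): SparkPlanGraphWrapper = {
    val kept = graph.nodes.filter(n => n.node == null || n.node.name != "MyPlaceholder")
    new SparkPlanGraphWrapper(graph.executionId, kept, graph.edges)
  }
}

// Hypothetical: keep the same operator out of the fallback report.
object HideMyPlaceholderFromFallbackReport extends GlutenExplainUtils.HideFallbackReason {
  override def shouldHide(plan: SparkPlan): Boolean = plan.nodeName == "MyPlaceholder"
}

// Registration is idempotent; a natural place to install both hooks is the
// operator's constructor, mirroring VeloxColumnarWriteFilesExec:
//   HideMyPlaceholderFromUi.ensureRegistered()
//   HideMyPlaceholderFromFallbackReport.ensureRegistered()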