From 14cef0fabae1b68903832a1914db1b120fcb860a Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Thu, 30 May 2024 12:56:29 +0800
Subject: [PATCH] fixup

The NoopLeaf placeholder injected by VeloxColumnarWriteFilesExec was
already hidden from the SQL UI; hide it from Gluten's fallback report as
well. Do this by adding a pluggable exclusion hook
(GlutenExplainUtils.HideFallbackReason) and registering an exclusion that
matches any plan tree whose only leaf is a NoopLeaf. Also turn
ExecuteWriteCompatible#doExecuteWrite into an abstract method instead of
a default implementation that throws.

---
 .../VeloxColumnarWriteFilesExec.scala         | 28 +++++++++++++++----
 .../sql/execution/GlutenExplainUtils.scala    | 18 ++++++++++++
 2 files changed, 40 insertions(+), 6 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
index a24b13e06bae..11ed883aa9cb 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
@@ -272,10 +272,12 @@ case class VeloxColumnarWriteFilesExec private (
   extends BinaryExecNode
   with GlutenPlan
   with VeloxColumnarWriteFilesExec.ExecuteWriteCompatible {
+  import VeloxColumnarWriteFilesExec._
 
   val child: SparkPlan = left
 
-  // Make sure we hide the noop leaf from SQL UI.
+  // Make sure we hide the noop leaf from fallback report / SQL UI.
+  HideNoopLeafFromFallBackReport.ensureRegistered()
   HideNoopLeafFromVeloxColumnarWriteFiles.ensureRegistered()
 
   override lazy val references: AttributeSet = AttributeSet.empty
@@ -327,6 +329,7 @@ case class VeloxColumnarWriteFilesExec private (
       new VeloxColumnarWriteFilesRDD(rdd, writeFilesSpec, jobTrackerID)
     }
   }
+
   override protected def withNewChildrenInternal(
       newLeft: SparkPlan,
       newRight: SparkPlan): SparkPlan =
@@ -334,7 +337,6 @@ case class VeloxColumnarWriteFilesExec private (
 }
 
 object VeloxColumnarWriteFilesExec {
-
   def apply(
       child: SparkPlan,
       fileFormat: FileFormat,
@@ -378,10 +380,24 @@ object VeloxColumnarWriteFilesExec {
 
   sealed trait ExecuteWriteCompatible {
     // To be compatible with Spark (version < 3.4)
-    protected def doExecuteWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {
-      throw new GlutenException(
-        s"Internal Error ${this.getClass} has write support" +
-          s" mismatch:\n${this}")
+    protected def doExecuteWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage]
+  }
+
+  // Hide the noop leaf from fallback reporting.
+  private object HideNoopLeafFromFallBackReport extends GlutenExplainUtils.HideFallbackReason {
+    override def shouldHide(plan: SparkPlan): Boolean = {
+      hasOnlyOneNoopLeaf(plan)
+    }
+
+    // True if the plan tree has one and only one leaf, and that leaf is a NoopLeaf.
+    private def hasOnlyOneNoopLeaf(plan: SparkPlan): Boolean = {
+      if (plan.children.size > 1) {
+        return false
+      }
+      if (plan.children.size == 1) {
+        return hasOnlyOneNoopLeaf(plan.children.head)
+      }
+      plan.isInstanceOf[NoopLeaf]
     }
   }
 }
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
index 781dc6b6f717..fc4acdbf5953 100644
--- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
 
 import java.util
 import java.util.Collections.newSetFromMap
+import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, BitSet}
@@ -103,6 +104,7 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
           addFallbackNodeWithReason(i, "Columnar table cache is disabled", fallbackNodeToReason)
         }
       case _: AQEShuffleReadExec => // Ignore
+      case p: SparkPlan if exclusions.exists(_.shouldHide(p)) =>
       case p: SparkPlan =>
         handleVanillaSparkPlan(p, fallbackNodeToReason)
         p.innerChildren.foreach(collect)
@@ -113,6 +115,22 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
     (numGlutenNodes, fallbackNodeToReason.toMap)
   }
 
+  private val exclusions: mutable.ListBuffer[HideFallbackReason] = mutable.ListBuffer()
+
+  trait HideFallbackReason {
+    private val registered: AtomicBoolean = new AtomicBoolean(false)
+
+    final def ensureRegistered(): Unit = {
+      if (!registered.compareAndSet(false, true)) {
+        return
+      }
+      exclusions.synchronized {
+        exclusions += this
+      }
+    }
+    def shouldHide(plan: SparkPlan): Boolean
+  }
+
   /**
    * Given a input physical plan, performs the following tasks.
    *   1. Generate the two part explain output for this plan.
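
Note (not part of the patch): for reference, a minimal sketch of how
another operator could plug into the HideFallbackReason hook added
above. MyPlaceholderLeaf, MyWriteExec and HideMyPlaceholder are
hypothetical names made up for illustration; the snippet assumes the
Spark/Gluten classpath this patch targets, and its collectLeaves()
check mirrors the hasOnlyOneNoopLeaf walk in the patch.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.execution.{GlutenExplainUtils, LeafExecNode, SparkPlan, UnaryExecNode}

    // Hypothetical placeholder leaf, playing the role NoopLeaf plays in this patch.
    case class MyPlaceholderLeaf(output: Seq[Attribute]) extends LeafExecNode {
      override protected def doExecute(): RDD[InternalRow] =
        throw new UnsupportedOperationException("placeholder only, never executed")
    }

    // Hypothetical operator that keeps its placeholder out of the fallback report.
    case class MyWriteExec(child: SparkPlan) extends UnaryExecNode {
      // ensureRegistered() is guarded by an AtomicBoolean, so calling it on
      // every construction is safe; the exclusion is registered only once.
      MyWriteExec.HideMyPlaceholder.ensureRegistered()

      override def output: Seq[Attribute] = child.output
      override protected def doExecute(): RDD[InternalRow] = child.execute()
      override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
        copy(child = newChild)
    }

    object MyWriteExec {
      private object HideMyPlaceholder extends GlutenExplainUtils.HideFallbackReason {
        // Hide any sub-tree whose only leaf is the placeholder; for the
        // single-child chains the patch cares about, this is equivalent to
        // hasOnlyOneNoopLeaf above.
        override def shouldHide(plan: SparkPlan): Boolean =
          plan.collectLeaves() match {
            case Seq(_: MyPlaceholderLeaf) => true
            case _ => false
          }
      }
    }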