Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer committed May 30, 2024
1 parent 32a7f8b commit 14cef0f
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,12 @@ case class VeloxColumnarWriteFilesExec private (
extends BinaryExecNode
with GlutenPlan
with VeloxColumnarWriteFilesExec.ExecuteWriteCompatible {
import VeloxColumnarWriteFilesExec._

val child: SparkPlan = left

// Make sure we hide the noop leaf from SQL UI.
// Make sure we hide the noop leaf from fallback report / SQL UI.
HideNoopLeafFromFallBackReport.ensureRegistered()
HideNoopLeafFromVeloxColumnarWriteFiles.ensureRegistered()

override lazy val references: AttributeSet = AttributeSet.empty
Expand Down Expand Up @@ -327,14 +329,14 @@ case class VeloxColumnarWriteFilesExec private (
new VeloxColumnarWriteFilesRDD(rdd, writeFilesSpec, jobTrackerID)
}
}

// Rebuilds this binary node with replacement children (Spark's immutable tree
// contract). The positional `copy` deliberately forwards newLeft/newRight as the
// first two constructor args; every write-related field (format, partition
// columns, bucket spec, options, static partitions) is carried over unchanged.
override protected def withNewChildrenInternal(
newLeft: SparkPlan,
newRight: SparkPlan): SparkPlan =
copy(newLeft, newRight, fileFormat, partitionColumns, bucketSpec, options, staticPartitions)
}

object VeloxColumnarWriteFilesExec {

def apply(
child: SparkPlan,
fileFormat: FileFormat,
Expand Down Expand Up @@ -378,10 +380,24 @@ object VeloxColumnarWriteFilesExec {

// NOTE(review): this span is a diff artifact — the scrape dropped the +/- markers,
// so the removed default implementation (throwing GlutenException) and the added
// abstract declaration of doExecuteWrite appear interleaved below. As written it
// is not valid Scala; consult the original commit for the true post-change body.
sealed trait ExecuteWriteCompatible {
// To be compatible with Spark (version < 3.4)
// (removed in this commit) default impl that rejected unsupported write paths:
protected def doExecuteWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {
throw new GlutenException(
s"Internal Error ${this.getClass} has write support" +
s" mismatch:\n${this}")
// (added in this commit) abstract declaration — concrete nodes must implement it:
protected def doExecuteWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage]
}

// Hides the synthetic noop leaf from fallback reporting: a plan subtree that
// bottoms out in a single NoopLeaf is an implementation detail of
// VeloxColumnarWriteFilesExec, not a genuine vanilla-Spark fallback, so it
// should not be counted against Gluten in the fallback summary.
private object HideNoopLeafFromFallBackReport extends GlutenExplainUtils.HideFallbackReason {
  override def shouldHide(plan: SparkPlan): Boolean = {
    hasOnlyOneNoopLeaf(plan)
  }

  // True iff the plan tree is a single unary chain ending in exactly one
  // NoopLeaf: any node with two or more children disqualifies the tree, a
  // single-child node defers to its child, and a leaf must itself be NoopLeaf.
  private def hasOnlyOneNoopLeaf(plan: SparkPlan): Boolean = {
    plan.children match {
      case Seq() => plan.isInstanceOf[NoopLeaf]
      case Seq(onlyChild) => hasOnlyOneNoopLeaf(onlyChild)
      case _ => false // more than one child: cannot be a lone noop chain
    }
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}

import java.util
import java.util.Collections.newSetFromMap
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, BitSet}
Expand Down Expand Up @@ -103,6 +104,7 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
addFallbackNodeWithReason(i, "Columnar table cache is disabled", fallbackNodeToReason)
}
case _: AQEShuffleReadExec => // Ignore
case p: SparkPlan if exclusions.exists(_.shouldHide(p)) =>
case p: SparkPlan =>
handleVanillaSparkPlan(p, fallbackNodeToReason)
p.innerChildren.foreach(collect)
Expand All @@ -113,6 +115,22 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
(numGlutenNodes, fallbackNodeToReason.toMap)
}

private val exclusions: mutable.ListBuffer[HideFallbackReason] = mutable.ListBuffer()

// A predicate that removes matching plans from the fallback report.
// Implementations call ensureRegistered() (typically from a constructor or
// object initializer) to add themselves to the shared `exclusions` list.
trait HideFallbackReason {
  // Flipped exactly once by the first ensureRegistered() call.
  private val registered: AtomicBoolean = new AtomicBoolean(false)

  /**
   * Idempotent, thread-safe registration: only the caller that wins the
   * compare-and-set appends this instance to `exclusions`; every later call
   * is a no-op, so an instance is never registered twice.
   */
  final def ensureRegistered(): Unit = {
    if (registered.compareAndSet(false, true)) {
      // ListBuffer is not thread-safe; serialize concurrent first-time
      // registrations of *different* instances.
      exclusions.synchronized {
        exclusions += this
      }
    }
  }

  /** True if `plan` should be hidden from the fallback report. */
  def shouldHide(plan: SparkPlan): Boolean
}

/**
* Given a input physical plan, performs the following tasks.
* 1. Generate the two part explain output for this plan.
Expand Down

0 comments on commit 14cef0f

Please sign in to comment.