[GLUTEN-5880][CORE] Ignore fallback for ColumnarWriteFilesExec children (apache#7113)
wForget authored and shamirchen committed Oct 14, 2024
1 parent 68edcd7 commit 9f3a1fa
Showing 5 changed files with 29 additions and 5 deletions.
@@ -150,7 +150,7 @@ object ColumnarWriteFilesExec {
       staticPartitions)
   }
 
-  private case class NoopLeaf() extends LeafExecNode {
+  case class NoopLeaf() extends LeafExecNode {
     override protected def doExecute(): RDD[InternalRow] =
       throw new GlutenException(s"$nodeName does not support doExecute")
     override def output: Seq[Attribute] = Seq.empty
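Dropping the `private` modifier is the whole change in this file: it lets the fallback-reporting code in the next two files pattern-match on the placeholder leaf, which never executes (its `doExecute` throws). A minimal sketch of that matching pattern, assuming only the Spark/Gluten classes imported below (the traversal function itself is illustrative, not Gluten's actual collector):

```scala
import org.apache.spark.sql.execution.ColumnarWriteFilesExec.NoopLeaf
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.WriteFilesExec

// Illustrative traversal: skip the placeholder leaf, and a WriteFilesExec that
// merely wraps it, so neither is reported as a row-based fallback node.
// Every other node is counted here for simplicity; the real collectors apply
// more checks before deciding whether a node counts as a fallback.
def countPotentialFallbacks(plan: SparkPlan): Int = plan match {
  case _: NoopLeaf => 0
  case w: WriteFilesExec if w.child.isInstanceOf[NoopLeaf] => 0
  case p => 1 + p.children.map(countPotentialFallbacks).sum
}
```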
@@ -24,9 +24,11 @@ import org.apache.gluten.utils.PlanUtil
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.execution.ColumnarWriteFilesExec.NoopLeaf
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, AQEShuffleReadExec, QueryStageExec}
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec}
+import org.apache.spark.sql.execution.datasources.WriteFilesExec
 import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
 import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}

@@ -90,6 +92,8 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
       case _: ColumnarToRowTransition =>
       case _: RowToColumnarTransition =>
       case _: ReusedExchangeExec =>
+      case _: NoopLeaf =>
+      case w: WriteFilesExec if w.child.isInstanceOf[NoopLeaf] =>
       case sub: AdaptiveSparkPlanExec if sub.isSubquery => collect(sub.executedPlan)
       case _: AdaptiveSparkPlanExec =>
       case p: QueryStageExec => collect(p.plan)
@@ -24,10 +24,12 @@ import org.apache.spark.sql.{AnalysisException, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
+import org.apache.spark.sql.execution.ColumnarWriteFilesExec.NoopLeaf
 import org.apache.spark.sql.execution.GlutenExplainUtils._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffleReadExec, QueryStageExec}
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec}
+import org.apache.spark.sql.execution.datasources.WriteFilesExec
 import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.internal.SQLConf
@@ -112,6 +114,8 @@ object GlutenImplicits {
       case _: ColumnarToRowTransition =>
       case _: RowToColumnarTransition =>
       case p: ReusedExchangeExec =>
+      case _: NoopLeaf =>
+      case w: WriteFilesExec if w.child.isInstanceOf[NoopLeaf] =>
       case p: AdaptiveSparkPlanExec if isFinalAdaptivePlan(p) =>
         collect(p.executedPlan)
       case p: AdaptiveSparkPlanExec =>
@@ -24,7 +24,7 @@ import org.apache.spark.executor.OutputMetrics
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, CommandResultExec, QueryExecution}
+import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, CommandResultExec, GlutenImplicits, QueryExecution}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -70,8 +70,9 @@ class GlutenInsertSuite
   }
 
   testGluten("insert partition table") {
-    withTable("pt") {
+    withTable("pt", "pt2") {
       spark.sql("CREATE TABLE pt (c1 int, c2 string) USING PARQUET PARTITIONED BY (pt string)")
+      spark.sql("CREATE TABLE pt2 (c1 int, c2 string) USING PARQUET PARTITIONED BY (pt string)")
 
       var taskMetrics: OutputMetrics = null
       val taskListener = new SparkListener {
@@ -111,6 +112,13 @@ class GlutenInsertSuite
         spark.sparkContext.removeSparkListener(taskListener)
         spark.listenerManager.unregister(queryListener)
       }
+
+      // check no fallback nodes
+      val df2 = spark.sql("INSERT INTO TABLE pt2 SELECT * FROM pt")
+      checkAndGetWriteFiles(df2)
+      val fallbackSummary = GlutenImplicits
+        .collectQueryExecutionFallbackSummary(spark, df2.queryExecution)
+      assert(fallbackSummary.numFallbackNodes == 0)
     }
   }

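The new assertion exercises `GlutenImplicits.collectQueryExecutionFallbackSummary`, which after this change skips the `NoopLeaf` placeholder under the columnar write. Roughly the same check can be run outside the test suite; a minimal sketch assuming an active `SparkSession` named `spark` and the `pt`/`pt2` tables created above:

```scala
import org.apache.spark.sql.execution.GlutenImplicits

// Run the insert and confirm the write plan reports no fallback nodes.
val df = spark.sql("INSERT INTO TABLE pt2 SELECT * FROM pt")
val summary = GlutenImplicits.collectQueryExecutionFallbackSummary(spark, df.queryExecution)
assert(summary.numFallbackNodes == 0)
```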
@@ -25,7 +25,7 @@ import org.apache.spark.executor.OutputMetrics
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.{CommandResultExec, QueryExecution, SparkPlan}
+import org.apache.spark.sql.execution.{CommandResultExec, GlutenImplicits, QueryExecution, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -72,8 +72,9 @@ class GlutenInsertSuite
   }
 
   testGluten("insert partition table") {
-    withTable("pt") {
+    withTable("pt", "pt2") {
       spark.sql("CREATE TABLE pt (c1 int, c2 string) USING PARQUET PARTITIONED BY (pt string)")
+      spark.sql("CREATE TABLE pt2 (c1 int, c2 string) USING PARQUET PARTITIONED BY (pt string)")
 
       var taskMetrics: OutputMetrics = null
       val taskListener = new SparkListener {
@@ -113,6 +114,13 @@ class GlutenInsertSuite
         spark.sparkContext.removeSparkListener(taskListener)
         spark.listenerManager.unregister(queryListener)
       }
+
+      // check no fallback nodes
+      val df2 = spark.sql("INSERT INTO TABLE pt2 SELECT * FROM pt")
+      checkWriteFilesAndGetChild(df2)
+      val fallbackSummary = GlutenImplicits
+        .collectQueryExecutionFallbackSummary(spark, df2.queryExecution)
+      assert(fallbackSummary.numFallbackNodes == 0)
     }
   }
