diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
index f8f596dcee1af..92dd74796caac 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
@@ -150,7 +150,7 @@ object ColumnarWriteFilesExec {
       staticPartitions)
   }
 
-  private case class NoopLeaf() extends LeafExecNode {
+  case class NoopLeaf() extends LeafExecNode {
     override protected def doExecute(): RDD[InternalRow] =
       throw new GlutenException(s"$nodeName does not support doExecute")
     override def output: Seq[Attribute] = Seq.empty
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
index 338136b6d8704..43b74c883671e 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
@@ -24,9 +24,11 @@ import org.apache.gluten.utils.PlanUtil
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.execution.ColumnarWriteFilesExec.NoopLeaf
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, AQEShuffleReadExec, QueryStageExec}
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec}
+import org.apache.spark.sql.execution.datasources.WriteFilesExec
 import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
 import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
 
@@ -90,6 +92,8 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
       case _: ColumnarToRowTransition =>
       case _: RowToColumnarTransition =>
       case _: ReusedExchangeExec =>
+      case _: NoopLeaf =>
+      case w: WriteFilesExec if w.child.isInstanceOf[NoopLeaf] =>
       case sub: AdaptiveSparkPlanExec if sub.isSubquery => collect(sub.executedPlan)
       case _: AdaptiveSparkPlanExec =>
       case p: QueryStageExec => collect(p.plan)
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
index 2e2af6517d9c2..53b1e861bb2bc 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
@@ -24,10 +24,12 @@ import org.apache.spark.sql.{AnalysisException, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
+import org.apache.spark.sql.execution.ColumnarWriteFilesExec.NoopLeaf
 import org.apache.spark.sql.execution.GlutenExplainUtils._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffleReadExec, QueryStageExec}
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec}
+import org.apache.spark.sql.execution.datasources.WriteFilesExec
 import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.internal.SQLConf
@@ -112,6 +114,8 @@ object GlutenImplicits {
         case _: ColumnarToRowTransition =>
         case _: RowToColumnarTransition =>
         case p: ReusedExchangeExec =>
+        case _: NoopLeaf =>
+        case w: WriteFilesExec if w.child.isInstanceOf[NoopLeaf] =>
         case p: AdaptiveSparkPlanExec if isFinalAdaptivePlan(p) =>
           collect(p.executedPlan)
         case p: AdaptiveSparkPlanExec =>
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
index 3c334511accf2..74c4df1977590 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.executor.OutputMetrics
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, CommandResultExec, QueryExecution}
+import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, CommandResultExec, GlutenImplicits, QueryExecution}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -70,8 +70,9 @@ class GlutenInsertSuite
   }
 
   testGluten("insert partition table") {
-    withTable("pt") {
+    withTable("pt", "pt2") {
       spark.sql("CREATE TABLE pt (c1 int, c2 string) USING PARQUET PARTITIONED BY (pt string)")
+      spark.sql("CREATE TABLE pt2 (c1 int, c2 string) USING PARQUET PARTITIONED BY (pt string)")
 
       var taskMetrics: OutputMetrics = null
       val taskListener = new SparkListener {
@@ -111,6 +112,13 @@ class GlutenInsertSuite
         spark.sparkContext.removeSparkListener(taskListener)
         spark.listenerManager.unregister(queryListener)
       }
+
+      // check no fallback nodes
+      val df2 = spark.sql("INSERT INTO TABLE pt2 SELECT * FROM pt")
+      checkAndGetWriteFiles(df2)
+      val fallbackSummary = GlutenImplicits
+        .collectQueryExecutionFallbackSummary(spark, df2.queryExecution)
+      assert(fallbackSummary.numFallbackNodes == 0)
     }
   }
 
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
index 3d9d8842f3990..1cb905e10abf5 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.executor.OutputMetrics
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.{CommandResultExec, QueryExecution, SparkPlan}
+import org.apache.spark.sql.execution.{CommandResultExec, GlutenImplicits, QueryExecution, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -72,8 +72,9 @@ class GlutenInsertSuite
   }
 
   testGluten("insert partition table") {
-    withTable("pt") {
+    withTable("pt", "pt2") {
       spark.sql("CREATE TABLE pt (c1 int, c2 string) USING PARQUET PARTITIONED BY (pt string)")
+      spark.sql("CREATE TABLE pt2 (c1 int, c2 string) USING PARQUET PARTITIONED BY (pt string)")
 
       var taskMetrics: OutputMetrics = null
       val taskListener = new SparkListener {
@@ -113,6 +114,13 @@ class GlutenInsertSuite
         spark.sparkContext.removeSparkListener(taskListener)
         spark.listenerManager.unregister(queryListener)
       }
+
+      // check no fallback nodes
+      val df2 = spark.sql("INSERT INTO TABLE pt2 SELECT * FROM pt")
+      checkWriteFilesAndGetChild(df2)
+      val fallbackSummary = GlutenImplicits
+        .collectQueryExecutionFallbackSummary(spark, df2.queryExecution)
+      assert(fallbackSummary.numFallbackNodes == 0)
     }
   }