[GLUTEN-5880][CORE] Ignore fallback for ColumnarWriteFilesExec children (apache#7113)
wForget authored and hengzhen.sq committed Sep 11, 2024
1 parent 07d6a9a commit fe9185b
Showing 5 changed files with 29 additions and 5 deletions.
@@ -150,7 +150,7 @@ object ColumnarWriteFilesExec {
       staticPartitions)
   }
 
-  private case class NoopLeaf() extends LeafExecNode {
+  case class NoopLeaf() extends LeafExecNode {
     override protected def doExecute(): RDD[InternalRow] =
       throw new GlutenException(s"$nodeName does not support doExecute")
     override def output: Seq[Attribute] = Seq.empty
@@ -24,9 +24,11 @@ import org.apache.gluten.utils.PlanUtil
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.execution.ColumnarWriteFilesExec.NoopLeaf
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, AQEShuffleReadExec, QueryStageExec}
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec}
+import org.apache.spark.sql.execution.datasources.WriteFilesExec
 import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
 import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}

@@ -90,6 +92,8 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
       case _: ColumnarToRowTransition =>
       case _: RowToColumnarTransition =>
       case _: ReusedExchangeExec =>
+      case _: NoopLeaf =>
+      case w: WriteFilesExec if w.child.isInstanceOf[NoopLeaf] =>
       case sub: AdaptiveSparkPlanExec if sub.isSubquery => collect(sub.executedPlan)
       case _: AdaptiveSparkPlanExec =>
       case p: QueryStageExec => collect(p.plan)
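
Taken together, the first hunk widens NoopLeaf's visibility so the fallback collectors can import and pattern-match on it, and the two cases added above make the collectors skip the placeholder leaf and any WriteFilesExec that wraps it, instead of counting them as row-based fallbacks. A minimal, self-contained sketch of the idea, using toy plan types rather than the real Spark/Gluten classes (all names below are illustrative only):

// Toy model of the fallback-counting walk; not the real Spark/Gluten API.
object FallbackCountSketch extends App {
  sealed trait PlanNode { def children: Seq[PlanNode] }
  case class NoopLeaf() extends PlanNode { val children: Seq[PlanNode] = Seq.empty }
  case class WriteFilesNode(child: PlanNode) extends PlanNode { val children: Seq[PlanNode] = Seq(child) }
  case class RowFallbackNode(children: Seq[PlanNode] = Seq.empty) extends PlanNode
  case class ColumnarNode(children: Seq[PlanNode] = Seq.empty) extends PlanNode

  // Count row-based (fallback) nodes, skipping the write placeholder and its
  // wrapper, mirroring the two cases added in this commit.
  def countFallbacks(plan: PlanNode): Int = plan match {
    case _: NoopLeaf => 0
    case w: WriteFilesNode if w.child.isInstanceOf[NoopLeaf] => 0
    case p: RowFallbackNode => 1 + p.children.map(countFallbacks).sum
    case p => p.children.map(countFallbacks).sum
  }

  // A columnar write whose only child is the placeholder reports no fallbacks...
  assert(countFallbacks(WriteFilesNode(NoopLeaf())) == 0)
  // ...while a genuine row-based node elsewhere in the plan is still counted.
  assert(countFallbacks(ColumnarNode(Seq(RowFallbackNode()))) == 1)
}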
@@ -24,10 +24,12 @@ import org.apache.spark.sql.{AnalysisException, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
+import org.apache.spark.sql.execution.ColumnarWriteFilesExec.NoopLeaf
 import org.apache.spark.sql.execution.GlutenExplainUtils._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffleReadExec, QueryStageExec}
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec}
+import org.apache.spark.sql.execution.datasources.WriteFilesExec
 import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.internal.SQLConf
@@ -112,6 +114,8 @@ object GlutenImplicits {
       case _: ColumnarToRowTransition =>
       case _: RowToColumnarTransition =>
       case p: ReusedExchangeExec =>
+      case _: NoopLeaf =>
+      case w: WriteFilesExec if w.child.isInstanceOf[NoopLeaf] =>
       case p: AdaptiveSparkPlanExec if isFinalAdaptivePlan(p) =>
         collect(p.executedPlan)
       case p: AdaptiveSparkPlanExec =>
@@ -24,7 +24,7 @@ import org.apache.spark.executor.OutputMetrics
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, CommandResultExec, QueryExecution}
+import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, CommandResultExec, GlutenImplicits, QueryExecution}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -70,8 +70,9 @@ class GlutenInsertSuite
   }
 
   testGluten("insert partition table") {
-    withTable("pt") {
+    withTable("pt", "pt2") {
       spark.sql("CREATE TABLE pt (c1 int, c2 string) USING PARQUET PARTITIONED BY (pt string)")
+      spark.sql("CREATE TABLE pt2 (c1 int, c2 string) USING PARQUET PARTITIONED BY (pt string)")
 
       var taskMetrics: OutputMetrics = null
       val taskListener = new SparkListener {
@@ -111,6 +112,13 @@ class GlutenInsertSuite
         spark.sparkContext.removeSparkListener(taskListener)
         spark.listenerManager.unregister(queryListener)
       }
+
+      // check no fallback nodes
+      val df2 = spark.sql("INSERT INTO TABLE pt2 SELECT * FROM pt")
+      checkAndGetWriteFiles(df2)
+      val fallbackSummary = GlutenImplicits
+        .collectQueryExecutionFallbackSummary(spark, df2.queryExecution)
+      assert(fallbackSummary.numFallbackNodes == 0)
     }
   }

@@ -25,7 +25,7 @@ import org.apache.spark.executor.OutputMetrics
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.{CommandResultExec, QueryExecution, SparkPlan}
+import org.apache.spark.sql.execution.{CommandResultExec, GlutenImplicits, QueryExecution, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -72,8 +72,9 @@ class GlutenInsertSuite
   }
 
   testGluten("insert partition table") {
-    withTable("pt") {
+    withTable("pt", "pt2") {
       spark.sql("CREATE TABLE pt (c1 int, c2 string) USING PARQUET PARTITIONED BY (pt string)")
+      spark.sql("CREATE TABLE pt2 (c1 int, c2 string) USING PARQUET PARTITIONED BY (pt string)")
 
       var taskMetrics: OutputMetrics = null
       val taskListener = new SparkListener {
@@ -113,6 +114,13 @@ class GlutenInsertSuite
         spark.sparkContext.removeSparkListener(taskListener)
         spark.listenerManager.unregister(queryListener)
       }
+
+      // check no fallback nodes
+      val df2 = spark.sql("INSERT INTO TABLE pt2 SELECT * FROM pt")
+      checkWriteFilesAndGetChild(df2)
+      val fallbackSummary = GlutenImplicits
+        .collectQueryExecutionFallbackSummary(spark, df2.queryExecution)
+      assert(fallbackSummary.numFallbackNodes == 0)
     }
  }
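
For reference, the new assertions rely on Gluten's fallback-summary helper. Outside the test, the same check can be written against any query execution. A sketch only, reusing the helper and field names shown in the diff and assuming a SparkSession (`spark`) with the Gluten plugin enabled plus the `pt`/`pt2` tables from the test above:

import org.apache.spark.sql.execution.GlutenImplicits

// Run the insert and collect the fallback summary for its physical plan.
val df = spark.sql("INSERT INTO TABLE pt2 SELECT * FROM pt")
val fallbackSummary =
  GlutenImplicits.collectQueryExecutionFallbackSummary(spark, df.queryExecution)
// After this change the NoopLeaf placeholder (and a WriteFilesExec wrapping it)
// is no longer reported, so a fully offloaded write shows zero fallback nodes.
assert(fallbackSummary.numFallbackNodes == 0)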
