diff --git a/docs/get-started/Velox.md b/docs/get-started/Velox.md index ffe4e63dd784..8a30df2fdc37 100644 --- a/docs/get-started/Velox.md +++ b/docs/get-started/Velox.md @@ -785,3 +785,17 @@ If you want to disable Gluten UI, add a config when submitting `--conf spark.glu ## History server Gluten UI also supports Spark history server. Add gluten-ui jar into the history server classpath, e.g., $SPARK_HOME/jars, then restart history server. +
+# Gluten Implicits
+
+Gluten provides a helper class to get the fallback summary from a Spark Dataset.
+
+```scala
+import org.apache.spark.sql.execution.GlutenImplicits._
+val df = spark.sql("SELECT * FROM t")
+df.fallbackSummary
+```
+
+Note that if AQE is enabled but the query is not materialized, `fallbackSummary` re-plans
+the query execution with AQE disabled. This is a workaround to obtain the final plan, and the
+result may be inconsistent with that of a materialized query; there is currently no better alternative.
diff --git a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala index 8d22624919c2..61938e263c5d 100644 --- a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala +++ b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala @@ -17,11 +17,11 @@ package org.apache.spark.shuffle import io.glutenproject.GlutenConfig +import io.glutenproject.backendsapi.clickhouse.CHBackendSettings import io.glutenproject.memory.alloc.CHNativeMemoryAllocators import io.glutenproject.memory.memtarget.MemoryTarget import io.glutenproject.memory.memtarget.Spiller import io.glutenproject.vectorized._ -import io.glutenproject.backendsapi.clickhouse.CHBackendSettings import org.apache.spark._ import org.apache.spark.scheduler.MapStatus diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala index e281d7f36f82..e6eaac2a1c4d 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala @@ -23,8 +23,10 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.execution.GlutenFallbackReporter.FALLBACK_REASON_TAG -import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, QueryStageExec} +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, AQEShuffleReadExec, QueryStageExec} import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec +import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec} +import org.apache.spark.sql.execution.datasources.v2.V2CommandExec import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec} import java.util.{IdentityHashMap, Set} @@ -38,25 +40,52 @@ import scala.collection.mutable.{ArrayBuffer, BitSet} // 2. remove `plan.verboseStringWithOperatorId` // 3. 
remove codegen id object GlutenExplainUtils extends AdaptiveSparkPlanHelper { - private def collectFallbackNodes(plan: QueryPlan[_]): (Int, Map[String, String]) = { - var numGlutenNodes = 0 - val fallbackNodeToReason = new mutable.HashMap[String, String] + type FallbackInfo = (Int, Map[String, String]) - def addFallbackNodeWithReason(p: SparkPlan, reason: String): Unit = { - p.getTagValue(QueryPlan.OP_ID_TAG).foreach { - opId => - // e.g., 002 project, it is used to help analysis by `substring(4)` - val formattedNodeName = f"$opId%03d ${p.nodeName}" - fallbackNodeToReason.put(formattedNodeName, reason) - } + def addFallbackNodeWithReason( + p: SparkPlan, + reason: String, + fallbackNodeToReason: mutable.HashMap[String, String]): Unit = { + p.getTagValue(QueryPlan.OP_ID_TAG).foreach { + opId => + // e.g., 002 project, it is used to help analysis by `substring(4)` + val formattedNodeName = f"$opId%03d ${p.nodeName}" + fallbackNodeToReason.put(formattedNodeName, reason) } + } + + def handleVanillaSparkPlan( + p: SparkPlan, + fallbackNodeToReason: mutable.HashMap[String, String] + ): Unit = { + p.logicalLink.flatMap(_.getTagValue(FALLBACK_REASON_TAG)) match { + case Some(reason) => addFallbackNodeWithReason(p, reason, fallbackNodeToReason) + case _ => + // If the SparkPlan does not have fallback reason, then there are two options: + // 1. Gluten ignore that plan and it's a kind of fallback + // 2. Gluten does not support it without the fallback reason + addFallbackNodeWithReason( + p, + "Gluten does not touch it or does not support it", + fallbackNodeToReason) + } + } + + private def collectFallbackNodes(plan: QueryPlan[_]): FallbackInfo = { + var numGlutenNodes = 0 + val fallbackNodeToReason = new mutable.HashMap[String, String] def collect(tmp: QueryPlan[_]): Unit = { tmp.foreachUp { + case _: ExecutedCommandExec => + case _: CommandResultExec => + case _: V2CommandExec => + case _: DataWritingCommandExec => case _: WholeStageCodegenExec => case _: WholeStageTransformer => case _: InputAdapter => - case p: AdaptiveSparkPlanExec => collect(p.executedPlan) + case _: ColumnarToRowTransition => + case _: RowToColumnarTransition => case p: QueryStageExec => collect(p.plan) case p: GlutenPlan => numGlutenNodes += 1 @@ -65,17 +94,11 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper { if (InMemoryTableScanHelper.isGlutenTableCache(i)) { numGlutenNodes += 1 } else { - addFallbackNodeWithReason(i, "Columnar table cache is disabled") + addFallbackNodeWithReason(i, "Columnar table cache is disabled", fallbackNodeToReason) } + case _: AQEShuffleReadExec => // Ignore case p: SparkPlan => - p.logicalLink.flatMap(_.getTagValue(FALLBACK_REASON_TAG)) match { - case Some(reason) => addFallbackNodeWithReason(p, reason) - case _ => - // If the SparkPlan does not have fallback reason, then there are two options: - // 1. Gluten ignore that plan and it's a kind of fallback - // 2. 
Gluten does not support it without the fallback reason - addFallbackNodeWithReason(p, "Gluten does not touch it or does not support it") - } + handleVanillaSparkPlan(p, fallbackNodeToReason) p.innerChildren.foreach(collect) case _ => } @@ -120,7 +143,8 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper { */ def processPlan[T <: QueryPlan[T]]( plan: T, - append: String => Unit): (Int, Map[String, String]) = { + append: String => Unit, + collectFallbackFunc: Option[QueryPlan[_] => FallbackInfo] = None): FallbackInfo = { try { // Initialize a reference-unique set of Operators to avoid accdiental overwrites and to allow // intentional overwriting of IDs generated in previous AQE iteration @@ -186,7 +210,11 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper { append("\n") } - collectFallbackNodes(plan) + if (collectFallbackFunc.isEmpty) { + collectFallbackNodes(plan) + } else { + collectFallbackFunc.get.apply(plan) + } } finally { removeTags(plan) } @@ -310,11 +338,11 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper { * Returns the operator identifier for the supplied plan by retrieving the `operationId` tag * value. */ - def getOpId(plan: QueryPlan[_]): String = { + private def getOpId(plan: QueryPlan[_]): String = { plan.getTagValue(QueryPlan.OP_ID_TAG).map(v => s"$v").getOrElse("unknown") } - def removeTags(plan: QueryPlan[_]): Unit = { + private def removeTags(plan: QueryPlan[_]): Unit = { def remove(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = { p.unsetTagValue(QueryPlan.OP_ID_TAG) children.foreach(removeTags) diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala new file mode 100644 index 000000000000..f79266706bec --- /dev/null +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution + +import io.glutenproject.execution.WholeStageTransformer +import io.glutenproject.extension.{GlutenPlan, InMemoryTableScanHelper} + +import org.apache.spark.sql.{AnalysisException, Dataset} +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan} +import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat +import org.apache.spark.sql.execution.GlutenExplainUtils._ +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffleReadExec, QueryStageExec} +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec +import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec} +import org.apache.spark.sql.execution.datasources.v2.V2CommandExec +import org.apache.spark.sql.internal.SQLConf + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +// spotless:off +/** + * A helper class to get the Gluten fallback summary from a Spark [[Dataset]]. + * + * Note that, if AQE is enabled, but the query is not materialized, then this method will re-plan + * the query execution with disabled AQE. It is a workaround to get the final plan, and it may + * cause the inconsistent results with a materialized query. However, we have no choice. + * + * For example: + * + * {{{ + * import org.apache.spark.sql.execution.GlutenImplicits._ + * val df = spark.sql("SELECT * FROM t") + * df.fallbackSummary + * }}} + */ +// spotless:on +object GlutenImplicits { + + case class FallbackSummary( + numGlutenNodes: Int, + numFallbackNodes: Int, + physicalPlanDescription: Seq[String], + fallbackNodeToReason: Seq[Map[String, String]]) {} + + private[sql] def withSQLConf[T](pairs: (String, String)*)(f: => T): T = { + val conf = SQLConf.get + val (keys, values) = pairs.unzip + val currentValues = keys.map { + key => + if (conf.contains(key)) { + Some(conf.getConfString(key)) + } else { + None + } + } + keys.zip(values).foreach { + case (k, v) => + if (SQLConf.isStaticConfigKey(k)) { + throw new AnalysisException(s"Cannot modify the value of a static config: $k") + } + conf.setConfString(k, v) + } + try f + finally { + keys.zip(currentValues).foreach { + case (key, Some(value)) => conf.setConfString(key, value) + case (key, None) => conf.unsetConf(key) + } + } + } + + implicit class DatasetTransformer[T](dateset: Dataset[T]) { + private def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = { + val args = p.argString(Int.MaxValue) + val index = args.indexOf("isFinalPlan=") + assert(index >= 0) + args.substring(index + "isFinalPlan=".length).trim.toBoolean + } + + private def collectFallbackNodes(plan: QueryPlan[_]): FallbackInfo = { + var numGlutenNodes = 0 + val fallbackNodeToReason = new mutable.HashMap[String, String] + + def collect(tmp: QueryPlan[_]): Unit = { + tmp.foreachUp { + case _: ExecutedCommandExec => + case _: CommandResultExec => + case _: V2CommandExec => + case _: DataWritingCommandExec => + case _: WholeStageCodegenExec => + case _: WholeStageTransformer => + case _: InputAdapter => + case _: ColumnarToRowTransition => + case _: RowToColumnarTransition => + case p: AdaptiveSparkPlanExec if isFinalAdaptivePlan(p) => + collect(p.executedPlan) + case p: AdaptiveSparkPlanExec => + // if we are here that means we are inside table cache. 
+ val (innerNumGlutenNodes, innerFallbackNodeToReason) = + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { + // re-plan manually to skip cached data + val newSparkPlan = QueryExecution.createSparkPlan( + dateset.sparkSession, + dateset.sparkSession.sessionState.planner, + p.inputPlan.logicalLink.get) + val newExecutedPlan = QueryExecution.prepareExecutedPlan( + dateset.sparkSession, + newSparkPlan + ) + processPlan( + newExecutedPlan, + new PlanStringConcat().append, + Some(plan => collectFallbackNodes(plan))) + } + numGlutenNodes += innerNumGlutenNodes + fallbackNodeToReason.++=(innerFallbackNodeToReason) + case p: QueryStageExec => collect(p.plan) + case p: GlutenPlan => + numGlutenNodes += 1 + p.innerChildren.foreach(collect) + case i: InMemoryTableScanExec => + if (InMemoryTableScanHelper.isGlutenTableCache(i)) { + numGlutenNodes += 1 + } else { + addFallbackNodeWithReason(i, "Columnar table cache is disabled", fallbackNodeToReason) + } + collect(i.relation.cachedPlan) + case _: AQEShuffleReadExec => // Ignore + case p: SparkPlan => + handleVanillaSparkPlan(p, fallbackNodeToReason) + p.innerChildren.foreach(collect) + case _ => + } + } + + collect(plan) + (numGlutenNodes, fallbackNodeToReason.toMap) + } + + private def collectQueryExecutionFallbackSummary(qe: QueryExecution): FallbackSummary = { + var totalNumGlutenNodes = 0 + var totalNumFallbackNodes = 0 + val totalPhysicalPlanDescription = new ArrayBuffer[String]() + val totalFallbackNodeToReason = new ArrayBuffer[Map[String, String]]() + + def handlePlanWithAQEAndTableCache( + plan: SparkPlan, + logicalPlan: LogicalPlan, + isMaterialized: Boolean): Unit = { + val concat = new PlanStringConcat() + val collectFallbackFunc = Some(plan => collectFallbackNodes(plan)) + val (numGlutenNodes, fallbackNodeToReason) = if (!isMaterialized) { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { + // AQE is not materialized, so the columnar rules are not applied. + // For this case, We apply columnar rules manually with disable AQE. 
+ val qe = dateset.sparkSession.sessionState.executePlan(logicalPlan) + processPlan(qe.executedPlan, concat.append, collectFallbackFunc) + } + } else { + processPlan(plan, concat.append, collectFallbackFunc) + } + totalNumGlutenNodes += numGlutenNodes + totalNumFallbackNodes += fallbackNodeToReason.size + totalPhysicalPlanDescription.append(concat.toString()) + totalFallbackNodeToReason.append(fallbackNodeToReason) + } + + // For command-like query, e.g., `INSERT INTO TABLE ...` + qe.commandExecuted.foreach { + case r: CommandResult => + handlePlanWithAQEAndTableCache(r.commandPhysicalPlan, r.commandLogicalPlan, true) + case _ => // ignore + } + + // For query, e.g., `SELECT * FROM ...` + if (qe.executedPlan.find(_.isInstanceOf[CommandResultExec]).isEmpty) { + val isMaterialized = qe.executedPlan.find { + case a: AdaptiveSparkPlanExec if isFinalAdaptivePlan(a) => true + case _ => false + }.isDefined + handlePlanWithAQEAndTableCache(qe.executedPlan, qe.analyzed, isMaterialized) + } + + FallbackSummary( + totalNumGlutenNodes, + totalNumFallbackNodes, + totalPhysicalPlanDescription, + totalFallbackNodeToReason + ) + } + + def fallbackSummary(): FallbackSummary = { + collectQueryExecutionFallbackSummary(dateset.queryExecution) + } + } +} diff --git a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala index e0d825c9578c..a2a747c4a183 100644 --- a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala @@ -2014,6 +2014,9 @@ class ClickHouseTestSettings extends BackendTestSettings { "SELECT structFieldSimple.key, arrayFieldSimple[1] FROM tableWithSchema a where int_Field=1") .exclude("SELECT structFieldComplex.Value.`value_(2)` FROM tableWithSchema") enableSuite[SparkFunctionStatistics] - + enableSuite[GlutenImplicitsTest] + .exclude("fallbackSummary with shuffle") + .exclude("fallbackSummary with cache") + .exclude("fallbackSummary with cached data and shuffle") } // scalastyle:on line.size.limit diff --git a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala index afd8e127e93f..7d5c2f7588e5 100644 --- a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala @@ -1164,5 +1164,6 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenHiveSQLQuerySuite] // ReaderFactory is not registered for format orc. .exclude("hive orc scan") + enableSuite[GlutenImplicitsTest] } // scalastyle:on line.size.limit diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenImplicitsTest.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenImplicitsTest.scala new file mode 100644 index 000000000000..63348295feab --- /dev/null +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenImplicitsTest.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql + +import org.apache.spark.SparkConf +import org.apache.spark.sql.execution.GlutenImplicits._ +import org.apache.spark.sql.internal.SQLConf + +class GlutenImplicitsTest extends GlutenSQLTestsBaseTrait { + sys.props.put("spark.gluten.sql.columnar.tableCache", "true") + + override protected def beforeAll(): Unit = { + super.beforeAll() + spark + .range(10) + .selectExpr("id as c1", "id % 3 as c2") + .write + .format("parquet") + .saveAsTable("t1") + } + + override protected def afterAll(): Unit = { + spark.sql("drop table t1") + super.afterAll() + } + + override protected def afterEach(): Unit = { + spark.catalog.clearCache() + super.afterEach() + } + + override def sparkConf: SparkConf = { + super.sparkConf + .set("spark.sql.shuffle.partitions", "5") + } + + private def withAQEEnabledAndDisabled(f: => Unit): Unit = { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true", + SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true" + ) { + f + } + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "false", + SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false" + ) { + f + } + } + + test("fallbackSummary with query") { + withAQEEnabledAndDisabled { + val df = spark.table("t1").filter(_.getLong(0) > 0) + assert(df.fallbackSummary().numGlutenNodes == 1, df.fallbackSummary()) + assert(df.fallbackSummary().numFallbackNodes == 1, df.fallbackSummary()) + df.collect() + assert(df.fallbackSummary().numGlutenNodes == 1, df.fallbackSummary()) + assert(df.fallbackSummary().numFallbackNodes == 1, df.fallbackSummary()) + } + } + + test("fallbackSummary with shuffle") { + withAQEEnabledAndDisabled { + val df = spark.sql("SELECT c2 FROM t1 group by c2").filter(_.getLong(0) > 0) + assert(df.fallbackSummary().numGlutenNodes == 5, df.fallbackSummary()) + assert(df.fallbackSummary().numFallbackNodes == 1, df.fallbackSummary()) + df.collect() + assert(df.fallbackSummary().numGlutenNodes == 5, df.fallbackSummary()) + assert(df.fallbackSummary().numFallbackNodes == 1, df.fallbackSummary()) + } + } + + test("fallbackSummary with set command") { + withAQEEnabledAndDisabled { + val df = spark.sql("set k=v") + assert(df.fallbackSummary().numGlutenNodes == 0, df.fallbackSummary()) + assert(df.fallbackSummary().numFallbackNodes == 0, df.fallbackSummary()) + } + } + + test("fallbackSummary with data write command") { + withAQEEnabledAndDisabled { + withTable("tmp") { + val df = spark.sql("create table tmp using parquet as select * from t1") + assert(df.fallbackSummary().numGlutenNodes == 1, df.fallbackSummary()) + assert(df.fallbackSummary().numFallbackNodes == 0, df.fallbackSummary()) + } + } + } + + test("fallbackSummary with cache") { + withAQEEnabledAndDisabled { + val df = spark.table("t1").cache().filter(_.getLong(0) > 0) + 
assert(df.fallbackSummary().numGlutenNodes == 2, df.fallbackSummary()) + assert(df.fallbackSummary().numFallbackNodes == 1, df.fallbackSummary()) + df.collect() + assert(df.fallbackSummary().numGlutenNodes == 2, df.fallbackSummary()) + assert(df.fallbackSummary().numFallbackNodes == 1, df.fallbackSummary()) + } + } + + test("fallbackSummary with cached data and shuffle") { + withAQEEnabledAndDisabled { + val df = spark.sql("select * from t1").filter(_.getLong(0) > 0).cache.repartition() + assert(df.fallbackSummary().numGlutenNodes == 3, df.fallbackSummary()) + assert(df.fallbackSummary().numFallbackNodes == 1, df.fallbackSummary()) + df.collect() + assert(df.fallbackSummary().numGlutenNodes == 3, df.fallbackSummary()) + assert(df.fallbackSummary().numFallbackNodes == 1, df.fallbackSummary()) + } + } +} diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala index 62b095482826..8a2acc9db1e6 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala @@ -74,7 +74,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait { val id = runExecution("SELECT * FROM t") val execution = glutenStore.execution(id) assert(execution.isDefined) - assert(execution.get.numGlutenNodes == 2) + assert(execution.get.numGlutenNodes == 1) assert(execution.get.numFallbackNodes == 0) assert(execution.get.fallbackNodeToReason.isEmpty) @@ -83,7 +83,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait { val execution = glutenStore.execution(id) assert(execution.isDefined) assert(execution.get.numGlutenNodes == 0) - assert(execution.get.numFallbackNodes == 2) + assert(execution.get.numFallbackNodes == 1) val fallbackReason = execution.get.fallbackNodeToReason.head assert(fallbackReason._1.contains("Scan parquet default.t")) assert(fallbackReason._2.contains("columnar FileScan is not enabled in FileSourceScanExec"))
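
As a usage illustration (not part of the patch itself): the sketch below shows how the `fallbackSummary()` API introduced above might be consumed from application code. The SparkSession setup, the table name `t1`, and the report formatting are hypothetical; only `GlutenImplicits._`, `fallbackSummary()`, and the `FallbackSummary` fields (`numGlutenNodes`, `numFallbackNodes`, `physicalPlanDescription`, `fallbackNodeToReason`) come from the code added in this diff.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.GlutenImplicits._

object FallbackSummaryExample {
  def main(args: Array[String]): Unit = {
    // Assumes a Gluten-enabled Spark build; the table name is a placeholder.
    val spark = SparkSession.builder()
      .appName("gluten-fallback-summary-sketch")
      .getOrCreate()

    val df = spark.sql("SELECT c2, count(*) FROM t1 GROUP BY c2")

    // If the query has not been materialized yet and AQE is enabled,
    // fallbackSummary() re-plans with AQE disabled, so the numbers may
    // differ slightly from those of a materialized query (see the doc note).
    val summary = df.fallbackSummary()

    println(s"Gluten nodes:   ${summary.numGlutenNodes}")
    println(s"Fallback nodes: ${summary.numFallbackNodes}")

    // One reason map per processed physical plan (table caches contribute their own entries).
    summary.fallbackNodeToReason.zipWithIndex.foreach {
      case (reasons, i) =>
        reasons.foreach {
          case (node, reason) =>
            // Node names are prefixed with a zero-padded operator id, e.g. "002 Project".
            println(s"plan#$i $node -> $reason")
        }
    }

    spark.stop()
  }
}
```

Because `physicalPlanDescription` is a `Seq[String]` with one entry per processed plan (including re-planned table caches), it can simply be logged alongside the counts when the fallback numbers look unexpected.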