diff --git a/backends-velox/src/test/scala/io/glutenproject/extension/CollapseProjectExecTransformerSuite.scala b/backends-velox/src/test/scala/io/glutenproject/extension/CollapseProjectExecTransformerSuite.scala
deleted file mode 100644
index 36e93bb1af5f..000000000000
--- a/backends-velox/src/test/scala/io/glutenproject/extension/CollapseProjectExecTransformerSuite.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.glutenproject.extension
-
-import io.glutenproject.GlutenConfig
-import io.glutenproject.execution.{ProjectExecTransformer, VeloxWholeStageTransformerSuite}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-
-class CollapseProjectExecTransformerSuite
-  extends VeloxWholeStageTransformerSuite
-  with AdaptiveSparkPlanHelper {
-
-  protected val rootPath: String = getClass.getResource("/").getPath
-  override protected val backend: String = "velox"
-  override protected val resourcePath: String = "/tpch-data-parquet-velox"
-  override protected val fileFormat: String = "parquet"
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    createTPCHNotNullTables()
-  }
-
-  override protected def sparkConf: SparkConf = {
-    super.sparkConf
-      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
-      .set("spark.sql.files.maxPartitionBytes", "1g")
-      .set("spark.sql.shuffle.partitions", "1")
-      .set("spark.memory.offHeap.size", "2g")
-      .set("spark.unsafe.exceptionOnMemoryLeak", "true")
-      .set("spark.sql.autoBroadcastJoinThreshold", "-1")
-  }
-
-  test("Support ProjectExecTransformer collapse") {
-    val query =
-      """
-        |SELECT
-        |  o_orderpriority
-        |FROM
-        |  orders
-        |WHERE
-        |  o_orderdate >= '1993-07-01'
-        |  AND EXISTS (
-        |    SELECT
-        |      *
-        |    FROM
-        |      lineitem
-        |    WHERE
-        |      l_orderkey = o_orderkey
-        |      AND l_commitdate < l_receiptdate
-        |  )
-        |ORDER BY
-        |  o_orderpriority
-        |LIMIT
-        |  100;
-        |""".stripMargin
-    Seq(true, false).foreach {
-      collapsed =>
-        withSQLConf(GlutenConfig.ENABLE_COLUMNAR_PROJECT_COLLAPSE.key -> collapsed.toString) {
-          runQueryAndCompare(query) {
-            df =>
-              {
-                assert(
-                  getExecutedPlan(df).exists {
-                    case _ @ProjectExecTransformer(_, _: ProjectExecTransformer) => true
-                    case _ => false
-                  } == !collapsed
-                )
-              }
-          }
-        }
-    }
-  }
-}
diff --git a/ep/build-clickhouse/src/build_clickhouse.sh b/ep/build-clickhouse/src/build_clickhouse.sh
index 0a14a86be1aa..1fca38c8116e 100644
--- a/ep/build-clickhouse/src/build_clickhouse.sh
+++ b/ep/build-clickhouse/src/build_clickhouse.sh
@@ -38,7 +38,7 @@ done
 
 echo ${GLUTEN_SOURCE}
 
-export CC=${CC:-clang-15}
-export CXX=${CXX:-clang++-15}
+export CC=${CC:-clang-16}
+export CXX=${CXX:-clang++-16}
 cmake -G Ninja -S ${GLUTEN_SOURCE}/cpp-ch -B ${GLUTEN_SOURCE}/cpp-ch/build_ch -DCH_SOURCE_DIR=${CH_SOURCE_DIR} "-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}"
 cmake --build ${GLUTEN_SOURCE}/cpp-ch/build_ch --target build_ch
diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/CollapseProjectTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/extension/CollapseProjectExecTransformer.scala
similarity index 94%
rename from gluten-core/src/main/scala/io/glutenproject/extension/CollapseProjectTransformer.scala
rename to gluten-core/src/main/scala/io/glutenproject/extension/CollapseProjectExecTransformer.scala
index 222e738b80fc..6087fe7a2cf0 100644
--- a/gluten-core/src/main/scala/io/glutenproject/extension/CollapseProjectTransformer.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/extension/CollapseProjectExecTransformer.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.optimizer.CollapseProjectShim
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.SparkPlan
 
-object CollapseProjectTransformer extends Rule[SparkPlan] {
+object CollapseProjectExecTransformer extends Rule[SparkPlan] {
 
   override def apply(plan: SparkPlan): SparkPlan = {
     if (!GlutenConfig.getConf.enableColumnarProjectCollapse) {
@@ -51,7 +51,7 @@ object CollapseProjectTransformer extends Rule[SparkPlan] {
 
   /**
    * In Velox, CreateNamedStruct will generate a special output named obj, We cannot collapse such
-   * ProjectTransformer, otherwise it will result in a bind reference failure.
+   * project transformer, otherwise it will result in a bind reference failure.
    */
   private def containsNamedStructAlias(projectList: Seq[NamedExpression]): Boolean = {
     projectList.exists {
diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala
index bcce1a8d8153..844a64b92919 100644
--- a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala
@@ -798,7 +798,7 @@ case class ColumnarOverrideRules(session: SparkSession)
       (_: SparkSession) => TransformPreOverrides(isAdaptiveContext),
       (spark: SparkSession) => RewriteTransformer(spark),
       (_: SparkSession) => EnsureLocalSortRequirements,
-      (_: SparkSession) => CollapseProjectTransformer
+      (_: SparkSession) => CollapseProjectExecTransformer
     ) :::
       BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPreRules() :::
       SparkRuleUtil.extendedColumnarRules(session, GlutenConfig.getConf.extendedColumnarPreRules)
diff --git a/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsTrait.scala b/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsTrait.scala
index 98d12f5a3b70..d87e57622ca2 100644
--- a/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsTrait.scala
+++ b/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsTrait.scala
@@ -18,7 +18,8 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.util.{sideBySide, stackTraceToString}
-import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec}
 
 import org.apache.commons.io.FileUtils
 import org.apache.commons.math3.util.Precision
@@ -75,6 +76,52 @@ trait GlutenSQLTestsTrait extends QueryTest with GlutenSQLTestsBaseTrait {
 
     GlutenQueryTest.checkAnswer(analyzedDF, expectedAnswer)
   }
+
+  /**
+   * Get all the children plan of plans.
+   *
+   * @param plans
+   *   : the input plans.
+   * @return
+   */
+  def getChildrenPlan(plans: Seq[SparkPlan]): Seq[SparkPlan] = {
+    if (plans.isEmpty) {
+      return Seq()
+    }
+
+    val inputPlans: Seq[SparkPlan] = plans.map {
+      case stage: ShuffleQueryStageExec => stage.plan
+      case plan => plan
+    }
+
+    var newChildren: Seq[SparkPlan] = Seq()
+    inputPlans.foreach {
+      plan =>
+        newChildren = newChildren ++ getChildrenPlan(plan.children)
+        // To avoid duplication of WholeStageCodegenXXX and its children.
+        if (!plan.nodeName.startsWith("WholeStageCodegen")) {
+          newChildren = newChildren :+ plan
+        }
+    }
+    newChildren
+  }
+
+  /**
+   * Get the executed plan of a data frame.
+   *
+   * @param df
+   *   : dataframe.
+   * @return
+   *   A sequence of executed plans.
+   */
+  def getExecutedPlan(df: DataFrame): Seq[SparkPlan] = {
+    df.queryExecution.executedPlan match {
+      case exec: AdaptiveSparkPlanExec =>
+        getChildrenPlan(Seq(exec.executedPlan))
+      case plan =>
+        getChildrenPlan(Seq(plan))
+    }
+  }
 }
 
 object GlutenQueryTest extends Assertions {
diff --git a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
index 5691341aa238..7d14cd990b97 100644
--- a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
 import org.apache.spark.sql.execution.datasources.v2.GlutenFileTableSuite
 import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
 import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
-import org.apache.spark.sql.extension.{GlutenCustomerExpressionTransformerSuite, GlutenCustomerExtensionSuite, GlutenSessionExtensionSuite}
+import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenCustomerExpressionTransformerSuite, GlutenCustomerExtensionSuite, GlutenSessionExtensionSuite}
 import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
 import org.apache.spark.sql.sources.{GlutenBucketedReadWithoutHiveSupportSuite, GlutenBucketedWriteWithoutHiveSupportSuite, GlutenCreateTableAsSelectSuite, GlutenDDLSourceLoadSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuiteAE, GlutenExternalCommandRunnerSuite, GlutenFilteredScanSuite, GlutenFiltersSuite, GlutenInsertSuite, GlutenPartitionedWriteSuite, GlutenPathOptionSuite, GlutenPrunedScanSuite, GlutenResolvedDataSourceSuite, GlutenSaveLoadSuite, GlutenTableScanSuite}
 
@@ -1080,6 +1080,7 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-33687: analyze all tables in a specific database")
   enableSuite[FallbackStrategiesSuite]
   enableSuite[GlutenHiveSQLQuerySuite]
+  enableSuite[GlutenCollapseProjectExecTransformerSuite]
 
   override def getSQLQueryTestSettings: SQLQueryTestSettings = VeloxSQLQueryTestSettings
 }
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala
new file mode 100644
index 000000000000..6709e2a91fec
--- /dev/null
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.extension
+
+import io.glutenproject.GlutenConfig
+import io.glutenproject.execution.ProjectExecTransformer
+
+import org.apache.spark.sql.GlutenSQLTestsTrait
+import org.apache.spark.sql.Row
+
+class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait {
+
+  import testImplicits._
+
+  test("Support ProjectExecTransformer collapse") {
+    val query =
+      """
+        |SELECT
+        |  o_orderpriority
+        |FROM
+        |  orders
+        |WHERE
+        |  o_shippriority >= 0
+        |  AND EXISTS (
+        |    SELECT
+        |      *
+        |    FROM
+        |      lineitem
+        |    WHERE
+        |      l_orderkey = o_orderkey
+        |      AND l_linenumber < 10
+        |  )
+        |ORDER BY
+        |  o_orderpriority
+        |LIMIT
+        |  100;
+        |""".stripMargin
+
+    val ordersData = Seq[(Int, Int, String)](
+      (30340, 1, "3-MEDIUM"),
+      (31140, 1, "1-URGENT"),
+      (31940, 1, "2-HIGH"),
+      (32740, 1, "3-MEDIUM"),
+      (33540, 1, "5-LOW"),
+      (34340, 1, "2-HIGH"),
+      (35140, 1, "3-MEDIUM"),
+      (35940, 1, "1-URGENT"),
+      (36740, 1, "3-MEDIUM"),
+      (37540, 1, "4-NOT SPECIFIED")
+    )
+    val lineitemData = Seq[(Int, Int, String)](
+      (30340, 1, "F"),
+      (31140, 4, "F"),
+      (31940, 7, "O"),
+      (32740, 6, "O"),
+      (33540, 2, "F"),
+      (34340, 3, "F"),
+      (35140, 1, "O"),
+      (35940, 2, "F"),
+      (36740, 3, "F"),
+      (37540, 5, "O")
+    )
+    withTable("orders", "lineitem") {
+      ordersData
+        .toDF("o_orderkey", "o_shippriority", "o_orderpriority")
+        .write
+        .format("parquet")
+        .saveAsTable("orders")
+      lineitemData
+        .toDF("l_orderkey", "l_linenumber", "l_linestatus")
+        .write
+        .format("parquet")
+        .saveAsTable("lineitem")
+      Seq(true, false).foreach {
+        collapsed =>
+          withSQLConf(
+            GlutenConfig.ENABLE_COLUMNAR_PROJECT_COLLAPSE.key -> collapsed.toString,
+            "spark.sql.autoBroadcastJoinThreshold" -> "-1") {
+            val df = sql(query)
+            checkAnswer(
+              df,
+              Seq(
+                Row("1-URGENT"),
+                Row("1-URGENT"),
+                Row("2-HIGH"),
+                Row("2-HIGH"),
+                Row("3-MEDIUM"),
+                Row("3-MEDIUM"),
+                Row("3-MEDIUM"),
+                Row("3-MEDIUM"),
+                Row("4-NOT SPECIFIED"),
+                Row("5-LOW")
+              )
+            )
+            assert(
+              getExecutedPlan(df).exists {
+                case _ @ProjectExecTransformer(_, _: ProjectExecTransformer) => true
+                case _ => false
+              } == !collapsed
+            )
+          }
+      }
+    }
+  }
+}
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
index aafb27cef469..59f450297993 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
@@ -25,8 +25,6 @@ import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
 import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.hive.{HiveTableScanExecTransformer, HiveUtils}
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 
@@ -98,44 +96,6 @@ class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait {
    *   : the input plans.
    * @return
    */
-  def getChildrenPlan(plans: Seq[SparkPlan]): Seq[SparkPlan] = {
-    if (plans.isEmpty) {
-      return Seq()
-    }
-
-    val inputPlans: Seq[SparkPlan] = plans.map {
-      case stage: ShuffleQueryStageExec => stage.plan
-      case plan => plan
-    }
-
-    var newChildren: Seq[SparkPlan] = Seq()
-    inputPlans.foreach {
-      plan =>
-        newChildren = newChildren ++ getChildrenPlan(plan.children)
-        // To avoid duplication of WholeStageCodegenXXX and its children.
-        if (!plan.nodeName.startsWith("WholeStageCodegen")) {
-          newChildren = newChildren :+ plan
-        }
-    }
-    newChildren
-  }
-
-  /**
-   * Get the executed plan of a data frame.
-   *
-   * @param df
-   *   : dataframe.
-   * @return
-   *   A sequence of executed plans.
-   */
-  def getExecutedPlan(df: DataFrame): Seq[SparkPlan] = {
-    df.queryExecution.executedPlan match {
-      case exec: AdaptiveSparkPlanExec =>
-        getChildrenPlan(Seq(exec.executedPlan))
-      case plan =>
-        getChildrenPlan(Seq(plan))
-    }
-  }
 
   def checkOperatorMatch[T <: TransformSupport](df: DataFrame)(implicit tag: ClassTag[T]): Unit = {
     val executedPlan = getExecutedPlan(df)
diff --git a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
index c323efa99140..46ee5666ba8a 100644
--- a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
 import org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite, GlutenFileTableSuite, GlutenV2PredicateSuite}
 import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
 import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
-import org.apache.spark.sql.extension.{GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
+import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
 import org.apache.spark.sql.gluten.GlutenFallbackSuite
 import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
 import org.apache.spark.sql.sources.{GlutenBucketedReadWithoutHiveSupportSuite, GlutenBucketedWriteWithoutHiveSupportSuite, GlutenCreateTableAsSelectSuite, GlutenDDLSourceLoadSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuiteAE, GlutenExternalCommandRunnerSuite, GlutenFilteredScanSuite, GlutenFiltersSuite, GlutenInsertSuite, GlutenPartitionedWriteSuite, GlutenPathOptionSuite, GlutenPrunedScanSuite, GlutenResolvedDataSourceSuite, GlutenSaveLoadSuite, GlutenTableScanSuite}
@@ -1152,6 +1152,7 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenFallbackSuite]
   enableSuite[GlutenHiveSQLQuerySuite]
   enableSuite[GlutenImplicitsTest]
+  enableSuite[GlutenCollapseProjectExecTransformerSuite]
 
   override def getSQLQueryTestSettings: SQLQueryTestSettings = VeloxSQLQueryTestSettings
 }
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala
new file mode 100644
index 000000000000..6709e2a91fec
--- /dev/null
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.extension
+
+import io.glutenproject.GlutenConfig
+import io.glutenproject.execution.ProjectExecTransformer
+
+import org.apache.spark.sql.GlutenSQLTestsTrait
+import org.apache.spark.sql.Row
+
+class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait {
+
+  import testImplicits._
+
+  test("Support ProjectExecTransformer collapse") {
+    val query =
+      """
+        |SELECT
+        |  o_orderpriority
+        |FROM
+        |  orders
+        |WHERE
+        |  o_shippriority >= 0
+        |  AND EXISTS (
+        |    SELECT
+        |      *
+        |    FROM
+        |      lineitem
+        |    WHERE
+        |      l_orderkey = o_orderkey
+        |      AND l_linenumber < 10
+        |  )
+        |ORDER BY
+        |  o_orderpriority
+        |LIMIT
+        |  100;
+        |""".stripMargin
+
+    val ordersData = Seq[(Int, Int, String)](
+      (30340, 1, "3-MEDIUM"),
+      (31140, 1, "1-URGENT"),
+      (31940, 1, "2-HIGH"),
+      (32740, 1, "3-MEDIUM"),
+      (33540, 1, "5-LOW"),
+      (34340, 1, "2-HIGH"),
+      (35140, 1, "3-MEDIUM"),
+      (35940, 1, "1-URGENT"),
+      (36740, 1, "3-MEDIUM"),
+      (37540, 1, "4-NOT SPECIFIED")
+    )
+    val lineitemData = Seq[(Int, Int, String)](
+      (30340, 1, "F"),
+      (31140, 4, "F"),
+      (31940, 7, "O"),
+      (32740, 6, "O"),
+      (33540, 2, "F"),
+      (34340, 3, "F"),
+      (35140, 1, "O"),
+      (35940, 2, "F"),
+      (36740, 3, "F"),
+      (37540, 5, "O")
+    )
+    withTable("orders", "lineitem") {
+      ordersData
+        .toDF("o_orderkey", "o_shippriority", "o_orderpriority")
+        .write
+        .format("parquet")
+        .saveAsTable("orders")
+      lineitemData
+        .toDF("l_orderkey", "l_linenumber", "l_linestatus")
+        .write
+        .format("parquet")
+        .saveAsTable("lineitem")
+      Seq(true, false).foreach {
+        collapsed =>
+          withSQLConf(
+            GlutenConfig.ENABLE_COLUMNAR_PROJECT_COLLAPSE.key -> collapsed.toString,
+            "spark.sql.autoBroadcastJoinThreshold" -> "-1") {
+            val df = sql(query)
+            checkAnswer(
+              df,
+              Seq(
+                Row("1-URGENT"),
+                Row("1-URGENT"),
+                Row("2-HIGH"),
+                Row("2-HIGH"),
+                Row("3-MEDIUM"),
+                Row("3-MEDIUM"),
+                Row("3-MEDIUM"),
+                Row("3-MEDIUM"),
+                Row("4-NOT SPECIFIED"),
+                Row("5-LOW")
+              )
+            )
+            assert(
+              getExecutedPlan(df).exists {
+                case _ @ProjectExecTransformer(_, _: ProjectExecTransformer) => true
+                case _ => false
+              } == !collapsed
+            )
+          }
+      }
+    }
+  }
+}
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
index 90d978b0d2e5..9bf4f453c2d5 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
@@ -25,8 +25,6 @@ import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
 import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.hive.{HiveTableScanExecTransformer, HiveUtils}
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 
@@ -91,52 +89,6 @@ class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait {
       conf.get(StaticSQLConf.WAREHOUSE_PATH) + "/" + getClass.getCanonicalName)
   }
 
-  /**
-   * Get all the children plan of plans.
-   *
-   * @param plans
-   *   : the input plans.
-   * @return
-   */
-  def getChildrenPlan(plans: Seq[SparkPlan]): Seq[SparkPlan] = {
-    if (plans.isEmpty) {
-      return Seq()
-    }
-
-    val inputPlans: Seq[SparkPlan] = plans.map {
-      case stage: ShuffleQueryStageExec => stage.plan
-      case plan => plan
-    }
-
-    var newChildren: Seq[SparkPlan] = Seq()
-    inputPlans.foreach {
-      plan =>
-        newChildren = newChildren ++ getChildrenPlan(plan.children)
-        // To avoid duplication of WholeStageCodegenXXX and its children.
-        if (!plan.nodeName.startsWith("WholeStageCodegen")) {
-          newChildren = newChildren :+ plan
-        }
-    }
-    newChildren
-  }
-
-  /**
-   * Get the executed plan of a data frame.
-   *
-   * @param df
-   *   : dataframe.
-   * @return
-   *   A sequence of executed plans.
-   */
-  def getExecutedPlan(df: DataFrame): Seq[SparkPlan] = {
-    df.queryExecution.executedPlan match {
-      case exec: AdaptiveSparkPlanExec =>
-        getChildrenPlan(Seq(exec.executedPlan))
-      case plan =>
-        getChildrenPlan(Seq(plan))
-    }
-  }
-
   def checkOperatorMatch[T <: TransformSupport](df: DataFrame)(implicit tag: ClassTag[T]): Unit = {
     val executedPlan = getExecutedPlan(df)
     assert(executedPlan.exists(plan => plan.getClass == tag.runtimeClass))
diff --git a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
index 40b5e4039910..3096ee5595ad 100644
--- a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
 import org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite, GlutenFileTableSuite, GlutenV2PredicateSuite}
 import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
 import org.apache.spark.sql.execution.joins.{GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
-import org.apache.spark.sql.extension.{GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
+import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
 import org.apache.spark.sql.gluten.GlutenFallbackSuite
 import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
 import org.apache.spark.sql.sources.{GlutenBucketedReadWithoutHiveSupportSuite, GlutenBucketedWriteWithoutHiveSupportSuite, GlutenCreateTableAsSelectSuite, GlutenDDLSourceLoadSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuiteAE, GlutenExternalCommandRunnerSuite, GlutenFilteredScanSuite, GlutenFiltersSuite, GlutenInsertSuite, GlutenPartitionedWriteSuite, GlutenPathOptionSuite, GlutenPrunedScanSuite, GlutenResolvedDataSourceSuite, GlutenSaveLoadSuite, GlutenTableScanSuite}
@@ -1203,6 +1203,7 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenXPathFunctionsSuite]
   enableSuite[GlutenFallbackSuite]
   enableSuite[GlutenHiveSQLQuerySuite]
+  enableSuite[GlutenCollapseProjectExecTransformerSuite]
 
   override def getSQLQueryTestSettings: SQLQueryTestSettings = VeloxSQLQueryTestSettings
 }
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala
new file mode 100644
index 000000000000..6709e2a91fec
--- /dev/null
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.extension
+
+import io.glutenproject.GlutenConfig
+import io.glutenproject.execution.ProjectExecTransformer
+
+import org.apache.spark.sql.GlutenSQLTestsTrait
+import org.apache.spark.sql.Row
+
+class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait {
+
+  import testImplicits._
+
+  test("Support ProjectExecTransformer collapse") {
+    val query =
+      """
+        |SELECT
+        |  o_orderpriority
+        |FROM
+        |  orders
+        |WHERE
+        |  o_shippriority >= 0
+        |  AND EXISTS (
+        |    SELECT
+        |      *
+        |    FROM
+        |      lineitem
+        |    WHERE
+        |      l_orderkey = o_orderkey
+        |      AND l_linenumber < 10
+        |  )
+        |ORDER BY
+        |  o_orderpriority
+        |LIMIT
+        |  100;
+        |""".stripMargin
+
+    val ordersData = Seq[(Int, Int, String)](
+      (30340, 1, "3-MEDIUM"),
+      (31140, 1, "1-URGENT"),
+      (31940, 1, "2-HIGH"),
+      (32740, 1, "3-MEDIUM"),
+      (33540, 1, "5-LOW"),
+      (34340, 1, "2-HIGH"),
+      (35140, 1, "3-MEDIUM"),
+      (35940, 1, "1-URGENT"),
+      (36740, 1, "3-MEDIUM"),
+      (37540, 1, "4-NOT SPECIFIED")
+    )
+    val lineitemData = Seq[(Int, Int, String)](
+      (30340, 1, "F"),
+      (31140, 4, "F"),
+      (31940, 7, "O"),
+      (32740, 6, "O"),
+      (33540, 2, "F"),
+      (34340, 3, "F"),
+      (35140, 1, "O"),
+      (35940, 2, "F"),
+      (36740, 3, "F"),
+      (37540, 5, "O")
+    )
+    withTable("orders", "lineitem") {
+      ordersData
+        .toDF("o_orderkey", "o_shippriority", "o_orderpriority")
+        .write
+        .format("parquet")
+        .saveAsTable("orders")
+      lineitemData
+        .toDF("l_orderkey", "l_linenumber", "l_linestatus")
+        .write
+        .format("parquet")
+        .saveAsTable("lineitem")
+      Seq(true, false).foreach {
+        collapsed =>
+          withSQLConf(
+            GlutenConfig.ENABLE_COLUMNAR_PROJECT_COLLAPSE.key -> collapsed.toString,
+            "spark.sql.autoBroadcastJoinThreshold" -> "-1") {
+            val df = sql(query)
+            checkAnswer(
+              df,
+              Seq(
+                Row("1-URGENT"),
+                Row("1-URGENT"),
+                Row("2-HIGH"),
+                Row("2-HIGH"),
+                Row("3-MEDIUM"),
+                Row("3-MEDIUM"),
+                Row("3-MEDIUM"),
+                Row("3-MEDIUM"),
+                Row("4-NOT SPECIFIED"),
+                Row("5-LOW")
+              )
+            )
+            assert(
+              getExecutedPlan(df).exists {
+                case _ @ProjectExecTransformer(_, _: ProjectExecTransformer) => true
+                case _ => false
+              } == !collapsed
+            )
+          }
+      }
+    }
+  }
+}
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
index d82e84d50d96..802d344634d1 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
@@ -25,8 +25,6 @@ import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
 import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.hive.{HiveTableScanExecTransformer, HiveUtils}
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 
@@ -91,52 +89,6 @@ class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait {
       conf.get(StaticSQLConf.WAREHOUSE_PATH) + "/" + getClass.getCanonicalName)
   }
 
-  /**
-   * Get all the children plan of plans.
-   *
-   * @param plans
-   *   : the input plans.
-   * @return
-   */
-  def getChildrenPlan(plans: Seq[SparkPlan]): Seq[SparkPlan] = {
-    if (plans.isEmpty) {
-      return Seq()
-    }
-
-    val inputPlans: Seq[SparkPlan] = plans.map {
-      case stage: ShuffleQueryStageExec => stage.plan
-      case plan => plan
-    }
-
-    var newChildren: Seq[SparkPlan] = Seq()
-    inputPlans.foreach {
-      plan =>
-        newChildren = newChildren ++ getChildrenPlan(plan.children)
-        // To avoid duplication of WholeStageCodegenXXX and its children.
-        if (!plan.nodeName.startsWith("WholeStageCodegen")) {
-          newChildren = newChildren :+ plan
-        }
-    }
-    newChildren
-  }
-
-  /**
-   * Get the executed plan of a data frame.
-   *
-   * @param df
-   *   : dataframe.
-   * @return
-   *   A sequence of executed plans.
-   */
-  def getExecutedPlan(df: DataFrame): Seq[SparkPlan] = {
-    df.queryExecution.executedPlan match {
-      case exec: AdaptiveSparkPlanExec =>
-        getChildrenPlan(Seq(exec.executedPlan))
-      case plan =>
-        getChildrenPlan(Seq(plan))
-    }
-  }
-
   def checkOperatorMatch[T <: TransformSupport](df: DataFrame)(implicit tag: ClassTag[T]): Unit = {
     val executedPlan = getExecutedPlan(df)
     assert(executedPlan.exists(plan => plan.getClass == tag.runtimeClass))
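Note for reviewers: with getChildrenPlan/getExecutedPlan now hosted on GlutenSQLTestsTrait, any UT suite mixing in the trait can assert on the physical operators Gluten actually planned, without redeclaring the helpers. Below is a minimal usage sketch, not part of the patch; the suite name and query are hypothetical, while GlutenSQLTestsTrait, getExecutedPlan, and the ProjectExecTransformer extractor are the identifiers used in this diff.

import io.glutenproject.execution.ProjectExecTransformer

import org.apache.spark.sql.GlutenSQLTestsTrait

// Hypothetical downstream suite relying on the relocated helpers.
class MyOperatorCheckSuite extends GlutenSQLTestsTrait {
  test("no nested ProjectExecTransformer when collapse is enabled") {
    // Assumes an `orders` table exists, e.g. created via withTable as in the
    // suites added by this patch.
    val df = sql("SELECT o_orderkey + 1 AS k FROM orders")
    // getExecutedPlan unwraps AdaptiveSparkPlanExec and flattens shuffle query
    // stages, so the pattern match below sees each physical operator directly.
    assert(!getExecutedPlan(df).exists {
      case ProjectExecTransformer(_, _: ProjectExecTransformer) => true
      case _ => false
    })
  }
}

This mirrors the assertion in the new GlutenCollapseProjectExecTransformerSuite: once CollapseProjectExecTransformer runs with ENABLE_COLUMNAR_PROJECT_COLLAPSE set, adjacent ProjectExecTransformer nodes should have been merged into one.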