diff --git a/backends-clickhouse/src/test/resources/metrics-json/tpch-q2-wholestage-2-metrics.json b/backends-clickhouse/src/test/resources/metrics-json/tpch-q2-wholestage-2-metrics.json index 068c8ccfa3ba..8ea9fe28b31f 100644 --- a/backends-clickhouse/src/test/resources/metrics-json/tpch-q2-wholestage-2-metrics.json +++ b/backends-clickhouse/src/test/resources/metrics-json/tpch-q2-wholestage-2-metrics.json @@ -1,5 +1,5 @@ [{ - "id": 6, + "id": 5, "name": "kProject", "time": 4, "input_wait_time": 7736, @@ -16,24 +16,6 @@ "input_bytes": 14308 }] }] -}, { - "id": 5, - "name": "kProject", - "time": 6, - "input_wait_time": 7725, - "output_wait_time": 48, - "steps": [{ - "name": "Expression", - "description": "Project", - "processors": [{ - "name": "ExpressionTransform", - "time": 6, - "output_rows": 292, - "output_bytes": 14308, - "input_rows": 292, - "input_bytes": 16644 - }] - }] }, { "id": 4, "name": "kProject", diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala index 91fbc57b6c08..13b2d00aaf15 100644 --- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala @@ -207,14 +207,14 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite case g: GlutenPlan if !g.isInstanceOf[InputIteratorTransformer] => g } - val scanPlan = allGlutenPlans(10) + val scanPlan = allGlutenPlans(9) assert(scanPlan.metrics("scanTime").value == 2) assert(scanPlan.metrics("inputWaitTime").value == 3) assert(scanPlan.metrics("outputWaitTime").value == 1) assert(scanPlan.metrics("outputRows").value == 80000) assert(scanPlan.metrics("outputBytes").value == 2160000) - val filterPlan = allGlutenPlans(9) + val filterPlan = allGlutenPlans(8) assert(filterPlan.metrics("totalTime").value == 1) assert(filterPlan.metrics("inputWaitTime").value == 13) assert(filterPlan.metrics("outputWaitTime").value == 1) @@ -223,7 +223,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite assert(filterPlan.metrics("inputRows").value == 80000) assert(filterPlan.metrics("inputBytes").value == 2160000) - val joinPlan = allGlutenPlans(3) + val joinPlan = allGlutenPlans(2) assert(joinPlan.metrics("totalTime").value == 1) assert(joinPlan.metrics("inputWaitTime").value == 6) assert(joinPlan.metrics("outputWaitTime").value == 0) @@ -244,7 +244,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite case g: GlutenPlan if !g.isInstanceOf[InputIteratorTransformer] => g } - assert(allGlutenPlans.size == 58) + assert(allGlutenPlans.size == 57) val shjPlan = allGlutenPlans(8) assert(shjPlan.metrics("totalTime").value == 6) diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/CollapseProjectExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/extension/CollapseProjectExecTransformer.scala new file mode 100644 index 000000000000..6087fe7a2cf0 --- /dev/null +++ b/gluten-core/src/main/scala/io/glutenproject/extension/CollapseProjectExecTransformer.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.glutenproject.extension
+
+import io.glutenproject.GlutenConfig
+import io.glutenproject.execution.ProjectExecTransformer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, CreateNamedStruct, NamedExpression}
+import org.apache.spark.sql.catalyst.optimizer.CollapseProjectShim
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
+
+object CollapseProjectExecTransformer extends Rule[SparkPlan] {
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+    if (!GlutenConfig.getConf.enableColumnarProjectCollapse) {
+      return plan
+    }
+    plan.transformUp {
+      case p1 @ ProjectExecTransformer(_, p2: ProjectExecTransformer)
+          if !containsNamedStructAlias(p2.projectList)
+            && CollapseProjectShim.canCollapseExpressions(
+              p1.projectList,
+              p2.projectList,
+              alwaysInline = false) =>
+        val collapsedProject = p2.copy(projectList =
+          CollapseProjectShim.buildCleanedProjectList(p1.projectList, p2.projectList))
+        val validationResult = collapsedProject.doValidate()
+        if (validationResult.isValid) {
+          logDebug(s"Collapse project $p1 and $p2.")
+          collapsedProject
+        } else {
+          // Native validation failed; keep the original pair of projects.
+          p1
+        }
+    }
+  }
+
+  /**
+   * In Velox, CreateNamedStruct will generate a special output named obj. We cannot collapse such
+   * a project transformer; otherwise it will result in a bind reference failure.
+   */
+  private def containsNamedStructAlias(projectList: Seq[NamedExpression]): Boolean = {
+    projectList.exists {
+      case _ @Alias(_: CreateNamedStruct, _) => true
+      case _ => false
+    }
+  }
+}
diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala
index 0bd693aa1349..844a64b92919 100644
--- a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala
@@ -797,7 +797,8 @@ case class ColumnarOverrideRules(session: SparkSession)
       (_: SparkSession) => AddTransformHintRule(),
       (_: SparkSession) => TransformPreOverrides(isAdaptiveContext),
       (spark: SparkSession) => RewriteTransformer(spark),
-      (_: SparkSession) => EnsureLocalSortRequirements
+      (_: SparkSession) => EnsureLocalSortRequirements,
+      (_: SparkSession) => CollapseProjectExecTransformer
     ) :::
       BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPreRules() :::
       SparkRuleUtil.extendedColumnarRules(session, GlutenConfig.getConf.extendedColumnarPreRules)
diff --git a/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsTrait.scala b/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsTrait.scala
index 98d12f5a3b70..d87e57622ca2 100644
--- a/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsTrait.scala
+++ b/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsTrait.scala
@@ -18,7 +18,8 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.util.{sideBySide, stackTraceToString}
-import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec}
 
 import org.apache.commons.io.FileUtils
 import org.apache.commons.math3.util.Precision
@@ -75,6 +76,52 @@ trait GlutenSQLTestsTrait extends QueryTest with GlutenSQLTestsBaseTrait {
 
     GlutenQueryTest.checkAnswer(analyzedDF, expectedAnswer)
   }
+
+  /**
+   * Get all the child plans of the given plans.
+   *
+   * @param plans
+   *   : the input plans.
+   * @return
+   *   All child plans, flattened recursively.
+   */
+  def getChildrenPlan(plans: Seq[SparkPlan]): Seq[SparkPlan] = {
+    if (plans.isEmpty) {
+      return Seq()
+    }
+
+    val inputPlans: Seq[SparkPlan] = plans.map {
+      case stage: ShuffleQueryStageExec => stage.plan
+      case plan => plan
+    }
+
+    var newChildren: Seq[SparkPlan] = Seq()
+    inputPlans.foreach {
+      plan =>
+        newChildren = newChildren ++ getChildrenPlan(plan.children)
+        // To avoid duplication of WholeStageCodegenXXX and its children.
+        if (!plan.nodeName.startsWith("WholeStageCodegen")) {
+          newChildren = newChildren :+ plan
+        }
+    }
+    newChildren
+  }
+
+  /**
+   * Get the executed plan of a DataFrame.
+   *
+   * @param df
+   *   : the input DataFrame.
+   * @return
+   *   A sequence of executed plans.
+ */ + def getExecutedPlan(df: DataFrame): Seq[SparkPlan] = { + df.queryExecution.executedPlan match { + case exec: AdaptiveSparkPlanExec => + getChildrenPlan(Seq(exec.executedPlan)) + case plan => + getChildrenPlan(Seq(plan)) + } + } } object GlutenQueryTest extends Assertions { diff --git a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala index 5691341aa238..7d14cd990b97 100644 --- a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute import org.apache.spark.sql.execution.datasources.v2.GlutenFileTableSuite import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite} -import org.apache.spark.sql.extension.{GlutenCustomerExpressionTransformerSuite, GlutenCustomerExtensionSuite, GlutenSessionExtensionSuite} +import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenCustomerExpressionTransformerSuite, GlutenCustomerExtensionSuite, GlutenSessionExtensionSuite} import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite import org.apache.spark.sql.sources.{GlutenBucketedReadWithoutHiveSupportSuite, GlutenBucketedWriteWithoutHiveSupportSuite, GlutenCreateTableAsSelectSuite, GlutenDDLSourceLoadSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuiteAE, GlutenExternalCommandRunnerSuite, GlutenFilteredScanSuite, GlutenFiltersSuite, GlutenInsertSuite, GlutenPartitionedWriteSuite, GlutenPathOptionSuite, GlutenPrunedScanSuite, GlutenResolvedDataSourceSuite, GlutenSaveLoadSuite, GlutenTableScanSuite} @@ -1080,6 +1080,7 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("SPARK-33687: analyze all tables in a specific database") enableSuite[FallbackStrategiesSuite] enableSuite[GlutenHiveSQLQuerySuite] + enableSuite[GlutenCollapseProjectExecTransformerSuite] override def getSQLQueryTestSettings: SQLQueryTestSettings = VeloxSQLQueryTestSettings } diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala new file mode 100644 index 000000000000..6709e2a91fec --- /dev/null +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.extension + +import io.glutenproject.GlutenConfig +import io.glutenproject.execution.ProjectExecTransformer + +import org.apache.spark.sql.GlutenSQLTestsTrait +import org.apache.spark.sql.Row + +class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait { + + import testImplicits._ + + test("Support ProjectExecTransformer collapse") { + val query = + """ + |SELECT + | o_orderpriority + |FROM + | orders + |WHERE + | o_shippriority >= 0 + | AND EXISTS ( + | SELECT + | * + | FROM + | lineitem + | WHERE + | l_orderkey = o_orderkey + | AND l_linenumber < 10 + | ) + |ORDER BY + | o_orderpriority + |LIMIT + | 100; + |""".stripMargin + + val ordersData = Seq[(Int, Int, String)]( + (30340, 1, "3-MEDIUM"), + (31140, 1, "1-URGENT"), + (31940, 1, "2-HIGH"), + (32740, 1, "3-MEDIUM"), + (33540, 1, "5-LOW"), + (34340, 1, "2-HIGH"), + (35140, 1, "3-MEDIUM"), + (35940, 1, "1-URGENT"), + (36740, 1, "3-MEDIUM"), + (37540, 1, "4-NOT SPECIFIED") + ) + val lineitemData = Seq[(Int, Int, String)]( + (30340, 1, "F"), + (31140, 4, "F"), + (31940, 7, "O"), + (32740, 6, "O"), + (33540, 2, "F"), + (34340, 3, "F"), + (35140, 1, "O"), + (35940, 2, "F"), + (36740, 3, "F"), + (37540, 5, "O") + ) + withTable("orders", "lineitem") { + ordersData + .toDF("o_orderkey", "o_shippriority", "o_orderpriority") + .write + .format("parquet") + .saveAsTable("orders") + lineitemData + .toDF("l_orderkey", "l_linenumber", "l_linestatus") + .write + .format("parquet") + .saveAsTable("lineitem") + Seq(true, false).foreach { + collapsed => + withSQLConf( + GlutenConfig.ENABLE_COLUMNAR_PROJECT_COLLAPSE.key -> collapsed.toString, + "spark.sql.autoBroadcastJoinThreshold" -> "-1") { + val df = sql(query) + checkAnswer( + df, + Seq( + Row("1-URGENT"), + Row("1-URGENT"), + Row("2-HIGH"), + Row("2-HIGH"), + Row("3-MEDIUM"), + Row("3-MEDIUM"), + Row("3-MEDIUM"), + Row("3-MEDIUM"), + Row("4-NOT SPECIFIED"), + Row("5-LOW") + ) + ) + assert( + getExecutedPlan(df).exists { + case _ @ProjectExecTransformer(_, _: ProjectExecTransformer) => true + case _ => false + } == !collapsed + ) + } + } + } + } +} diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala index aafb27cef469..59f450297993 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala @@ -25,8 +25,6 @@ import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation -import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec} import org.apache.spark.sql.hive.{HiveTableScanExecTransformer, HiveUtils} import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} @@ -98,44 +96,6 @@ class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait { * : the input plans. 
* @return */ - def getChildrenPlan(plans: Seq[SparkPlan]): Seq[SparkPlan] = { - if (plans.isEmpty) { - return Seq() - } - - val inputPlans: Seq[SparkPlan] = plans.map { - case stage: ShuffleQueryStageExec => stage.plan - case plan => plan - } - - var newChildren: Seq[SparkPlan] = Seq() - inputPlans.foreach { - plan => - newChildren = newChildren ++ getChildrenPlan(plan.children) - // To avoid duplication of WholeStageCodegenXXX and its children. - if (!plan.nodeName.startsWith("WholeStageCodegen")) { - newChildren = newChildren :+ plan - } - } - newChildren - } - - /** - * Get the executed plan of a data frame. - * - * @param df - * : dataframe. - * @return - * A sequence of executed plans. - */ - def getExecutedPlan(df: DataFrame): Seq[SparkPlan] = { - df.queryExecution.executedPlan match { - case exec: AdaptiveSparkPlanExec => - getChildrenPlan(Seq(exec.executedPlan)) - case plan => - getChildrenPlan(Seq(plan)) - } - } def checkOperatorMatch[T <: TransformSupport](df: DataFrame)(implicit tag: ClassTag[T]): Unit = { val executedPlan = getExecutedPlan(df) diff --git a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala index c323efa99140..46ee5666ba8a 100644 --- a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute import org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite, GlutenFileTableSuite, GlutenV2PredicateSuite} import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite} -import org.apache.spark.sql.extension.{GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer} +import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer} import org.apache.spark.sql.gluten.GlutenFallbackSuite import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite import org.apache.spark.sql.sources.{GlutenBucketedReadWithoutHiveSupportSuite, GlutenBucketedWriteWithoutHiveSupportSuite, GlutenCreateTableAsSelectSuite, GlutenDDLSourceLoadSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuiteAE, GlutenExternalCommandRunnerSuite, GlutenFilteredScanSuite, GlutenFiltersSuite, GlutenInsertSuite, GlutenPartitionedWriteSuite, GlutenPathOptionSuite, GlutenPrunedScanSuite, GlutenResolvedDataSourceSuite, GlutenSaveLoadSuite, GlutenTableScanSuite} @@ -1152,6 +1152,7 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenFallbackSuite] enableSuite[GlutenHiveSQLQuerySuite] enableSuite[GlutenImplicitsTest] + enableSuite[GlutenCollapseProjectExecTransformerSuite] override def getSQLQueryTestSettings: SQLQueryTestSettings = VeloxSQLQueryTestSettings } diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala new file mode 100644 index 000000000000..6709e2a91fec --- /dev/null +++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.extension + +import io.glutenproject.GlutenConfig +import io.glutenproject.execution.ProjectExecTransformer + +import org.apache.spark.sql.GlutenSQLTestsTrait +import org.apache.spark.sql.Row + +class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait { + + import testImplicits._ + + test("Support ProjectExecTransformer collapse") { + val query = + """ + |SELECT + | o_orderpriority + |FROM + | orders + |WHERE + | o_shippriority >= 0 + | AND EXISTS ( + | SELECT + | * + | FROM + | lineitem + | WHERE + | l_orderkey = o_orderkey + | AND l_linenumber < 10 + | ) + |ORDER BY + | o_orderpriority + |LIMIT + | 100; + |""".stripMargin + + val ordersData = Seq[(Int, Int, String)]( + (30340, 1, "3-MEDIUM"), + (31140, 1, "1-URGENT"), + (31940, 1, "2-HIGH"), + (32740, 1, "3-MEDIUM"), + (33540, 1, "5-LOW"), + (34340, 1, "2-HIGH"), + (35140, 1, "3-MEDIUM"), + (35940, 1, "1-URGENT"), + (36740, 1, "3-MEDIUM"), + (37540, 1, "4-NOT SPECIFIED") + ) + val lineitemData = Seq[(Int, Int, String)]( + (30340, 1, "F"), + (31140, 4, "F"), + (31940, 7, "O"), + (32740, 6, "O"), + (33540, 2, "F"), + (34340, 3, "F"), + (35140, 1, "O"), + (35940, 2, "F"), + (36740, 3, "F"), + (37540, 5, "O") + ) + withTable("orders", "lineitem") { + ordersData + .toDF("o_orderkey", "o_shippriority", "o_orderpriority") + .write + .format("parquet") + .saveAsTable("orders") + lineitemData + .toDF("l_orderkey", "l_linenumber", "l_linestatus") + .write + .format("parquet") + .saveAsTable("lineitem") + Seq(true, false).foreach { + collapsed => + withSQLConf( + GlutenConfig.ENABLE_COLUMNAR_PROJECT_COLLAPSE.key -> collapsed.toString, + "spark.sql.autoBroadcastJoinThreshold" -> "-1") { + val df = sql(query) + checkAnswer( + df, + Seq( + Row("1-URGENT"), + Row("1-URGENT"), + Row("2-HIGH"), + Row("2-HIGH"), + Row("3-MEDIUM"), + Row("3-MEDIUM"), + Row("3-MEDIUM"), + Row("3-MEDIUM"), + Row("4-NOT SPECIFIED"), + Row("5-LOW") + ) + ) + assert( + getExecutedPlan(df).exists { + case _ @ProjectExecTransformer(_, _: ProjectExecTransformer) => true + case _ => false + } == !collapsed + ) + } + } + } + } +} diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala index 90d978b0d2e5..9bf4f453c2d5 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala @@ -25,8 +25,6 @@ import 
org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation -import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec} import org.apache.spark.sql.hive.{HiveTableScanExecTransformer, HiveUtils} import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} @@ -91,52 +89,6 @@ class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait { conf.get(StaticSQLConf.WAREHOUSE_PATH) + "/" + getClass.getCanonicalName) } - /** - * Get all the children plan of plans. - * - * @param plans - * : the input plans. - * @return - */ - def getChildrenPlan(plans: Seq[SparkPlan]): Seq[SparkPlan] = { - if (plans.isEmpty) { - return Seq() - } - - val inputPlans: Seq[SparkPlan] = plans.map { - case stage: ShuffleQueryStageExec => stage.plan - case plan => plan - } - - var newChildren: Seq[SparkPlan] = Seq() - inputPlans.foreach { - plan => - newChildren = newChildren ++ getChildrenPlan(plan.children) - // To avoid duplication of WholeStageCodegenXXX and its children. - if (!plan.nodeName.startsWith("WholeStageCodegen")) { - newChildren = newChildren :+ plan - } - } - newChildren - } - - /** - * Get the executed plan of a data frame. - * - * @param df - * : dataframe. - * @return - * A sequence of executed plans. - */ - def getExecutedPlan(df: DataFrame): Seq[SparkPlan] = { - df.queryExecution.executedPlan match { - case exec: AdaptiveSparkPlanExec => - getChildrenPlan(Seq(exec.executedPlan)) - case plan => - getChildrenPlan(Seq(plan)) - } - } - def checkOperatorMatch[T <: TransformSupport](df: DataFrame)(implicit tag: ClassTag[T]): Unit = { val executedPlan = getExecutedPlan(df) assert(executedPlan.exists(plan => plan.getClass == tag.runtimeClass)) diff --git a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala index 40b5e4039910..3096ee5595ad 100644 --- a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute import org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite, GlutenFileTableSuite, GlutenV2PredicateSuite} import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite import org.apache.spark.sql.execution.joins.{GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite} -import org.apache.spark.sql.extension.{GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer} +import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer} import org.apache.spark.sql.gluten.GlutenFallbackSuite import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite import org.apache.spark.sql.sources.{GlutenBucketedReadWithoutHiveSupportSuite, GlutenBucketedWriteWithoutHiveSupportSuite, GlutenCreateTableAsSelectSuite, GlutenDDLSourceLoadSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuiteAE, GlutenExternalCommandRunnerSuite, GlutenFilteredScanSuite, GlutenFiltersSuite, 
GlutenInsertSuite, GlutenPartitionedWriteSuite, GlutenPathOptionSuite, GlutenPrunedScanSuite, GlutenResolvedDataSourceSuite, GlutenSaveLoadSuite, GlutenTableScanSuite} @@ -1203,6 +1203,7 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenXPathFunctionsSuite] enableSuite[GlutenFallbackSuite] enableSuite[GlutenHiveSQLQuerySuite] + enableSuite[GlutenCollapseProjectExecTransformerSuite] override def getSQLQueryTestSettings: SQLQueryTestSettings = VeloxSQLQueryTestSettings } diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala new file mode 100644 index 000000000000..6709e2a91fec --- /dev/null +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenCollapseProjectExecTransformerSuite.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.extension + +import io.glutenproject.GlutenConfig +import io.glutenproject.execution.ProjectExecTransformer + +import org.apache.spark.sql.GlutenSQLTestsTrait +import org.apache.spark.sql.Row + +class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait { + + import testImplicits._ + + test("Support ProjectExecTransformer collapse") { + val query = + """ + |SELECT + | o_orderpriority + |FROM + | orders + |WHERE + | o_shippriority >= 0 + | AND EXISTS ( + | SELECT + | * + | FROM + | lineitem + | WHERE + | l_orderkey = o_orderkey + | AND l_linenumber < 10 + | ) + |ORDER BY + | o_orderpriority + |LIMIT + | 100; + |""".stripMargin + + val ordersData = Seq[(Int, Int, String)]( + (30340, 1, "3-MEDIUM"), + (31140, 1, "1-URGENT"), + (31940, 1, "2-HIGH"), + (32740, 1, "3-MEDIUM"), + (33540, 1, "5-LOW"), + (34340, 1, "2-HIGH"), + (35140, 1, "3-MEDIUM"), + (35940, 1, "1-URGENT"), + (36740, 1, "3-MEDIUM"), + (37540, 1, "4-NOT SPECIFIED") + ) + val lineitemData = Seq[(Int, Int, String)]( + (30340, 1, "F"), + (31140, 4, "F"), + (31940, 7, "O"), + (32740, 6, "O"), + (33540, 2, "F"), + (34340, 3, "F"), + (35140, 1, "O"), + (35940, 2, "F"), + (36740, 3, "F"), + (37540, 5, "O") + ) + withTable("orders", "lineitem") { + ordersData + .toDF("o_orderkey", "o_shippriority", "o_orderpriority") + .write + .format("parquet") + .saveAsTable("orders") + lineitemData + .toDF("l_orderkey", "l_linenumber", "l_linestatus") + .write + .format("parquet") + .saveAsTable("lineitem") + Seq(true, false).foreach { + collapsed => + withSQLConf( + GlutenConfig.ENABLE_COLUMNAR_PROJECT_COLLAPSE.key -> collapsed.toString, + "spark.sql.autoBroadcastJoinThreshold" -> "-1") { + val df = sql(query) + checkAnswer( + df, + Seq( + Row("1-URGENT"), + Row("1-URGENT"), + 
Row("2-HIGH"), + Row("2-HIGH"), + Row("3-MEDIUM"), + Row("3-MEDIUM"), + Row("3-MEDIUM"), + Row("3-MEDIUM"), + Row("4-NOT SPECIFIED"), + Row("5-LOW") + ) + ) + assert( + getExecutedPlan(df).exists { + case _ @ProjectExecTransformer(_, _: ProjectExecTransformer) => true + case _ => false + } == !collapsed + ) + } + } + } + } +} diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala index d82e84d50d96..802d344634d1 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala @@ -25,8 +25,6 @@ import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation -import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec} import org.apache.spark.sql.hive.{HiveTableScanExecTransformer, HiveUtils} import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} @@ -91,52 +89,6 @@ class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait { conf.get(StaticSQLConf.WAREHOUSE_PATH) + "/" + getClass.getCanonicalName) } - /** - * Get all the children plan of plans. - * - * @param plans - * : the input plans. - * @return - */ - def getChildrenPlan(plans: Seq[SparkPlan]): Seq[SparkPlan] = { - if (plans.isEmpty) { - return Seq() - } - - val inputPlans: Seq[SparkPlan] = plans.map { - case stage: ShuffleQueryStageExec => stage.plan - case plan => plan - } - - var newChildren: Seq[SparkPlan] = Seq() - inputPlans.foreach { - plan => - newChildren = newChildren ++ getChildrenPlan(plan.children) - // To avoid duplication of WholeStageCodegenXXX and its children. - if (!plan.nodeName.startsWith("WholeStageCodegen")) { - newChildren = newChildren :+ plan - } - } - newChildren - } - - /** - * Get the executed plan of a data frame. - * - * @param df - * : dataframe. - * @return - * A sequence of executed plans. 
-   */
-  def getExecutedPlan(df: DataFrame): Seq[SparkPlan] = {
-    df.queryExecution.executedPlan match {
-      case exec: AdaptiveSparkPlanExec =>
-        getChildrenPlan(Seq(exec.executedPlan))
-      case plan =>
-        getChildrenPlan(Seq(plan))
-    }
-  }
-
   def checkOperatorMatch[T <: TransformSupport](df: DataFrame)(implicit tag: ClassTag[T]): Unit = {
     val executedPlan = getExecutedPlan(df)
     assert(executedPlan.exists(plan => plan.getClass == tag.runtimeClass))
diff --git a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
index c88208089738..1529e8a27796 100644
--- a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
+++ b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
@@ -291,6 +291,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {
   def abandonPartialAggregationMinRows: Option[Int] =
     conf.getConf(ABANDON_PARTIAL_AGGREGATION_MIN_ROWS)
   def enableNativeWriter: Boolean = conf.getConf(NATIVE_WRITER_ENABLED)
+
+  def enableColumnarProjectCollapse: Boolean = conf.getConf(ENABLE_COLUMNAR_PROJECT_COLLAPSE)
 }
 
 object GlutenConfig {
@@ -1334,6 +1336,13 @@ object GlutenConfig {
       .booleanConf
       .createWithDefault(true)
 
+  val ENABLE_COLUMNAR_PROJECT_COLLAPSE =
+    buildConf("spark.gluten.sql.columnar.project.collapse")
+      .internal()
+      .doc("Combines two columnar project operators into one and performs alias substitution.")
+      .booleanConf
+      .createWithDefault(true)
+
   val COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS =
     buildConf("spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems")
       .internal()
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala
new file mode 100644
index 000000000000..9b7a0bee45e0
--- /dev/null
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+
+object CollapseProjectShim extends AliasHelper {
+  def canCollapseExpressions(
+      consumers: Seq[NamedExpression],
+      producers: Seq[NamedExpression],
+      alwaysInline: Boolean): Boolean = {
+    canCollapseExpressions(consumers, getAliasMap(producers), alwaysInline)
+  }
+
+  def buildCleanedProjectList(
+      upper: Seq[NamedExpression],
+      lower: Seq[NamedExpression]): Seq[NamedExpression] = {
+    val aliases = getAliasMap(lower)
+    upper.map(replaceAliasButKeepName(_, aliases))
+  }
+
+  /** Check if we can collapse expressions safely. */
+  private def canCollapseExpressions(
+      consumers: Seq[Expression],
+      producerMap: Map[Attribute, Expression],
+      alwaysInline: Boolean = false): Boolean = {
+    // We can only collapse expressions if all input expressions meet the following criteria:
+    // - The input is deterministic.
+    // - The input is only consumed once OR the underlying input expression is cheap.
+    consumers
+      .flatMap(collectReferences)
+      .groupBy(identity)
+      .mapValues(_.size)
+      .forall {
+        case (reference, count) =>
+          val producer = producerMap.getOrElse(reference, reference)
+          producer.deterministic && (count == 1 || alwaysInline || {
+            val relatedConsumers = consumers.filter(_.references.contains(reference))
+            // It's still exactly-once if there is only one reference in non-extract expressions,
+            // as we won't duplicate the expensive CreateStruct-like expressions.
+            val extractOnly = relatedConsumers.map(refCountInNonExtract(_, reference)).sum <= 1
+            shouldInline(producer, extractOnly)
+          })
+      }
+  }
+
+  /**
+   * Return all the references of the given expression without deduplication, which is different
+   * from `Expression.references`.
+   */
+  private def collectReferences(e: Expression): Seq[Attribute] = e.collect {
+    case a: Attribute => a
+  }
+
+  /** Check if the given expression is cheap enough that we can inline it. */
+  private def shouldInline(e: Expression, extractOnlyConsumer: Boolean): Boolean = e match {
+    case _: Attribute | _: OuterReference => true
+    case _ if e.foldable => true
+    // PythonUDF is handled by the rule ExtractPythonUDFs
+    case _: PythonUDF => true
+    // Alias and ExtractValue are very cheap.
+    case _: Alias | _: ExtractValue => e.children.forall(shouldInline(_, extractOnlyConsumer))
+    // These collection create functions are not cheap, but we have optimizer rules that can
+    // optimize them out if they are only consumed by ExtractValue, so we need to allow inlining
+    // them to avoid perf regression. As an example:
+    //   Project(s.a, s.b, Project(create_struct(a, b, c) as s, child))
+    // We should collapse these two projects and eventually get Project(a, b, child)
+    case _: CreateNamedStruct | _: CreateArray | _: CreateMap | _: UpdateFields =>
+      extractOnlyConsumer
+    case _ => false
+  }
+
+  private def refCountInNonExtract(expr: Expression, ref: Attribute): Int = {
+    def refCount(e: Expression): Int = e match {
+      case a: Attribute if a.semanticEquals(ref) => 1
+      // The first child of `ExtractValue` is the complex type to be extracted.
+      case e: ExtractValue if e.children.head.semanticEquals(ref) => 0
+      case _ => e.children.map(refCount).sum
+    }
+    refCount(expr)
+  }
+}
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala
new file mode 100644
index 000000000000..1df1456f4011
--- /dev/null
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression} + +object CollapseProjectShim { + def canCollapseExpressions( + consumers: Seq[Expression], + producers: Seq[NamedExpression], + alwaysInline: Boolean): Boolean = { + CollapseProject.canCollapseExpressions(consumers, producers, alwaysInline) + } + + def buildCleanedProjectList( + upper: Seq[NamedExpression], + lower: Seq[NamedExpression]): Seq[NamedExpression] = { + CollapseProject.buildCleanedProjectList(upper, lower) + } +} diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala new file mode 100644 index 000000000000..1df1456f4011 --- /dev/null +++ b/shims/spark34/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression} + +object CollapseProjectShim { + def canCollapseExpressions( + consumers: Seq[Expression], + producers: Seq[NamedExpression], + alwaysInline: Boolean): Boolean = { + CollapseProject.canCollapseExpressions(consumers, producers, alwaysInline) + } + + def buildCleanedProjectList( + upper: Seq[NamedExpression], + lower: Seq[NamedExpression]): Seq[NamedExpression] = { + CollapseProject.buildCleanedProjectList(upper, lower) + } +}
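
For reference, a hand-written sketch of the rewrite CollapseProjectExecTransformer performs; the plan shapes and column names below are illustrative, not taken from the patch:

// Before: two adjacent columnar projects, where the upper project list consumes
// aliases produced by the lower one.
//   ProjectExecTransformer [(a + b) AS c]
//   +- ProjectExecTransformer [(x + y) AS a, z AS b]
//      +- <child>
// After: a single project, with the lower aliases substituted into the upper list
// via CollapseProjectShim.buildCleanedProjectList.
//   ProjectExecTransformer [((x + y) + z) AS c]
//   +- <child>
// The rule backs off when the lower list aliases a CreateNamedStruct (Velox's
// special "obj" output, see containsNamedStructAlias) or when the collapsed
// project fails native validation (doValidate).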