From daa973031dc50e16b9749f3f4fb8d69ec3e1835b Mon Sep 17 00:00:00 2001 From: "joey.ljy" Date: Thu, 7 Dec 2023 15:21:18 +0800 Subject: [PATCH] Support project transformer collapse --- .../tpch-q2-wholestage-2-metrics.json | 20 +--- .../GlutenClickHouseTPCHMetricsSuite.scala | 8 +- .../CollapseProjectExecTransformerSuite.scala | 89 +++++++++++++++++ ep/build-clickhouse/src/build_clickhouse.sh | 4 +- .../CollapseProjectTransformer.scala | 62 ++++++++++++ .../extension/ColumnarOverrides.scala | 3 +- .../scala/io/glutenproject/GlutenConfig.scala | 9 ++ .../optimizer/CollapseProjectShim.scala | 96 +++++++++++++++++++ .../optimizer/CollapseProjectShim.scala | 34 +++++++ .../optimizer/CollapseProjectShim.scala | 34 +++++++ 10 files changed, 333 insertions(+), 26 deletions(-) create mode 100644 backends-velox/src/test/scala/io/glutenproject/extension/CollapseProjectExecTransformerSuite.scala create mode 100644 gluten-core/src/main/scala/io/glutenproject/extension/CollapseProjectTransformer.scala create mode 100644 shims/spark32/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala create mode 100644 shims/spark33/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala create mode 100644 shims/spark34/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala diff --git a/backends-clickhouse/src/test/resources/metrics-json/tpch-q2-wholestage-2-metrics.json b/backends-clickhouse/src/test/resources/metrics-json/tpch-q2-wholestage-2-metrics.json index 068c8ccfa3bae..8ea9fe28b31f4 100644 --- a/backends-clickhouse/src/test/resources/metrics-json/tpch-q2-wholestage-2-metrics.json +++ b/backends-clickhouse/src/test/resources/metrics-json/tpch-q2-wholestage-2-metrics.json @@ -1,5 +1,5 @@ [{ - "id": 6, + "id": 5, "name": "kProject", "time": 4, "input_wait_time": 7736, @@ -16,24 +16,6 @@ "input_bytes": 14308 }] }] -}, { - "id": 5, - "name": "kProject", - "time": 6, - "input_wait_time": 7725, - "output_wait_time": 48, - "steps": [{ - "name": "Expression", - "description": "Project", - "processors": [{ - "name": "ExpressionTransform", - "time": 6, - "output_rows": 292, - "output_bytes": 14308, - "input_rows": 292, - "input_bytes": 16644 - }] - }] }, { "id": 4, "name": "kProject", diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala index 91fbc57b6c082..13b2d00aaf152 100644 --- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala @@ -207,14 +207,14 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite case g: GlutenPlan if !g.isInstanceOf[InputIteratorTransformer] => g } - val scanPlan = allGlutenPlans(10) + val scanPlan = allGlutenPlans(9) assert(scanPlan.metrics("scanTime").value == 2) assert(scanPlan.metrics("inputWaitTime").value == 3) assert(scanPlan.metrics("outputWaitTime").value == 1) assert(scanPlan.metrics("outputRows").value == 80000) assert(scanPlan.metrics("outputBytes").value == 2160000) - val filterPlan = allGlutenPlans(9) + val filterPlan = allGlutenPlans(8) assert(filterPlan.metrics("totalTime").value == 1) assert(filterPlan.metrics("inputWaitTime").value == 13) assert(filterPlan.metrics("outputWaitTime").value == 1) @@ -223,7 +223,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite assert(filterPlan.metrics("inputRows").value == 80000) assert(filterPlan.metrics("inputBytes").value == 2160000) - val joinPlan = allGlutenPlans(3) + val joinPlan = allGlutenPlans(2) assert(joinPlan.metrics("totalTime").value == 1) assert(joinPlan.metrics("inputWaitTime").value == 6) assert(joinPlan.metrics("outputWaitTime").value == 0) @@ -244,7 +244,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite case g: GlutenPlan if !g.isInstanceOf[InputIteratorTransformer] => g } - assert(allGlutenPlans.size == 58) + assert(allGlutenPlans.size == 57) val shjPlan = allGlutenPlans(8) assert(shjPlan.metrics("totalTime").value == 6) diff --git a/backends-velox/src/test/scala/io/glutenproject/extension/CollapseProjectExecTransformerSuite.scala b/backends-velox/src/test/scala/io/glutenproject/extension/CollapseProjectExecTransformerSuite.scala new file mode 100644 index 0000000000000..36e93bb1af5fb --- /dev/null +++ b/backends-velox/src/test/scala/io/glutenproject/extension/CollapseProjectExecTransformerSuite.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.glutenproject.extension + +import io.glutenproject.GlutenConfig +import io.glutenproject.execution.{ProjectExecTransformer, VeloxWholeStageTransformerSuite} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper + +class CollapseProjectExecTransformerSuite + extends VeloxWholeStageTransformerSuite + with AdaptiveSparkPlanHelper { + + protected val rootPath: String = getClass.getResource("/").getPath + override protected val backend: String = "velox" + override protected val resourcePath: String = "/tpch-data-parquet-velox" + override protected val fileFormat: String = "parquet" + + override def beforeAll(): Unit = { + super.beforeAll() + createTPCHNotNullTables() + } + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + .set("spark.sql.files.maxPartitionBytes", "1g") + .set("spark.sql.shuffle.partitions", "1") + .set("spark.memory.offHeap.size", "2g") + .set("spark.unsafe.exceptionOnMemoryLeak", "true") + .set("spark.sql.autoBroadcastJoinThreshold", "-1") + } + + test("Support ProjectExecTransformer collapse") { + val query = + """ + |SELECT + | o_orderpriority + |FROM + | orders + |WHERE + | o_orderdate >= '1993-07-01' + | AND EXISTS ( + | SELECT + | * + | FROM + | lineitem + | WHERE + | l_orderkey = o_orderkey + | AND l_commitdate < l_receiptdate + | ) + |ORDER BY + | o_orderpriority + |LIMIT + | 100; + |""".stripMargin + Seq(true, false).foreach { + collapsed => + withSQLConf(GlutenConfig.ENABLE_COLUMNAR_PROJECT_COLLAPSE.key -> collapsed.toString) { + runQueryAndCompare(query) { + df => + { + assert( + getExecutedPlan(df).exists { + case _ @ProjectExecTransformer(_, _: ProjectExecTransformer) => true + case _ => false + } == !collapsed + ) + } + } + } + } + } +} diff --git a/ep/build-clickhouse/src/build_clickhouse.sh b/ep/build-clickhouse/src/build_clickhouse.sh index 1fca38c8116e0..0a14a86be1aa9 100644 --- a/ep/build-clickhouse/src/build_clickhouse.sh +++ b/ep/build-clickhouse/src/build_clickhouse.sh @@ -38,7 +38,7 @@ done echo ${GLUTEN_SOURCE} -export CC=${CC:-clang-16} -export CXX=${CXX:-clang++-16} +export CC=${CC:-clang-15} +export CXX=${CXX:-clang++-15} cmake -G Ninja -S ${GLUTEN_SOURCE}/cpp-ch -B ${GLUTEN_SOURCE}/cpp-ch/build_ch -DCH_SOURCE_DIR=${CH_SOURCE_DIR} "-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}" cmake --build ${GLUTEN_SOURCE}/cpp-ch/build_ch --target build_ch diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/CollapseProjectTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/extension/CollapseProjectTransformer.scala new file mode 100644 index 0000000000000..222e738b80fc8 --- /dev/null +++ b/gluten-core/src/main/scala/io/glutenproject/extension/CollapseProjectTransformer.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.glutenproject.extension + +import io.glutenproject.GlutenConfig +import io.glutenproject.execution.ProjectExecTransformer + +import org.apache.spark.sql.catalyst.expressions.{Alias, CreateNamedStruct, NamedExpression} +import org.apache.spark.sql.catalyst.optimizer.CollapseProjectShim +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.SparkPlan + +object CollapseProjectTransformer extends Rule[SparkPlan] { + + override def apply(plan: SparkPlan): SparkPlan = { + if (!GlutenConfig.getConf.enableColumnarProjectCollapse) { + return plan + } + plan.transformUp { + case p1 @ ProjectExecTransformer(_, p2: ProjectExecTransformer) + if !containsNamedStructAlias(p2.projectList) + && CollapseProjectShim.canCollapseExpressions( + p1.projectList, + p2.projectList, + alwaysInline = false) => + val collapsedProject = p2.copy(projectList = + CollapseProjectShim.buildCleanedProjectList(p1.projectList, p2.projectList)) + val validationResult = collapsedProject.doValidate() + if (validationResult.isValid) { + logDebug(s"Collapse project $p1 and $p2.") + collapsedProject + } else { + plan + } + } + } + + /** + * In Velox, CreateNamedStruct will generate a special output named obj, We cannot collapse such + * ProjectTransformer, otherwise it will result in a bind reference failure. + */ + private def containsNamedStructAlias(projectList: Seq[NamedExpression]): Boolean = { + projectList.exists { + case _ @Alias(_: CreateNamedStruct, _) => true + case _ => false + } + } +} diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala index 84a0dc58bc35d..367ad70a59df0 100644 --- a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala +++ b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala @@ -794,7 +794,8 @@ case class ColumnarOverrideRules(session: SparkSession) (_: SparkSession) => FallbackBloomFilterAggIfNeeded(), (_: SparkSession) => TransformPreOverrides(isAdaptiveContext), (spark: SparkSession) => RewriteTransformer(spark), - (_: SparkSession) => EnsureLocalSortRequirements + (_: SparkSession) => EnsureLocalSortRequirements, + (_: SparkSession) => CollapseProjectTransformer ) ::: BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPreRules() ::: SparkRuleUtil.extendedColumnarRules(session, GlutenConfig.getConf.extendedColumnarPreRules) diff --git a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala index c882080897386..1529e8a27796f 100644 --- a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala +++ b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala @@ -291,6 +291,8 @@ class GlutenConfig(conf: SQLConf) extends Logging { def abandonPartialAggregationMinRows: Option[Int] = conf.getConf(ABANDON_PARTIAL_AGGREGATION_MIN_ROWS) def enableNativeWriter: Boolean = conf.getConf(NATIVE_WRITER_ENABLED) + + def enableColumnarProjectCollapse: Boolean = conf.getConf(ENABLE_COLUMNAR_PROJECT_COLLAPSE) } object GlutenConfig { @@ -1334,6 +1336,13 @@ object GlutenConfig { .booleanConf .createWithDefault(true) + val ENABLE_COLUMNAR_PROJECT_COLLAPSE = + buildConf("spark.gluten.sql.columnar.project.collapse") + .internal() + .doc("Combines two columnar project operators into one and perform alias substitution") + .booleanConf + .createWithDefault(true) + val COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS = buildConf("spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems") .internal() diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala new file mode 100644 index 0000000000000..9b7a0bee45e0f --- /dev/null +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions._ + +object CollapseProjectShim extends AliasHelper { + def canCollapseExpressions( + consumers: Seq[NamedExpression], + producers: Seq[NamedExpression], + alwaysInline: Boolean): Boolean = { + canCollapseExpressions(consumers, getAliasMap(producers), alwaysInline) + } + + def buildCleanedProjectList( + upper: Seq[NamedExpression], + lower: Seq[NamedExpression]): Seq[NamedExpression] = { + val aliases = getAliasMap(lower) + upper.map(replaceAliasButKeepName(_, aliases)) + } + + /** Check if we can collapse expressions safely. */ + private def canCollapseExpressions( + consumers: Seq[Expression], + producerMap: Map[Attribute, Expression], + alwaysInline: Boolean = false): Boolean = { + // We can only collapse expressions if all input expressions meet the following criteria: + // - The input is deterministic. + // - The input is only consumed once OR the underlying input expression is cheap. + consumers + .flatMap(collectReferences) + .groupBy(identity) + .mapValues(_.size) + .forall { + case (reference, count) => + val producer = producerMap.getOrElse(reference, reference) + producer.deterministic && (count == 1 || alwaysInline || { + val relatedConsumers = consumers.filter(_.references.contains(reference)) + // It's still exactly-only if there is only one reference in non-extract expressions, + // as we won't duplicate the expensive CreateStruct-like expressions. + val extractOnly = relatedConsumers.map(refCountInNonExtract(_, reference)).sum <= 1 + shouldInline(producer, extractOnly) + }) + } + } + + /** + * Return all the references of the given expression without deduplication, which is different + * from `Expression.references`. + */ + private def collectReferences(e: Expression): Seq[Attribute] = e.collect { + case a: Attribute => a + } + + /** Check if the given expression is cheap that we can inline it. */ + private def shouldInline(e: Expression, extractOnlyConsumer: Boolean): Boolean = e match { + case _: Attribute | _: OuterReference => true + case _ if e.foldable => true + // PythonUDF is handled by the rule ExtractPythonUDFs + case _: PythonUDF => true + // Alias and ExtractValue are very cheap. + case _: Alias | _: ExtractValue => e.children.forall(shouldInline(_, extractOnlyConsumer)) + // These collection create functions are not cheap, but we have optimizer rules that can + // optimize them out if they are only consumed by ExtractValue, so we need to allow to inline + // them to avoid perf regression. As an example: + // Project(s.a, s.b, Project(create_struct(a, b, c) as s, child)) + // We should collapse these two projects and eventually get Project(a, b, child) + case _: CreateNamedStruct | _: CreateArray | _: CreateMap | _: UpdateFields => + extractOnlyConsumer + case _ => false + } + + private def refCountInNonExtract(expr: Expression, ref: Attribute): Int = { + def refCount(e: Expression): Int = e match { + case a: Attribute if a.semanticEquals(ref) => 1 + // The first child of `ExtractValue` is the complex type to be extracted. + case e: ExtractValue if e.children.head.semanticEquals(ref) => 0 + case _ => e.children.map(refCount).sum + } + refCount(expr) + } +} diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala new file mode 100644 index 0000000000000..1df1456f40112 --- /dev/null +++ b/shims/spark33/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression} + +object CollapseProjectShim { + def canCollapseExpressions( + consumers: Seq[Expression], + producers: Seq[NamedExpression], + alwaysInline: Boolean): Boolean = { + CollapseProject.canCollapseExpressions(consumers, producers, alwaysInline) + } + + def buildCleanedProjectList( + upper: Seq[NamedExpression], + lower: Seq[NamedExpression]): Seq[NamedExpression] = { + CollapseProject.buildCleanedProjectList(upper, lower) + } +} diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala new file mode 100644 index 0000000000000..1df1456f40112 --- /dev/null +++ b/shims/spark34/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression} + +object CollapseProjectShim { + def canCollapseExpressions( + consumers: Seq[Expression], + producers: Seq[NamedExpression], + alwaysInline: Boolean): Boolean = { + CollapseProject.canCollapseExpressions(consumers, producers, alwaysInline) + } + + def buildCleanedProjectList( + upper: Seq[NamedExpression], + lower: Seq[NamedExpression]): Seq[NamedExpression] = { + CollapseProject.buildCleanedProjectList(upper, lower) + } +}