diff --git a/backends-velox/src/test/scala/io/glutenproject/extension/CollapseProjectExecTransformerSuite.scala b/backends-velox/src/test/scala/io/glutenproject/extension/CollapseProjectExecTransformerSuite.scala new file mode 100644 index 000000000000..feaa32877e97 --- /dev/null +++ b/backends-velox/src/test/scala/io/glutenproject/extension/CollapseProjectExecTransformerSuite.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.glutenproject.extension + +import io.glutenproject.GlutenConfig +import io.glutenproject.execution.{ProjectExecTransformer, VeloxWholeStageTransformerSuite} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper + +class CollapseProjectExecTransformerSuite + extends VeloxWholeStageTransformerSuite + with AdaptiveSparkPlanHelper { + + protected val rootPath: String = getClass.getResource("/").getPath + override protected val backend: String = "velox" + override protected val resourcePath: String = "/tpch-data-parquet-velox" + override protected val fileFormat: String = "parquet" + + override def beforeAll(): Unit = { + super.beforeAll() + createTPCHNotNullTables() + } + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + .set("spark.sql.files.maxPartitionBytes", "1g") + .set("spark.sql.shuffle.partitions", "1") + .set("spark.memory.offHeap.size", "2g") + .set("spark.unsafe.exceptionOnMemoryLeak", "true") + .set("spark.sql.autoBroadcastJoinThreshold", "-1") + } + + test("Support ProjectExecTransformer collapse") { + val query = + """ + |SELECT + | o_orderpriority + |FROM + | orders + |WHERE + | o_orderdate >= '1993-07-01' + | AND EXISTS ( + | SELECT + | * + | FROM + | lineitem + | WHERE + | l_orderkey = o_orderkey + | AND l_commitdate < l_receiptdate + | ) + |ORDER BY + | o_orderpriority + |LIMIT + | 100; + |""".stripMargin + withSQLConf(GlutenConfig.ENABLE_COLUMNAR_PROJECT_COLLAPSE.key -> "false") { + runQueryAndCompare(query) { + df => + { + assert( + getExecutedPlan(df).exists { + case _ @ProjectExecTransformer(_, _: ProjectExecTransformer) => true + case _ => false + } + ) + } + } + } + withSQLConf(GlutenConfig.ENABLE_COLUMNAR_PROJECT_COLLAPSE.key -> "true") { + runQueryAndCompare(query) { + df => + { + assert( + !getExecutedPlan(df).exists { + case _ @ProjectExecTransformer(_, _: ProjectExecTransformer) => true + case _ => false + } + ) + } + } + } + } +} diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/CollapseProjectTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/extension/CollapseProjectTransformer.scala new file mode 100644 index 000000000000..90cd0e00b2fb --- /dev/null +++ b/gluten-core/src/main/scala/io/glutenproject/extension/CollapseProjectTransformer.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.glutenproject.extension + +import io.glutenproject.GlutenConfig +import io.glutenproject.execution.ProjectExecTransformer + +import org.apache.spark.sql.catalyst.optimizer.CollapseProjectShim +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.SparkPlan + +object CollapseProjectTransformer extends Rule[SparkPlan] { + + override def apply(plan: SparkPlan): SparkPlan = { + if (!GlutenConfig.getConf.enableColumnarProjectCollapse) { + return plan + } + plan.transformUp { + case p1 @ ProjectExecTransformer(_, p2: ProjectExecTransformer) + if CollapseProjectShim.canCollapseExpressions( + p1.projectList, + p2.projectList, + alwaysInline = false) => + p2.copy(projectList = + CollapseProjectShim.buildCleanedProjectList(p1.projectList, p2.projectList)) + } + } +} diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala index 412b29085e15..17f43fcd7540 100644 --- a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala +++ b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala @@ -793,7 +793,8 @@ case class ColumnarOverrideRules(session: SparkSession) (_: SparkSession) => AddTransformHintRule(), (_: SparkSession) => TransformPreOverrides(isAdaptiveContext), (spark: SparkSession) => RewriteTransformer(spark), - (_: SparkSession) => EnsureLocalSortRequirements + (_: SparkSession) => EnsureLocalSortRequirements, + (_: SparkSession) => CollapseProjectTransformer ) ::: BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPreRules() ::: SparkRuleUtil.extendedColumnarRules(session, GlutenConfig.getConf.extendedColumnarPreRules) diff --git a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala index 92dc9a291c98..b173052047a9 100644 --- a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala +++ b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala @@ -288,6 +288,8 @@ class GlutenConfig(conf: SQLConf) extends Logging { def abandonPartialAggregationMinRows: Option[Int] = conf.getConf(ABANDON_PARTIAL_AGGREGATION_MIN_ROWS) def enableNativeWriter: Boolean = conf.getConf(NATIVE_WRITER_ENABLED) + + def enableColumnarProjectCollapse: Boolean = conf.getConf(ENABLE_COLUMNAR_PROJECT_COLLAPSE) } object GlutenConfig { @@ -1324,6 +1326,13 @@ object GlutenConfig { .booleanConf .createWithDefault(true) + val ENABLE_COLUMNAR_PROJECT_COLLAPSE = + buildConf("spark.gluten.sql.columnar.project.collapse") + .internal() + .doc("Combines two columnar project operators into one and perform alias substitution") + .booleanConf + .createWithDefault(true) + val COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS = buildConf("spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems") .internal() diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala new file mode 100644 index 000000000000..9b7a0bee45e0 --- /dev/null +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions._ + +object CollapseProjectShim extends AliasHelper { + def canCollapseExpressions( + consumers: Seq[NamedExpression], + producers: Seq[NamedExpression], + alwaysInline: Boolean): Boolean = { + canCollapseExpressions(consumers, getAliasMap(producers), alwaysInline) + } + + def buildCleanedProjectList( + upper: Seq[NamedExpression], + lower: Seq[NamedExpression]): Seq[NamedExpression] = { + val aliases = getAliasMap(lower) + upper.map(replaceAliasButKeepName(_, aliases)) + } + + /** Check if we can collapse expressions safely. */ + private def canCollapseExpressions( + consumers: Seq[Expression], + producerMap: Map[Attribute, Expression], + alwaysInline: Boolean = false): Boolean = { + // We can only collapse expressions if all input expressions meet the following criteria: + // - The input is deterministic. + // - The input is only consumed once OR the underlying input expression is cheap. + consumers + .flatMap(collectReferences) + .groupBy(identity) + .mapValues(_.size) + .forall { + case (reference, count) => + val producer = producerMap.getOrElse(reference, reference) + producer.deterministic && (count == 1 || alwaysInline || { + val relatedConsumers = consumers.filter(_.references.contains(reference)) + // It's still exactly-only if there is only one reference in non-extract expressions, + // as we won't duplicate the expensive CreateStruct-like expressions. + val extractOnly = relatedConsumers.map(refCountInNonExtract(_, reference)).sum <= 1 + shouldInline(producer, extractOnly) + }) + } + } + + /** + * Return all the references of the given expression without deduplication, which is different + * from `Expression.references`. + */ + private def collectReferences(e: Expression): Seq[Attribute] = e.collect { + case a: Attribute => a + } + + /** Check if the given expression is cheap that we can inline it. */ + private def shouldInline(e: Expression, extractOnlyConsumer: Boolean): Boolean = e match { + case _: Attribute | _: OuterReference => true + case _ if e.foldable => true + // PythonUDF is handled by the rule ExtractPythonUDFs + case _: PythonUDF => true + // Alias and ExtractValue are very cheap. + case _: Alias | _: ExtractValue => e.children.forall(shouldInline(_, extractOnlyConsumer)) + // These collection create functions are not cheap, but we have optimizer rules that can + // optimize them out if they are only consumed by ExtractValue, so we need to allow to inline + // them to avoid perf regression. As an example: + // Project(s.a, s.b, Project(create_struct(a, b, c) as s, child)) + // We should collapse these two projects and eventually get Project(a, b, child) + case _: CreateNamedStruct | _: CreateArray | _: CreateMap | _: UpdateFields => + extractOnlyConsumer + case _ => false + } + + private def refCountInNonExtract(expr: Expression, ref: Attribute): Int = { + def refCount(e: Expression): Int = e match { + case a: Attribute if a.semanticEquals(ref) => 1 + // The first child of `ExtractValue` is the complex type to be extracted. + case e: ExtractValue if e.children.head.semanticEquals(ref) => 0 + case _ => e.children.map(refCount).sum + } + refCount(expr) + } +} diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala new file mode 100644 index 000000000000..1df1456f4011 --- /dev/null +++ b/shims/spark33/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression} + +object CollapseProjectShim { + def canCollapseExpressions( + consumers: Seq[Expression], + producers: Seq[NamedExpression], + alwaysInline: Boolean): Boolean = { + CollapseProject.canCollapseExpressions(consumers, producers, alwaysInline) + } + + def buildCleanedProjectList( + upper: Seq[NamedExpression], + lower: Seq[NamedExpression]): Seq[NamedExpression] = { + CollapseProject.buildCleanedProjectList(upper, lower) + } +} diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala new file mode 100644 index 000000000000..1df1456f4011 --- /dev/null +++ b/shims/spark34/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectShim.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression} + +object CollapseProjectShim { + def canCollapseExpressions( + consumers: Seq[Expression], + producers: Seq[NamedExpression], + alwaysInline: Boolean): Boolean = { + CollapseProject.canCollapseExpressions(consumers, producers, alwaysInline) + } + + def buildCleanedProjectList( + upper: Seq[NamedExpression], + lower: Seq[NamedExpression]): Seq[NamedExpression] = { + CollapseProject.buildCleanedProjectList(upper, lower) + } +}