support project transformer collapse
liujiayi771 committed Dec 5, 2023
1 parent 9d07d48 commit 5b46463
Showing 7 changed files with 316 additions and 1 deletion.
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.glutenproject.extension

import io.glutenproject.GlutenConfig
import io.glutenproject.execution.{ProjectExecTransformer, VeloxWholeStageTransformerSuite}

import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper

class CollapseProjectExecTransformerSuite
  extends VeloxWholeStageTransformerSuite
  with AdaptiveSparkPlanHelper {

  protected val rootPath: String = getClass.getResource("/").getPath
  override protected val backend: String = "velox"
  override protected val resourcePath: String = "/tpch-data-parquet-velox"
  override protected val fileFormat: String = "parquet"

  override def beforeAll(): Unit = {
    super.beforeAll()
    createTPCHNotNullTables()
  }

  override protected def sparkConf: SparkConf = {
    super.sparkConf
      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
      .set("spark.sql.files.maxPartitionBytes", "1g")
      .set("spark.sql.shuffle.partitions", "1")
      .set("spark.memory.offHeap.size", "2g")
      .set("spark.unsafe.exceptionOnMemoryLeak", "true")
      .set("spark.sql.autoBroadcastJoinThreshold", "-1")
  }

  test("Support ProjectExecTransformer collapse") {
    val query =
      """
        |SELECT
        | o_orderpriority
        |FROM
        | orders
        |WHERE
        | o_orderdate >= '1993-07-01'
        | AND EXISTS (
        | SELECT
        | *
        | FROM
        | lineitem
        | WHERE
        | l_orderkey = o_orderkey
        | AND l_commitdate < l_receiptdate
        | )
        |ORDER BY
        | o_orderpriority
        |LIMIT
        | 100;
        |""".stripMargin
    withSQLConf(GlutenConfig.ENABLE_COLUMNAR_PROJECT_COLLAPSE.key -> "false") {
      runQueryAndCompare(query) {
        df =>
          {
            assert(
              getExecutedPlan(df).exists {
                case _ @ProjectExecTransformer(_, _: ProjectExecTransformer) => true
                case _ => false
              }
            )
          }
      }
    }
    withSQLConf(GlutenConfig.ENABLE_COLUMNAR_PROJECT_COLLAPSE.key -> "true") {
      runQueryAndCompare(query) {
        df =>
          {
            assert(
              !getExecutedPlan(df).exists {
                case _ @ProjectExecTransformer(_, _: ProjectExecTransformer) => true
                case _ => false
              }
            )
          }
      }
    }
  }
}
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.glutenproject.extension

import io.glutenproject.GlutenConfig
import io.glutenproject.execution.ProjectExecTransformer

import org.apache.spark.sql.catalyst.optimizer.CollapseProjectShim
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

object CollapseProjectTransformer extends Rule[SparkPlan] {

  override def apply(plan: SparkPlan): SparkPlan = {
    if (!GlutenConfig.getConf.enableColumnarProjectCollapse) {
      return plan
    }
    plan.transformUp {
      case p1 @ ProjectExecTransformer(_, p2: ProjectExecTransformer)
          if CollapseProjectShim.canCollapseExpressions(
            p1.projectList,
            p2.projectList,
            alwaysInline = false) =>
        p2.copy(projectList =
          CollapseProjectShim.buildCleanedProjectList(p1.projectList, p2.projectList))
    }
  }
}
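
The rule above merges an upper ProjectExecTransformer into the one beneath it by rewriting the upper project list against the lower one's aliases. As a minimal, self-contained sketch of that substitution step, consider the toy model below; Col, Add, Named and ToyCollapse are illustrative names only and exist in neither Gluten nor Spark:

// Toy model of collapsing SELECT a + a AS b over SELECT x + y AS a.
// None of these types exist in Gluten or Catalyst; they only illustrate the substitution step.
sealed trait Expr
case class Col(name: String) extends Expr                 // a column reference
case class Add(left: Expr, right: Expr) extends Expr      // left + right
case class Named(child: Expr, name: String) extends Expr  // child AS name

object ToyCollapse {
  // Map each lower-project output name to the expression that produces it.
  def aliasMap(producers: Seq[Named]): Map[String, Expr] =
    producers.map(p => p.name -> p.child).toMap

  // Rewrite an upper-project expression to refer to the lower project's inputs directly,
  // keeping the upper project's output name (cf. replaceAliasButKeepName).
  def substitute(e: Expr, aliases: Map[String, Expr]): Expr = e match {
    case Col(n) => aliases.getOrElse(n, Col(n))
    case Add(l, r) => Add(substitute(l, aliases), substitute(r, aliases))
    case Named(c, n) => Named(substitute(c, aliases), n)
  }

  def main(args: Array[String]): Unit = {
    val lower = Seq(Named(Add(Col("x"), Col("y")), "a")) // SELECT x + y AS a
    val upper = Seq(Named(Add(Col("a"), Col("a")), "b")) // SELECT a + a AS b
    println(upper.map(substitute(_, aliasMap(lower))))
    // List(Named(Add(Add(Col(x),Col(y)),Add(Col(x),Col(y))),b))
  }
}

Note that in this toy example "a" is consumed twice, so the real rule would only perform the merge when canCollapseExpressions decides the producer is cheap enough to duplicate.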
@@ -817,7 +817,8 @@ case class ColumnarOverrideRules(session: SparkSession)
        (_: SparkSession) => AddTransformHintRule(),
        (_: SparkSession) => TransformPreOverrides(isAdaptiveContext),
        (spark: SparkSession) => RewriteTransformer(spark),
-        (_: SparkSession) => EnsureLocalSortRequirements
+        (_: SparkSession) => EnsureLocalSortRequirements,
+        (_: SparkSession) => CollapseProjectTransformer
      ) :::
      BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPreRules() :::
      SparkRuleUtil.extendedColumnarRules(session, GlutenConfig.getConf.extendedColumnarPreRules)
@@ -288,6 +288,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {
  def abandonPartialAggregationMinRows: Option[Int] =
    conf.getConf(ABANDON_PARTIAL_AGGREGATION_MIN_ROWS)
  def enableNativeWriter: Boolean = conf.getConf(NATIVE_WRITER_ENABLED)
+
+  def enableColumnarProjectCollapse: Boolean = conf.getConf(ENABLE_COLUMNAR_PROJECT_COLLAPSE)
}

object GlutenConfig {
@@ -1324,6 +1326,13 @@ object GlutenConfig {
      .booleanConf
      .createWithDefault(true)

+  val ENABLE_COLUMNAR_PROJECT_COLLAPSE =
+    buildConf("spark.gluten.sql.columnar.project.collapse")
+      .internal()
+      .doc("Combines two columnar project operators into one and performs alias substitution.")
+      .booleanConf
+      .createWithDefault(true)
+
  val COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS =
    buildConf("spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems")
      .internal()
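The new flag defaults to true, so the collapse is on out of the box; the test above toggles it through withSQLConf, and the same key can be flipped at runtime. A hedged usage sketch follows; everything except the configuration key (taken from the diff above) is illustrative:

import org.apache.spark.sql.SparkSession

object ToggleProjectCollapse {
  def main(args: Array[String]): Unit = {
    // Assumes a Gluten-enabled build on the classpath; the builder options are illustrative.
    val spark = SparkSession.builder().master("local[1]").getOrCreate()

    // Keep adjacent ProjectExecTransformer nodes separate (useful when inspecting plans).
    spark.conf.set("spark.gluten.sql.columnar.project.collapse", "false")

    // Restore the default behavior.
    spark.conf.set("spark.gluten.sql.columnar.project.collapse", "true")

    spark.stop()
  }
}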
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.expressions._

object CollapseProjectShim extends AliasHelper {
  def canCollapseExpressions(
      consumers: Seq[NamedExpression],
      producers: Seq[NamedExpression],
      alwaysInline: Boolean): Boolean = {
    canCollapseExpressions(consumers, getAliasMap(producers), alwaysInline)
  }

  def buildCleanedProjectList(
      upper: Seq[NamedExpression],
      lower: Seq[NamedExpression]): Seq[NamedExpression] = {
    val aliases = getAliasMap(lower)
    upper.map(replaceAliasButKeepName(_, aliases))
  }

  /** Check if we can collapse expressions safely. */
  private def canCollapseExpressions(
      consumers: Seq[Expression],
      producerMap: Map[Attribute, Expression],
      alwaysInline: Boolean = false): Boolean = {
    // We can only collapse expressions if all input expressions meet the following criteria:
    // - The input is deterministic.
    // - The input is only consumed once OR the underlying input expression is cheap.
    consumers
      .flatMap(collectReferences)
      .groupBy(identity)
      .mapValues(_.size)
      .forall {
        case (reference, count) =>
          val producer = producerMap.getOrElse(reference, reference)
          producer.deterministic && (count == 1 || alwaysInline || {
            val relatedConsumers = consumers.filter(_.references.contains(reference))
            // It's still exactly-once if there is only one reference in non-extract expressions,
            // as we won't duplicate the expensive CreateStruct-like expressions.
            val extractOnly = relatedConsumers.map(refCountInNonExtract(_, reference)).sum <= 1
            shouldInline(producer, extractOnly)
          })
      }
  }

  /**
   * Return all the references of the given expression without deduplication, which is different
   * from `Expression.references`.
   */
  private def collectReferences(e: Expression): Seq[Attribute] = e.collect {
    case a: Attribute => a
  }

  /** Check if the given expression is cheap enough that we can inline it. */
  private def shouldInline(e: Expression, extractOnlyConsumer: Boolean): Boolean = e match {
    case _: Attribute | _: OuterReference => true
    case _ if e.foldable => true
    // PythonUDF is handled by the rule ExtractPythonUDFs
    case _: PythonUDF => true
    // Alias and ExtractValue are very cheap.
    case _: Alias | _: ExtractValue => e.children.forall(shouldInline(_, extractOnlyConsumer))
    // These collection create functions are not cheap, but we have optimizer rules that can
    // optimize them out if they are only consumed by ExtractValue, so we need to allow inlining
    // them to avoid perf regression. As an example:
    //   Project(s.a, s.b, Project(create_struct(a, b, c) as s, child))
    // We should collapse these two projects and eventually get Project(a, b, child)
    case _: CreateNamedStruct | _: CreateArray | _: CreateMap | _: UpdateFields =>
      extractOnlyConsumer
    case _ => false
  }

  private def refCountInNonExtract(expr: Expression, ref: Attribute): Int = {
    def refCount(e: Expression): Int = e match {
      case a: Attribute if a.semanticEquals(ref) => 1
      // The first child of `ExtractValue` is the complex type to be extracted.
      case e: ExtractValue if e.children.head.semanticEquals(ref) => 0
      case _ => e.children.map(refCount).sum
    }
    refCount(expr)
  }
}
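
The canCollapseExpressions guard above is essentially a per-reference count followed by a cheapness check on the producer. A minimal sketch of just the counting step, again with toy types rather than Catalyst expressions (E, Ref, Plus and GuardSketch are illustrative names only):

// Toy reference counting in the spirit of collectReferences above.
sealed trait E
case class Ref(name: String) extends E
case class Plus(l: E, r: E) extends E

object GuardSketch {
  // Collect references without deduplication, so repeated uses are counted.
  def refs(e: E): Seq[String] = e match {
    case Ref(n) => Seq(n)
    case Plus(l, r) => refs(l) ++ refs(r)
  }

  def main(args: Array[String]): Unit = {
    val consumers = Seq(Plus(Ref("a"), Ref("a"))) // upper project: SELECT a + a AS b
    val counts = consumers.flatMap(refs).groupBy(identity).map { case (k, v) => k -> v.size }
    println(counts) // Map(a -> 2): collapse only proceeds if the producer of "a" is cheap.
  }
}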
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}

object CollapseProjectShim {
  def canCollapseExpressions(
      consumers: Seq[Expression],
      producers: Seq[NamedExpression],
      alwaysInline: Boolean): Boolean = {
    CollapseProject.canCollapseExpressions(consumers, producers, alwaysInline)
  }

  def buildCleanedProjectList(
      upper: Seq[NamedExpression],
      lower: Seq[NamedExpression]): Seq[NamedExpression] = {
    CollapseProject.buildCleanedProjectList(upper, lower)
  }
}
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}

object CollapseProjectShim {
  def canCollapseExpressions(
      consumers: Seq[Expression],
      producers: Seq[NamedExpression],
      alwaysInline: Boolean): Boolean = {
    CollapseProject.canCollapseExpressions(consumers, producers, alwaysInline)
  }

  def buildCleanedProjectList(
      upper: Seq[NamedExpression],
      lower: Seq[NamedExpression]): Seq[NamedExpression] = {
    CollapseProject.buildCleanedProjectList(upper, lower)
  }
}
