Commit b5c01cb

Support project transformer collapse

liujiayi771 committed Dec 7, 2023
1 parent 0b74898 commit b5c01cb

Showing 10 changed files with 333 additions and 26 deletions.
@@ -1,5 +1,5 @@
[{
- "id": 6,
+ "id": 5,
"name": "kProject",
"time": 4,
"input_wait_time": 7736,
@@ -16,24 +16,6 @@
"input_bytes": 14308
}]
}]
- }, {
- "id": 5,
- "name": "kProject",
- "time": 6,
- "input_wait_time": 7725,
- "output_wait_time": 48,
- "steps": [{
- "name": "Expression",
- "description": "Project",
- "processors": [{
- "name": "ExpressionTransform",
- "time": 6,
- "output_rows": 292,
- "output_bytes": 14308,
- "input_rows": 292,
- "input_bytes": 16644
- }]
- }]
}, {
"id": 4,
"name": "kProject",

@@ -207,14 +207,14 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
case g: GlutenPlan if !g.isInstanceOf[InputIteratorTransformer] => g
}

- val scanPlan = allGlutenPlans(10)
+ val scanPlan = allGlutenPlans(9)
assert(scanPlan.metrics("scanTime").value == 2)
assert(scanPlan.metrics("inputWaitTime").value == 3)
assert(scanPlan.metrics("outputWaitTime").value == 1)
assert(scanPlan.metrics("outputRows").value == 80000)
assert(scanPlan.metrics("outputBytes").value == 2160000)

- val filterPlan = allGlutenPlans(9)
+ val filterPlan = allGlutenPlans(8)
assert(filterPlan.metrics("totalTime").value == 1)
assert(filterPlan.metrics("inputWaitTime").value == 13)
assert(filterPlan.metrics("outputWaitTime").value == 1)
@@ -223,7 +223,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
assert(filterPlan.metrics("inputRows").value == 80000)
assert(filterPlan.metrics("inputBytes").value == 2160000)

- val joinPlan = allGlutenPlans(3)
+ val joinPlan = allGlutenPlans(2)
assert(joinPlan.metrics("totalTime").value == 1)
assert(joinPlan.metrics("inputWaitTime").value == 6)
assert(joinPlan.metrics("outputWaitTime").value == 0)
@@ -244,7 +244,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
case g: GlutenPlan if !g.isInstanceOf[InputIteratorTransformer] => g
}

- assert(allGlutenPlans.size == 58)
+ assert(allGlutenPlans.size == 57)

val shjPlan = allGlutenPlans(8)
assert(shjPlan.metrics("totalTime").value == 6)
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.glutenproject.extension

import io.glutenproject.GlutenConfig
import io.glutenproject.execution.{ProjectExecTransformer, VeloxWholeStageTransformerSuite}

import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper

class CollapseProjectExecTransformerSuite
extends VeloxWholeStageTransformerSuite
with AdaptiveSparkPlanHelper {

protected val rootPath: String = getClass.getResource("/").getPath
override protected val backend: String = "velox"
override protected val resourcePath: String = "/tpch-data-parquet-velox"
override protected val fileFormat: String = "parquet"

override def beforeAll(): Unit = {
super.beforeAll()
createTPCHNotNullTables()
}

override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.set("spark.sql.files.maxPartitionBytes", "1g")
.set("spark.sql.shuffle.partitions", "1")
.set("spark.memory.offHeap.size", "2g")
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
.set("spark.sql.autoBroadcastJoinThreshold", "-1")
}

test("Support ProjectExecTransformer collapse") {
val query =
"""
|SELECT
| o_orderpriority
|FROM
| orders
|WHERE
| o_orderdate >= '1993-07-01'
| AND EXISTS (
| SELECT
| *
| FROM
| lineitem
| WHERE
| l_orderkey = o_orderkey
| AND l_commitdate < l_receiptdate
| )
|ORDER BY
| o_orderpriority
|LIMIT
| 100;
|""".stripMargin
Seq(true, false).foreach {
collapsed =>
withSQLConf(GlutenConfig.ENABLE_COLUMNAR_PROJECT_COLLAPSE.key -> collapsed.toString) {
runQueryAndCompare(query) {
df =>
{
assert(
getExecutedPlan(df).exists {
case _ @ProjectExecTransformer(_, _: ProjectExecTransformer) => true
case _ => false
} == !collapsed
)
}
}
}
}
}
}
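
For orientation, a self-contained sketch (plain Scala; the node names are illustrative stand-ins, not the commit's classes) of the plan shape the test above inspects: with collapse enabled, the nested-project pattern stops matching.

object PlanShapeSketch {
  sealed trait Plan
  case class Scan(table: String) extends Plan
  case class Project(exprs: Seq[String], child: Plan) extends Plan

  // Merge adjacent projects; the real rule also substitutes the lower
  // project's aliases into the upper list, which is elided here.
  def collapse(p: Plan): Plan = p match {
    case Project(upper, Project(_, grandChild)) => collapse(Project(upper, grandChild))
    case other => other
  }

  // The shape CollapseProjectExecTransformerSuite matches on.
  def hasNestedProject(p: Plan): Boolean = p match {
    case Project(_, _: Project) => true
    case Project(_, child) => hasNestedProject(child)
    case _ => false
  }

  def main(args: Array[String]): Unit = {
    val stacked =
      Project(Seq("o_orderpriority"), Project(Seq("o_orderpriority", "o_orderdate"), Scan("orders")))
    assert(hasNestedProject(stacked)) // collapse disabled: nested projects remain
    assert(!hasNestedProject(collapse(stacked))) // collapse enabled: single project
  }
}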
ep/build-clickhouse/src/build_clickhouse.sh (2 additions & 2 deletions)
@@ -38,7 +38,7 @@ done

echo ${GLUTEN_SOURCE}

- export CC=${CC:-clang-16}
- export CXX=${CXX:-clang++-16}
+ export CC=${CC:-clang-15}
+ export CXX=${CXX:-clang++-15}
cmake -G Ninja -S ${GLUTEN_SOURCE}/cpp-ch -B ${GLUTEN_SOURCE}/cpp-ch/build_ch -DCH_SOURCE_DIR=${CH_SOURCE_DIR} "-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}"
cmake --build ${GLUTEN_SOURCE}/cpp-ch/build_ch --target build_ch
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.glutenproject.extension

import io.glutenproject.GlutenConfig
import io.glutenproject.execution.ProjectExecTransformer

import org.apache.spark.sql.catalyst.expressions.{Alias, CreateNamedStruct, NamedExpression}
import org.apache.spark.sql.catalyst.optimizer.CollapseProjectShim
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

object CollapseProjectTransformer extends Rule[SparkPlan] {

override def apply(plan: SparkPlan): SparkPlan = {
if (!GlutenConfig.getConf.enableColumnarProjectCollapse) {
return plan
}
plan.transformUp {
case p1 @ ProjectExecTransformer(_, p2: ProjectExecTransformer)
if !containsNamedStructAlias(p2.projectList)
&& CollapseProjectShim.canCollapseExpressions(
p1.projectList,
p2.projectList,
alwaysInline = false) =>
val collapsedProject = p2.copy(projectList =
CollapseProjectShim.buildCleanedProjectList(p1.projectList, p2.projectList))
val validationResult = collapsedProject.doValidate()
if (validationResult.isValid) {
logDebug(s"Collapse project $p1 and $p2.")
collapsedProject
} else {
// transformUp must return the matched node, not the whole plan; keep the
// original nested projects when the collapsed project fails validation.
p1
}
}
}

/**
* In Velox, CreateNamedStruct generates a special output named `obj`. We cannot collapse such a
* ProjectTransformer; otherwise the collapse results in a bind reference failure.
*/
private def containsNamedStructAlias(projectList: Seq[NamedExpression]): Boolean = {
projectList.exists {
case _ @Alias(_: CreateNamedStruct, _) => true
case _ => false
}
}
}
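
Why that guard matters, as a minimal self-contained sketch (plain Scala with simplified stand-ins for the Catalyst classes; all names here are illustrative): collapsing inlines the lower project's struct-building alias into every consumer, duplicating the struct construction, and on top of that Velox names the struct-producing output `obj`, so references into it no longer bind once the two projects merge.

object StructCollapseSketch {
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class MakeStruct(fields: Map[String, Expr]) extends Expr
  case class GetField(struct: Expr, field: String) extends Expr

  // Replace references to the lower project's aliases with their definitions,
  // which is what collapsing two projects amounts to.
  def inlineRefs(e: Expr, producers: Map[String, Expr]): Expr = e match {
    case Attr(n) => producers.getOrElse(n, e)
    case GetField(s, f) => GetField(inlineRefs(s, producers), f)
    case MakeStruct(fs) => MakeStruct(fs.map { case (k, v) => k -> inlineRefs(v, producers) })
  }

  def main(args: Array[String]): Unit = {
    // Lower project: SELECT named_struct('a', x, 'b', y) AS s
    val producers: Map[String, Expr] =
      Map("s" -> MakeStruct(Map("a" -> Attr("x"), "b" -> Attr("y"))))
    // Upper project: SELECT s.a, s.b -- after inlining, the struct is built twice.
    val upper = Seq(GetField(Attr("s"), "a"), GetField(Attr("s"), "b"))
    upper.map(inlineRefs(_, producers)).foreach(println)
  }
}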
@@ -794,7 +794,8 @@ case class ColumnarOverrideRules(session: SparkSession)
(_: SparkSession) => FallbackBloomFilterAggIfNeeded(),
(_: SparkSession) => TransformPreOverrides(isAdaptiveContext),
(spark: SparkSession) => RewriteTransformer(spark),
- (_: SparkSession) => EnsureLocalSortRequirements
+ (_: SparkSession) => EnsureLocalSortRequirements,
+ (_: SparkSession) => CollapseProjectTransformer
) :::
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPreRules() :::
SparkRuleUtil.extendedColumnarRules(session, GlutenConfig.getConf.extendedColumnarPreRules)
@@ -288,6 +288,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def abandonPartialAggregationMinRows: Option[Int] =
conf.getConf(ABANDON_PARTIAL_AGGREGATION_MIN_ROWS)
def enableNativeWriter: Boolean = conf.getConf(NATIVE_WRITER_ENABLED)

+ def enableColumnarProjectCollapse: Boolean = conf.getConf(ENABLE_COLUMNAR_PROJECT_COLLAPSE)
}

object GlutenConfig {
@@ -1324,6 +1326,13 @@ object GlutenConfig {
.booleanConf
.createWithDefault(true)

+ val ENABLE_COLUMNAR_PROJECT_COLLAPSE =
+ buildConf("spark.gluten.sql.columnar.project.collapse")
+ .internal()
+ .doc("Combines two columnar project operators into one and performs alias substitution.")
+ .booleanConf
+ .createWithDefault(true)

val COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS =
buildConf("spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems")
.internal()
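
A hedged usage sketch for the new flag (the key is the one defined just above; the session setup is illustrative): it defaults to true, so in practice one would only set it to false, for example to compare plan shapes with and without collapse.

import org.apache.spark.sql.SparkSession

object ToggleCollapseSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("collapse-toggle")
      .getOrCreate()
    // Internal conf introduced by this commit; defaults to true.
    spark.conf.set("spark.gluten.sql.columnar.project.collapse", "false")
    spark.stop()
  }
}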
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.expressions._

object CollapseProjectShim extends AliasHelper {
def canCollapseExpressions(
consumers: Seq[NamedExpression],
producers: Seq[NamedExpression],
alwaysInline: Boolean): Boolean = {
canCollapseExpressions(consumers, getAliasMap(producers), alwaysInline)
}

def buildCleanedProjectList(
upper: Seq[NamedExpression],
lower: Seq[NamedExpression]): Seq[NamedExpression] = {
val aliases = getAliasMap(lower)
upper.map(replaceAliasButKeepName(_, aliases))
}

/** Check if we can collapse expressions safely. */
private def canCollapseExpressions(
consumers: Seq[Expression],
producerMap: Map[Attribute, Expression],
alwaysInline: Boolean = false): Boolean = {
// We can only collapse expressions if all input expressions meet the following criteria:
// - The input is deterministic.
// - The input is only consumed once OR the underlying input expression is cheap.
consumers
.flatMap(collectReferences)
.groupBy(identity)
.mapValues(_.size)
.forall {
case (reference, count) =>
val producer = producerMap.getOrElse(reference, reference)
producer.deterministic && (count == 1 || alwaysInline || {
val relatedConsumers = consumers.filter(_.references.contains(reference))
// It's still extract-only if there is only one reference in non-extract expressions,
// as we won't duplicate the expensive CreateStruct-like expressions.
val extractOnly = relatedConsumers.map(refCountInNonExtract(_, reference)).sum <= 1
shouldInline(producer, extractOnly)
})
}
}

/**
* Return all the references of the given expression without deduplication, which is different
* from `Expression.references`.
*/
private def collectReferences(e: Expression): Seq[Attribute] = e.collect {
case a: Attribute => a
}

/** Check if the given expression is cheap enough that we can inline it. */
private def shouldInline(e: Expression, extractOnlyConsumer: Boolean): Boolean = e match {
case _: Attribute | _: OuterReference => true
case _ if e.foldable => true
// PythonUDF is handled by the rule ExtractPythonUDFs
case _: PythonUDF => true
// Alias and ExtractValue are very cheap.
case _: Alias | _: ExtractValue => e.children.forall(shouldInline(_, extractOnlyConsumer))
// These collection create functions are not cheap, but we have optimizer rules that can
// optimize them out if they are only consumed by ExtractValue, so we need to allow to inline
// them to avoid perf regression. As an example:
// Project(s.a, s.b, Project(create_struct(a, b, c) as s, child))
// We should collapse these two projects and eventually get Project(a, b, child)
case _: CreateNamedStruct | _: CreateArray | _: CreateMap | _: UpdateFields =>
extractOnlyConsumer
case _ => false
}

private def refCountInNonExtract(expr: Expression, ref: Attribute): Int = {
def refCount(e: Expression): Int = e match {
case a: Attribute if a.semanticEquals(ref) => 1
// The first child of `ExtractValue` is the complex type to be extracted.
case e: ExtractValue if e.children.head.semanticEquals(ref) => 0
case _ => e.children.map(refCount).sum
}
refCount(expr)
}
}
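
A self-contained walkthrough of the "consumed once or cheap" criterion above (plain Scala with simplified stand-ins, not the Catalyst types; string equality stands in for semanticEquals): references that only feed a field extraction do not count toward duplication, so SELECT s.a, s.b can be merged into the project that builds s, while SELECT f(s), g(s) cannot.

object InlineCriterionSketch {
  sealed trait Expr { def children: Seq[Expr] = Nil }
  case class Attr(name: String) extends Expr
  case class GetField(struct: Expr, field: String) extends Expr {
    override def children: Seq[Expr] = Seq(struct)
  }
  case class Call(fn: String, args: Seq[Expr]) extends Expr {
    override def children: Seq[Expr] = args
  }

  // Count uses of `ref` that are NOT the immediate child of a field
  // extraction, mirroring refCountInNonExtract above.
  def refCountInNonExtract(e: Expr, ref: String): Int = e match {
    case Attr(n) if n == ref => 1
    case GetField(Attr(n), _) if n == ref => 0
    case other => other.children.map(refCountInNonExtract(_, ref)).sum
  }

  def main(args: Array[String]): Unit = {
    val extractOnly = Seq(GetField(Attr("s"), "a"), GetField(Attr("s"), "b"))
    val opaqueUses = Seq(Call("f", Seq(Attr("s"))), Call("g", Seq(Attr("s"))))
    println(extractOnly.map(refCountInNonExtract(_, "s")).sum) // 0: safe to inline the struct
    println(opaqueUses.map(refCountInNonExtract(_, "s")).sum) // 2: inlining would duplicate it
  }
}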
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}

object CollapseProjectShim {
def canCollapseExpressions(
consumers: Seq[Expression],
producers: Seq[NamedExpression],
alwaysInline: Boolean): Boolean = {
CollapseProject.canCollapseExpressions(consumers, producers, alwaysInline)
}

def buildCleanedProjectList(
upper: Seq[NamedExpression],
lower: Seq[NamedExpression]): Seq[NamedExpression] = {
CollapseProject.buildCleanedProjectList(upper, lower)
}
}