Skip to content

Commit

Permalink
Pruning unused project column
Browse files Browse the repository at this point in the history
  • Loading branch information
liujiayi771 committed Jan 2, 2025
1 parent 4e0aab0 commit 47ece74
Show file tree
Hide file tree
Showing 14 changed files with 301 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,13 @@ object VeloxRuleApi {
val validatorBuilder: GlutenConfig => Validator = conf =>
Validators.newValidator(conf, offloads)
val rewrites =
Seq(RewriteIn, RewriteMultiChildrenCount, RewriteJoin, PullOutPreProject, PullOutPostProject)
Seq(
RewriteIn,
RewriteMultiChildrenCount,
RewriteJoin,
PullOutPreProject,
PullOutPostProject,
ProjectColumnPruning)
injector.injectTransform(
c => HeuristicTransform.WithRewrites(validatorBuilder(c.glutenConf), rewrites, offloads))

Expand Down Expand Up @@ -124,7 +130,13 @@ object VeloxRuleApi {
// Gluten RAS: The RAS rule.
val validatorBuilder: GlutenConfig => Validator = conf => Validators.newValidator(conf)
val rewrites =
Seq(RewriteIn, RewriteMultiChildrenCount, RewriteJoin, PullOutPreProject, PullOutPostProject)
Seq(
RewriteIn,
RewriteMultiChildrenCount,
RewriteJoin,
PullOutPreProject,
PullOutPostProject,
ProjectColumnPruning)
injector.injectCoster(_ => LegacyCoster)
injector.injectCoster(_ => RoughCoster)
injector.injectCoster(_ => RoughCoster2)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.extension.columnar.rewrite

import org.apache.spark.sql.execution.{ProjectExec, SparkPlan, UnaryExecNode}

/**
* After applying the PullOutPreProject rule, there may be some projects that contain columns not
* consumed by the parent. These columns will be removed by this rewrite rule.
*/
object ProjectColumnPruning extends RewriteSingleNode {
override def isRewritable(plan: SparkPlan): Boolean = {
RewriteEligibility.isRewritable(plan)
}

override def rewrite(plan: SparkPlan): SparkPlan = plan match {
case parent: UnaryExecNode if parent.child.isInstanceOf[ProjectExec] =>
val project = parent.child.asInstanceOf[ProjectExec]
val unusedAttribute = project.outputSet -- parent.references

if (unusedAttribute.nonEmpty) {
val newProject = project.copy(projectList = project.projectList.diff(unusedAttribute.toSeq))
parent.withNewChildren(Seq(newProject))
} else {
parent
}
case _ => plan
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
import org.apache.spark.sql.execution.datasources.v2._
import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins.{GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
import org.apache.spark.sql.extension.{GlutenCustomerExtensionSuite, GlutenSessionExtensionSuite}
import org.apache.spark.sql.extension.{GlutenCustomerExtensionSuite, GlutenExtensionRewriteRuleSuite, GlutenSessionExtensionSuite}
import org.apache.spark.sql.hive.execution.GlutenHiveSQLQueryCHSuite
import org.apache.spark.sql.sources._
import org.apache.spark.sql.statistics.SparkFunctionStatistics
Expand Down Expand Up @@ -1934,6 +1934,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("right outer join with unique keys using SortMergeJoin (whole-stage-codegen on)")
enableSuite[GlutenCustomerExtensionSuite]
enableSuite[GlutenSessionExtensionSuite]
enableSuite[GlutenExtensionRewriteRuleSuite]
enableSuite[GlutenBucketedReadWithoutHiveSupportSuite]
.exclude("avoid shuffle when join 2 bucketed tables")
.exclude("only shuffle one side when join bucketed table and non-bucketed table")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
import org.apache.spark.sql.execution.datasources.v2.GlutenFileTableSuite
import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite}
import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenExtensionRewriteRuleSuite, GlutenSessionExtensionSuite}
import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
import org.apache.spark.sql.sources._

Expand All @@ -44,6 +44,7 @@ import org.apache.spark.sql.sources._
class VeloxTestSettings extends BackendTestSettings {

enableSuite[GlutenSessionExtensionSuite]
enableSuite[GlutenExtensionRewriteRuleSuite]

enableSuite[GlutenDataFrameAggregateSuite]
.exclude(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.extension

import org.apache.gluten.execution.ProjectExecTransformer

import org.apache.spark.SparkConf
import org.apache.spark.sql.GlutenSQLTestsTrait

class GlutenExtensionRewriteRuleSuite extends GlutenSQLTestsTrait {

override def sparkConf: SparkConf = {
super.sparkConf
.set("spark.sql.adaptive.enabled", "false")
}

testGluten("GLUTEN-8183 - Pruning unused column in project") {
val query =
"""
|SELECT
| max(n1),
| max(n2),
| sum(IF(n1 + n2 + n3 % 2 = 0, 1, 0))
|FROM
| (
| SELECT
| id + 1 AS n1,
| id + 2 AS n2,
| IF(id % 2 = 0, id + 3, id + 4) AS n3
| FROM
| RANGE(10)
| )
|""".stripMargin

val df = sql(query)
assert(
getExecutedPlan(df).exists {
case project: ProjectExecTransformer => project.projectList.length == 3
case _ => false
}
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
import org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite, GlutenFileTableSuite, GlutenV2PredicateSuite}
import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
import org.apache.spark.sql.extension.{GlutenCustomerExtensionSuite, GlutenSessionExtensionSuite}
import org.apache.spark.sql.extension.{GlutenCustomerExtensionSuite, GlutenExtensionRewriteRuleSuite, GlutenSessionExtensionSuite}
import org.apache.spark.sql.gluten.GlutenFallbackSuite
import org.apache.spark.sql.hive.execution.GlutenHiveSQLQueryCHSuite
import org.apache.spark.sql.sources._
Expand Down Expand Up @@ -1816,6 +1816,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("full outer join with unique keys using SortMergeJoin (whole-stage-codegen on)")
enableSuite[GlutenCustomerExtensionSuite]
enableSuite[GlutenSessionExtensionSuite]
enableSuite[GlutenExtensionRewriteRuleSuite]
enableSuite[GlutenFallbackSuite]
enableSuite[GlutenBucketedReadWithoutHiveSupportSuite]
.exclude("avoid shuffle when join 2 bucketed tables")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
import org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite, GlutenFileTableSuite, GlutenV2PredicateSuite}
import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenExtensionRewriteRuleSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
import org.apache.spark.sql.gluten.GlutenFallbackSuite
import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
import org.apache.spark.sql.sources._
Expand Down Expand Up @@ -905,6 +905,7 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenTakeOrderedAndProjectSuite]
enableSuite[GlutenSessionExtensionSuite]
enableSuite[TestFileSourceScanExecTransformer]
enableSuite[GlutenExtensionRewriteRuleSuite]
enableSuite[GlutenBucketedReadWithoutHiveSupportSuite]
// Exclude the following suite for plan changed from SMJ to SHJ.
.exclude("avoid shuffle when join 2 bucketed tables")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.extension

import org.apache.gluten.execution.ProjectExecTransformer

import org.apache.spark.SparkConf
import org.apache.spark.sql.GlutenSQLTestsTrait

class GlutenExtensionRewriteRuleSuite extends GlutenSQLTestsTrait {

override def sparkConf: SparkConf = {
super.sparkConf
.set("spark.sql.adaptive.enabled", "false")
}

testGluten("GLUTEN-8183 - Pruning unused column in project") {
val query =
"""
|SELECT
| max(n1),
| max(n2),
| sum(IF(n1 + n2 + n3 % 2 = 0, 1, 0))
|FROM
| (
| SELECT
| id + 1 AS n1,
| id + 2 AS n2,
| IF(id % 2 = 0, id + 3, id + 4) AS n3
| FROM
| RANGE(10)
| )
|""".stripMargin

val df = sql(query)
assert(
getExecutedPlan(df).exists {
case project: ProjectExecTransformer => project.projectList.length == 3
case _ => false
}
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
import org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite, GlutenFileTableSuite, GlutenV2PredicateSuite}
import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
import org.apache.spark.sql.extension.{GlutenCustomerExtensionSuite, GlutenSessionExtensionSuite}
import org.apache.spark.sql.extension.{GlutenCustomerExtensionSuite, GlutenExtensionRewriteRuleSuite, GlutenSessionExtensionSuite}
import org.apache.spark.sql.gluten.GlutenFallbackSuite
import org.apache.spark.sql.hive.execution.GlutenHiveSQLQueryCHSuite
import org.apache.spark.sql.sources._
Expand Down Expand Up @@ -1660,6 +1660,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("full outer join with unique keys using SortMergeJoin (whole-stage-codegen on)")
enableSuite[GlutenCustomerExtensionSuite]
enableSuite[GlutenSessionExtensionSuite]
enableSuite[GlutenExtensionRewriteRuleSuite]
enableSuite[GlutenFallbackSuite]
enableSuite[GlutenBucketedReadWithoutHiveSupportSuite]
.exclude("avoid shuffle when join 2 bucketed tables")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
import org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite, GlutenFileTableSuite, GlutenV2PredicateSuite}
import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenExtensionRewriteRuleSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
import org.apache.spark.sql.gluten.GlutenFallbackSuite
import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
import org.apache.spark.sql.sources.{GlutenBucketedReadWithoutHiveSupportSuite, GlutenBucketedWriteWithoutHiveSupportSuite, GlutenCreateTableAsSelectSuite, GlutenDDLSourceLoadSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuiteAE, GlutenExternalCommandRunnerSuite, GlutenFilteredScanSuite, GlutenFiltersSuite, GlutenInsertSuite, GlutenPartitionedWriteSuite, GlutenPathOptionSuite, GlutenPrunedScanSuite, GlutenResolvedDataSourceSuite, GlutenSaveLoadSuite, GlutenTableScanSuite}
Expand Down Expand Up @@ -915,6 +915,7 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenTakeOrderedAndProjectSuite]
enableSuite[GlutenSessionExtensionSuite]
enableSuite[TestFileSourceScanExecTransformer]
enableSuite[GlutenExtensionRewriteRuleSuite]
enableSuite[GlutenBucketedReadWithoutHiveSupportSuite]
// Exclude the following suite for plan changed from SMJ to SHJ.
.exclude("avoid shuffle when join 2 bucketed tables")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.extension

import org.apache.gluten.execution.ProjectExecTransformer

import org.apache.spark.SparkConf
import org.apache.spark.sql.GlutenSQLTestsTrait

class GlutenExtensionRewriteRuleSuite extends GlutenSQLTestsTrait {

override def sparkConf: SparkConf = {
super.sparkConf
.set("spark.sql.adaptive.enabled", "false")
}

testGluten("GLUTEN-8183 - Pruning unused column in project") {
val query =
"""
|SELECT
| max(n1),
| max(n2),
| sum(IF(n1 + n2 + n3 % 2 = 0, 1, 0))
|FROM
| (
| SELECT
| id + 1 AS n1,
| id + 2 AS n2,
| IF(id % 2 = 0, id + 3, id + 4) AS n3
| FROM
| RANGE(10)
| )
|""".stripMargin

val df = sql(query)
assert(
getExecutedPlan(df).exists {
case project: ProjectExecTransformer => project.projectList.length == 3
case _ => false
}
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
import org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite, GlutenFileTableSuite, GlutenV2PredicateSuite}
import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuiteForceShjOn, GlutenOuterJoinSuiteForceShjOn}
import org.apache.spark.sql.extension.{GlutenCustomerExtensionSuite, GlutenSessionExtensionSuite}
import org.apache.spark.sql.extension.{GlutenCustomerExtensionSuite, GlutenExtensionRewriteRuleSuite, GlutenSessionExtensionSuite}
import org.apache.spark.sql.gluten.GlutenFallbackSuite
import org.apache.spark.sql.hive.execution.GlutenHiveSQLQueryCHSuite
import org.apache.spark.sql.sources._
Expand Down Expand Up @@ -1662,6 +1662,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("full outer join with unique keys using SortMergeJoin (whole-stage-codegen on)")
enableSuite[GlutenCustomerExtensionSuite]
enableSuite[GlutenSessionExtensionSuite]
enableSuite[GlutenExtensionRewriteRuleSuite]
enableSuite[GlutenFallbackSuite]
enableSuite[GlutenBucketedReadWithoutHiveSupportSuite]
.exclude("avoid shuffle when join 2 bucketed tables")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
import org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite, GlutenFileTableSuite, GlutenV2PredicateSuite}
import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenExtensionRewriteRuleSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
import org.apache.spark.sql.gluten.GlutenFallbackSuite
import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
import org.apache.spark.sql.sources._
Expand Down Expand Up @@ -929,6 +929,7 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenTakeOrderedAndProjectSuite]
enableSuite[GlutenSessionExtensionSuite]
enableSuite[TestFileSourceScanExecTransformer]
enableSuite[GlutenExtensionRewriteRuleSuite]
enableSuite[GlutenBucketedReadWithoutHiveSupportSuite]
// Exclude the following suite for plan changed from SMJ to SHJ.
.exclude("avoid shuffle when join 2 bucketed tables")
Expand Down
Loading

0 comments on commit 47ece74

Please sign in to comment.