Skip to content

Commit

Permalink
[VL] Collapse trivial projects generated by rule PushDownInputFileExp…
Browse files Browse the repository at this point in the history
…ression
  • Loading branch information
zml1206 authored Sep 13, 2024
1 parent 8ef4ab4 commit db5a2f7
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.gluten.execution

import org.apache.gluten.GlutenConfig

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.catalyst.optimizer.NullPropagation
import org.apache.spark.sql.execution.ProjectExec
Expand Down Expand Up @@ -1366,6 +1368,21 @@ abstract class ScalarFunctionsValidateSuite extends FunctionsValidateSuite {
|""".stripMargin
compareResultsAgainstVanillaSpark(sql, true, { _ => })
}

// Collapse project if scan is fallback and the outer project is cheap or fallback.
Seq("true", "false").foreach {
flag =>
withSQLConf(
GlutenConfig.COLUMNAR_PROJECT_ENABLED.key -> flag,
GlutenConfig.COLUMNAR_BATCHSCAN_ENABLED.key -> "false") {
runQueryAndCompare("SELECT l_orderkey, input_file_name() as name FROM lineitem") {
df =>
val plan = df.queryExecution.executedPlan
assert(collect(plan) { case f: ProjectExecTransformer => f }.size == 0)
assert(collect(plan) { case f: ProjectExec => f }.size == 1)
}
}
}
}

testWithSpecifiedSparkVersion("array insert", Some("3.4")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
*/
package org.apache.gluten.extension.columnar

import org.apache.gluten.execution.{BatchScanExecTransformer, FileSourceScanExecTransformer}
import org.apache.gluten.execution.{BatchScanExecTransformer, FileSourceScanExecTransformer, ProjectExecTransformer}

import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, NamedExpression}
import org.apache.spark.sql.catalyst.optimizer.CollapseProjectShim
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{DeserializeToObjectExec, LeafExecNode, ProjectExec, SerializeFromObjectExec, SparkPlan, UnionExec}

Expand All @@ -34,8 +35,8 @@ import scala.collection.mutable
* Two rules are involved:
* - Before offload, add new project before leaf node and push down input file expression to the
* new project
* - After offload, if scan be offloaded, push down input file expression into scan and remove
* project
* - After offload, push down input file expression into scan and remove project if scan be
* offloaded, collapse project if scan is fallback and the outer project is cheap or fallback
*/
object PushDownInputFileExpression {
def containsInputFileRelatedExpr(expr: Expression): Boolean = {
Expand Down Expand Up @@ -113,6 +114,28 @@ object PushDownInputFileExpression {
case p @ ProjectExec(projectList, child: BatchScanExecTransformer)
if projectList.exists(containsInputFileRelatedExpr) =>
child.copy(output = p.output.asInstanceOf[Seq[AttributeReference]])
case p1 @ ProjectExec(_, p2: ProjectExec) if canCollapseProject(p2) =>
p2.copy(projectList =
CollapseProjectShim.buildCleanedProjectList(p1.projectList, p2.projectList))
case p1 @ ProjectExecTransformer(_, p2: ProjectExec) if canCollapseProject(p1, p2) =>
p2.copy(projectList =
CollapseProjectShim.buildCleanedProjectList(p1.projectList, p2.projectList))
}

private def canCollapseProject(project: ProjectExec): Boolean = {
project.projectList.forall {
case Alias(_: InputFileName | _: InputFileBlockStart | _: InputFileBlockLength, _) => true
case _: Attribute => true
case _ => false
}
}

private def canCollapseProject(p1: ProjectExecTransformer, p2: ProjectExec): Boolean = {
canCollapseProject(p2) && p1.projectList.forall {
case Alias(_: Attribute, _) => true
case _: Attribute => true
case _ => false
}
}
}
}

0 comments on commit db5a2f7

Please sign in to comment.