[GLUTEN-7203][Core] Make push down filter to scan as an individual rule
zml1206 committed Sep 12, 2024
1 parent 85d90c9 commit 1f44e30
Showing 12 changed files with 80 additions and 258 deletions.
@@ -845,7 +845,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
// Let's make push down functionally same as vanilla Spark for now.

sparkExecNode match {
- case fileSourceScan: FileSourceScanExec
+ case fileSourceScan: FileSourceScanExecTransformerBase
if isParquetFormat(fileSourceScan.relation.fileFormat) =>
PushDownUtil.removeNotSupportPushDownFilters(
fileSourceScan.conf,
@@ -65,6 +65,7 @@ private object VeloxRuleApi {
injector.injectTransform(_ => TransformPreOverrides())
injector.injectTransform(_ => RemoveNativeWriteFilesSortAndProject())
injector.injectTransform(c => RewriteTransformer.apply(c.session))
+ injector.injectTransform(_ => PushDownFilterToScan)
injector.injectTransform(_ => PushDownInputFileExpression.PostOffload)
injector.injectTransform(_ => EnsureLocalSortRequirements)
injector.injectTransform(_ => EliminateLocalSort)
@@ -107,6 +108,7 @@
injector.inject(_ => RemoveTransitions)
injector.inject(_ => RemoveNativeWriteFilesSortAndProject())
injector.inject(c => RewriteTransformer.apply(c.session))
+ injector.inject(_ => PushDownFilterToScan)
injector.inject(_ => PushDownInputFileExpression.PostOffload)
injector.inject(_ => EnsureLocalSortRequirements)
injector.inject(_ => EliminateLocalSort)
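
For context, the rule registered above is applied as an ordinary physical-plan rule. A minimal sketch, outside Gluten's injector API and with an illustrative helper name, of how a list of Rule[SparkPlan] instances is folded over a plan in order:

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// Illustrative helper, not part of this commit: apply each physical rule in sequence,
// which is conceptually what the injected transform list above does.
object ApplyPhysicalRules {
  def apply(rules: Seq[Rule[SparkPlan]], plan: SparkPlan): SparkPlan =
    rules.foldLeft(plan)((p, rule) => rule.apply(p))
}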
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.FileFormat
- import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
+ import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.BuildSideRelation
import org.apache.spark.sql.execution.metric.SQLMetric
@@ -627,9 +627,9 @@ trait SparkPlanExecApi {
})
}
sparkExecNode match {
- case fileSourceScan: FileSourceScanExec =>
+ case fileSourceScan: FileSourceScanExecTransformerBase =>
getPushedFilter(fileSourceScan.dataFilters)
- case batchScan: BatchScanExec =>
+ case batchScan: BatchScanExecTransformerBase =>
batchScan.scan match {
case fileScan: FileScan =>
getPushedFilter(fileScan.dataFilters)
@@ -357,28 +357,4 @@ object FilterHandler extends PredicateHelper {
*/
def getRemainingFilters(scanFilters: Seq[Expression], filters: Seq[Expression]): Seq[Expression] =
(filters.toSet -- scanFilters.toSet).toSeq

- // Separate and compare the filter conditions in Scan and Filter.
- // Try to push down the remaining conditions in Filter into Scan.
- def pushFilterToScan(condition: Expression, scan: SparkPlan): SparkPlan =
- scan match {
- case fileSourceScan: FileSourceScanExec =>
- val pushDownFilters =
- BackendsApiManager.getSparkPlanExecApiInstance.postProcessPushDownFilter(
- splitConjunctivePredicates(condition),
- fileSourceScan)
- ScanTransformerFactory.createFileSourceScanTransformer(
- fileSourceScan,
- allPushDownFilters = Some(pushDownFilters))
- case batchScan: BatchScanExec =>
- val pushDownFilters =
- BackendsApiManager.getSparkPlanExecApiInstance.postProcessPushDownFilter(
- splitConjunctivePredicates(condition),
- batchScan)
- ScanTransformerFactory.createBatchScanTransformer(
- batchScan,
- allPushDownFilters = Some(pushDownFilters))
- case other =>
- throw new GlutenNotSupportException(s"${other.getClass.toString} is not supported.")
- }
}
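
The removed pushFilterToScan helper split the Filter condition with PredicateHelper.splitConjunctivePredicates, and the new standalone rule (shown further below) relies on the same mechanism. A minimal sketch of that splitting step, assuming only a plain Spark classpath and an illustrative object name:

import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper}

// Illustrative only: expose Spark's conjunct splitting, e.g. a condition of the form
// And(a > 1, b = 2) becomes Seq(a > 1, b = 2), so each conjunct can be pushed down separately.
object ConjunctSplitter extends PredicateHelper {
  def split(condition: Expression): Seq[Expression] =
    splitConjunctivePredicates(condition)
}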
@@ -34,7 +34,7 @@ object MiscColumnarRules {
object TransformPreOverrides {
def apply(): TransformPreOverrides = {
TransformPreOverrides(
- List(OffloadFilter()),
+ List(),
List(
OffloadOthers(),
OffloadAggregate(),
@@ -220,51 +220,6 @@ object OffloadJoin {
}
}

- // Filter transformation.
- case class OffloadFilter() extends OffloadSingleNode with LogLevelUtil {
- import OffloadOthers._
- private val replace = new ReplaceSingleNode()
-
- override def offload(plan: SparkPlan): SparkPlan = plan match {
- case filter: FilterExec =>
- genFilterExec(filter)
- case other => other
- }
-
- /**
- * Generate a plan for filter.
- *
- * @param filter
- * : the original Spark plan.
- * @return
- * the actually used plan for execution.
- */
- private def genFilterExec(filter: FilterExec): SparkPlan = {
- if (FallbackTags.nonEmpty(filter)) {
- return filter
- }
-
- // FIXME: Filter push-down should be better done by Vanilla Spark's planner or by
- // a individual rule.
- // Push down the left conditions in Filter into FileSourceScan.
- val newChild: SparkPlan = filter.child match {
- case scan @ (_: FileSourceScanExec | _: BatchScanExec) =>
- if (FallbackTags.maybeOffloadable(scan)) {
- val newScan =
- FilterHandler.pushFilterToScan(filter.condition, scan)
- newScan match {
- case ts: TransformSupport if ts.doValidate().ok() => ts
- case _ => scan
- }
- } else scan
- case _ => filter.child
- }
- logDebug(s"Columnar Processing for ${filter.getClass} is currently supported.")
- BackendsApiManager.getSparkPlanExecApiInstance
- .genFilterExecTransformer(filter.condition, newChild)
- }
- }

// Other transformations.
case class OffloadOthers() extends OffloadSingleNode with LogLevelUtil {
import OffloadOthers._
@@ -297,6 +252,10 @@ object OffloadOthers {
case plan: CoalesceExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
ColumnarCoalesceExec(plan.numPartitions, plan.child)
+ case plan: FilterExec =>
+ logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
+ BackendsApiManager.getSparkPlanExecApiInstance
+ .genFilterExecTransformer(plan.condition, plan.child)
case plan: ProjectExec =>
val columnarChild = plan.child
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
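
With OffloadFilter removed, FilterExec above is offloaded like any other single node and no push-down is attempted at this point; that work now belongs to PushDownFilterToScan. A minimal sketch of this shape using only vanilla Spark types (the backend call is stubbed via a caller-supplied function, since the backend API is not shown here):

import org.apache.spark.sql.execution.{FilterExec, SparkPlan}

// Illustrative only: map a FilterExec to a backend filter node supplied by the caller;
// no filter conditions are moved into the scan here.
object OffloadFilterSketch {
  def offload(plan: SparkPlan, toBackendFilter: FilterExec => SparkPlan): SparkPlan =
    plan match {
      case f: FilterExec => toBackendFilter(f)
      case other => other
    }
}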
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.extension.columnar

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.{BatchScanExecTransformerBase, FileSourceScanExecTransformer, FilterExecTransformerBase}

import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._

/**
* Vanilla spark just push down part of filter condition into scan, however gluten can push down all
* filters.
*/
object PushDownFilterToScan extends Rule[SparkPlan] with PredicateHelper {
override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
case filter: FilterExecTransformerBase =>
filter.child match {
case fileScan: FileSourceScanExecTransformer =>
val pushDownFilters =
BackendsApiManager.getSparkPlanExecApiInstance.postProcessPushDownFilter(
splitConjunctivePredicates(filter.cond),
fileScan)
if (pushDownFilters.size > fileScan.dataFilters.size) {
val newScan = fileScan.copy(dataFilters = pushDownFilters)
if (newScan.doValidate().ok()) {
filter.withNewChildren(Seq(newScan))
} else {
filter
}
} else {
filter
}
case batchScan: BatchScanExecTransformerBase =>
val pushDownFilters =
BackendsApiManager.getSparkPlanExecApiInstance.postProcessPushDownFilter(
splitConjunctivePredicates(filter.cond),
batchScan)
if (pushDownFilters.size > 0) {
val newScan = batchScan
newScan.setPushDownFilters(pushDownFilters)
if (newScan.doValidate().ok()) {
filter.withNewChildren(Seq(newScan))
} else {
filter
}
} else {
filter
}
case _ => filter
}
}
}
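
Because the new rule is a plain Rule[SparkPlan] applied with transformUp, it can be exercised directly on a plan whose scan and filter nodes have already been offloaded. A hedged usage sketch (the offloadedPlan value and wrapper object are hypothetical):

import org.apache.gluten.extension.columnar.PushDownFilterToScan
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical usage: offloadedPlan stands for a physical plan already rewritten into
// Gluten transformer nodes; the rule then moves remaining filter conjuncts into the
// scan, keeping the original Filter whenever doValidate() rejects the rewritten scan.
object PushDownFilterToScanUsage {
  def run(offloadedPlan: SparkPlan): SparkPlan = PushDownFilterToScan(offloadedPlan)
}

Compared with the removed OffloadFilter path, running this as a separate rule keeps the push-down decision next to the validation of the rewritten scan, rather than inside the filter offload step.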
@@ -53,9 +53,7 @@ case class EnumeratedTransform(session: SparkSession, outputsColumnar: Boolean)
.build()

private val rules = List(
- new PushFilterToScan(validator),
- RemoveSort,
- RemoveFilter
+ RemoveSort
)

// TODO: Should obey ReplaceSingleNode#applyScanNotTransformable to select

This file was deleted.
