[CORE] Make condition store the actually executed filter expressions
zml1206 committed Jul 3, 2024
1 parent 47fa44f commit 45cb2a4
Showing 5 changed files with 29 additions and 92 deletions.
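In summary, the per-backend getRemainingCondition hook is removed: the filter condition minus the predicates already pushed into the scan is now computed once, in OffloadFilter, and stored directly in the transformer's condition field, with Literal.TrueLiteral marking a filter that has nothing left to do. The sketch below illustrates that subtraction with a hand-rolled remainingFilters helper; it is only an illustration of the idea, not Gluten's FilterHandler.getRemainingFilters implementation, and the object and helper names are made up.

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

object RemainingConditionSketch extends PredicateHelper {
  // Hypothetical stand-in for FilterHandler.getRemainingFilters: drop every conjunct
  // that is semantically equal to a predicate the scan already evaluates.
  def remainingFilters(scanFilters: Seq[Expression], conjuncts: Seq[Expression]): Seq[Expression] =
    conjuncts.filterNot(c => scanFilters.exists(_.semanticEquals(c)))

  def main(args: Array[String]): Unit = {
    val a = AttributeReference("a", IntegerType)()
    val b = AttributeReference("b", IntegerType)()
    // Original filter: a > 1 AND b < 10; the scan already handles a > 1.
    val condition: Expression = And(GreaterThan(a, Literal(1)), LessThan(b, Literal(10)))
    val pushedToScan = Seq[Expression](GreaterThan(a, Literal(1)))

    val remaining = remainingFilters(pushedToScan, splitConjunctivePredicates(condition))
    // The transformer now stores only the leftover conjuncts; TrueLiteral marks a no-op filter.
    val executedCondition = remaining.reduceLeftOption(And).getOrElse(Literal.TrueLiteral)
    println(executedCondition) // prints the remaining predicate, e.g. (b#1 < 10)
  }
}
```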
@@ -21,31 +21,13 @@ import org.apache.spark.sql.execution.SparkPlan

case class CHFilterExecTransformer(condition: Expression, child: SparkPlan)
extends FilterExecTransformerBase(condition, child) {
override protected def getRemainingCondition: Expression = {
val scanFilters = child match {
// Get the filters including the manually pushed down ones.
case basicScanTransformer: BasicScanExecTransformer =>
basicScanTransformer.filterExprs()
// In ColumnarGuardRules, the child is still row-based. Need to get the original filters.
case _ =>
FilterHandler.getScanFilters(child)
}
if (scanFilters.isEmpty) {
condition
} else {
val remainingFilters =
FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(condition))
remainingFilters.reduceLeftOption(And).orNull
}
}

override protected def withNewChildInternal(newChild: SparkPlan): CHFilterExecTransformer =
copy(child = newChild)
}

case class FilterExecTransformer(condition: Expression, child: SparkPlan)
extends FilterExecTransformerBase(condition, child) {
override protected def getRemainingCondition: Expression = condition
override protected def withNewChildInternal(newChild: SparkPlan): FilterExecTransformer =
copy(child = newChild)
}
@@ -16,30 +16,11 @@
*/
package org.apache.gluten.execution

import org.apache.spark.sql.catalyst.expressions.{And, Expression}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.SparkPlan

case class FilterExecTransformer(condition: Expression, child: SparkPlan)
extends FilterExecTransformerBase(condition, child) {
// FIXME: Should use field "condition" to store the actually executed filter expressions,
// to make optimization easier (e.g. removing the filter when it actually does nothing).
override protected def getRemainingCondition: Expression = {
val scanFilters = child match {
// Get the filters including the manually pushed down ones.
case basicScanExecTransformer: BasicScanExecTransformer =>
basicScanExecTransformer.filterExprs()
// For a fallback scan, we need to keep the original filter.
case _ =>
Seq.empty[Expression]
}
if (scanFilters.isEmpty) {
condition
} else {
val remainingFilters =
FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(condition))
remainingFilters.reduceLeftOption(And).orNull
}
}

override protected def withNewChildInternal(newChild: SparkPlan): FilterExecTransformer =
copy(child = newChild)
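Both removed overrides lean on PredicateHelper.splitConjunctivePredicates to break the filter condition into individual conjuncts before subtracting the scan filters, and the new OffloadFilter code keeps using it. For reference, a minimal self-contained demonstration of what that split produces (this is vanilla Spark's helper, nothing Gluten-specific):

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

object SplitConjunctsSketch extends PredicateHelper {
  def main(args: Array[String]): Unit = {
    val a = AttributeReference("a", IntegerType)()
    val b = AttributeReference("b", IntegerType)()
    val c = AttributeReference("c", IntegerType)()
    // (a > 1 AND b > 2) AND c > 3 is flattened into three separate predicates.
    val cond = And(
      And(GreaterThan(a, Literal(1)), GreaterThan(b, Literal(2))),
      GreaterThan(c, Literal(3)))
    val conjuncts = splitConjunctivePredicates(cond)
    println(conjuncts.size) // 3
  }
}
```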
@@ -1895,18 +1895,6 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla

test("fix non-deterministic filter executed twice when push down to scan") {
val df = sql("select * from lineitem where rand() <= 0.5")
// plan check
val plan = df.queryExecution.executedPlan
val scans = plan.collect { case scan: FileSourceScanExecTransformer => scan }
val filters = plan.collect { case filter: FilterExecTransformer => filter }
assert(scans.size == 1)
assert(filters.size == 1)
assert(scans(0).dataFilters.size == 1)
val remainingFilters = FilterHandler.getRemainingFilters(
scans(0).dataFilters,
splitConjunctivePredicates(filters(0).condition))
assert(remainingFilters.size == 0)

// result length check, table lineitem has 60,000 rows
val resultLength = df.collect().length
assert(resultLength > 25000 && resultLength < 35000)
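The remaining result-length assertion is still enough to catch the regression this test guards against: if rand() <= 0.5 were evaluated once in the scan and again in the filter, only about 25% of the 60,000 rows would survive instead of roughly 50%, falling outside the 25,000-35,000 window. A tiny plain-Scala sketch of that effect (no Spark involved, names are made up):

```scala
import scala.util.Random

object DoubleEvaluationSketch {
  def main(args: Array[String]): Unit = {
    val rows = 60000
    val rng = new Random(42)
    // Non-deterministic predicate evaluated once per row: ~50% of rows pass.
    val evaluatedOnce = (1 to rows).count(_ => rng.nextDouble() <= 0.5)
    // The same predicate evaluated twice per row (scan + filter): only ~25% pass.
    val evaluatedTwice = (1 to rows).count(_ => rng.nextDouble() <= 0.5 && rng.nextDouble() <= 0.5)
    println(s"once: $evaluatedOnce, twice: $evaluatedTwice") // roughly 30000 vs 15000
  }
}
```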
@@ -106,47 +106,33 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkP

override protected def outputExpressions: Seq[NamedExpression] = child.output

protected def getRemainingCondition: Expression

override protected def doValidateInternal(): ValidationResult = {
val remainingCondition = getRemainingCondition
if (remainingCondition == null) {
if (this.isNoop()) {
// All of the filters can be pushed down, so the computation of this Filter
// is not needed.
return ValidationResult.ok
}
val substraitContext = new SubstraitContext
val operatorId = substraitContext.nextOperatorId(this.nodeName)
// First, check whether the Substrait plan for this operator can be successfully generated.
val relNode = getRelNode(
substraitContext,
remainingCondition,
child.output,
operatorId,
null,
validation = true)
val relNode =
getRelNode(substraitContext, cond, child.output, operatorId, null, validation = true)
// Then, validate the generated plan in the native engine.
doNativeValidation(substraitContext, relNode)
}

override protected def doTransform(context: SubstraitContext): TransformContext = {
val childCtx = child.asInstanceOf[TransformSupport].transform(context)
val remainingCondition = getRemainingCondition
val operatorId = context.nextOperatorId(this.nodeName)
if (remainingCondition == null) {
if (this.isNoop()) {
// The computation for this filter is not needed.
context.registerEmptyRelToOperator(operatorId)
// Since some columns' nullability will be removed after this filter, we need to update the
// outputAttributes of the child context.
return TransformContext(childCtx.inputAttributes, output, childCtx.root)
}
val currRel = getRelNode(
context,
remainingCondition,
child.output,
operatorId,
childCtx.root,
validation = false)
val currRel =
getRelNode(context, cond, child.output, operatorId, childCtx.root, validation = false)
assert(currRel != null, "Filter rel should be valid.")
TransformContext(childCtx.outputAttributes, output, currRel)
}
@@ -155,7 +141,7 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkP
object FilterExecTransformerBase {
implicit class FilterExecTransformerBaseImplicits(filter: FilterExecTransformerBase) {
def isNoop(): Boolean = {
filter.getRemainingCondition == null
filter.cond == Literal.TrueLiteral
}
}
}
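With getRemainingCondition gone, a filter is a no-op exactly when its stored condition is the Literal.TrueLiteral sentinel that OffloadFilter installs. Note that the check is structural, not semantic: an expression that merely evaluates to true is not recognized, which is what the TODO about a simplification rule further down refers to. A minimal sketch of that distinction (the isNoop helper below mirrors the shape of the check but is otherwise standalone):

```scala
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal}

object NoopCheckSketch {
  // Same shape as the isNoop check above: structural equality with the sentinel.
  def isNoop(cond: Expression): Boolean = cond == Literal.TrueLiteral

  def main(args: Array[String]): Unit = {
    println(isNoop(Literal.TrueLiteral))                           // true
    println(isNoop(Literal(true)))                                 // true: equal Literal instances
    println(isNoop(And(Literal.TrueLiteral, Literal.TrueLiteral))) // false: true-valued, but not the sentinel
  }
}
```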
@@ -26,7 +26,7 @@ import org.apache.gluten.utils.{LogLevelUtil, PlanUtil}

import org.apache.spark.api.python.EvalPythonExecTransformer
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, Literal, NamedExpression, PredicateHelper}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.catalyst.plans.logical.Join
import org.apache.spark.sql.execution._
@@ -347,9 +347,7 @@ case class OffloadProject() extends OffloadSingleNode with LogLevelUtil {
}

// Filter transformation.
case class OffloadFilter() extends OffloadSingleNode with LogLevelUtil {
import OffloadOthers._
private val replace = new ReplaceSingleNode()
case class OffloadFilter() extends OffloadSingleNode with PredicateHelper with LogLevelUtil {

override def offload(plan: SparkPlan): SparkPlan = plan match {
case filter: FilterExec =>
@@ -369,25 +367,27 @@ case class OffloadFilter() extends OffloadSingleNode with LogLevelUtil {
if (FallbackTags.nonEmpty(filter)) {
return filter
}

// FIXME: Filter push-down should be better done by Vanilla Spark's planner or by
// an individual rule.
// Push down the left conditions in Filter into FileSourceScan.
val newChild: SparkPlan = filter.child match {
case scan @ (_: FileSourceScanExec | _: BatchScanExec) =>
if (FallbackTags.maybeOffloadable(scan)) {
val newScan =
FilterHandler.pushFilterToScan(filter.condition, scan)
newScan match {
case ts: TransformSupport if ts.doValidate().isValid => ts
case _ => scan
}
} else scan
case _ => filter.child
}
val newFilter = pushFilterToScan(filter)
logDebug(s"Columnar Processing for ${filter.getClass} is currently supported.")
BackendsApiManager.getSparkPlanExecApiInstance
.genFilterExecTransformer(filter.condition, newChild)
.genFilterExecTransformer(newFilter.condition, newFilter.child)
}

// FIXME: Filter push-down should be better done by Vanilla Spark's planner or by
// an individual rule.
def pushFilterToScan(filter: FilterExec): FilterExec = filter.child match {
case scan @ (_: FileSourceScanExec | _: BatchScanExec) if FallbackTags.maybeOffloadable(scan) =>
FilterHandler.pushFilterToScan(filter.condition, scan) match {
case st: BasicScanExecTransformer if st.doValidate().isValid =>
val newCond = FilterHandler
.getRemainingFilters(st.filterExprs(), splitConjunctivePredicates(filter.condition))
.reduceLeftOption(And)
// TODO: Add a rule to remove filter conditions that always evaluate to true.
.getOrElse(Literal.TrueLiteral)
FilterExec(newCond, st)
case _ => filter
}
case _ => filter
}
}
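FilterHandler.pushFilterToScan itself is untouched by this diff, so the sketch below only illustrates the kind of split it has to produce for the non-deterministic test above to hold, assuming (as vanilla Spark's push-down does) that only deterministic conjuncts may be moved into the scan. The object and split helper are hypothetical.

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

object PushDownSplitSketch extends PredicateHelper {
  // Hypothetical split: deterministic conjuncts may be pushed to the scan,
  // non-deterministic ones must stay in the Filter so they run exactly once.
  def split(condition: Expression): (Seq[Expression], Seq[Expression]) =
    splitConjunctivePredicates(condition).partition(_.deterministic)

  def main(args: Array[String]): Unit = {
    val key = AttributeReference("l_orderkey", IntegerType)()
    // l_orderkey > 10 AND rand(0) <= 0.5
    val cond = And(
      GreaterThan(key, Literal(10)),
      LessThanOrEqual(Rand(Literal(0L)), Literal(0.5)))
    val (pushable, remaining) = split(cond)
    println(s"pushed to scan: $pushable")  // the deterministic l_orderkey > 10
    println(s"kept in filter: $remaining") // the non-deterministic rand(0) <= 0.5
  }
}
```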
