From 45cb2a4abb670f37a53d06d318a188d21f150cb3 Mon Sep 17 00:00:00 2001
From: zml1206
Date: Wed, 3 Jul 2024 11:14:32 +0800
Subject: [PATCH] [CORE] Make condition store the actually executed filter expressions

---
 .../execution/CHFilterExecTransformer.scala   | 18 --------
 .../execution/FilterExecTransformer.scala     | 21 +---------
 .../gluten/execution/TestOperator.scala       | 12 ------
 .../BasicPhysicalOperatorTransformer.scala    | 28 ++++---------
 .../columnar/OffloadSingleNode.scala          | 42 +++++++++----------
 5 files changed, 29 insertions(+), 92 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHFilterExecTransformer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHFilterExecTransformer.scala
index 79ddf2942c3e..c98026e8d5b7 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHFilterExecTransformer.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHFilterExecTransformer.scala
@@ -21,23 +21,6 @@ import org.apache.spark.sql.execution.SparkPlan
 
 case class CHFilterExecTransformer(condition: Expression, child: SparkPlan)
   extends FilterExecTransformerBase(condition, child) {
-  override protected def getRemainingCondition: Expression = {
-    val scanFilters = child match {
-      // Get the filters including the manually pushed down ones.
-      case basicScanTransformer: BasicScanExecTransformer =>
-        basicScanTransformer.filterExprs()
-      // In ColumnarGuardRules, the child is still row-based. Need to get the original filters.
-      case _ =>
-        FilterHandler.getScanFilters(child)
-    }
-    if (scanFilters.isEmpty) {
-      condition
-    } else {
-      val remainingFilters =
-        FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(condition))
-      remainingFilters.reduceLeftOption(And).orNull
-    }
-  }
 
   override protected def withNewChildInternal(newChild: SparkPlan): CHFilterExecTransformer =
     copy(child = newChild)
@@ -45,7 +28,6 @@ case class CHFilterExecTransformer(condition: Expression, child: SparkPlan)
 
 case class FilterExecTransformer(condition: Expression, child: SparkPlan)
   extends FilterExecTransformerBase(condition, child) {
-  override protected def getRemainingCondition: Expression = condition
 
   override protected def withNewChildInternal(newChild: SparkPlan): FilterExecTransformer =
     copy(child = newChild)
 }
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/FilterExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/FilterExecTransformer.scala
index daa08498bee3..fdfe1d7ea917 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/FilterExecTransformer.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/FilterExecTransformer.scala
@@ -16,30 +16,11 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.spark.sql.catalyst.expressions.{And, Expression}
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.SparkPlan
 
 case class FilterExecTransformer(condition: Expression, child: SparkPlan)
   extends FilterExecTransformerBase(condition, child) {
-  // FIXME: Should use field "condition" to store the actual executed filter expressions.
-  // To make optimization easier (like to remove filter when it actually does nothing)
-  override protected def getRemainingCondition: Expression = {
-    val scanFilters = child match {
-      // Get the filters including the manually pushed down ones.
-      case basicScanExecTransformer: BasicScanExecTransformer =>
-        basicScanExecTransformer.filterExprs()
-      // For fallback scan, we need to keep original filter.
-      case _ =>
-        Seq.empty[Expression]
-    }
-    if (scanFilters.isEmpty) {
-      condition
-    } else {
-      val remainingFilters =
-        FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(condition))
-      remainingFilters.reduceLeftOption(And).orNull
-    }
-  }
 
   override protected def withNewChildInternal(newChild: SparkPlan): FilterExecTransformer =
     copy(child = newChild)
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
index c010b9128ce1..bd7e76dba2b6 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
@@ -1895,18 +1895,6 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla
 
   test("fix non-deterministic filter executed twice when push down to scan") {
     val df = sql("select * from lineitem where rand() <= 0.5")
-    // plan check
-    val plan = df.queryExecution.executedPlan
-    val scans = plan.collect { case scan: FileSourceScanExecTransformer => scan }
-    val filters = plan.collect { case filter: FilterExecTransformer => filter }
-    assert(scans.size == 1)
-    assert(filters.size == 1)
-    assert(scans(0).dataFilters.size == 1)
-    val remainingFilters = FilterHandler.getRemainingFilters(
-      scans(0).dataFilters,
-      splitConjunctivePredicates(filters(0).condition))
-    assert(remainingFilters.size == 0)
-
     // result length check, table lineitem has 60,000 rows
     val resultLength = df.collect().length
     assert(resultLength > 25000 && resultLength < 35000)
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
index 0b792d52e056..aa3affca2067 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
@@ -106,11 +106,8 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkP
 
   override protected def outputExpressions: Seq[NamedExpression] = child.output
 
-  protected def getRemainingCondition: Expression
-
   override protected def doValidateInternal(): ValidationResult = {
-    val remainingCondition = getRemainingCondition
-    if (remainingCondition == null) {
+    if (this.isNoop()) {
       // All the filters can be pushed down and the computing of this Filter
       // is not needed.
       return ValidationResult.ok
     }
     val substraitContext = new SubstraitContext
     val operatorId = substraitContext.nextOperatorId(this.nodeName)
     // Firstly, need to check if the Substrait plan for this operator can be successfully generated.
-    val relNode = getRelNode(
-      substraitContext,
-      remainingCondition,
-      child.output,
-      operatorId,
-      null,
-      validation = true)
+    val relNode =
+      getRelNode(substraitContext, cond, child.output, operatorId, null, validation = true)
     // Then, validate the generated plan in native engine.
     doNativeValidation(substraitContext, relNode)
   }
 
   override protected def doTransform(context: SubstraitContext): TransformContext = {
     val childCtx = child.asInstanceOf[TransformSupport].transform(context)
-    val remainingCondition = getRemainingCondition
     val operatorId = context.nextOperatorId(this.nodeName)
-    if (remainingCondition == null) {
+    if (this.isNoop()) {
       // The computing for this filter is not needed.
       context.registerEmptyRelToOperator(operatorId)
       // Since some columns' nullability will be removed after this filter, we need to update the
       // outputAttributes of child context.
       return TransformContext(childCtx.inputAttributes, output, childCtx.root)
     }
-    val currRel = getRelNode(
-      context,
-      remainingCondition,
-      child.output,
-      operatorId,
-      childCtx.root,
-      validation = false)
+    val currRel =
+      getRelNode(context, cond, child.output, operatorId, childCtx.root, validation = false)
     assert(currRel != null, "Filter rel should be valid.")
     TransformContext(childCtx.outputAttributes, output, currRel)
   }
@@ -155,7 +141,7 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkP
 object FilterExecTransformerBase {
   implicit class FilterExecTransformerBaseImplicits(filter: FilterExecTransformerBase) {
     def isNoop(): Boolean = {
-      filter.getRemainingCondition == null
+      filter.cond == Literal.TrueLiteral
     }
   }
 }
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
index 7a4222b5cb38..28c1f31d3fcd 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
@@ -26,7 +26,7 @@ import org.apache.gluten.utils.{LogLevelUtil, PlanUtil}
 
 import org.apache.spark.api.python.EvalPythonExecTransformer
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, Literal, NamedExpression, PredicateHelper}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
 import org.apache.spark.sql.catalyst.plans.logical.Join
 import org.apache.spark.sql.execution._
@@ -347,9 +347,7 @@ case class OffloadProject() extends OffloadSingleNode with LogLevelUtil {
 }
 
 // Filter transformation.
-case class OffloadFilter() extends OffloadSingleNode with LogLevelUtil {
-  import OffloadOthers._
-  private val replace = new ReplaceSingleNode()
+case class OffloadFilter() extends OffloadSingleNode with PredicateHelper with LogLevelUtil {
 
   override def offload(plan: SparkPlan): SparkPlan = plan match {
     case filter: FilterExec =>
@@ -369,25 +367,27 @@ case class OffloadFilter() extends OffloadSingleNode with LogLevelUtil {
       if (FallbackTags.nonEmpty(filter)) {
        return filter
       }
-
-      // FIXME: Filter push-down should be better done by Vanilla Spark's planner or by
-      // a individual rule.
-      // Push down the left conditions in Filter into FileSourceScan.
-      val newChild: SparkPlan = filter.child match {
-        case scan @ (_: FileSourceScanExec | _: BatchScanExec) =>
-          if (FallbackTags.maybeOffloadable(scan)) {
-            val newScan =
-              FilterHandler.pushFilterToScan(filter.condition, scan)
-            newScan match {
-              case ts: TransformSupport if ts.doValidate().isValid => ts
-              case _ => scan
-            }
-          } else scan
-        case _ => filter.child
-      }
+      val newFilter = pushFilterToScan(filter)
       logDebug(s"Columnar Processing for ${filter.getClass} is currently supported.")
       BackendsApiManager.getSparkPlanExecApiInstance
-        .genFilterExecTransformer(filter.condition, newChild)
+        .genFilterExecTransformer(newFilter.condition, newFilter.child)
+  }
+
+  // FIXME: Filter push-down should be better done by Vanilla Spark's planner or by
+  // an individual rule.
+  def pushFilterToScan(filter: FilterExec): FilterExec = filter.child match {
+    case scan @ (_: FileSourceScanExec | _: BatchScanExec) if FallbackTags.maybeOffloadable(scan) =>
+      FilterHandler.pushFilterToScan(filter.condition, scan) match {
+        case st: BasicScanExecTransformer if st.doValidate().isValid =>
+          val newCond = FilterHandler
+            .getRemainingFilters(st.filterExprs(), splitConjunctivePredicates(filter.condition))
+            .reduceLeftOption(And)
+            // TODO: Add a rule to remove filter conditions that always evaluate to true.
+            .getOrElse(Literal.TrueLiteral)
+          FilterExec(newCond, st)
+        case _ => filter
+      }
+    case _ => filter
+  }
 }
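
Note (outside the patch): the heart of this change is that a filter's condition now holds only the predicates that will actually execute, with Literal.TrueLiteral marking "nothing left to do" in place of the old null remaining condition. The following is a minimal, self-contained sketch of that remaining-condition computation, not Gluten's actual code: splitConjunctivePredicates is re-implemented here to mirror Spark's PredicateHelper, and plain sequence containment stands in for whatever predicate matching FilterHandler.getRemainingFilters really performs.

import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal}

object RemainingConditionSketch {

  // Split a conjunctive condition into its individual predicates,
  // mirroring Spark's PredicateHelper.splitConjunctivePredicates.
  def splitConjunctivePredicates(condition: Expression): Seq[Expression] =
    condition match {
      case And(left, right) =>
        splitConjunctivePredicates(left) ++ splitConjunctivePredicates(right)
      case other => Seq(other)
    }

  // Keep only the predicates the scan did not absorb. When every predicate
  // was pushed down, the condition degenerates to Literal.TrueLiteral, which
  // is exactly what the new isNoop() extension on FilterExecTransformerBase
  // tests for.
  def remainingCondition(scanFilters: Seq[Expression], condition: Expression): Expression =
    splitConjunctivePredicates(condition)
      .filterNot(scanFilters.contains)
      .reduceLeftOption(And)
      .getOrElse(Literal.TrueLiteral)
}

Computing this once at offload time, instead of lazily through getRemainingCondition on every validation and transform, means later rules can treat a true-literal condition as a plain no-op marker without re-deriving the scan's pushed-down filters.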