diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index 1108b8b3c5010..a21636251413a 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -845,7 +845,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging { // Let's make push down functionally same as vanilla Spark for now. sparkExecNode match { - case fileSourceScan: FileSourceScanExec + case fileSourceScan: FileSourceScanExecTransformerBase if isParquetFormat(fileSourceScan.relation.fileFormat) => PushDownUtil.removeNotSupportPushDownFilters( fileSourceScan.conf, diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala index 6204ab092a397..ff8400a042a31 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala @@ -65,6 +65,7 @@ private object VeloxRuleApi { injector.injectTransform(_ => TransformPreOverrides()) injector.injectTransform(_ => RemoveNativeWriteFilesSortAndProject()) injector.injectTransform(c => RewriteTransformer.apply(c.session)) + injector.injectTransform(_ => PushDownFilterToScan) injector.injectTransform(_ => PushDownInputFileExpression.PostOffload) injector.injectTransform(_ => EnsureLocalSortRequirements) injector.injectTransform(_ => EliminateLocalSort) @@ -107,6 +108,7 @@ private object VeloxRuleApi { injector.inject(_ => RemoveTransitions) injector.inject(_ => RemoveNativeWriteFilesSortAndProject()) injector.inject(c => RewriteTransformer.apply(c.session)) + injector.inject(_ => PushDownFilterToScan) injector.inject(_ => PushDownInputFileExpression.PostOffload) injector.inject(_ => EnsureLocalSortRequirements) injector.inject(_ => EliminateLocalSort) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala index a55926d76d12d..6744b67374646 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.FileFormat -import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan} +import org.apache.spark.sql.execution.datasources.v2.FileScan import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import org.apache.spark.sql.execution.joins.BuildSideRelation import org.apache.spark.sql.execution.metric.SQLMetric @@ -627,9 +627,9 @@ trait SparkPlanExecApi { }) } sparkExecNode match { - case fileSourceScan: FileSourceScanExec => + case fileSourceScan: FileSourceScanExecTransformerBase => getPushedFilter(fileSourceScan.dataFilters) - case batchScan: BatchScanExec => + case batchScan: BatchScanExecTransformerBase => batchScan.scan match { case fileScan: FileScan => getPushedFilter(fileScan.dataFilters) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala index 8e87baf5381d9..a76b49cfbb955 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala @@ -357,28 +357,4 @@ object FilterHandler extends PredicateHelper { */ def getRemainingFilters(scanFilters: Seq[Expression], filters: Seq[Expression]): Seq[Expression] = (filters.toSet -- scanFilters.toSet).toSeq - - // Separate and compare the filter conditions in Scan and Filter. - // Try to push down the remaining conditions in Filter into Scan. - def pushFilterToScan(condition: Expression, scan: SparkPlan): SparkPlan = - scan match { - case fileSourceScan: FileSourceScanExec => - val pushDownFilters = - BackendsApiManager.getSparkPlanExecApiInstance.postProcessPushDownFilter( - splitConjunctivePredicates(condition), - fileSourceScan) - ScanTransformerFactory.createFileSourceScanTransformer( - fileSourceScan, - allPushDownFilters = Some(pushDownFilters)) - case batchScan: BatchScanExec => - val pushDownFilters = - BackendsApiManager.getSparkPlanExecApiInstance.postProcessPushDownFilter( - splitConjunctivePredicates(condition), - batchScan) - ScanTransformerFactory.createBatchScanTransformer( - batchScan, - allPushDownFilters = Some(pushDownFilters)) - case other => - throw new GlutenNotSupportException(s"${other.getClass.toString} is not supported.") - } } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala index 3bfa611ed531c..c2cc35dddfdeb 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala @@ -34,7 +34,7 @@ object MiscColumnarRules { object TransformPreOverrides { def apply(): TransformPreOverrides = { TransformPreOverrides( - List(OffloadFilter()), + List(), List( OffloadOthers(), OffloadAggregate(), diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala index 6047789e6abe9..bbcdf24bef908 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala @@ -220,51 +220,6 @@ object OffloadJoin { } } -// Filter transformation. -case class OffloadFilter() extends OffloadSingleNode with LogLevelUtil { - import OffloadOthers._ - private val replace = new ReplaceSingleNode() - - override def offload(plan: SparkPlan): SparkPlan = plan match { - case filter: FilterExec => - genFilterExec(filter) - case other => other - } - - /** - * Generate a plan for filter. - * - * @param filter - * : the original Spark plan. - * @return - * the actually used plan for execution. - */ - private def genFilterExec(filter: FilterExec): SparkPlan = { - if (FallbackTags.nonEmpty(filter)) { - return filter - } - - // FIXME: Filter push-down should be better done by Vanilla Spark's planner or by - // a individual rule. - // Push down the left conditions in Filter into FileSourceScan. - val newChild: SparkPlan = filter.child match { - case scan @ (_: FileSourceScanExec | _: BatchScanExec) => - if (FallbackTags.maybeOffloadable(scan)) { - val newScan = - FilterHandler.pushFilterToScan(filter.condition, scan) - newScan match { - case ts: TransformSupport if ts.doValidate().ok() => ts - case _ => scan - } - } else scan - case _ => filter.child - } - logDebug(s"Columnar Processing for ${filter.getClass} is currently supported.") - BackendsApiManager.getSparkPlanExecApiInstance - .genFilterExecTransformer(filter.condition, newChild) - } -} - // Other transformations. case class OffloadOthers() extends OffloadSingleNode with LogLevelUtil { import OffloadOthers._ @@ -297,6 +252,10 @@ object OffloadOthers { case plan: CoalesceExec => logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") ColumnarCoalesceExec(plan.numPartitions, plan.child) + case plan: FilterExec => + logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") + BackendsApiManager.getSparkPlanExecApiInstance + .genFilterExecTransformer(plan.condition, plan.child) case plan: ProjectExec => val columnarChild = plan.child logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownFilterToScan.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownFilterToScan.scala new file mode 100644 index 0000000000000..7b1f552dff7a1 --- /dev/null +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownFilterToScan.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.extension.columnar + +import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.execution.{BatchScanExecTransformerBase, FileSourceScanExecTransformer, FilterExecTransformerBase} + +import org.apache.spark.sql.catalyst.expressions.PredicateHelper +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution._ + +/** + * Vanilla spark just push down part of filter condition into scan, however gluten can push down all + * filters. + */ +object PushDownFilterToScan extends Rule[SparkPlan] with PredicateHelper { + override def apply(plan: SparkPlan): SparkPlan = plan.transformUp { + case filter: FilterExecTransformerBase => + filter.child match { + case fileScan: FileSourceScanExecTransformer => + val pushDownFilters = + BackendsApiManager.getSparkPlanExecApiInstance.postProcessPushDownFilter( + splitConjunctivePredicates(filter.cond), + fileScan) + if (pushDownFilters.size > fileScan.dataFilters.size) { + val newScan = fileScan.copy(dataFilters = pushDownFilters) + if (newScan.doValidate().ok()) { + filter.withNewChildren(Seq(newScan)) + } else { + filter + } + } else { + filter + } + case batchScan: BatchScanExecTransformerBase => + val pushDownFilters = + BackendsApiManager.getSparkPlanExecApiInstance.postProcessPushDownFilter( + splitConjunctivePredicates(filter.cond), + batchScan) + if (pushDownFilters.size > 0) { + val newScan = batchScan + newScan.setPushDownFilters(pushDownFilters) + if (newScan.doValidate().ok()) { + filter.withNewChildren(Seq(newScan)) + } else { + filter + } + } else { + filter + } + case _ => filter + } + } +} diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala index 007f18fca40bd..bde1a75c2443f 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala @@ -53,9 +53,7 @@ case class EnumeratedTransform(session: SparkSession, outputsColumnar: Boolean) .build() private val rules = List( - new PushFilterToScan(validator), - RemoveSort, - RemoveFilter + RemoveSort ) // TODO: Should obey ReplaceSingleNode#applyScanNotTransformable to select diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/PushFilterToScan.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/PushFilterToScan.scala deleted file mode 100644 index 4070a0a586120..0000000000000 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/PushFilterToScan.scala +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gluten.extension.columnar.enumerated - -import org.apache.gluten.execution.{FilterHandler, TransformSupport} -import org.apache.gluten.extension.columnar.validator.Validator -import org.apache.gluten.ras.path.Pattern._ -import org.apache.gluten.ras.path.Pattern.Matchers._ -import org.apache.gluten.ras.rule.{RasRule, Shape} -import org.apache.gluten.ras.rule.Shapes._ - -import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.datasources.v2.BatchScanExec - -// TODO: Match on Vanilla filter + Gluten scan. -class PushFilterToScan(validator: Validator) extends RasRule[SparkPlan] { - override def shift(node: SparkPlan): Iterable[SparkPlan] = node match { - case FilterAndScan(filter, scan) => - validator.validate(scan) match { - case Validator.Failed(reason) => - List.empty - case Validator.Passed => - val newScan = - FilterHandler.pushFilterToScan(filter.condition, scan) - newScan match { - case ts: TransformSupport if ts.doValidate().ok() => - List(filter.withNewChildren(List(ts))) - case _ => - List.empty - } - } - case _ => - List.empty - } - - override def shape(): Shape[SparkPlan] = - anyOf( - pattern( - branch[SparkPlan]( - clazz(classOf[FilterExec]), - leaf( - or(clazz(classOf[FileSourceScanExec]), clazz(classOf[BatchScanExec])) - ) - ).build()), - pattern( - branch[SparkPlan]( - clazz(classOf[FilterExec]), - branch( - clazz(classOf[ColumnarToRowTransition]), - leaf( - or(clazz(classOf[FileSourceScanExec]), clazz(classOf[BatchScanExec])) - ) - ) - ).build()) - ) - - private object FilterAndScan { - def unapply(node: SparkPlan): Option[(FilterExec, SparkPlan)] = node match { - case f @ FilterExec(cond, ColumnarToRowExec(scan)) => - ensureScan(scan) - Some(f, scan) - case f @ FilterExec(cond, scan) => - ensureScan(scan) - Some(f, scan) - case _ => - None - } - - private def ensureScan(node: SparkPlan): Unit = { - assert(node.isInstanceOf[FileSourceScanExec] || node.isInstanceOf[BatchScanExec]) - } - } -} diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala deleted file mode 100644 index 8b8441e8d6ce7..0000000000000 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveFilter.scala +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gluten.extension.columnar.enumerated - -import org.apache.gluten.execution._ -import org.apache.gluten.metrics.MetricsUpdater -import org.apache.gluten.ras.path.Pattern._ -import org.apache.gluten.ras.path.Pattern.Matchers._ -import org.apache.gluten.ras.rule.{RasRule, Shape} -import org.apache.gluten.ras.rule.Shapes._ -import org.apache.gluten.substrait.SubstraitContext - -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} -import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.vectorized.ColumnarBatch - -// Removes Gluten filter operator if its no-op. Typically a Gluten filter is no-op when it -// pushes all of its conditions into the child scan. -// -// The rule is needed in RAS since our cost model treats all filter + scan plans with constant cost -// because the pushed filter is not considered in the model. Removing the filter will make -// optimizer choose a single scan as the winner sub-plan since a single scan's cost is lower than -// filter + scan. -object RemoveFilter extends RasRule[SparkPlan] { - override def shift(node: SparkPlan): Iterable[SparkPlan] = { - val filter = node.asInstanceOf[FilterExecTransformerBase] - if (filter.isNoop()) { - if (filter.output != filter.child.output) { - val out = NoopFilter(filter.child, filter.output) - out.copyTagsFrom(filter) - return List(out) - } - return List(filter.child) - } - List.empty - } - - override def shape(): Shape[SparkPlan] = - pattern( - branch[SparkPlan]( - clazz(classOf[FilterExecTransformerBase]), - leaf(clazz(classOf[BasicScanExecTransformer])) - ).build()) - - // A noop filter placeholder that indicates that all conditions were pushed down to scan. - // - // This operator has zero cost in cost model to avoid planner from choosing the - // original filter-scan that doesn't have all conditions pushed down to scan. - // - // We cannot simply remove the filter to let planner choose the pushed scan since by vanilla - // Spark's definition the filter may have different output nullability than scan. So - // we have to keep this empty filter to let the optimized tree have the identical output schema - // with the original tree. If we simply remove the filter, possible UBs might be caused. For - // example, redundant broadcast exchanges may be added by EnsureRequirements because the - // broadcast join detects that its join keys' nullabilities have been changed. Then AQE - // re-optimization could be broken by ValidateSparkPlan so that AQE could completely - // have no effect as if it's off. This case can be observed by explicitly setting a higher - // AQE logger level to make sure the validation log doesn't get suppressed, e.g., - // spark.sql.adaptive.logLevel=ERROR. - case class NoopFilter(override val child: SparkPlan, override val output: Seq[Attribute]) - extends UnaryTransformSupport { - override def metricsUpdater(): MetricsUpdater = MetricsUpdater.None - override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(newChild) - override def outputPartitioning: Partitioning = child.outputPartitioning - override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override protected def doTransform(context: SubstraitContext): TransformContext = - child.asInstanceOf[TransformSupport].transform(context) - override protected def doExecuteColumnar(): RDD[ColumnarBatch] = child.executeColumnar() - } -} diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/LegacyCostModel.scala b/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/LegacyCostModel.scala index 3b631872caa66..b5946d2ddabbd 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/LegacyCostModel.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/LegacyCostModel.scala @@ -16,7 +16,6 @@ */ package org.apache.gluten.planner.cost -import org.apache.gluten.extension.columnar.enumerated.RemoveFilter import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike, RowToColumnarLike} import org.apache.gluten.utils.PlanUtil @@ -29,9 +28,6 @@ class LegacyCostModel extends LongCostModel { // much as possible. override def selfLongCostOf(node: SparkPlan): Long = { node match { - case _: RemoveFilter.NoopFilter => - // To make planner choose the tree that has applied rule PushFilterToScan. - 0L case ColumnarToRowExec(_) => 10L case RowToColumnarExec(_) => 10L case ColumnarToRowLike(_) => 10L diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/RoughCostModel.scala b/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/RoughCostModel.scala index d621c3010c160..3fff7068afe84 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/RoughCostModel.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/RoughCostModel.scala @@ -16,7 +16,6 @@ */ package org.apache.gluten.planner.cost -import org.apache.gluten.extension.columnar.enumerated.RemoveFilter import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike, RowToColumnarLike} import org.apache.gluten.utils.PlanUtil @@ -27,9 +26,6 @@ class RoughCostModel extends LongCostModel { override def selfLongCostOf(node: SparkPlan): Long = { node match { - case _: RemoveFilter.NoopFilter => - // To make planner choose the tree that has applied rule PushFilterToScan. - 0L case ProjectExec(projectList, _) if projectList.forall(isCheapExpression) => // Make trivial ProjectExec has the same cost as ProjectExecTransform to reduce unnecessary // c2r and r2c.