diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/FilterExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/FilterExecTransformer.scala index daa08498bee36..fdfe1d7ea9178 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/FilterExecTransformer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/FilterExecTransformer.scala @@ -16,30 +16,11 @@ */ package org.apache.gluten.execution -import org.apache.spark.sql.catalyst.expressions.{And, Expression} +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.SparkPlan case class FilterExecTransformer(condition: Expression, child: SparkPlan) extends FilterExecTransformerBase(condition, child) { - // FIXME: Should use field "condition" to store the actual executed filter expressions. - // To make optimization easier (like to remove filter when it actually does nothing) - override protected def getRemainingCondition: Expression = { - val scanFilters = child match { - // Get the filters including the manually pushed down ones. - case basicScanExecTransformer: BasicScanExecTransformer => - basicScanExecTransformer.filterExprs() - // For fallback scan, we need to keep original filter. - case _ => - Seq.empty[Expression] - } - if (scanFilters.isEmpty) { - condition - } else { - val remainingFilters = - FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(condition)) - remainingFilters.reduceLeftOption(And).orNull - } - } override protected def withNewChildInternal(newChild: SparkPlan): FilterExecTransformer = copy(child = newChild) diff --git a/gluten-delta/src/main/delta-20/org/apache/gluten/execution/DeltaFilterExecTransformer.scala b/gluten-delta/src/main/delta-20/org/apache/gluten/execution/DeltaFilterExecTransformer.scala index a9aa8ac15660c..ca4665c0d0cb8 100644 --- a/gluten-delta/src/main/delta-20/org/apache/gluten/execution/DeltaFilterExecTransformer.scala +++ b/gluten-delta/src/main/delta-20/org/apache/gluten/execution/DeltaFilterExecTransformer.scala @@ -16,30 +16,12 @@ */ package org.apache.gluten.execution -import org.apache.spark.sql.catalyst.expressions.{And, Expression} +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.SparkPlan case class DeltaFilterExecTransformer(condition: Expression, child: SparkPlan) extends FilterExecTransformerBase(condition, child) { - override protected def getRemainingCondition: Expression = { - val scanFilters = child match { - // Get the filters including the manually pushed down ones. - case basicScanExecTransformer: BasicScanExecTransformer => - basicScanExecTransformer.filterExprs() - // For fallback scan, we need to keep original filter. - case _ => - Seq.empty[Expression] - } - if (scanFilters.isEmpty) { - condition - } else { - val remainingFilters = - FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(condition)) - remainingFilters.reduceLeftOption(And).orNull - } - } - override protected def withNewChildInternal(newChild: SparkPlan): DeltaFilterExecTransformer = copy(child = newChild) } diff --git a/gluten-delta/src/main/delta-23/org/apache/gluten/execution/DeltaFilterExecTransformer.scala b/gluten-delta/src/main/delta-23/org/apache/gluten/execution/DeltaFilterExecTransformer.scala index a9aa8ac15660c..ca4665c0d0cb8 100644 --- a/gluten-delta/src/main/delta-23/org/apache/gluten/execution/DeltaFilterExecTransformer.scala +++ b/gluten-delta/src/main/delta-23/org/apache/gluten/execution/DeltaFilterExecTransformer.scala @@ -16,30 +16,12 @@ */ package org.apache.gluten.execution -import org.apache.spark.sql.catalyst.expressions.{And, Expression} +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.SparkPlan case class DeltaFilterExecTransformer(condition: Expression, child: SparkPlan) extends FilterExecTransformerBase(condition, child) { - override protected def getRemainingCondition: Expression = { - val scanFilters = child match { - // Get the filters including the manually pushed down ones. - case basicScanExecTransformer: BasicScanExecTransformer => - basicScanExecTransformer.filterExprs() - // For fallback scan, we need to keep original filter. - case _ => - Seq.empty[Expression] - } - if (scanFilters.isEmpty) { - condition - } else { - val remainingFilters = - FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(condition)) - remainingFilters.reduceLeftOption(And).orNull - } - } - override protected def withNewChildInternal(newChild: SparkPlan): DeltaFilterExecTransformer = copy(child = newChild) } diff --git a/gluten-delta/src/main/delta-24/org/apache/gluten/execution/DeltaFilterExecTransformer.scala b/gluten-delta/src/main/delta-24/org/apache/gluten/execution/DeltaFilterExecTransformer.scala index a9aa8ac15660c..ca4665c0d0cb8 100644 --- a/gluten-delta/src/main/delta-24/org/apache/gluten/execution/DeltaFilterExecTransformer.scala +++ b/gluten-delta/src/main/delta-24/org/apache/gluten/execution/DeltaFilterExecTransformer.scala @@ -16,30 +16,12 @@ */ package org.apache.gluten.execution -import org.apache.spark.sql.catalyst.expressions.{And, Expression} +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.SparkPlan case class DeltaFilterExecTransformer(condition: Expression, child: SparkPlan) extends FilterExecTransformerBase(condition, child) { - override protected def getRemainingCondition: Expression = { - val scanFilters = child match { - // Get the filters including the manually pushed down ones. - case basicScanExecTransformer: BasicScanExecTransformer => - basicScanExecTransformer.filterExprs() - // For fallback scan, we need to keep original filter. - case _ => - Seq.empty[Expression] - } - if (scanFilters.isEmpty) { - condition - } else { - val remainingFilters = - FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(condition)) - remainingFilters.reduceLeftOption(And).orNull - } - } - override protected def withNewChildInternal(newChild: SparkPlan): DeltaFilterExecTransformer = copy(child = newChild) } diff --git a/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaFilterExecTransformer.scala b/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaFilterExecTransformer.scala index c6d6dc06ad23a..0c8cd54902c29 100644 --- a/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaFilterExecTransformer.scala +++ b/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaFilterExecTransformer.scala @@ -24,7 +24,7 @@ import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.extensions.ExtensionBuilder import org.apache.gluten.substrait.rel.{RelBuilder, RelNode} -import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.delta.metric.IncrementMetric import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.metric.SQLMetric @@ -36,10 +36,6 @@ case class DeltaFilterExecTransformer(condition: Expression, child: SparkPlan) private var extraMetrics: Seq[(String, SQLMetric)] = Seq.empty - // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks. - @transient override lazy val metrics = - BackendsApiManager.getMetricsApiInstance.genFilterTransformerMetrics(sparkContext) - override def metricsUpdater(): MetricsUpdater = BackendsApiManager.getMetricsApiInstance.genFilterTransformerMetricsUpdater( metrics, @@ -80,24 +76,6 @@ case class DeltaFilterExecTransformer(condition: Expression, child: SparkPlan) } } - override protected def getRemainingCondition: Expression = { - val scanFilters = child match { - // Get the filters including the manually pushed down ones. - case basicScanExecTransformer: BasicScanExecTransformer => - basicScanExecTransformer.filterExprs() - // For fallback scan, we need to keep original filter. - case _ => - Seq.empty[Expression] - } - if (scanFilters.isEmpty) { - condition - } else { - val remainingFilters = - FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(condition)) - remainingFilters.reduceLeftOption(And).orNull - } - } - override protected def withNewChildInternal(newChild: SparkPlan): DeltaFilterExecTransformer = copy(child = newChild) } diff --git a/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaProjectExecTransformer.scala b/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaProjectExecTransformer.scala index 34590854bccfe..2d9b15cb3c243 100644 --- a/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaProjectExecTransformer.scala +++ b/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaProjectExecTransformer.scala @@ -37,10 +37,6 @@ case class DeltaProjectExecTransformer(projectList: Seq[NamedExpression], child: private var extraMetrics = mutable.Seq.empty[(String, SQLMetric)] - // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks. - @transient override lazy val metrics = - BackendsApiManager.getMetricsApiInstance.genProjectTransformerMetrics(sparkContext) - override def metricsUpdater(): MetricsUpdater = BackendsApiManager.getMetricsApiInstance.genProjectTransformerMetricsUpdater( metrics, diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala index fce5ab39d2556..b5357e92b026e 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala @@ -106,7 +106,25 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkP override protected def outputExpressions: Seq[NamedExpression] = child.output - protected def getRemainingCondition: Expression + // FIXME: Should use field "condition" to store the actual executed filter expressions. + // To make optimization easier (like to remove filter when it actually does nothing) + protected def getRemainingCondition: Expression = { + val scanFilters = child match { + // Get the filters including the manually pushed down ones. + case basicScanExecTransformer: BasicScanExecTransformer => + basicScanExecTransformer.filterExprs() + // For fallback scan, we need to keep original filter. + case _ => + Seq.empty[Expression] + } + if (scanFilters.isEmpty) { + cond + } else { + val remainingFilters = + FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(cond)) + remainingFilters.reduceLeftOption(And).orNull + } + } override protected def doValidateInternal(): ValidationResult = { val remainingCondition = getRemainingCondition