
Commit

remove redundant function
dcoliversun authored and hengzhen.sq committed Sep 11, 2024
1 parent: 9aaf7b5 · commit: 0c6cf6d
Showing 7 changed files with 24 additions and 105 deletions.
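
In short: the duplicated getRemainingCondition overrides in FilterExecTransformer and in the per-version DeltaFilterExecTransformer classes are deleted, and a single default implementation is pulled up into FilterExecTransformerBase. The @transient metrics overrides in the Delta filter and project transformers are removed along the way.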
@@ -16,30 +16,11 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.spark.sql.catalyst.expressions.{And, Expression}
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.SparkPlan
 
 case class FilterExecTransformer(condition: Expression, child: SparkPlan)
   extends FilterExecTransformerBase(condition, child) {
-  // FIXME: Should use field "condition" to store the actual executed filter expressions.
-  // To make optimization easier (like to remove filter when it actually does nothing)
-  override protected def getRemainingCondition: Expression = {
-    val scanFilters = child match {
-      // Get the filters including the manually pushed down ones.
-      case basicScanExecTransformer: BasicScanExecTransformer =>
-        basicScanExecTransformer.filterExprs()
-      // For fallback scan, we need to keep original filter.
-      case _ =>
-        Seq.empty[Expression]
-    }
-    if (scanFilters.isEmpty) {
-      condition
-    } else {
-      val remainingFilters =
-        FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(condition))
-      remainingFilters.reduceLeftOption(And).orNull
-    }
-  }
 
   override protected def withNewChildInternal(newChild: SparkPlan): FilterExecTransformer =
     copy(child = newChild)

@@ -16,30 +16,12 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.spark.sql.catalyst.expressions.{And, Expression}
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.SparkPlan
 
 case class DeltaFilterExecTransformer(condition: Expression, child: SparkPlan)
   extends FilterExecTransformerBase(condition, child) {
 
-  override protected def getRemainingCondition: Expression = {
-    val scanFilters = child match {
-      // Get the filters including the manually pushed down ones.
-      case basicScanExecTransformer: BasicScanExecTransformer =>
-        basicScanExecTransformer.filterExprs()
-      // For fallback scan, we need to keep original filter.
-      case _ =>
-        Seq.empty[Expression]
-    }
-    if (scanFilters.isEmpty) {
-      condition
-    } else {
-      val remainingFilters =
-        FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(condition))
-      remainingFilters.reduceLeftOption(And).orNull
-    }
-  }
-
   override protected def withNewChildInternal(newChild: SparkPlan): DeltaFilterExecTransformer =
     copy(child = newChild)
 }

@@ -16,30 +16,12 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.spark.sql.catalyst.expressions.{And, Expression}
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.SparkPlan
 
 case class DeltaFilterExecTransformer(condition: Expression, child: SparkPlan)
   extends FilterExecTransformerBase(condition, child) {
 
-  override protected def getRemainingCondition: Expression = {
-    val scanFilters = child match {
-      // Get the filters including the manually pushed down ones.
-      case basicScanExecTransformer: BasicScanExecTransformer =>
-        basicScanExecTransformer.filterExprs()
-      // For fallback scan, we need to keep original filter.
-      case _ =>
-        Seq.empty[Expression]
-    }
-    if (scanFilters.isEmpty) {
-      condition
-    } else {
-      val remainingFilters =
-        FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(condition))
-      remainingFilters.reduceLeftOption(And).orNull
-    }
-  }
-
   override protected def withNewChildInternal(newChild: SparkPlan): DeltaFilterExecTransformer =
     copy(child = newChild)
 }

@@ -16,30 +16,12 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.spark.sql.catalyst.expressions.{And, Expression}
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.SparkPlan
 
 case class DeltaFilterExecTransformer(condition: Expression, child: SparkPlan)
   extends FilterExecTransformerBase(condition, child) {
 
-  override protected def getRemainingCondition: Expression = {
-    val scanFilters = child match {
-      // Get the filters including the manually pushed down ones.
-      case basicScanExecTransformer: BasicScanExecTransformer =>
-        basicScanExecTransformer.filterExprs()
-      // For fallback scan, we need to keep original filter.
-      case _ =>
-        Seq.empty[Expression]
-    }
-    if (scanFilters.isEmpty) {
-      condition
-    } else {
-      val remainingFilters =
-        FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(condition))
-      remainingFilters.reduceLeftOption(And).orNull
-    }
-  }
-
   override protected def withNewChildInternal(newChild: SparkPlan): DeltaFilterExecTransformer =
     copy(child = newChild)
 }

@@ -24,7 +24,7 @@ import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.extensions.ExtensionBuilder
 import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
 
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.delta.metric.IncrementMetric
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -36,10 +36,6 @@ case class DeltaFilterExecTransformer(condition: Expression, child: SparkPlan)
 
   private var extraMetrics: Seq[(String, SQLMetric)] = Seq.empty
 
-  // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
-  @transient override lazy val metrics =
-    BackendsApiManager.getMetricsApiInstance.genFilterTransformerMetrics(sparkContext)
-
   override def metricsUpdater(): MetricsUpdater =
     BackendsApiManager.getMetricsApiInstance.genFilterTransformerMetricsUpdater(
       metrics,
@@ -80,24 +76,6 @@ case class DeltaFilterExecTransformer(condition: Expression, child: SparkPlan)
     }
   }
 
-  override protected def getRemainingCondition: Expression = {
-    val scanFilters = child match {
-      // Get the filters including the manually pushed down ones.
-      case basicScanExecTransformer: BasicScanExecTransformer =>
-        basicScanExecTransformer.filterExprs()
-      // For fallback scan, we need to keep original filter.
-      case _ =>
-        Seq.empty[Expression]
-    }
-    if (scanFilters.isEmpty) {
-      condition
-    } else {
-      val remainingFilters =
-        FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(condition))
-      remainingFilters.reduceLeftOption(And).orNull
-    }
-  }
-
   override protected def withNewChildInternal(newChild: SparkPlan): DeltaFilterExecTransformer =
     copy(child = newChild)
 }

@@ -37,10 +37,6 @@ case class DeltaProjectExecTransformer(projectList: Seq[NamedExpression], child: SparkPlan)
 
   private var extraMetrics = mutable.Seq.empty[(String, SQLMetric)]
 
-  // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
-  @transient override lazy val metrics =
-    BackendsApiManager.getMetricsApiInstance.genProjectTransformerMetrics(sparkContext)
-
   override def metricsUpdater(): MetricsUpdater =
     BackendsApiManager.getMetricsApiInstance.genProjectTransformerMetricsUpdater(
      metrics,

@@ -106,7 +106,25 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkPlan)
 
   override protected def outputExpressions: Seq[NamedExpression] = child.output
 
-  protected def getRemainingCondition: Expression
+  // FIXME: Should use field "condition" to store the actual executed filter expressions.
+  // To make optimization easier (like to remove filter when it actually does nothing)
+  protected def getRemainingCondition: Expression = {
+    val scanFilters = child match {
+      // Get the filters including the manually pushed down ones.
+      case basicScanExecTransformer: BasicScanExecTransformer =>
+        basicScanExecTransformer.filterExprs()
+      // For fallback scan, we need to keep original filter.
+      case _ =>
+        Seq.empty[Expression]
+    }
+    if (scanFilters.isEmpty) {
+      cond
+    } else {
+      val remainingFilters =
+        FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(cond))
+      remainingFilters.reduceLeftOption(And).orNull
+    }
+  }
 
   override protected def doValidateInternal(): ValidationResult = {
     val remainingCondition = getRemainingCondition
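
The logic that each subclass previously copied, now pulled up into FilterExecTransformerBase, is easy to see in isolation. Below is a minimal, self-contained Scala sketch, not Gluten's actual code: predicates are modeled as plain strings, and splitConjunctivePredicates / getRemainingFilters are simplified stand-ins for the Catalyst helper and FilterHandler.getRemainingFilters (the object name RemainingConditionSketch and the string encoding are illustrative assumptions).

object RemainingConditionSketch {

  // Stand-in for Catalyst's splitConjunctivePredicates:
  // break "a AND b AND c" into its conjuncts.
  def splitConjunctivePredicates(cond: String): Seq[String] =
    cond.split(" AND ").map(_.trim).toSeq

  // Stand-in for FilterHandler.getRemainingFilters: keep only the
  // conjuncts that the scan has not already applied.
  def getRemainingFilters(scanFilters: Seq[String], filters: Seq[String]): Seq[String] =
    filters.filterNot(f => scanFilters.contains(f))

  // Mirrors getRemainingCondition: a fallback scan (no pushed-down
  // filters) keeps the whole condition; otherwise only the leftover
  // conjuncts are re-joined, and null means the filter does nothing.
  def remainingCondition(scanFilters: Seq[String], cond: String): String =
    if (scanFilters.isEmpty) cond
    else {
      val remaining = getRemainingFilters(scanFilters, splitConjunctivePredicates(cond))
      remaining.reduceLeftOption(_ + " AND " + _).orNull // reduceLeftOption(And).orNull in the real code
    }

  def main(args: Array[String]): Unit = {
    println(remainingCondition(Seq("a > 1"), "a > 1 AND b < 2")) // b < 2
    println(remainingCondition(Seq.empty, "a > 1 AND b < 2"))    // a > 1 AND b < 2
    println(remainingCondition(Seq("a > 1"), "a > 1"))           // null
  }
}

This also shows why pulling the method into the base class is safe: every subclass ran identical logic, differing only in the name of the condition field (condition vs. cond).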
