Skip to content

Commit

Permalink
remove BatchScanExec in filter pushdown
Browse files Browse the repository at this point in the history
  • Loading branch information
liujiayi771 committed Dec 1, 2023
1 parent 495dea1 commit d879fd0
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package io.glutenproject.execution
import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.expression.{ConverterUtils, ExpressionConverter, ExpressionTransformer}
import io.glutenproject.extension.{GlutenPlan, ValidationResult}
import io.glutenproject.extension.columnar.TransformHints
import io.glutenproject.metrics.MetricsUpdater
import io.glutenproject.substrait.`type`.TypeBuilder
import io.glutenproject.substrait.SubstraitContext
Expand Down Expand Up @@ -415,7 +414,7 @@ object FilterHandler {

// Separate and compare the filter conditions in Scan and Filter.
// Push down the left conditions in Filter into Scan.
def applyFilterPushdownToScan(filter: FilterExec, reuseSubquery: Boolean): SparkPlan =
def applyFilterPushdownToScan(filter: FilterExec, reuseSubquery: Boolean): GlutenPlan =
filter.child match {
case fileSourceScan: FileSourceScanExec =>
val leftFilters =
Expand All @@ -424,23 +423,6 @@ object FilterHandler {
fileSourceScan,
reuseSubquery,
extraFilters = leftFilters)
case batchScan: BatchScanExec =>
if (ScanTransformerFactory.supportedBatchScan(batchScan.scan)) {
ScanTransformerFactory.createBatchScanTransformer(batchScan, reuseSubquery)
} else {
if (batchScan.runtimeFilters.isEmpty) {
throw new UnsupportedOperationException(
s"${batchScan.scan.getClass.toString} is not supported.")
} else {
// If filter expressions aren't empty, we need to transform the inner operators.
val newSource = batchScan.copy(runtimeFilters = ExpressionConverter
.transformDynamicPruningExpr(batchScan.runtimeFilters, reuseSubquery))
TransformHints.tagNotTransformable(
newSource,
"The scan in BatchScanExec is not a FileScan")
newSource
}
}
case other =>
throw new UnsupportedOperationException(s"${other.getClass.toString} is not supported.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,30 +136,20 @@ case class TransformPreOverrides(isAdaptiveContext: Boolean)
private def genFilterExec(plan: FilterExec): SparkPlan = {
// FIXME: Filter push-down should be better done by Vanilla Spark's planner or by
// an individual rule.
// Push down the left conditions in Filter into Scan.
val newChild: SparkPlan =
if (
plan.child.isInstanceOf[FileSourceScanExec] ||
plan.child.isInstanceOf[BatchScanExec]
) {
TransformHints.getHint(plan.child) match {
// Push down the left conditions in Filter into FileSourceScan.
val newChild: SparkPlan = plan.child match {
case scan: FileSourceScanExec =>
TransformHints.getHint(scan) match {
case TRANSFORM_SUPPORTED() =>
val newScan = FilterHandler.applyFilterPushdownToScan(plan, reuseSubquery)
newScan match {
case ts: TransformSupport =>
if (ts.doValidate().isValid) {
ts
} else {
replaceWithTransformerPlan(plan.child)
}
case p: SparkPlan => p
case ts: TransformSupport if ts.doValidate().isValid => ts
case _ => replaceWithTransformerPlan(scan)
}
case _ =>
replaceWithTransformerPlan(plan.child)
case _ => replaceWithTransformerPlan(scan)
}
} else {
replaceWithTransformerPlan(plan.child)
}
case _ => replaceWithTransformerPlan(plan.child)
}
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
BackendsApiManager.getSparkPlanExecApiInstance
.genFilterExecTransformer(plan.condition, newChild)
Expand Down Expand Up @@ -544,18 +534,27 @@ case class TransformPreOverrides(isAdaptiveContext: Boolean)
newSource
}
case plan: BatchScanExec =>
val transformer = ScanTransformerFactory.createBatchScanTransformer(plan, reuseSubquery)

val validationResult = transformer.doValidate()
if (validationResult.isValid) {
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
transformer
if (ScanTransformerFactory.supportedBatchScan(plan.scan)) {
val transformer = ScanTransformerFactory.createBatchScanTransformer(plan, reuseSubquery)
val validationResult = transformer.doValidate()
if (validationResult.isValid) {
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
transformer
} else {
logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.")
val newSource = plan.copy(runtimeFilters = transformer.runtimeFilters)
TransformHints.tagNotTransformable(newSource, validationResult.reason.get)
newSource
}
} else {
logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.")
val newSource = plan.copy(runtimeFilters = transformer.runtimeFilters)
TransformHints.tagNotTransformable(newSource, validationResult.reason.get)
// If filter expressions aren't empty, we need to transform the inner operators,
// and fall back the BatchScanExec itself.
val newSource = plan.copy(runtimeFilters = ExpressionConverter
.transformDynamicPruningExpr(plan.runtimeFilters, reuseSubquery))
TransformHints.tagNotTransformable(newSource, "The scan in BatchScanExec is not supported.")
newSource
}

case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
// TODO: Add DynamicPartitionPruningHiveScanSuite.scala
val newPartitionFilters: Seq[Expression] = ExpressionConverter.transformDynamicPruningExpr(
Expand Down

0 comments on commit d879fd0

Please sign in to comment.