From d879fd097dada9aba9c21046b01b1104f916188a Mon Sep 17 00:00:00 2001 From: "joey.ljy" Date: Fri, 1 Dec 2023 23:57:00 +0800 Subject: [PATCH] remove BatchScanExec in filter pushdown --- .../BasicPhysicalOperatorTransformer.scala | 20 +------ .../extension/ColumnarOverrides.scala | 55 +++++++++---------- 2 files changed, 28 insertions(+), 47 deletions(-) diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala index d2406bc06cb2..29654d99d709 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala @@ -19,7 +19,6 @@ package io.glutenproject.execution import io.glutenproject.backendsapi.BackendsApiManager import io.glutenproject.expression.{ConverterUtils, ExpressionConverter, ExpressionTransformer} import io.glutenproject.extension.{GlutenPlan, ValidationResult} -import io.glutenproject.extension.columnar.TransformHints import io.glutenproject.metrics.MetricsUpdater import io.glutenproject.substrait.`type`.TypeBuilder import io.glutenproject.substrait.SubstraitContext @@ -415,7 +414,7 @@ object FilterHandler { // Separate and compare the filter conditions in Scan and Filter. // Push down the left conditions in Filter into Scan. - def applyFilterPushdownToScan(filter: FilterExec, reuseSubquery: Boolean): SparkPlan = + def applyFilterPushdownToScan(filter: FilterExec, reuseSubquery: Boolean): GlutenPlan = filter.child match { case fileSourceScan: FileSourceScanExec => val leftFilters = @@ -424,23 +423,6 @@ object FilterHandler { fileSourceScan, reuseSubquery, extraFilters = leftFilters) - case batchScan: BatchScanExec => - if (ScanTransformerFactory.supportedBatchScan(batchScan.scan)) { - ScanTransformerFactory.createBatchScanTransformer(batchScan, reuseSubquery) - } else { - if (batchScan.runtimeFilters.isEmpty) { - throw new UnsupportedOperationException( - s"${batchScan.scan.getClass.toString} is not supported.") - } else { - // IF filter expressions aren't empty, we need to transform the inner operators. - val newSource = batchScan.copy(runtimeFilters = ExpressionConverter - .transformDynamicPruningExpr(batchScan.runtimeFilters, reuseSubquery)) - TransformHints.tagNotTransformable( - newSource, - "The scan in BatchScanExec is not a FileScan") - newSource - } - } case other => throw new UnsupportedOperationException(s"${other.getClass.toString} is not supported.") } diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala index 8c9248695bb8..412b29085e15 100644 --- a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala +++ b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala @@ -136,30 +136,20 @@ case class TransformPreOverrides(isAdaptiveContext: Boolean) private def genFilterExec(plan: FilterExec): SparkPlan = { // FIXME: Filter push-down should be better done by Vanilla Spark's planner or by // a individual rule. - // Push down the left conditions in Filter into Scan. - val newChild: SparkPlan = - if ( - plan.child.isInstanceOf[FileSourceScanExec] || - plan.child.isInstanceOf[BatchScanExec] - ) { - TransformHints.getHint(plan.child) match { + // Push down the left conditions in Filter into FileSourceScan. + val newChild: SparkPlan = plan.child match { + case scan: FileSourceScanExec => + TransformHints.getHint(scan) match { case TRANSFORM_SUPPORTED() => val newScan = FilterHandler.applyFilterPushdownToScan(plan, reuseSubquery) newScan match { - case ts: TransformSupport => - if (ts.doValidate().isValid) { - ts - } else { - replaceWithTransformerPlan(plan.child) - } - case p: SparkPlan => p + case ts: TransformSupport if ts.doValidate().isValid => ts + case _ => replaceWithTransformerPlan(scan) } - case _ => - replaceWithTransformerPlan(plan.child) + case _ => replaceWithTransformerPlan(scan) } - } else { - replaceWithTransformerPlan(plan.child) - } + case _ => replaceWithTransformerPlan(plan.child) + } logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") BackendsApiManager.getSparkPlanExecApiInstance .genFilterExecTransformer(plan.condition, newChild) @@ -544,18 +534,27 @@ case class TransformPreOverrides(isAdaptiveContext: Boolean) newSource } case plan: BatchScanExec => - val transformer = ScanTransformerFactory.createBatchScanTransformer(plan, reuseSubquery) - - val validationResult = transformer.doValidate() - if (validationResult.isValid) { - logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") - transformer + if (ScanTransformerFactory.supportedBatchScan(plan.scan)) { + val transformer = ScanTransformerFactory.createBatchScanTransformer(plan, reuseSubquery) + val validationResult = transformer.doValidate() + if (validationResult.isValid) { + logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") + transformer + } else { + logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.") + val newSource = plan.copy(runtimeFilters = transformer.runtimeFilters) + TransformHints.tagNotTransformable(newSource, validationResult.reason.get) + newSource + } } else { - logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.") - val newSource = plan.copy(runtimeFilters = transformer.runtimeFilters) - TransformHints.tagNotTransformable(newSource, validationResult.reason.get) + // If filter expressions aren't empty, we need to transform the inner operators, + // and fallback the BatchScanExec itself. + val newSource = plan.copy(runtimeFilters = ExpressionConverter + .transformDynamicPruningExpr(plan.runtimeFilters, reuseSubquery)) + TransformHints.tagNotTransformable(newSource, "The scan in BatchScanExec is not supported.") newSource } + case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) => // TODO: Add DynamicPartitionPruningHiveScanSuite.scala val newPartitionFilters: Seq[Expression] = ExpressionConverter.transformDynamicPruningExpr(