From b12127aef9662ecbce4a3fc01ce74771cf69c8e7 Mon Sep 17 00:00:00 2001 From: zml1206 Date: Fri, 6 Dec 2024 13:18:04 +0800 Subject: [PATCH 1/4] [CORE] Push partial filters to offload scan when filter need fallback --- .../execution/IcebergScanTransformer.scala | 2 -- .../execution/BasicScanExecTransformer.scala | 8 ++----- .../execution/BatchScanExecTransformer.scala | 22 +++++++++++-------- .../FileSourceScanExecTransformer.scala | 8 ++++++- .../execution/ScanTransformerFactory.scala | 9 +++++++- .../expression/ExpressionConverter.scala | 18 +++++++++++++++ .../columnar/PushDownFilterToScan.scala | 2 +- 7 files changed, 49 insertions(+), 20 deletions(-) diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala index 56041a6a99c1..63ab9eb206e5 100644 --- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala +++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala @@ -50,8 +50,6 @@ case class IcebergScanTransformer( IcebergScanTransformer.supportsBatchScan(scan) } - override def filterExprs(): Seq[Expression] = pushdownFilters.getOrElse(Seq.empty) - override lazy val getPartitionSchema: StructType = GlutenIcebergSourceUtil.getReadPartitionSchema(scan) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala index 73ed35e7190b..e869107477be 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala @@ -30,7 +30,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.hive.HiveTableScanExecTransformer -import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType} +import org.apache.spark.sql.types.{StringType, StructField, StructType} import com.google.protobuf.StringValue import io.substrait.proto.NamedStruct @@ -131,11 +131,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource }.asJava // Will put all filter expressions into an AND expression val transformer = filterExprs() - .map { - case ar: AttributeReference if ar.dataType == BooleanType => - EqualNullSafe(ar, Literal.TrueLiteral) - case e => e - } + .map(ExpressionConverter.replaceAttributeReference) .reduceLeftOption(And) .map(ExpressionConverter.replaceWithExpressionTransformer(_, output)) val filterNodes = transformer.map(_.doTransform(context.registeredFunction)) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala index 55777c11c1bb..29bf1dcab6d8 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala @@ -17,7 +17,7 @@ package org.apache.gluten.execution import org.apache.gluten.backendsapi.BackendsApiManager -import org.apache.gluten.exception.GlutenNotSupportException +import org.apache.gluten.expression.ExpressionConverter import org.apache.gluten.extension.ValidationResult import org.apache.gluten.metrics.MetricsUpdater import org.apache.gluten.sql.shims.SparkShimLoader @@ -101,18 +101,22 @@ abstract class BatchScanExecTransformerBase( // class. Otherwise, we will encounter an issue where makeCopy cannot find a constructor // with the corresponding number of parameters. // The workaround is to add a mutable list to pass in pushdownFilters. - protected var pushdownFilters: Option[Seq[Expression]] = None + protected var pushdownFilters: Seq[Expression] = scan match { + case fileScan: FileScan => + fileScan.dataFilters.filter { + expr => + ExpressionConverter.canReplaceWithExpressionTransformer( + ExpressionConverter.replaceAttributeReference(expr), + output) + } + case _ => Seq.empty + } def setPushDownFilters(filters: Seq[Expression]): Unit = { - pushdownFilters = Some(filters) + pushdownFilters = filters } - override def filterExprs(): Seq[Expression] = scan match { - case fileScan: FileScan => - pushdownFilters.getOrElse(fileScan.dataFilters) - case _ => - throw new GlutenNotSupportException(s"${scan.getClass.toString} is not supported") - } + override def filterExprs(): Seq[Expression] = pushdownFilters override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala index 7f3c6d4f9f47..a113221d2f98 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala @@ -17,6 +17,7 @@ package org.apache.gluten.execution import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.expression.ExpressionConverter import org.apache.gluten.extension.ValidationResult import org.apache.gluten.metrics.MetricsUpdater import org.apache.gluten.sql.shims.SparkShimLoader @@ -102,7 +103,12 @@ abstract class FileSourceScanExecTransformerBase( .genFileSourceScanTransformerMetrics(sparkContext) .filter(m => !driverMetricsAlias.contains(m._1)) ++ driverMetricsAlias - override def filterExprs(): Seq[Expression] = dataFiltersInScan + override def filterExprs(): Seq[Expression] = dataFiltersInScan.filter { + expr => + ExpressionConverter.canReplaceWithExpressionTransformer( + ExpressionConverter.replaceAttributeReference(expr), + output) + } override def getMetadataColumns(): Seq[AttributeReference] = metadataColumns diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala index 745c895688c9..194288de5a79 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala @@ -16,6 +16,7 @@ */ package org.apache.gluten.execution +import org.apache.gluten.expression.ExpressionConverter import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.sql.execution.FileSourceScanExec @@ -41,6 +42,12 @@ object ScanTransformerFactory { .asInstanceOf[DataSourceScanTransformerRegister] .createDataSourceTransformer(scanExec) case _ => + val dataFilters = scanExec.dataFilters.filter { + expr => + ExpressionConverter.canReplaceWithExpressionTransformer( + ExpressionConverter.replaceAttributeReference(expr), + scanExec.output) + } FileSourceScanExecTransformer( scanExec.relation, scanExec.output, @@ -48,7 +55,7 @@ object ScanTransformerFactory { scanExec.partitionFilters, scanExec.optionalBucketSet, scanExec.optionalNumCoalescedBuckets, - scanExec.dataFilters, + dataFilters, scanExec.tableIdentifier, scanExec.disableBucketedScan ) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala index f1bf2d00d0ab..bbcf256b5d2c 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala @@ -55,6 +55,24 @@ object ExpressionConverter extends SQLConfHelper with Logging { replaceWithExpressionTransformer0(expr, attributeSeq, expressionsMap) } + def canReplaceWithExpressionTransformer( + expr: Expression, + attributeSeq: Seq[Attribute]): Boolean = { + val expressionsMap = ExpressionMappings.expressionsMap + try { + replaceWithExpressionTransformer0(expr, attributeSeq, expressionsMap) + true + } catch { + case _: Exception => false + } + } + + def replaceAttributeReference(expr: Expression): Expression = expr match { + case ar: AttributeReference if ar.dataType == BooleanType => + EqualNullSafe(ar, Literal.TrueLiteral) + case e => e + } + private def replacePythonUDFWithExpressionTransformer( udf: PythonUDF, attributeSeq: Seq[Attribute], diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownFilterToScan.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownFilterToScan.scala index 3ff098ae4b41..e0da91dd7c49 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownFilterToScan.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownFilterToScan.scala @@ -50,7 +50,7 @@ object PushDownFilterToScan extends Rule[SparkPlan] with PredicateHelper { // If BatchScanExecTransformerBase's parent is filter, pushdownFilters can't be None. batchScan.setPushDownFilters(Seq.empty) val newScan = batchScan - if (pushDownFilters.size > 0) { + if (pushDownFilters.nonEmpty) { newScan.setPushDownFilters(pushDownFilters) if (newScan.doValidate().ok()) { filter.withNewChildren(Seq(newScan)) From 41855f9691aa919a799edbf0682092ef5c0228e8 Mon Sep 17 00:00:00 2001 From: zml1206 Date: Fri, 6 Dec 2024 15:13:27 +0800 Subject: [PATCH 2/4] update --- .../apache/gluten/execution/ScanTransformerFactory.scala | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala index 194288de5a79..745c895688c9 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala @@ -16,7 +16,6 @@ */ package org.apache.gluten.execution -import org.apache.gluten.expression.ExpressionConverter import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.sql.execution.FileSourceScanExec @@ -42,12 +41,6 @@ object ScanTransformerFactory { .asInstanceOf[DataSourceScanTransformerRegister] .createDataSourceTransformer(scanExec) case _ => - val dataFilters = scanExec.dataFilters.filter { - expr => - ExpressionConverter.canReplaceWithExpressionTransformer( - ExpressionConverter.replaceAttributeReference(expr), - scanExec.output) - } FileSourceScanExecTransformer( scanExec.relation, scanExec.output, @@ -55,7 +48,7 @@ object ScanTransformerFactory { scanExec.partitionFilters, scanExec.optionalBucketSet, scanExec.optionalNumCoalescedBuckets, - dataFilters, + scanExec.dataFilters, scanExec.tableIdentifier, scanExec.disableBucketedScan ) From bdf62b7d3a559df80305c021da28e558ae9ad816 Mon Sep 17 00:00:00 2001 From: zml1206 Date: Fri, 6 Dec 2024 15:25:51 +0800 Subject: [PATCH 3/4] add ut --- .../gluten/execution/VeloxScanSuite.scala | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxScanSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxScanSuite.scala index a4f16ecc3c8f..51a5925b0658 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxScanSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxScanSuite.scala @@ -24,6 +24,7 @@ import org.apache.gluten.utils.VeloxFileSystemValidationJniWrapper import org.apache.spark.SparkConf import org.apache.spark.sql.catalyst.expressions.GreaterThan import org.apache.spark.sql.execution.ScalarSubquery +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ class VeloxScanSuite extends VeloxWholeStageTransformerSuite { @@ -150,4 +151,40 @@ class VeloxScanSuite extends VeloxWholeStageTransformerSuite { } } } + + test("push partial filters to offload scan when filter need fallback - v1") { + withSQLConf(GlutenConfig.EXPRESSION_BLACK_LIST.key -> "add") { + createTPCHNotNullTables() + val query = "select l_partkey from lineitem where l_partkey + 1 > 5 and l_partkey - 1 < 8" + runQueryAndCompare(query) { + df => + { + val executedPlan = getExecutedPlan(df) + val scans = executedPlan.collect { case p: FileSourceScanExecTransformer => p } + assert(scans.size == 1) + // isnotnull(l_partkey) and l_partkey - 1 < 8 + assert(scans.head.filterExprs().size == 2) + } + } + } + } + + test("push partial filters to offload scan when filter need fallback - v2") { + withSQLConf( + GlutenConfig.EXPRESSION_BLACK_LIST.key -> "add", + SQLConf.USE_V1_SOURCE_LIST.key -> "") { + createTPCHNotNullTables() + val query = "select l_partkey from lineitem where l_partkey + 1 > 5 and l_partkey - 1 < 8" + runQueryAndCompare(query) { + df => + { + val executedPlan = getExecutedPlan(df) + val scans = executedPlan.collect { case p: BatchScanExecTransformer => p } + assert(scans.size == 1) + // isnotnull(l_partkey) and l_partkey - 1 < 8 + assert(scans.head.filterExprs().size == 2) + } + } + } + } } From c4a7368db76b6f712a5b235e104eb97ee75606c4 Mon Sep 17 00:00:00 2001 From: zml1206 Date: Tue, 10 Dec 2024 18:04:28 +0800 Subject: [PATCH 4/4] update --- .../apache/gluten/execution/BatchScanExecTransformer.scala | 4 +++- .../org/apache/gluten/expression/ExpressionConverter.scala | 7 ++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala index 29bf1dcab6d8..67710cba072a 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala @@ -109,7 +109,9 @@ abstract class BatchScanExecTransformerBase( ExpressionConverter.replaceAttributeReference(expr), output) } - case _ => Seq.empty + case _ => + logInfo(s"${scan.getClass.toString} does not support push down filters") + Seq.empty } def setPushDownFilters(filters: Seq[Expression]): Unit = { diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala index bbcf256b5d2c..e4aeb3ef78a9 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala @@ -58,12 +58,13 @@ object ExpressionConverter extends SQLConfHelper with Logging { def canReplaceWithExpressionTransformer( expr: Expression, attributeSeq: Seq[Attribute]): Boolean = { - val expressionsMap = ExpressionMappings.expressionsMap try { - replaceWithExpressionTransformer0(expr, attributeSeq, expressionsMap) + replaceWithExpressionTransformer(expr, attributeSeq) true } catch { - case _: Exception => false + case e: Exception => + logInfo(e.getMessage) + false } }