diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 79e6af43563c..5131c616b2fa 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -45,6 +45,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
 import org.apache.spark.sql.execution.datasources.{FileFormat, WriteFilesExec}
 import org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules.NativeWritePostRule
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.v1.ClickHouseFileIndex
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.ClickHouseScan
@@ -571,4 +572,30 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
       }
     }
   }
+
+  /** The ClickHouse backend supports only a subset of push-down filters for Parquet. */
+  override def postProcessPushDownFilter(
+      extraFilters: Seq[Expression],
+      sparkExecNode: LeafExecNode): Seq[Expression] = {
+    // FIXME: DeltaMergeTreeFileFormat should not inherit from ParquetFileFormat.
+    def isParquetFormat(fileFormat: FileFormat): Boolean = fileFormat match {
+      case p: ParquetFileFormat if p.shortName().equals("parquet") => true
+      case _ => false
+    }
+
+    // TODO: datasource v2 ?
+    // TODO: Push down conditions with scalar subquery
+    // For example, consider TPCH Q22: 'c_acctbal > (select avg(c_acctbal) from customer where ...)'.
+    // Vanilla Spark pushes down only the Parquet filter, not the Catalyst filter, so it cannot
+    // use the subquery result, while Gluten pushes down the Catalyst filter and can therefore
+    // benefit from it. However, the current implementation is ineffective, since we do not use
+    // ReusedSubqueryExec.
+
+    sparkExecNode match {
+      case fileSourceScan: FileSourceScanExec
+          if isParquetFormat(fileSourceScan.relation.fileFormat) =>
+        fileSourceScan.dataFilters
+      case _ => super.postProcessPushDownFilter(extraFilters, sparkExecNode)
+    }
+  }
 }
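The shortName() guard in isParquetFormat above matters because a FileFormat that merely subclasses ParquetFileFormat (as the FIXME notes for DeltaMergeTreeFileFormat) would otherwise be treated as plain Parquet. A minimal, self-contained sketch of that distinction; the subclass below is a hypothetical stand-in, not the real Delta/MergeTree class:

    import org.apache.spark.sql.execution.datasources.FileFormat
    import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

    // Hypothetical stand-in for a format that inherits from ParquetFileFormat
    // but is not plain Parquet (for example, a MergeTree-backed format).
    class FakeMergeTreeFileFormat extends ParquetFileFormat {
      override def shortName(): String = "mergetree"
    }

    object IsParquetFormatCheck {
      // Same check as in the override above: a type match alone is not enough.
      def isParquetFormat(fileFormat: FileFormat): Boolean = fileFormat match {
        case p: ParquetFileFormat if p.shortName().equals("parquet") => true
        case _ => false
      }

      def main(args: Array[String]): Unit = {
        println(isParquetFormat(new ParquetFileFormat))       // true
        println(isParquetFormat(new FakeMergeTreeFileFormat)) // false, despite the subtype relation
      }
    }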
diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
index 32efabc9472e..f710333d60c5 100644
--- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
+++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
@@ -294,8 +294,7 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
       val adaptiveSparkPlanExec = collectWithSubqueries(df.queryExecution.executedPlan) {
         case adaptive: AdaptiveSparkPlanExec => adaptive
       }
-      assert(adaptiveSparkPlanExec.size == 3)
-      assert(adaptiveSparkPlanExec(1) == adaptiveSparkPlanExec(2))
+      assert(adaptiveSparkPlanExec.size == 2)
     }
   }
diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetAQESuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetAQESuite.scala
index ae2a105fb1f1..a324ed6733c4 100644
--- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetAQESuite.scala
+++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetAQESuite.scala
@@ -218,8 +218,7 @@ class GlutenClickHouseTPCHParquetAQESuite
       val adaptiveSparkPlanExec = collectWithSubqueries(df.queryExecution.executedPlan) {
         case adaptive: AdaptiveSparkPlanExec => adaptive
       }
-      assert(adaptiveSparkPlanExec.size == 3)
-      assert(adaptiveSparkPlanExec(1) == adaptiveSparkPlanExec(2))
+      assert(adaptiveSparkPlanExec.size == 2)
     }
   }
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index cee45697365a..ccb5b008acf9 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -481,7 +481,7 @@ QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list
         // Remove this compatiability in later and then only java iter has local files in ReadRel.
         if (read.has_local_files() || (!read.has_extension_table() && !isReadFromMergeTree(read)))
         {
-            assert(rel.has_base_schema());
+            assert(read.has_base_schema());
            QueryPlanStepPtr step;
            if (isReadRelFromJava(read))
                step = parseReadRealWithJavaIter(read);
diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
index 3a4ef6ad3d95..3e1dcf6f1f8a 100644
--- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
@@ -36,8 +36,9 @@ import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{FileSourceScanExec, LeafExecNode, SparkPlan}
 import org.apache.spark.sql.execution.datasources.{FileFormat, WriteFilesExec}
+import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
 import org.apache.spark.sql.execution.joins.BuildSideRelation
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer
@@ -530,4 +531,39 @@ trait SparkPlanExecApi {
   def genInjectedFunctions(): Seq[(FunctionIdentifier, ExpressionInfo, FunctionBuilder)] = Seq.empty
 
   def rewriteSpillPath(path: String): String = path
+
+  /**
+   * Vanilla Spark pushes only part of the filter conditions into the scan, whereas Gluten can push
+   * down all of them. This function computes the conditions remaining in FilterExec and adds them
+   * to the dataFilters of the leaf node.
+   * @param extraFilters:
+   *   conjunctive predicates split from the upper FilterExec
+   * @param sparkExecNode:
+   *   the vanilla leaf node of the plan tree, which is a FileSourceScanExec or BatchScanExec
+   * @return
+   *   all push-down filters
+   */
+  def postProcessPushDownFilter(
+      extraFilters: Seq[Expression],
+      sparkExecNode: LeafExecNode): Seq[Expression] = {
+    sparkExecNode match {
+      case fileSourceScan: FileSourceScanExec =>
+        fileSourceScan.dataFilters ++ FilterHandler.getRemainingFilters(
+          fileSourceScan.dataFilters,
+          extraFilters)
+      case batchScan: BatchScanExec =>
+        batchScan.scan match {
+          case fileScan: FileScan =>
+            fileScan.dataFilters ++ FilterHandler.getRemainingFilters(
+              fileScan.dataFilters,
+              extraFilters)
+          case _ =>
+            // TODO: For data lake format use pushedFilters in SupportsPushDownFilters
+            extraFilters
+        }
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"${sparkExecNode.getClass.toString} is not supported.")
+    }
+  }
 }
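To make the default implementation above concrete, here is a small, self-contained sketch of how the "remaining" filters are computed: the FilterExec condition is split into conjuncts, and everything the scan does not already carry in dataFilters is kept. This mirrors FilterHandler.getRemainingFilters, whose body appears at the top of the BasicPhysicalOperatorTransformer hunk below; the attribute names and literal values are illustrative (loosely TPC-H lineitem), not taken from the patch:

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper}

    object RemainingFiltersSketch extends PredicateHelper {
      def main(args: Array[String]): Unit = {
        val quantity = 'l_quantity.double
        val shipdate = 'l_shipdate.int

        // Condition as it appears in the upper FilterExec.
        val condition: Expression = (quantity < 24.0) && (shipdate <= 19980902)
        // Filters the scan already carries in dataFilters (what vanilla Spark pushed down).
        val scanFilters: Seq[Expression] = Seq(quantity < 24.0)

        // Split the condition into conjunctive predicates and keep only those
        // that are not already present in the scan.
        val extraFilters = splitConjunctivePredicates(condition)
        val remaining = (ExpressionSet(extraFilters) -- ExpressionSet(scanFilters)).toSeq

        // All push-down filters handed to the transformer: dataFilters ++ remaining.
        println(scanFilters ++ remaining) // roughly: (l_quantity < 24.0), (l_shipdate <= 19980902)
      }
    }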
diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala
index 1da16ba86373..1a03a05e54b7 100644
--- a/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala
@@ -387,30 +387,27 @@ object FilterHandler extends PredicateHelper {
     (ExpressionSet(filters) -- ExpressionSet(scanFilters)).toSeq
 
   // Separate and compare the filter conditions in Scan and Filter.
-  // Push down the remaining conditions in Filter into Scan.
+  // Try to push down the remaining conditions in Filter into Scan.
   def applyFilterPushdownToScan(filter: FilterExec, reuseSubquery: Boolean): SparkPlan =
     filter.child match {
       case fileSourceScan: FileSourceScanExec =>
-        val remainingFilters =
-          getRemainingFilters(
-            fileSourceScan.dataFilters,
-            splitConjunctivePredicates(filter.condition))
+        val pushDownFilters =
+          BackendsApiManager.getSparkPlanExecApiInstance.postProcessPushDownFilter(
+            splitConjunctivePredicates(filter.condition),
+            fileSourceScan)
         ScanTransformerFactory.createFileSourceScanTransformer(
           fileSourceScan,
           reuseSubquery,
-          extraFilters = remainingFilters)
+          allPushDownFilters = Some(pushDownFilters))
       case batchScan: BatchScanExec =>
-        val remainingFilters = batchScan.scan match {
-          case fileScan: FileScan =>
-            getRemainingFilters(fileScan.dataFilters, splitConjunctivePredicates(filter.condition))
-          case _ =>
-            // TODO: For data lake format use pushedFilters in SupportsPushDownFilters
-            splitConjunctivePredicates(filter.condition)
-        }
+        val pushDownFilters =
+          BackendsApiManager.getSparkPlanExecApiInstance.postProcessPushDownFilter(
+            splitConjunctivePredicates(filter.condition),
+            batchScan)
         ScanTransformerFactory.createBatchScanTransformer(
           batchScan,
           reuseSubquery,
-          pushdownFilters = remainingFilters)
+          allPushDownFilters = Some(pushDownFilters))
       case other =>
         throw new UnsupportedOperationException(s"${other.getClass.toString} is not supported.")
     }
diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala
index 07365e04fe5a..7c4d0c65462f 100644
--- a/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala
@@ -34,8 +34,6 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 
 import java.util.Objects
 
-import scala.collection.mutable.ListBuffer
-
 /**
  * Columnar Based BatchScanExec. Although keyGroupedPartitioning is not used, it cannot be deleted,
  * it can make BatchScanExecTransformer contain a constructor with the same parameters as
@@ -64,13 +62,15 @@ class BatchScanExecTransformer(
   // class. Otherwise, we will encounter an issue where makeCopy cannot find a constructor
   // with the corresponding number of parameters.
   // The workaround is to add a mutable list to pass in pushdownFilters.
-  val pushdownFilters: ListBuffer[Expression] = ListBuffer.empty
+  protected var pushdownFilters: Option[Seq[Expression]] = None
 
-  def addPushdownFilters(filters: Seq[Expression]): Unit = pushdownFilters ++= filters
+  def setPushDownFilters(filters: Seq[Expression]): Unit = {
+    pushdownFilters = Some(filters)
+  }
 
   override def filterExprs(): Seq[Expression] = scan match {
     case fileScan: FileScan =>
-      fileScan.dataFilters ++ pushdownFilters
+      pushdownFilters.getOrElse(fileScan.dataFilters)
     case _ =>
       throw new UnsupportedOperationException(s"${scan.getClass.toString} is not supported")
   }
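The switch from a mutable ListBuffer to an Option[Seq[Expression]] changes the contract: None means "fall back to the filters the scan already carries", while Some(filters) is taken as the complete push-down list and replaces them rather than being appended. A minimal sketch of that contract with plain, illustrative stand-in types (not the real Gluten classes):

    object PushdownFiltersContract {
      // Illustrative stand-ins for FileScan / BatchScanExecTransformer.
      final case class Scan(dataFilters: Seq[String])

      final class Transformer(scan: Scan) {
        private var pushdownFilters: Option[Seq[String]] = None

        def setPushDownFilters(filters: Seq[String]): Unit = {
          pushdownFilters = Some(filters)
        }

        // None -> use the scan's own dataFilters; Some(fs) -> fs is the full list.
        def filterExprs(): Seq[String] = pushdownFilters.getOrElse(scan.dataFilters)
      }

      def main(args: Array[String]): Unit = {
        val t = new Transformer(Scan(Seq("l_quantity < 24")))
        println(t.filterExprs()) // List(l_quantity < 24)
        t.setPushDownFilters(Seq("l_quantity < 24", "l_shipdate <= 19980902"))
        println(t.filterExprs()) // the complete caller-supplied list, not an append
      }
    }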
diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala b/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala
index c5f547308c06..0a8962917b32 100644
--- a/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala
@@ -37,7 +37,7 @@ object ScanTransformerFactory {
   def createFileSourceScanTransformer(
       scanExec: FileSourceScanExec,
       reuseSubquery: Boolean,
-      extraFilters: Seq[Expression] = Seq.empty,
+      allPushDownFilters: Option[Seq[Expression]] = None,
       validation: Boolean = false): FileSourceScanExecTransformer = {
     // transform BroadcastExchangeExec to ColumnarBroadcastExchangeExec in partitionFilters
     val newPartitionFilters = if (validation) {
@@ -61,7 +61,7 @@ object ScanTransformerFactory {
       newPartitionFilters,
       scanExec.optionalBucketSet,
       scanExec.optionalNumCoalescedBuckets,
-      scanExec.dataFilters ++ extraFilters,
+      allPushDownFilters.getOrElse(scanExec.dataFilters),
       scanExec.tableIdentifier,
       scanExec.disableBucketedScan
     )
@@ -97,7 +97,7 @@ object ScanTransformerFactory {
   def createBatchScanTransformer(
       batchScan: BatchScanExec,
       reuseSubquery: Boolean,
-      pushdownFilters: Seq[Expression] = Seq.empty,
+      allPushDownFilters: Option[Seq[Expression]] = None,
       validation: Boolean = false): SparkPlan = {
     if (supportedBatchScan(batchScan.scan)) {
       val newPartitionFilters = if (validation) {
@@ -108,9 +108,9 @@ object ScanTransformerFactory {
         ExpressionConverter.transformDynamicPruningExpr(batchScan.runtimeFilters, reuseSubquery)
       }
       val transformer = lookupBatchScanTransformer(batchScan, newPartitionFilters)
-      if (!validation && pushdownFilters.nonEmpty) {
-        transformer.addPushdownFilters(pushdownFilters)
-        // Validate again if pushdownFilters is not empty.
+      if (!validation && allPushDownFilters.isDefined) {
+        transformer.setPushDownFilters(allPushDownFilters.get)
+        // Validate again if allPushDownFilters is defined.
         val validationResult = transformer.doValidate()
         if (validationResult.isValid) {
           transformer
@@ -135,7 +135,7 @@ object ScanTransformerFactory {
     }
   }
 
-  def supportedBatchScan(scan: Scan): Boolean = scan match {
+  private def supportedBatchScan(scan: Scan): Boolean = scan match {
     case _: FileScan => true
     case _ => lookupDataSourceScanTransformer(scan.getClass.getName).nonEmpty
   }
diff --git a/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergScanTransformer.scala b/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergScanTransformer.scala
index 639e627d165d..5cbbeac4bd4a 100644
--- a/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergScanTransformer.scala
+++ b/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergScanTransformer.scala
@@ -39,7 +39,7 @@ class IcebergScanTransformer(
       runtimeFilters = runtimeFilters,
       table = table) {
 
-  override def filterExprs(): Seq[Expression] = pushdownFilters
+  override def filterExprs(): Seq[Expression] = pushdownFilters.getOrElse(Seq.empty)
 
   override def getPartitionSchema: StructType = GlutenIcebergSourceUtil.getPartitionSchema(scan)
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala
index 6510f2f467d3..53593753a619 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql
 
 import io.glutenproject.GlutenConfig
 import io.glutenproject.execution.{BatchScanExecTransformer, FileSourceScanExecTransformer, FilterExecTransformerBase}
+import io.glutenproject.utils.BackendTestUtils
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
@@ -635,8 +636,15 @@ class GlutenDynamicPartitionPruningV1SuiteAEOff
       //
       // See also io.glutenproject.execution.FilterHandler#applyFilterPushdownToScan
       // See also DynamicPartitionPruningSuite.scala:1362
-      assert(subqueryIds.size == 3, "Whole plan subquery reusing not working correctly")
-      assert(reusedSubqueryIds.size == 2, "Whole plan subquery reusing not working correctly")
+      if (BackendTestUtils.isCHBackendLoaded()) {
+        assert(subqueryIds.size == 2, "Whole plan subquery reusing not working correctly")
+        assert(reusedSubqueryIds.size == 1, "Whole plan subquery reusing not working correctly")
+      } else if (BackendTestUtils.isVeloxBackendLoaded()) {
+        assert(subqueryIds.size == 3, "Whole plan subquery reusing not working correctly")
+        assert(reusedSubqueryIds.size == 2, "Whole plan subquery reusing not working correctly")
+      } else {
+        assert(false, "Unknown backend")
+      }
       assert(
         reusedSubqueryIds.forall(subqueryIds.contains(_)),
         "ReusedSubqueryExec should reuse an existing subquery")
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala
index 3815a1369e07..035bdc561b7d 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql
 
 import io.glutenproject.GlutenConfig
 import io.glutenproject.execution.{BatchScanExecTransformer, FileSourceScanExecTransformer, FilterExecTransformerBase}
+import io.glutenproject.utils.BackendTestUtils
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
@@ -640,8 +641,15 @@ class GlutenDynamicPartitionPruningV1SuiteAEOff
       //
       // See also io.glutenproject.execution.FilterHandler#applyFilterPushdownToScan
       // See also DynamicPartitionPruningSuite.scala:1362
-      assert(subqueryIds.size == 3, "Whole plan subquery reusing not working correctly")
-      assert(reusedSubqueryIds.size == 2, "Whole plan subquery reusing not working correctly")
+      if (BackendTestUtils.isCHBackendLoaded()) {
+        assert(subqueryIds.size == 2, "Whole plan subquery reusing not working correctly")
+        assert(reusedSubqueryIds.size == 1, "Whole plan subquery reusing not working correctly")
+      } else if (BackendTestUtils.isVeloxBackendLoaded()) {
+        assert(subqueryIds.size == 3, "Whole plan subquery reusing not working correctly")
+        assert(reusedSubqueryIds.size == 2, "Whole plan subquery reusing not working correctly")
+      } else {
+        assert(false, "Unknown backend")
+      }
       assert(
         reusedSubqueryIds.forall(subqueryIds.contains(_)),
         "ReusedSubqueryExec should reuse an existing subquery")