[GLUTEN-3582][CORE][VL][CH] Refactor filter pushdown logic (#4582)
* Add postProcessPushDownFilter in SparkPlanExecApi

* Fix IcebergScanTransformer build

* Fix UT for TPCH Q22, since the subquery is not pushed down for Parquet
baibaichen authored Jan 31, 2024
1 parent ec90ede commit 512b69a
Showing 11 changed files with 111 additions and 37 deletions.
@@ -45,6 +45,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
import org.apache.spark.sql.execution.datasources.{FileFormat, WriteFilesExec}
import org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules.NativeWritePostRule
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v1.ClickHouseFileIndex
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.ClickHouseScan
@@ -571,4 +572,30 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
}
}
}

/** The ClickHouse backend only supports a subset of filters for Parquet. */
override def postProcessPushDownFilter(
extraFilters: Seq[Expression],
sparkExecNode: LeafExecNode): Seq[Expression] = {
// FIXME: DeltaMergeTreeFileFormat should not inherit from ParquetFileFormat.
def isParquetFormat(fileFormat: FileFormat): Boolean = fileFormat match {
case p: ParquetFileFormat if p.shortName().equals("parquet") => true
case _ => false
}

// TODO: Handle DataSource V2 (BatchScanExec) as well.
// TODO: Push down conditions that contain a scalar subquery.
// For example, consider TPCH Q22: 'c_acctbal > (select avg(c_acctbal) from customer where ...)'.
// Vanilla Spark pushes down only the Parquet source filter, not the Catalyst filter, so it
// cannot use the subquery result. Gluten pushes down the Catalyst filter and could benefit
// from the subquery result, but the current implementation is ineffective because it does
// not use ReusedSubqueryExec.

sparkExecNode match {
case fileSourceScan: FileSourceScanExec
if isParquetFormat(fileSourceScan.relation.fileFormat) =>
fileSourceScan.dataFilters
case _ => super.postProcessPushDownFilter(extraFilters, sparkExecNode)
}
}
}
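For context on the TPCH Q22 case mentioned in the comment above, the following standalone sketch (not part of this patch; the in-memory customer table and the simplified predicate are illustrative assumptions) shows the kind of scalar-subquery filter involved. Vanilla Spark keeps the comparison in FilterExec because it cannot be expressed as a Parquet source filter; only a Catalyst-level pushdown like Gluten's could hand it to the scan.

import org.apache.spark.sql.SparkSession

// Illustrative sketch only: a simplified TPCH Q22-style predicate on a tiny in-memory table.
object Q22PredicateSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("q22-sketch").getOrCreate()
    import spark.implicits._

    // Tiny stand-in for the TPCH customer table.
    Seq((1L, 100.0), (2L, 900.0), (3L, 500.0))
      .toDF("c_custkey", "c_acctbal")
      .createOrReplaceTempView("customer")

    val df = spark.sql(
      """SELECT c_custkey FROM customer
        |WHERE c_acctbal > (SELECT avg(c_acctbal) FROM customer WHERE c_acctbal > 0.0)""".stripMargin)

    // The scalar-subquery comparison stays in the Filter node of the vanilla plan; it is not a
    // Parquet source filter, so only a Catalyst-level pushdown can move it into the scan.
    df.explain()
    spark.stop()
  }
}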
@@ -294,8 +294,7 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
val adaptiveSparkPlanExec = collectWithSubqueries(df.queryExecution.executedPlan) {
case adaptive: AdaptiveSparkPlanExec => adaptive
}
assert(adaptiveSparkPlanExec.size == 3)
assert(adaptiveSparkPlanExec(1) == adaptiveSparkPlanExec(2))
assert(adaptiveSparkPlanExec.size == 2)
}
}

@@ -218,8 +218,7 @@ class GlutenClickHouseTPCHParquetAQESuite
val adaptiveSparkPlanExec = collectWithSubqueries(df.queryExecution.executedPlan) {
case adaptive: AdaptiveSparkPlanExec => adaptive
}
assert(adaptiveSparkPlanExec.size == 3)
assert(adaptiveSparkPlanExec(1) == adaptiveSparkPlanExec(2))
assert(adaptiveSparkPlanExec.size == 2)
}
}

2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -481,7 +481,7 @@ QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list
// Remove this compatibility handling later; then only the Java iterator will have local files in ReadRel.
if (read.has_local_files() || (!read.has_extension_table() && !isReadFromMergeTree(read)))
{
assert(rel.has_base_schema());
assert(read.has_base_schema());
QueryPlanStepPtr step;
if (isReadRelFromJava(read))
step = parseReadRealWithJavaIter(read);
@@ -36,8 +36,9 @@ import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.{FileSourceScanExec, LeafExecNode, SparkPlan}
import org.apache.spark.sql.execution.datasources.{FileFormat, WriteFilesExec}
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
import org.apache.spark.sql.execution.joins.BuildSideRelation
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.hive.HiveTableScanExecTransformer
@@ -530,4 +531,39 @@ trait SparkPlanExecApi {
def genInjectedFunctions(): Seq[(FunctionIdentifier, ExpressionInfo, FunctionBuilder)] = Seq.empty

def rewriteSpillPath(path: String): String = path

/**
* Vanilla Spark pushes only part of the filter condition down into the scan, whereas Gluten can
* push down all filters. This function computes the conditions remaining in FilterExec and adds
* them to the dataFilters of the leaf node.
* @param extraFilters
*   conjunctive predicates split from the parent FilterExec
* @param sparkExecNode
*   the vanilla leaf node of the plan tree, either FileSourceScanExec or BatchScanExec
* @return
*   all filters to be pushed down
*/
def postProcessPushDownFilter(
extraFilters: Seq[Expression],
sparkExecNode: LeafExecNode): Seq[Expression] = {
sparkExecNode match {
case fileSourceScan: FileSourceScanExec =>
fileSourceScan.dataFilters ++ FilterHandler.getRemainingFilters(
fileSourceScan.dataFilters,
extraFilters)
case batchScan: BatchScanExec =>
batchScan.scan match {
case fileScan: FileScan =>
fileScan.dataFilters ++ FilterHandler.getRemainingFilters(
fileScan.dataFilters,
extraFilters)
case _ =>
// TODO: For data lake format use pushedFilters in SupportsPushDownFilters
extraFilters
}
case _ =>
throw new UnsupportedOperationException(
s"${sparkExecNode.getClass.toString} is not supported.")
}
}
}
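The default implementation above delegates to FilterHandler.getRemainingFilters, which (see the FilterHandler hunk below) is a set difference over ExpressionSet. A minimal standalone sketch of that semantics, with made-up column names and predicates:

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExpressionSet, GreaterThan, LessThan, Literal}
import org.apache.spark.sql.types.IntegerType

// Illustration only: the hypothetical columns a and b are not part of this patch.
object RemainingFiltersSketch {
  def main(args: Array[String]): Unit = {
    val a = AttributeReference("a", IntegerType)()
    val b = AttributeReference("b", IntegerType)()

    val scanFilters = Seq(GreaterThan(a, Literal(1)))                          // already in dataFilters
    val filterConds = Seq(GreaterThan(a, Literal(1)), LessThan(b, Literal(2))) // conjuncts from FilterExec

    // The same set difference FilterHandler.getRemainingFilters performs.
    val remaining = (ExpressionSet(filterConds) -- ExpressionSet(scanFilters)).toSeq
    // Only b < 2 remains; the default postProcessPushDownFilter appends it to dataFilters.
    println(remaining)
  }
}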
@@ -387,30 +387,27 @@ object FilterHandler extends PredicateHelper {
(ExpressionSet(filters) -- ExpressionSet(scanFilters)).toSeq

// Separate and compare the filter conditions in Scan and Filter.
// Push down the remaining conditions in Filter into Scan.
// Try to push down the remaining conditions from Filter into Scan.
def applyFilterPushdownToScan(filter: FilterExec, reuseSubquery: Boolean): SparkPlan =
filter.child match {
case fileSourceScan: FileSourceScanExec =>
val remainingFilters =
getRemainingFilters(
fileSourceScan.dataFilters,
splitConjunctivePredicates(filter.condition))
val pushDownFilters =
BackendsApiManager.getSparkPlanExecApiInstance.postProcessPushDownFilter(
splitConjunctivePredicates(filter.condition),
fileSourceScan)
ScanTransformerFactory.createFileSourceScanTransformer(
fileSourceScan,
reuseSubquery,
extraFilters = remainingFilters)
allPushDownFilters = Some(pushDownFilters))
case batchScan: BatchScanExec =>
val remainingFilters = batchScan.scan match {
case fileScan: FileScan =>
getRemainingFilters(fileScan.dataFilters, splitConjunctivePredicates(filter.condition))
case _ =>
// TODO: For data lake format use pushedFilters in SupportsPushDownFilters
splitConjunctivePredicates(filter.condition)
}
val pushDownFilters =
BackendsApiManager.getSparkPlanExecApiInstance.postProcessPushDownFilter(
splitConjunctivePredicates(filter.condition),
batchScan)
ScanTransformerFactory.createBatchScanTransformer(
batchScan,
reuseSubquery,
pushdownFilters = remainingFilters)
allPushDownFilters = Some(pushDownFilters))
case other =>
throw new UnsupportedOperationException(s"${other.getClass.toString} is not supported.")
}
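For reference, splitConjunctivePredicates comes from Catalyst's PredicateHelper, which FilterHandler mixes in; it turns the single FilterExec condition into the conjunct list handed to postProcessPushDownFilter. A small illustrative sketch (the columns and condition are made up):

import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, GreaterThan, LessThan, Literal, PredicateHelper}
import org.apache.spark.sql.types.IntegerType

// Mirrors how FilterHandler mixes in PredicateHelper; illustration only.
object SplitConjunctsSketch extends PredicateHelper {
  def main(args: Array[String]): Unit = {
    val a = AttributeReference("a", IntegerType)()
    val b = AttributeReference("b", IntegerType)()

    // The FilterExec condition `a > 1 AND b < 2` ...
    val condition = And(GreaterThan(a, Literal(1)), LessThan(b, Literal(2)))

    // ... becomes the two conjuncts passed to postProcessPushDownFilter as extraFilters.
    println(splitConjunctivePredicates(condition))
  }
}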
@@ -34,8 +34,6 @@ import org.apache.spark.sql.vectorized.ColumnarBatch

import java.util.Objects

import scala.collection.mutable.ListBuffer

/**
* Columnar-based BatchScanExec. Although keyGroupedPartitioning is not used, it cannot be deleted,
* because it gives BatchScanExecTransformer a constructor with the same parameters as
@@ -64,13 +62,15 @@ class BatchScanExecTransformer(
// class. Otherwise, we will encounter an issue where makeCopy cannot find a constructor
// with the corresponding number of parameters.
// The workaround is to pass in pushdownFilters through a mutable member.
val pushdownFilters: ListBuffer[Expression] = ListBuffer.empty
protected var pushdownFilters: Option[Seq[Expression]] = None

def addPushdownFilters(filters: Seq[Expression]): Unit = pushdownFilters ++= filters
def setPushDownFilters(filters: Seq[Expression]): Unit = {
pushdownFilters = Some(filters)
}

override def filterExprs(): Seq[Expression] = scan match {
case fileScan: FileScan =>
fileScan.dataFilters ++ pushdownFilters
pushdownFilters.getOrElse(fileScan.dataFilters)
case _ =>
throw new UnsupportedOperationException(s"${scan.getClass.toString} is not supported")
}
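Note the semantics change above: the old ListBuffer was appended to the scan's dataFilters, while the new Option, when set, replaces them entirely. That is safe because postProcessPushDownFilter already returns the full set (dataFilters plus the remaining FilterExec conjuncts). A minimal sketch of the getOrElse behaviour, with made-up predicates:

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GreaterThan, LessThan, Literal}
import org.apache.spark.sql.types.IntegerType

// Illustration only; not part of this patch.
object PushdownFiltersOptionSketch {
  def main(args: Array[String]): Unit = {
    val a = AttributeReference("a", IntegerType)()
    val b = AttributeReference("b", IntegerType)()

    val dataFilters: Seq[Expression] = Seq(GreaterThan(a, Literal(1)))
    var pushdownFilters: Option[Seq[Expression]] = None

    // Same shape as the new filterExprs(): the Option, when set, wins wholesale.
    def filterExprs(): Seq[Expression] = pushdownFilters.getOrElse(dataFilters)

    println(filterExprs()) // only the scan's own a > 1
    pushdownFilters = Some(dataFilters :+ LessThan(b, Literal(2))) // full pushed-down set
    println(filterExprs()) // a > 1 and b < 2
  }
}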
@@ -37,7 +37,7 @@ object ScanTransformerFactory {
def createFileSourceScanTransformer(
scanExec: FileSourceScanExec,
reuseSubquery: Boolean,
extraFilters: Seq[Expression] = Seq.empty,
allPushDownFilters: Option[Seq[Expression]] = None,
validation: Boolean = false): FileSourceScanExecTransformer = {
// transform BroadcastExchangeExec to ColumnarBroadcastExchangeExec in partitionFilters
val newPartitionFilters = if (validation) {
@@ -61,7 +61,7 @@ object ScanTransformerFactory {
newPartitionFilters,
scanExec.optionalBucketSet,
scanExec.optionalNumCoalescedBuckets,
scanExec.dataFilters ++ extraFilters,
allPushDownFilters.getOrElse(scanExec.dataFilters),
scanExec.tableIdentifier,
scanExec.disableBucketedScan
)
@@ -97,7 +97,7 @@ object ScanTransformerFactory {
def createBatchScanTransformer(
batchScan: BatchScanExec,
reuseSubquery: Boolean,
pushdownFilters: Seq[Expression] = Seq.empty,
allPushDownFilters: Option[Seq[Expression]] = None,
validation: Boolean = false): SparkPlan = {
if (supportedBatchScan(batchScan.scan)) {
val newPartitionFilters = if (validation) {
@@ -108,9 +108,9 @@ object ScanTransformerFactory {
ExpressionConverter.transformDynamicPruningExpr(batchScan.runtimeFilters, reuseSubquery)
}
val transformer = lookupBatchScanTransformer(batchScan, newPartitionFilters)
if (!validation && pushdownFilters.nonEmpty) {
transformer.addPushdownFilters(pushdownFilters)
// Validate again if pushdownFilters is not empty.
if (!validation && allPushDownFilters.isDefined) {
transformer.setPushDownFilters(allPushDownFilters.get)
// Validate again if allPushDownFilters is defined.
val validationResult = transformer.doValidate()
if (validationResult.isValid) {
transformer
@@ -135,7 +135,7 @@ object ScanTransformerFactory {
}
}

def supportedBatchScan(scan: Scan): Boolean = scan match {
private def supportedBatchScan(scan: Scan): Boolean = scan match {
case _: FileScan => true
case _ => lookupDataSourceScanTransformer(scan.getClass.getName).nonEmpty
}
@@ -39,7 +39,7 @@ class IcebergScanTransformer(
runtimeFilters = runtimeFilters,
table = table) {

override def filterExprs(): Seq[Expression] = pushdownFilters
override def filterExprs(): Seq[Expression] = pushdownFilters.getOrElse(Seq.empty)

override def getPartitionSchema: StructType = GlutenIcebergSourceUtil.getPartitionSchema(scan)

@@ -18,6 +18,7 @@ package org.apache.spark.sql

import io.glutenproject.GlutenConfig
import io.glutenproject.execution.{BatchScanExecTransformer, FileSourceScanExecTransformer, FilterExecTransformerBase}
import io.glutenproject.utils.BackendTestUtils

import org.apache.spark.SparkConf
import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
@@ -635,8 +636,15 @@ class GlutenDynamicPartitionPruningV1SuiteAEOff
//
// See also io.glutenproject.execution.FilterHandler#applyFilterPushdownToScan
// See also DynamicPartitionPruningSuite.scala:1362
assert(subqueryIds.size == 3, "Whole plan subquery reusing not working correctly")
assert(reusedSubqueryIds.size == 2, "Whole plan subquery reusing not working correctly")
if (BackendTestUtils.isCHBackendLoaded()) {
assert(subqueryIds.size == 2, "Whole plan subquery reusing not working correctly")
assert(reusedSubqueryIds.size == 1, "Whole plan subquery reusing not working correctly")
} else if (BackendTestUtils.isVeloxBackendLoaded()) {
assert(subqueryIds.size == 3, "Whole plan subquery reusing not working correctly")
assert(reusedSubqueryIds.size == 2, "Whole plan subquery reusing not working correctly")
} else {
assert(false, "Unknown backend")
}
assert(
reusedSubqueryIds.forall(subqueryIds.contains(_)),
"ReusedSubqueryExec should reuse an existing subquery")
@@ -18,6 +18,7 @@ package org.apache.spark.sql

import io.glutenproject.GlutenConfig
import io.glutenproject.execution.{BatchScanExecTransformer, FileSourceScanExecTransformer, FilterExecTransformerBase}
import io.glutenproject.utils.BackendTestUtils

import org.apache.spark.SparkConf
import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
@@ -640,8 +641,15 @@ class GlutenDynamicPartitionPruningV1SuiteAEOff
//
// See also io.glutenproject.execution.FilterHandler#applyFilterPushdownToScan
// See also DynamicPartitionPruningSuite.scala:1362
assert(subqueryIds.size == 3, "Whole plan subquery reusing not working correctly")
assert(reusedSubqueryIds.size == 2, "Whole plan subquery reusing not working correctly")
if (BackendTestUtils.isCHBackendLoaded()) {
assert(subqueryIds.size == 2, "Whole plan subquery reusing not working correctly")
assert(reusedSubqueryIds.size == 1, "Whole plan subquery reusing not working correctly")
} else if (BackendTestUtils.isVeloxBackendLoaded()) {
assert(subqueryIds.size == 3, "Whole plan subquery reusing not working correctly")
assert(reusedSubqueryIds.size == 2, "Whole plan subquery reusing not working correctly")
} else {
assert(false, "Unknown backend")
}
assert(
reusedSubqueryIds.forall(subqueryIds.contains(_)),
"ReusedSubqueryExec should reuse an existing subquery")
