[CORE] Push partial filters to offload scan when filters need fallback
zml1206 committed Dec 6, 2024
1 parent f96105d commit 2d59e00
Showing 7 changed files with 49 additions and 20 deletions.
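
Before this change, an offloaded scan either took all of its data filters or none of them, so a single untranslatable predicate kept every predicate out of the scan. With this commit each scan transformer keeps only the data filters that ExpressionConverter can actually translate, so the translatable subset is still pushed into the offloaded scan while the rest is evaluated by the fallback Filter. A dependency-free sketch of the resulting plan shape (`Pred`, `Scan`, and `Filter` below are illustrative stand-ins, not Gluten classes):

```scala
object PartialPushdownSketch {
  final case class Pred(sql: String, offloadable: Boolean)
  final case class Scan(pushed: Seq[Pred])
  final case class Filter(conditions: Seq[Pred], child: Scan)

  // Push the offloadable subset into the scan; the Filter keeps all of its
  // conditions, just as PushDownFilterToScan leaves the FilterExec intact.
  def pushPartialFilters(filter: Filter): Filter =
    filter.copy(child = filter.child.copy(pushed = filter.conditions.filter(_.offloadable)))

  def main(args: Array[String]): Unit = {
    val plan = Filter(
      Seq(Pred("a > 1", offloadable = true), Pred("udf(b)", offloadable = false)),
      Scan(Seq.empty))
    println(pushPartialFilters(plan).child.pushed.map(_.sql)) // List(a > 1)
  }
}
```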
@@ -46,8 +46,6 @@ case class IcebergScanTransformer(
       commonPartitionValues = commonPartitionValues
     ) {
 
-  override def filterExprs(): Seq[Expression] = pushdownFilters.getOrElse(Seq.empty)
-
   override lazy val getPartitionSchema: StructType =
     GlutenIcebergSourceUtil.getReadPartitionSchema(scan)
 
@@ -30,7 +30,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer
-import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
 
 import com.google.protobuf.StringValue
 import io.substrait.proto.NamedStruct

@@ -131,11 +131,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
     }.asJava
     // Will put all filter expressions into an AND expression
     val transformer = filterExprs()
-      .map {
-        case ar: AttributeReference if ar.dataType == BooleanType =>
-          EqualNullSafe(ar, Literal.TrueLiteral)
-        case e => e
-      }
+      .map(ExpressionConverter.replaceAttributeReference)
       .reduceLeftOption(And)
       .map(ExpressionConverter.replaceWithExpressionTransformer(_, output))
     val filterNodes = transformer.map(_.doTransform(context.registeredFunction))
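
The removed inline `.map` is relocated rather than deleted: the same boolean-attribute rewrite reappears as `ExpressionConverter.replaceAttributeReference` in the ExpressionConverter hunk below, where the new feasibility check can reuse it. A runnable sketch of what the rewrite does, assuming only a Spark dependency on the classpath:

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualNullSafe, Expression, Literal}
import org.apache.spark.sql.types.BooleanType

object BooleanPredicateSketch {
  // The rewrite the commit centralizes: a bare boolean attribute used as a
  // predicate becomes `attr <=> true`, an explicit comparison.
  def normalize(expr: Expression): Expression = expr match {
    case ar: AttributeReference if ar.dataType == BooleanType =>
      EqualNullSafe(ar, Literal.TrueLiteral)
    case e => e
  }

  def main(args: Array[String]): Unit = {
    val flag = AttributeReference("flag", BooleanType)()
    println(normalize(flag)) // prints something like: (flag#0 <=> true)
  }
}
```

Using `<=>` rather than `=` means a NULL-valued boolean attribute evaluates to false instead of NULL, preserving filter semantics once the backend sees an explicit comparison.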
@@ -17,7 +17,7 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.exception.GlutenNotSupportException
+import org.apache.gluten.expression.ExpressionConverter
 import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.metrics.MetricsUpdater
 import org.apache.gluten.sql.shims.SparkShimLoader

@@ -96,18 +96,22 @@ abstract class BatchScanExecTransformerBase(
   // class. Otherwise, we will encounter an issue where makeCopy cannot find a constructor
   // with the corresponding number of parameters.
   // The workaround is to add a mutable list to pass in pushdownFilters.
-  protected var pushdownFilters: Option[Seq[Expression]] = None
+  protected var pushdownFilters: Seq[Expression] = scan match {
+    case fileScan: FileScan =>
+      fileScan.dataFilters.filter {
+        expr =>
+          ExpressionConverter.canReplaceWithExpressionTransformer(
+            ExpressionConverter.replaceAttributeReference(expr),
+            output)
+      }
+    case _ => Seq.empty
+  }
 
   def setPushDownFilters(filters: Seq[Expression]): Unit = {
-    pushdownFilters = Some(filters)
+    pushdownFilters = filters
   }
 
-  override def filterExprs(): Seq[Expression] = scan match {
-    case fileScan: FileScan =>
-      pushdownFilters.getOrElse(fileScan.dataFilters)
-    case _ =>
-      throw new GlutenNotSupportException(s"${scan.getClass.toString} is not supported")
-  }
+  override def filterExprs(): Seq[Expression] = pushdownFilters
 
   override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty
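
The comment retained above explains why this stays a `var`: Spark's `TreeNode.makeCopy` rebuilds a node reflectively from its primary-constructor arguments, so carrying the pushed-down filters as an extra constructor parameter in this subclass would break copying. A self-contained sketch of that constraint (plain Scala; `Node` and `makeCopyLike` are illustrative, not Spark code):

```scala
// Illustrative model of the makeCopy constraint: copying via the primary
// constructor and the product elements only works if extra state lives in a
// var, not in an additional constructor parameter.
final case class Node(a: Int, b: String) {
  var pushdownFilters: Seq[String] = Seq.empty // mutable state, not a ctor arg

  def makeCopyLike(): Node = {
    val args = productIterator.map(_.asInstanceOf[AnyRef]).toArray
    val copied = getClass.getConstructors.head.newInstance(args: _*).asInstanceOf[Node]
    copied.pushdownFilters = pushdownFilters // vars must be carried over by hand
    copied
  }
}

object MakeCopySketch {
  def main(args: Array[String]): Unit = {
    val n = Node(1, "x")
    n.pushdownFilters = Seq("a > 1")
    println(n.makeCopyLike().pushdownFilters) // List(a > 1)
  }
}
```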
@@ -17,6 +17,7 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.expression.ExpressionConverter
 import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.metrics.MetricsUpdater
 import org.apache.gluten.sql.shims.SparkShimLoader

@@ -99,7 +100,12 @@ abstract class FileSourceScanExecTransformerBase(
     .genFileSourceScanTransformerMetrics(sparkContext)
     .filter(m => !driverMetricsAlias.contains(m._1)) ++ driverMetricsAlias
 
-  override def filterExprs(): Seq[Expression] = dataFiltersInScan
+  override def filterExprs(): Seq[Expression] = dataFiltersInScan.filter {
+    expr =>
+      ExpressionConverter.canReplaceWithExpressionTransformer(
+        ExpressionConverter.replaceAttributeReference(expr),
+        output)
+  }
 
   override def getMetadataColumns(): Seq[AttributeReference] = metadataColumns
@@ -16,6 +16,7 @@
  */
 package org.apache.gluten.execution
 
+import org.apache.gluten.expression.ExpressionConverter
 import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.sql.connector.read.Scan

@@ -42,14 +43,20 @@ object ScanTransformerFactory {
           .asInstanceOf[DataSourceScanTransformerRegister]
           .createDataSourceTransformer(scanExec)
       case _ =>
+        val dataFilters = scanExec.dataFilters.filter {
+          expr =>
+            ExpressionConverter.canReplaceWithExpressionTransformer(
+              ExpressionConverter.replaceAttributeReference(expr),
+              scanExec.output)
+        }
         FileSourceScanExecTransformer(
           scanExec.relation,
           scanExec.output,
           scanExec.requiredSchema,
           scanExec.partitionFilters,
           scanExec.optionalBucketSet,
           scanExec.optionalNumCoalescedBuckets,
-          scanExec.dataFilters,
+          dataFilters,
           scanExec.tableIdentifier,
           scanExec.disableBucketedScan
         )
@@ -55,6 +55,24 @@ object ExpressionConverter extends SQLConfHelper with Logging {
     replaceWithExpressionTransformer0(expr, attributeSeq, expressionsMap)
   }
 
+  def canReplaceWithExpressionTransformer(
+      expr: Expression,
+      attributeSeq: Seq[Attribute]): Boolean = {
+    val expressionsMap = ExpressionMappings.expressionsMap
+    try {
+      replaceWithExpressionTransformer0(expr, attributeSeq, expressionsMap)
+      true
+    } catch {
+      case _: Exception => false
+    }
+  }
+
+  def replaceAttributeReference(expr: Expression): Expression = expr match {
+    case ar: AttributeReference if ar.dataType == BooleanType =>
+      EqualNullSafe(ar, Literal.TrueLiteral)
+    case e => e
+  }
+
   private def replacePythonUDFWithExpressionTransformer(
       udf: PythonUDF,
       attributeSeq: Seq[Attribute],
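
The new `canReplaceWithExpressionTransformer` decides translatability by attempting the real conversion and mapping any exception to false. The same pattern in miniature (standalone Scala, nothing Gluten-specific):

```scala
object TryConvertSketch {
  // A converter that throws on unsupported input, standing in for
  // replaceWithExpressionTransformer0, which throws for unmapped expressions.
  def convert(s: String): Int = s.toInt

  // The feasibility check, shaped like canReplaceWithExpressionTransformer.
  def canConvert(s: String): Boolean =
    try { convert(s); true }
    catch { case _: Exception => false }

  def main(args: Array[String]): Unit = {
    println(Seq("1", "two", "3").filter(canConvert)) // List(1, 3)
  }
}
```

The trade-off, visible in the callers above, is that a filter passing the check is converted twice: once to prove it can be, and again when the scan is actually transformed.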
@@ -50,7 +50,7 @@ object PushDownFilterToScan extends Rule[SparkPlan] with PredicateHelper {
       // If BatchScanExecTransformerBase's parent is filter, pushdownFilters can't be None.
       batchScan.setPushDownFilters(Seq.empty)
       val newScan = batchScan
-      if (pushDownFilters.size > 0) {
+      if (pushDownFilters.nonEmpty) {
         newScan.setPushDownFilters(pushDownFilters)
         if (newScan.doValidate().ok()) {
           filter.withNewChildren(Seq(newScan))
