diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala index 7d8c5aab6b87..52dad6da3773 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala @@ -20,7 +20,6 @@ import org.apache.gluten.exception.GlutenNotSupportException import org.apache.gluten.extension.columnar.FallbackTags import org.apache.gluten.sql.shims.SparkShimLoader -import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.connector.read.Scan import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan} @@ -35,8 +34,7 @@ object ScanTransformerFactory { private val scanTransformerMap = new ConcurrentHashMap[String, Class[_]]() def createFileSourceScanTransformer( - scanExec: FileSourceScanExec, - allPushDownFilters: Option[Seq[Expression]] = None): FileSourceScanExecTransformerBase = { + scanExec: FileSourceScanExec): FileSourceScanExecTransformerBase = { val fileFormat = scanExec.relation.fileFormat lookupDataSourceScanTransformer(fileFormat.getClass.getName) match { case Some(clz) => @@ -53,7 +51,7 @@ object ScanTransformerFactory { scanExec.partitionFilters, scanExec.optionalBucketSet, scanExec.optionalNumCoalescedBuckets, - allPushDownFilters.getOrElse(scanExec.dataFilters), + scanExec.dataFilters, scanExec.tableIdentifier, scanExec.disableBucketedScan ) @@ -87,20 +85,16 @@ object ScanTransformerFactory { def createBatchScanTransformer( batchScan: BatchScanExec, - allPushDownFilters: Option[Seq[Expression]] = None, validation: Boolean = false): SparkPlan = { if (supportedBatchScan(batchScan.scan)) { val transformer = lookupBatchScanTransformer(batchScan) - if (!validation && allPushDownFilters.isDefined) { - transformer.setPushDownFilters(allPushDownFilters.get) - // Validate again if allPushDownFilters is defined. + if (!validation) { val validationResult = transformer.doValidate() if (validationResult.ok()) { transformer } else { - val newSource = batchScan.copy(runtimeFilters = transformer.runtimeFilters) - FallbackTags.add(newSource, validationResult.reason()) - newSource + FallbackTags.add(batchScan, validationResult.reason()) + batchScan } } else { transformer