diff --git a/cpp/velox/substrait/SubstraitParser.cc b/cpp/velox/substrait/SubstraitParser.cc index 8abc39bc67eb7..1db33e568625a 100644 --- a/cpp/velox/substrait/SubstraitParser.cc +++ b/cpp/velox/substrait/SubstraitParser.cc @@ -131,7 +131,7 @@ void SubstraitParser::parsePartitionAndMetadataColumns( case ::substrait::NamedStruct::METADATA_COL: isPartitionColumns.emplace_back(false); isMetadataColumns.emplace_back(true); - break; + break; default: VELOX_FAIL("Unspecified column type."); } diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala index 6a671815412fe..9921037999a6d 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala @@ -40,11 +40,11 @@ trait BasicScanExecTransformer extends LeafTransformSupport with SupportFormat { // The key of merge schema option in Parquet reader. protected val mergeSchemaOptionKey = "mergeschema" - def filterExprs(): Seq[Expression] + def filterExprs(hasMetadataColFilters: Boolean = true): Seq[Expression] def outputAttributes(): Seq[Attribute] - def getMetadataColumns(): Seq[Attribute] + def getMetadataColumns(): Seq[AttributeReference] def getPartitions: Seq[InputPartition] @@ -116,7 +116,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with SupportFormat { } }.asJava // Will put all filter expressions into an AND expression - val transformer = filterExprs() + val transformer = filterExprs(false) .reduceLeftOption(And) .map(ExpressionConverter.replaceWithExpressionTransformer(_, output)) val filterNodes = transformer.map(_.doTransform(context.registeredFunction)) diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala index fdffe4aae47a8..75481ea4fc8e2 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala @@ -57,14 +57,14 @@ class BatchScanExecTransformer( @transient override lazy val metrics: Map[String, SQLMetric] = BackendsApiManager.getMetricsApiInstance.genBatchScanTransformerMetrics(sparkContext) - override def filterExprs(): Seq[Expression] = scan match { + override def filterExprs(hasMetataColFilters: Boolean): Seq[Expression] = scan match { case fileScan: FileScan => fileScan.dataFilters case _ => throw new UnsupportedOperationException(s"${scan.getClass.toString} is not supported") } - override def getMetadataColumns(): Seq[Attribute] = Seq.empty + override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty override def outputAttributes(): Seq[Attribute] = output diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala index f381c4136db4d..2a9bd9c176ae2 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala @@ -26,7 +26,7 @@ import io.glutenproject.substrait.rel.ReadRelNode import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PlanExpression} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, PlanExpression} import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.FileSourceScanExecShim import org.apache.spark.sql.execution.datasources.HadoopFsRelation @@ -75,9 +75,15 @@ class FileSourceScanExecTransformer( Map.empty[String, SQLMetric] } - override def filterExprs(): Seq[Expression] = dataFiltersWithoutMetatadaAttr + override def filterExprs(hasMetadataColFilters: Boolean): Seq[Expression] = { + if (hasMetadataColFilters) { + dataFilters + } else { + dataFiltersWithoutMetadataAttr + } + } - override def getMetadataColumns(): Seq[Attribute] = metadataColumns + override def getMetadataColumns(): Seq[AttributeReference] = metadataColumns override def outputAttributes(): Seq[Attribute] = output diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala index eb53ec46f8d14..5d539cb6ee4db 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala @@ -27,7 +27,7 @@ import io.glutenproject.substrait.rel.ReadRelNode import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.HiveTableRelation -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.metric.SQLMetric @@ -64,9 +64,9 @@ class HiveTableScanExecTransformer( hiveQlTable.getOutputFormatClass, hiveQlTable.getMetadata) - override def filterExprs(): Seq[Expression] = Seq.empty + override def filterExprs(hasMetadataColFilters: Boolean): Seq[Expression] = Seq.empty - override def getMetadataColumns(): Seq[Attribute] = Seq.empty + override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty override def outputAttributes(): Seq[Attribute] = output diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala index 5ce57bf2f2756..0eeb29413c1ae 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala @@ -63,10 +63,9 @@ class FileSourceScanExecShim( override def canEqual(other: Any): Boolean = other.isInstanceOf[FileSourceScanExecShim] - def dataFiltersWithoutMetatadaAttr: Seq[Expression] = dataFilters - def metadataColumns: Seq[AttributeReference] = Seq.empty + def dataFiltersWithoutMetadataAttr: Seq[Expression] = dataFilters def hasUnsupportedColumns: Boolean = { // Below name has special meaning in Velox. output.exists(a => a.name == "$path" || a.name == "$bucket") diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala index 85ae37f81e954..ebd8430879ea9 100644 --- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala +++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala @@ -64,8 +64,9 @@ class FileSourceScanExecShim( override def canEqual(other: Any): Boolean = other.isInstanceOf[FileSourceScanExecShim] - def dataFiltersWithoutMetatadaAttr: Seq[Expression] = dataFilters.filterNot(_.references.exists { + def dataFiltersWithoutMetadataAttr: Seq[Expression] = dataFilters.filterNot(_.references.exists { case FileSourceMetadataAttribute(_) => true + case _ => false }) def hasUnsupportedColumns: Boolean = { diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala index 6e49ef7e3ef10..40808ee435ab5 100644 --- a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala +++ b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import io.glutenproject.metrics.GlutenTimeMetric import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, FileSourceMetadataAttribute, MetadataAttribute, PlanExpression, Predicate} +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, FileSourceMetadataAttribute, PlanExpression, Predicate} import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, PartitionDirectory} import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.execution.metric.SQLMetric @@ -60,12 +60,12 @@ class FileSourceScanExecShim( override def canEqual(other: Any): Boolean = other.isInstanceOf[FileSourceScanExecShim] - def dataFiltersWithoutMetatadaAttr: Seq[Expression] = dataFilters.filterNot(_.references.exists { + def dataFiltersWithoutMetadataAttr: Seq[Expression] = dataFilters.filterNot(_.references.exists { case FileSourceMetadataAttribute(_) => true + case _ => false }) def metadataColumns: Seq[AttributeReference] = fileConstantMetadataColumns - def hasUnsupportedColumns: Boolean = { val metadataColumnsNames = metadataColumns.map(_.name) output