diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala index 1841faccf127..6fecb2c5f3da 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala @@ -75,7 +75,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { override def genSplitInfo( partition: InputPartition, partitionSchema: StructType, - fileFormat: ReadFileFormat): SplitInfo = { + fileFormat: ReadFileFormat, + metadataColumnNames: Seq[String]): SplitInfo = { partition match { case p: GlutenMergeTreePartition => val partLists = new JArrayList[String]() @@ -128,6 +129,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { starts, lengths, partitionColumns, + new JArrayList[JMap[String, String]](), fileFormat, preferredLocations.toList.asJava) case _ => diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala index f2943d31dd44..84198a6f8ecc 100644 --- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala +++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala @@ -20,6 +20,7 @@ import io.glutenproject.GlutenNumaBindingInfo import io.glutenproject.backendsapi.IteratorApi import io.glutenproject.execution._ import io.glutenproject.metrics.IMetrics +import io.glutenproject.sql.shims.SparkShimLoader import io.glutenproject.substrait.plan.PlanNode import io.glutenproject.substrait.rel.{LocalFilesBuilder, LocalFilesNode, SplitInfo} import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat @@ -53,11 +54,12 @@ class IteratorApiImpl extends IteratorApi with Logging { override def genSplitInfo( partition: InputPartition, partitionSchema: StructType, - fileFormat: ReadFileFormat): SplitInfo = { + fileFormat: ReadFileFormat, + metadataColumnNames: Seq[String]): SplitInfo = { partition match { case f: FilePartition => - val (paths, starts, lengths, partitionColumns) = - constructSplitInfo(partitionSchema, f.files) + val (paths, starts, lengths, partitionColumns, metadataColumns) = + constructSplitInfo(partitionSchema, f.files, metadataColumnNames) val preferredLocations = SoftAffinity.getFilePartitionLocations(f) LocalFilesBuilder.makeLocalFiles( @@ -66,6 +68,7 @@ class IteratorApiImpl extends IteratorApi with Logging { starts, lengths, partitionColumns, + metadataColumns, fileFormat, preferredLocations.toList.asJava) case _ => @@ -92,11 +95,15 @@ class IteratorApiImpl extends IteratorApi with Logging { } } - private def constructSplitInfo(schema: StructType, files: Array[PartitionedFile]) = { + private def constructSplitInfo( + schema: StructType, + files: Array[PartitionedFile], + metadataColumnNames: Seq[String]) = { val paths = new JArrayList[String]() val starts = new JArrayList[JLong] val lengths = new JArrayList[JLong]() val partitionColumns = new JArrayList[JMap[String, String]] + var metadataColumns = new JArrayList[JMap[String, String]] files.foreach { file => // The "file.filePath" in PartitionedFile is not the original encoded path, so the decoded @@ -106,7 +113,9 @@ class IteratorApiImpl extends IteratorApi with Logging { .decode(file.filePath.toString, StandardCharsets.UTF_8.name())) starts.add(JLong.valueOf(file.start)) lengths.add(JLong.valueOf(file.length)) - + val metadataColumn = + SparkShimLoader.getSparkShims.generateMetadataColumns(file, metadataColumnNames) + metadataColumns.add(metadataColumn) val partitionColumn = new JHashMap[String, String]() for (i <- 0 until file.partitionValues.numFields) { val partitionColumnValue = if (file.partitionValues.isNullAt(i)) { @@ -131,7 +140,7 @@ class IteratorApiImpl extends IteratorApi with Logging { } partitionColumns.add(partitionColumn) } - (paths, starts, lengths, partitionColumns) + (paths, starts, lengths, partitionColumns, metadataColumns) } override def injectWriteFilesTempPath(path: String): Unit = { diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala index bab5e68ecf57..22a0f4ebc05b 100644 --- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala @@ -248,6 +248,8 @@ object BackendSettings extends BackendSettingsApi { } } + override def supportNativeMetadataColumns(): Boolean = true + override def supportExpandExec(): Boolean = true override def supportSortExec(): Boolean = true diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc index 47738cff94b2..8ca9f85cd870 100644 --- a/cpp/velox/compute/VeloxPlanConverter.cc +++ b/cpp/velox/compute/VeloxPlanConverter.cc @@ -60,6 +60,7 @@ std::shared_ptr parseScanSplitInfo( splitInfo->starts.reserve(fileList.size()); splitInfo->lengths.reserve(fileList.size()); splitInfo->partitionColumns.reserve(fileList.size()); + splitInfo->metadataColumns.reserve(fileList.size()); for (const auto& file : fileList) { // Expect all Partitions share the same index. splitInfo->partitionIndex = file.partition_index(); @@ -70,6 +71,12 @@ std::shared_ptr parseScanSplitInfo( } splitInfo->partitionColumns.emplace_back(partitionColumnMap); + std::unordered_map metadataColumnMap; + for (const auto& metadataColumn : file.metadata_columns()) { + metadataColumnMap[metadataColumn.key()] = metadataColumn.value(); + } + splitInfo->metadataColumns.emplace_back(metadataColumnMap); + splitInfo->paths.emplace_back(file.uri_file()); splitInfo->starts.emplace_back(file.start()); splitInfo->lengths.emplace_back(file.length()); diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index b10c643c84bf..86431819ba94 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -145,15 +145,28 @@ WholeStageResultIterator::WholeStageResultIterator( const auto& lengths = scanInfo->lengths; const auto& format = scanInfo->format; const auto& partitionColumns = scanInfo->partitionColumns; + const auto& metadataColumns = scanInfo->metadataColumns; std::vector> connectorSplits; connectorSplits.reserve(paths.size()); for (int idx = 0; idx < paths.size(); idx++) { auto partitionColumn = partitionColumns[idx]; + auto metadataColumn = metadataColumns[idx]; std::unordered_map> partitionKeys; constructPartitionColumns(partitionKeys, partitionColumn); auto split = std::make_shared( - kHiveConnectorId, paths[idx], format, starts[idx], lengths[idx], partitionKeys); + kHiveConnectorId, + paths[idx], + format, + starts[idx], + lengths[idx], + partitionKeys, + std::nullopt, + std::unordered_map(), + nullptr, + std::unordered_map(), + 0, + metadataColumn); connectorSplits.emplace_back(split); } diff --git a/cpp/velox/substrait/SubstraitParser.cc b/cpp/velox/substrait/SubstraitParser.cc index 36fe84558b0a..35f130076aff 100644 --- a/cpp/velox/substrait/SubstraitParser.cc +++ b/cpp/velox/substrait/SubstraitParser.cc @@ -104,31 +104,41 @@ std::vector SubstraitParser::parseNamedStruct(const ::substrait::NamedS return typeList; } -std::vector SubstraitParser::parsePartitionColumns(const ::substrait::NamedStruct& namedStruct) { +void SubstraitParser::parsePartitionAndMetadataColumns( + const ::substrait::NamedStruct& namedStruct, + std::vector& isPartitionColumns, + std::vector& isMetadataColumns) { const auto& columnsTypes = namedStruct.column_types(); - std::vector isPartitionColumns; if (columnsTypes.size() == 0) { - // Regard all columns as non-partitioned columns. + // Regard all columns as regular columns. isPartitionColumns.resize(namedStruct.names().size(), false); - return isPartitionColumns; + isMetadataColumns.resize(namedStruct.names().size(), false); + return; } else { VELOX_CHECK_EQ(columnsTypes.size(), namedStruct.names().size(), "Wrong size for column types and column names."); } isPartitionColumns.reserve(columnsTypes.size()); + isMetadataColumns.reserve(columnsTypes.size()); for (const auto& columnType : columnsTypes) { switch (columnType) { case ::substrait::NamedStruct::NORMAL_COL: isPartitionColumns.emplace_back(false); + isMetadataColumns.emplace_back(false); break; case ::substrait::NamedStruct::PARTITION_COL: isPartitionColumns.emplace_back(true); + isMetadataColumns.emplace_back(false); + break; + case ::substrait::NamedStruct::METADATA_COL: + isPartitionColumns.emplace_back(false); + isMetadataColumns.emplace_back(true); break; default: VELOX_FAIL("Unspecified column type."); } } - return isPartitionColumns; + return; } int32_t SubstraitParser::parseReferenceSegment(const ::substrait::Expression::ReferenceSegment& refSegment) { diff --git a/cpp/velox/substrait/SubstraitParser.h b/cpp/velox/substrait/SubstraitParser.h index 80d4f2ee2ba1..4aaac5a7159e 100644 --- a/cpp/velox/substrait/SubstraitParser.h +++ b/cpp/velox/substrait/SubstraitParser.h @@ -41,8 +41,11 @@ class SubstraitParser { const ::substrait::NamedStruct& namedStruct, bool asLowerCase = false); - /// Used to parse partition columns from Substrait NamedStruct. - static std::vector parsePartitionColumns(const ::substrait::NamedStruct& namedStruct); + /// Used to parse partition & metadata columns from Substrait NamedStruct. + static void parsePartitionAndMetadataColumns( + const ::substrait::NamedStruct& namedStruct, + std::vector& isPartitionColumns, + std::vector& isMetadataColumns); /// Parse Substrait Type to Velox type. static facebook::velox::TypePtr parseType(const ::substrait::Type& substraitType, bool asLowerCase = false); diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index b8ca25a43a26..aa19b7f6a735 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -596,11 +596,12 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: std::vector tableColumnNames; std::vector partitionedKey; std::vector isPartitionColumns; + std::vector isMetadataColumns; tableColumnNames.reserve(writeRel.table_schema().names_size()); VELOX_CHECK(writeRel.has_table_schema(), "WriteRel should have the table schema to store the column information"); const auto& tableSchema = writeRel.table_schema(); - isPartitionColumns = SubstraitParser::parsePartitionColumns(tableSchema); + SubstraitParser::parsePartitionAndMetadataColumns(tableSchema, isPartitionColumns, isMetadataColumns); for (const auto& name : tableSchema.names()) { tableColumnNames.emplace_back(name); @@ -1040,6 +1041,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: std::vector colNameList; std::vector veloxTypeList; std::vector isPartitionColumns; + std::vector isMetadataColumns; // Convert field names into lower case when not case-sensitive. std::shared_ptr veloxCfg = std::make_shared(confMap_); @@ -1055,7 +1057,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: colNameList.emplace_back(fieldName); } veloxTypeList = SubstraitParser::parseNamedStruct(baseSchema, asLowerCase); - isPartitionColumns = SubstraitParser::parsePartitionColumns(baseSchema); + SubstraitParser::parsePartitionAndMetadataColumns(baseSchema, isPartitionColumns, isMetadataColumns); } // Do not hard-code connector ID and allow for connectors other than Hive. @@ -1110,8 +1112,13 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: std::unordered_map> assignments; for (int idx = 0; idx < colNameList.size(); idx++) { auto outName = SubstraitParser::makeNodeName(planNodeId_, idx); - auto columnType = isPartitionColumns[idx] ? connector::hive::HiveColumnHandle::ColumnType::kPartitionKey - : connector::hive::HiveColumnHandle::ColumnType::kRegular; + auto columnType = connector::hive::HiveColumnHandle::ColumnType::kRegular; + if (isPartitionColumns[idx]) { + columnType = connector::hive::HiveColumnHandle::ColumnType::kPartitionKey; + } + if (isMetadataColumns[idx]) { + columnType = connector::hive::HiveColumnHandle::ColumnType::kSynthesized; + } assignments[outName] = std::make_shared( colNameList[idx], columnType, veloxTypeList[idx], veloxTypeList[idx]); outNames.emplace_back(outName); diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h index 21a318b91157..895c1d24e768 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h @@ -36,6 +36,9 @@ struct SplitInfo { /// The partition columns associated with partitioned table. std::vector> partitionColumns; + /// The metadata columns associated with partitioned table. + std::vector> metadataColumns; + /// The file paths to be scanned. std::vector paths; diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc index 0e81ba91a34f..323c216e34cb 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc @@ -366,7 +366,9 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::WriteRel& writeR // Validate partition key type. if (writeRel.has_table_schema()) { const auto& tableSchema = writeRel.table_schema(); - auto isPartitionColumns = SubstraitParser::parsePartitionColumns(tableSchema); + std::vector isMetadataColumns; + std::vector isPartitionColumns; + SubstraitParser::parsePartitionAndMetadataColumns(tableSchema, isPartitionColumns, isMetadataColumns); for (auto i = 0; i < types.size(); i++) { if (isPartitionColumns[i]) { switch (types[i]->kind()) { diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesBuilder.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesBuilder.java index c86c90cc667a..1f6eaabce18f 100644 --- a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesBuilder.java +++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesBuilder.java @@ -28,10 +28,18 @@ public static LocalFilesNode makeLocalFiles( List starts, List lengths, List> partitionColumns, + List> metadataColumns, LocalFilesNode.ReadFileFormat fileFormat, List preferredLocations) { return new LocalFilesNode( - index, paths, starts, lengths, partitionColumns, fileFormat, preferredLocations); + index, + paths, + starts, + lengths, + partitionColumns, + metadataColumns, + fileFormat, + preferredLocations); } public static LocalFilesNode makeLocalFiles(String iterPath) { diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java index 32e81f2c6c98..e0700ded29c7 100644 --- a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java +++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java @@ -35,6 +35,7 @@ public class LocalFilesNode implements SplitInfo { private final List starts = new ArrayList<>(); private final List lengths = new ArrayList<>(); private final List> partitionColumns = new ArrayList<>(); + private final List> metadataColumns = new ArrayList<>(); private final List preferredLocations = new ArrayList<>(); // The format of file to read. @@ -60,6 +61,7 @@ public enum ReadFileFormat { List starts, List lengths, List> partitionColumns, + List> metadataColumns, ReadFileFormat fileFormat, List preferredLocations) { this.index = index; @@ -68,6 +70,7 @@ public enum ReadFileFormat { this.lengths.addAll(lengths); this.fileFormat = fileFormat; this.partitionColumns.addAll(partitionColumns); + this.metadataColumns.addAll(metadataColumns); this.preferredLocations.addAll(preferredLocations); } @@ -141,7 +144,22 @@ public ReadRel.LocalFiles toProtobuf() { } fileBuilder.setLength(lengths.get(i)); fileBuilder.setStart(starts.get(i)); - + if (!metadataColumns.isEmpty()) { + Map metadataColumn = metadataColumns.get(i); + if (!metadataColumn.isEmpty()) { + metadataColumn.forEach( + (key, value) -> { + ReadRel.LocalFiles.FileOrFiles.metadataColumn.Builder mcBuilder = + ReadRel.LocalFiles.FileOrFiles.metadataColumn.newBuilder(); + mcBuilder.setKey(key).setValue(value); + fileBuilder.addMetadataColumns(mcBuilder.build()); + }); + } + } else { + ReadRel.LocalFiles.FileOrFiles.metadataColumn.Builder mcBuilder = + ReadRel.LocalFiles.FileOrFiles.metadataColumn.newBuilder(); + fileBuilder.addMetadataColumns(mcBuilder.build()); + } NamedStruct namedStruct = buildNamedStruct(); fileBuilder.setSchema(namedStruct); diff --git a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto index b72bcbb01562..63a0f36ea66b 100644 --- a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto +++ b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto @@ -169,6 +169,12 @@ message ReadRel { /// File schema NamedStruct schema = 17; + + message metadataColumn { + string key = 1; + string value = 2; + } + repeated metadataColumn metadata_columns = 18; } } } diff --git a/gluten-core/src/main/resources/substrait/proto/substrait/type.proto b/gluten-core/src/main/resources/substrait/proto/substrait/type.proto index 6130f2d76e22..5c7ee6a382ce 100644 --- a/gluten-core/src/main/resources/substrait/proto/substrait/type.proto +++ b/gluten-core/src/main/resources/substrait/proto/substrait/type.proto @@ -237,5 +237,6 @@ message NamedStruct { enum ColumnType { NORMAL_COL = 0; PARTITION_COL = 1; + METADATA_COL = 2; } } diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala index 83db8a8da85e..25d71f0fcb48 100644 --- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala +++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala @@ -41,6 +41,7 @@ trait BackendSettingsApi { fields: Array[StructField], bucketSpec: Option[BucketSpec], options: Map[String, String]): ValidationResult = ValidationResult.ok + def supportNativeMetadataColumns(): Boolean = false def supportExpandExec(): Boolean = false def supportSortExec(): Boolean = false def supportSortMergeJoinExec(): Boolean = true diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala index 2f506f483c79..cd8995211985 100644 --- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala +++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala @@ -36,7 +36,8 @@ trait IteratorApi { def genSplitInfo( partition: InputPartition, partitionSchema: StructType, - fileFormat: ReadFileFormat): SplitInfo + fileFormat: ReadFileFormat, + metadataColumnNames: Seq[String]): SplitInfo /** Generate native row partition. */ def genPartitions( diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala index 625ab6e97449..2be1a162c996 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala @@ -37,12 +37,15 @@ import com.google.protobuf.StringValue import scala.collection.JavaConverters._ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource { + import org.apache.spark.sql.catalyst.util._ /** Returns the filters that can be pushed down to native file scan */ def filterExprs(): Seq[Expression] def outputAttributes(): Seq[Attribute] + def getMetadataColumns(): Seq[AttributeReference] + /** This can be used to report FileFormat for a file based scan operator. */ val fileFormat: ReadFileFormat @@ -63,7 +66,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource def getSplitInfos: Seq[SplitInfo] = { getPartitions.map( BackendsApiManager.getIteratorApiInstance - .genSplitInfo(_, getPartitionSchema, fileFormat)) + .genSplitInfo(_, getPartitionSchema, fileFormat, getMetadataColumns.map(_.name))) } def doExecuteColumnarInternal(): RDD[ColumnarBatch] = { @@ -112,6 +115,8 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource attr => if (getPartitionSchema.exists(_.name.equals(attr.name))) { new ColumnTypeNode(1) + } else if (attr.isMetadataCol) { + new ColumnTypeNode(2) } else { new ColumnTypeNode(0) } diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala index afa0ce0e20ab..dfb448f13242 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala @@ -108,6 +108,8 @@ abstract class BatchScanExecTransformerBase( throw new UnsupportedOperationException(s"${scan.getClass.toString} is not supported") } + override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty + override def outputAttributes(): Seq[Attribute] = output override def getPartitions: Seq[InputPartition] = filteredFlattenPartitions diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala index fb8cd31cfe28..2c8bee43ff1e 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala @@ -23,7 +23,7 @@ import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PlanExpression} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, PlanExpression} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.FileSourceScanExecShim @@ -99,9 +99,11 @@ abstract class FileSourceScanExecTransformerBase( .genFileSourceScanTransformerMetrics(sparkContext) .filter(m => !driverMetricsAlias.contains(m._1)) ++ driverMetricsAlias - def getPartitionFilters(): Seq[Expression] = partitionFilters + override def filterExprs(): Seq[Expression] = dataFiltersInScan + + override def getMetadataColumns(): Seq[AttributeReference] = metadataColumns - override def filterExprs(): Seq[Expression] = dataFilters + def getPartitionFilters(): Seq[Expression] = partitionFilters override def outputAttributes(): Seq[Attribute] = output @@ -125,8 +127,14 @@ abstract class FileSourceScanExecTransformerBase( } override protected def doValidateInternal(): ValidationResult = { - if (hasMetadataColumns) { - return ValidationResult.notOk(s"Unsupported metadataColumns scan in native.") + if ( + !metadataColumns.isEmpty && !BackendsApiManager.getSettings.supportNativeMetadataColumns() + ) { + return ValidationResult.notOk(s"Unsupported metadata columns scan in native.") + } + + if (hasUnsupportedColumns) { + return ValidationResult.notOk(s"Unsupported columns scan in native.") } if (hasFieldIds) { diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala index dc3beeb0dd8d..1f32ee8b401f 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala @@ -25,7 +25,7 @@ import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.HiveTableRelation -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSeq, Expression} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, Expression} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.SparkPlan @@ -63,6 +63,8 @@ case class HiveTableScanExecTransformer( override def filterExprs(): Seq[Expression] = Seq.empty + override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty + override def outputAttributes(): Seq[Attribute] = output override def getPartitions: Seq[InputPartition] = partitions diff --git a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java index c763a46b1915..98cc0d90e8fe 100644 --- a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java +++ b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java @@ -16,6 +16,7 @@ */ package io.glutenproject.substrait.rel; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -58,6 +59,14 @@ class DeleteFile { List> partitionColumns, ReadFileFormat fileFormat, List preferredLocations) { - super(index, paths, starts, lengths, partitionColumns, fileFormat, preferredLocations); + super( + index, + paths, + starts, + lengths, + partitionColumns, + new ArrayList<>(), + fileFormat, + preferredLocations); } } diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileMetadataStructSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileMetadataStructSuite.scala index af15f7386fca..794c0089b073 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileMetadataStructSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileMetadataStructSuite.scala @@ -16,6 +16,145 @@ */ package org.apache.spark.sql.execution.datasources +import io.glutenproject.execution.{FileSourceScanExecTransformer, FilterExecTransformer} +import io.glutenproject.utils.BackendTestUtils + +import org.apache.spark.sql.{Column, DataFrame, Row} import org.apache.spark.sql.GlutenSQLTestsBaseTrait +import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST +import org.apache.spark.sql.execution.FileSourceScanExec +import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType} + +import java.io.File +import java.sql.Timestamp + +import scala.reflect.ClassTag + +class GlutenFileMetadataStructSuite extends FileMetadataStructSuite with GlutenSQLTestsBaseTrait { + + val schemaWithFilePathField: StructType = new StructType() + .add(StructField("file_path", StringType)) + .add(StructField("age", IntegerType)) + .add( + StructField( + "info", + new StructType() + .add(StructField("id", LongType)) + .add(StructField("university", StringType)))) + + private val METADATA_FILE_PATH = "_metadata.file_path" + private val METADATA_FILE_NAME = "_metadata.file_name" + private val METADATA_FILE_SIZE = "_metadata.file_size" + private val METADATA_FILE_MODIFICATION_TIME = "_metadata.file_modification_time" + + private def getMetadataForFile(f: File): Map[String, Any] = { + Map( + METADATA_FILE_PATH -> f.toURI.toString, + METADATA_FILE_NAME -> f.getName, + METADATA_FILE_SIZE -> f.length(), + METADATA_FILE_MODIFICATION_TIME -> new Timestamp(f.lastModified()) + ) + } + + private def metadataColumnsNativeTest(testName: String, fileSchema: StructType)( + f: (DataFrame, Map[String, Any], Map[String, Any]) => Unit): Unit = { + Seq("parquet").foreach { + testFileFormat => + test(s"$GLUTEN_TEST metadata struct ($testFileFormat): " + testName) { + withTempDir { + dir => + import scala.collection.JavaConverters._ + + // 1. create df0 and df1 and save under /data/f0 and /data/f1 + val df0 = spark.createDataFrame(data0.asJava, fileSchema) + val f0 = new File(dir, "data/f0").getCanonicalPath + df0.coalesce(1).write.format(testFileFormat).save(f0) + + val df1 = spark.createDataFrame(data1.asJava, fileSchema) + val f1 = new File(dir, "data/f1 gluten").getCanonicalPath + df1.coalesce(1).write.format(testFileFormat).save(f1) + + // 2. read both f0 and f1 + val df = spark.read + .format(testFileFormat) + .schema(fileSchema) + .load(new File(dir, "data").getCanonicalPath + "/*") + val realF0 = new File(dir, "data/f0") + .listFiles() + .filter(_.getName.endsWith(s".$testFileFormat")) + .head + val realF1 = new File(dir, "data/f1 gluten") + .listFiles() + .filter(_.getName.endsWith(s".$testFileFormat")) + .head + f(df, getMetadataForFile(realF0), getMetadataForFile(realF1)) + } + } + } + } + + def checkOperatorMatch[T](df: DataFrame)(implicit tag: ClassTag[T]): Unit = { + val executedPlan = getExecutedPlan(df) + assert(executedPlan.exists(plan => plan.getClass == tag.runtimeClass)) + } + + metadataColumnsNativeTest( + "plan check with metadata and user data select", + schemaWithFilePathField) { + (df, f0, f1) => + var dfWithMetadata = df.select( + METADATA_FILE_NAME, + METADATA_FILE_PATH, + METADATA_FILE_SIZE, + METADATA_FILE_MODIFICATION_TIME, + "age") + dfWithMetadata.collect + if (BackendTestUtils.isVeloxBackendLoaded()) { + checkOperatorMatch[FileSourceScanExecTransformer](dfWithMetadata) + } else { + checkOperatorMatch[FileSourceScanExec](dfWithMetadata) + } + + // would fallback + dfWithMetadata = df.select(METADATA_FILE_PATH, "file_path") + checkAnswer( + dfWithMetadata, + Seq( + Row(f0(METADATA_FILE_PATH), "jack"), + Row(f1(METADATA_FILE_PATH), "lily") + ) + ) + checkOperatorMatch[FileSourceScanExec](dfWithMetadata) + } + + metadataColumnsNativeTest("plan check with metadata filter", schemaWithFilePathField) { + (df, f0, f1) => + var filterDF = df + .select("file_path", "age", METADATA_FILE_NAME) + .where(Column(METADATA_FILE_NAME) === f0((METADATA_FILE_NAME))) + val ret = filterDF.collect + assert(ret.size == 1) + if (BackendTestUtils.isVeloxBackendLoaded()) { + checkOperatorMatch[FileSourceScanExecTransformer](filterDF) + } else { + checkOperatorMatch[FileSourceScanExec](filterDF) + } + checkOperatorMatch[FilterExecTransformer](filterDF) -class GlutenFileMetadataStructSuite extends FileMetadataStructSuite with GlutenSQLTestsBaseTrait {} + // case to check if file_path is URI string + filterDF = + df.select(METADATA_FILE_PATH).where(Column(METADATA_FILE_NAME) === f1((METADATA_FILE_NAME))) + checkAnswer( + filterDF, + Seq( + Row(f1(METADATA_FILE_PATH)) + ) + ) + if (BackendTestUtils.isVeloxBackendLoaded()) { + checkOperatorMatch[FileSourceScanExecTransformer](filterDF) + } else { + checkOperatorMatch[FileSourceScanExec](filterDF) + } + checkOperatorMatch[FilterExecTransformer](filterDF) + } +} diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileMetadataStructSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileMetadataStructSuite.scala index af15f7386fca..794c0089b073 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileMetadataStructSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileMetadataStructSuite.scala @@ -16,6 +16,145 @@ */ package org.apache.spark.sql.execution.datasources +import io.glutenproject.execution.{FileSourceScanExecTransformer, FilterExecTransformer} +import io.glutenproject.utils.BackendTestUtils + +import org.apache.spark.sql.{Column, DataFrame, Row} import org.apache.spark.sql.GlutenSQLTestsBaseTrait +import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST +import org.apache.spark.sql.execution.FileSourceScanExec +import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType} + +import java.io.File +import java.sql.Timestamp + +import scala.reflect.ClassTag + +class GlutenFileMetadataStructSuite extends FileMetadataStructSuite with GlutenSQLTestsBaseTrait { + + val schemaWithFilePathField: StructType = new StructType() + .add(StructField("file_path", StringType)) + .add(StructField("age", IntegerType)) + .add( + StructField( + "info", + new StructType() + .add(StructField("id", LongType)) + .add(StructField("university", StringType)))) + + private val METADATA_FILE_PATH = "_metadata.file_path" + private val METADATA_FILE_NAME = "_metadata.file_name" + private val METADATA_FILE_SIZE = "_metadata.file_size" + private val METADATA_FILE_MODIFICATION_TIME = "_metadata.file_modification_time" + + private def getMetadataForFile(f: File): Map[String, Any] = { + Map( + METADATA_FILE_PATH -> f.toURI.toString, + METADATA_FILE_NAME -> f.getName, + METADATA_FILE_SIZE -> f.length(), + METADATA_FILE_MODIFICATION_TIME -> new Timestamp(f.lastModified()) + ) + } + + private def metadataColumnsNativeTest(testName: String, fileSchema: StructType)( + f: (DataFrame, Map[String, Any], Map[String, Any]) => Unit): Unit = { + Seq("parquet").foreach { + testFileFormat => + test(s"$GLUTEN_TEST metadata struct ($testFileFormat): " + testName) { + withTempDir { + dir => + import scala.collection.JavaConverters._ + + // 1. create df0 and df1 and save under /data/f0 and /data/f1 + val df0 = spark.createDataFrame(data0.asJava, fileSchema) + val f0 = new File(dir, "data/f0").getCanonicalPath + df0.coalesce(1).write.format(testFileFormat).save(f0) + + val df1 = spark.createDataFrame(data1.asJava, fileSchema) + val f1 = new File(dir, "data/f1 gluten").getCanonicalPath + df1.coalesce(1).write.format(testFileFormat).save(f1) + + // 2. read both f0 and f1 + val df = spark.read + .format(testFileFormat) + .schema(fileSchema) + .load(new File(dir, "data").getCanonicalPath + "/*") + val realF0 = new File(dir, "data/f0") + .listFiles() + .filter(_.getName.endsWith(s".$testFileFormat")) + .head + val realF1 = new File(dir, "data/f1 gluten") + .listFiles() + .filter(_.getName.endsWith(s".$testFileFormat")) + .head + f(df, getMetadataForFile(realF0), getMetadataForFile(realF1)) + } + } + } + } + + def checkOperatorMatch[T](df: DataFrame)(implicit tag: ClassTag[T]): Unit = { + val executedPlan = getExecutedPlan(df) + assert(executedPlan.exists(plan => plan.getClass == tag.runtimeClass)) + } + + metadataColumnsNativeTest( + "plan check with metadata and user data select", + schemaWithFilePathField) { + (df, f0, f1) => + var dfWithMetadata = df.select( + METADATA_FILE_NAME, + METADATA_FILE_PATH, + METADATA_FILE_SIZE, + METADATA_FILE_MODIFICATION_TIME, + "age") + dfWithMetadata.collect + if (BackendTestUtils.isVeloxBackendLoaded()) { + checkOperatorMatch[FileSourceScanExecTransformer](dfWithMetadata) + } else { + checkOperatorMatch[FileSourceScanExec](dfWithMetadata) + } + + // would fallback + dfWithMetadata = df.select(METADATA_FILE_PATH, "file_path") + checkAnswer( + dfWithMetadata, + Seq( + Row(f0(METADATA_FILE_PATH), "jack"), + Row(f1(METADATA_FILE_PATH), "lily") + ) + ) + checkOperatorMatch[FileSourceScanExec](dfWithMetadata) + } + + metadataColumnsNativeTest("plan check with metadata filter", schemaWithFilePathField) { + (df, f0, f1) => + var filterDF = df + .select("file_path", "age", METADATA_FILE_NAME) + .where(Column(METADATA_FILE_NAME) === f0((METADATA_FILE_NAME))) + val ret = filterDF.collect + assert(ret.size == 1) + if (BackendTestUtils.isVeloxBackendLoaded()) { + checkOperatorMatch[FileSourceScanExecTransformer](filterDF) + } else { + checkOperatorMatch[FileSourceScanExec](filterDF) + } + checkOperatorMatch[FilterExecTransformer](filterDF) -class GlutenFileMetadataStructSuite extends FileMetadataStructSuite with GlutenSQLTestsBaseTrait {} + // case to check if file_path is URI string + filterDF = + df.select(METADATA_FILE_PATH).where(Column(METADATA_FILE_NAME) === f1((METADATA_FILE_NAME))) + checkAnswer( + filterDF, + Seq( + Row(f1(METADATA_FILE_PATH)) + ) + ) + if (BackendTestUtils.isVeloxBackendLoaded()) { + checkOperatorMatch[FileSourceScanExecTransformer](filterDF) + } else { + checkOperatorMatch[FileSourceScanExec](filterDF) + } + checkOperatorMatch[FilterExecTransformer](filterDF) + } +} diff --git a/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala b/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala index ab560a06049f..64ed1b866c0c 100644 --- a/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala +++ b/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala @@ -44,6 +44,8 @@ import org.apache.spark.storage.{BlockId, BlockManagerId} import org.apache.hadoop.fs.{FileStatus, Path} +import java.util.{ArrayList => JArrayList, Map => JMap} + sealed abstract class ShimDescriptor case class SparkShimDescriptor(major: Int, minor: Int, patch: Int) extends ShimDescriptor { @@ -159,6 +161,10 @@ trait SparkShims { def attributesFromStruct(structType: StructType): Seq[Attribute] + def generateMetadataColumns( + file: PartitionedFile, + metadataColumnNames: Seq[String] = Seq.empty): JMap[String, String] + // For compatibility with Spark-3.5. def getAnalysisExceptionPlan(ae: AnalysisException): Option[LogicalPlan] diff --git a/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala b/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala index 5a3579b25c18..8f028968e18a 100644 --- a/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala +++ b/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala @@ -45,6 +45,8 @@ import org.apache.spark.storage.{BlockId, BlockManagerId} import org.apache.hadoop.fs.{FileStatus, Path} +import java.util.{HashMap => JHashMap, Map => JMap} + class Spark32Shims extends SparkShims { override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR @@ -185,6 +187,11 @@ class Spark32Shims extends SparkShims { } } + override def generateMetadataColumns( + file: PartitionedFile, + metadataColumnNames: Seq[String]): JMap[String, String] = + new JHashMap[String, String]() + def getAnalysisExceptionPlan(ae: AnalysisException): Option[LogicalPlan] = { ae.plan } diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala index 654bc4924e70..e27f4bc28893 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala @@ -53,7 +53,14 @@ abstract class FileSourceScanExecShim( // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks. @transient override lazy val metrics: Map[String, SQLMetric] = Map() - def hasMetadataColumns: Boolean = false + def dataFiltersInScan: Seq[Expression] = dataFilters + + def metadataColumns: Seq[AttributeReference] = Seq.empty + + def hasUnsupportedColumns: Boolean = { + // Below name has special meaning in Velox. + output.exists(a => a.name == "$path" || a.name == "$bucket") + } def isMetadataColumn(attr: Attribute): Boolean = false diff --git a/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala index 5f8134f7e525..8e770325f9b3 100644 --- a/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala +++ b/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala @@ -33,10 +33,11 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution} import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.util.TimestampFormatter import org.apache.spark.sql.connector.catalog.Table import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil, SparkPlan} -import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, FileScanRDD, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex} +import org.apache.spark.sql.execution.datasources.{BucketingUtils, FileFormat, FilePartition, FileScanRDD, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex} import org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.text.TextScan @@ -48,6 +49,9 @@ import org.apache.spark.storage.{BlockId, BlockManagerId} import org.apache.hadoop.fs.{FileStatus, Path} +import java.time.ZoneOffset +import java.util.{HashMap => JHashMap, Map => JMap} + class Spark33Shims extends SparkShims { override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR @@ -152,6 +156,27 @@ class Spark33Shims extends SparkShims { case _ => None } } + override def generateMetadataColumns( + file: PartitionedFile, + metadataColumnNames: Seq[String]): JMap[String, String] = { + val metadataColumn = new JHashMap[String, String]() + val path = new Path(file.filePath.toString) + for (columnName <- metadataColumnNames) { + columnName match { + case FileFormat.FILE_PATH => metadataColumn.put(FileFormat.FILE_PATH, path.toString) + case FileFormat.FILE_NAME => metadataColumn.put(FileFormat.FILE_NAME, path.getName) + case FileFormat.FILE_SIZE => + metadataColumn.put(FileFormat.FILE_SIZE, file.fileSize.toString) + case FileFormat.FILE_MODIFICATION_TIME => + val fileModifyTime = TimestampFormatter + .getFractionFormatter(ZoneOffset.UTC) + .format(file.modificationTime * 1000L) + metadataColumn.put(FileFormat.FILE_MODIFICATION_TIME, fileModifyTime) + case _ => + } + } + metadataColumn + } private def invalidBucketFile(path: String): Throwable = { new SparkException( diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala index e07899793693..92f32f847d96 100644 --- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala +++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala @@ -19,8 +19,8 @@ package org.apache.spark.sql.execution import io.glutenproject.metrics.GlutenTimeMetric import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, PlanExpression, Predicate} -import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory} +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, FileSourceMetadataAttribute, PlanExpression, Predicate} +import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, PartitionDirectory} import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.types.StructType @@ -54,7 +54,20 @@ abstract class FileSourceScanExecShim( // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks. @transient override lazy val metrics: Map[String, SQLMetric] = Map() - def hasMetadataColumns: Boolean = metadataColumns.nonEmpty + def dataFiltersInScan: Seq[Expression] = dataFilters.filterNot(_.references.exists { + case FileSourceMetadataAttribute(attr) if attr.name == "_metadata" => true + case _ => false + }) + + def hasUnsupportedColumns: Boolean = { + // TODO, fallback if user define same name column due to we can't right now + // detect which column is metadata column which is user defined column. + val metadataColumnsNames = metadataColumns.map(_.name) + output + .filterNot(metadataColumns.toSet) + .exists(v => metadataColumnsNames.contains(v.name)) || + output.exists(a => a.name == "$path" || a.name == "$bucket") + } def isMetadataColumn(attr: Attribute): Boolean = metadataColumns.contains(attr) diff --git a/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala index cd8449bb37dd..c98def5daeff 100644 --- a/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala +++ b/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala @@ -34,11 +34,12 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, KeyGroupedPartitioning, Partitioning} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.InternalRowComparableWrapper +import org.apache.spark.sql.catalyst.util.TimestampFormatter import org.apache.spark.sql.connector.catalog.Table import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, Scan} import org.apache.spark.sql.execution.{FileSourceScanExec, GlobalLimitExec, GlutenFileFormatWriter, PartitionedFileUtil, SparkPlan, TakeOrderedAndProjectExec} -import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, FileScanRDD, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex, WriteJobDescription, WriteTaskResult} +import org.apache.spark.sql.execution.datasources.{BucketingUtils, FileFormat, FilePartition, FileScanRDD, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex, WriteJobDescription, WriteTaskResult} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.text.TextScan import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil @@ -49,6 +50,9 @@ import org.apache.spark.storage.{BlockId, BlockManagerId} import org.apache.hadoop.fs.{FileStatus, Path} +import java.time.ZoneOffset +import java.util.{HashMap => JHashMap, Map => JMap} + class Spark34Shims extends SparkShims { override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR @@ -155,6 +159,34 @@ class Spark34Shims extends SparkShims { } } + override def generateMetadataColumns( + file: PartitionedFile, + metadataColumnNames: Seq[String]): JMap[String, String] = { + val metadataColumn = new JHashMap[String, String]() + val path = new Path(file.filePath.toString) + for (columnName <- metadataColumnNames) { + columnName match { + case FileFormat.FILE_PATH => metadataColumn.put(FileFormat.FILE_PATH, path.toString) + case FileFormat.FILE_NAME => metadataColumn.put(FileFormat.FILE_NAME, path.getName) + case FileFormat.FILE_SIZE => + metadataColumn.put(FileFormat.FILE_SIZE, file.fileSize.toString) + case FileFormat.FILE_MODIFICATION_TIME => + val fileModifyTime = TimestampFormatter + .getFractionFormatter(ZoneOffset.UTC) + .format(file.modificationTime * 1000L) + metadataColumn.put(FileFormat.FILE_MODIFICATION_TIME, fileModifyTime) + case FileFormat.FILE_BLOCK_START => + metadataColumn.put(FileFormat.FILE_BLOCK_START, file.start.toString) + case FileFormat.FILE_BLOCK_LENGTH => + metadataColumn.put(FileFormat.FILE_BLOCK_LENGTH, file.length.toString) + case _ => + } + } + + // TODO: row_index metadata support + metadataColumn + } + // https://issues.apache.org/jira/browse/SPARK-40400 private def invalidBucketFile(path: String): Throwable = { new SparkException( diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala index f24ab8d57172..57d1bd06a20e 100644 --- a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala +++ b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala @@ -19,8 +19,8 @@ package org.apache.spark.sql.execution import io.glutenproject.metrics.GlutenTimeMetric import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, PlanExpression, Predicate} -import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory} +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, FileSourceMetadataAttribute, PlanExpression, Predicate} +import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, PartitionDirectory} import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.StructType @@ -55,7 +55,18 @@ abstract class FileSourceScanExecShim( case FileSourceGeneratedMetadataAttribute(attr) => attr } - def hasMetadataColumns: Boolean = metadataColumns.nonEmpty + def dataFiltersInScan: Seq[Expression] = dataFilters + + def hasUnsupportedColumns: Boolean = { + val metadataColumnsNames = metadataColumns.map(_.name) + // row_index metadata is not support yet + metadataColumnsNames.contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME) || + output + .filterNot(metadataColumns.toSet) + .exists(v => metadataColumnsNames.contains(v.name)) || + // Below name has special meaning in Velox. + output.exists(a => a.name == "$path" || a.name == "$bucket") + } def isMetadataColumn(attr: Attribute): Boolean = metadataColumns.contains(attr) diff --git a/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala index a33801653390..6a6f3b2c8fd1 100644 --- a/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala +++ b/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala @@ -34,10 +34,11 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.types.DataTypeUtils +import org.apache.spark.sql.catalyst.util.TimestampFormatter import org.apache.spark.sql.connector.catalog.Table import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.{FileSourceScanExec, GlobalLimitExec, GlutenFileFormatWriter, PartitionedFileUtil, SparkPlan, TakeOrderedAndProjectExec} -import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, FileScanRDD, FileStatusWithMetadata, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex, WriteJobDescription, WriteTaskResult} +import org.apache.spark.sql.execution.datasources.{BucketingUtils, FileFormat, FilePartition, FileScanRDD, FileStatusWithMetadata, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex, WriteJobDescription, WriteTaskResult} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.text.TextScan import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil @@ -48,6 +49,9 @@ import org.apache.spark.storage.{BlockId, BlockManagerId} import org.apache.hadoop.fs.{FileStatus, Path} +import java.time.ZoneOffset +import java.util.{HashMap => JHashMap, Map => JMap} + class Spark35Shims extends SparkShims { override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR @@ -152,6 +156,34 @@ class Spark35Shims extends SparkShims { } } + override def generateMetadataColumns( + file: PartitionedFile, + metadataColumnNames: Seq[String]): JMap[String, String] = { + val metadataColumn = new JHashMap[String, String]() + val path = new Path(file.filePath.toString) + for (columnName <- metadataColumnNames) { + columnName match { + case FileFormat.FILE_PATH => metadataColumn.put(FileFormat.FILE_PATH, path.toString) + case FileFormat.FILE_NAME => metadataColumn.put(FileFormat.FILE_NAME, path.getName) + case FileFormat.FILE_SIZE => + metadataColumn.put(FileFormat.FILE_SIZE, file.fileSize.toString) + case FileFormat.FILE_MODIFICATION_TIME => + val fileModifyTime = TimestampFormatter + .getFractionFormatter(ZoneOffset.UTC) + .format(file.modificationTime * 1000L) + metadataColumn.put(FileFormat.FILE_MODIFICATION_TIME, fileModifyTime) + case FileFormat.FILE_BLOCK_START => + metadataColumn.put(FileFormat.FILE_BLOCK_START, file.start.toString) + case FileFormat.FILE_BLOCK_LENGTH => + metadataColumn.put(FileFormat.FILE_BLOCK_LENGTH, file.length.toString) + case _ => + } + } + + // TODO row_index metadata support + metadataColumn + } + // https://issues.apache.org/jira/browse/SPARK-40400 private def invalidBucketFile(path: String): Throwable = { new SparkException( diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala index acede96da0ea..efcb9dbad3b1 100644 --- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala +++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala @@ -19,8 +19,8 @@ package org.apache.spark.sql.execution import io.glutenproject.metrics.GlutenTimeMetric import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, PlanExpression, Predicate} -import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory} +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, FileSourceMetadataAttribute, PlanExpression, Predicate} +import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, PartitionDirectory} import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.StructType @@ -50,14 +50,27 @@ abstract class FileSourceScanExecShim( // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks. @transient override lazy val metrics: Map[String, SQLMetric] = Map() - lazy val metadataColumns = output.collect { + lazy val metadataColumns: Seq[AttributeReference] = output.collect { case FileSourceConstantMetadataAttribute(attr) => attr - case FileSourceGeneratedMetadataAttribute(attr) => attr + case FileSourceGeneratedMetadataAttribute(attr, _) => attr } protected lazy val driverMetricsAlias = driverMetrics - def hasMetadataColumns: Boolean = metadataColumns.nonEmpty + def dataFiltersInScan: Seq[Expression] = dataFilters.filterNot(_.references.exists { + case FileSourceMetadataAttribute(_) => true + case _ => false + }) + + def hasUnsupportedColumns: Boolean = { + // TODO, fallback if user define same name column due to we can't right now + // detect which column is metadata column which is user defined column. + val metadataColumnsNames = metadataColumns.map(_.name) + output + .filterNot(metadataColumns.toSet) + .exists(v => metadataColumnsNames.contains(v.name)) || + output.exists(a => a.name == "$path" || a.name == "$bucket") + } def isMetadataColumn(attr: Attribute): Boolean = metadataColumns.contains(attr)