From 13babf369d4a5f5dc9833bea7ec22bfa682f8ffe Mon Sep 17 00:00:00 2001 From: Ankita Victor Date: Tue, 11 Jun 2024 07:19:57 +0530 Subject: [PATCH] [VL] Pass file size and modification time in split (#6029) --- .../clickhouse/CHIteratorApi.scala | 5 +++- .../backendsapi/velox/VeloxIteratorApi.scala | 23 +++++++++++++++++-- cpp/velox/compute/VeloxPlanConverter.cc | 3 +++ cpp/velox/compute/WholeStageResultIterator.cc | 8 +++++-- cpp/velox/substrait/SubstraitToVeloxPlan.h | 5 ++++ .../substrait/rel/LocalFilesBuilder.java | 4 ++++ .../gluten/substrait/rel/LocalFilesNode.java | 18 +++++++++++++++ .../substrait/proto/substrait/algebra.proto | 7 ++++++ .../substrait/rel/IcebergLocalFilesNode.java | 2 ++ .../apache/gluten/sql/shims/SparkShims.scala | 3 +++ .../sql/shims/spark32/Spark32Shims.scala | 5 ++++ .../sql/shims/spark33/Spark33Shims.scala | 5 ++++ .../sql/shims/spark34/Spark34Shims.scala | 5 ++++ .../sql/shims/spark35/Spark35Shims.scala | 5 ++++ 14 files changed, 93 insertions(+), 5 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index 63f7eeb798f3..1221710bce6b 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -131,10 +131,13 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { paths, starts, lengths, + new JArrayList[JLong](), + new JArrayList[JLong](), partitionColumns, new JArrayList[JMap[String, String]](), fileFormat, - preferredLocations.toList.asJava) + preferredLocations.toList.asJava + ) case _ => throw new UnsupportedOperationException(s"Unsupported input partition: $partition.") } diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index 5f9b5afa9976..b20eccafb625 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -56,7 +56,14 @@ class VeloxIteratorApi extends IteratorApi with Logging { metadataColumnNames: Seq[String]): SplitInfo = { partition match { case f: FilePartition => - val (paths, starts, lengths, partitionColumns, metadataColumns) = + val ( + paths, + starts, + lengths, + fileSizes, + modificationTimes, + partitionColumns, + metadataColumns) = constructSplitInfo(partitionSchema, f.files, metadataColumnNames) val preferredLocations = SoftAffinity.getFilePartitionLocations(f) @@ -65,6 +72,8 @@ class VeloxIteratorApi extends IteratorApi with Logging { paths, starts, lengths, + fileSizes, + modificationTimes, partitionColumns, metadataColumns, fileFormat, @@ -100,6 +109,8 @@ class VeloxIteratorApi extends IteratorApi with Logging { val paths = new JArrayList[String]() val starts = new JArrayList[JLong] val lengths = new JArrayList[JLong]() + val fileSizes = new JArrayList[JLong]() + val modificationTimes = new JArrayList[JLong]() val partitionColumns = new JArrayList[JMap[String, String]] var metadataColumns = new JArrayList[JMap[String, String]] files.foreach { @@ -111,6 +122,14 @@ class VeloxIteratorApi extends IteratorApi with Logging { .decode(file.filePath.toString, StandardCharsets.UTF_8.name())) starts.add(JLong.valueOf(file.start)) lengths.add(JLong.valueOf(file.length)) + val (fileSize, modificationTime) = + SparkShimLoader.getSparkShims.getFileSizeAndModificationTime(file) + (fileSize, modificationTime) match { + case (Some(size), Some(time)) => + fileSizes.add(JLong.valueOf(size)) + modificationTimes.add(JLong.valueOf(time)) + case _ => // Do nothing + } val metadataColumn = SparkShimLoader.getSparkShims.generateMetadataColumns(file, metadataColumnNames) metadataColumns.add(metadataColumn) @@ -138,7 +157,7 @@ class VeloxIteratorApi extends IteratorApi with Logging { } partitionColumns.add(partitionColumn) } - (paths, starts, lengths, partitionColumns, metadataColumns) + (paths, starts, lengths, fileSizes, modificationTimes, partitionColumns, metadataColumns) } override def injectWriteFilesTempPath(path: String): Unit = { diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc index ed42cb15a51e..bcd03b110afd 100644 --- a/cpp/velox/compute/VeloxPlanConverter.cc +++ b/cpp/velox/compute/VeloxPlanConverter.cc @@ -60,6 +60,7 @@ std::shared_ptr parseScanSplitInfo( splitInfo->starts.reserve(fileList.size()); splitInfo->lengths.reserve(fileList.size()); splitInfo->partitionColumns.reserve(fileList.size()); + splitInfo->properties.reserve(fileList.size()); splitInfo->metadataColumns.reserve(fileList.size()); for (const auto& file : fileList) { // Expect all Partitions share the same index. @@ -80,6 +81,8 @@ std::shared_ptr parseScanSplitInfo( splitInfo->paths.emplace_back(file.uri_file()); splitInfo->starts.emplace_back(file.start()); splitInfo->lengths.emplace_back(file.length()); + facebook::velox::FileProperties fileProps = {file.properties().filesize(), file.properties().modificationtime()}; + splitInfo->properties.emplace_back(fileProps); switch (file.file_format_case()) { case SubstraitFileFormatCase::kOrc: splitInfo->format = dwio::common::FileFormat::ORC; diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index f719c119c3e0..867d347cdc64 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -109,6 +109,7 @@ WholeStageResultIterator::WholeStageResultIterator( const auto& paths = scanInfo->paths; const auto& starts = scanInfo->starts; const auto& lengths = scanInfo->lengths; + const auto& properties = scanInfo->properties; const auto& format = scanInfo->format; const auto& partitionColumns = scanInfo->partitionColumns; const auto& metadataColumns = scanInfo->metadataColumns; @@ -135,7 +136,9 @@ WholeStageResultIterator::WholeStageResultIterator( std::nullopt, customSplitInfo, nullptr, - deleteFiles); + deleteFiles, + std::unordered_map(), + properties[idx]); } else { split = std::make_shared( kHiveConnectorId, @@ -149,7 +152,8 @@ WholeStageResultIterator::WholeStageResultIterator( nullptr, std::unordered_map(), 0, - metadataColumn); + metadataColumn, + properties[idx]); } connectorSplits.emplace_back(split); } diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h index 1bda6435eaee..567ebb215078 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h @@ -19,6 +19,7 @@ #include "SubstraitToVeloxExpr.h" #include "TypeUtils.h" +#include "velox/connectors/hive/FileProperties.h" #include "velox/connectors/hive/TableHandle.h" #include "velox/core/PlanNode.h" #include "velox/dwio/common/Options.h" @@ -51,6 +52,9 @@ struct SplitInfo { /// The file format of the files to be scanned. dwio::common::FileFormat format; + /// The file sizes and modification times of the files to be scanned. + std::vector> properties; + /// Make SplitInfo polymorphic virtual ~SplitInfo() = default; }; @@ -111,6 +115,7 @@ class SubstraitToVeloxPlanConverter { /// Index: the index of the partition this item belongs to. /// Starts: the start positions in byte to read from the items. /// Lengths: the lengths in byte to read from the items. + /// FileProperties: the file sizes and modification times of the files to be scanned. core::PlanNodePtr toVeloxPlan(const ::substrait::ReadRel& sRead); core::PlanNodePtr constructValueStreamNode(const ::substrait::ReadRel& sRead, int32_t streamIdx); diff --git a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java index 94acc83367f5..7e085f81f4e6 100644 --- a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java +++ b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java @@ -27,6 +27,8 @@ public static LocalFilesNode makeLocalFiles( List paths, List starts, List lengths, + List fileSizes, + List modificationTimes, List> partitionColumns, List> metadataColumns, LocalFilesNode.ReadFileFormat fileFormat, @@ -36,6 +38,8 @@ public static LocalFilesNode makeLocalFiles( paths, starts, lengths, + fileSizes, + modificationTimes, partitionColumns, metadataColumns, fileFormat, diff --git a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java index cbcda72dd03a..fa9f3d51612b 100644 --- a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java +++ b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java @@ -34,6 +34,8 @@ public class LocalFilesNode implements SplitInfo { private final List paths = new ArrayList<>(); private final List starts = new ArrayList<>(); private final List lengths = new ArrayList<>(); + private final List fileSizes = new ArrayList<>(); + private final List modificationTimes = new ArrayList<>(); private final List> partitionColumns = new ArrayList<>(); private final List> metadataColumns = new ArrayList<>(); private final List preferredLocations = new ArrayList<>(); @@ -60,6 +62,8 @@ public enum ReadFileFormat { List paths, List starts, List lengths, + List fileSizes, + List modificationTimes, List> partitionColumns, List> metadataColumns, ReadFileFormat fileFormat, @@ -68,6 +72,8 @@ public enum ReadFileFormat { this.paths.addAll(paths); this.starts.addAll(starts); this.lengths.addAll(lengths); + this.fileSizes.addAll(fileSizes); + this.modificationTimes.addAll(modificationTimes); this.fileFormat = fileFormat; this.partitionColumns.addAll(partitionColumns); this.metadataColumns.addAll(metadataColumns); @@ -153,6 +159,18 @@ public ReadRel.LocalFiles toProtobuf() { } fileBuilder.setLength(lengths.get(i)); fileBuilder.setStart(starts.get(i)); + + if (!fileSizes.isEmpty() + && !modificationTimes.isEmpty() + && fileSizes.size() == modificationTimes.size() + && fileSizes.size() == paths.size()) { + ReadRel.LocalFiles.FileOrFiles.fileProperties.Builder filePropsBuilder = + ReadRel.LocalFiles.FileOrFiles.fileProperties.newBuilder(); + filePropsBuilder.setFileSize(fileSizes.get(i)); + filePropsBuilder.setModificationTime(modificationTimes.get(i)); + fileBuilder.setProperties(filePropsBuilder.build()); + } + if (!metadataColumns.isEmpty()) { Map metadataColumn = metadataColumns.get(i); if (!metadataColumn.isEmpty()) { diff --git a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto index 266aba4b0157..877493439f95 100644 --- a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto +++ b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto @@ -198,6 +198,13 @@ message ReadRel { string value = 2; } repeated metadataColumn metadata_columns = 19; + + // File properties contained in split + message fileProperties { + int64 fileSize = 1; + int64 modificationTime = 2; + } + fileProperties properties = 20; } } } diff --git a/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java b/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java index 7d065f105a43..ba6b0ac4a029 100644 --- a/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java +++ b/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java @@ -42,6 +42,8 @@ public class IcebergLocalFilesNode extends LocalFilesNode { paths, starts, lengths, + new ArrayList<>(), + new ArrayList<>(), partitionColumns, new ArrayList<>(), fileFormat, diff --git a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala index d6acc8c27b29..8bbc6d3d18d4 100644 --- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala +++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala @@ -207,6 +207,9 @@ trait SparkShims { def attributesFromStruct(structType: StructType): Seq[Attribute] + // Spark 3.3 and later only have file size and modification time in PartitionedFile + def getFileSizeAndModificationTime(file: PartitionedFile): (Option[Long], Option[Long]) + def generateMetadataColumns( file: PartitionedFile, metadataColumnNames: Seq[String] = Seq.empty): JMap[String, String] diff --git a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala index 29fddc697b07..f24aef66a1cb 100644 --- a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala +++ b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala @@ -217,6 +217,11 @@ class Spark32Shims extends SparkShims { } } + override def getFileSizeAndModificationTime( + file: PartitionedFile): (Option[Long], Option[Long]) = { + (None, None) + } + override def generateMetadataColumns( file: PartitionedFile, metadataColumnNames: Seq[String]): JMap[String, String] = diff --git a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala index 7c6ce644dc74..68fc4ad0dc1a 100644 --- a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala +++ b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala @@ -202,6 +202,11 @@ class Spark33Shims extends SparkShims { case other => other } + override def getFileSizeAndModificationTime( + file: PartitionedFile): (Option[Long], Option[Long]) = { + (Some(file.fileSize), Some(file.modificationTime)) + } + override def generateMetadataColumns( file: PartitionedFile, metadataColumnNames: Seq[String]): JMap[String, String] = { diff --git a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala index f2c2482949b7..7d9fc389b7cb 100644 --- a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala +++ b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala @@ -208,6 +208,11 @@ class Spark34Shims extends SparkShims { case other => other } + override def getFileSizeAndModificationTime( + file: PartitionedFile): (Option[Long], Option[Long]) = { + (Some(file.fileSize), Some(file.modificationTime)) + } + override def generateMetadataColumns( file: PartitionedFile, metadataColumnNames: Seq[String]): JMap[String, String] = { diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala index e0835c3069d2..54cea6993d13 100644 --- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala +++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala @@ -206,6 +206,11 @@ class Spark35Shims extends SparkShims { case other => other } + override def getFileSizeAndModificationTime( + file: PartitionedFile): (Option[Long], Option[Long]) = { + (Some(file.fileSize), Some(file.modificationTime)) + } + override def generateMetadataColumns( file: PartitionedFile, metadataColumnNames: Seq[String]): JMap[String, String] = {