Skip to content

Commit

Permalink
[VL] Pass file size and modification time in split (#6029)
Browse files Browse the repository at this point in the history
  • Loading branch information
acvictor authored Jun 11, 2024
1 parent df627be commit 13babf3
Show file tree
Hide file tree
Showing 14 changed files with 93 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,13 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
paths,
starts,
lengths,
new JArrayList[JLong](),
new JArrayList[JLong](),
partitionColumns,
new JArrayList[JMap[String, String]](),
fileFormat,
preferredLocations.toList.asJava)
preferredLocations.toList.asJava
)
case _ =>
throw new UnsupportedOperationException(s"Unsupported input partition: $partition.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,14 @@ class VeloxIteratorApi extends IteratorApi with Logging {
metadataColumnNames: Seq[String]): SplitInfo = {
partition match {
case f: FilePartition =>
val (paths, starts, lengths, partitionColumns, metadataColumns) =
val (
paths,
starts,
lengths,
fileSizes,
modificationTimes,
partitionColumns,
metadataColumns) =
constructSplitInfo(partitionSchema, f.files, metadataColumnNames)
val preferredLocations =
SoftAffinity.getFilePartitionLocations(f)
Expand All @@ -65,6 +72,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
paths,
starts,
lengths,
fileSizes,
modificationTimes,
partitionColumns,
metadataColumns,
fileFormat,
Expand Down Expand Up @@ -100,6 +109,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
val paths = new JArrayList[String]()
val starts = new JArrayList[JLong]
val lengths = new JArrayList[JLong]()
val fileSizes = new JArrayList[JLong]()
val modificationTimes = new JArrayList[JLong]()
val partitionColumns = new JArrayList[JMap[String, String]]
var metadataColumns = new JArrayList[JMap[String, String]]
files.foreach {
Expand All @@ -111,6 +122,14 @@ class VeloxIteratorApi extends IteratorApi with Logging {
.decode(file.filePath.toString, StandardCharsets.UTF_8.name()))
starts.add(JLong.valueOf(file.start))
lengths.add(JLong.valueOf(file.length))
val (fileSize, modificationTime) =
SparkShimLoader.getSparkShims.getFileSizeAndModificationTime(file)
(fileSize, modificationTime) match {
case (Some(size), Some(time)) =>
fileSizes.add(JLong.valueOf(size))
modificationTimes.add(JLong.valueOf(time))
case _ => // Do nothing
}
val metadataColumn =
SparkShimLoader.getSparkShims.generateMetadataColumns(file, metadataColumnNames)
metadataColumns.add(metadataColumn)
Expand Down Expand Up @@ -138,7 +157,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
}
partitionColumns.add(partitionColumn)
}
(paths, starts, lengths, partitionColumns, metadataColumns)
(paths, starts, lengths, fileSizes, modificationTimes, partitionColumns, metadataColumns)
}

override def injectWriteFilesTempPath(path: String): Unit = {
Expand Down
3 changes: 3 additions & 0 deletions cpp/velox/compute/VeloxPlanConverter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo(
splitInfo->starts.reserve(fileList.size());
splitInfo->lengths.reserve(fileList.size());
splitInfo->partitionColumns.reserve(fileList.size());
splitInfo->properties.reserve(fileList.size());
splitInfo->metadataColumns.reserve(fileList.size());
for (const auto& file : fileList) {
// Expect all Partitions share the same index.
Expand All @@ -80,6 +81,8 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo(
splitInfo->paths.emplace_back(file.uri_file());
splitInfo->starts.emplace_back(file.start());
splitInfo->lengths.emplace_back(file.length());
facebook::velox::FileProperties fileProps = {file.properties().filesize(), file.properties().modificationtime()};
splitInfo->properties.emplace_back(fileProps);
switch (file.file_format_case()) {
case SubstraitFileFormatCase::kOrc:
splitInfo->format = dwio::common::FileFormat::ORC;
Expand Down
8 changes: 6 additions & 2 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ WholeStageResultIterator::WholeStageResultIterator(
const auto& paths = scanInfo->paths;
const auto& starts = scanInfo->starts;
const auto& lengths = scanInfo->lengths;
const auto& properties = scanInfo->properties;
const auto& format = scanInfo->format;
const auto& partitionColumns = scanInfo->partitionColumns;
const auto& metadataColumns = scanInfo->metadataColumns;
Expand All @@ -135,7 +136,9 @@ WholeStageResultIterator::WholeStageResultIterator(
std::nullopt,
customSplitInfo,
nullptr,
deleteFiles);
deleteFiles,
std::unordered_map<std::string, std::string>(),
properties[idx]);
} else {
split = std::make_shared<velox::connector::hive::HiveConnectorSplit>(
kHiveConnectorId,
Expand All @@ -149,7 +152,8 @@ WholeStageResultIterator::WholeStageResultIterator(
nullptr,
std::unordered_map<std::string, std::string>(),
0,
metadataColumn);
metadataColumn,
properties[idx]);
}
connectorSplits.emplace_back(split);
}
Expand Down
5 changes: 5 additions & 0 deletions cpp/velox/substrait/SubstraitToVeloxPlan.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "SubstraitToVeloxExpr.h"
#include "TypeUtils.h"
#include "velox/connectors/hive/FileProperties.h"
#include "velox/connectors/hive/TableHandle.h"
#include "velox/core/PlanNode.h"
#include "velox/dwio/common/Options.h"
Expand Down Expand Up @@ -51,6 +52,9 @@ struct SplitInfo {
/// The file format of the files to be scanned.
dwio::common::FileFormat format;

/// The file sizes and modification times of the files to be scanned.
std::vector<std::optional<facebook::velox::FileProperties>> properties;

/// Make SplitInfo polymorphic
virtual ~SplitInfo() = default;
};
Expand Down Expand Up @@ -111,6 +115,7 @@ class SubstraitToVeloxPlanConverter {
/// Index: the index of the partition this item belongs to.
/// Starts: the start positions in byte to read from the items.
/// Lengths: the lengths in byte to read from the items.
/// FileProperties: the file sizes and modification times of the files to be scanned.
core::PlanNodePtr toVeloxPlan(const ::substrait::ReadRel& sRead);

core::PlanNodePtr constructValueStreamNode(const ::substrait::ReadRel& sRead, int32_t streamIdx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public static LocalFilesNode makeLocalFiles(
List<String> paths,
List<Long> starts,
List<Long> lengths,
List<Long> fileSizes,
List<Long> modificationTimes,
List<Map<String, String>> partitionColumns,
List<Map<String, String>> metadataColumns,
LocalFilesNode.ReadFileFormat fileFormat,
Expand All @@ -36,6 +38,8 @@ public static LocalFilesNode makeLocalFiles(
paths,
starts,
lengths,
fileSizes,
modificationTimes,
partitionColumns,
metadataColumns,
fileFormat,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public class LocalFilesNode implements SplitInfo {
private final List<String> paths = new ArrayList<>();
private final List<Long> starts = new ArrayList<>();
private final List<Long> lengths = new ArrayList<>();
private final List<Long> fileSizes = new ArrayList<>();
private final List<Long> modificationTimes = new ArrayList<>();
private final List<Map<String, String>> partitionColumns = new ArrayList<>();
private final List<Map<String, String>> metadataColumns = new ArrayList<>();
private final List<String> preferredLocations = new ArrayList<>();
Expand All @@ -60,6 +62,8 @@ public enum ReadFileFormat {
List<String> paths,
List<Long> starts,
List<Long> lengths,
List<Long> fileSizes,
List<Long> modificationTimes,
List<Map<String, String>> partitionColumns,
List<Map<String, String>> metadataColumns,
ReadFileFormat fileFormat,
Expand All @@ -68,6 +72,8 @@ public enum ReadFileFormat {
this.paths.addAll(paths);
this.starts.addAll(starts);
this.lengths.addAll(lengths);
this.fileSizes.addAll(fileSizes);
this.modificationTimes.addAll(modificationTimes);
this.fileFormat = fileFormat;
this.partitionColumns.addAll(partitionColumns);
this.metadataColumns.addAll(metadataColumns);
Expand Down Expand Up @@ -153,6 +159,18 @@ public ReadRel.LocalFiles toProtobuf() {
}
fileBuilder.setLength(lengths.get(i));
fileBuilder.setStart(starts.get(i));

if (!fileSizes.isEmpty()
&& !modificationTimes.isEmpty()
&& fileSizes.size() == modificationTimes.size()
&& fileSizes.size() == paths.size()) {
ReadRel.LocalFiles.FileOrFiles.fileProperties.Builder filePropsBuilder =
ReadRel.LocalFiles.FileOrFiles.fileProperties.newBuilder();
filePropsBuilder.setFileSize(fileSizes.get(i));
filePropsBuilder.setModificationTime(modificationTimes.get(i));
fileBuilder.setProperties(filePropsBuilder.build());
}

if (!metadataColumns.isEmpty()) {
Map<String, String> metadataColumn = metadataColumns.get(i);
if (!metadataColumn.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,13 @@ message ReadRel {
string value = 2;
}
repeated metadataColumn metadata_columns = 19;

// Physical properties of a file in the split, forwarded so the native
// reader can skip a filesystem metadata lookup per file.
// NOTE(review): naming deviates from proto style (FileProperties /
// file_size) but is kept as-is because generated accessors are already
// referenced by callers.
message fileProperties {
  // Total file size in bytes.
  int64 fileSize = 1;
  // Last-modification timestamp of the file; presumably milliseconds
  // since epoch as supplied by Spark — TODO confirm units.
  int64 modificationTime = 2;
}
// Optional per-file properties; unset when the engine cannot supply them
// (e.g. Spark 3.2, which does not expose these fields).
fileProperties properties = 20;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
paths,
starts,
lengths,
new ArrayList<>(),
new ArrayList<>(),
partitionColumns,
new ArrayList<>(),
fileFormat,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ trait SparkShims {

def attributesFromStruct(structType: StructType): Seq[Attribute]

// Spark 3.3 and later only have file size and modification time in PartitionedFile
def getFileSizeAndModificationTime(file: PartitionedFile): (Option[Long], Option[Long])

def generateMetadataColumns(
file: PartitionedFile,
metadataColumnNames: Seq[String] = Seq.empty): JMap[String, String]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ class Spark32Shims extends SparkShims {
}
}

/**
 * Spark 3.2's `PartitionedFile` does not expose file size or modification time,
 * so neither value can be reported for this Spark version.
 */
override def getFileSizeAndModificationTime(
    file: PartitionedFile): (Option[Long], Option[Long]) =
  (None, None)

override def generateMetadataColumns(
file: PartitionedFile,
metadataColumnNames: Seq[String]): JMap[String, String] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ class Spark33Shims extends SparkShims {
case other => other
}

/**
 * Surfaces the physical file size and modification time carried by
 * Spark 3.3's `PartitionedFile`; both fields are always present here.
 */
override def getFileSizeAndModificationTime(
    file: PartitionedFile): (Option[Long], Option[Long]) = {
  val size = file.fileSize
  val mtime = file.modificationTime
  (Some(size), Some(mtime))
}

override def generateMetadataColumns(
file: PartitionedFile,
metadataColumnNames: Seq[String]): JMap[String, String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ class Spark34Shims extends SparkShims {
case other => other
}

/**
 * Reports the file size and modification time from Spark 3.4's
 * `PartitionedFile`, which always carries both fields.
 */
override def getFileSizeAndModificationTime(
    file: PartitionedFile): (Option[Long], Option[Long]) = {
  val (size, time) = (file.fileSize, file.modificationTime)
  (Some(size), Some(time))
}

override def generateMetadataColumns(
file: PartitionedFile,
metadataColumnNames: Seq[String]): JMap[String, String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ class Spark35Shims extends SparkShims {
case other => other
}

/**
 * Reports the file size and modification time from Spark 3.5's
 * `PartitionedFile`, which always carries both fields.
 */
override def getFileSizeAndModificationTime(
    file: PartitionedFile): (Option[Long], Option[Long]) = {
  Some(file.fileSize) -> Some(file.modificationTime)
}

override def generateMetadataColumns(
file: PartitionedFile,
metadataColumnNames: Seq[String]): JMap[String, String] = {
Expand Down

0 comments on commit 13babf3

Please sign in to comment.