From b7be3106c77ed8fc603121c129a3ecbc927042fc Mon Sep 17 00:00:00 2001 From: aditi-pandit Date: Mon, 19 Feb 2024 13:41:07 -0800 Subject: [PATCH] Allow '$file_size' and '$file_modified_time' for HiveSplits as columns in a query --- velox/connectors/hive/HiveConnectorSplit.h | 8 ++- velox/connectors/hive/HiveConnectorUtil.cpp | 16 +++-- velox/connectors/hive/HiveConnectorUtil.h | 2 + velox/connectors/hive/SplitReader.cpp | 6 ++ .../connectors/hive/iceberg/IcebergSplit.cpp | 20 ++++-- velox/connectors/hive/iceberg/IcebergSplit.h | 8 ++- .../hive/iceberg/tests/IcebergReadTest.cpp | 4 +- .../hive/tests/HiveConnectorUtilTest.cpp | 4 +- velox/exec/tests/TableScanTest.cpp | 70 +++++++++++++++++++ .../tests/utils/HiveConnectorTestBase.cpp | 23 +++++- .../exec/tests/utils/HiveConnectorTestBase.h | 47 ++++++++++++- velox/exec/tests/utils/TempFilePath.h | 13 ++++ 12 files changed, 205 insertions(+), 16 deletions(-) diff --git a/velox/connectors/hive/HiveConnectorSplit.h b/velox/connectors/hive/HiveConnectorSplit.h index 15be998884b0a..1e2d7fc28ff5e 100644 --- a/velox/connectors/hive/HiveConnectorSplit.h +++ b/velox/connectors/hive/HiveConnectorSplit.h @@ -25,6 +25,8 @@ namespace facebook::velox::connector::hive { struct HiveConnectorSplit : public connector::ConnectorSplit { const std::string filePath; dwio::common::FileFormat fileFormat; + const int64_t fileSize; + const int64_t fileModifiedTime; const uint64_t start; const uint64_t length; @@ -50,10 +52,14 @@ struct HiveConnectorSplit : public connector::ConnectorSplit { std::optional _tableBucketNumber = std::nullopt, const std::unordered_map& _customSplitInfo = {}, const std::shared_ptr& _extraFileInfo = {}, - const std::unordered_map& _serdeParameters = {}) + const std::unordered_map& _serdeParameters = {}, + int64_t _fileSize = 0, + int64_t _fileModifiedTime = 0) : ConnectorSplit(connectorId), filePath(_filePath), fileFormat(_fileFormat), + fileSize(_fileSize), + fileModifiedTime(_fileModifiedTime), start(_start), 
length(_length), partitionKeys(_partitionKeys), diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index 95b47de314a0b..57bc6d3d7d61f 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -237,6 +237,11 @@ inline uint8_t parseDelimiter(const std::string& delim) { return stoi(delim); } +inline bool isSynthesizedColumn(const std::string& name) { + return name == kPath || name == kBucket || name == kFileSize || + name == kFileModifiedTime; +} + } // namespace const std::string& getColumnName(const common::Subfield& subfield) { @@ -273,7 +278,7 @@ void checkColumnNameLowerCase(const std::shared_ptr& type) { void checkColumnNameLowerCase(const SubfieldFilters& filters) { for (auto& pair : filters) { - if (auto name = pair.first.toString(); name == kPath || name == kBucket) { + if (auto name = pair.first.toString(); isSynthesizedColumn(name)) { continue; } auto& path = pair.first.path(); @@ -315,7 +320,7 @@ std::shared_ptr makeScanSpec( std::vector subfieldSpecs; for (auto& [subfield, _] : filters) { if (auto name = subfield.toString(); - name != kPath && name != kBucket && partitionKeys.count(name) == 0) { + !isSynthesizedColumn(name) && partitionKeys.count(name) == 0) { filterSubfields[getColumnName(subfield)].push_back(&subfield); } } @@ -362,11 +367,12 @@ std::shared_ptr makeScanSpec( // SelectiveColumnReader doesn't support constant columns with filters, - // hence, we can't have a filter for a $path or $bucket column. + // hence, we can't have a filter for a synthesized column. // - // Unfortunately, Presto happens to specify a filter for $path or - // $bucket column. This filter is redundant and needs to be removed. + // Unfortunately, Presto happens to specify a filter for $path, $file_size, + // $file_modified_time or $bucket column. This filter is redundant and needs + // to be removed. // TODO Remove this check when Presto is fixed to not specify a filter - // on $path and $bucket column. + // on synthesized columns. 
- if (auto name = pair.first.toString(); name == kPath || name == kBucket) { + if (auto name = pair.first.toString(); isSynthesizedColumn(name)) { continue; } auto fieldSpec = spec->getOrCreateChild(pair.first); diff --git a/velox/connectors/hive/HiveConnectorUtil.h b/velox/connectors/hive/HiveConnectorUtil.h index 329295b133d4e..c9fe41b4185a8 100644 --- a/velox/connectors/hive/HiveConnectorUtil.h +++ b/velox/connectors/hive/HiveConnectorUtil.h @@ -35,6 +35,8 @@ using SubfieldFilters = constexpr const char* kPath = "$path"; constexpr const char* kBucket = "$bucket"; +constexpr const char* kFileSize = "$file_size"; +constexpr const char* kFileModifiedTime = "$file_modified_time"; const std::string& getColumnName(const common::Subfield& subfield); diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index 6395d5d5f8bbd..f3e5f147fad53 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -197,6 +197,12 @@ std::vector SplitReader::adaptColumns( INTEGER(), velox::variant(hiveSplit_->tableBucketNumber.value())); } + } else if (fieldName == kFileSize) { + setConstantValue( + childSpec, BIGINT(), velox::variant(hiveSplit_->fileSize)); + } else if (fieldName == kFileModifiedTime) { + setConstantValue( + childSpec, BIGINT(), velox::variant(hiveSplit_->fileModifiedTime)); } else { auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName); if (!fileTypeIdx.has_value()) { diff --git a/velox/connectors/hive/iceberg/IcebergSplit.cpp b/velox/connectors/hive/iceberg/IcebergSplit.cpp index 7fa9a52f2c691..31738f65d3c9e 100644 --- a/velox/connectors/hive/iceberg/IcebergSplit.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplit.cpp @@ -30,7 +30,9 @@ HiveIcebergSplit::HiveIcebergSplit( _partitionKeys, std::optional _tableBucketNumber, const std::unordered_map& _customSplitInfo, - const std::shared_ptr& _extraFileInfo) + const std::shared_ptr& _extraFileInfo, + int64_t _fileSize, + int64_t 
_fileModifiedTime) : HiveConnectorSplit( _connectorId, _filePath, @@ -38,7 +40,12 @@ HiveIcebergSplit::HiveIcebergSplit( _start, _length, _partitionKeys, - _tableBucketNumber) { + _tableBucketNumber, + _customSplitInfo, + _extraFileInfo, + {}, + _fileSize, + _fileModifiedTime) { // TODO: Deserialize _extraFileInfo to get deleteFiles; } @@ -54,7 +61,9 @@ HiveIcebergSplit::HiveIcebergSplit( std::optional _tableBucketNumber, const std::unordered_map& _customSplitInfo, const std::shared_ptr& _extraFileInfo, - std::vector _deletes) + std::vector _deletes, + int64_t _fileSize, + int64_t _fileModifiedTime) : HiveConnectorSplit( _connectorId, _filePath, @@ -64,6 +73,9 @@ HiveIcebergSplit::HiveIcebergSplit( _partitionKeys, _tableBucketNumber, _customSplitInfo, - _extraFileInfo), + _extraFileInfo, + {}, + _fileSize, + _fileModifiedTime), deleteFiles(_deletes) {} } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergSplit.h b/velox/connectors/hive/iceberg/IcebergSplit.h index 37b8c3c3eb364..fda960c881b57 100644 --- a/velox/connectors/hive/iceberg/IcebergSplit.h +++ b/velox/connectors/hive/iceberg/IcebergSplit.h @@ -36,7 +36,9 @@ struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit { _partitionKeys = {}, std::optional _tableBucketNumber = std::nullopt, const std::unordered_map& _customSplitInfo = {}, - const std::shared_ptr& _extraFileInfo = {}); + const std::shared_ptr& _extraFileInfo = {}, + int64_t _fileSize = 0, + int64_t _fileModifiedTime = 0); // For tests only HiveIcebergSplit( @@ -50,7 +52,9 @@ struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit { std::optional _tableBucketNumber = std::nullopt, const std::unordered_map& _customSplitInfo = {}, const std::shared_ptr& _extraFileInfo = {}, - std::vector deletes = {}); + std::vector deletes = {}, + int64_t _fileSize = 0, + int64_t _fileModifiedTime = 0); }; } // namespace facebook::velox::connector::hive::iceberg diff --git 
a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp index 79443c73b3cea..c9b24010a4fe0 100644 --- a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp +++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp @@ -121,7 +121,9 @@ class HiveIcebergTest : public HiveConnectorTestBase { std::nullopt, customSplitInfo, nullptr, - deleteFiles); + deleteFiles, + fileSize, + 0); } std::vector makeVectors(int32_t count, int32_t rowsPerVector) { diff --git a/velox/connectors/hive/tests/HiveConnectorUtilTest.cpp b/velox/connectors/hive/tests/HiveConnectorUtilTest.cpp index 5cd33e13d9758..4bf1ac452466f 100644 --- a/velox/connectors/hive/tests/HiveConnectorUtilTest.cpp +++ b/velox/connectors/hive/tests/HiveConnectorUtilTest.cpp @@ -78,7 +78,9 @@ TEST_F(HiveConnectorUtilTest, configureReaderOptions) { std::nullopt, customSplitInfo, nullptr, - serdeParameters); + serdeParameters, + 65535, + 1342143134); }; auto performConfigure = [&]() { diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index 76485ebb37fa4..462674612c625 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -2291,6 +2291,76 @@ TEST_F(TableScanTest, path) { op, {filePath}, fmt::format("SELECT '{}', * FROM tmp", pathValue)); } +TEST_F(TableScanTest, fileSizeAndModifiedTime) { + auto rowType = ROW({"a"}, {BIGINT()}); + auto filePath = makeFilePaths(1)[0]; + auto vector = makeVectors(1, 10, rowType)[0]; + writeToFile(filePath->path, vector); + createDuckDbTable({vector}); + + static const char* kSize = "$file_size"; + static const char* kModifiedTime = "$file_modified_time"; + + auto allColumns = + ROW({"a", kSize, kModifiedTime}, {BIGINT(), BIGINT(), BIGINT()}); + + auto assignments = allRegularColumns(rowType); + assignments[kSize] = synthesizedColumn(kSize, BIGINT()); + assignments[kModifiedTime] = synthesizedColumn(kModifiedTime, BIGINT()); + + auto fileSizeValue = 
fmt::format("{}", filePath->fileSize()); + auto fileTimeValue = fmt::format("{}", filePath->fileModifiedTime()); + + // Select and project both '$file_size', '$file_modified_time'. + auto op = PlanBuilder() + .startTableScan() + .outputType(allColumns) + .dataColumns(allColumns) + .assignments(assignments) + .endTableScan() + .planNode(); + assertQuery( + op, + {filePath}, + fmt::format("SELECT *, {}, {} FROM tmp", fileSizeValue, fileTimeValue)); + + auto filterTest = [&](const std::string& filter) { + auto tableHandle = makeTableHandle( + SubfieldFilters{}, + parseExpr(filter, allColumns), + "hive_table", + allColumns); + + // Use synthesized column in a filter but don't project it. + op = PlanBuilder() + .startTableScan() + .outputType(rowType) + .dataColumns(allColumns) + .tableHandle(tableHandle) + .assignments(assignments) + .endTableScan() + .planNode(); + assertQuery(op, {filePath}, "SELECT * FROM tmp"); + + // Use synthesized column in a filter and project it out. + op = PlanBuilder() + .startTableScan() + .outputType(allColumns) + .dataColumns(allColumns) + .tableHandle(tableHandle) + .assignments(assignments) + .endTableScan() + .planNode(); + assertQuery( + op, + {filePath}, + fmt::format("SELECT *, {}, {} FROM tmp", fileSizeValue, fileTimeValue)); + }; + + filterTest(fmt::format("\"{}\" = {}", kSize, fileSizeValue)); + filterTest(fmt::format("\"{}\" = {}", kModifiedTime, fileTimeValue)); +} + TEST_F(TableScanTest, bucket) { vector_size_t size = 1'000; int numBatches = 5; diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.cpp b/velox/exec/tests/utils/HiveConnectorTestBase.cpp index 70d7c630abc92..99bbff3475919 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.cpp +++ b/velox/exec/tests/utils/HiveConnectorTestBase.cpp @@ -132,6 +132,7 @@ HiveConnectorTestBase::makeHiveConnectorSplits( .fileFormat(format) .start(i * splitSize) .length(splitSize) + .fileSize(fileSize) .build(); splits.push_back(std::move(split)); } @@ -171,7 +172,12 @@ 
HiveConnectorTestBase::makeHiveConnectorSplits( const std::vector>& filePaths) { std::vector> splits; for (auto filePath : filePaths) { - splits.push_back(makeHiveConnectorSplit(filePath->path)); + splits.push_back(makeHiveConnectorSplit( + filePath->path, + filePath->fileSize(), + filePath->fileModifiedTime(), + 0, + std::numeric_limits::max())); } return splits; } @@ -187,6 +193,21 @@ HiveConnectorTestBase::makeHiveConnectorSplit( .build(); } +std::shared_ptr +HiveConnectorTestBase::makeHiveConnectorSplit( + const std::string& filePath, + int64_t fileSize, + int64_t fileModifiedTime, + uint64_t start, + uint64_t length) { + return HiveConnectorSplitBuilder(filePath) + .fileSize(fileSize) + .fileModifiedTime(fileModifiedTime) + .start(start) + .length(length) + .build(); +} + // static std::shared_ptr HiveConnectorTestBase::makeHiveInsertTableHandle( diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.h b/velox/exec/tests/utils/HiveConnectorTestBase.h index 34ad0be177134..1bf54eb544b4a 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.h +++ b/velox/exec/tests/utils/HiveConnectorTestBase.h @@ -73,6 +73,13 @@ class HiveConnectorTestBase : public OperatorTestBase { uint64_t start = 0, uint64_t length = std::numeric_limits::max()); + static std::shared_ptr makeHiveConnectorSplit( + const std::string& filePath, + int64_t fileSize, + int64_t fileModifiedTime, + uint64_t start, + uint64_t length); + /// Split file at path 'filePath' into 'splitCount' splits. If not local file, /// file size can be given as 'externalSize'. 
static std::vector> @@ -202,6 +209,16 @@ class HiveConnectorSplitBuilder { return *this; } + HiveConnectorSplitBuilder& fileSize(int64_t fileSize) { + fileSize_ = fileSize; + return *this; + } + + HiveConnectorSplitBuilder& fileModifiedTime(int64_t fileModifiedTime) { + fileModifiedTime_ = fileModifiedTime; + return *this; + } + HiveConnectorSplitBuilder& partitionKey( std::string name, std::optional value) { @@ -214,6 +231,24 @@ class HiveConnectorSplitBuilder { return *this; } + HiveConnectorSplitBuilder& customSplitInfo( + const std::unordered_map& customSplitInfo) { + customSplitInfo_ = customSplitInfo; + return *this; + } + + HiveConnectorSplitBuilder& extraFileInfo( + const std::shared_ptr& extraFileInfo) { + extraFileInfo_ = extraFileInfo; + return *this; + } + + HiveConnectorSplitBuilder& serdeParameters( + const std::unordered_map& serdeParameters) { + serdeParameters_ = serdeParameters; + return *this; + } + HiveConnectorSplitBuilder& connectorId(const std::string& connectorId) { connectorId_ = connectorId; return *this; @@ -227,7 +262,12 @@ class HiveConnectorSplitBuilder { start_, length_, partitionKeys_, - tableBucketNumber_); + tableBucketNumber_, + customSplitInfo_, + extraFileInfo_, + serdeParameters_, + fileSize_, + fileModifiedTime_); } private: @@ -235,8 +275,14 @@ class HiveConnectorSplitBuilder { dwio::common::FileFormat fileFormat_{dwio::common::FileFormat::DWRF}; uint64_t start_{0}; uint64_t length_{std::numeric_limits::max()}; + // Default to 0 (HiveConnectorSplit's default) so build() never reads garbage. + int64_t fileSize_{0}; + int64_t fileModifiedTime_{0}; std::unordered_map> partitionKeys_; std::optional tableBucketNumber_; + std::unordered_map customSplitInfo_ = {}; + std::shared_ptr extraFileInfo_ = {}; + std::unordered_map serdeParameters_ = {}; std::string connectorId_ = kHiveConnectorId; }; diff --git a/velox/exec/tests/utils/TempFilePath.h b/velox/exec/tests/utils/TempFilePath.h index cf615b4131383..ce99bfc084af5 100644 --- a/velox/exec/tests/utils/TempFilePath.h +++ b/velox/exec/tests/utils/TempFilePath.h @@ -15,6 
+15,7 @@ */ #pragma once +#include #include #include #include @@ -47,6 +48,20 @@ class TempFilePath { file.close(); } + /// Returns st_size as reported by stat() for 'path', or -1 if stat() fails. + int64_t fileSize() const { + struct stat st {}; + const int rc = ::stat(path.data(), &st); + return rc == 0 ? st.st_size : -1; + } + + /// Returns st_mtime as reported by stat() for 'path', or -1 if stat() fails. + int64_t fileModifiedTime() const { + struct stat st {}; + const int rc = ::stat(path.data(), &st); + return rc == 0 ? st.st_mtime : -1; + } + private: int fd;