From 43fce917ac9d5043c547ffdfe99e0c4adcc0543a Mon Sep 17 00:00:00 2001 From: aditi-pandit Date: Mon, 19 Feb 2024 14:26:05 -0800 Subject: [PATCH] Allow queries for '$file_size' and '$file_modified_time' for HiveSplits --- velox/connectors/hive/HiveConnectorSplit.h | 10 ++- velox/connectors/hive/HiveConnectorUtil.cpp | 28 ++++++-- velox/connectors/hive/HiveConnectorUtil.h | 7 +- velox/connectors/hive/HiveDataSource.cpp | 7 +- velox/connectors/hive/HiveDataSource.h | 4 ++ velox/connectors/hive/SplitReader.cpp | 71 +++++++++++-------- .../connectors/hive/iceberg/IcebergSplit.cpp | 18 +++-- velox/connectors/hive/iceberg/IcebergSplit.h | 6 +- .../hive/tests/HiveConnectorTest.cpp | 17 ++++- velox/exec/tests/TableScanTest.cpp | 70 ++++++++++++++++++ .../tests/utils/HiveConnectorTestBase.cpp | 22 +++++- .../exec/tests/utils/HiveConnectorTestBase.h | 39 +++++++++- velox/exec/tests/utils/TempFilePath.h | 13 ++++ 13 files changed, 260 insertions(+), 52 deletions(-) diff --git a/velox/connectors/hive/HiveConnectorSplit.h b/velox/connectors/hive/HiveConnectorSplit.h index 10fa9206ec2dd..48f39f64bec29 100644 --- a/velox/connectors/hive/HiveConnectorSplit.h +++ b/velox/connectors/hive/HiveConnectorSplit.h @@ -39,6 +39,10 @@ struct HiveConnectorSplit : public connector::ConnectorSplit { std::shared_ptr extraFileInfo; std::unordered_map serdeParameters; + /// These represent columns like $file_size, $file_modified_time that are + /// associated with the HiveSplit. + std::unordered_map infoColumns; + HiveConnectorSplit( const std::string& connectorId, const std::string& _filePath, @@ -51,7 +55,8 @@ struct HiveConnectorSplit : public connector::ConnectorSplit { const std::unordered_map& _customSplitInfo = {}, const std::shared_ptr& _extraFileInfo = {}, const std::unordered_map& _serdeParameters = {}, - int64_t _splitWeight = 0) + int64_t _splitWeight = 0, + const std::unordered_map& _infoColumns = {}) : ConnectorSplit(connectorId, _splitWeight), filePath(_filePath), fileFormat(_fileFormat), @@ -61,7 +66,8 @@ struct HiveConnectorSplit : public connector::ConnectorSplit { tableBucketNumber(_tableBucketNumber), customSplitInfo(_customSplitInfo), extraFileInfo(_extraFileInfo), - serdeParameters(_serdeParameters) {} + serdeParameters(_serdeParameters), + infoColumns(_infoColumns) {} std::string toString() const override { if (tableBucketNumber.has_value()) { diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index 95b47de314a0b..1870bf112cb39 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -237,6 +237,13 @@ inline uint8_t parseDelimiter(const std::string& delim) { return stoi(delim); } +inline bool isSynthesizedColumn( + const std::string& name, + const std::unordered_map>& + infoColumns) { + return name == kPath || name == kBucket || infoColumns.count(name) != 0; +} + } // namespace const std::string& getColumnName(const common::Subfield& subfield) { @@ -271,9 +278,13 @@ void checkColumnNameLowerCase(const std::shared_ptr& type) { } } -void checkColumnNameLowerCase(const SubfieldFilters& filters) { +void checkColumnNameLowerCase( + const SubfieldFilters& filters, + const std::unordered_map>& + infoColumns) { for (auto& pair : filters) { - if (auto name = pair.first.toString(); name == kPath || name == kBucket) { + if (auto name = pair.first.toString(); + isSynthesizedColumn(name, infoColumns)) { continue; } auto& path = pair.first.path(); @@ -308,6 +319,8 @@ std::shared_ptr makeScanSpec( const RowTypePtr& dataColumns, const std::unordered_map>& partitionKeys, + const std::unordered_map>& + infoColumns, memory::MemoryPool* pool) { auto spec = std::make_shared("root"); folly::F14FastMap> @@ -315,7 +328,8 @@ std::shared_ptr makeScanSpec( std::vector subfieldSpecs; for (auto& [subfield, _] : filters) { if (auto name = subfield.toString(); - name != kPath && name != kBucket && partitionKeys.count(name) == 0) { + !isSynthesizedColumn(name, infoColumns) && + partitionKeys.count(name) == 0) { filterSubfields[getColumnName(subfield)].push_back(&subfield); } } @@ -362,11 +376,13 @@ std::shared_ptr makeScanSpec( // SelectiveColumnReader doesn't support constant columns with filters, // hence, we can't have a filter for a $path or $bucket column. // - // Unfortunately, Presto happens to specify a filter for $path or - // $bucket column. This filter is redundant and needs to be removed. + // Unfortunately, Presto happens to specify a filter for $path, $file_size, + // $file_modified_time or $bucket column. This filter is redundant and needs + // to be removed. // TODO Remove this check when Presto is fixed to not specify a filter // on $path and $bucket column. - if (auto name = pair.first.toString(); name == kPath || name == kBucket) { + if (auto name = pair.first.toString(); + isSynthesizedColumn(name, infoColumns)) { continue; } auto fieldSpec = spec->getOrCreateChild(pair.first); diff --git a/velox/connectors/hive/HiveConnectorUtil.h b/velox/connectors/hive/HiveConnectorUtil.h index 329295b133d4e..ed1f86f708ce2 100644 --- a/velox/connectors/hive/HiveConnectorUtil.h +++ b/velox/connectors/hive/HiveConnectorUtil.h @@ -40,7 +40,10 @@ const std::string& getColumnName(const common::Subfield& subfield); void checkColumnNameLowerCase(const std::shared_ptr& type); -void checkColumnNameLowerCase(const SubfieldFilters& filters); +void checkColumnNameLowerCase( + const SubfieldFilters& filters, + const std::unordered_map>& + infoColumns); void checkColumnNameLowerCase(const core::TypedExprPtr& typeExpr); @@ -52,6 +55,8 @@ std::shared_ptr makeScanSpec( const RowTypePtr& dataColumns, const std::unordered_map>& partitionKeys, + const std::unordered_map>& + infoColumns, memory::MemoryPool* pool); void configureReaderOptions( diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index 6ff99105364c5..417a94731691c 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -119,6 +119,10 @@ HiveDataSource::HiveDataSource( if (handle->columnType() == HiveColumnHandle::ColumnType::kPartitionKey) { partitionKeys_.emplace(handle->name(), handle); } + + if (handle->columnType() == HiveColumnHandle::ColumnType::kSynthesized) { + infoColumns_.emplace(handle->name(), handle); + } } std::vector readerRowNames; @@ -150,7 +154,7 @@ HiveDataSource::HiveDataSource( if (hiveConfig_->isFileColumnNamesReadAsLowerCase( connectorQueryCtx->sessionProperties())) { checkColumnNameLowerCase(outputType_); - checkColumnNameLowerCase(hiveTableHandle_->subfieldFilters()); + checkColumnNameLowerCase(hiveTableHandle_->subfieldFilters(), infoColumns_); checkColumnNameLowerCase(hiveTableHandle_->remainingFilter()); } @@ -206,6 +210,7 @@ HiveDataSource::HiveDataSource( filters, hiveTableHandle_->dataColumns(), partitionKeys_, + infoColumns_, pool_); if (remainingFilter) { metadataFilter_ = std::make_shared( diff --git a/velox/connectors/hive/HiveDataSource.h b/velox/connectors/hive/HiveDataSource.h index b2b4cac7aec97..684acf5a4b2b3 100644 --- a/velox/connectors/hive/HiveDataSource.h +++ b/velox/connectors/hive/HiveDataSource.h @@ -126,6 +126,10 @@ class HiveDataSource : public DataSource { // The row type for the data source output, not including filter-only columns const RowTypePtr outputType_; + + // Column handles for the Split info columns keyed on their column names. + std::unordered_map> + infoColumns_; std::shared_ptr metadataFilter_; std::unique_ptr remainingFilterExprSet_; RowVectorPtr emptyOutput_; diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index 6395d5d5f8bbd..2db929b069f7b 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -173,6 +173,33 @@ void SplitReader::prepareSplit( baseRowReader_ = baseReader_->createRowReader(baseRowReaderOpts_); } +namespace { + +template +velox::variant convertFromString( + const std::optional& value, + const TypePtr& toType) { + if (value.has_value()) { + if constexpr (ToKind == TypeKind::VARCHAR) { + return velox::variant(value.value()); + } + if constexpr (ToKind == TypeKind::VARBINARY) { + return velox::variant::binary((value.value())); + } + if (toType->isDate()) { + return velox::variant(util::castFromDateString( + StringView(value.value()), true /*isIso8601*/)); + } + auto result = velox::util::Converter::cast(value.value()); + if constexpr (ToKind == TypeKind::TIMESTAMP) { + result.toGMT(Timestamp::defaultTimezone()); + } + return velox::variant(result); + } + return velox::variant(ToKind); +} +} // namespace + std::vector SplitReader::adaptColumns( const RowTypePtr& fileType, const std::shared_ptr& tableSchema) { @@ -184,9 +211,9 @@ std::vector SplitReader::adaptColumns( auto* childSpec = childrenSpecs[i].get(); const std::string& fieldName = childSpec->fieldName(); - auto iter = hiveSplit_->partitionKeys.find(fieldName); - if (iter != hiveSplit_->partitionKeys.end()) { - setPartitionValue(childSpec, fieldName, iter->second); + if (auto it = hiveSplit_->partitionKeys.find(fieldName); + it != hiveSplit_->partitionKeys.end()) { + setPartitionValue(childSpec, fieldName, it->second); } else if (fieldName == kPath) { setConstantValue( childSpec, VARCHAR(), velox::variant(hiveSplit_->filePath)); @@ -197,6 +224,16 @@ std::vector SplitReader::adaptColumns( INTEGER(), velox::variant(hiveSplit_->tableBucketNumber.value())); } + } else if (auto iter = hiveSplit_->infoColumns.find(fieldName); + iter != hiveSplit_->infoColumns.end()) { + auto infoColumnType = + readerOutputType_->childAt(readerOutputType_->getChildIdx(fieldName)); + auto value = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( + convertFromString, + infoColumnType->kind(), + iter->second, + infoColumnType); + setConstantValue(childSpec, infoColumnType, value); } else { auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName); if (!fileTypeIdx.has_value()) { @@ -278,34 +315,6 @@ void SplitReader::setNullConstantValue( type, 1, connectorQueryCtx_->memoryPool())); } -namespace { - -template -velox::variant convertFromString( - const std::optional& value, - const TypePtr& toType) { - if (value.has_value()) { - if constexpr (ToKind == TypeKind::VARCHAR) { - return velox::variant(value.value()); - } - if constexpr (ToKind == TypeKind::VARBINARY) { - return velox::variant::binary((value.value())); - } - if (toType->isDate()) { - return velox::variant(util::castFromDateString( - StringView(value.value()), true /*isIso8601*/)); - } - auto result = velox::util::Converter::cast(value.value()); - if constexpr (ToKind == TypeKind::TIMESTAMP) { - result.toGMT(Timestamp::defaultTimezone()); - } - return velox::variant(result); - } - return velox::variant(ToKind); -} - -} // namespace - void SplitReader::setPartitionValue( common::ScanSpec* spec, const std::string& partitionKey, diff --git a/velox/connectors/hive/iceberg/IcebergSplit.cpp b/velox/connectors/hive/iceberg/IcebergSplit.cpp index 7fa9a52f2c691..747d70869f53b 100644 --- a/velox/connectors/hive/iceberg/IcebergSplit.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplit.cpp @@ -30,7 +30,8 @@ HiveIcebergSplit::HiveIcebergSplit( _partitionKeys, std::optional _tableBucketNumber, const std::unordered_map& _customSplitInfo, - const std::shared_ptr& _extraFileInfo) + const std::shared_ptr& _extraFileInfo, + const std::unordered_map& _infoColumns) : HiveConnectorSplit( _connectorId, _filePath, @@ -38,7 +39,12 @@ HiveIcebergSplit::HiveIcebergSplit( _start, _length, _partitionKeys, - _tableBucketNumber) { + _tableBucketNumber, + _customSplitInfo, + _extraFileInfo, + {}, + 0, + _infoColumns) { // TODO: Deserialize _extraFileInfo to get deleteFiles; } @@ -54,7 +60,8 @@ HiveIcebergSplit::HiveIcebergSplit( std::optional _tableBucketNumber, const std::unordered_map& _customSplitInfo, const std::shared_ptr& _extraFileInfo, - std::vector _deletes) + std::vector _deletes, + const std::unordered_map& _infoColumns) : HiveConnectorSplit( _connectorId, _filePath, @@ -64,6 +71,9 @@ HiveIcebergSplit::HiveIcebergSplit( _partitionKeys, _tableBucketNumber, _customSplitInfo, - _extraFileInfo), + _extraFileInfo, + {}, + 0, + _infoColumns), deleteFiles(_deletes) {} } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergSplit.h b/velox/connectors/hive/iceberg/IcebergSplit.h index 37b8c3c3eb364..bc6183300124d 100644 --- a/velox/connectors/hive/iceberg/IcebergSplit.h +++ b/velox/connectors/hive/iceberg/IcebergSplit.h @@ -36,7 +36,8 @@ struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit { _partitionKeys = {}, std::optional _tableBucketNumber = std::nullopt, const std::unordered_map& _customSplitInfo = {}, - const std::shared_ptr& _extraFileInfo = {}); + const std::shared_ptr& _extraFileInfo = {}, + const std::unordered_map& _infoColumns = {}); // For tests only HiveIcebergSplit( @@ -50,7 +51,8 @@ struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit { std::optional _tableBucketNumber = std::nullopt, const std::unordered_map& _customSplitInfo = {}, const std::shared_ptr& _extraFileInfo = {}, - std::vector deletes = {}); + std::vector deletes = {}, + const std::unordered_map& _infoColumns = {}); }; } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/tests/HiveConnectorTest.cpp b/velox/connectors/hive/tests/HiveConnectorTest.cpp index 7f5a6043f52eb..4723bb0946e54 100644 --- a/velox/connectors/hive/tests/HiveConnectorTest.cpp +++ b/velox/connectors/hive/tests/HiveConnectorTest.cpp @@ -87,7 +87,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_multilevel) { auto rowType = ROW({{"c0", columnType}}); auto subfields = makeSubfields({"c0.c0c1[3][\"foo\"].c0c1c0"}); auto scanSpec = makeScanSpec( - rowType, groupSubfields(subfields), {}, nullptr, {}, pool_.get()); + rowType, groupSubfields(subfields), {}, nullptr, {}, {}, pool_.get()); auto* c0c0 = scanSpec->childByName("c0")->childByName("c0c0"); validateNullConstant(*c0c0, *BIGINT()); auto* c0c1 = scanSpec->childByName("c0")->childByName("c0c1"); @@ -122,6 +122,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeFields) { {}, nullptr, {}, + {}, pool_.get()); auto* c0c0 = scanSpec->childByName("c0")->childByName("c0c0"); ASSERT_FALSE(c0c0->childByName("c0c0c0")->isConstant()); @@ -144,6 +145,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeArray) { {}, nullptr, {}, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_EQ(c0->maxArrayElementsCount(), 2); @@ -160,7 +162,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeArrayNegative) { auto subfields = makeSubfields({"c0[1].c0c0", "c0[-1].c0c2"}); auto groupedSubfields = groupSubfields(subfields); VELOX_ASSERT_USER_THROW( - makeScanSpec(rowType, groupedSubfields, {}, nullptr, {}, pool_.get()), + makeScanSpec(rowType, groupedSubfields, {}, nullptr, {}, {}, pool_.get()), "Non-positive array subscript cannot be push down"); } @@ -175,6 +177,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeMap) { {}, nullptr, {}, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); auto* keysFilter = c0->childByName(ScanSpec::kMapKeysFieldName)->filter(); @@ -200,6 +203,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_allSubscripts) { {}, nullptr, {}, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_FALSE(c0->childByName(ScanSpec::kMapKeysFieldName)->filter()); @@ -218,6 +222,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_allSubscripts) { {}, nullptr, {}, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_FALSE(c0->childByName(ScanSpec::kMapKeysFieldName)->filter()); @@ -240,6 +245,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) { {}, nullptr, {}, + {}, pool_.get()); auto* keysFilter = scanSpec->childByName("c0") ->childByName(ScanSpec::kMapKeysFieldName) @@ -267,6 +273,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) { {}, nullptr, {}, + {}, pool_.get()); keysFilter = scanSpec->childByName("c0") ->childByName(ScanSpec::kMapKeysFieldName) @@ -285,6 +292,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) { {}, nullptr, {}, + {}, pool_.get()); keysFilter = scanSpec->childByName("c0") ->childByName(ScanSpec::kMapKeysFieldName) @@ -300,6 +308,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) { {}, nullptr, {}, + {}, pool_.get()); keysFilter = scanSpec->childByName("c0") ->childByName(ScanSpec::kMapKeysFieldName) @@ -335,6 +344,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_filtersNotInRequiredSubfields) { filters, ROW({{"c0", c0Type}, {"c1", c1Type}}), {}, + {}, pool_.get()); auto c0 = scanSpec->childByName("c0"); ASSERT_FALSE(c0->isConstant()); @@ -379,6 +389,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_duplicateSubfields) { {}, nullptr, {}, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_EQ(c0->children().size(), 2); @@ -392,7 +403,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_filterPartitionKey) { SubfieldFilters filters; filters.emplace(Subfield("ds"), exec::equal("2023-10-13")); auto scanSpec = makeScanSpec( - rowType, {}, filters, rowType, {{"ds", nullptr}}, pool_.get()); + rowType, {}, filters, rowType, {{"ds", nullptr}}, {}, pool_.get()); ASSERT_TRUE(scanSpec->childByName("c0")->projectOut()); ASSERT_FALSE(scanSpec->childByName("ds")->projectOut()); } diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index 691ba8edc45ca..73333de40e8cc 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -2443,6 +2443,76 @@ TEST_F(TableScanTest, path) { op, {filePath}, fmt::format("SELECT '{}', * FROM tmp", pathValue)); } +TEST_F(TableScanTest, fileSizeAndModifiedTime) { + auto rowType = ROW({"a"}, {BIGINT()}); + auto filePath = makeFilePaths(1)[0]; + auto vector = makeVectors(1, 10, rowType)[0]; + writeToFile(filePath->path, vector); + createDuckDbTable({vector}); + + static const char* kSize = "$file_size"; + static const char* kModifiedTime = "$file_modified_time"; + + auto allColumns = + ROW({"a", kSize, kModifiedTime}, {BIGINT(), BIGINT(), BIGINT()}); + + auto assignments = allRegularColumns(rowType); + assignments[kSize] = synthesizedColumn(kSize, BIGINT()); + assignments[kModifiedTime] = synthesizedColumn(kModifiedTime, BIGINT()); + + auto fileSizeValue = fmt::format("{}", filePath->fileSize()); + auto fileTimeValue = fmt::format("{}", filePath->fileModifiedTime()); + + // Select and project both '$file_size', '$file_modified_time'. + auto op = PlanBuilder() + .startTableScan() + .outputType(allColumns) + .dataColumns(allColumns) + .assignments(assignments) + .endTableScan() + .planNode(); + assertQuery( + op, + {filePath}, + fmt::format("SELECT *, {}, {} FROM tmp", fileSizeValue, fileTimeValue)); + + auto filterTest = [&](const std::string& filter) { + auto tableHandle = makeTableHandle( + SubfieldFilters{}, + parseExpr(filter, allColumns), + "hive_table", + allColumns); + + // Use synthesized column in a filter but don't project it. + op = PlanBuilder() + .startTableScan() + .outputType(rowType) + .dataColumns(allColumns) + .tableHandle(tableHandle) + .assignments(assignments) + .endTableScan() + .planNode(); + assertQuery(op, {filePath}, "SELECT * FROM tmp"); + + // Use synthesized column in a filter and project it out. + op = PlanBuilder() + .startTableScan() + .outputType(allColumns) + .dataColumns(allColumns) + .tableHandle(tableHandle) + .assignments(assignments) + .endTableScan() + .planNode(); + assertQuery( + op, + {filePath}, + fmt::format("SELECT *, {}, {} FROM tmp", fileSizeValue, fileTimeValue)); + }; + + filterTest(fmt::format("\"{}\" = {}", kSize, fileSizeValue)); + filterTest(fmt::format("\"{}\" = {}", kModifiedTime, fileTimeValue)); +} + TEST_F(TableScanTest, bucket) { vector_size_t size = 1'000; int numBatches = 5; diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.cpp b/velox/exec/tests/utils/HiveConnectorTestBase.cpp index ece0d3545bdfb..c3c6ccb2a1668 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.cpp +++ b/velox/exec/tests/utils/HiveConnectorTestBase.cpp @@ -171,7 +171,12 @@ HiveConnectorTestBase::makeHiveConnectorSplits( const std::vector>& filePaths) { std::vector> splits; for (auto filePath : filePaths) { - splits.push_back(makeHiveConnectorSplit(filePath->path)); + splits.push_back(makeHiveConnectorSplit( + filePath->path, + filePath->fileSize(), + filePath->fileModifiedTime(), + 0, + std::numeric_limits::max())); } return splits; } @@ -189,6 +194,21 @@ HiveConnectorTestBase::makeHiveConnectorSplit( .build(); } +std::shared_ptr +HiveConnectorTestBase::makeHiveConnectorSplit( + const std::string& filePath, + int64_t fileSize, + int64_t fileModifiedTime, + uint64_t start, + uint64_t length) { + return HiveConnectorSplitBuilder(filePath) + .infoColumn("$file_size", fmt::format("{}", fileSize)) + .infoColumn("$file_modified_time", fmt::format("{}", fileModifiedTime)) + .start(start) + .length(length) + .build(); +} + // static std::shared_ptr HiveConnectorTestBase::makeHiveInsertTableHandle( diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.h b/velox/exec/tests/utils/HiveConnectorTestBase.h index 8f7a03fe6cff7..34019c5d65b21 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.h +++ b/velox/exec/tests/utils/HiveConnectorTestBase.h @@ -74,6 +74,13 @@ class HiveConnectorTestBase : public OperatorTestBase { uint64_t length = std::numeric_limits::max(), int64_t splitWeight = 0); + static std::shared_ptr makeHiveConnectorSplit( + const std::string& filePath, + int64_t fileSize, + int64_t fileModifiedTime, + uint64_t start, + uint64_t length); + /// Split file at path 'filePath' into 'splitCount' splits. If not local file, /// file size can be given as 'externalSize'. static std::vector> @@ -208,6 +215,13 @@ class HiveConnectorSplitBuilder { return *this; } + HiveConnectorSplitBuilder& infoColumn( + const std::string& name, + const std::string& value) { + infoColumns_.emplace(std::move(name), std::move(value)); + return *this; + } + HiveConnectorSplitBuilder& partitionKey( std::string name, std::optional value) { @@ -220,6 +234,24 @@ class HiveConnectorSplitBuilder { return *this; } + HiveConnectorSplitBuilder& customSplitInfo( + const std::unordered_map& customSplitInfo) { + customSplitInfo_ = customSplitInfo; + return *this; + } + + HiveConnectorSplitBuilder& extraFileInfo( + const std::shared_ptr& extraFileInfo) { + extraFileInfo_ = extraFileInfo; + return *this; + } + + HiveConnectorSplitBuilder& serdeParameters( + const std::unordered_map& serdeParameters) { + serdeParameters_ = serdeParameters; + return *this; + } + HiveConnectorSplitBuilder& connectorId(const std::string& connectorId) { connectorId_ = connectorId; return *this; @@ -240,7 +272,8 @@ class HiveConnectorSplitBuilder { customSplitInfo, extraFileInfo, serdeParameters, - splitWeight_); + splitWeight_, + infoColumns_); } private: @@ -250,6 +283,10 @@ class HiveConnectorSplitBuilder { uint64_t length_{std::numeric_limits::max()}; std::unordered_map> partitionKeys_; std::optional tableBucketNumber_; + std::unordered_map customSplitInfo_ = {}; + std::shared_ptr extraFileInfo_ = {}; + std::unordered_map serdeParameters_ = {}; + std::unordered_map infoColumns_ = {}; std::string connectorId_ = kHiveConnectorId; int64_t splitWeight_{0}; }; diff --git a/velox/exec/tests/utils/TempFilePath.h b/velox/exec/tests/utils/TempFilePath.h index cf615b4131383..d993795f1e3a0 100644 --- a/velox/exec/tests/utils/TempFilePath.h +++ b/velox/exec/tests/utils/TempFilePath.h @@ -15,6 +15,7 @@ */ #pragma once +#include #include #include #include @@ -47,6 +48,18 @@ class TempFilePath { file.close(); } + const int64_t fileSize() { + struct stat st; + stat(path.data(), &st); + return st.st_size; + } + + const int64_t fileModifiedTime() { + struct stat st; + stat(path.data(), &st); + return st.st_mtime; + } + private: int fd;