From 67616893558bda3280dd0c57beba8ad6dab236ac Mon Sep 17 00:00:00 2001 From: Yangyang Gao Date: Tue, 28 Nov 2023 20:04:02 +0800 Subject: [PATCH 1/5] add metadatacolumns support for spark hive connector --- velox/connectors/hive/HiveConnectorSplit.h | 7 +++++-- velox/connectors/hive/HiveDataSource.cpp | 8 +++++++- velox/connectors/hive/HiveDataSource.h | 7 +++++++ velox/connectors/hive/SplitReader.cpp | 15 ++++++++++++--- velox/connectors/hive/TableHandle.cpp | 1 + velox/connectors/hive/TableHandle.h | 2 +- .../connectors/hive/tests/HiveConnectorTest.cpp | 17 ++++++++++++++--- 7 files changed, 47 insertions(+), 10 deletions(-) diff --git a/velox/connectors/hive/HiveConnectorSplit.h b/velox/connectors/hive/HiveConnectorSplit.h index 5b46c5329a1e..e48f6f9d8789 100644 --- a/velox/connectors/hive/HiveConnectorSplit.h +++ b/velox/connectors/hive/HiveConnectorSplit.h @@ -33,6 +33,7 @@ struct HiveConnectorSplit : public connector::ConnectorSplit { std::unordered_map customSplitInfo; std::shared_ptr extraFileInfo; std::unordered_map serdeParameters; + std::unordered_map metadataColumns; HiveConnectorSplit( const std::string& connectorId, @@ -45,7 +46,8 @@ struct HiveConnectorSplit : public connector::ConnectorSplit { std::optional _tableBucketNumber = std::nullopt, const std::unordered_map& _customSplitInfo = {}, const std::shared_ptr& _extraFileInfo = {}, - const std::unordered_map& _serdeParameters = {}) + const std::unordered_map& _serdeParameters = {}, + const std::unordered_map& _metadataColumns = {}) : ConnectorSplit(connectorId), filePath(_filePath), fileFormat(_fileFormat), @@ -55,7 +57,8 @@ struct HiveConnectorSplit : public connector::ConnectorSplit { tableBucketNumber(_tableBucketNumber), customSplitInfo(_customSplitInfo), extraFileInfo(_extraFileInfo), - serdeParameters(_serdeParameters) {} + serdeParameters(_serdeParameters), + metadataColumns(_metadataColumns) {} std::string toString() const override { if (tableBucketNumber.has_value()) { diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index 448f673ac65a..e484465bb524 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -382,6 +382,9 @@ HiveDataSource::HiveDataSource( if (handle->columnType() == HiveColumnHandle::ColumnType::kPartitionKey) { partitionKeys_.emplace(handle->name(), handle); } + if (handle->columnType() == HiveColumnHandle::ColumnType::kMetadata) { + metadataColumns_.emplace(handle->name(), handle); + } } std::vector readerRowNames; @@ -468,6 +471,7 @@ HiveDataSource::HiveDataSource( filters, hiveTableHandle_->dataColumns(), partitionKeys_, + metadataColumns_, pool_); if (remainingFilter) { metadataFilter_ = std::make_shared( @@ -724,6 +728,8 @@ std::shared_ptr HiveDataSource::makeScanSpec( const RowTypePtr& dataColumns, const std::unordered_map>& partitionKeys, + const std::unordered_map>& + metadataColumns, memory::MemoryPool* pool) { auto spec = std::make_shared("root"); folly::F14FastMap> @@ -782,7 +788,7 @@ std::shared_ptr HiveDataSource::makeScanSpec( // $bucket column. This filter is redundant and needs to be removed. // TODO Remove this check when Presto is fixed to not specify a filter // on $path and $bucket column. - if (auto name = pair.first.toString(); name == kPath || name == kBucket) { + if (auto name = pair.first.toString(); name == kPath || name == kBucket || metadataColumns.count(name) != 0) { continue; } auto fieldSpec = spec->getOrCreateChild(pair.first); diff --git a/velox/connectors/hive/HiveDataSource.h b/velox/connectors/hive/HiveDataSource.h index 3d3f6d733d04..aafcdc9e88d9 100644 --- a/velox/connectors/hive/HiveDataSource.h +++ b/velox/connectors/hive/HiveDataSource.h @@ -82,6 +82,8 @@ class HiveDataSource : public DataSource { const RowTypePtr& dataColumns, const std::unordered_map>& partitionKeys, + const std::unordered_map>& + metadataColumns, memory::MemoryPool* pool); // Internal API, made public to be accessible in unit tests. Do not use in @@ -117,6 +119,11 @@ class HiveDataSource : public DataSource { std::unordered_map> partitionKeys_; + // Column handles for the metadata columns keyed on metadata column + // name. + std::unordered_map> + metadataColumns_; + private: // Evaluates remainingFilter_ on the specified vector. Returns number of rows // passed. Populates filterEvalCtx_.selectedIndices and selectedBits if only diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index a42b64e37141..df7fc0f111fc 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -204,9 +204,18 @@ std::vector SplitReader::adaptColumns( auto* childSpec = childrenSpecs[i].get(); const std::string& fieldName = childSpec->fieldName(); - auto iter = hiveSplit_->partitionKeys.find(fieldName); - if (iter != hiveSplit_->partitionKeys.end()) { - setPartitionValue(childSpec, fieldName, iter->second); + auto partitionKey = hiveSplit_->partitionKeys.find(fieldName); + auto metadataColumn = hiveSplit_->metadataColumns.find(fieldName); + if (partitionKey != hiveSplit_->partitionKeys.end()) { + setPartitionValue(childSpec, fieldName, partitionKey->second); + } else if (metadataColumn != hiveSplit_->metadataColumns.end()) { + auto metadataColumnOutputType = + readerOutputType_->childAt(readerOutputType_->getChildIdx(fieldName)); + auto constValue = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( + convertFromString, + metadataColumnOutputType->kind(), + std::make_optional(metadataColumn->second)); + setConstantValue(childSpec, metadataColumnOutputType, constValue); } else if (fieldName == kPath) { setConstantValue( childSpec, VARCHAR(), velox::variant(hiveSplit_->filePath)); diff --git a/velox/connectors/hive/TableHandle.cpp b/velox/connectors/hive/TableHandle.cpp index d03b2f57e43d..edf35ff36c1b 100644 --- a/velox/connectors/hive/TableHandle.cpp +++ b/velox/connectors/hive/TableHandle.cpp @@ -25,6 +25,7 @@ columnTypeNames() { {HiveColumnHandle::ColumnType::kPartitionKey, "PartitionKey"}, {HiveColumnHandle::ColumnType::kRegular, "Regular"}, {HiveColumnHandle::ColumnType::kSynthesized, "Synthesized"}, + {HiveColumnHandle::ColumnType::kMetadata, "Metadata"}, }; } diff --git a/velox/connectors/hive/TableHandle.h b/velox/connectors/hive/TableHandle.h index ee62a0892d7c..b44baba208b7 100644 --- a/velox/connectors/hive/TableHandle.h +++ b/velox/connectors/hive/TableHandle.h @@ -28,7 +28,7 @@ using SubfieldFilters = class HiveColumnHandle : public ColumnHandle { public: - enum class ColumnType { kPartitionKey, kRegular, kSynthesized }; + enum class ColumnType { kPartitionKey, kRegular, kSynthesized, kMetadata }; /// NOTE: 'dataType' is the column type in target write table. 'hiveType' is /// converted type of the corresponding column in source table which might not diff --git a/velox/connectors/hive/tests/HiveConnectorTest.cpp b/velox/connectors/hive/tests/HiveConnectorTest.cpp index 7d97e4469f87..c5a0236de881 100644 --- a/velox/connectors/hive/tests/HiveConnectorTest.cpp +++ b/velox/connectors/hive/tests/HiveConnectorTest.cpp @@ -87,7 +87,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_multilevel) { auto rowType = ROW({{"c0", columnType}}); auto subfields = makeSubfields({"c0.c0c1[3][\"foo\"].c0c1c0"}); auto scanSpec = HiveDataSource::makeScanSpec( - rowType, groupSubfields(subfields), {}, nullptr, {}, pool_.get()); + rowType, groupSubfields(subfields), {}, nullptr, {}, {}, pool_.get()); auto* c0c0 = scanSpec->childByName("c0")->childByName("c0c0"); validateNullConstant(*c0c0, *BIGINT()); auto* c0c1 = scanSpec->childByName("c0")->childByName("c0c1"); @@ -122,6 +122,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeFields) { {}, nullptr, {}, + {}, pool_.get()); auto* c0c0 = scanSpec->childByName("c0")->childByName("c0c0"); ASSERT_FALSE(c0c0->childByName("c0c0c0")->isConstant()); @@ -144,6 +145,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeArray) { {}, nullptr, {}, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_EQ(c0->maxArrayElementsCount(), 2); @@ -161,7 +163,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeArrayNegative) { auto groupedSubfields = groupSubfields(subfields); VELOX_ASSERT_USER_THROW( HiveDataSource::makeScanSpec( - rowType, groupedSubfields, {}, nullptr, {}, pool_.get()), + rowType, groupedSubfields, {}, nullptr, {}, {}, pool_.get()), "Non-positive array subscript cannot be push down"); } @@ -176,6 +178,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeMap) { {}, nullptr, {}, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); auto* keysFilter = c0->childByName(ScanSpec::kMapKeysFieldName)->filter(); @@ -201,6 +204,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_allSubscripts) { {}, nullptr, {}, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_FALSE(c0->childByName(ScanSpec::kMapKeysFieldName)->filter()); @@ -219,6 +223,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_allSubscripts) { {}, nullptr, {}, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_FALSE(c0->childByName(ScanSpec::kMapKeysFieldName)->filter()); @@ -241,6 +246,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) { {}, nullptr, {}, + {}, pool_.get()); auto* keysFilter = scanSpec->childByName("c0") ->childByName(ScanSpec::kMapKeysFieldName) @@ -268,6 +274,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) { {}, nullptr, {}, + {}, pool_.get()); keysFilter = scanSpec->childByName("c0") ->childByName(ScanSpec::kMapKeysFieldName) @@ -286,6 +293,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) { {}, nullptr, {}, + {}, pool_.get()); keysFilter = scanSpec->childByName("c0") ->childByName(ScanSpec::kMapKeysFieldName) @@ -301,6 +309,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) { {}, nullptr, {}, + {}, pool_.get()); keysFilter = scanSpec->childByName("c0") ->childByName(ScanSpec::kMapKeysFieldName) @@ -336,6 +345,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_filtersNotInRequiredSubfields) { filters, ROW({{"c0", c0Type}, {"c1", c1Type}}), {}, + {}, pool_.get()); auto c0 = scanSpec->childByName("c0"); ASSERT_FALSE(c0->isConstant()); @@ -380,6 +390,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_duplicateSubfields) { {}, nullptr, {}, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_EQ(c0->children().size(), 2); @@ -393,7 +404,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_filterPartitionKey) { SubfieldFilters filters; filters.emplace(Subfield("ds"), exec::equal("2023-10-13")); auto scanSpec = HiveDataSource::makeScanSpec( - rowType, {}, filters, rowType, {{"ds", nullptr}}, pool_.get()); + rowType, {}, filters, rowType, {{"ds", nullptr}}, {}, pool_.get()); ASSERT_TRUE(scanSpec->childByName("c0")->projectOut()); ASSERT_FALSE(scanSpec->childByName("ds")->projectOut()); } From 93cce0df0a885af0561f244173b8935ba8fd7eba Mon Sep 17 00:00:00 2001 From: Yangyang Gao Date: Wed, 6 Dec 2023 12:24:03 +0800 Subject: [PATCH 2/5] code format fix --- velox/connectors/hive/HiveDataSource.cpp | 5 +++-- velox/connectors/hive/SplitReader.cpp | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index e484465bb524..1fe93d34b109 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -729,7 +729,7 @@ std::shared_ptr HiveDataSource::makeScanSpec( const std::unordered_map>& partitionKeys, const std::unordered_map>& - metadataColumns, + metadataColumns, memory::MemoryPool* pool) { auto spec = std::make_shared("root"); folly::F14FastMap> @@ -788,7 +788,8 @@ std::shared_ptr HiveDataSource::makeScanSpec( // $bucket column. This filter is redundant and needs to be removed. // TODO Remove this check when Presto is fixed to not specify a filter // on $path and $bucket column. - if (auto name = pair.first.toString(); name == kPath || name == kBucket || metadataColumns.count(name) != 0) { + if (auto name = pair.first.toString(); + name == kPath || name == kBucket || metadataColumns.count(name) != 0) { continue; } auto fieldSpec = spec->getOrCreateChild(pair.first); diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index df7fc0f111fc..4c884470b1e4 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -209,7 +209,7 @@ std::vector SplitReader::adaptColumns( if (partitionKey != hiveSplit_->partitionKeys.end()) { setPartitionValue(childSpec, fieldName, partitionKey->second); } else if (metadataColumn != hiveSplit_->metadataColumns.end()) { - auto metadataColumnOutputType = + auto metadataColumnOutputType = readerOutputType_->childAt(readerOutputType_->getChildIdx(fieldName)); auto constValue = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( convertFromString, From df5e279f644ef8f669c90ab0faf51ef5170389be Mon Sep 17 00:00:00 2001 From: gayangya Date: Tue, 30 Jan 2024 21:10:37 +0800 Subject: [PATCH 3/5] address comment --- velox/connectors/hive/HiveDataSource.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index 1fe93d34b109..068daaa60b8f 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -788,8 +788,7 @@ std::shared_ptr HiveDataSource::makeScanSpec( // $bucket column. This filter is redundant and needs to be removed. // TODO Remove this check when Presto is fixed to not specify a filter // on $path and $bucket column. - if (auto name = pair.first.toString(); - name == kPath || name == kBucket || metadataColumns.count(name) != 0) { + if (auto name = pair.first.toString(); name == kPath || name == kBucket) { continue; } auto fieldSpec = spec->getOrCreateChild(pair.first); From e4613574660547ca3cbf0d3abf812bd89c6c3cae Mon Sep 17 00:00:00 2001 From: gayangya Date: Tue, 30 Jan 2024 21:51:57 +0800 Subject: [PATCH 4/5] fix build --- velox/connectors/hive/SplitReader.cpp | 44 +++++++++++++-------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index cd52468a923d..efc2152b349d 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -142,6 +142,28 @@ void SplitReader::prepareSplit( baseRowReader_ = baseReader_->createRowReader(baseRowReaderOpts_); } +namespace { + +template +velox::variant convertFromString(const std::optional& value) { + if (value.has_value()) { + if constexpr (ToKind == TypeKind::VARCHAR) { + return velox::variant(value.value()); + } + if constexpr (ToKind == TypeKind::VARBINARY) { + return velox::variant::binary((value.value())); + } + auto result = velox::util::Converter::cast(value.value()); + if constexpr (ToKind == TypeKind::TIMESTAMP) { + result.toGMT(Timestamp::defaultTimezone()); + } + return velox::variant(result); + } + return velox::variant(ToKind); +} + +} // namespace + std::vector SplitReader::adaptColumns( const RowTypePtr& fileType, const std::shared_ptr& tableSchema) { @@ -256,28 +278,6 @@ void SplitReader::setNullConstantValue( type, 1, connectorQueryCtx_->memoryPool())); } -namespace { - -template -velox::variant convertFromString(const std::optional& value) { - if (value.has_value()) { - if constexpr (ToKind == TypeKind::VARCHAR) { - return velox::variant(value.value()); - } - if constexpr (ToKind == TypeKind::VARBINARY) { - return velox::variant::binary((value.value())); - } - auto result = velox::util::Converter::cast(value.value()); - if constexpr (ToKind == TypeKind::TIMESTAMP) { - result.toGMT(Timestamp::defaultTimezone()); - } - return velox::variant(result); - } - return velox::variant(ToKind); -} - -} // namespace - void SplitReader::setPartitionValue( common::ScanSpec* spec, const std::string& partitionKey, From 84ae9619c206c72892af93e4bcaa158432318f24 Mon Sep 17 00:00:00 2001 From: gayangya Date: Tue, 30 Jan 2024 23:19:09 +0800 Subject: [PATCH 5/5] format fix --- velox/connectors/hive/tests/HiveConnectorTest.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/velox/connectors/hive/tests/HiveConnectorTest.cpp b/velox/connectors/hive/tests/HiveConnectorTest.cpp index 487e33152e9a..4723bb0946e5 100644 --- a/velox/connectors/hive/tests/HiveConnectorTest.cpp +++ b/velox/connectors/hive/tests/HiveConnectorTest.cpp @@ -162,8 +162,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeArrayNegative) { auto subfields = makeSubfields({"c0[1].c0c0", "c0[-1].c0c2"}); auto groupedSubfields = groupSubfields(subfields); VELOX_ASSERT_USER_THROW( - makeScanSpec( - rowType, groupedSubfields, {}, nullptr, {}, {}, pool_.get()), + makeScanSpec(rowType, groupedSubfields, {}, nullptr, {}, {}, pool_.get()), "Non-positive array subscript cannot be push down"); }