From 1ff68168efa8013818077a755bcea998b972f704 Mon Sep 17 00:00:00 2001 From: Ke Date: Mon, 10 Jun 2024 17:07:20 -0700 Subject: [PATCH] Add data verification support for bucketed table in writer fuzzer (#10116) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/10116 Reviewed By: xiaoxmeng Differential Revision: D58338387 Pulled By: kewang1024 fbshipit-source-id: 704703e4ffb113ec22c3427abac49b8ff4d2a3bb --- velox/exec/fuzzer/CMakeLists.txt | 1 + velox/exec/fuzzer/FuzzerUtil.cpp | 12 ++++- velox/exec/fuzzer/WriterFuzzer.cpp | 71 ++++++++++++++++++++++++------ 3 files changed, 69 insertions(+), 15 deletions(-) diff --git a/velox/exec/fuzzer/CMakeLists.txt b/velox/exec/fuzzer/CMakeLists.txt index d7025a9a8284..d481f1076e82 100644 --- a/velox/exec/fuzzer/CMakeLists.txt +++ b/velox/exec/fuzzer/CMakeLists.txt @@ -20,6 +20,7 @@ target_link_libraries( velox_core velox_exec_test_lib cpr::cpr + Boost::regex velox_type_parser Folly::folly velox_hive_connector diff --git a/velox/exec/fuzzer/FuzzerUtil.cpp b/velox/exec/fuzzer/FuzzerUtil.cpp index 8a2c5bfeb4c3..a3efb12c5d9d 100644 --- a/velox/exec/fuzzer/FuzzerUtil.cpp +++ b/velox/exec/fuzzer/FuzzerUtil.cpp @@ -14,6 +14,7 @@ * limitations under the License. 
*/ #include "velox/exec/fuzzer/FuzzerUtil.h" +#include #include #include "velox/connectors/hive/HiveConnector.h" #include "velox/connectors/hive/HiveConnectorSplit.h" @@ -37,6 +38,15 @@ std::pair extractPartition( return std::pair(partitionColumn, partitionValue); } +// Parses the bucket number from the digits before the first '_' in 'fileName'. +// Returns std::nullopt if 'fileName' does not fully match the expected pattern. +std::optional getBucketNum(const std::string& fileName) { + if (RE2::FullMatch(fileName, "0[0-9]+_0_TaskCursorQuery_[0-9]+")) { + return std::optional(stoi(fileName.substr(0, fileName.find('_')))); + } + return std::nullopt; +} + void writeToFile( const std::string& path, const VectorPtr& vector, @@ -77,8 +87,10 @@ void makeSplitsWithSchema( makeSplitsWithSchema(directoryName, partitionKeys, splits); partitionKeys.erase(partition.first); } else { + const auto bucketNum = + getBucketNum(entry.path().string().substr(directory.size() + 1)); splits.emplace_back( - makeSplit(entry.path().string(), partitionKeys, std::nullopt)); + makeSplit(entry.path().string(), partitionKeys, bucketNum)); } } } diff --git a/velox/exec/fuzzer/WriterFuzzer.cpp b/velox/exec/fuzzer/WriterFuzzer.cpp index 4c29c52b226d..46f95e1f52b8 100644 --- a/velox/exec/fuzzer/WriterFuzzer.cpp +++ b/velox/exec/fuzzer/WriterFuzzer.cpp @@ -120,12 +120,13 @@ class WriterFuzzer { const std::vector& partitionKeys, const std::string& outputDirectoryPath); - // Generate table column handles based on table column properties + // Generates table column handles based on table column properties std::unordered_map> getTableColumnHandles( const std::vector& names, const std::vector& types, - int32_t partitionOffset); + int32_t partitionOffset, + const int32_t bucketCount); // Executes velox query plan and returns the result. RowVectorPtr execute( @@ -149,6 +150,18 @@ class WriterFuzzer { std::map getPartitionNameAndFilecount( const std::string& tableDirectoryPath); + // Generates output data type based on table column properties.
+ RowTypePtr generateOutputType( + const std::vector& names, + const std::vector& types, + const int32_t partitionCount, + const int32_t bucketCount); + + // Returns true if the table is bucketed, i.e. both counts are positive. + bool isBucketed(const int32_t partitionCount, const int32_t bucketCount) { + return partitionCount > 0 && bucketCount > 0; + } + const std::vector kRegularColumnTypes_{ BOOLEAN(), TINYINT(), @@ -390,20 +403,21 @@ void WriterFuzzer::verifyWriter( // 3. Verifies data itself. auto splits = makeSplits(outputDirectoryPath); - const auto columnHandles = - getTableColumnHandles(names, types, partitionOffset); + auto columnHandles = + getTableColumnHandles(names, types, partitionOffset, bucketCount); + const auto rowType = + generateOutputType(names, types, partitionKeys.size(), bucketCount); + auto readPlan = PlanBuilder() - .tableScan( - asRowType(input[0]->type()), - {}, - "", - asRowType(input[0]->type()), - columnHandles) - .project(names) + .tableScan(rowType, {}, "", rowType, columnHandles) .planNode(); auto actual = execute(readPlan, maxDrivers, splits); - auto reference_data = - referenceQueryRunner_->execute("SELECT * FROM tmp_write"); + std::string bucketSql = ""; + if (isBucketed(partitionKeys.size(), bucketCount)) { + bucketSql = ", \"$bucket\""; + } + auto reference_data = referenceQueryRunner_->execute( + "SELECT *" + bucketSql + " FROM tmp_write"); VELOX_CHECK( assertEqualResults(reference_data, {actual}), "Velox and reference DB results don't match"); @@ -415,7 +429,8 @@ std::unordered_map> WriterFuzzer::getTableColumnHandles( const std::vector& names, const std::vector& types, - const int32_t partitionOffset) { + const int32_t partitionOffset, + const int32_t bucketCount) { std::unordered_map> columnHandle; for (int i = 0; i < names.size(); ++i) { @@ -430,6 +445,16 @@ WriterFuzzer::getTableColumnHandles( std::make_shared( names.at(i), columnType, types.at(i), types.at(i))}); } + // If the table is bucketed, add the synthesized $bucket column.
+ if (isBucketed(names.size() - partitionOffset, bucketCount)) { + columnHandle.insert( + {"$bucket", + std::make_shared( + "$bucket", + HiveColumnHandle::ColumnType::kSynthesized, + INTEGER(), + INTEGER())}); + } return columnHandle; } @@ -553,5 +578,24 @@ std::map WriterFuzzer::getPartitionNameAndFilecount( return partitionNameAndFileCount; } +RowTypePtr WriterFuzzer::generateOutputType( + const std::vector& names, + const std::vector& types, + const int32_t partitionCount, + const int32_t bucketCount) { + std::vector outputNames{names}; + std::vector outputTypes; + for (const auto& type : types) { + outputTypes.emplace_back(type); + } + // Bucketed tables get an extra INTEGER "$bucket" column appended at the end. + if (isBucketed(partitionCount, bucketCount)) { + outputNames.emplace_back("$bucket"); + outputTypes.emplace_back(INTEGER()); + } + + return {ROW(std::move(outputNames), std::move(outputTypes))}; +} + } // namespace } // namespace facebook::velox::exec::test