Skip to content

Commit

Permalink
Add data verification support for bucketed table in writer fuzzer (#1…
Browse files Browse the repository at this point in the history
…0116)

Summary: Pull Request resolved: #10116

Reviewed By: xiaoxmeng

Differential Revision: D58338387

Pulled By: kewang1024

fbshipit-source-id: 704703e4ffb113ec22c3427abac49b8ff4d2a3bb
  • Loading branch information
kewang1024 authored and facebook-github-bot committed Jun 11, 2024
1 parent c622f91 commit 1ff6816
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 15 deletions.
1 change: 1 addition & 0 deletions velox/exec/fuzzer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ target_link_libraries(
velox_core
velox_exec_test_lib
cpr::cpr
Boost::regex
velox_type_parser
Folly::folly
velox_hive_connector
Expand Down
12 changes: 11 additions & 1 deletion velox/exec/fuzzer/FuzzerUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/
#include "velox/exec/fuzzer/FuzzerUtil.h"
#include <re2/re2.h>
#include <filesystem>
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
Expand All @@ -37,6 +38,13 @@ std::pair<std::string, std::string> extractPartition(
return std::pair(partitionColumn, partitionValue);
}

// Extracts the bucket number encoded at the start of a data file name.
//
// A name is recognized only if it fully matches
// "0[0-9]+_0_TaskCursorQuery_[0-9]+"; the bucket number is the run of digits
// that precedes the first '_'. Returns std::nullopt for any other name.
//
// NOTE: std::stoi throws std::out_of_range if the leading digits do not fit in
// an int.
std::optional<int32_t> getBucketNum(const std::string& fileName) {
  if (!RE2::FullMatch(fileName, "0[0-9]+_0_TaskCursorQuery_[0-9]+")) {
    return std::nullopt;
  }
  // The pattern guarantees a '_' directly after the leading digits, so cut the
  // name there. Searching for any character that cannot occur in a matching
  // name (such as '+') would hand the whole name to stoi and only work because
  // stoi stops at the first non-digit.
  return std::stoi(fileName.substr(0, fileName.find('_')));
}

void writeToFile(
const std::string& path,
const VectorPtr& vector,
Expand Down Expand Up @@ -77,8 +85,10 @@ void makeSplitsWithSchema(
makeSplitsWithSchema(directoryName, partitionKeys, splits);
partitionKeys.erase(partition.first);
} else {
const auto bucketNum =
getBucketNum(entry.path().string().substr(directory.size() + 1));
splits.emplace_back(
makeSplit(entry.path().string(), partitionKeys, std::nullopt));
makeSplit(entry.path().string(), partitionKeys, bucketNum));
}
}
}
Expand Down
71 changes: 57 additions & 14 deletions velox/exec/fuzzer/WriterFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,13 @@ class WriterFuzzer {
const std::vector<std::string>& partitionKeys,
const std::string& outputDirectoryPath);

// Generate table column handles based on table column properties
// Generates table column handles based on table column properties
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
getTableColumnHandles(
const std::vector<std::string>& names,
const std::vector<TypePtr>& types,
int32_t partitionOffset);
int32_t partitionOffset,
const int32_t bucketCount);

// Executes velox query plan and returns the result.
RowVectorPtr execute(
Expand All @@ -149,6 +150,18 @@ class WriterFuzzer {
std::map<std::string, int32_t> getPartitionNameAndFilecount(
const std::string& tableDirectoryPath);

// Generates the output row type for reading the written table back.
// The result contains the columns given by 'names' and 'types', in order.
// When isBucketed(partitionCount, bucketCount) is true, an INTEGER column
// named "$bucket" is appended as the last column.
RowTypePtr generateOutputType(
const std::vector<std::string>& names,
const std::vector<TypePtr>& types,
const int32_t partitionCount,
const int32_t bucketCount);

// Returns true if the table is treated as bucketed, which requires both
// 'partitionCount' and 'bucketCount' to be strictly positive.
bool isBucketed(const int32_t partitionCount, const int32_t bucketCount) {
  if (partitionCount <= 0) {
    return false;
  }
  return bucketCount > 0;
}

const std::vector<TypePtr> kRegularColumnTypes_{
BOOLEAN(),
TINYINT(),
Expand Down Expand Up @@ -390,20 +403,21 @@ void WriterFuzzer::verifyWriter(

// 3. Verifies data itself.
auto splits = makeSplits(outputDirectoryPath);
const auto columnHandles =
getTableColumnHandles(names, types, partitionOffset);
auto columnHandles =
getTableColumnHandles(names, types, partitionOffset, bucketCount);
const auto rowType =
generateOutputType(names, types, partitionKeys.size(), bucketCount);

auto readPlan = PlanBuilder()
.tableScan(
asRowType(input[0]->type()),
{},
"",
asRowType(input[0]->type()),
columnHandles)
.project(names)
.tableScan(rowType, {}, "", rowType, columnHandles)
.planNode();
auto actual = execute(readPlan, maxDrivers, splits);
auto reference_data =
referenceQueryRunner_->execute("SELECT * FROM tmp_write");
std::string bucketSql = "";
if (isBucketed(partitionKeys.size(), bucketCount)) {
bucketSql = ", \"$bucket\"";
}
auto reference_data = referenceQueryRunner_->execute(
"SELECT *" + bucketSql + " FROM tmp_write");
VELOX_CHECK(
assertEqualResults(reference_data, {actual}),
"Velox and reference DB results don't match");
Expand All @@ -415,7 +429,8 @@ std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
WriterFuzzer::getTableColumnHandles(
const std::vector<std::string>& names,
const std::vector<TypePtr>& types,
const int32_t partitionOffset) {
const int32_t partitionOffset,
const int32_t bucketCount) {
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
columnHandle;
for (int i = 0; i < names.size(); ++i) {
Expand All @@ -430,6 +445,16 @@ WriterFuzzer::getTableColumnHandles(
std::make_shared<HiveColumnHandle>(
names.at(i), columnType, types.at(i), types.at(i))});
}
// If table is bucketed, add synthesized $bucket column.
if (isBucketed(names.size() - partitionOffset, bucketCount)) {
columnHandle.insert(
{"$bucket",
std::make_shared<HiveColumnHandle>(
"$bucket",
HiveColumnHandle::ColumnType::kSynthesized,
INTEGER(),
INTEGER())});
}
return columnHandle;
}

Expand Down Expand Up @@ -553,5 +578,23 @@ std::map<std::string, int32_t> WriterFuzzer::getPartitionNameAndFilecount(
return partitionNameAndFileCount;
}

// Builds the row type used to read the written table back: the columns in
// 'names' / 'types' in their given order, followed by an INTEGER "$bucket"
// column when isBucketed(partitionCount, bucketCount) holds.
RowTypePtr WriterFuzzer::generateOutputType(
    const std::vector<std::string>& names,
    const std::vector<TypePtr>& types,
    const int32_t partitionCount,
    const int32_t bucketCount) {
  // Start from copies of the table columns; the inputs are left untouched.
  std::vector<std::string> columnNames(names.begin(), names.end());
  std::vector<TypePtr> columnTypes(types.begin(), types.end());

  const bool appendBucketColumn = isBucketed(partitionCount, bucketCount);
  if (appendBucketColumn) {
    columnNames.emplace_back("$bucket");
    columnTypes.emplace_back(INTEGER());
  }

  return {ROW(std::move(columnNames), std::move(columnTypes))};
}

} // namespace
} // namespace facebook::velox::exec::test

0 comments on commit 1ff6816

Please sign in to comment.