Skip to content

Commit

Permalink
add row number plan for memory arbitration fuzzer
Browse files Browse the repository at this point in the history
  • Loading branch information
yanngyoung committed Jun 14, 2024
1 parent 8c9d2d5 commit 5be8465
Showing 1 changed file with 73 additions and 0 deletions.
73 changes: 73 additions & 0 deletions velox/exec/fuzzer/MemoryArbitrationFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ class MemoryArbitrationFuzzer {
// Returns a list of randomly generated key types for join and aggregation.
std::vector<TypePtr> generateKeyTypes(int32_t numKeys);

std::pair<std::vector<std::string>, std::vector<TypePtr>>
generatePartitionKeys();

// Returns randomly generated probe input with up to 3 additional payload
// columns.
std::vector<RowVectorPtr> generateProbeInput(
Expand All @@ -132,6 +135,12 @@ class MemoryArbitrationFuzzer {
const std::vector<std::string>& keyNames,
const std::vector<TypePtr>& keyTypes);

// Reuses the 'generateProbeInput' method to return randomly generated
// row number input.
std::vector<RowVectorPtr> generateRowNumberInput(
const std::vector<std::string>& keyNames,
const std::vector<TypePtr>& keyTypes);

// Same as generateProbeInput() but copies over 10% of the input in the probe
// columns to ensure some matches during joining. Also generates an empty
// input with a 10% chance.
Expand All @@ -153,6 +162,8 @@ class MemoryArbitrationFuzzer {

std::vector<PlanWithSplits> aggregatePlans(const std::string& tableDir);

std::vector<PlanWithSplits> rowNumberPlans(const std::string& tableDir);

void verify();

static VectorFuzzer::Options getFuzzerOptions() {
Expand All @@ -171,6 +182,7 @@ class MemoryArbitrationFuzzer {
{core::QueryConfig::kJoinSpillEnabled, "true"},
{core::QueryConfig::kSpillStartPartitionBit, "29"},
{core::QueryConfig::kAggregationSpillEnabled, "true"},
{core::QueryConfig::kRowNumberSpillEnabled, "true"},
};

std::shared_ptr<memory::MemoryPool> rootPool_{
Expand Down Expand Up @@ -228,6 +240,18 @@ std::vector<TypePtr> MemoryArbitrationFuzzer::generateKeyTypes(
return types;
}

std::pair<std::vector<std::string>, std::vector<TypePtr>>
MemoryArbitrationFuzzer::generatePartitionKeys() {
const auto numKeys = randInt(1, 3);
std::vector<std::string> names;
std::vector<TypePtr> types;
for (auto i = 0; i < numKeys; ++i) {
names.push_back(fmt::format("c{}", i));
types.push_back(vectorFuzzer_.randType(/*maxDepth=*/1));
}
return std::make_pair(names, types);
}

std::vector<RowVectorPtr> MemoryArbitrationFuzzer::generateProbeInput(
const std::vector<std::string>& keyNames,
const std::vector<TypePtr>& keyTypes) {
Expand Down Expand Up @@ -326,6 +350,12 @@ std::vector<RowVectorPtr> MemoryArbitrationFuzzer::generateAggregateInput(
return generateProbeInput(keyNames, keyTypes);
}

std::vector<RowVectorPtr> MemoryArbitrationFuzzer::generateRowNumberInput(
const std::vector<std::string>& keyNames,
const std::vector<TypePtr>& keyTypes) {
return generateProbeInput(keyNames, keyTypes);
}

std::vector<MemoryArbitrationFuzzer::PlanWithSplits>
MemoryArbitrationFuzzer::hashJoinPlans(
const core::JoinType& joinType,
Expand Down Expand Up @@ -525,6 +555,46 @@ MemoryArbitrationFuzzer::aggregatePlans(const std::string& tableDir) {
return plans;
}

std::vector<MemoryArbitrationFuzzer::PlanWithSplits>
MemoryArbitrationFuzzer::rowNumberPlans(const std::string& tableDir) {
const auto [keyNames, keyTypes] = generatePartitionKeys();
const auto input = generateRowNumberInput(keyNames, keyTypes);

std::vector<PlanWithSplits> plans;

std::vector<std::string> projectFields = keyNames;
projectFields.emplace_back("row_number");
auto plan = PlanWithSplits{
PlanBuilder()
.values(input)
.rowNumber(keyNames)
.project(projectFields)
.planNode(),
{}};
plans.push_back(std::move(plan));

if (!isTableScanSupported(input[0]->type())) {
return plans;
}

const std::vector<Split> splits =
makeSplits(input, fmt::format("{}/row_number", tableDir), writerPool_);

auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
core::PlanNodeId scanId;
plan = PlanWithSplits{
PlanBuilder(planNodeIdGenerator)
.tableScan(asRowType(input[0]->type()))
.capturePlanNodeId(scanId)
.rowNumber(keyNames)
.project(projectFields)
.planNode(),
{{scanId, splits}}};
plans.push_back(std::move(plan));

return plans;
}

void MemoryArbitrationFuzzer::verify() {
const auto outputDirectory = TempDirectoryPath::create();
const auto spillDirectory = exec::test::TempDirectoryPath::create();
Expand All @@ -537,6 +607,9 @@ void MemoryArbitrationFuzzer::verify() {
for (const auto& plan : aggregatePlans(tableScanDir->getPath())) {
plans.push_back(plan);
}
for (const auto& plan : rowNumberPlans(tableScanDir->getPath())) {
plans.push_back(plan);
}

SCOPE_EXIT {
waitForAllTasksToBeDeleted();
Expand Down

0 comments on commit 5be8465

Please sign in to comment.