Skip to content

Commit

Permalink
Add hash join replayer
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Nov 1, 2024
1 parent 1e7a82f commit 101c528
Show file tree
Hide file tree
Showing 12 changed files with 665 additions and 2 deletions.
4 changes: 4 additions & 0 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ void Operator::maybeSetTracer() {
return;
}

if (!trace::canTrace(operatorType())) {
VELOX_UNSUPPORTED("{} does not support tracing", operatorType());
}

auto& tracedOpMap = operatorCtx_->driverCtx()->tracedOperatorMap;
if (const auto iter = tracedOpMap.find(operatorId());
iter != tracedOpMap.end()) {
Expand Down
12 changes: 12 additions & 0 deletions velox/exec/TraceUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,16 @@ size_t getNumDrivers(
const std::shared_ptr<filesystems::FileSystem>& fs) {
return listDriverIds(nodeTraceDir, pipelineId, fs).size();
}

bool canTrace(const std::string& operatorType) {
static const std::unordered_set<std::string> kSupportedOperatorTypes{
"FilterProject",
"TableWrite",
"Aggregation",
"PartialAggregation",
"PartitionedOutput",
"HashBuild",
"HashProbe"};
return kSupportedOperatorTypes.count(operatorType) > 0;
}
} // namespace facebook::velox::exec::trace
3 changes: 3 additions & 0 deletions velox/exec/TraceUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,7 @@ std::vector<std::string> getTaskIds(
folly::dynamic getTaskMetadata(
const std::string& taskMetaFilePath,
const std::shared_ptr<filesystems::FileSystem>& fs);

/// Checks whether the operator can be traced.
bool canTrace(const std::string& operatorType);
} // namespace facebook::velox::exec::trace
142 changes: 142 additions & 0 deletions velox/exec/tests/OperatorTraceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <folly/init/Init.h>
#include <gtest/gtest.h>
#include <memory>
#include <utility>

#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/file/FileSystems.h"
Expand Down Expand Up @@ -620,4 +621,145 @@ TEST_F(OperatorTraceTest, traceTableWriter) {
ASSERT_EQ(numOutputVectors, testData.numTracedBatches);
}
}

TEST_F(OperatorTraceTest, hashJoin) {
std::vector<RowVectorPtr> inputVectors;
constexpr auto numBatch = 5;
inputVectors.reserve(numBatch);
for (auto i = 0; i < numBatch; ++i) {
inputVectors.push_back(vectorFuzzer_.fuzzInputFlatRow(dataType_));
}

struct {
std::string taskRegExpr;
uint64_t maxTracedBytes;
uint8_t numTracedBatches;
bool limitExceeded;

std::string debugString() const {
return fmt::format(
"taskRegExpr: {}, maxTracedBytes: {}, numTracedBatches: {}, limitExceeded {}",
taskRegExpr,
maxTracedBytes,
numTracedBatches,
limitExceeded);
}
} testSettings[]{
{".*", 10UL << 30, numBatch, false},
{".*", 0, numBatch, true},
{"wrong id", 10UL << 30, 0, false},
{"test_cursor \\d+", 10UL << 30, numBatch, false},
{"test_cursor \\d+", 800, 2, true}};

for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
const auto outputDir = TempDirectoryPath::create();
const auto planNodeIdGenerator{
std::make_shared<core::PlanNodeIdGenerator>()};
core::PlanNodeId hashJoinNodeId;
const auto planNode =
PlanBuilder(planNodeIdGenerator)
.values(inputVectors, false)
.project({"a AS c0", "b AS c1", "c AS c2"})
.hashJoin(
{"c0"},
{"u0"},
PlanBuilder(planNodeIdGenerator)
.values(inputVectors, true)
.project({"a AS u0", "b AS u1", "c AS u2"})
.planNode(),
"c0 < 135",
{"c0", "c1", "c2"},
core::JoinType::kInner)
.capturePlanNodeId(hashJoinNodeId)
.planNode();
const auto testDir = TempDirectoryPath::create();
const auto traceRoot =
fmt::format("{}/{}", testDir->getPath(), "traceRoot");
std::shared_ptr<Task> task;
if (testData.limitExceeded) {
VELOX_ASSERT_THROW(
AssertQueryBuilder(planNode)
.maxDrivers(1)
.config(core::QueryConfig::kQueryTraceEnabled, true)
.config(core::QueryConfig::kQueryTraceDir, traceRoot)
.config(
core::QueryConfig::kQueryTraceMaxBytes,
testData.maxTracedBytes)
.config(
core::QueryConfig::kQueryTraceTaskRegExp,
testData.taskRegExpr)
.config(core::QueryConfig::kQueryTraceNodeIds, hashJoinNodeId)
.copyResults(pool(), task),
"Query exceeded per-query local trace limit of");
continue;
}
AssertQueryBuilder(planNode)
.maxDrivers(1)
.config(core::QueryConfig::kQueryTraceEnabled, true)
.config(core::QueryConfig::kQueryTraceDir, traceRoot)
.config(core::QueryConfig::kQueryTraceMaxBytes, testData.maxTracedBytes)
.config(core::QueryConfig::kQueryTraceTaskRegExp, testData.taskRegExpr)
.config(core::QueryConfig::kQueryTraceNodeIds, hashJoinNodeId)
.copyResults(pool(), task);

const auto taskTraceDir = getTaskTraceDirectory(traceRoot, *task);
const auto fs = filesystems::getFileSystem(taskTraceDir, nullptr);

if (testData.taskRegExpr == "wrong id") {
ASSERT_FALSE(fs->exists(traceRoot));
continue;
}

// Query metadata file should exist.
const auto traceMetaFilePath = getTaskTraceMetaFilePath(taskTraceDir);
ASSERT_TRUE(fs->exists(traceMetaFilePath));

const auto opTraceDir =
getOpTraceDirectory(taskTraceDir, hashJoinNodeId, 0, 0);

ASSERT_EQ(fs->list(opTraceDir).size(), 2);

const auto summary = OperatorTraceSummaryReader(opTraceDir, pool()).read();
const auto reader =
trace::OperatorTraceInputReader(opTraceDir, dataType_, pool());
RowVectorPtr actual;
size_t numOutputVectors{0};
while (reader.read(actual)) {
const auto& expected = inputVectors[numOutputVectors];
const auto size = actual->size();
ASSERT_EQ(size, expected->size());
for (auto i = 0; i < size; ++i) {
actual->compare(expected.get(), i, i, {.nullsFirst = true});
}
++numOutputVectors;
}
ASSERT_EQ(numOutputVectors, testData.numTracedBatches);
}
}

TEST_F(OperatorTraceTest, canTrace) {
struct {
const std::string operatorType;
const bool canTrace;

std::string debugString() const {
return fmt::format(
"operatorType: {}, canTrace: {}", operatorType, canTrace);
}
} testSettings[] = {
{"PartitionedOutput", true},
{"HashBuild", true},
{"HashProbe", true},
{"RowNumber", false},
{"OrderBy", false},
{"PartialAggregation", true},
{"Aggregation", true},
{"TableWrite", true},
{"FilterProject", true}};
for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
ASSERT_EQ(testData.canTrace, trace::canTrace(testData.operatorType));
}
}
} // namespace facebook::velox::exec::trace::test
1 change: 1 addition & 0 deletions velox/tool/trace/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ velox_add_library(
OperatorReplayerBase.cpp
PartitionedOutputReplayer.cpp
TableWriterReplayer.cpp
HashJoinReplayer.cpp
TraceReplayRunner.cpp)

velox_link_libraries(
Expand Down
47 changes: 47 additions & 0 deletions velox/tool/trace/HashJoinReplayer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/tool/trace/HashJoinReplayer.h"
#include "velox/exec/TraceUtil.h"
#include "velox/exec/tests/utils/PlanBuilder.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;
using namespace facebook::velox::exec::test;

namespace facebook::velox::tool::trace {
core::PlanNodePtr HashJoinReplayer::createPlanNode(
const core::PlanNode* node,
const core::PlanNodeId& nodeId,
const core::PlanNodePtr& source) const {
const auto* hashJoinNode = dynamic_cast<const core::HashJoinNode*>(node);
return std::make_shared<core::HashJoinNode>(
nodeId,
hashJoinNode->joinType(),
hashJoinNode->isNullAware(),
hashJoinNode->leftKeys(),
hashJoinNode->rightKeys(),
hashJoinNode->filter(),
source,
PlanBuilder(planNodeIdGenerator_)
.traceScan(
nodeTraceDir_,
pipelineId_,
exec::trace::getDataType(planFragment_, nodeId_, 1))
.planNode(),
hashJoinNode->outputType());
}
} // namespace facebook::velox::tool::trace
47 changes: 47 additions & 0 deletions velox/tool/trace/HashJoinReplayer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/core/PlanNode.h"
#include "velox/tool/trace/OperatorReplayerBase.h"

namespace facebook::velox::tool::trace {
/// The replayer to replay the traced 'HashJoin' operator.
class HashJoinReplayer final : public OperatorReplayerBase {
public:
HashJoinReplayer(
const std::string& rootDir,
const std::string& queryId,
const std::string& taskId,
const std::string& nodeId,
const int32_t pipelineId,
const std::string& operatorType)
: OperatorReplayerBase(
rootDir,
queryId,
taskId,
nodeId,
pipelineId,
operatorType) {}

private:
core::PlanNodePtr createPlanNode(
const core::PlanNode* node,
const core::PlanNodeId& nodeId,
const core::PlanNodePtr& source) const override;
};
} // namespace facebook::velox::tool::trace
2 changes: 1 addition & 1 deletion velox/tool/trace/OperatorReplayerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ core::PlanNodePtr OperatorReplayerBase::createPlan() const {
const auto* replayNode = core::PlanNode::findFirstNode(
planFragment_.get(),
[this](const core::PlanNode* node) { return node->id() == nodeId_; });
return exec::test::PlanBuilder()
return exec::test::PlanBuilder(planNodeIdGenerator_)
.traceScan(
nodeTraceDir_,
pipelineId_,
Expand Down
3 changes: 3 additions & 0 deletions velox/tool/trace/OperatorReplayerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "velox/common/file/FileSystems.h"
#include "velox/core/PlanNode.h"
#include "velox/parse/PlanNodeIdGenerator.h"

namespace facebook::velox::exec {
class Task;
Expand Down Expand Up @@ -61,6 +62,8 @@ class OperatorReplayerBase {
const std::string nodeTraceDir_;
const std::shared_ptr<filesystems::FileSystem> fs_;
const int32_t maxDrivers_;
const std::shared_ptr<core::PlanNodeIdGenerator> planNodeIdGenerator_{
std::make_shared<core::PlanNodeIdGenerator>()};

std::unordered_map<std::string, std::string> queryConfigs_;
std::unordered_map<std::string, std::unordered_map<std::string, std::string>>
Expand Down
9 changes: 9 additions & 0 deletions velox/tool/trace/TraceReplayRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "velox/functions/prestosql/registration/RegistrationFunctions.h"
#include "velox/parse/TypeResolver.h"
#include "velox/tool/trace/AggregationReplayer.h"
#include "velox/tool/trace/HashJoinReplayer.h"
#include "velox/tool/trace/OperatorReplayerBase.h"
#include "velox/tool/trace/PartitionedOutputReplayer.h"
#include "velox/tool/trace/TableWriterReplayer.h"
Expand Down Expand Up @@ -109,6 +110,14 @@ std::unique_ptr<tool::trace::OperatorReplayerBase> createReplayer() {
FLAGS_node_id,
FLAGS_pipeline_id,
FLAGS_operator_type);
} else if (FLAGS_operator_type == "HashJoin") {
replayer = std::make_unique<tool::trace::HashJoinReplayer>(
FLAGS_root_dir,
FLAGS_query_id,
FLAGS_task_id,
FLAGS_node_id,
FLAGS_pipeline_id,
FLAGS_operator_type);
} else {
VELOX_UNSUPPORTED("Unsupported operator type: {}", FLAGS_operator_type);
}
Expand Down
2 changes: 1 addition & 1 deletion velox/tool/trace/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
add_executable(
velox_tool_trace_test
AggregationReplayerTest.cpp PartitionedOutputReplayerTest.cpp
TableWriterReplayerTest.cpp)
TableWriterReplayerTest.cpp HashJoinReplayerTest.cpp)

add_test(
NAME velox_tool_trace_test
Expand Down
Loading

0 comments on commit 101c528

Please sign in to comment.