Skip to content

Commit

Permalink
Add hash join replayer
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Oct 25, 2024
1 parent f8da123 commit 4dcada9
Show file tree
Hide file tree
Showing 14 changed files with 585 additions and 3 deletions.
3 changes: 3 additions & 0 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,9 @@ void HashBuild::removeInputRowsForAntiJoinFilter() {
}

void HashBuild::addInput(RowVectorPtr input) {
if (FOLLY_UNLIKELY(!isInputFromSpill())) {
traceInput(input);
}
checkRunning();
ensureInputFits(input);

Expand Down
3 changes: 3 additions & 0 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,9 @@ void HashProbe::decodeAndDetectNonNullKeys() {
}

void HashProbe::addInput(RowVectorPtr input) {
if (FOLLY_UNLIKELY(!isSpillInput())) {
traceInput(input);
}
if (skipInput_) {
VELOX_CHECK_NULL(input_);
return;
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ void Operator::maybeSetTracer() {
return;
}

if (!trace::canTrace(operatorType())) {
VELOX_UNSUPPORTED("{} does not support tracing", operatorType());
}

auto& tracedOpMap = operatorCtx_->driverCtx()->tracedOperatorMap;
if (const auto iter = tracedOpMap.find(operatorId());
iter != tracedOpMap.end()) {
Expand Down
10 changes: 10 additions & 0 deletions velox/exec/QueryTraceUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,14 @@ getDataDir(const std::string& traceDir, int pipelineId, int driverId) {
return fmt::format("{}/{}/{}/data", traceDir, pipelineId, driverId);
}

bool canTrace(const std::string& operatorType) {
static const std::unordered_set<std::string> kSupportedOperatorTypes{
"TableWrite",
"Aggregation",
"PartialAggregation",
"PartitionedOutput",
"HashBuild",
"HashProbe"};
return kSupportedOperatorTypes.count(operatorType) > 0;
}
} // namespace facebook::velox::exec::trace
3 changes: 3 additions & 0 deletions velox/exec/QueryTraceUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,7 @@ folly::dynamic getMetadata(
/// given plan node, which is $traceRoot/$taskId/$nodeId.
std::string
getDataDir(const std::string& traceDir, int pipelineId, int driverId);

/// Checks whether the operator can be traced.
bool canTrace(const std::string& operatorType);
} // namespace facebook::velox::exec::trace
26 changes: 26 additions & 0 deletions velox/exec/tests/QueryTraceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <gtest/gtest.h>
#include <algorithm>
#include <memory>
#include <utility>

#include "velox/common/file/FileSystems.h"
#include "velox/exec/PartitionFunction.h"
Expand Down Expand Up @@ -541,4 +542,29 @@ TEST_F(QueryTracerTest, traceTableWriter) {
ASSERT_EQ(numOutputVectors, testData.numTracedBatches);
}
}

TEST_F(QueryTracerTest, canTrace) {
struct {
const std::string operatorType;
const bool canTrace;

std::string debugString() const {
return fmt::format(
"operatorType: {}, canTrace: {}", operatorType, canTrace);
}
} testSettings[] = {
{"PartitionedOutput", true},
{"HashBuild", true},
{"HashProbe", true},
{"RowNumber", false},
{"OrderBy", false},
{"PartialAggregation", true},
{"Aggregation", true},
{"TableWrite", true},
{"TableScan", false}};
for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
ASSERT_EQ(testData.canTrace, trace::canTrace(testData.operatorType));
}
}
} // namespace facebook::velox::exec::trace::test
3 changes: 2 additions & 1 deletion velox/tool/trace/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ velox_add_library(
AggregationReplayer.cpp
OperatorReplayerBase.cpp
PartitionedOutputReplayer.cpp
TableWriterReplayer.cpp)
TableWriterReplayer.cpp
HashJoinReplayer.cpp)

velox_link_libraries(
velox_query_trace_replayer_base
Expand Down
45 changes: 45 additions & 0 deletions velox/tool/trace/HashJoinReplayer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/tool/trace/HashJoinReplayer.h"
#include "velox/exec/QueryTraceUtil.h"
#include "velox/exec/tests/utils/PlanBuilder.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;
using namespace facebook::velox::exec::test;

namespace facebook::velox::tool::trace {
core::PlanNodePtr HashJoinReplayer::createPlanNode(
const core::PlanNode* node,
const core::PlanNodeId& nodeId,
const core::PlanNodePtr& source) const {
const auto* hashJoinNode = dynamic_cast<const core::HashJoinNode*>(node);
return std::make_shared<core::HashJoinNode>(
nodeId,
hashJoinNode->joinType(),
hashJoinNode->isNullAware(),
hashJoinNode->leftKeys(),
hashJoinNode->rightKeys(),
hashJoinNode->filter(),
source,
PlanBuilder(planNodeIdGenerator_)
.traceScan(
nodeDir_, exec::trace::getDataType(planFragment_, nodeId_, 1))
.planNode(),
hashJoinNode->outputType());
}
} // namespace facebook::velox::tool::trace
47 changes: 47 additions & 0 deletions velox/tool/trace/HashJoinReplayer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/core/PlanNode.h"
#include "velox/tool/trace/OperatorReplayerBase.h"

#include <parse/PlanNodeIdGenerator.h>

namespace facebook::velox::tool::trace {
/// The replayer to replay the traced 'HashJoin' operator.
class HashJoinReplayer : public OperatorReplayerBase {
public:
HashJoinReplayer(
const std::string& rootDir,
const std::string& taskId,
const std::string& nodeId,
const int32_t pipelineId,
const std::string& operatorType)
: OperatorReplayerBase(
rootDir,
taskId,
nodeId,
pipelineId,
operatorType) {}

private:
core::PlanNodePtr createPlanNode(
const core::PlanNode* node,
const core::PlanNodeId& nodeId,
const core::PlanNodePtr& source) const override;
};
} // namespace facebook::velox::tool::trace
2 changes: 1 addition & 1 deletion velox/tool/trace/OperatorReplayerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ core::PlanNodePtr OperatorReplayerBase::createPlan() const {
const auto* replayNode = core::PlanNode::findFirstNode(
planFragment_.get(),
[this](const core::PlanNode* node) { return node->id() == nodeId_; });
return exec::test::PlanBuilder()
return exec::test::PlanBuilder(planNodeIdGenerator_)
.traceScan(nodeDir_, exec::trace::getDataType(planFragment_, nodeId_))
.addNode(replayNodeFactory(replayNode))
.planNode();
Expand Down
4 changes: 4 additions & 0 deletions velox/tool/trace/OperatorReplayerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "velox/common/file/FileSystems.h"
#include "velox/core/PlanNode.h"
#include "velox/exec/tests/utils/PlanBuilder.h"

namespace facebook::velox::exec {
class Task;
Expand Down Expand Up @@ -59,6 +60,9 @@ class OperatorReplayerBase {
const std::string taskDir_;
const std::string nodeDir_;

const std::shared_ptr<core::PlanNodeIdGenerator> planNodeIdGenerator_ =
std::make_shared<core::PlanNodeIdGenerator>();

std::unordered_map<std::string, std::string> queryConfigs_;
std::unordered_map<std::string, std::unordered_map<std::string, std::string>>
connectorConfigs_;
Expand Down
8 changes: 8 additions & 0 deletions velox/tool/trace/QueryReplayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "velox/parse/ExpressionsParser.h"
#include "velox/parse/TypeResolver.h"
#include "velox/tool/trace/AggregationReplayer.h"
#include "velox/tool/trace/HashJoinReplayer.h"
#include "velox/tool/trace/OperatorReplayerBase.h"
#include "velox/tool/trace/PartitionedOutputReplayer.h"
#include "velox/tool/trace/TableWriterReplayer.h"
Expand Down Expand Up @@ -148,6 +149,13 @@ std::unique_ptr<tool::trace::OperatorReplayerBase> createReplayer() {
FLAGS_node_id,
FLAGS_pipeline_id,
FLAGS_operator_type);
} else if (FLAGS_operator_type == "HashJoin") {
replayer = std::make_unique<tool::trace::HashJoinReplayer>(
FLAGS_root_dir,
FLAGS_task_id,
FLAGS_node_id,
FLAGS_pipeline_id,
FLAGS_operator_type);
} else {
VELOX_UNSUPPORTED("Unsupported operator type: {}", FLAGS_operator_type);
}
Expand Down
2 changes: 1 addition & 1 deletion velox/tool/trace/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
add_executable(
velox_tool_trace_test
AggregationReplayerTest.cpp PartitionedOutputReplayerTest.cpp
TableWriterReplayerTest.cpp)
TableWriterReplayerTest.cpp HashJoinReplayerTest.cpp)

add_test(
NAME velox_tool_trace_test
Expand Down
Loading

0 comments on commit 4dcada9

Please sign in to comment.