Skip to content

Commit

Permalink
Add hash join replayer
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Oct 16, 2024
1 parent 47a9328 commit 94c30f1
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 2 deletions.
1 change: 1 addition & 0 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ void HashBuild::removeInputRowsForAntiJoinFilter() {
void HashBuild::addInput(RowVectorPtr input) {
checkRunning();
ensureInputFits(input);
traceInput(input);

TestValue::adjust("facebook::velox::exec::HashBuild::addInput", this);

Expand Down
6 changes: 6 additions & 0 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ void Operator::maybeSetTracer() {
return;
}

VELOX_CHECK(
trace::QueryTraceTraits::supportedOperatorTypes.count(
operatorCtx_->operatorType()) != 0,
"{} do not support tracing",
operatorCtx_->operatorType());

auto& tracedOpMap = operatorCtx_->driverCtx()->tracedOperatorMap;
if (const auto iter = tracedOpMap.find(operatorId());
iter != tracedOpMap.end()) {
Expand Down
9 changes: 9 additions & 0 deletions velox/exec/QueryTraceTraits.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,14 @@ struct QueryTraceTraits {
static inline const std::string kQueryMetaFileName = "query_meta.json";
static inline const std::string kDataSummaryFileName = "data_summary.json";
static inline const std::string kDataFileName = "trace.data";

static inline const std::unordered_set<std::string> supportedOperatorTypes{
"TableWriter",
"TableScan",
"Aggregation",
"PartialAggregation",
"PartitionedOutput",
"HashBuild",
"HashProbe"};
};
} // namespace facebook::velox::exec::trace
3 changes: 2 additions & 1 deletion velox/tool/trace/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ velox_add_library(
AggregationReplayer.cpp
OperatorReplayerBase.cpp
PartitionedOutputReplayer.cpp
TableWriterReplayer.cpp)
TableWriterReplayer.cpp
HashJoinReplayer.cpp)

velox_link_libraries(
velox_query_trace_replayer_base
Expand Down
50 changes: 50 additions & 0 deletions velox/tool/trace/HashJoinReplayer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/tool/trace/HashJoinReplayer.h"

#include <exec/QueryTraceUtil.h>

#include "velox/exec/QueryDataReader.h"
#include "velox/exec/tests/utils/PlanBuilder.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;
using namespace facebook::velox::exec::test;

namespace facebook::velox::tool::trace {
core::PlanNodePtr HashJoinReplayer::createPlanNode(
const core::PlanNode* node,
const core::PlanNodeId& nodeId,
const core::PlanNodePtr& source) const {
const auto* hashJoinNode = dynamic_cast<const core::HashJoinNode*>(node);
return std::make_shared<core::HashJoinNode>(
nodeId,
hashJoinNode->joinType(),
hashJoinNode->isNullAware(),
hashJoinNode->leftKeys(),
hashJoinNode->rightKeys(),
hashJoinNode->filter(),
source,
PlanBuilder(planNodeIdGenerator_)
.traceScan(
nodeDir_, exec::trace::getDataType(planFragment_, nodeId_, 1))
.planNode(),
hashJoinNode->outputType());

return nullptr;
}
} // namespace facebook::velox::tool::trace
47 changes: 47 additions & 0 deletions velox/tool/trace/HashJoinReplayer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/core/PlanNode.h"
#include "velox/tool/trace/OperatorReplayerBase.h"

#include <parse/PlanNodeIdGenerator.h>

namespace facebook::velox::tool::trace {
/// The replayer to replay the traced 'HashAggregation' operator.
class HashJoinReplayer : public OperatorReplayerBase {
public:
HashJoinReplayer(
const std::string& rootDir,
const std::string& taskId,
const std::string& nodeId,
const int32_t pipelineId,
const std::string& operatorType)
: OperatorReplayerBase(
rootDir,
taskId,
nodeId,
pipelineId,
operatorType) {}

private:
core::PlanNodePtr createPlanNode(
const core::PlanNode* node,
const core::PlanNodeId& nodeId,
const core::PlanNodePtr& source) const override;
};
} // namespace facebook::velox::tool::trace
2 changes: 1 addition & 1 deletion velox/tool/trace/OperatorReplayerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ core::PlanNodePtr OperatorReplayerBase::createPlan() const {
const auto* replayNode = core::PlanNode::findFirstNode(
planFragment_.get(),
[this](const core::PlanNode* node) { return node->id() == nodeId_; });
return exec::test::PlanBuilder()
return exec::test::PlanBuilder(planNodeIdGenerator_)
.traceScan(nodeDir_, exec::trace::getDataType(planFragment_, nodeId_))
.addNode(replayNodeFactory(replayNode))
.planNode();
Expand Down
4 changes: 4 additions & 0 deletions velox/tool/trace/OperatorReplayerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "velox/common/file/FileSystems.h"
#include "velox/core/PlanNode.h"
#include "velox/exec/tests/utils/PlanBuilder.h"

namespace facebook::velox::exec {
class Task;
Expand Down Expand Up @@ -59,6 +60,9 @@ class OperatorReplayerBase {
const std::string taskDir_;
const std::string nodeDir_;

std::shared_ptr<core::PlanNodeIdGenerator> planNodeIdGenerator_ =
std::make_shared<core::PlanNodeIdGenerator>();

std::unordered_map<std::string, std::string> queryConfigs_;
std::unordered_map<std::string, std::unordered_map<std::string, std::string>>
connectorConfigs_;
Expand Down

0 comments on commit 94c30f1

Please sign in to comment.