From b096d6d81e8a9adc0cca3c37202df56130165fd8 Mon Sep 17 00:00:00 2001 From: duanmeng Date: Wed, 16 Oct 2024 19:43:25 +0800 Subject: [PATCH] Add hash join replayer --- velox/exec/HashBuild.cpp | 1 + velox/exec/Operator.cpp | 6 +++ velox/exec/QueryTraceTraits.h | 9 +++++ velox/tool/trace/CMakeLists.txt | 3 +- velox/tool/trace/HashJoinReplayer.cpp | 49 +++++++++++++++++++++++ velox/tool/trace/HashJoinReplayer.h | 47 ++++++++++++++++++++++ velox/tool/trace/OperatorReplayerBase.cpp | 2 +- velox/tool/trace/OperatorReplayerBase.h | 3 ++ 8 files changed, 118 insertions(+), 2 deletions(-) create mode 100644 velox/tool/trace/HashJoinReplayer.cpp create mode 100644 velox/tool/trace/HashJoinReplayer.h diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index e5f5ae6390251..24ff099dbb409 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -314,6 +314,7 @@ void HashBuild::removeInputRowsForAntiJoinFilter() { void HashBuild::addInput(RowVectorPtr input) { checkRunning(); ensureInputFits(input); + traceInput(input); TestValue::adjust("facebook::velox::exec::HashBuild::addInput", this); diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 3b65536ba04cd..9c0b39802eac9 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -121,6 +121,12 @@ void Operator::maybeSetTracer() { return; } + VELOX_CHECK( + trace::QueryTraceTraits::supportedOperatorTypes.count( + operatorCtx_->operatorType()) != 0, + "{} do not support tracing", + operatorCtx_->operatorType()); + auto& tracedOpMap = operatorCtx_->driverCtx()->tracedOperatorMap; if (const auto iter = tracedOpMap.find(operatorId()); iter != tracedOpMap.end()) { diff --git a/velox/exec/QueryTraceTraits.h b/velox/exec/QueryTraceTraits.h index ad817115b4902..9bddeb72b5193 100644 --- a/velox/exec/QueryTraceTraits.h +++ b/velox/exec/QueryTraceTraits.h @@ -31,5 +31,14 @@ struct QueryTraceTraits { static inline const std::string kQueryMetaFileName = "query_meta.json"; static inline 
const std::string kDataSummaryFileName = "data_summary.json"; static inline const std::string kDataFileName = "trace.data"; + + static inline const std::unordered_set supportedOperatorTypes{ + "TableWriter", + "TableScan", + "Aggregation", + "PartialAggregation", + "PartitionedOutput", + "HashBuild", + "HashProbe"}; }; } // namespace facebook::velox::exec::trace diff --git a/velox/tool/trace/CMakeLists.txt b/velox/tool/trace/CMakeLists.txt index 3218b4ee30b0b..98c0f6d524227 100644 --- a/velox/tool/trace/CMakeLists.txt +++ b/velox/tool/trace/CMakeLists.txt @@ -17,7 +17,8 @@ velox_add_library( AggregationReplayer.cpp OperatorReplayerBase.cpp PartitionedOutputReplayer.cpp - TableWriterReplayer.cpp) + TableWriterReplayer.cpp + HashJoinReplayer.cpp) velox_link_libraries( velox_query_trace_replayer_base diff --git a/velox/tool/trace/HashJoinReplayer.cpp b/velox/tool/trace/HashJoinReplayer.cpp new file mode 100644 index 0000000000000..5b1359cbfb151 --- /dev/null +++ b/velox/tool/trace/HashJoinReplayer.cpp @@ -0,0 +1,49 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include "velox/tool/trace/HashJoinReplayer.h"
+
+#include // NOTE(review): header name missing in this copy — restore before use
+
+#include "velox/exec/QueryDataReader.h"
+#include "velox/exec/tests/utils/PlanBuilder.h"
+
+using namespace facebook::velox;
+using namespace facebook::velox::exec;
+using namespace facebook::velox::exec::test;
+
+namespace facebook::velox::tool::trace {
+// Recreates the traced join; inputs are 'source' and a trace scan of nodeDir_.
+core::PlanNodePtr HashJoinReplayer::createPlanNode(
+    const core::PlanNode* node,
+    const core::PlanNodeId& nodeId,
+    const core::PlanNodePtr& source) const {
+  const auto* hashJoinNode = dynamic_cast<const core::HashJoinNode*>(node);
+  return std::make_shared<core::HashJoinNode>(
+      nodeId,
+      hashJoinNode->joinType(),
+      hashJoinNode->isNullAware(),
+      hashJoinNode->leftKeys(),
+      hashJoinNode->rightKeys(),
+      hashJoinNode->filter(),
+      source,
+      PlanBuilder(planNodeIdGenerator_)
+          .traceScan(
+              nodeDir_, exec::trace::getDataType(planFragment_, nodeId_), 1)
+          .planNode(),
+      hashJoinNode->outputType());
+}
+} // namespace facebook::velox::tool::trace
diff --git a/velox/tool/trace/HashJoinReplayer.h b/velox/tool/trace/HashJoinReplayer.h
new file mode 100644
index 0000000000000..48d8a201ab26a
--- /dev/null
+++ b/velox/tool/trace/HashJoinReplayer.h
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "velox/core/PlanNode.h"
+#include "velox/tool/trace/OperatorReplayerBase.h"
+
+#include // NOTE(review): header name missing in this copy — restore before use
+
+namespace facebook::velox::tool::trace {
+/// The replayer to replay the traced 'HashJoin' operator.
+class HashJoinReplayer : public OperatorReplayerBase {
+ public:
+  HashJoinReplayer(
+      const std::string& rootDir,
+      const std::string& taskId,
+      const std::string& nodeId,
+      const int32_t pipelineId,
+      const std::string& operatorType)
+      : OperatorReplayerBase(
+            rootDir,
+            taskId,
+            nodeId,
+            pipelineId,
+            operatorType) {}
+
+ private:
+  core::PlanNodePtr createPlanNode(
+      const core::PlanNode* node,
+      const core::PlanNodeId& nodeId,
+      const core::PlanNodePtr& source) const override;
+};
+} // namespace facebook::velox::tool::trace
diff --git a/velox/tool/trace/OperatorReplayerBase.cpp b/velox/tool/trace/OperatorReplayerBase.cpp
index b54ad323a4471..ff0af99de061b 100644
--- a/velox/tool/trace/OperatorReplayerBase.cpp
+++ b/velox/tool/trace/OperatorReplayerBase.cpp
@@ -68,7 +68,7 @@ core::PlanNodePtr OperatorReplayerBase::createPlan() const {
   const auto* replayNode = core::PlanNode::findFirstNode(
       planFragment_.get(),
       [this](const core::PlanNode* node) { return node->id() == nodeId_; });
-  return exec::test::PlanBuilder()
+  return exec::test::PlanBuilder(planNodeIdGenerator_)
       .traceScan(nodeDir_, exec::trace::getDataType(planFragment_, nodeId_))
       .addNode(replayNodeFactory(replayNode))
       .planNode();
diff --git a/velox/tool/trace/OperatorReplayerBase.h b/velox/tool/trace/OperatorReplayerBase.h
index 863a8f5f80f69..fe0bbda0b6270 100644
--- a/velox/tool/trace/OperatorReplayerBase.h
+++ b/velox/tool/trace/OperatorReplayerBase.h
@@ -59,6 +59,9 @@ class OperatorReplayerBase {
   const std::string taskDir_;
   const std::string nodeDir_;
 
+  const std::shared_ptr<core::PlanNodeIdGenerator> planNodeIdGenerator_ =
+      std::make_shared<core::PlanNodeIdGenerator>();
+
   std::unordered_map queryConfigs_;
   std::unordered_map>
       connectorConfigs_;