Skip to content

Commit

Permalink
Add FilterProject Tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Oct 30, 2024
1 parent df5cff5 commit bbcfc21
Show file tree
Hide file tree
Showing 8 changed files with 422 additions and 4 deletions.
5 changes: 3 additions & 2 deletions velox/tool/trace/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ velox_add_library(
AggregationReplayer.cpp
OperatorReplayerBase.cpp
PartitionedOutputReplayer.cpp
TableWriterReplayer.cpp)
TableWriterReplayer.cpp
FilterProjectReplayer.cpp)

velox_link_libraries(
velox_query_trace_replayer_base
Expand All @@ -33,7 +34,7 @@ velox_link_libraries(
glog::glog
gflags::gflags)

add_executable(velox_query_replayer TraceReplayerMain.cpp)
add_executable(velox_query_replayer TraceReplayerMain.cpp TraceReplayRunner.cpp)

target_link_libraries(
velox_query_replayer
Expand Down
84 changes: 84 additions & 0 deletions velox/tool/trace/FilterProjectReplayer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/tool/trace/FilterProjectReplayer.h"
#include "velox/exec/TraceUtil.h"
#include "velox/exec/tests/utils/PlanBuilder.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;
using namespace facebook::velox::exec::test;

namespace facebook::velox::tool::trace {
core::PlanNodePtr FilterProjectReplayer::createPlanNode(
const core::PlanNode* node,
const core::PlanNodeId& nodeId,
const core::PlanNodePtr& source) const {
if (node->name() == "Filter") {
const auto* filterNode = dynamic_cast<const core::FilterNode*>(node);
const auto* parentNode = core::PlanNode::findFirstNode(
planFragment_.get(), [this](const core::PlanNode* node) {
return node->id() == std::to_string(std::stoull(nodeId_) + 1);
});
const auto* projectNode =
dynamic_cast<const core::ProjectNode*>(parentNode);
VELOX_CHECK(
!isFilterProject(filterNode),
"If the target node is aFilterNode, it must be a standalone "
"FilterNode, without a ProjectNode as its parent.");

// A standalone FilterNode.
return std::make_shared<core::FilterNode>(
nodeId, filterNode->filter(), source);
}

const auto* projectNode = dynamic_cast<const core::ProjectNode*>(node);

// A standalone ProjectNode.
if (node->sources().size() != 1 ||
node->sources().front()->name() != "Filter") {
return std::make_shared<core::ProjectNode>(
nodeId, projectNode->names(), projectNode->projections(), source);
}

// A ProjectNode with a FilterNode as its source.
// -- ProjectNode [nodeId]
// -- FilterNode [nodeId - 1]
const auto originalFilterNode =
std::dynamic_pointer_cast<const core::FilterNode>(
node->sources().front());
const auto filterNode = std::make_shared<core::FilterNode>(
nodeId, originalFilterNode->filter(), source);
const auto projectNodeId = planNodeIdGenerator_->next();
return std::make_shared<core::ProjectNode>(
projectNodeId,
projectNode->names(),
projectNode->projections(),
filterNode);
}

bool FilterProjectReplayer::isFilterProject(
const core::PlanNode* filterNode) const {
const auto* projectNode =
dynamic_cast<const core::ProjectNode*>(core::PlanNode::findFirstNode(
planFragment_.get(), [this](const core::PlanNode* node) {
return node->id() == std::to_string(std::stoull(nodeId_) + 1);
}));
return projectNode != nullptr && projectNode->name() == "Project" &&
projectNode->sources().size() == 1 &&
projectNode->sources().front()->id() == nodeId_;
}
} // namespace facebook::velox::tool::trace
62 changes: 62 additions & 0 deletions velox/tool/trace/FilterProjectReplayer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/core/PlanNode.h"
#include "velox/tool/trace/OperatorReplayerBase.h"

#include <parse/PlanNodeIdGenerator.h>

namespace facebook::velox::tool::trace {
/// The replayer to replay the traced 'HashJoin' operator.
///
/// NOTE: For the plan fragment involving FilterNode->ProjectNode, users must
/// use the ProjectNode ID for tracing. This is because the planner will combine
/// these two operators into a single FilterProject operator. During replay,
/// the ProjectNode ID will be used to locate the trace data directory.
class FilterProjectReplayer : public OperatorReplayerBase {
public:
FilterProjectReplayer(
const std::string& rootDir,
const std::string& queryId,
const std::string& taskId,
const std::string& nodeId,
const int32_t pipelineId,
const std::string& operatorType)
: OperatorReplayerBase(
rootDir,
queryId,
taskId,
nodeId,
pipelineId,
operatorType) {}

private:
// Create either a standalone FilterNode, a standalone ProjectNode, or a
// ProjectNode with a FilterNode as its source.
//
// NOTE: If the target node is a FilterNode, it must be a standalone
// FilterNode, without a ProjectNode as its parent.
core::PlanNodePtr createPlanNode(
const core::PlanNode* node,
const core::PlanNodeId& nodeId,
const core::PlanNodePtr& source) const override;

// Checks whether the FilterNode is a source node of a ProjectNode.
bool isFilterProject(const core::PlanNode* filterNode) const;
};
} // namespace facebook::velox::tool::trace
2 changes: 1 addition & 1 deletion velox/tool/trace/OperatorReplayerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ core::PlanNodePtr OperatorReplayerBase::createPlan() const {
const auto* replayNode = core::PlanNode::findFirstNode(
planFragment_.get(),
[this](const core::PlanNode* node) { return node->id() == nodeId_; });
return exec::test::PlanBuilder()
return exec::test::PlanBuilder(planNodeIdGenerator_)
.traceScan(
nodeTraceDir_,
pipelineId_,
Expand Down
4 changes: 4 additions & 0 deletions velox/tool/trace/OperatorReplayerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "velox/common/file/FileSystems.h"
#include "velox/core/PlanNode.h"
#include "velox/exec/tests/utils/PlanBuilder.h"

namespace facebook::velox::exec {
class Task;
Expand Down Expand Up @@ -62,6 +63,9 @@ class OperatorReplayerBase {
const std::shared_ptr<filesystems::FileSystem> fs_;
const int32_t maxDrivers_;

const std::shared_ptr<core::PlanNodeIdGenerator> planNodeIdGenerator_ =
std::make_shared<core::PlanNodeIdGenerator>();

std::unordered_map<std::string, std::string> queryConfigs_;
std::unordered_map<std::string, std::unordered_map<std::string, std::string>>
connectorConfigs_;
Expand Down
9 changes: 9 additions & 0 deletions velox/tool/trace/TraceReplayRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "velox/functions/prestosql/registration/RegistrationFunctions.h"
#include "velox/parse/TypeResolver.h"
#include "velox/tool/trace/AggregationReplayer.h"
#include "velox/tool/trace/FilterProjectReplayer.h"
#include "velox/tool/trace/OperatorReplayerBase.h"
#include "velox/tool/trace/PartitionedOutputReplayer.h"
#include "velox/tool/trace/TableWriterReplayer.h"
Expand Down Expand Up @@ -109,6 +110,14 @@ std::unique_ptr<tool::trace::OperatorReplayerBase> createReplayer() {
FLAGS_node_id,
FLAGS_pipeline_id,
FLAGS_operator_type);
} else if (FLAGS_operator_type == "FilterProject") {
replayer = std::make_unique<tool::trace::FilterProjectReplayer>(
FLAGS_root_dir,
FLAGS_query_id,
FLAGS_task_id,
FLAGS_node_id,
FLAGS_pipeline_id,
FLAGS_operator_type);
} else {
VELOX_UNSUPPORTED("Unsupported operator type: {}", FLAGS_operator_type);
}
Expand Down
2 changes: 1 addition & 1 deletion velox/tool/trace/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
add_executable(
velox_tool_trace_test
AggregationReplayerTest.cpp PartitionedOutputReplayerTest.cpp
TableWriterReplayerTest.cpp)
TableWriterReplayerTest.cpp FilterProjectReplayerTest.cpp)

add_test(
NAME velox_tool_trace_test
Expand Down
Loading

0 comments on commit bbcfc21

Please sign in to comment.