From bbcfc21fbfce75f5cc63cd3334e3fdd3f5431151 Mon Sep 17 00:00:00 2001 From: duanmeng Date: Fri, 25 Oct 2024 23:18:42 +0800 Subject: [PATCH] Add FilterProject Tracing --- velox/tool/trace/CMakeLists.txt | 5 +- velox/tool/trace/FilterProjectReplayer.cpp | 84 ++++++ velox/tool/trace/FilterProjectReplayer.h | 62 +++++ velox/tool/trace/OperatorReplayerBase.cpp | 2 +- velox/tool/trace/OperatorReplayerBase.h | 4 + velox/tool/trace/TraceReplayRunner.cpp | 9 + velox/tool/trace/tests/CMakeLists.txt | 2 +- .../trace/tests/FilterProjectReplayerTest.cpp | 258 ++++++++++++++++++ 8 files changed, 422 insertions(+), 4 deletions(-) create mode 100644 velox/tool/trace/FilterProjectReplayer.cpp create mode 100644 velox/tool/trace/FilterProjectReplayer.h create mode 100644 velox/tool/trace/tests/FilterProjectReplayerTest.cpp diff --git a/velox/tool/trace/CMakeLists.txt b/velox/tool/trace/CMakeLists.txt index 8c68c9967db31..befd11e9387f3 100644 --- a/velox/tool/trace/CMakeLists.txt +++ b/velox/tool/trace/CMakeLists.txt @@ -17,7 +17,8 @@ velox_add_library( AggregationReplayer.cpp OperatorReplayerBase.cpp PartitionedOutputReplayer.cpp - TableWriterReplayer.cpp) + TableWriterReplayer.cpp + FilterProjectReplayer.cpp) velox_link_libraries( velox_query_trace_replayer_base @@ -33,7 +34,7 @@ velox_link_libraries( glog::glog gflags::gflags) -add_executable(velox_query_replayer TraceReplayerMain.cpp) +add_executable(velox_query_replayer TraceReplayerMain.cpp TraceReplayRunner.cpp) target_link_libraries( velox_query_replayer diff --git a/velox/tool/trace/FilterProjectReplayer.cpp b/velox/tool/trace/FilterProjectReplayer.cpp new file mode 100644 index 0000000000000..db3588e528afc --- /dev/null +++ b/velox/tool/trace/FilterProjectReplayer.cpp @@ -0,0 +1,84 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/tool/trace/FilterProjectReplayer.h" +#include "velox/exec/TraceUtil.h" +#include "velox/exec/tests/utils/PlanBuilder.h" + +using namespace facebook::velox; +using namespace facebook::velox::exec; +using namespace facebook::velox::exec::test; + +namespace facebook::velox::tool::trace { +core::PlanNodePtr FilterProjectReplayer::createPlanNode( + const core::PlanNode* node, + const core::PlanNodeId& nodeId, + const core::PlanNodePtr& source) const { + if (node->name() == "Filter") { + const auto* filterNode = dynamic_cast(node); + const auto* parentNode = core::PlanNode::findFirstNode( + planFragment_.get(), [this](const core::PlanNode* node) { + return node->id() == std::to_string(std::stoull(nodeId_) + 1); + }); + const auto* projectNode = + dynamic_cast(parentNode); + VELOX_CHECK( + !isFilterProject(filterNode), + "If the target node is aFilterNode, it must be a standalone " + "FilterNode, without a ProjectNode as its parent."); + + // A standalone FilterNode. + return std::make_shared( + nodeId, filterNode->filter(), source); + } + + const auto* projectNode = dynamic_cast(node); + + // A standalone ProjectNode. + if (node->sources().size() != 1 || + node->sources().front()->name() != "Filter") { + return std::make_shared( + nodeId, projectNode->names(), projectNode->projections(), source); + } + + // A ProjectNode with a FilterNode as its source. + // -- ProjectNode [nodeId] + // -- FilterNode [nodeId - 1] + const auto originalFilterNode = + std::dynamic_pointer_cast( + node->sources().front()); + const auto filterNode = std::make_shared( + nodeId, originalFilterNode->filter(), source); + const auto projectNodeId = planNodeIdGenerator_->next(); + return std::make_shared( + projectNodeId, + projectNode->names(), + projectNode->projections(), + filterNode); +} + +bool FilterProjectReplayer::isFilterProject( + const core::PlanNode* filterNode) const { + const auto* projectNode = + dynamic_cast(core::PlanNode::findFirstNode( + planFragment_.get(), [this](const core::PlanNode* node) { + return node->id() == std::to_string(std::stoull(nodeId_) + 1); + })); + return projectNode != nullptr && projectNode->name() == "Project" && + projectNode->sources().size() == 1 && + projectNode->sources().front()->id() == nodeId_; +} +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/FilterProjectReplayer.h b/velox/tool/trace/FilterProjectReplayer.h new file mode 100644 index 0000000000000..c38541a4d6c9c --- /dev/null +++ b/velox/tool/trace/FilterProjectReplayer.h @@ -0,0 +1,62 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/core/PlanNode.h" +#include "velox/tool/trace/OperatorReplayerBase.h" + +#include + +namespace facebook::velox::tool::trace { +/// The replayer to replay the traced 'HashJoin' operator. +/// +/// NOTE: For the plan fragment involving FilterNode->ProjectNode, users must +/// use the ProjectNode ID for tracing. This is because the planner will combine +/// these two operators into a single FilterProject operator. During replay, +/// the ProjectNode ID will be used to locate the trace data directory. +class FilterProjectReplayer : public OperatorReplayerBase { + public: + FilterProjectReplayer( + const std::string& rootDir, + const std::string& queryId, + const std::string& taskId, + const std::string& nodeId, + const int32_t pipelineId, + const std::string& operatorType) + : OperatorReplayerBase( + rootDir, + queryId, + taskId, + nodeId, + pipelineId, + operatorType) {} + + private: + // Create either a standalone FilterNode, a standalone ProjectNode, or a + // ProjectNode with a FilterNode as its source. + // + // NOTE: If the target node is a FilterNode, it must be a standalone + // FilterNode, without a ProjectNode as its parent. + core::PlanNodePtr createPlanNode( + const core::PlanNode* node, + const core::PlanNodeId& nodeId, + const core::PlanNodePtr& source) const override; + + // Checks whether the FilterNode is a source node of a ProjectNode. + bool isFilterProject(const core::PlanNode* filterNode) const; +}; +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/OperatorReplayerBase.cpp b/velox/tool/trace/OperatorReplayerBase.cpp index 2083e676f6781..463752732dae5 100644 --- a/velox/tool/trace/OperatorReplayerBase.cpp +++ b/velox/tool/trace/OperatorReplayerBase.cpp @@ -67,7 +67,7 @@ core::PlanNodePtr OperatorReplayerBase::createPlan() const { const auto* replayNode = core::PlanNode::findFirstNode( planFragment_.get(), [this](const core::PlanNode* node) { return node->id() == nodeId_; }); - return exec::test::PlanBuilder() + return exec::test::PlanBuilder(planNodeIdGenerator_) .traceScan( nodeTraceDir_, pipelineId_, diff --git a/velox/tool/trace/OperatorReplayerBase.h b/velox/tool/trace/OperatorReplayerBase.h index 50e31fdaf02f5..52092d98af8c2 100644 --- a/velox/tool/trace/OperatorReplayerBase.h +++ b/velox/tool/trace/OperatorReplayerBase.h @@ -18,6 +18,7 @@ #include "velox/common/file/FileSystems.h" #include "velox/core/PlanNode.h" +#include "velox/exec/tests/utils/PlanBuilder.h" namespace facebook::velox::exec { class Task; @@ -62,6 +63,9 @@ class OperatorReplayerBase { const std::shared_ptr fs_; const int32_t maxDrivers_; + const std::shared_ptr planNodeIdGenerator_ = + std::make_shared(); + std::unordered_map queryConfigs_; std::unordered_map> connectorConfigs_; diff --git a/velox/tool/trace/TraceReplayRunner.cpp b/velox/tool/trace/TraceReplayRunner.cpp index b2cd0d4a0be3b..7ed56c6ca653e 100644 --- a/velox/tool/trace/TraceReplayRunner.cpp +++ b/velox/tool/trace/TraceReplayRunner.cpp @@ -41,6 +41,7 @@ #include "velox/functions/prestosql/registration/RegistrationFunctions.h" #include "velox/parse/TypeResolver.h" #include "velox/tool/trace/AggregationReplayer.h" +#include "velox/tool/trace/FilterProjectReplayer.h" #include "velox/tool/trace/OperatorReplayerBase.h" #include "velox/tool/trace/PartitionedOutputReplayer.h" #include "velox/tool/trace/TableWriterReplayer.h" @@ -109,6 +110,14 @@ std::unique_ptr createReplayer() { FLAGS_node_id, FLAGS_pipeline_id, FLAGS_operator_type); + } else if (FLAGS_operator_type == "FilterProject") { + replayer = std::make_unique( + FLAGS_root_dir, + FLAGS_query_id, + FLAGS_task_id, + FLAGS_node_id, + FLAGS_pipeline_id, + FLAGS_operator_type); } else { VELOX_UNSUPPORTED("Unsupported operator type: {}", FLAGS_operator_type); } diff --git a/velox/tool/trace/tests/CMakeLists.txt b/velox/tool/trace/tests/CMakeLists.txt index 03d1705eebe76..91b9dcfc73f90 100644 --- a/velox/tool/trace/tests/CMakeLists.txt +++ b/velox/tool/trace/tests/CMakeLists.txt @@ -15,7 +15,7 @@ add_executable( velox_tool_trace_test AggregationReplayerTest.cpp PartitionedOutputReplayerTest.cpp - TableWriterReplayerTest.cpp) + TableWriterReplayerTest.cpp FilterProjectReplayerTest.cpp) add_test( NAME velox_tool_trace_test diff --git a/velox/tool/trace/tests/FilterProjectReplayerTest.cpp b/velox/tool/trace/tests/FilterProjectReplayerTest.cpp new file mode 100644 index 0000000000000..42657528791d7 --- /dev/null +++ b/velox/tool/trace/tests/FilterProjectReplayerTest.cpp @@ -0,0 +1,258 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include + +#include + +#include "velox/common/file/FileSystems.h" +#include "velox/common/hyperloglog/SparseHll.h" +#include "velox/common/testutil/TestValue.h" +#include "velox/dwio/dwrf/writer/Writer.h" +#include "velox/exec/PartitionFunction.h" +#include "velox/exec/TableWriter.h" +#include "velox/exec/TraceUtil.h" +#include "velox/exec/tests/utils/ArbitratorTestUtil.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" +#include "velox/serializers/PrestoSerializer.h" +#include "velox/tool/trace/FilterProjectReplayer.h" + +#include "velox/common/file/Utils.h" +#include "velox/exec/PlanNodeStats.h" + +#include "velox/vector/tests/utils/VectorTestBase.h" + +using namespace facebook::velox; +using namespace facebook::velox::core; +using namespace facebook::velox::common; +using namespace facebook::velox::exec; +using namespace facebook::velox::exec::test; +using namespace facebook::velox::connector; +using namespace facebook::velox::connector::hive; +using namespace facebook::velox::dwio::common; +using namespace facebook::velox::common::testutil; +using namespace facebook::velox::common::hll; + +namespace facebook::velox::tool::trace::test { +class FilterProjectReplayerTest : public HiveConnectorTestBase { + protected: + static void SetUpTestCase() { + memory::MemoryManager::testingSetInstance({}); + HiveConnectorTestBase::SetUpTestCase(); + filesystems::registerLocalFileSystem(); + if (!isRegisteredVectorSerde()) { + serializer::presto::PrestoVectorSerde::registerVectorSerde(); + } + Type::registerSerDe(); + common::Filter::registerSerDe(); + connector::hive::HiveTableHandle::registerSerDe(); + connector::hive::LocationHandle::registerSerDe(); + connector::hive::HiveColumnHandle::registerSerDe(); + connector::hive::HiveInsertTableHandle::registerSerDe(); + connector::hive::HiveConnectorSplit::registerSerDe(); + core::PlanNode::registerSerDe(); + core::ITypedExpr::registerSerDe(); + registerPartitionFunctionSerDe(); + } + + void TearDown() override { + input_.clear(); + HiveConnectorTestBase::TearDown(); + } + + struct PlanWithSplits { + core::PlanNodePtr plan; + std::vector splits; + + explicit PlanWithSplits( + core::PlanNodePtr _plan, + std::vector _splits = {}) + : plan(std::move(_plan)), splits(std::move(_splits)) {} + }; + + std::vector + makeVectors(int32_t count, int32_t rowsPerVector, const RowTypePtr& rowType) { + return HiveConnectorTestBase::makeVectors(rowType, count, rowsPerVector); + } + + std::vector makeSplits( + const std::vector& inputs, + const std::string& path, + memory::MemoryPool* writerPool) { + std::vector splits; + for (auto i = 0; i < 4; ++i) { + const std::string filePath = fmt::format("{}/{}", path, i); + writeToFile(filePath, inputs); + splits.emplace_back(makeHiveConnectorSplit(filePath)); + } + + return splits; + } + + enum class PlanMode { + FilterProject = 0, + FilterOnly = 1, + ProjectOnly = 2, + }; + + PlanWithSplits createPlan(PlanMode planMode) { + core::PlanNodePtr plan = nullptr; + if (planMode == PlanMode::FilterProject) { + plan = PlanBuilder() + .tableScan(inputType_) + .filter("c0 % 10 < 9") + .capturePlanNodeId(filterNodeId_) + .project({"c0", "c1", "c0 % 100 + c1 % 50 AS e1"}) + .capturePlanNodeId(projectNodeId_) + .planNode(); + } else if (planMode == PlanMode::FilterOnly) { + plan = PlanBuilder() + .tableScan(inputType_) + .filter("c0 % 10 < 9") + .capturePlanNodeId(filterNodeId_) + .planNode(); + } else { + plan = PlanBuilder() + .tableScan(inputType_) + .project({"c0", "c1", "c0 % 100 + c1 % 50 AS e1"}) + .capturePlanNodeId(projectNodeId_) + .planNode(); + } + const std::vector splits = makeSplits( + input_, fmt::format("{}/splits", testDir_->getPath()), pool()); + return PlanWithSplits{plan, splits}; + } + + core::PlanNodeId traceNodeId_; + core::PlanNodeId filterNodeId_; + core::PlanNodeId projectNodeId_; + RowTypePtr inputType_{ + ROW({"c0", "c1", "c2", "c3"}, {BIGINT(), INTEGER(), SMALLINT(), REAL()})}; + std::vector input_ = makeVectors(5, 100, inputType_); + + const std::shared_ptr testDir_ = + TempDirectoryPath::create(); +}; + +TEST_F(FilterProjectReplayerTest, filterProject) { + const auto planWithSplits = createPlan(PlanMode::FilterProject); + AssertQueryBuilder builder(planWithSplits.plan); + const auto result = builder.splits(planWithSplits.splits).copyResults(pool()); + + const auto traceRoot = fmt::format("{}/{}", testDir_->getPath(), "basic"); + const auto tracePlanWithSplits = createPlan(PlanMode::FilterProject); + std::shared_ptr task; + AssertQueryBuilder traceBuilder(tracePlanWithSplits.plan); + traceBuilder.maxDrivers(2) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceRoot) + .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config( + core::QueryConfig::kQueryTraceNodeIds, + fmt::format("{},{}", filterNodeId_, projectNodeId_)); + auto traceResult = + traceBuilder.splits(tracePlanWithSplits.splits).copyResults(pool(), task); + + assertEqualResults({result}, {traceResult}); + + const auto taskId = task->taskId(); + auto replayingResult = FilterProjectReplayer( + traceRoot, + task->queryCtx()->queryId(), + task->taskId(), + projectNodeId_, + 0, + "HashJoin") + .run(); + assertEqualResults({result}, {replayingResult}); +} + +TEST_F(FilterProjectReplayerTest, filterOnly) { + const auto planWithSplits = createPlan(PlanMode::FilterOnly); + AssertQueryBuilder builder(planWithSplits.plan); + const auto result = builder.splits(planWithSplits.splits).copyResults(pool()); + + const auto traceRoot = fmt::format("{}/{}", testDir_->getPath(), "filter"); + const auto tracePlanWithSplits = createPlan(PlanMode::FilterOnly); + std::shared_ptr task; + AssertQueryBuilder traceBuilder(tracePlanWithSplits.plan); + traceBuilder.maxDrivers(2) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceRoot) + .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config( + core::QueryConfig::kQueryTraceNodeIds, + fmt::format("{},{}", filterNodeId_, projectNodeId_)); + auto traceResult = + traceBuilder.splits(tracePlanWithSplits.splits).copyResults(pool(), task); + + assertEqualResults({result}, {traceResult}); + + const auto taskId = task->taskId(); + auto replayingResult = FilterProjectReplayer( + traceRoot, + task->queryCtx()->queryId(), + task->taskId(), + filterNodeId_, + 0, + "HashJoin") + .run(); + assertEqualResults({result}, {replayingResult}); +} + +TEST_F(FilterProjectReplayerTest, projectOnly) { + const auto planWithSplits = createPlan(PlanMode::ProjectOnly); + AssertQueryBuilder builder(planWithSplits.plan); + const auto result = builder.splits(planWithSplits.splits).copyResults(pool()); + + const auto traceRoot = fmt::format("{}/{}", testDir_->getPath(), "project"); + const auto tracePlanWithSplits = createPlan(PlanMode::ProjectOnly); + std::shared_ptr task; + AssertQueryBuilder traceBuilder(tracePlanWithSplits.plan); + traceBuilder.maxDrivers(2) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceRoot) + .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config( + core::QueryConfig::kQueryTraceNodeIds, + fmt::format("{},{}", filterNodeId_, projectNodeId_)); + auto traceResult = + traceBuilder.splits(tracePlanWithSplits.splits).copyResults(pool(), task); + + assertEqualResults({result}, {traceResult}); + + const auto taskId = task->taskId(); + auto replayingResult = FilterProjectReplayer( + traceRoot, + task->queryCtx()->queryId(), + task->taskId(), + projectNodeId_, + 0, + "HashJoin") + .run(); + assertEqualResults({result}, {replayingResult}); +} +} // namespace facebook::velox::tool::trace::test