Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FilterProjectReplayer #11351

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions velox/exec/tests/OperatorTraceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -620,4 +620,108 @@ TEST_F(OperatorTraceTest, traceTableWriter) {
ASSERT_EQ(numOutputVectors, testData.numTracedBatches);
}
}

TEST_F(OperatorTraceTest, filterProject) {
std::vector<RowVectorPtr> inputVectors;
constexpr auto numBatch = 5;
inputVectors.reserve(numBatch);
for (auto i = 0; i < numBatch; ++i) {
inputVectors.push_back(vectorFuzzer_.fuzzInputFlatRow(dataType_));
}

struct {
std::string taskRegExpr;
uint64_t maxTracedBytes;
uint8_t numTracedBatches;
bool limitExceeded;

std::string debugString() const {
return fmt::format(
"taskRegExpr: {}, maxTracedBytes: {}, numTracedBatches: {}, limitExceeded {}",
taskRegExpr,
maxTracedBytes,
numTracedBatches,
limitExceeded);
}
} testSettings[]{
{".*", 10UL << 30, numBatch, false},
{".*", 0, numBatch, true},
{"wrong id", 10UL << 30, 0, false},
{"test_cursor \\d+", 10UL << 30, numBatch, false},
{"test_cursor \\d+", 800, 2, true}};

for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
const auto outputDir = TempDirectoryPath::create();
core::PlanNodeId projectNodeId;
const auto planNode = PlanBuilder()
.values(inputVectors)
.filter("a % 10 < 9")
.project({"a", "b", "a % 100 + c % 50 AS d"})
.capturePlanNodeId(projectNodeId)
.planNode();
const auto testDir = TempDirectoryPath::create();
const auto traceRoot =
fmt::format("{}/{}", testDir->getPath(), "traceRoot");
std::shared_ptr<Task> task;
if (testData.limitExceeded) {
VELOX_ASSERT_THROW(
AssertQueryBuilder(planNode)
.maxDrivers(1)
.config(core::QueryConfig::kQueryTraceEnabled, true)
.config(core::QueryConfig::kQueryTraceDir, traceRoot)
.config(
core::QueryConfig::kQueryTraceMaxBytes,
testData.maxTracedBytes)
.config(
core::QueryConfig::kQueryTraceTaskRegExp,
testData.taskRegExpr)
.config(core::QueryConfig::kQueryTraceNodeIds, projectNodeId)
.copyResults(pool(), task),
"Query exceeded per-query local trace limit of");
continue;
}
AssertQueryBuilder(planNode)
.maxDrivers(1)
.config(core::QueryConfig::kQueryTraceEnabled, true)
.config(core::QueryConfig::kQueryTraceDir, traceRoot)
.config(core::QueryConfig::kQueryTraceMaxBytes, testData.maxTracedBytes)
.config(core::QueryConfig::kQueryTraceTaskRegExp, testData.taskRegExpr)
.config(core::QueryConfig::kQueryTraceNodeIds, projectNodeId)
.copyResults(pool(), task);

const auto taskTraceDir = getTaskTraceDirectory(traceRoot, *task);
const auto fs = filesystems::getFileSystem(taskTraceDir, nullptr);

if (testData.taskRegExpr == "wrong id") {
ASSERT_FALSE(fs->exists(traceRoot));
continue;
}

// Query metadata file should exist.
const auto traceMetaFilePath = getTaskTraceMetaFilePath(taskTraceDir);
ASSERT_TRUE(fs->exists(traceMetaFilePath));

const auto opTraceDir =
getOpTraceDirectory(taskTraceDir, projectNodeId, 0, 0);

ASSERT_EQ(fs->list(opTraceDir).size(), 2);

const auto summary = OperatorTraceSummaryReader(opTraceDir, pool()).read();
const auto reader =
trace::OperatorTraceInputReader(opTraceDir, dataType_, pool());
RowVectorPtr actual;
size_t numOutputVectors{0};
while (reader.read(actual)) {
const auto& expected = inputVectors[numOutputVectors];
const auto size = actual->size();
ASSERT_EQ(size, expected->size());
for (auto i = 0; i < size; ++i) {
actual->compare(expected.get(), i, i, {.nullsFirst = true});
}
++numOutputVectors;
}
ASSERT_EQ(numOutputVectors, testData.numTracedBatches);
}
}
} // namespace facebook::velox::exec::trace::test
3 changes: 2 additions & 1 deletion velox/tool/trace/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ velox_add_library(
OperatorReplayerBase.cpp
PartitionedOutputReplayer.cpp
TableWriterReplayer.cpp
FilterProjectReplayer.cpp
TraceReplayRunner.cpp)

velox_link_libraries(
Expand All @@ -34,7 +35,7 @@ velox_link_libraries(
glog::glog
gflags::gflags)

add_executable(velox_query_replayer TraceReplayerMain.cpp)
add_executable(velox_query_replayer TraceReplayerMain.cpp TraceReplayRunner.cpp)

target_link_libraries(
velox_query_replayer
Expand Down
76 changes: 76 additions & 0 deletions velox/tool/trace/FilterProjectReplayer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/tool/trace/FilterProjectReplayer.h"
#include "velox/exec/TraceUtil.h"
#include "velox/exec/tests/utils/PlanBuilder.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;
using namespace facebook::velox::exec::test;

namespace facebook::velox::tool::trace {
core::PlanNodePtr FilterProjectReplayer::createPlanNode(
const core::PlanNode* node,
const core::PlanNodeId& nodeId,
const core::PlanNodePtr& source) const {
if (node->name() == "Filter") {
const auto* filterNode = dynamic_cast<const core::FilterNode*>(node);
VELOX_CHECK(
!isFilterProject(dynamic_cast<const core::FilterNode*>(node)),
"If the target node is a FilterNode, it must be a standalone FilterNode");

// A standalone FilterNode.
return std::make_shared<core::FilterNode>(
nodeId, filterNode->filter(), source);
}

const auto* projectNode = dynamic_cast<const core::ProjectNode*>(node);

// A standalone ProjectNode.
if (node->sources().empty() || node->sources().front()->name() != "Filter") {
return std::make_shared<core::ProjectNode>(
nodeId, projectNode->names(), projectNode->projections(), source);
}

// A ProjectNode with a FilterNode as its source.
// -- ProjectNode [nodeId]
// -- FilterNode [nodeId - 1]
const auto originalFilterNode =
std::dynamic_pointer_cast<const core::FilterNode>(
node->sources().front());
const auto filterNode = std::make_shared<core::FilterNode>(
nodeId, originalFilterNode->filter(), source);
const auto projectNodeId = planNodeIdGenerator_->next();
return std::make_shared<core::ProjectNode>(
projectNodeId,
projectNode->names(),
projectNode->projections(),
filterNode);
}

bool FilterProjectReplayer::isFilterProject(
const core::PlanNode* filterNode) const {
const auto* projectNode =
dynamic_cast<const core::ProjectNode*>(core::PlanNode::findFirstNode(
planFragment_.get(), [this](const core::PlanNode* node) {
return node->id() == std::to_string(std::stoull(nodeId_) + 1);
}));
return projectNode != nullptr && projectNode->name() == "Project" &&
projectNode->sources().size() == 1 &&
projectNode->sources().front()->id() == nodeId_;
}
} // namespace facebook::velox::tool::trace
60 changes: 60 additions & 0 deletions velox/tool/trace/FilterProjectReplayer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/core/PlanNode.h"
#include "velox/tool/trace/OperatorReplayerBase.h"

namespace facebook::velox::tool::trace {
/// The replayer to replay the traced 'FilterProject' operator.
///
/// NOTE: For the plan fragment involving FilterNode->ProjectNode, users must
/// use the ProjectNode ID for tracing. This is because the planner will combine
/// these two operators into a single FilterProject operator. During replay,
/// the ProjectNode ID will be used to locate the trace data directory.
class FilterProjectReplayer : public OperatorReplayerBase {
public:
FilterProjectReplayer(
const std::string& rootDir,
const std::string& queryId,
const std::string& taskId,
const std::string& nodeId,
const int32_t pipelineId,
const std::string& operatorType)
: OperatorReplayerBase(
rootDir,
queryId,
taskId,
nodeId,
pipelineId,
operatorType) {}

private:
// Create either a standalone FilterNode, a standalone ProjectNode, or a
// ProjectNode with a FilterNode as its source.
//
// NOTE: If the target node is a FilterNode, it must be a standalone
// FilterNode, without a ProjectNode as its parent.
core::PlanNodePtr createPlanNode(
const core::PlanNode* node,
const core::PlanNodeId& nodeId,
const core::PlanNodePtr& source) const override;

// Checks whether the FilterNode is a source node of a ProjectNode.
bool isFilterProject(const core::PlanNode* filterNode) const;
};
} // namespace facebook::velox::tool::trace
2 changes: 1 addition & 1 deletion velox/tool/trace/OperatorReplayerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ core::PlanNodePtr OperatorReplayerBase::createPlan() const {
const auto* replayNode = core::PlanNode::findFirstNode(
planFragment_.get(),
[this](const core::PlanNode* node) { return node->id() == nodeId_; });
return exec::test::PlanBuilder()
return exec::test::PlanBuilder(planNodeIdGenerator_)
.traceScan(
nodeTraceDir_,
pipelineId_,
Expand Down
4 changes: 4 additions & 0 deletions velox/tool/trace/OperatorReplayerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "velox/common/file/FileSystems.h"
#include "velox/core/PlanNode.h"
#include "velox/exec/tests/utils/PlanBuilder.h"

namespace facebook::velox::exec {
class Task;
Expand Down Expand Up @@ -62,6 +63,9 @@ class OperatorReplayerBase {
const std::shared_ptr<filesystems::FileSystem> fs_;
const int32_t maxDrivers_;

const std::shared_ptr<core::PlanNodeIdGenerator> planNodeIdGenerator_{
std::make_shared<core::PlanNodeIdGenerator>()};

std::unordered_map<std::string, std::string> queryConfigs_;
std::unordered_map<std::string, std::unordered_map<std::string, std::string>>
connectorConfigs_;
Expand Down
9 changes: 9 additions & 0 deletions velox/tool/trace/TraceReplayRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "velox/functions/prestosql/registration/RegistrationFunctions.h"
#include "velox/parse/TypeResolver.h"
#include "velox/tool/trace/AggregationReplayer.h"
#include "velox/tool/trace/FilterProjectReplayer.h"
#include "velox/tool/trace/OperatorReplayerBase.h"
#include "velox/tool/trace/PartitionedOutputReplayer.h"
#include "velox/tool/trace/TableWriterReplayer.h"
Expand Down Expand Up @@ -109,6 +110,14 @@ std::unique_ptr<tool::trace::OperatorReplayerBase> createReplayer() {
FLAGS_node_id,
FLAGS_pipeline_id,
FLAGS_operator_type);
} else if (FLAGS_operator_type == "FilterProject") {
replayer = std::make_unique<tool::trace::FilterProjectReplayer>(
FLAGS_root_dir,
FLAGS_query_id,
FLAGS_task_id,
FLAGS_node_id,
FLAGS_pipeline_id,
FLAGS_operator_type);
} else {
VELOX_UNSUPPORTED("Unsupported operator type: {}", FLAGS_operator_type);
}
Expand Down
2 changes: 1 addition & 1 deletion velox/tool/trace/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
add_executable(
velox_tool_trace_test
AggregationReplayerTest.cpp PartitionedOutputReplayerTest.cpp
TableWriterReplayerTest.cpp)
TableWriterReplayerTest.cpp FilterProjectReplayerTest.cpp)

add_test(
NAME velox_tool_trace_test
Expand Down
Loading
Loading