diff --git a/velox/exec/tests/OperatorTraceTest.cpp b/velox/exec/tests/OperatorTraceTest.cpp index 36d9eb467d5e..a766b7794db8 100644 --- a/velox/exec/tests/OperatorTraceTest.cpp +++ b/velox/exec/tests/OperatorTraceTest.cpp @@ -620,4 +620,108 @@ TEST_F(OperatorTraceTest, traceTableWriter) { ASSERT_EQ(numOutputVectors, testData.numTracedBatches); } } + +TEST_F(OperatorTraceTest, filterProject) { + std::vector inputVectors; + constexpr auto numBatch = 5; + inputVectors.reserve(numBatch); + for (auto i = 0; i < numBatch; ++i) { + inputVectors.push_back(vectorFuzzer_.fuzzInputFlatRow(dataType_)); + } + + struct { + std::string taskRegExpr; + uint64_t maxTracedBytes; + uint8_t numTracedBatches; + bool limitExceeded; + + std::string debugString() const { + return fmt::format( + "taskRegExpr: {}, maxTracedBytes: {}, numTracedBatches: {}, limitExceeded {}", + taskRegExpr, + maxTracedBytes, + numTracedBatches, + limitExceeded); + } + } testSettings[]{ + {".*", 10UL << 30, numBatch, false}, + {".*", 0, numBatch, true}, + {"wrong id", 10UL << 30, 0, false}, + {"test_cursor \\d+", 10UL << 30, numBatch, false}, + {"test_cursor \\d+", 800, 2, true}}; + + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + const auto outputDir = TempDirectoryPath::create(); + core::PlanNodeId projectNodeId; + const auto planNode = PlanBuilder() + .values(inputVectors) + .filter("a % 10 < 9") + .project({"a", "b", "a % 100 + c % 50 AS d"}) + .capturePlanNodeId(projectNodeId) + .planNode(); + const auto testDir = TempDirectoryPath::create(); + const auto traceRoot = + fmt::format("{}/{}", testDir->getPath(), "traceRoot"); + std::shared_ptr task; + if (testData.limitExceeded) { + VELOX_ASSERT_THROW( + AssertQueryBuilder(planNode) + .maxDrivers(1) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceRoot) + .config( + core::QueryConfig::kQueryTraceMaxBytes, + testData.maxTracedBytes) + .config( + core::QueryConfig::kQueryTraceTaskRegExp, + testData.taskRegExpr) + .config(core::QueryConfig::kQueryTraceNodeIds, projectNodeId) + .copyResults(pool(), task), + "Query exceeded per-query local trace limit of"); + continue; + } + AssertQueryBuilder(planNode) + .maxDrivers(1) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceRoot) + .config(core::QueryConfig::kQueryTraceMaxBytes, testData.maxTracedBytes) + .config(core::QueryConfig::kQueryTraceTaskRegExp, testData.taskRegExpr) + .config(core::QueryConfig::kQueryTraceNodeIds, projectNodeId) + .copyResults(pool(), task); + + const auto taskTraceDir = getTaskTraceDirectory(traceRoot, *task); + const auto fs = filesystems::getFileSystem(taskTraceDir, nullptr); + + if (testData.taskRegExpr == "wrong id") { + ASSERT_FALSE(fs->exists(traceRoot)); + continue; + } + + // Query metadata file should exist. + const auto traceMetaFilePath = getTaskTraceMetaFilePath(taskTraceDir); + ASSERT_TRUE(fs->exists(traceMetaFilePath)); + + const auto opTraceDir = + getOpTraceDirectory(taskTraceDir, projectNodeId, 0, 0); + + ASSERT_EQ(fs->list(opTraceDir).size(), 2); + + const auto summary = OperatorTraceSummaryReader(opTraceDir, pool()).read(); + const auto reader = + trace::OperatorTraceInputReader(opTraceDir, dataType_, pool()); + RowVectorPtr actual; + size_t numOutputVectors{0}; + while (reader.read(actual)) { + const auto& expected = inputVectors[numOutputVectors]; + const auto size = actual->size(); + ASSERT_EQ(size, expected->size()); + for (auto i = 0; i < size; ++i) { + actual->compare(expected.get(), i, i, {.nullsFirst = true}); + } + ++numOutputVectors; + } + ASSERT_EQ(numOutputVectors, testData.numTracedBatches); + } +} } // namespace facebook::velox::exec::trace::test diff --git a/velox/tool/trace/CMakeLists.txt b/velox/tool/trace/CMakeLists.txt index 0e0c1169e61e..7390bba8afa4 100644 --- a/velox/tool/trace/CMakeLists.txt +++ b/velox/tool/trace/CMakeLists.txt @@ -18,6 +18,7 @@ velox_add_library( OperatorReplayerBase.cpp PartitionedOutputReplayer.cpp TableWriterReplayer.cpp + FilterProjectReplayer.cpp TraceReplayRunner.cpp) velox_link_libraries( @@ -34,7 +35,7 @@ velox_link_libraries( glog::glog gflags::gflags) -add_executable(velox_query_replayer TraceReplayerMain.cpp) +add_executable(velox_query_replayer TraceReplayerMain.cpp TraceReplayRunner.cpp) target_link_libraries( velox_query_replayer diff --git a/velox/tool/trace/FilterProjectReplayer.cpp b/velox/tool/trace/FilterProjectReplayer.cpp new file mode 100644 index 000000000000..0a7a4b523a0a --- /dev/null +++ b/velox/tool/trace/FilterProjectReplayer.cpp @@ -0,0 +1,76 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/tool/trace/FilterProjectReplayer.h" +#include "velox/exec/TraceUtil.h" +#include "velox/exec/tests/utils/PlanBuilder.h" + +using namespace facebook::velox; +using namespace facebook::velox::exec; +using namespace facebook::velox::exec::test; + +namespace facebook::velox::tool::trace { +core::PlanNodePtr FilterProjectReplayer::createPlanNode( + const core::PlanNode* node, + const core::PlanNodeId& nodeId, + const core::PlanNodePtr& source) const { + if (node->name() == "Filter") { + const auto* filterNode = dynamic_cast(node); + VELOX_CHECK( + !isFilterProject(dynamic_cast(node)), + "If the target node is a FilterNode, it must be a standalone FilterNode"); + + // A standalone FilterNode. + return std::make_shared( + nodeId, filterNode->filter(), source); + } + + const auto* projectNode = dynamic_cast(node); + + // A standalone ProjectNode. + if (node->sources().empty() || node->sources().front()->name() != "Filter") { + return std::make_shared( + nodeId, projectNode->names(), projectNode->projections(), source); + } + + // A ProjectNode with a FilterNode as its source. + // -- ProjectNode [nodeId] + // -- FilterNode [nodeId - 1] + const auto originalFilterNode = + std::dynamic_pointer_cast( + node->sources().front()); + const auto filterNode = std::make_shared( + nodeId, originalFilterNode->filter(), source); + const auto projectNodeId = planNodeIdGenerator_->next(); + return std::make_shared( + projectNodeId, + projectNode->names(), + projectNode->projections(), + filterNode); +} + +bool FilterProjectReplayer::isFilterProject( + const core::PlanNode* filterNode) const { + const auto* projectNode = + dynamic_cast(core::PlanNode::findFirstNode( + planFragment_.get(), [this](const core::PlanNode* node) { + return node->id() == std::to_string(std::stoull(nodeId_) + 1); + })); + return projectNode != nullptr && projectNode->name() == "Project" && + projectNode->sources().size() == 1 && + projectNode->sources().front()->id() == nodeId_; +} +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/FilterProjectReplayer.h b/velox/tool/trace/FilterProjectReplayer.h new file mode 100644 index 000000000000..775ef5fbf27d --- /dev/null +++ b/velox/tool/trace/FilterProjectReplayer.h @@ -0,0 +1,60 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/core/PlanNode.h" +#include "velox/tool/trace/OperatorReplayerBase.h" + +namespace facebook::velox::tool::trace { +/// The replayer to replay the traced 'FilterProject' operator. +/// +/// NOTE: For the plan fragment involving FilterNode->ProjectNode, users must +/// use the ProjectNode ID for tracing. This is because the planner will combine +/// these two operators into a single FilterProject operator. During replay, +/// the ProjectNode ID will be used to locate the trace data directory. +class FilterProjectReplayer : public OperatorReplayerBase { + public: + FilterProjectReplayer( + const std::string& rootDir, + const std::string& queryId, + const std::string& taskId, + const std::string& nodeId, + const int32_t pipelineId, + const std::string& operatorType) + : OperatorReplayerBase( + rootDir, + queryId, + taskId, + nodeId, + pipelineId, + operatorType) {} + + private: + // Create either a standalone FilterNode, a standalone ProjectNode, or a + // ProjectNode with a FilterNode as its source. + // + // NOTE: If the target node is a FilterNode, it must be a standalone + // FilterNode, without a ProjectNode as its parent. + core::PlanNodePtr createPlanNode( + const core::PlanNode* node, + const core::PlanNodeId& nodeId, + const core::PlanNodePtr& source) const override; + + // Checks whether the FilterNode is a source node of a ProjectNode. + bool isFilterProject(const core::PlanNode* filterNode) const; +}; +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/OperatorReplayerBase.cpp b/velox/tool/trace/OperatorReplayerBase.cpp index 2083e676f678..463752732dae 100644 --- a/velox/tool/trace/OperatorReplayerBase.cpp +++ b/velox/tool/trace/OperatorReplayerBase.cpp @@ -67,7 +67,7 @@ core::PlanNodePtr OperatorReplayerBase::createPlan() const { const auto* replayNode = core::PlanNode::findFirstNode( planFragment_.get(), [this](const core::PlanNode* node) { return node->id() == nodeId_; }); - return exec::test::PlanBuilder() + return exec::test::PlanBuilder(planNodeIdGenerator_) .traceScan( nodeTraceDir_, pipelineId_, diff --git a/velox/tool/trace/OperatorReplayerBase.h b/velox/tool/trace/OperatorReplayerBase.h index 50e31fdaf02f..6fa1db7adce8 100644 --- a/velox/tool/trace/OperatorReplayerBase.h +++ b/velox/tool/trace/OperatorReplayerBase.h @@ -18,6 +18,7 @@ #include "velox/common/file/FileSystems.h" #include "velox/core/PlanNode.h" +#include "velox/exec/tests/utils/PlanBuilder.h" namespace facebook::velox::exec { class Task; @@ -62,6 +63,9 @@ class OperatorReplayerBase { const std::shared_ptr fs_; const int32_t maxDrivers_; + const std::shared_ptr planNodeIdGenerator_{ + std::make_shared()}; + std::unordered_map queryConfigs_; std::unordered_map> connectorConfigs_; diff --git a/velox/tool/trace/TraceReplayRunner.cpp b/velox/tool/trace/TraceReplayRunner.cpp index b2cd0d4a0be3..7ed56c6ca653 100644 --- a/velox/tool/trace/TraceReplayRunner.cpp +++ b/velox/tool/trace/TraceReplayRunner.cpp @@ -41,6 +41,7 @@ #include "velox/functions/prestosql/registration/RegistrationFunctions.h" #include "velox/parse/TypeResolver.h" #include "velox/tool/trace/AggregationReplayer.h" +#include "velox/tool/trace/FilterProjectReplayer.h" #include "velox/tool/trace/OperatorReplayerBase.h" #include "velox/tool/trace/PartitionedOutputReplayer.h" #include "velox/tool/trace/TableWriterReplayer.h" @@ -109,6 +110,14 @@ std::unique_ptr createReplayer() { FLAGS_node_id, FLAGS_pipeline_id, FLAGS_operator_type); + } else if (FLAGS_operator_type == "FilterProject") { + replayer = std::make_unique( + FLAGS_root_dir, + FLAGS_query_id, + FLAGS_task_id, + FLAGS_node_id, + FLAGS_pipeline_id, + FLAGS_operator_type); } else { VELOX_UNSUPPORTED("Unsupported operator type: {}", FLAGS_operator_type); } diff --git a/velox/tool/trace/tests/CMakeLists.txt b/velox/tool/trace/tests/CMakeLists.txt index 03d1705eebe7..91b9dcfc73f9 100644 --- a/velox/tool/trace/tests/CMakeLists.txt +++ b/velox/tool/trace/tests/CMakeLists.txt @@ -15,7 +15,7 @@ add_executable( velox_tool_trace_test AggregationReplayerTest.cpp PartitionedOutputReplayerTest.cpp - TableWriterReplayerTest.cpp) + TableWriterReplayerTest.cpp FilterProjectReplayerTest.cpp) add_test( NAME velox_tool_trace_test diff --git a/velox/tool/trace/tests/FilterProjectReplayerTest.cpp b/velox/tool/trace/tests/FilterProjectReplayerTest.cpp new file mode 100644 index 000000000000..42657528791d --- /dev/null +++ b/velox/tool/trace/tests/FilterProjectReplayerTest.cpp @@ -0,0 +1,258 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include + +#include + +#include "velox/common/file/FileSystems.h" +#include "velox/common/hyperloglog/SparseHll.h" +#include "velox/common/testutil/TestValue.h" +#include "velox/dwio/dwrf/writer/Writer.h" +#include "velox/exec/PartitionFunction.h" +#include "velox/exec/TableWriter.h" +#include "velox/exec/TraceUtil.h" +#include "velox/exec/tests/utils/ArbitratorTestUtil.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" +#include "velox/serializers/PrestoSerializer.h" +#include "velox/tool/trace/FilterProjectReplayer.h" + +#include "velox/common/file/Utils.h" +#include "velox/exec/PlanNodeStats.h" + +#include "velox/vector/tests/utils/VectorTestBase.h" + +using namespace facebook::velox; +using namespace facebook::velox::core; +using namespace facebook::velox::common; +using namespace facebook::velox::exec; +using namespace facebook::velox::exec::test; +using namespace facebook::velox::connector; +using namespace facebook::velox::connector::hive; +using namespace facebook::velox::dwio::common; +using namespace facebook::velox::common::testutil; +using namespace facebook::velox::common::hll; + +namespace facebook::velox::tool::trace::test { +class FilterProjectReplayerTest : public HiveConnectorTestBase { + protected: + static void SetUpTestCase() { + memory::MemoryManager::testingSetInstance({}); + HiveConnectorTestBase::SetUpTestCase(); + filesystems::registerLocalFileSystem(); + if (!isRegisteredVectorSerde()) { + serializer::presto::PrestoVectorSerde::registerVectorSerde(); + } + Type::registerSerDe(); + common::Filter::registerSerDe(); + connector::hive::HiveTableHandle::registerSerDe(); + connector::hive::LocationHandle::registerSerDe(); + connector::hive::HiveColumnHandle::registerSerDe(); + connector::hive::HiveInsertTableHandle::registerSerDe(); + connector::hive::HiveConnectorSplit::registerSerDe(); + core::PlanNode::registerSerDe(); + core::ITypedExpr::registerSerDe(); + registerPartitionFunctionSerDe(); + } + + void TearDown() override { + input_.clear(); + HiveConnectorTestBase::TearDown(); + } + + struct PlanWithSplits { + core::PlanNodePtr plan; + std::vector splits; + + explicit PlanWithSplits( + core::PlanNodePtr _plan, + std::vector _splits = {}) + : plan(std::move(_plan)), splits(std::move(_splits)) {} + }; + + std::vector + makeVectors(int32_t count, int32_t rowsPerVector, const RowTypePtr& rowType) { + return HiveConnectorTestBase::makeVectors(rowType, count, rowsPerVector); + } + + std::vector makeSplits( + const std::vector& inputs, + const std::string& path, + memory::MemoryPool* writerPool) { + std::vector splits; + for (auto i = 0; i < 4; ++i) { + const std::string filePath = fmt::format("{}/{}", path, i); + writeToFile(filePath, inputs); + splits.emplace_back(makeHiveConnectorSplit(filePath)); + } + + return splits; + } + + enum class PlanMode { + FilterProject = 0, + FilterOnly = 1, + ProjectOnly = 2, + }; + + PlanWithSplits createPlan(PlanMode planMode) { + core::PlanNodePtr plan = nullptr; + if (planMode == PlanMode::FilterProject) { + plan = PlanBuilder() + .tableScan(inputType_) + .filter("c0 % 10 < 9") + .capturePlanNodeId(filterNodeId_) + .project({"c0", "c1", "c0 % 100 + c1 % 50 AS e1"}) + .capturePlanNodeId(projectNodeId_) + .planNode(); + } else if (planMode == PlanMode::FilterOnly) { + plan = PlanBuilder() + .tableScan(inputType_) + .filter("c0 % 10 < 9") + .capturePlanNodeId(filterNodeId_) + .planNode(); + } else { + plan = PlanBuilder() + .tableScan(inputType_) + .project({"c0", "c1", "c0 % 100 + c1 % 50 AS e1"}) + .capturePlanNodeId(projectNodeId_) + .planNode(); + } + const std::vector splits = makeSplits( + input_, fmt::format("{}/splits", testDir_->getPath()), pool()); + return PlanWithSplits{plan, splits}; + } + + core::PlanNodeId traceNodeId_; + core::PlanNodeId filterNodeId_; + core::PlanNodeId projectNodeId_; + RowTypePtr inputType_{ + ROW({"c0", "c1", "c2", "c3"}, {BIGINT(), INTEGER(), SMALLINT(), REAL()})}; + std::vector input_ = makeVectors(5, 100, inputType_); + + const std::shared_ptr testDir_ = + TempDirectoryPath::create(); +}; + +TEST_F(FilterProjectReplayerTest, filterProject) { + const auto planWithSplits = createPlan(PlanMode::FilterProject); + AssertQueryBuilder builder(planWithSplits.plan); + const auto result = builder.splits(planWithSplits.splits).copyResults(pool()); + + const auto traceRoot = fmt::format("{}/{}", testDir_->getPath(), "basic"); + const auto tracePlanWithSplits = createPlan(PlanMode::FilterProject); + std::shared_ptr task; + AssertQueryBuilder traceBuilder(tracePlanWithSplits.plan); + traceBuilder.maxDrivers(2) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceRoot) + .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config( + core::QueryConfig::kQueryTraceNodeIds, + fmt::format("{},{}", filterNodeId_, projectNodeId_)); + auto traceResult = + traceBuilder.splits(tracePlanWithSplits.splits).copyResults(pool(), task); + + assertEqualResults({result}, {traceResult}); + + const auto taskId = task->taskId(); + auto replayingResult = FilterProjectReplayer( + traceRoot, + task->queryCtx()->queryId(), + task->taskId(), + projectNodeId_, + 0, + "HashJoin") + .run(); + assertEqualResults({result}, {replayingResult}); +} + +TEST_F(FilterProjectReplayerTest, filterOnly) { + const auto planWithSplits = createPlan(PlanMode::FilterOnly); + AssertQueryBuilder builder(planWithSplits.plan); + const auto result = builder.splits(planWithSplits.splits).copyResults(pool()); + + const auto traceRoot = fmt::format("{}/{}", testDir_->getPath(), "filter"); + const auto tracePlanWithSplits = createPlan(PlanMode::FilterOnly); + std::shared_ptr task; + AssertQueryBuilder traceBuilder(tracePlanWithSplits.plan); + traceBuilder.maxDrivers(2) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceRoot) + .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config( + core::QueryConfig::kQueryTraceNodeIds, + fmt::format("{},{}", filterNodeId_, projectNodeId_)); + auto traceResult = + traceBuilder.splits(tracePlanWithSplits.splits).copyResults(pool(), task); + + assertEqualResults({result}, {traceResult}); + + const auto taskId = task->taskId(); + auto replayingResult = FilterProjectReplayer( + traceRoot, + task->queryCtx()->queryId(), + task->taskId(), + filterNodeId_, + 0, + "HashJoin") + .run(); + assertEqualResults({result}, {replayingResult}); +} + +TEST_F(FilterProjectReplayerTest, projectOnly) { + const auto planWithSplits = createPlan(PlanMode::ProjectOnly); + AssertQueryBuilder builder(planWithSplits.plan); + const auto result = builder.splits(planWithSplits.splits).copyResults(pool()); + + const auto traceRoot = fmt::format("{}/{}", testDir_->getPath(), "project"); + const auto tracePlanWithSplits = createPlan(PlanMode::ProjectOnly); + std::shared_ptr task; + AssertQueryBuilder traceBuilder(tracePlanWithSplits.plan); + traceBuilder.maxDrivers(2) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceRoot) + .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config( + core::QueryConfig::kQueryTraceNodeIds, + fmt::format("{},{}", filterNodeId_, projectNodeId_)); + auto traceResult = + traceBuilder.splits(tracePlanWithSplits.splits).copyResults(pool(), task); + + assertEqualResults({result}, {traceResult}); + + const auto taskId = task->taskId(); + auto replayingResult = FilterProjectReplayer( + traceRoot, + task->queryCtx()->queryId(), + task->taskId(), + projectNodeId_, + 0, + "HashJoin") + .run(); + assertEqualResults({result}, {replayingResult}); +} +} // namespace facebook::velox::tool::trace::test