Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add HashJoinReplayer #11271

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ void Operator::maybeSetTracer() {
}
tracedOpMap.emplace(operatorId(), operatorType());

if (!trace::canTrace(operatorType())) {
VELOX_UNSUPPORTED("{} does not support tracing", operatorType());
}

const auto pipelineId = operatorCtx_->driverCtx()->pipelineId;
const auto driverId = operatorCtx_->driverCtx()->driverId;
LOG(INFO) << "Trace input for operator type: " << operatorType()
Expand Down
10 changes: 8 additions & 2 deletions velox/exec/OperatorTraceReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include <utility>

#include "velox/exec/OperatorTraceReader.h"

#include "velox/exec/TraceUtil.h"

namespace facebook::velox::exec::trace {
Expand All @@ -32,10 +31,13 @@ OperatorTraceInputReader::OperatorTraceInputReader(
pool_(pool),
inputStream_(getInputStream()) {
VELOX_CHECK_NOT_NULL(dataType_);
VELOX_CHECK_NOT_NULL(inputStream_);
}

bool OperatorTraceInputReader::read(RowVectorPtr& batch) const {
if (inputStream_ == nullptr) {
return false;
}

if (inputStream_->atEnd()) {
batch = nullptr;
return false;
Expand All @@ -49,6 +51,10 @@ bool OperatorTraceInputReader::read(RowVectorPtr& batch) const {
std::unique_ptr<common::FileInputStream>
OperatorTraceInputReader::getInputStream() const {
auto traceFile = fs_->openFileForRead(getOpTraceInputFilePath(traceDir_));
if (traceFile->size() == 0) {
LOG(WARNING) << "Operator trace input data file is empty in " << traceDir_;
return nullptr;
}
// TODO: Make the buffer size configurable.
return std::make_unique<common::FileInputStream>(
std::move(traceFile), 1 << 20, pool_);
Expand Down
12 changes: 12 additions & 0 deletions velox/exec/TraceUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,16 @@ size_t getNumDrivers(
const std::shared_ptr<filesystems::FileSystem>& fs) {
return listDriverIds(nodeTraceDir, pipelineId, fs).size();
}

bool canTrace(const std::string& operatorType) {
static const std::unordered_set<std::string> kSupportedOperatorTypes{
"FilterProject",
"TableWrite",
"Aggregation",
"PartialAggregation",
"PartitionedOutput",
"HashBuild",
"HashProbe"};
return kSupportedOperatorTypes.count(operatorType) > 0;
}
} // namespace facebook::velox::exec::trace
3 changes: 3 additions & 0 deletions velox/exec/TraceUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,7 @@ std::vector<std::string> getTaskIds(
folly::dynamic getTaskMetadata(
const std::string& taskMetaFilePath,
const std::shared_ptr<filesystems::FileSystem>& fs);

/// Checks whether the operator can be traced.
bool canTrace(const std::string& operatorType);
} // namespace facebook::velox::exec::trace
168 changes: 166 additions & 2 deletions velox/exec/tests/OperatorTraceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ TEST_F(OperatorTraceTest, task) {
}

auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
core::PlanNodeId hashJoinNodeId;
const auto planNode =
PlanBuilder(planNodeIdGenerator)
.values(rows, false)
Expand All @@ -349,6 +350,7 @@ TEST_F(OperatorTraceTest, task) {
"c0 < 135",
{"c0", "c1", "c2"},
core::JoinType::kInner)
.capturePlanNodeId(hashJoinNodeId)
.planNode();
const auto expectedResult =
AssertQueryBuilder(planNode).maxDrivers(1).copyResults(pool());
Expand All @@ -374,7 +376,7 @@ TEST_F(OperatorTraceTest, task) {
std::to_string(100UL << 30)},
{core::QueryConfig::kQueryTraceDir, outputDir->getPath()},
{core::QueryConfig::kQueryTraceTaskRegExp, testData.taskRegExpr},
{core::QueryConfig::kQueryTraceNodeIds, "1,2"},
{core::QueryConfig::kQueryTraceNodeIds, hashJoinNodeId},
{"key1", "value1"},
};

Expand Down Expand Up @@ -595,7 +597,7 @@ TEST_F(OperatorTraceTest, traceTableWriter) {
continue;
}

// Query metadta file should exist.
// Query metadata file should exist.
const auto traceMetaFilePath = getTaskTraceMetaFilePath(taskTraceDir);
ASSERT_TRUE(fs->exists(traceMetaFilePath));

Expand Down Expand Up @@ -724,4 +726,166 @@ TEST_F(OperatorTraceTest, filterProject) {
ASSERT_EQ(numOutputVectors, testData.numTracedBatches);
}
}

TEST_F(OperatorTraceTest, hashJoin) {
std::vector<RowVectorPtr> probeInput;
RowTypePtr probeType =
ROW({"c0", "c1", "c2"}, {BIGINT(), TINYINT(), VARCHAR()});
constexpr auto numBatch = 5;
probeInput.reserve(numBatch);
for (auto i = 0; i < numBatch; ++i) {
probeInput.push_back(vectorFuzzer_.fuzzInputFlatRow(probeType));
}

std::vector<RowVectorPtr> buildInput;
RowTypePtr buildType =
ROW({"u0", "u1", "u2"}, {BIGINT(), SMALLINT(), BIGINT()});
buildInput.reserve(numBatch);
for (auto i = 0; i < numBatch; ++i) {
buildInput.push_back(vectorFuzzer_.fuzzInputFlatRow(buildType));
}

struct {
std::string taskRegExpr;
uint64_t maxTracedBytes;
uint8_t numTracedBatches;
bool limitExceeded;

std::string debugString() const {
return fmt::format(
"taskRegExpr: {}, maxTracedBytes: {}, numTracedBatches: {}, limitExceeded {}",
taskRegExpr,
maxTracedBytes,
numTracedBatches,
limitExceeded);
}
} testSettings[]{
{".*", 10UL << 30, numBatch, false},
{".*", 0, numBatch, true},
{"wrong id", 10UL << 30, 0, false},
{"test_cursor \\d+", 10UL << 30, numBatch, false},
{"test_cursor \\d+", 800, 2, true}};

for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
const auto outputDir = TempDirectoryPath::create();
const auto planNodeIdGenerator{
std::make_shared<core::PlanNodeIdGenerator>()};
core::PlanNodeId hashJoinNodeId;
const auto planNode = PlanBuilder(planNodeIdGenerator)
.values(probeInput, false)
.hashJoin(
{"c0"},
{"u0"},
PlanBuilder(planNodeIdGenerator)
.values(buildInput, true)
.planNode(),
"c0 < 135",
{"c0", "c1", "c2"},
core::JoinType::kInner)
.capturePlanNodeId(hashJoinNodeId)
.planNode();
const auto testDir = TempDirectoryPath::create();
const auto traceRoot =
fmt::format("{}/{}", testDir->getPath(), "traceRoot");
std::shared_ptr<Task> task;
if (testData.limitExceeded) {
VELOX_ASSERT_THROW(
AssertQueryBuilder(planNode)
.maxDrivers(1)
.config(core::QueryConfig::kQueryTraceEnabled, true)
.config(core::QueryConfig::kQueryTraceDir, traceRoot)
.config(
core::QueryConfig::kQueryTraceMaxBytes,
testData.maxTracedBytes)
.config(
core::QueryConfig::kQueryTraceTaskRegExp,
testData.taskRegExpr)
.config(core::QueryConfig::kQueryTraceNodeIds, hashJoinNodeId)
.copyResults(pool(), task),
"Query exceeded per-query local trace limit of");
continue;
}
AssertQueryBuilder(planNode)
.maxDrivers(1)
.config(core::QueryConfig::kQueryTraceEnabled, true)
.config(core::QueryConfig::kQueryTraceDir, traceRoot)
.config(core::QueryConfig::kQueryTraceMaxBytes, testData.maxTracedBytes)
.config(core::QueryConfig::kQueryTraceTaskRegExp, testData.taskRegExpr)
.config(core::QueryConfig::kQueryTraceNodeIds, hashJoinNodeId)
.copyResults(pool(), task);

const auto taskTraceDir = getTaskTraceDirectory(traceRoot, *task);
const auto fs = filesystems::getFileSystem(taskTraceDir, nullptr);

if (testData.taskRegExpr == "wrong id") {
ASSERT_FALSE(fs->exists(traceRoot));
continue;
}

// Query metadata file should exist.
const auto traceMetaFilePath = getTaskTraceMetaFilePath(taskTraceDir);
ASSERT_TRUE(fs->exists(traceMetaFilePath));

for (uint32_t pipelineId = 0; pipelineId < 2; ++pipelineId) {
const auto opTraceProbeDir =
getOpTraceDirectory(taskTraceDir, hashJoinNodeId, pipelineId, 0);

ASSERT_EQ(fs->list(opTraceProbeDir).size(), 2);

const auto summary =
OperatorTraceSummaryReader(opTraceProbeDir, pool()).read();
RowTypePtr dataType;
if (pipelineId == 0) {
dataType = probeType;
} else {
dataType = buildType;
}
const auto reader =
trace::OperatorTraceInputReader(opTraceProbeDir, dataType, pool());
RowVectorPtr actual;
size_t numOutputVectors{0};
RowVectorPtr expected;
if (pipelineId == 0) {
expected = probeInput[numOutputVectors];
} else {
expected = buildInput[numOutputVectors];
}
while (reader.read(actual)) {
const auto size = actual->size();
ASSERT_EQ(size, expected->size());
for (auto i = 0; i < size; ++i) {
actual->compare(expected.get(), i, i, {.nullsFirst = true});
}
++numOutputVectors;
}
ASSERT_EQ(numOutputVectors, testData.numTracedBatches);
}
}
}

TEST_F(OperatorTraceTest, canTrace) {
struct {
const std::string operatorType;
const bool canTrace;

std::string debugString() const {
return fmt::format(
"operatorType: {}, canTrace: {}", operatorType, canTrace);
}
} testSettings[] = {
{"PartitionedOutput", true},
{"HashBuild", true},
{"HashProbe", true},
{"RowNumber", false},
{"OrderBy", false},
{"PartialAggregation", true},
{"Aggregation", true},
{"TableWrite", true},
{"FilterProject", true}};
for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
ASSERT_EQ(testData.canTrace, trace::canTrace(testData.operatorType));
}
}
} // namespace facebook::velox::exec::trace::test
1 change: 1 addition & 0 deletions velox/tool/trace/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ velox_add_library(
velox_query_trace_replayer_base
AggregationReplayer.cpp
FilterProjectReplayer.cpp
HashJoinReplayer.cpp
OperatorReplayerBase.cpp
PartitionedOutputReplayer.cpp
TableWriterReplayer.cpp
Expand Down
47 changes: 47 additions & 0 deletions velox/tool/trace/HashJoinReplayer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/tool/trace/HashJoinReplayer.h"
#include "velox/exec/TraceUtil.h"
#include "velox/exec/tests/utils/PlanBuilder.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;
using namespace facebook::velox::exec::test;

namespace facebook::velox::tool::trace {
core::PlanNodePtr HashJoinReplayer::createPlanNode(
const core::PlanNode* node,
const core::PlanNodeId& nodeId,
const core::PlanNodePtr& source) const {
duanmeng marked this conversation as resolved.
Show resolved Hide resolved
const auto* hashJoinNode = dynamic_cast<const core::HashJoinNode*>(node);
return std::make_shared<core::HashJoinNode>(
nodeId,
hashJoinNode->joinType(),
hashJoinNode->isNullAware(),
hashJoinNode->leftKeys(),
hashJoinNode->rightKeys(),
hashJoinNode->filter(),
source,
PlanBuilder(planNodeIdGenerator_)
.traceScan(
nodeTraceDir_,
pipelineId_ + 1, // Build side
exec::trace::getDataType(planFragment_, nodeId_, 1))
.planNode(),
hashJoinNode->outputType());
}
} // namespace facebook::velox::tool::trace
47 changes: 47 additions & 0 deletions velox/tool/trace/HashJoinReplayer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/core/PlanNode.h"
#include "velox/tool/trace/OperatorReplayerBase.h"

namespace facebook::velox::tool::trace {
/// The replayer to replay the traced 'HashJoin' operator.
class HashJoinReplayer final : public OperatorReplayerBase {
public:
HashJoinReplayer(
const std::string& rootDir,
const std::string& queryId,
const std::string& taskId,
const std::string& nodeId,
const int32_t pipelineId,
const std::string& operatorType)
: OperatorReplayerBase(
rootDir,
queryId,
taskId,
nodeId,
pipelineId,
operatorType) {}

private:
core::PlanNodePtr createPlanNode(
const core::PlanNode* node,
const core::PlanNodeId& nodeId,
const core::PlanNodePtr& source) const override;
};
} // namespace facebook::velox::tool::trace
2 changes: 1 addition & 1 deletion velox/tool/trace/OperatorReplayerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

#include "velox/common/file/FileSystems.h"
#include "velox/core/PlanNode.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/parse/PlanNodeIdGenerator.h"

namespace facebook::velox::exec {
class Task;
Expand Down
Loading
Loading