Skip to content

Commit

Permalink
Add hash join replayer
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Nov 5, 2024
1 parent 2ade7f7 commit d265d13
Show file tree
Hide file tree
Showing 13 changed files with 702 additions and 10 deletions.
4 changes: 4 additions & 0 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ void Operator::maybeSetTracer() {
}
tracedOpMap.emplace(operatorId(), operatorType());

if (!trace::canTrace(operatorType())) {
VELOX_UNSUPPORTED("{} does not support tracing", operatorType());
}

const auto pipelineId = operatorCtx_->driverCtx()->pipelineId;
const auto driverId = operatorCtx_->driverCtx()->driverId;
LOG(INFO) << "Trace input for operator type: " << operatorType()
Expand Down
10 changes: 8 additions & 2 deletions velox/exec/OperatorTraceReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include <utility>

#include "velox/exec/OperatorTraceReader.h"

#include "velox/exec/TraceUtil.h"

namespace facebook::velox::exec::trace {
Expand All @@ -32,10 +31,13 @@ OperatorTraceInputReader::OperatorTraceInputReader(
pool_(pool),
inputStream_(getInputStream()) {
VELOX_CHECK_NOT_NULL(dataType_);
VELOX_CHECK_NOT_NULL(inputStream_);
}

bool OperatorTraceInputReader::read(RowVectorPtr& batch) const {
if (inputStream_ == nullptr) {
return false;
}

if (inputStream_->atEnd()) {
batch = nullptr;
return false;
Expand All @@ -49,6 +51,10 @@ bool OperatorTraceInputReader::read(RowVectorPtr& batch) const {
std::unique_ptr<common::FileInputStream>
OperatorTraceInputReader::getInputStream() const {
auto traceFile = fs_->openFileForRead(getOpTraceInputFilePath(traceDir_));
if (traceFile->size() == 0) {
LOG(WARNING) << "Operator trace input data file is empty in " << traceDir_;
return nullptr;
}
// TODO: Make the buffer size configurable.
return std::make_unique<common::FileInputStream>(
std::move(traceFile), 1 << 20, pool_);
Expand Down
12 changes: 12 additions & 0 deletions velox/exec/TraceUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,16 @@ size_t getNumDrivers(
const std::shared_ptr<filesystems::FileSystem>& fs) {
return listDriverIds(nodeTraceDir, pipelineId, fs).size();
}

/// Returns true when 'operatorType' exactly matches (case-sensitively) one of
/// the operator type names for which tracing is supported; false otherwise.
bool canTrace(const std::string& operatorType) {
  static const std::unordered_set<std::string> kTraceableOperators{
      "Aggregation",
      "FilterProject",
      "HashBuild",
      "HashProbe",
      "PartialAggregation",
      "PartitionedOutput",
      "TableWrite"};
  const auto match = kTraceableOperators.find(operatorType);
  return match != kTraceableOperators.end();
}
} // namespace facebook::velox::exec::trace
3 changes: 3 additions & 0 deletions velox/exec/TraceUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,7 @@ std::vector<std::string> getTaskIds(
folly::dynamic getTaskMetadata(
const std::string& taskMetaFilePath,
const std::shared_ptr<filesystems::FileSystem>& fs);

/// Checks whether the operator can be traced.
bool canTrace(const std::string& operatorType);
} // namespace facebook::velox::exec::trace
168 changes: 166 additions & 2 deletions velox/exec/tests/OperatorTraceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ TEST_F(OperatorTraceTest, task) {
}

auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
core::PlanNodeId hashJoinNodeId;
const auto planNode =
PlanBuilder(planNodeIdGenerator)
.values(rows, false)
Expand All @@ -349,6 +350,7 @@ TEST_F(OperatorTraceTest, task) {
"c0 < 135",
{"c0", "c1", "c2"},
core::JoinType::kInner)
.capturePlanNodeId(hashJoinNodeId)
.planNode();
const auto expectedResult =
AssertQueryBuilder(planNode).maxDrivers(1).copyResults(pool());
Expand All @@ -374,7 +376,7 @@ TEST_F(OperatorTraceTest, task) {
std::to_string(100UL << 30)},
{core::QueryConfig::kQueryTraceDir, outputDir->getPath()},
{core::QueryConfig::kQueryTraceTaskRegExp, testData.taskRegExpr},
{core::QueryConfig::kQueryTraceNodeIds, "1,2"},
{core::QueryConfig::kQueryTraceNodeIds, hashJoinNodeId},
{"key1", "value1"},
};

Expand Down Expand Up @@ -595,7 +597,7 @@ TEST_F(OperatorTraceTest, traceTableWriter) {
continue;
}

// Query metadta file should exist.
// Query metadata file should exist.
const auto traceMetaFilePath = getTaskTraceMetaFilePath(taskTraceDir);
ASSERT_TRUE(fs->exists(traceMetaFilePath));

Expand Down Expand Up @@ -724,4 +726,166 @@ TEST_F(OperatorTraceTest, filterProject) {
ASSERT_EQ(numOutputVectors, testData.numTracedBatches);
}
}

// Runs an inner hash join with tracing enabled on the join node and verifies
// that the traced input of both pipelines (pipeline 0 is checked against the
// probe input, pipeline 1 against the build input) matches, batch by batch and
// row by row, the vectors that were fed into the plan. Also covers the cases
// where the trace byte limit is exceeded and where the task regexp does not
// match.
TEST_F(OperatorTraceTest, hashJoin) {
  constexpr auto numBatch = 5;

  std::vector<RowVectorPtr> probeInput;
  RowTypePtr probeType =
      ROW({"c0", "c1", "c2"}, {BIGINT(), TINYINT(), VARCHAR()});
  probeInput.reserve(numBatch);
  for (auto i = 0; i < numBatch; ++i) {
    probeInput.push_back(vectorFuzzer_.fuzzInputFlatRow(probeType));
  }

  std::vector<RowVectorPtr> buildInput;
  RowTypePtr buildType =
      ROW({"u0", "u1", "u2"}, {BIGINT(), SMALLINT(), BIGINT()});
  buildInput.reserve(numBatch);
  for (auto i = 0; i < numBatch; ++i) {
    buildInput.push_back(vectorFuzzer_.fuzzInputFlatRow(buildType));
  }

  struct {
    std::string taskRegExpr;
    uint64_t maxTracedBytes;
    uint8_t numTracedBatches;
    bool limitExceeded;

    std::string debugString() const {
      return fmt::format(
          "taskRegExpr: {}, maxTracedBytes: {}, numTracedBatches: {}, limitExceeded {}",
          taskRegExpr,
          maxTracedBytes,
          numTracedBatches,
          limitExceeded);
    }
  } testSettings[]{
      {".*", 10UL << 30, numBatch, false},
      {".*", 0, numBatch, true},
      {"wrong id", 10UL << 30, 0, false},
      {"test_cursor \\d+", 10UL << 30, numBatch, false},
      {"test_cursor \\d+", 800, 2, true}};

  for (const auto& testData : testSettings) {
    SCOPED_TRACE(testData.debugString());
    const auto planNodeIdGenerator{
        std::make_shared<core::PlanNodeIdGenerator>()};
    core::PlanNodeId hashJoinNodeId;
    const auto planNode = PlanBuilder(planNodeIdGenerator)
                              .values(probeInput, false)
                              .hashJoin(
                                  {"c0"},
                                  {"u0"},
                                  PlanBuilder(planNodeIdGenerator)
                                      .values(buildInput, true)
                                      .planNode(),
                                  "c0 < 135",
                                  {"c0", "c1", "c2"},
                                  core::JoinType::kInner)
                              .capturePlanNodeId(hashJoinNodeId)
                              .planNode();
    const auto testDir = TempDirectoryPath::create();
    const auto traceRoot =
        fmt::format("{}/{}", testDir->getPath(), "traceRoot");
    std::shared_ptr<Task> task;

    // Single definition of the traced query so that the throwing and the
    // non-throwing paths are guaranteed to run with identical settings.
    const auto runTracedQuery = [&]() {
      AssertQueryBuilder(planNode)
          .maxDrivers(1)
          .config(core::QueryConfig::kQueryTraceEnabled, true)
          .config(core::QueryConfig::kQueryTraceDir, traceRoot)
          .config(
              core::QueryConfig::kQueryTraceMaxBytes, testData.maxTracedBytes)
          .config(
              core::QueryConfig::kQueryTraceTaskRegExp, testData.taskRegExpr)
          .config(core::QueryConfig::kQueryTraceNodeIds, hashJoinNodeId)
          .copyResults(pool(), task);
    };

    if (testData.limitExceeded) {
      VELOX_ASSERT_THROW(
          runTracedQuery(), "Query exceeded per-query local trace limit of");
      continue;
    }
    runTracedQuery();

    const auto taskTraceDir = getTaskTraceDirectory(traceRoot, *task);
    const auto fs = filesystems::getFileSystem(taskTraceDir, nullptr);

    if (testData.taskRegExpr == "wrong id") {
      // No task matches the regexp, so nothing must have been written.
      ASSERT_FALSE(fs->exists(traceRoot));
      continue;
    }

    // Query metadata file should exist.
    const auto traceMetaFilePath = getTaskTraceMetaFilePath(taskTraceDir);
    ASSERT_TRUE(fs->exists(traceMetaFilePath));

    for (uint32_t pipelineId = 0; pipelineId < 2; ++pipelineId) {
      SCOPED_TRACE(fmt::format("pipelineId: {}", pipelineId));
      const auto opTraceDir =
          getOpTraceDirectory(taskTraceDir, hashJoinNodeId, pipelineId, 0);

      ASSERT_EQ(fs->list(opTraceDir).size(), 2);

      // Read the summary file; its content is not inspected further here.
      const auto summary =
          OperatorTraceSummaryReader(opTraceDir, pool()).read();

      // Pipeline 0 is verified against the probe side, pipeline 1 against the
      // build side.
      const bool isProbe = pipelineId == 0;
      const RowTypePtr& dataType = isProbe ? probeType : buildType;
      const std::vector<RowVectorPtr>& expectedBatches =
          isProbe ? probeInput : buildInput;

      const auto reader =
          trace::OperatorTraceInputReader(opTraceDir, dataType, pool());
      RowVectorPtr actual;
      size_t numOutputVectors{0};
      while (reader.read(actual)) {
        // Guard the lookup below: more traced batches than inputs is a
        // failure, not an out-of-bounds read.
        ASSERT_LT(numOutputVectors, expectedBatches.size());
        // Pick the expected batch that corresponds to the batch just read
        // instead of always comparing against the first one.
        const auto& expected = expectedBatches[numOutputVectors];
        const auto size = actual->size();
        ASSERT_EQ(size, expected->size());
        for (auto i = 0; i < size; ++i) {
          // compare() only reports the ordering; it must be asserted on to
          // actually verify the row content.
          ASSERT_TRUE(
              actual->compare(expected.get(), i, i, {.nullsFirst = true}) == 0)
              << "Mismatch at batch " << numOutputVectors << ", row " << i;
        }
        ++numOutputVectors;
      }
      ASSERT_EQ(numOutputVectors, testData.numTracedBatches);
    }
  }
}

// Table-driven check of trace::canTrace(): every listed operator type must
// yield the expected supported / unsupported answer.
TEST_F(OperatorTraceTest, canTrace) {
  struct {
    const std::string operatorType;
    const bool canTrace;

    // Human readable description of the case, attached to failures.
    std::string debugString() const {
      return fmt::format(
          "operatorType: {}, canTrace: {}", operatorType, canTrace);
    }
  } testSettings[] = {
      {"PartitionedOutput", true},
      {"HashBuild", true},
      {"HashProbe", true},
      {"RowNumber", false},
      {"OrderBy", false},
      {"PartialAggregation", true},
      {"Aggregation", true},
      {"TableWrite", true},
      {"FilterProject", true}};

  for (const auto& testCase : testSettings) {
    SCOPED_TRACE(testCase.debugString());
    const bool traceable = trace::canTrace(testCase.operatorType);
    ASSERT_EQ(testCase.canTrace, traceable);
  }
}
} // namespace facebook::velox::exec::trace::test
1 change: 1 addition & 0 deletions velox/tool/trace/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ velox_add_library(
velox_query_trace_replayer_base
AggregationReplayer.cpp
FilterProjectReplayer.cpp
HashJoinReplayer.cpp
OperatorReplayerBase.cpp
PartitionedOutputReplayer.cpp
TableWriterReplayer.cpp
Expand Down
47 changes: 47 additions & 0 deletions velox/tool/trace/HashJoinReplayer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/tool/trace/HashJoinReplayer.h"
#include "velox/exec/TraceUtil.h"
#include "velox/exec/tests/utils/PlanBuilder.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;
using namespace facebook::velox::exec::test;

namespace facebook::velox::tool::trace {
/// Builds the hash join node used for replay.
///
/// The join type, null-awareness, join keys, filter and output type are copied
/// from 'node', which must be a core::HashJoinNode. 'source' is passed through
/// as the first join input; the second (build side) input is a trace scan over
/// the traced data of the next pipeline (pipelineId_ + 1).
///
/// @param node The traced plan node; must be a core::HashJoinNode.
/// @param nodeId The id to assign to the created join node.
/// @param source The plan node feeding the first join input.
/// @return The newly created core::HashJoinNode.
core::PlanNodePtr HashJoinReplayer::createPlanNode(
    const core::PlanNode* node,
    const core::PlanNodeId& nodeId,
    const core::PlanNodePtr& source) const {
  const auto* hashJoinNode = dynamic_cast<const core::HashJoinNode*>(node);
  // dynamic_cast yields nullptr for a null or non-HashJoinNode input; fail
  // with a clear error instead of dereferencing a null pointer below.
  VELOX_CHECK_NOT_NULL(
      hashJoinNode, "HashJoinReplayer expects a HashJoinNode to replay");

  // Build side: scan the traced input of the next pipeline.
  const auto buildSide =
      PlanBuilder(planNodeIdGenerator_)
          .traceScan(
              nodeTraceDir_,
              pipelineId_ + 1, // Build side
              exec::trace::getDataType(planFragment_, nodeId_, 1))
          .planNode();

  return std::make_shared<core::HashJoinNode>(
      nodeId,
      hashJoinNode->joinType(),
      hashJoinNode->isNullAware(),
      hashJoinNode->leftKeys(),
      hashJoinNode->rightKeys(),
      hashJoinNode->filter(),
      source,
      buildSide,
      hashJoinNode->outputType());
}
} // namespace facebook::velox::tool::trace
47 changes: 47 additions & 0 deletions velox/tool/trace/HashJoinReplayer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/core/PlanNode.h"
#include "velox/tool/trace/OperatorReplayerBase.h"

namespace facebook::velox::tool::trace {
/// The replayer to replay the traced 'HashJoin' operator.
///
/// All constructor arguments are forwarded unchanged to OperatorReplayerBase;
/// this class only customizes how the replayed plan node is created.
class HashJoinReplayer final : public OperatorReplayerBase {
public:
/// Forwards 'rootDir', 'queryId', 'taskId', 'nodeId', 'pipelineId' and
/// 'operatorType' to the OperatorReplayerBase constructor.
HashJoinReplayer(
const std::string& rootDir,
const std::string& queryId,
const std::string& taskId,
const std::string& nodeId,
const int32_t pipelineId,
const std::string& operatorType)
: OperatorReplayerBase(
rootDir,
queryId,
taskId,
nodeId,
pipelineId,
operatorType) {}

private:
/// Creates the hash join plan node to replay. 'node' is the traced plan node,
/// 'nodeId' is the id given to the created node and 'source' is the plan node
/// used as its input.
core::PlanNodePtr createPlanNode(
const core::PlanNode* node,
const core::PlanNodeId& nodeId,
const core::PlanNodePtr& source) const override;
};
} // namespace facebook::velox::tool::trace
2 changes: 1 addition & 1 deletion velox/tool/trace/OperatorReplayerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

#include "velox/common/file/FileSystems.h"
#include "velox/core/PlanNode.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/parse/PlanNodeIdGenerator.h"

namespace facebook::velox::exec {
class Task;
Expand Down
Loading

0 comments on commit d265d13

Please sign in to comment.