Skip to content

Commit

Permalink
Add TableScanReplayer
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Nov 8, 2024
1 parent d1bf9da commit ebb7e09
Show file tree
Hide file tree
Showing 23 changed files with 986 additions and 50 deletions.
34 changes: 26 additions & 8 deletions velox/connectors/hive/HiveConnectorSplit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ folly::dynamic HiveConnectorSplit::serialize() const {
customSplitInfoObj[key] = value;
}
obj["customSplitInfo"] = customSplitInfoObj;
obj["extraFileInfo"] = *extraFileInfo;
obj["extraFileInfo"] =
extraFileInfo == nullptr ? nullptr : folly::dynamic(*extraFileInfo);

folly::dynamic serdeParametersObj = folly::dynamic::object;
for (const auto& [key, value] : serdeParameters) {
Expand All @@ -84,8 +85,14 @@ folly::dynamic HiveConnectorSplit::serialize() const {
? folly::dynamic(properties->modificationTime.value())
: nullptr;
obj["properties"] = propertiesObj;
} else {
obj["properties"] = nullptr;
}

if (rowIdProperties.has_value()) {
folly::dynamic rowIdObj = folly::dynamic::object;
rowIdObj["metadataVersion"] = rowIdProperties->metadataVersion;
rowIdObj["partitionId"] = rowIdProperties->partitionId;
rowIdObj["tableGuid"] = rowIdProperties->tableGuid;
obj["rowIdProperties"] = rowIdObj;
}

return obj;
Expand Down Expand Up @@ -118,8 +125,9 @@ std::shared_ptr<HiveConnectorSplit> HiveConnectorSplit::create(
customSplitInfo[key.asString()] = value.asString();
}

std::shared_ptr<std::string> extraFileInfo =
std::make_shared<std::string>(obj["extraFileInfo"].asString());
std::shared_ptr<std::string> extraFileInfo = obj["extraFileInfo"].isNull()
? nullptr
: std::make_shared<std::string>(obj["extraFileInfo"].asString());
std::unordered_map<std::string, std::string> serdeParameters;
for (const auto& [key, value] : obj["serdeParameters"].items()) {
serdeParameters[key.asString()] = value.asString();
Expand All @@ -131,8 +139,8 @@ std::shared_ptr<HiveConnectorSplit> HiveConnectorSplit::create(
}

std::optional<FileProperties> properties = std::nullopt;
const auto propertiesObj = obj["properties"];
if (!propertiesObj.isNull()) {
const auto& propertiesObj = obj.getDefault("properties", nullptr);
if (propertiesObj != nullptr) {
properties = FileProperties{
propertiesObj["fileSize"].isNull()
? std::nullopt
Expand All @@ -142,6 +150,15 @@ std::shared_ptr<HiveConnectorSplit> HiveConnectorSplit::create(
: std::optional(propertiesObj["modificationTime"].asInt())};
}

std::optional<RowIdProperties> rowIdProperties = std::nullopt;
const auto& rowIdObj = obj.getDefault("rowIdProperties", nullptr);
if (rowIdObj != nullptr) {
rowIdProperties = RowIdProperties{
.metadataVersion = rowIdObj["metadataVersion"].asInt(),
.partitionId = rowIdObj["partitionId"].asInt(),
.tableGuid = rowIdObj["tableGuid"].asString()};
}

return std::make_shared<HiveConnectorSplit>(
connectorId,
filePath,
Expand All @@ -155,7 +172,8 @@ std::shared_ptr<HiveConnectorSplit> HiveConnectorSplit::create(
serdeParameters,
splitWeight,
infoColumns,
properties);
properties,
rowIdProperties);
}

// static
Expand Down
6 changes: 4 additions & 2 deletions velox/connectors/hive/HiveConnectorSplit.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
const std::unordered_map<std::string, std::string>& _serdeParameters = {},
int64_t _splitWeight = 0,
const std::unordered_map<std::string, std::string>& _infoColumns = {},
std::optional<FileProperties> _properties = std::nullopt)
std::optional<FileProperties> _properties = std::nullopt,
std::optional<RowIdProperties> _rowIdProperties = std::nullopt)
: ConnectorSplit(connectorId, _splitWeight),
filePath(_filePath),
fileFormat(_fileFormat),
Expand All @@ -96,7 +97,8 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
extraFileInfo(_extraFileInfo),
serdeParameters(_serdeParameters),
infoColumns(_infoColumns),
properties(_properties) {}
properties(_properties),
rowIdProperties(_rowIdProperties) {}

std::string toString() const override;

Expand Down
30 changes: 27 additions & 3 deletions velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ class HiveConnectorSerDeTest : public exec::test::HiveConnectorTestBase {
ASSERT_EQ(value, clone->customSplitInfo.at(key));
}

ASSERT_EQ(*split.extraFileInfo, *clone->extraFileInfo);
if (split.extraFileInfo != nullptr) {
ASSERT_EQ(*split.extraFileInfo, *clone->extraFileInfo);
} else {
ASSERT_EQ(clone->extraFileInfo, nullptr);
}
ASSERT_EQ(split.serdeParameters.size(), clone->serdeParameters.size());
for (const auto& [key, value] : split.serdeParameters) {
ASSERT_EQ(value, clone->serdeParameters.at(key));
Expand Down Expand Up @@ -235,7 +239,9 @@ TEST_F(HiveConnectorSerDeTest, hiveConnectorSplit) {
FileProperties fileProperties{
.fileSize = 2048, .modificationTime = std::nullopt};
const auto properties = std::optional<FileProperties>(fileProperties);
const auto split = HiveConnectorSplit(
RowIdProperties rowIdProperties{
.metadataVersion = 2, .partitionId = 3, .tableGuid = "test"};
const auto split1 = HiveConnectorSplit(
connectorId,
filePath,
fileFormat,
Expand All @@ -248,8 +254,26 @@ TEST_F(HiveConnectorSerDeTest, hiveConnectorSplit) {
serdeParameters,
splitWeight,
infoColumns,
properties,
rowIdProperties);
testSerde(split1);

const auto split2 = HiveConnectorSplit(
connectorId,
filePath,
fileFormat,
start,
length,
{},
tableBucketNumber,
customSplitInfo,
nullptr,
{},
splitWeight,
{},
std::nullopt,
std::nullopt);
testSerde(split);
testSerde(split2);
}

} // namespace
Expand Down
29 changes: 24 additions & 5 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,12 @@ void Operator::maybeSetTracer() {
const auto opTraceDirPath = trace::getOpTraceDirectory(
traceConfig->queryTraceDir, planNodeId(), pipelineId, driverId);
trace::createTraceDirectory(opTraceDirPath);
inputTracer_ = std::make_unique<trace::OperatorTraceWriter>(
this,
opTraceDirPath,
memory::traceMemoryPool(),
traceConfig->updateAndCheckTraceLimitCB);

if (operatorType() == "TableScan") {
setupSplitTracer(opTraceDirPath);
} else {
setupInputTracer(opTraceDirPath);
}
}

void Operator::traceInput(const RowVectorPtr& input) {
Expand All @@ -152,9 +153,14 @@ void Operator::traceInput(const RowVectorPtr& input) {
}

// Finishes whichever trace writer has been set up for this operator, if any.
// Having both the input writer and the split writer set at the same time is
// treated as an error.
void Operator::finishTrace() {
  VELOX_CHECK(inputTracer_ == nullptr || splitTracer_ == nullptr);

  if (auto* inputWriter = inputTracer_.get()) {
    inputWriter->finish();
  }

  if (auto* splitWriter = splitTracer_.get()) {
    splitWriter->finish();
  }
}

std::vector<std::unique_ptr<Operator::PlanNodeTranslator>>&
Expand All @@ -163,6 +169,19 @@ Operator::translators() {
return translators;
}

// Creates 'inputTracer_', which writes this operator's traced input under
// 'opTraceDirPath'. The trace-limit callback is taken from the trace config
// of the driver context.
void Operator::setupInputTracer(const std::string& opTraceDirPath) {
  const auto& driverTraceConfig = operatorCtx_->driverCtx()->traceConfig();
  inputTracer_ = std::make_unique<trace::OperatorTraceInputWriter>(
      this,
      opTraceDirPath,
      memory::traceMemoryPool(),
      driverTraceConfig->updateAndCheckTraceLimitCB);
}

// Creates 'splitTracer_', which writes this operator's traced splits under
// 'opTraceDirPath'.
void Operator::setupSplitTracer(const std::string& opTraceDirPath) {
  auto splitWriter =
      std::make_unique<trace::OperatorTraceSplitWriter>(this, opTraceDirPath);
  splitTracer_ = std::move(splitWriter);
}

// static
std::unique_ptr<Operator> Operator::fromPlanNode(
DriverCtx* ctx,
Expand Down
17 changes: 14 additions & 3 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -733,8 +733,8 @@ class Operator : public BaseRuntimeStatWriter {
return spillConfig_.has_value() ? &spillConfig_.value() : nullptr;
}

/// Invoked to setup query data writer for this operator if the associated
/// query plan node is configured to collect trace.
/// Invoked to setup query data or split writer for this operator if the
/// associated query plan node is configured to collect trace.
void maybeSetTracer();

/// Creates output vector from 'input_' and 'results' according to
Expand Down Expand Up @@ -774,7 +774,12 @@ class Operator : public BaseRuntimeStatWriter {

folly::Synchronized<OperatorStats> stats_;
folly::Synchronized<common::SpillStats> spillStats_;
std::unique_ptr<trace::OperatorTraceWriter> inputTracer_;

/// NOTE: at most one of the two can be set for an operator for tracing.
/// 'splitTracer_' is only set for table scan to record the processed split
/// for now.
std::unique_ptr<trace::OperatorTraceInputWriter> inputTracer_{nullptr};
std::unique_ptr<trace::OperatorTraceSplitWriter> splitTracer_{nullptr};

/// Indicates if an operator is under a non-reclaimable execution section.
/// This prevents the memory arbitrator from reclaiming memory from this
Expand All @@ -799,6 +804,12 @@ class Operator : public BaseRuntimeStatWriter {

std::unordered_map<column_index_t, std::shared_ptr<common::Filter>>
dynamicFilters_;

private:
// Setup 'inputTracer_' to record the processed input vectors.
void setupInputTracer(const std::string& traceDir);
// Setup 'splitTracer_' for table scan to record the processed split.
void setupSplitTracer(const std::string& traceDir);
};

/// Given a row type returns indices for the specified subset of columns.
Expand Down
68 changes: 66 additions & 2 deletions velox/exec/OperatorTraceReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

#include <utility>

#include <folly/hash/Checksum.h>
#include "velox/common/file/FileInputStream.h"
#include "velox/exec/OperatorTraceReader.h"

#include "velox/exec/TraceUtil.h"

namespace facebook::velox::exec::trace {

OperatorTraceInputReader::OperatorTraceInputReader(
std::string traceDir,
RowTypePtr dataType,
Expand Down Expand Up @@ -83,4 +83,68 @@ OperatorTraceSummary OperatorTraceSummaryReader::read() const {
summary.inputRows = summaryObj[OperatorTraceTraits::kInputRowsKey].asInt();
return summary;
}

// Creates a reader over the split trace files found under 'traceDirs'.
//
// The file system is resolved from the first directory only and is then used
// to open the split info file of every directory. 'traceDirs' must not be
// empty. 'pool' is handed to the file input streams created by
// getSplitInputStream().
OperatorTraceSplitReader::OperatorTraceSplitReader(
    std::vector<std::string> traceDirs,
    memory::MemoryPool* pool)
    : traceDirs_(std::move(traceDirs)),
      // Indexing an empty vector is undefined behavior, so skip the lookup
      // when there is no directory and report the problem in the body below.
      fs_(traceDirs_.empty()
              ? nullptr
              : filesystems::getFileSystem(traceDirs_.front(), nullptr)),
      pool_(pool) {
  VELOX_CHECK(
      !traceDirs_.empty(),
      "At least one trace directory is required to read traced splits");
  VELOX_CHECK_NOT_NULL(fs_);
}

std::vector<std::string> OperatorTraceSplitReader::read() const {
std::vector<std::string> splits;
for (const auto& traceDir : traceDirs_) {
auto stream = getSplitInputStream(traceDir);
if (stream == nullptr) {
continue;
}
auto curSplits = deserialize(stream.get());
splits.insert(
splits.end(),
std::make_move_iterator(curSplits.begin()),
std::make_move_iterator(curSplits.end()));
}
return splits;
}

// Opens the split info file of 'traceDir' and wraps it in a file input
// stream backed by 'pool_'. Logs a warning and returns nullptr if the file is
// empty.
std::unique_ptr<common::FileInputStream>
OperatorTraceSplitReader::getSplitInputStream(
    const std::string& traceDir) const {
  // TODO: Make the buffer size configurable.
  constexpr uint64_t kReadBufferSize = 1 << 20;

  auto file = fs_->openFileForRead(getOpTraceSplitFilePath(traceDir));
  if (file->size() != 0) {
    return std::make_unique<common::FileInputStream>(
        std::move(file), kReadBufferSize, pool_);
  }

  LOG(WARNING) << "Split info is empty in " << traceDir;
  return nullptr;
}

// static
// Decodes split records from 'stream' until it is exhausted. Each record is
// read as a uint32 length, that many bytes of split string, and a uint32
// CRC32 of those bytes. Decoding stops at the first record whose stored
// checksum differs from the computed one, or when a VeloxException is thrown
// while reading; the splits decoded up to that point are returned.
std::vector<std::string> OperatorTraceSplitReader::deserialize(
    common::FileInputStream* stream) {
  std::vector<std::string> decoded;
  try {
    while (!stream->atEnd()) {
      const auto numBytes = stream->read<uint32_t>();
      std::string payload(numBytes, '\0');
      stream->readBytes(reinterpret_cast<uint8_t*>(payload.data()), numBytes);

      const auto storedChecksum = stream->read<uint32_t>();
      const auto computedChecksum = folly::crc32(
          reinterpret_cast<const uint8_t*>(payload.data()), payload.size());
      if (storedChecksum == computedChecksum) {
        decoded.push_back(std::move(payload));
        continue;
      }

      LOG(ERROR) << "Failed to verify the split checksum " << storedChecksum
                 << " which does not equal to the actual computed checksum "
                 << computedChecksum;
      break;
    }
  } catch (const VeloxException& e) {
    LOG(ERROR) << "Failed to deserialize split: " << e.message();
  }
  return decoded;
}
} // namespace facebook::velox::exec::trace
30 changes: 29 additions & 1 deletion velox/exec/OperatorTraceReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

#include "velox/common/file/FileInputStream.h"
#include "velox/common/file/FileSystems.h"
#include "velox/exec/Split.h"
#include "velox/exec/Trace.h"
#include "velox/serializers/PrestoSerializer.h"

namespace facebook::velox::exec::trace {

/// Used to read an operator trace input.
class OperatorTraceInputReader {
public:
Expand Down Expand Up @@ -66,4 +66,32 @@ class OperatorTraceSummaryReader {
memory::MemoryPool* const pool_;
const std::unique_ptr<ReadFile> summaryFile_;
};

/// Loads the input splits recorded by a set of traced 'TableScan' operators
/// so that they can be replayed.
///
/// Only 'HiveConnectorSplit' is handled for now. Other split types, such as
/// 'IcebergHiveConnectorSplit', are to be supported in the future.
class OperatorTraceSplitReader {
 public:
  /// Each entry of 'traceDirs' is a directory holding the traced split info
  /// file of one table scan operator.
  explicit OperatorTraceSplitReader(
      std::vector<std::string> traceDirs,
      memory::MemoryPool* pool);

  /// Reads the traced split strings from all of the trace directories and
  /// returns them deserialized.
  std::vector<std::string> read() const;

 private:
  std::unique_ptr<common::FileInputStream> getSplitInputStream(
      const std::string& traceDir) const;

  static std::vector<std::string> deserialize(common::FileInputStream* stream);

  // NOTE: keep 'traceDirs_' declared ahead of 'fs_'. Members are initialized
  // in declaration order and the constructor's initializer for 'fs_' reads
  // 'traceDirs_'.
  const std::vector<std::string> traceDirs_;
  const std::shared_ptr<filesystems::FileSystem> fs_;
  memory::MemoryPool* const pool_;
};
} // namespace facebook::velox::exec::trace
Loading

0 comments on commit ebb7e09

Please sign in to comment.