Skip to content

Commit

Permalink
Add replayer
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Oct 15, 2024
1 parent 94fdd31 commit 73387d0
Show file tree
Hide file tree
Showing 19 changed files with 618 additions and 104 deletions.
14 changes: 10 additions & 4 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,21 +131,27 @@ void Operator::maybeSetTracer() {
return;
}
tracedOpMap.emplace(operatorId(), operatorType());

const auto pipelineId = operatorCtx_->driverCtx()->pipelineId;
const auto driverId = operatorCtx_->driverCtx()->driverId;
LOG(INFO) << "Trace data for operator type: " << operatorType()
<< ", operator id: " << operatorId() << ", pipeline: " << pipelineId
<< ", driver: " << driverId << ", task: " << taskId();
const auto opTraceDirPath = fmt::format(
"{}/{}/{}/{}/data",
"{}/{}/{}/{}",
queryTraceConfig->queryTraceDir,
planNodeId(),
pipelineId,
driverId);
trace::createTraceDirectory(opTraceDirPath);
setupTracer(opTraceDirPath);
}

void Operator::setupTracer(const std::string& traceDir) {
const auto& queryTraceConfig = operatorCtx_->driverCtx()->traceConfig();
const auto opTraceDataPath = fmt::format(
"{}/{}", traceDir, trace::QueryTraceTraits::kTraceDataDirName);
trace::createTraceDirectory(opTraceDataPath);
inputTracer_ = std::make_unique<trace::QueryDataWriter>(
opTraceDirPath,
opTraceDataPath,
memory::traceMemoryPool(),
queryTraceConfig->updateAndCheckTraceLimitCB);
}
Expand Down
6 changes: 4 additions & 2 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -740,10 +740,12 @@ class Operator : public BaseRuntimeStatWriter {
return spillConfig_.has_value() ? &spillConfig_.value() : nullptr;
}

/// Invoked to setup query data writer for this operator if the associated
/// query plan node is configured to collect trace.
/// Invoked to setup query data/split writer for this operator if the
/// associated query plan node is configured to collect trace.
void maybeSetTracer();

virtual void setupTracer(const std::string& traceDir);

/// Creates output vector from 'input_' and 'results' according to
/// 'identityProjections_' and 'resultProjections_'. If 'mapping' is set to
/// nullptr, the children of the output vector will be identical to their
Expand Down
66 changes: 36 additions & 30 deletions velox/exec/QuerySplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,47 +28,53 @@ using namespace facebook::velox::connector::hive;
namespace facebook::velox::exec::trace {

QuerySplitReader::QuerySplitReader(
std::string traceDir,
std::vector<std::string> traceDir,
memory::MemoryPool* pool)
: traceDir_(std::move(traceDir)),
fs_(filesystems::getFileSystem(traceDir_, nullptr)),
pool_(pool),
splitInfoStream_(getSplitInputStream()) {
: traceDirs_(std::move(traceDir)),
fs_(filesystems::getFileSystem(traceDirs_[0], nullptr)),
pool_(pool) {
VELOX_CHECK_NOT_NULL(fs_);
VELOX_CHECK_NOT_NULL(splitInfoStream_);
}

std::vector<exec::Split> QuerySplitReader::read() const {
const auto splitStrings = getSplitInfos(splitInfoStream_.get());
std::vector<exec::Split> splits;
for (const auto& splitString : splitStrings) {
folly::dynamic splitInfoObj = folly::parseJson(splitString);
const auto split =
ISerializable::deserialize<HiveConnectorSplit>(splitInfoObj);
splits.emplace_back(
std::make_shared<HiveConnectorSplit>(
split->connectorId,
split->filePath,
split->fileFormat,
split->start,
split->length,
split->partitionKeys,
split->tableBucketNumber,
split->customSplitInfo,
split->extraFileInfo,
split->serdeParameters,
split->splitWeight,
split->infoColumns,
split->properties),
-1);
for (const auto& traceDir : traceDirs_) {
auto splitInfoStream = getSplitInputStream(traceDir);
if (splitInfoStream == nullptr) {
continue;
}
const auto splitStrs = getSplitInfos(splitInfoStream.get());
for (const auto& splitStr : splitStrs) {
folly::dynamic splitInfoObj = folly::parseJson(splitStr);
const auto split =
ISerializable::deserialize<HiveConnectorSplit>(splitInfoObj);
splits.emplace_back(std::make_shared<HiveConnectorSplit>(
split->connectorId,
split->filePath,
split->fileFormat,
split->start,
split->length,
split->partitionKeys,
split->tableBucketNumber,
split->customSplitInfo,
split->extraFileInfo,
split->serdeParameters,
split->splitWeight,
split->infoColumns,
split->properties));
}
}
return splits;
}

std::unique_ptr<common::FileInputStream> QuerySplitReader::getSplitInputStream()
const {
std::unique_ptr<common::FileInputStream> QuerySplitReader::getSplitInputStream(
const std::string& traceDir) const {
auto splitInfoFile = fs_->openFileForRead(
fmt::format("{}/{}", traceDir_, QueryTraceTraits::kSplitInfoFileName));
fmt::format("{}/{}", traceDir, QueryTraceTraits::kSplitInfoFileName));
if (splitInfoFile->size() == 0) {
LOG(ERROR) << "Split info is empty in " << traceDir;
return nullptr;
}
// TODO: Make the buffer size configurable.
return std::make_unique<common::FileInputStream>(
std::move(splitInfoFile), 1 << 20, pool_);
Expand Down
12 changes: 7 additions & 5 deletions velox/exec/QuerySplitReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,23 @@ namespace facebook::velox::exec::trace {
/// 'IcebergHiveConnectorSplit'.
class QuerySplitReader {
public:
explicit QuerySplitReader(std::string traceDir, memory::MemoryPool* pool);
explicit QuerySplitReader(
std::vector<std::string> traceDir,
memory::MemoryPool* pool);

/// Reads from 'splitInfoStream_' and deserializes to 'splitInfos'. Returns
/// all the correctly traced splits.
/// all the collected tracing splits.
std::vector<exec::Split> read() const;

private:
static std::vector<std::string> getSplitInfos(
common::FileInputStream* stream);

std::unique_ptr<common::FileInputStream> getSplitInputStream() const;
std::unique_ptr<common::FileInputStream> getSplitInputStream(
const std::string& traceDir) const;

const std::string traceDir_;
const std::vector<std::string> traceDirs_;
const std::shared_ptr<filesystems::FileSystem> fs_;
memory::MemoryPool* const pool_;
const std::unique_ptr<common::FileInputStream> splitInfoStream_;
};
} // namespace facebook::velox::exec::trace
4 changes: 2 additions & 2 deletions velox/exec/QuerySplitWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void QuerySplitWriter::write(const exec::Split& split) const {
VELOX_CHECK(split.hasConnectorSplit());
const auto splitObj = split.connectorSplit->serialize();
const auto splitJson = folly::toJson(splitObj);
auto ioBuf = appendToBuffer(splitJson);
auto ioBuf = serialize(splitJson);
splitInfoFile_->append(std::move(ioBuf));
}

Expand All @@ -63,7 +63,7 @@ void QuerySplitWriter::finish() {
}

// static
std::unique_ptr<folly::IOBuf> QuerySplitWriter::appendToBuffer(
std::unique_ptr<folly::IOBuf> QuerySplitWriter::serialize(
const std::string& split) {
const uint32_t length = split.length();
const uint32_t crc32 = folly::crc32(
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/QuerySplitWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ class QuerySplitWriter {

void finish();

static std::unique_ptr<folly::IOBuf> appendToBuffer(const std::string& split);
private:
static std::unique_ptr<folly::IOBuf> serialize(const std::string& split);

const std::string traceDir_;
const std::shared_ptr<filesystems::FileSystem> fs_;
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/QueryTraceTraits.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ struct QueryTraceTraits {

static inline const std::string kQueryMetaFileName = "query_meta.json";
static inline const std::string kDataSummaryFileName = "data_summary.json";
static inline const std::string kTraceDataDirName = "data";
static inline const std::string kTraceSplitDirName = "split";
static inline const std::string kDataFileName = "trace.data";
static inline const std::string kSplitInfoFileName = "trace.split";
};
Expand Down
16 changes: 15 additions & 1 deletion velox/exec/QueryTraceUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,21 @@ uint8_t getNumDrivers(

std::string
getDataDir(const std::string& traceDir, int pipelineId, int driverId) {
return fmt::format("{}/{}/{}/data", traceDir, pipelineId, driverId);
return fmt::format(
"{}/{}/{}/{}",
traceDir,
pipelineId,
driverId,
trace::QueryTraceTraits::kTraceDataDirName);
}

std::string
getSplitDir(const std::string& traceDir, int pipelineId, int driverId) {
return fmt::format(
"{}/{}/{}/{}",
traceDir,
pipelineId,
driverId,
trace::QueryTraceTraits::kTraceSplitDirName);
}
} // namespace facebook::velox::exec::trace
5 changes: 5 additions & 0 deletions velox/exec/QueryTraceUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,9 @@ folly::dynamic getMetadata(
/// given plan node, which is $traceRoot/$taskId/$nodeId.
std::string
getDataDir(const std::string& traceDir, int pipelineId, int driverId);

/// Gets the traced split directory. 'traceaDir' is the trace directory for a
/// given plan node, which is $traceRoot/$taskId/$nodeId.
std::string
getSplitDir(const std::string& traceDir, int pipelineId, int driverId);
} // namespace facebook::velox::exec::trace
15 changes: 15 additions & 0 deletions velox/exec/TableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "velox/exec/TableScan.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/common/time/Timer.h"
#include "velox/exec/QueryTraceUtil.h"
#include "velox/exec/Task.h"
#include "velox/expression/Expr.h"

Expand Down Expand Up @@ -117,6 +118,9 @@ RowVectorPtr TableScan::getOutput() {
if (!split.hasConnectorSplit()) {
noMoreSplits_ = true;
dynamicFilters_.clear();
if (splitTracer_ != nullptr) {
splitTracer_->finish();
}
if (dataSource_) {
curStatus_ = "getOutput: noMoreSplits_=1, updating stats_";
const auto connectorStats = dataSource_->runtimeStats();
Expand All @@ -135,6 +139,9 @@ RowVectorPtr TableScan::getOutput() {
return nullptr;
}

if (FOLLY_UNLIKELY(splitTracer_ != nullptr)) {
splitTracer_->write(split);
}
const auto& connectorSplit = split.connectorSplit;
currentSplitWeight_ = connectorSplit->splitWeight;
needNewSplit_ = false;
Expand Down Expand Up @@ -376,4 +383,12 @@ void TableScan::addDynamicFilter(
stats_.wlock()->dynamicFilterStats.producerNodeIds.emplace(producer);
}

void TableScan::setupTracer(const std::string& traceDir) {
const auto& queryTraceConfig = operatorCtx_->driverCtx()->traceConfig();
const auto opTraceSplitPath = fmt::format(
"{}/{}", traceDir, trace::QueryTraceTraits::kTraceSplitDirName);
trace::createTraceDirectory(opTraceSplitPath);
splitTracer_ = std::make_unique<trace::QuerySplitWriter>(opTraceSplitPath);
}

} // namespace facebook::velox::exec
5 changes: 5 additions & 0 deletions velox/exec/TableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "velox/core/PlanNode.h"
#include "velox/exec/Operator.h"
#include "velox/exec/QuerySplitWriter.h"

namespace facebook::velox::exec {

Expand Down Expand Up @@ -50,6 +51,8 @@ class TableScan : public SourceOperator {
column_index_t outputChannel,
const std::shared_ptr<common::Filter>& filter) override;

void setupTracer(const std::string& traceDir) override;

private:
// Checks if this table scan operator needs to yield before processing the
// next split.
Expand Down Expand Up @@ -120,5 +123,7 @@ class TableScan : public SourceOperator {
// Holds the current status of the operator. Used when debugging to understand
// what operator is doing.
std::atomic<const char*> curStatus_{""};

std::unique_ptr<trace::QuerySplitWriter> splitTracer_;
};
} // namespace facebook::velox::exec
Loading

0 comments on commit 73387d0

Please sign in to comment.