From 12b52e70ec85ae0cdb4aa990797cddda9be5be27 Mon Sep 17 00:00:00 2001 From: duanmeng Date: Sun, 10 Nov 2024 02:38:08 -0800 Subject: [PATCH] Add TableScan Replayer (#11205) Summary: Add a split tracer to record and load the input splits from a tracing 'TableScan' operator, and for getting the traced splits when replaying 'TableScan'. Currently, it only works with 'HiveConnectorSplit'. In the future, it will be extended to handle more types of splits, such as 'IcebergHiveConnectorSplit'. part of https://github.com/facebookincubator/velox/issues/9668 Pull Request resolved: https://github.com/facebookincubator/velox/pull/11205 Reviewed By: tanjialiang Differential Revision: D64946986 Pulled By: xiaoxmeng fbshipit-source-id: 5f656e5ad9b8755484c3eb5ac42b209569a98101 --- velox/connectors/hive/HiveConnectorSplit.cpp | 34 ++- velox/connectors/hive/HiveConnectorSplit.h | 6 +- .../hive/tests/HiveConnectorSerDeTest.cpp | 30 +- velox/exec/Operator.cpp | 29 +- velox/exec/Operator.h | 17 +- velox/exec/OperatorTraceReader.cpp | 68 ++++- velox/exec/OperatorTraceReader.h | 30 +- velox/exec/OperatorTraceWriter.cpp | 70 ++++- velox/exec/OperatorTraceWriter.h | 42 ++- velox/exec/TableScan.cpp | 4 + velox/exec/Trace.h | 4 +- velox/exec/TraceUtil.cpp | 13 +- velox/exec/TraceUtil.h | 3 + velox/exec/tests/OperatorTraceTest.cpp | 270 +++++++++++++++++- velox/exec/tests/TraceUtilTest.cpp | 3 + velox/tool/trace/CMakeLists.txt | 2 + velox/tool/trace/OperatorReplayerBase.cpp | 7 + velox/tool/trace/TableScanReplayer.cpp | 80 ++++++ velox/tool/trace/TableScanReplayer.h | 60 ++++ velox/tool/trace/TraceReplayRunner.cpp | 10 + velox/tool/trace/TraceReplayRunner.h | 1 + velox/tool/trace/tests/CMakeLists.txt | 1 + .../trace/tests/TableScanReplayerTest.cpp | 251 ++++++++++++++++ 23 files changed, 985 insertions(+), 50 deletions(-) create mode 100644 velox/tool/trace/TableScanReplayer.cpp create mode 100644 velox/tool/trace/TableScanReplayer.h create mode 100644 velox/tool/trace/tests/TableScanReplayerTest.cpp diff --git a/velox/connectors/hive/HiveConnectorSplit.cpp b/velox/connectors/hive/HiveConnectorSplit.cpp index ed69eb7d6303..559f1770d6a7 100644 --- a/velox/connectors/hive/HiveConnectorSplit.cpp +++ b/velox/connectors/hive/HiveConnectorSplit.cpp @@ -61,7 +61,8 @@ folly::dynamic HiveConnectorSplit::serialize() const { customSplitInfoObj[key] = value; } obj["customSplitInfo"] = customSplitInfoObj; - obj["extraFileInfo"] = *extraFileInfo; + obj["extraFileInfo"] = + extraFileInfo == nullptr ? nullptr : folly::dynamic(*extraFileInfo); folly::dynamic serdeParametersObj = folly::dynamic::object; for (const auto& [key, value] : serdeParameters) { @@ -84,8 +85,14 @@ folly::dynamic HiveConnectorSplit::serialize() const { ? folly::dynamic(properties->modificationTime.value()) : nullptr; obj["properties"] = propertiesObj; - } else { - obj["properties"] = nullptr; + } + + if (rowIdProperties.has_value()) { + folly::dynamic rowIdObj = folly::dynamic::object; + rowIdObj["metadataVersion"] = rowIdProperties->metadataVersion; + rowIdObj["partitionId"] = rowIdProperties->partitionId; + rowIdObj["tableGuid"] = rowIdProperties->tableGuid; + obj["rowIdProperties"] = rowIdObj; } return obj; @@ -118,8 +125,9 @@ std::shared_ptr HiveConnectorSplit::create( customSplitInfo[key.asString()] = value.asString(); } - std::shared_ptr extraFileInfo = - std::make_shared(obj["extraFileInfo"].asString()); + std::shared_ptr extraFileInfo = obj["extraFileInfo"].isNull() + ? nullptr + : std::make_shared(obj["extraFileInfo"].asString()); std::unordered_map serdeParameters; for (const auto& [key, value] : obj["serdeParameters"].items()) { serdeParameters[key.asString()] = value.asString(); @@ -131,8 +139,8 @@ std::shared_ptr HiveConnectorSplit::create( } std::optional properties = std::nullopt; - const auto propertiesObj = obj["properties"]; - if (!propertiesObj.isNull()) { + const auto& propertiesObj = obj.getDefault("properties", nullptr); + if (propertiesObj != nullptr) { properties = FileProperties{ propertiesObj["fileSize"].isNull() ? std::nullopt @@ -142,6 +150,15 @@ std::shared_ptr HiveConnectorSplit::create( : std::optional(propertiesObj["modificationTime"].asInt())}; } + std::optional rowIdProperties = std::nullopt; + const auto& rowIdObj = obj.getDefault("rowIdProperties", nullptr); + if (rowIdObj != nullptr) { + rowIdProperties = RowIdProperties{ + .metadataVersion = rowIdObj["metadataVersion"].asInt(), + .partitionId = rowIdObj["partitionId"].asInt(), + .tableGuid = rowIdObj["tableGuid"].asString()}; + } + return std::make_shared( connectorId, filePath, @@ -155,7 +172,8 @@ std::shared_ptr HiveConnectorSplit::create( serdeParameters, splitWeight, infoColumns, - properties); + properties, + rowIdProperties); } // static diff --git a/velox/connectors/hive/HiveConnectorSplit.h b/velox/connectors/hive/HiveConnectorSplit.h index 9af4d7ef357a..a06ffa668e11 100644 --- a/velox/connectors/hive/HiveConnectorSplit.h +++ b/velox/connectors/hive/HiveConnectorSplit.h @@ -84,7 +84,8 @@ struct HiveConnectorSplit : public connector::ConnectorSplit { const std::unordered_map& _serdeParameters = {}, int64_t _splitWeight = 0, const std::unordered_map& _infoColumns = {}, - std::optional _properties = std::nullopt) + std::optional _properties = std::nullopt, + std::optional _rowIdProperties = std::nullopt) : ConnectorSplit(connectorId, _splitWeight), filePath(_filePath), fileFormat(_fileFormat), @@ -96,7 +97,8 @@ struct HiveConnectorSplit : public connector::ConnectorSplit { extraFileInfo(_extraFileInfo), serdeParameters(_serdeParameters), infoColumns(_infoColumns), - properties(_properties) {} + properties(_properties), + rowIdProperties(_rowIdProperties) {} std::string toString() const override; diff --git a/velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp b/velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp index ab65b9099556..7d022ff550bf 100644 --- a/velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp +++ b/velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp @@ -81,7 +81,11 @@ class HiveConnectorSerDeTest : public exec::test::HiveConnectorTestBase { ASSERT_EQ(value, clone->customSplitInfo.at(key)); } - ASSERT_EQ(*split.extraFileInfo, *clone->extraFileInfo); + if (split.extraFileInfo != nullptr) { + ASSERT_EQ(*split.extraFileInfo, *clone->extraFileInfo); + } else { + ASSERT_EQ(clone->extraFileInfo, nullptr); + } ASSERT_EQ(split.serdeParameters.size(), clone->serdeParameters.size()); for (const auto& [key, value] : split.serdeParameters) { ASSERT_EQ(value, clone->serdeParameters.at(key)); @@ -235,7 +239,9 @@ TEST_F(HiveConnectorSerDeTest, hiveConnectorSplit) { FileProperties fileProperties{ .fileSize = 2048, .modificationTime = std::nullopt}; const auto properties = std::optional(fileProperties); - const auto split = HiveConnectorSplit( + RowIdProperties rowIdProperties{ + .metadataVersion = 2, .partitionId = 3, .tableGuid = "test"}; + const auto split1 = HiveConnectorSplit( connectorId, filePath, fileFormat, @@ -248,8 +254,26 @@ TEST_F(HiveConnectorSerDeTest, hiveConnectorSplit) { serdeParameters, splitWeight, infoColumns, + properties, + rowIdProperties); + testSerde(split1); + + const auto split2 = HiveConnectorSplit( + connectorId, + filePath, + fileFormat, + start, + length, + {}, + tableBucketNumber, + customSplitInfo, + nullptr, + {}, + splitWeight, + {}, + std::nullopt, std::nullopt); - testSerde(split); + testSerde(split2); } } // namespace diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 577b00ab66d1..4ed954435292 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -138,11 +138,12 @@ void Operator::maybeSetTracer() { const auto opTraceDirPath = trace::getOpTraceDirectory( traceConfig->queryTraceDir, planNodeId(), pipelineId, driverId); trace::createTraceDirectory(opTraceDirPath); - inputTracer_ = std::make_unique( - this, - opTraceDirPath, - memory::traceMemoryPool(), - traceConfig->updateAndCheckTraceLimitCB); + + if (operatorType() == "TableScan") { + setupSplitTracer(opTraceDirPath); + } else { + setupInputTracer(opTraceDirPath); + } } void Operator::traceInput(const RowVectorPtr& input) { @@ -152,9 +153,14 @@ void Operator::traceInput(const RowVectorPtr& input) { } void Operator::finishTrace() { + VELOX_CHECK(inputTracer_ == nullptr || splitTracer_ == nullptr); if (inputTracer_ != nullptr) { inputTracer_->finish(); } + + if (splitTracer_ != nullptr) { + splitTracer_->finish(); + } } std::vector>& @@ -163,6 +169,19 @@ Operator::translators() { return translators; } +void Operator::setupInputTracer(const std::string& opTraceDirPath) { + inputTracer_ = std::make_unique( + this, + opTraceDirPath, + memory::traceMemoryPool(), + operatorCtx_->driverCtx()->traceConfig()->updateAndCheckTraceLimitCB); +} + +void Operator::setupSplitTracer(const std::string& opTraceDirPath) { + splitTracer_ = + std::make_unique(this, opTraceDirPath); +} + // static std::unique_ptr Operator::fromPlanNode( DriverCtx* ctx, diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index 6d0fa34c3af8..25d355802d25 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -737,8 +737,8 @@ class Operator : public BaseRuntimeStatWriter { return spillConfig_.has_value() ? &spillConfig_.value() : nullptr; } - /// Invoked to setup query data writer for this operator if the associated - /// query plan node is configured to collect trace. + /// Invoked to setup query data or split writer for this operator if the + /// associated query plan node is configured to collect trace. void maybeSetTracer(); /// Creates output vector from 'input_' and 'results' according to @@ -778,7 +778,12 @@ class Operator : public BaseRuntimeStatWriter { folly::Synchronized stats_; folly::Synchronized spillStats_; - std::unique_ptr inputTracer_; + + /// NOTE: only one of the two could be set for an operator for tracing . + /// 'splitTracer_' is only set for table scan to record the processed split + /// for now. + std::unique_ptr inputTracer_{nullptr}; + std::unique_ptr splitTracer_{nullptr}; /// Indicates if an operator is under a non-reclaimable execution section. /// This prevents the memory arbitrator from reclaiming memory from this @@ -803,6 +808,12 @@ class Operator : public BaseRuntimeStatWriter { std::unordered_map> dynamicFilters_; + + private: + // Setup 'inputTracer_' to record the processed input vectors. + void setupInputTracer(const std::string& traceDir); + // Setup 'splitTracer_' for table scan to record the processed split. + void setupSplitTracer(const std::string& traceDir); }; /// Given a row type returns indices for the specified subset of columns. diff --git a/velox/exec/OperatorTraceReader.cpp b/velox/exec/OperatorTraceReader.cpp index e6376090d807..e08c6cf4d1a2 100644 --- a/velox/exec/OperatorTraceReader.cpp +++ b/velox/exec/OperatorTraceReader.cpp @@ -16,12 +16,12 @@ #include +#include +#include "velox/common/file/FileInputStream.h" #include "velox/exec/OperatorTraceReader.h" - #include "velox/exec/TraceUtil.h" namespace facebook::velox::exec::trace { - OperatorTraceInputReader::OperatorTraceInputReader( std::string traceDir, RowTypePtr dataType, @@ -84,4 +84,68 @@ OperatorTraceSummary OperatorTraceSummaryReader::read() const { summary.inputRows = summaryObj[OperatorTraceTraits::kInputRowsKey].asInt(); return summary; } + +OperatorTraceSplitReader::OperatorTraceSplitReader( + std::vector traceDirs, + memory::MemoryPool* pool) + : traceDirs_(std::move(traceDirs)), + fs_(filesystems::getFileSystem(traceDirs_[0], nullptr)), + pool_(pool) { + VELOX_CHECK_NOT_NULL(fs_); +} + +std::vector OperatorTraceSplitReader::read() const { + std::vector splits; + for (const auto& traceDir : traceDirs_) { + auto stream = getSplitInputStream(traceDir); + if (stream == nullptr) { + continue; + } + auto curSplits = deserialize(stream.get()); + splits.insert( + splits.end(), + std::make_move_iterator(curSplits.begin()), + std::make_move_iterator(curSplits.end())); + } + return splits; +} + +std::unique_ptr +OperatorTraceSplitReader::getSplitInputStream( + const std::string& traceDir) const { + auto splitInfoFile = fs_->openFileForRead(getOpTraceSplitFilePath(traceDir)); + if (splitInfoFile->size() == 0) { + LOG(WARNING) << "Split info is empty in " << traceDir; + return nullptr; + } + // TODO: Make the buffer size configurable. + return std::make_unique( + std::move(splitInfoFile), 1 << 20, pool_); +} + +// static +std::vector OperatorTraceSplitReader::deserialize( + common::FileInputStream* stream) { + std::vector splits; + try { + while (!stream->atEnd()) { + const auto length = stream->read(); + std::string splitStr(length, '\0'); + stream->readBytes(reinterpret_cast(splitStr.data()), length); + const auto crc32 = stream->read(); + const auto actualCrc32 = folly::crc32( + reinterpret_cast(splitStr.data()), splitStr.size()); + if (crc32 != actualCrc32) { + LOG(ERROR) << "Failed to verify the split checksum " << crc32 + << " which does not equal to the actual computed checksum " + << actualCrc32; + break; + } + splits.push_back(std::move(splitStr)); + } + } catch (const VeloxException& e) { + LOG(ERROR) << "Failed to deserialize split: " << e.message(); + } + return splits; +} } // namespace facebook::velox::exec::trace diff --git a/velox/exec/OperatorTraceReader.h b/velox/exec/OperatorTraceReader.h index 6888d5a600b8..60860e0e691e 100644 --- a/velox/exec/OperatorTraceReader.h +++ b/velox/exec/OperatorTraceReader.h @@ -18,11 +18,11 @@ #include "velox/common/file/FileInputStream.h" #include "velox/common/file/FileSystems.h" +#include "velox/exec/Split.h" #include "velox/exec/Trace.h" #include "velox/serializers/PrestoSerializer.h" namespace facebook::velox::exec::trace { - /// Used to read an operator trace input. class OperatorTraceInputReader { public: @@ -67,4 +67,32 @@ class OperatorTraceSummaryReader { memory::MemoryPool* const pool_; const std::unique_ptr summaryFile_; }; + +/// Used to load the input splits from a set of traced 'TableScan' operators for +/// replay. +/// +/// Currently, it only works with 'HiveConnectorSplit'. In the future, it will +/// be extended to handle more types of splits, such as +/// 'IcebergHiveConnectorSplit'. +class OperatorTraceSplitReader { + public: + /// 'traceDirs' provides a list of directories with each one containing the + /// traced split info file for one table scan operator. + explicit OperatorTraceSplitReader( + std::vector traceDirs, + memory::MemoryPool* pool); + + /// Reads and deserializes all the traced split strings. + std::vector read() const; + + private: + static std::vector deserialize(common::FileInputStream* stream); + + std::unique_ptr getSplitInputStream( + const std::string& traceDir) const; + + const std::vector traceDirs_; + const std::shared_ptr fs_; + memory::MemoryPool* const pool_; +}; } // namespace facebook::velox::exec::trace diff --git a/velox/exec/OperatorTraceWriter.cpp b/velox/exec/OperatorTraceWriter.cpp index 38987226ccd8..409406ba2114 100644 --- a/velox/exec/OperatorTraceWriter.cpp +++ b/velox/exec/OperatorTraceWriter.cpp @@ -16,7 +16,10 @@ #include "velox/exec/OperatorTraceWriter.h" +#include +#include #include + #include "velox/common/file/File.h" #include "velox/common/file/FileSystems.h" #include "velox/exec/Operator.h" @@ -25,7 +28,7 @@ namespace facebook::velox::exec::trace { -OperatorTraceWriter::OperatorTraceWriter( +OperatorTraceInputWriter::OperatorTraceInputWriter( Operator* traceOp, std::string traceDir, memory::MemoryPool* pool, @@ -40,7 +43,7 @@ OperatorTraceWriter::OperatorTraceWriter( VELOX_CHECK_NOT_NULL(traceFile_); } -void OperatorTraceWriter::write(const RowVectorPtr& rows) { +void OperatorTraceInputWriter::write(const RowVectorPtr& rows) { if (FOLLY_UNLIKELY(finished_)) { return; } @@ -67,7 +70,7 @@ void OperatorTraceWriter::write(const RowVectorPtr& rows) { traceFile_->append(std::move(iobuf)); } -void OperatorTraceWriter::finish() { +void OperatorTraceInputWriter::finish() { if (finished_) { return; } @@ -81,7 +84,7 @@ void OperatorTraceWriter::finish() { finished_ = true; } -void OperatorTraceWriter::writeSummary() const { +void OperatorTraceInputWriter::writeSummary() const { const auto summaryFilePath = getOpTraceSummaryFilePath(traceDir_); const auto file = fs_->openFileForWrite(summaryFilePath); folly::dynamic obj = folly::dynamic::object; @@ -97,4 +100,63 @@ void OperatorTraceWriter::writeSummary() const { file->close(); } +OperatorTraceSplitWriter::OperatorTraceSplitWriter( + Operator* traceOp, + std::string traceDir) + : traceOp_(traceOp), + traceDir_(std::move(traceDir)), + fs_(filesystems::getFileSystem(traceDir_, nullptr)), + splitFile_(fs_->openFileForWrite(getOpTraceSplitFilePath(traceDir_))) { + VELOX_CHECK_NOT_NULL(splitFile_); +} + +void OperatorTraceSplitWriter::write(const exec::Split& split) const { + // TODO: Supports group later once we have driver id mapping in trace node. + VELOX_CHECK(!split.hasGroup(), "Do not support grouped execution"); + VELOX_CHECK(split.hasConnectorSplit()); + const auto splitObj = split.connectorSplit->serialize(); + const auto splitJson = folly::toJson(splitObj); + auto ioBuf = serialize(splitJson); + splitFile_->append(std::move(ioBuf)); +} + +void OperatorTraceSplitWriter::finish() { + if (finished_) { + return; + } + + VELOX_CHECK_NOT_NULL( + splitFile_, "The query data writer has already been finished"); + splitFile_->close(); + writeSummary(); + finished_ = true; +} + +// static +std::unique_ptr OperatorTraceSplitWriter::serialize( + const std::string& split) { + const uint32_t length = split.length(); + const uint32_t crc32 = folly::crc32( + reinterpret_cast(split.data()), split.size()); + auto ioBuf = + folly::IOBuf::create(sizeof(length) + split.size() + sizeof(crc32)); + folly::io::Appender appender(ioBuf.get(), 0); + appender.writeLE(length); + appender.push(reinterpret_cast(split.data()), length); + appender.writeLE(crc32); + return ioBuf; +} + +void OperatorTraceSplitWriter::writeSummary() const { + const auto summaryFilePath = getOpTraceSummaryFilePath(traceDir_); + const auto file = fs_->openFileForWrite(summaryFilePath); + folly::dynamic obj = folly::dynamic::object; + obj[OperatorTraceTraits::kOpTypeKey] = traceOp_->operatorType(); + const auto stats = traceOp_->stats(/*clear=*/false); + obj[OperatorTraceTraits::kPeakMemoryKey] = + stats.memoryStats.peakTotalMemoryReservation; + obj[OperatorTraceTraits::kNumSplits] = stats.numSplits; + file->append(folly::toJson(obj)); + file->close(); +} } // namespace facebook::velox::exec::trace diff --git a/velox/exec/OperatorTraceWriter.h b/velox/exec/OperatorTraceWriter.h index c0b818e93acb..451b87f792bb 100644 --- a/velox/exec/OperatorTraceWriter.h +++ b/velox/exec/OperatorTraceWriter.h @@ -19,6 +19,7 @@ #include "TraceConfig.h" #include "velox/common/file/File.h" #include "velox/common/file/FileSystems.h" +#include "velox/exec/Split.h" #include "velox/serializers/PrestoSerializer.h" #include "velox/vector/VectorStream.h" @@ -28,13 +29,14 @@ class Operator; namespace facebook::velox::exec::trace { -/// Used to serialize and write the input vectors from a given operator into a -/// file. -class OperatorTraceWriter { +/// Used to serialize and write the input vectors from a particular operator +/// into a data file. Additionally, it creates a corresponding summary file that +/// contains information such as peak memory, input rows, operator type, etc. +class OperatorTraceInputWriter { public: /// 'traceOp' is the operator to trace. 'traceDir' specifies the trace /// directory for the operator. - explicit OperatorTraceWriter( + explicit OperatorTraceInputWriter( Operator* traceOp, std::string traceDir, memory::MemoryPool* pool, @@ -48,8 +50,6 @@ class OperatorTraceWriter { private: // Flushes the trace data summaries to the disk. - // - // TODO: add more summaries such as number of rows etc. void writeSummary() const; Operator* const traceOp_; @@ -71,4 +71,34 @@ class OperatorTraceWriter { bool finished_{false}; }; +/// Used to write the input splits during the execution of a traced 'TableScan' +/// operator. Additionally, it creates a corresponding summary file that +/// contains information such as peak memory, number of splits, etc. +/// +/// Currently, it only works with 'HiveConnectorSplit'. In the future, it will +/// be extended to handle more types of splits, such as +/// 'IcebergHiveConnectorSplit'. +class OperatorTraceSplitWriter { + public: + explicit OperatorTraceSplitWriter(Operator* traceOp, std::string traceDir); + + /// Serializes and writes out each split. Each serialized split is immediately + /// flushed to ensure that we can still replay a traced operator even if a + /// crash occurs during execution. + void write(const exec::Split& split) const; + + void finish(); + + private: + static std::unique_ptr serialize(const std::string& split); + + // Flushes the trace data summaries to the disk. + void writeSummary() const; + + Operator* const traceOp_; + const std::string traceDir_; + const std::shared_ptr fs_; + const std::unique_ptr splitFile_; + bool finished_{false}; +}; } // namespace facebook::velox::exec::trace diff --git a/velox/exec/TableScan.cpp b/velox/exec/TableScan.cpp index f93d4a718afb..7586a52f763f 100644 --- a/velox/exec/TableScan.cpp +++ b/velox/exec/TableScan.cpp @@ -17,6 +17,7 @@ #include "velox/common/testutil/TestValue.h" #include "velox/common/time/Timer.h" #include "velox/exec/Task.h" +#include "velox/exec/TraceUtil.h" #include "velox/expression/Expr.h" using facebook::velox::common::testutil::TestValue; @@ -135,6 +136,9 @@ RowVectorPtr TableScan::getOutput() { return nullptr; } + if (FOLLY_UNLIKELY(splitTracer_ != nullptr)) { + splitTracer_->write(split); + } const auto& connectorSplit = split.connectorSplit; currentSplitWeight_ = connectorSplit->splitWeight; needNewSplit_ = false; diff --git a/velox/exec/Trace.h b/velox/exec/Trace.h index 6bf7c6103b84..cdbe2a1384da 100644 --- a/velox/exec/Trace.h +++ b/velox/exec/Trace.h @@ -33,11 +33,13 @@ struct TraceTraits { struct OperatorTraceTraits { static inline const std::string kSummaryFileName = "op_trace_summary.json"; static inline const std::string kInputFileName = "op_input_trace.data"; + static inline const std::string kSplitFileName = "op_split_trace.split"; /// Keys for operator trace summary file. static inline const std::string kOpTypeKey = "opType"; static inline const std::string kPeakMemoryKey = "peakMemory"; - static inline const std::string kInputRowsKey = "inputhRows"; + static inline const std::string kInputRowsKey = "inputRows"; + static inline const std::string kNumSplits = "numSplits"; }; /// Contains the summary of an operator trace. diff --git a/velox/exec/TraceUtil.cpp b/velox/exec/TraceUtil.cpp index 2817f1640f46..8dd8cecb7f59 100644 --- a/velox/exec/TraceUtil.cpp +++ b/velox/exec/TraceUtil.cpp @@ -108,6 +108,10 @@ std::string getOpTraceInputFilePath(const std::string& opTraceDir) { return fmt::format("{}/{}", opTraceDir, OperatorTraceTraits::kInputFileName); } +std::string getOpTraceSplitFilePath(const std::string& opTraceDir) { + return fmt::format("{}/{}", opTraceDir, OperatorTraceTraits::kSplitFileName); +} + std::string getOpTraceSummaryFilePath(const std::string& opTraceDir) { return fmt::format( "{}/{}", opTraceDir, OperatorTraceTraits::kSummaryFileName); @@ -182,13 +186,14 @@ size_t getNumDrivers( bool canTrace(const std::string& operatorType) { static const std::unordered_set kSupportedOperatorTypes{ - "FilterProject", - "TableWrite", "Aggregation", + "FilterProject", + "HashBuild", + "HashProbe", "PartialAggregation", "PartitionedOutput", - "HashBuild", - "HashProbe"}; + "TableScan", + "TableWrite"}; return kSupportedOperatorTypes.count(operatorType) > 0; } } // namespace facebook::velox::exec::trace diff --git a/velox/exec/TraceUtil.h b/velox/exec/TraceUtil.h index 807b51c0eddb..69f26c7c7dbd 100644 --- a/velox/exec/TraceUtil.h +++ b/velox/exec/TraceUtil.h @@ -73,6 +73,9 @@ std::string getOpTraceDirectory( /// Returns the file path for a given operator's traced input file. std::string getOpTraceInputFilePath(const std::string& opTraceDir); +/// Returns the file path for a given operator's traced split file. +std::string getOpTraceSplitFilePath(const std::string& opTraceDir); + /// Returns the file path for a given operator's traced input file. std::string getOpTraceSummaryFilePath(const std::string& opTraceDir); diff --git a/velox/exec/tests/OperatorTraceTest.cpp b/velox/exec/tests/OperatorTraceTest.cpp index 2a8f075511a1..d4113adad8bc 100644 --- a/velox/exec/tests/OperatorTraceTest.cpp +++ b/velox/exec/tests/OperatorTraceTest.cpp @@ -15,16 +15,21 @@ */ #include +#include #include #include #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/file/FileSystems.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/exec/OperatorTraceReader.h" +#include "velox/exec/OperatorTraceWriter.h" #include "velox/exec/PartitionFunction.h" +#include "velox/exec/Split.h" #include "velox/exec/TaskTraceReader.h" #include "velox/exec/Trace.h" #include "velox/exec/TraceUtil.h" +#include "velox/exec/tests/utils/ArbitratorTestUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" #include "velox/exec/tests/utils/PlanBuilder.h" @@ -49,6 +54,7 @@ class OperatorTraceTest : public HiveConnectorTestBase { connector::hive::LocationHandle::registerSerDe(); connector::hive::HiveColumnHandle::registerSerDe(); connector::hive::HiveInsertTableHandle::registerSerDe(); + connector::hive::HiveConnectorSplit::registerSerDe(); core::PlanNode::registerSerDe(); core::ITypedExpr::registerSerDe(); registerPartitionFunctionSerDe(); @@ -73,17 +79,12 @@ class OperatorTraceTest : public HiveConnectorTestBase { filesystems::registerLocalFileSystem(); } - RowTypePtr generateTypes(size_t numColumns) { - std::vector names; - names.reserve(numColumns); - std::vector types; - types.reserve(numColumns); - for (auto i = 0; i < numColumns; ++i) { - names.push_back(fmt::format("c{}", i)); - types.push_back(vectorFuzzer_.randType((2))); - } - return ROW(std::move(names), std::move(types)); - ; + std::vector makeVectors( + int32_t count, + int32_t rowsPerVector, + const RowTypePtr& rowType = nullptr) { + auto inputs = rowType ? rowType : dataType_; + return HiveConnectorTestBase::makeVectors(inputs, count, rowsPerVector); } bool isSamePlan( @@ -727,6 +728,252 @@ TEST_F(OperatorTraceTest, filterProject) { } } +TEST_F(OperatorTraceTest, traceSplitRoundTrip) { + constexpr auto numSplits = 5; + const auto vectors = makeVectors(10, 100); + const auto testDir = TempDirectoryPath::create(); + const auto traceRoot = fmt::format("{}/{}", testDir->getPath(), "traceRoot"); + const auto fs = filesystems::getFileSystem(testDir->getPath(), nullptr); + std::vector> splitFiles; + for (int i = 0; i < numSplits; ++i) { + auto filePath = TempFilePath::create(); + writeToFile(filePath->getPath(), vectors); + splitFiles.push_back(std::move(filePath)); + } + auto splits = makeHiveConnectorSplits(splitFiles); + std::sort(splits.begin(), splits.end()); + + auto traceDirPath = TempDirectoryPath::create(); + auto plan = PlanBuilder().tableScan(dataType_).planNode(); + std::shared_ptr task; + AssertQueryBuilder(plan) + .maxDrivers(3) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceDirPath->getPath()) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, "0") + .splits(splits) + .copyResults(pool(), task); + + const auto taskTraceDir = + getTaskTraceDirectory(traceDirPath->getPath(), *task); + std::vector traceDirs; + for (int i = 0; i < 3; ++i) { + const auto opTraceDir = getOpTraceDirectory( + taskTraceDir, + /*planNodeId=*/"0", + /*pipelineId=*/0, + /*driverId=*/i); + const auto summaryFilePath = getOpTraceSummaryFilePath(opTraceDir); + const auto splitFilePath = getOpTraceSplitFilePath(opTraceDir); + ASSERT_TRUE(fs->exists(summaryFilePath)); + ASSERT_TRUE(fs->exists(splitFilePath)); + + traceDirs.push_back(opTraceDir); + } + + const auto reader = exec::trace::OperatorTraceSplitReader( + traceDirs, memory::MemoryManager::getInstance()->tracePool()); + auto actualSplits = reader.read(); + std::unordered_set splitStrs; + std::transform( + splits.begin(), + splits.end(), + std::inserter(splitStrs, splitStrs.begin()), + [](const auto& s) { return s->toString(); }); + ASSERT_EQ(actualSplits.size(), splits.size()); + for (int i = 0; i < numSplits; ++i) { + folly::dynamic splitInfoObj = folly::parseJson(actualSplits[i]); + const auto actualSplit = exec::Split{ + std::const_pointer_cast( + ISerializable::deserialize( + splitInfoObj))}; + ASSERT_FALSE(actualSplit.hasGroup()); + ASSERT_TRUE(actualSplit.hasConnectorSplit()); + const auto actualConnectorSplit = actualSplit.connectorSplit; + ASSERT_EQ(splitStrs.count(actualConnectorSplit->toString()), 1); + } +} + +TEST_F(OperatorTraceTest, traceSplitPartial) { + constexpr auto numSplits = 3; + const auto vectors = makeVectors(2, 100); + const auto testDir = TempDirectoryPath::create(); + const auto traceRoot = fmt::format("{}/{}", testDir->getPath(), "traceRoot"); + const auto fs = filesystems::getFileSystem(testDir->getPath(), nullptr); + std::vector> splitFiles; + for (int i = 0; i < numSplits; ++i) { + auto filePath = TempFilePath::create(); + writeToFile(filePath->getPath(), vectors); + splitFiles.push_back(std::move(filePath)); + } + auto splits = makeHiveConnectorSplits(splitFiles); + + auto traceDirPath = TempDirectoryPath::create(); + auto plan = PlanBuilder().tableScan(dataType_).planNode(); + std::shared_ptr task; + AssertQueryBuilder(plan) + .maxDrivers(3) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceDirPath->getPath()) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, "0") + .splits(splits) + .copyResults(pool(), task); + + const auto taskTraceDir = + getTaskTraceDirectory(traceDirPath->getPath(), *task); + std::vector traceDirs; + for (int i = 0; i < 3; ++i) { + const auto opTraceDir = getOpTraceDirectory( + taskTraceDir, + /*planNodeId=*/"0", + /*pipelineId=*/0, + /*driverId=*/i); + const auto summaryFilePath = getOpTraceSummaryFilePath(opTraceDir); + const auto splitFilePath = getOpTraceSplitFilePath(opTraceDir); + ASSERT_TRUE(fs->exists(summaryFilePath)); + ASSERT_TRUE(fs->exists(splitFilePath)); + + traceDirs.push_back(opTraceDir); + } + + // Append a partial split to the split info file. + const std::string split = "split123"; + const uint32_t length = split.length(); + const uint32_t crc32 = folly::crc32( + reinterpret_cast(split.data()), split.size()); + const auto splitInfoFile = fs->openFileForWrite( + fmt::format( + "{}/{}", + traceDirPath->getPath(), + OperatorTraceTraits::kSplitFileName), + filesystems::FileOptions{.shouldThrowOnFileAlreadyExists = false}); + auto ioBuf = folly::IOBuf::create(12 + 16); + folly::io::Appender appender(ioBuf.get(), 0); + // Writes an invalid split without crc. + appender.writeLE(length); + appender.push(reinterpret_cast(split.data()), length); + // Writes a valid spilt. + appender.writeLE(length); + appender.push(reinterpret_cast(split.data()), length); + appender.writeLE(crc32); + splitInfoFile->append(std::move(ioBuf)); + splitInfoFile->close(); + + const auto reader = exec::trace::OperatorTraceSplitReader( + traceDirs, memory::MemoryManager::getInstance()->tracePool()); + auto actualSplits = reader.read(); + std::unordered_set splitStrs; + std::transform( + splits.begin(), + splits.end(), + std::inserter(splitStrs, splitStrs.begin()), + [](const auto& s) { return s->toString(); }); + ASSERT_EQ(actualSplits.size(), splits.size()); + for (int i = 0; i < numSplits; ++i) { + folly::dynamic splitInfoObj = folly::parseJson(actualSplits[i]); + const auto actualSplit = exec::Split{ + std::const_pointer_cast( + ISerializable::deserialize( + splitInfoObj))}; + ASSERT_FALSE(actualSplit.hasGroup()); + ASSERT_TRUE(actualSplit.hasConnectorSplit()); + const auto actualConnectorSplit = actualSplit.connectorSplit; + ASSERT_EQ(splitStrs.count(actualConnectorSplit->toString()), 1); + } +} + +TEST_F(OperatorTraceTest, traceSplitCorrupted) { + constexpr auto numSplits = 3; + const auto vectors = makeVectors(2, 100); + const auto testDir = TempDirectoryPath::create(); + const auto traceRoot = fmt::format("{}/{}", testDir->getPath(), "traceRoot"); + const auto fs = filesystems::getFileSystem(testDir->getPath(), nullptr); + std::vector> splitFiles; + for (int i = 0; i < numSplits; ++i) { + auto filePath = TempFilePath::create(); + writeToFile(filePath->getPath(), vectors); + splitFiles.push_back(std::move(filePath)); + } + auto splits = makeHiveConnectorSplits(splitFiles); + + auto traceDirPath = TempDirectoryPath::create(); + auto plan = PlanBuilder().tableScan(dataType_).planNode(); + std::shared_ptr task; + AssertQueryBuilder(plan) + .maxDrivers(3) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceDirPath->getPath()) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, "0") + .splits(splits) + .copyResults(pool(), task); + + const auto taskTraceDir = + getTaskTraceDirectory(traceDirPath->getPath(), *task); + std::vector traceDirs; + for (int i = 0; i < 3; ++i) { + const auto opTraceDir = getOpTraceDirectory( + taskTraceDir, + /*planNodeId=*/"0", + /*pipelineId=*/0, + /*driverId=*/i); + const auto summaryFilePath = getOpTraceSummaryFilePath(opTraceDir); + const auto splitFilePath = getOpTraceSplitFilePath(opTraceDir); + ASSERT_TRUE(fs->exists(summaryFilePath)); + ASSERT_TRUE(fs->exists(splitFilePath)); + + traceDirs.push_back(opTraceDir); + } + + // Append a split with wrong checksum to the split info file. + const std::string split = "split123"; + const uint32_t length = split.length(); + const uint32_t crc32 = folly::crc32( + reinterpret_cast(split.data()), split.size()); + const auto splitInfoFile = fs->openFileForWrite( + fmt::format( + "{}/{}", + traceDirPath->getPath(), + OperatorTraceTraits::kSplitFileName), + filesystems::FileOptions{.shouldThrowOnFileAlreadyExists = false}); + auto ioBuf = folly::IOBuf::create(16 * 2); + folly::io::Appender appender(ioBuf.get(), 0); + // Writes an invalid split with a wrong checksum. + appender.writeLE(length); + appender.push(reinterpret_cast(split.data()), length); + appender.writeLE(crc32 - 1); + // Writes a valid split. + appender.writeLE(length); + appender.push(reinterpret_cast(split.data()), length); + appender.writeLE(crc32); + splitInfoFile->append(std::move(ioBuf)); + splitInfoFile->close(); + + const auto reader = exec::trace::OperatorTraceSplitReader( + traceDirs, memory::MemoryManager::getInstance()->tracePool()); + auto actualSplits = reader.read(); + std::unordered_set splitStrs; + std::transform( + splits.begin(), + splits.end(), + std::inserter(splitStrs, splitStrs.begin()), + [](const auto& s) { return s->toString(); }); + ASSERT_EQ(actualSplits.size(), splits.size()); + for (int i = 0; i < numSplits; ++i) { + folly::dynamic splitInfoObj = folly::parseJson(actualSplits[i]); + const auto actualSplit = exec::Split{ + std::const_pointer_cast( + ISerializable::deserialize( + splitInfoObj))}; + ASSERT_FALSE(actualSplit.hasGroup()); + ASSERT_TRUE(actualSplit.hasConnectorSplit()); + const auto actualConnectorSplit = actualSplit.connectorSplit; + ASSERT_EQ(splitStrs.count(actualConnectorSplit->toString()), 1); + } +} + TEST_F(OperatorTraceTest, hashJoin) { std::vector probeInput; RowTypePtr probeType = @@ -882,6 +1129,7 @@ TEST_F(OperatorTraceTest, canTrace) { {"PartialAggregation", true}, {"Aggregation", true}, {"TableWrite", true}, + {"TableScan", true}, {"FilterProject", true}}; for (const auto& testData : testSettings) { SCOPED_TRACE(testData.debugString()); diff --git a/velox/exec/tests/TraceUtilTest.cpp b/velox/exec/tests/TraceUtilTest.cpp index 3afae1b9db42..7220591378cb 100644 --- a/velox/exec/tests/TraceUtilTest.cpp +++ b/velox/exec/tests/TraceUtilTest.cpp @@ -124,6 +124,9 @@ TEST_F(TraceUtilTest, traceDirectoryLayoutUtilities) { ASSERT_EQ( getOpTraceSummaryFilePath(opTraceDir), "/traceRoot/queryId/taskId/1/1/1/op_trace_summary.json"); + ASSERT_EQ( + getOpTraceSplitFilePath(opTraceDir), + "/traceRoot/queryId/taskId/1/1/1/op_split_trace.split"); } TEST_F(TraceUtilTest, getTaskIds) { diff --git a/velox/tool/trace/CMakeLists.txt b/velox/tool/trace/CMakeLists.txt index dc5138e1bbc5..d179b26c340e 100644 --- a/velox/tool/trace/CMakeLists.txt +++ b/velox/tool/trace/CMakeLists.txt @@ -19,6 +19,7 @@ velox_add_library( HashJoinReplayer.cpp OperatorReplayerBase.cpp PartitionedOutputReplayer.cpp + TableScanReplayer.cpp TableWriterReplayer.cpp TraceReplayRunner.cpp) @@ -30,6 +31,7 @@ velox_link_libraries( velox_vector_test_lib velox_exec velox_exec_test_lib + velox_hive_connector velox_tpch_connector velox_memory Folly::folly diff --git a/velox/tool/trace/OperatorReplayerBase.cpp b/velox/tool/trace/OperatorReplayerBase.cpp index 463752732dae..1c70075c7917 100644 --- a/velox/tool/trace/OperatorReplayerBase.cpp +++ b/velox/tool/trace/OperatorReplayerBase.cpp @@ -67,6 +67,13 @@ core::PlanNodePtr OperatorReplayerBase::createPlan() const { const auto* replayNode = core::PlanNode::findFirstNode( planFragment_.get(), [this](const core::PlanNode* node) { return node->id() == nodeId_; }); + + if (replayNode->name() == "TableScan") { + return exec::test::PlanBuilder() + .addNode(replayNodeFactory(replayNode)) + .planNode(); + } + return exec::test::PlanBuilder(planNodeIdGenerator_) .traceScan( nodeTraceDir_, diff --git a/velox/tool/trace/TableScanReplayer.cpp b/velox/tool/trace/TableScanReplayer.cpp new file mode 100644 index 000000000000..e4076157f42e --- /dev/null +++ b/velox/tool/trace/TableScanReplayer.cpp @@ -0,0 +1,80 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/tool/trace/TableScanReplayer.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/exec/OperatorTraceReader.h" +#include "velox/exec/TraceUtil.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/PlanBuilder.h" + +using namespace facebook::velox; +using namespace facebook::velox::exec; +using namespace facebook::velox::exec::test; + +namespace facebook::velox::tool::trace { + +RowVectorPtr TableScanReplayer::run() { + const auto plan = createPlan(); + return exec::test::AssertQueryBuilder(plan) + .maxDrivers(maxDrivers_) + .configs(queryConfigs_) + .connectorSessionProperties(connectorConfigs_) + .splits(getSplits()) + .copyResults(memory::MemoryManager::getInstance()->tracePool()); +} + +core::PlanNodePtr TableScanReplayer::createPlanNode( + const core::PlanNode* node, + const core::PlanNodeId& nodeId, + const core::PlanNodePtr& /*source*/) const { + const auto scanNode = dynamic_cast(node); + VELOX_CHECK_NOT_NULL(scanNode); + return std::make_shared( + nodeId, + scanNode->outputType(), + scanNode->tableHandle(), + scanNode->assignments()); +} + +std::vector TableScanReplayer::getSplits() const { + std::vector splitInfoDirs; + if (driverId_ != -1) { + splitInfoDirs.push_back(exec::trace::getOpTraceDirectory( + nodeTraceDir_, pipelineId_, driverId_)); + } else { + for (auto i = 0; i < maxDrivers_; ++i) { + splitInfoDirs.push_back( + exec::trace::getOpTraceDirectory(nodeTraceDir_, pipelineId_, i)); + } + } + const auto splitStrs = + exec::trace::OperatorTraceSplitReader( + splitInfoDirs, memory::MemoryManager::getInstance()->tracePool()) + .read(); + + std::vector splits; + for (const auto& splitStr : splitStrs) { + folly::dynamic splitInfoObj = folly::parseJson(splitStr); + const auto split = + ISerializable::deserialize( + splitInfoObj); + splits.emplace_back( + std::const_pointer_cast(split)); + } + return splits; +} +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/TableScanReplayer.h b/velox/tool/trace/TableScanReplayer.h new file mode 100644 index 000000000000..ea7d0d90ef93 --- /dev/null +++ b/velox/tool/trace/TableScanReplayer.h @@ -0,0 +1,60 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "velox/core/PlanNode.h" +#include "velox/exec/Split.h" +#include "velox/tool/trace/OperatorReplayerBase.h" + +namespace facebook::velox::tool::trace { + +/// The replayer to replay the traced 'TableScan' operators. +class TableScanReplayer final : public OperatorReplayerBase { + public: + TableScanReplayer( + const std::string& traceDir, + const std::string& queryId, + const std::string& taskId, + const std::string& nodeId, + const int32_t pipelineId, + const std::string& operatorType, + const int32_t driverId = -1) + : OperatorReplayerBase( + traceDir, + queryId, + taskId, + nodeId, + pipelineId, + operatorType), + driverId_(driverId) {} + + RowVectorPtr run() override; + + private: + core::PlanNodePtr createPlanNode( + const core::PlanNode* node, + const core::PlanNodeId& nodeId, + const core::PlanNodePtr& /*source*/) const override; + + std::vector getSplits() const; + + const int32_t driverId_; +}; + +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/TraceReplayRunner.cpp b/velox/tool/trace/TraceReplayRunner.cpp index 1a314f903ab1..ccc444a2aa4a 100644 --- a/velox/tool/trace/TraceReplayRunner.cpp +++ b/velox/tool/trace/TraceReplayRunner.cpp @@ -47,6 +47,7 @@ #include "velox/tool/trace/HashJoinReplayer.h" #include "velox/tool/trace/OperatorReplayerBase.h" #include "velox/tool/trace/PartitionedOutputReplayer.h" +#include "velox/tool/trace/TableScanReplayer.h" #include "velox/tool/trace/TableWriterReplayer.h" #include "velox/type/Type.h" @@ -70,6 +71,7 @@ DEFINE_string( "query task."); DEFINE_string(node_id, "", "Specify the target node id."); DEFINE_int32(pipeline_id, 0, "Specify the target pipeline id."); +DEFINE_int32(driver_id, -1, "Specify the target driver id."); DEFINE_string(operator_type, "", "Specify the target operator type."); DEFINE_string( table_writer_output_dir, @@ -132,6 +134,14 @@ std::unique_ptr createReplayer() { FLAGS_pipeline_id, getVectorSerdeKind(), FLAGS_operator_type); + } else if (FLAGS_operator_type == "TableScan") { + replayer = std::make_unique( + FLAGS_root_dir, + FLAGS_query_id, + FLAGS_task_id, + FLAGS_node_id, + FLAGS_pipeline_id, + FLAGS_operator_type); } else if (FLAGS_operator_type == "FilterProject") { replayer = std::make_unique( FLAGS_root_dir, diff --git a/velox/tool/trace/TraceReplayRunner.h b/velox/tool/trace/TraceReplayRunner.h index a0bf1f15e3ae..e1c0906fc4e8 100644 --- a/velox/tool/trace/TraceReplayRunner.h +++ b/velox/tool/trace/TraceReplayRunner.h @@ -27,6 +27,7 @@ DECLARE_string(query_id); DECLARE_string(task_id); DECLARE_string(node_id); DECLARE_int32(pipeline_id); +DECLARE_int32(driver_id); DECLARE_string(operator_type); DECLARE_string(table_writer_output_dir); DECLARE_double(hiveConnectorExecutorHwMultiplier); diff --git a/velox/tool/trace/tests/CMakeLists.txt b/velox/tool/trace/tests/CMakeLists.txt index fe2c66a5fd3c..f6241119110d 100644 --- a/velox/tool/trace/tests/CMakeLists.txt +++ b/velox/tool/trace/tests/CMakeLists.txt @@ -18,6 +18,7 @@ add_executable( FilterProjectReplayerTest.cpp HashJoinReplayerTest.cpp PartitionedOutputReplayerTest.cpp + TableScanReplayerTest.cpp TableWriterReplayerTest.cpp) add_test( diff --git a/velox/tool/trace/tests/TableScanReplayerTest.cpp b/velox/tool/trace/tests/TableScanReplayerTest.cpp new file mode 100644 index 000000000000..5cf1db5f21ea --- /dev/null +++ b/velox/tool/trace/tests/TableScanReplayerTest.cpp @@ -0,0 +1,251 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include + +#include "folly/dynamic.h" +#include "velox/common/base/Fs.h" +#include "velox/common/file/FileSystems.h" +#include "velox/common/hyperloglog/SparseHll.h" +#include "velox/exec/PartitionFunction.h" +#include "velox/exec/TableWriter.h" +#include "velox/exec/TraceUtil.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" +#include "velox/serializers/PrestoSerializer.h" +#include "velox/tool/trace/TableScanReplayer.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +using namespace facebook::velox; +using namespace facebook::velox::core; +using namespace facebook::velox::common; +using namespace facebook::velox::exec; +using namespace facebook::velox::exec::test; +using namespace facebook::velox::connector; +using namespace facebook::velox::connector::hive; +using namespace facebook::velox::dwio::common; +using namespace facebook::velox::common::testutil; +using namespace facebook::velox::common::hll; + +namespace facebook::velox::tool::trace::test { +class TableScanReplayerTest : public HiveConnectorTestBase { + protected: + static void SetUpTestCase() { + memory::MemoryManager::testingSetInstance({}); + HiveConnectorTestBase::SetUpTestCase(); + filesystems::registerLocalFileSystem(); + if (!isRegisteredVectorSerde()) { + serializer::presto::PrestoVectorSerde::registerVectorSerde(); + } + Type::registerSerDe(); + common::Filter::registerSerDe(); + connector::hive::HiveTableHandle::registerSerDe(); + connector::hive::LocationHandle::registerSerDe(); + connector::hive::HiveColumnHandle::registerSerDe(); + connector::hive::HiveInsertTableHandle::registerSerDe(); + connector::hive::HiveConnectorSplit::registerSerDe(); + core::PlanNode::registerSerDe(); + core::ITypedExpr::registerSerDe(); + registerPartitionFunctionSerDe(); + } + + std::vector makeVectors( + int32_t count, + int32_t rowsPerVector, + const RowTypePtr& rowType = nullptr) { + auto inputs = rowType ? rowType : rowType_; + return HiveConnectorTestBase::makeVectors(inputs, count, rowsPerVector); + } + + core::PlanNodePtr tableScanNode() { + return tableScanNode(rowType_); + } + + core::PlanNodePtr tableScanNode(const RowTypePtr& outputType) { + return PlanBuilder(pool_.get()) + .tableScan(outputType) + .capturePlanNodeId(traceNodeId_) + .planNode(); + } + + const RowTypePtr rowType_{ + ROW({"c0", "c1", "c2", "c3", "c4", "c5", "c6"}, + {BIGINT(), + INTEGER(), + SMALLINT(), + REAL(), + DOUBLE(), + VARCHAR(), + TINYINT()})}; + core::PlanNodeId traceNodeId_; +}; + +TEST_F(TableScanReplayerTest, basic) { + const auto vectors = makeVectors(10, 100); + const auto testDir = TempDirectoryPath::create(); + const auto traceRoot = fmt::format("{}/{}", testDir->getPath(), "traceRoot"); + const auto fs = filesystems::getFileSystem(testDir->getPath(), nullptr); + std::vector> splitFiles; + for (int i = 0; i < 5; ++i) { + auto filePath = TempFilePath::create(); + writeToFile(filePath->getPath(), vectors); + splitFiles.push_back(std::move(filePath)); + } + + const auto plan = tableScanNode(); + auto results = AssertQueryBuilder(plan) + .splits(makeHiveConnectorSplits(splitFiles)) + .copyResults(pool()); + + std::shared_ptr task; + auto traceResult = + AssertQueryBuilder(plan) + .maxDrivers(4) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceRoot) + .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, traceNodeId_) + .splits(makeHiveConnectorSplits(splitFiles)) + .copyResults(pool(), task); + + assertEqualResults({results}, {traceResult}); + + const auto taskId = task->taskId(); + const auto replayingResult = TableScanReplayer( + traceRoot, + task->queryCtx()->queryId(), + task->taskId(), + traceNodeId_, + 0, + "TableScan") + .run(); + assertEqualResults({results}, {replayingResult}); +} + +TEST_F(TableScanReplayerTest, columnPrunning) { + const auto vectors = makeVectors(10, 100); + const auto testDir = TempDirectoryPath::create(); + const auto traceRoot = fmt::format("{}/{}", testDir->getPath(), "traceRoot"); + const auto fs = filesystems::getFileSystem(testDir->getPath(), nullptr); + std::vector> splitFiles; + for (int i = 0; i < 5; ++i) { + auto filePath = TempFilePath::create(); + writeToFile(filePath->getPath(), vectors); + splitFiles.push_back(std::move(filePath)); + } + + const auto plan = + tableScanNode(ROW({"c0", "c3", "c5"}, {BIGINT(), REAL(), VARCHAR()})); + + const auto results = AssertQueryBuilder(plan) + .splits(makeHiveConnectorSplits(splitFiles)) + .copyResults(pool()); + + std::shared_ptr task; + auto traceResult = + AssertQueryBuilder(plan) + .maxDrivers(4) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceRoot) + .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, traceNodeId_) + .splits(makeHiveConnectorSplits(splitFiles)) + .copyResults(pool(), task); + + assertEqualResults({results}, {traceResult}); + + const auto taskId = task->taskId(); + const auto replayingResult = TableScanReplayer( + traceRoot, + task->queryCtx()->queryId(), + task->taskId(), + traceNodeId_, + 0, + "TableScan") + .run(); + assertEqualResults({results}, {replayingResult}); +} + +TEST_F(TableScanReplayerTest, subfieldPrunning) { + // rowType: ROW + // └── "e": ROW + // ├── "c": ROW + // │ ├── "a": BIGINT + // │ └── "b": DOUBLE + // └── "d": BIGINT + auto innerType = ROW({"a", "b"}, {BIGINT(), DOUBLE()}); + auto columnType = ROW({"c", "d"}, {innerType, BIGINT()}); + auto rowType = ROW({"e"}, {columnType}); + auto vectors = makeVectors(10, 1'000, rowType); + auto filePath = TempFilePath::create(); + writeToFile(filePath->getPath(), vectors); + std::vector requiredSubfields; + requiredSubfields.emplace_back("e.c"); + std::unordered_map> + assignments; + assignments["e"] = std::make_shared( + "e", + HiveColumnHandle::ColumnType::kRegular, + columnType, + columnType, + std::move(requiredSubfields)); + const auto plan = PlanBuilder() + .startTableScan() + .outputType(rowType) + .assignments(assignments) + .endTableScan() + .capturePlanNodeId(traceNodeId_) + .planNode(); + const auto split = makeHiveConnectorSplit(filePath->getPath()); + const auto results = + AssertQueryBuilder(plan).split(split).copyResults(pool()); + + const auto testDir = TempDirectoryPath::create(); + const auto traceRoot = fmt::format("{}/{}", testDir->getPath(), "traceRoot"); + std::shared_ptr task; + auto traceResult = + AssertQueryBuilder(plan) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceRoot) + .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, traceNodeId_) + .split(split) + .copyResults(pool(), task); + + assertEqualResults({results}, {traceResult}); + + const auto taskId = task->taskId(); + const auto replayingResult = TableScanReplayer( + traceRoot, + task->queryCtx()->queryId(), + task->taskId(), + traceNodeId_, + 0, + "TableScan") + .run(); + assertEqualResults({results}, {replayingResult}); +} +} // namespace facebook::velox::tool::trace::test