diff --git a/velox/connectors/hive/HiveConnectorSplit.cpp b/velox/connectors/hive/HiveConnectorSplit.cpp index ed69eb7d6303..559f1770d6a7 100644 --- a/velox/connectors/hive/HiveConnectorSplit.cpp +++ b/velox/connectors/hive/HiveConnectorSplit.cpp @@ -61,7 +61,8 @@ folly::dynamic HiveConnectorSplit::serialize() const { customSplitInfoObj[key] = value; } obj["customSplitInfo"] = customSplitInfoObj; - obj["extraFileInfo"] = *extraFileInfo; + obj["extraFileInfo"] = + extraFileInfo == nullptr ? nullptr : folly::dynamic(*extraFileInfo); folly::dynamic serdeParametersObj = folly::dynamic::object; for (const auto& [key, value] : serdeParameters) { @@ -84,8 +85,14 @@ folly::dynamic HiveConnectorSplit::serialize() const { ? folly::dynamic(properties->modificationTime.value()) : nullptr; obj["properties"] = propertiesObj; - } else { - obj["properties"] = nullptr; + } + + if (rowIdProperties.has_value()) { + folly::dynamic rowIdObj = folly::dynamic::object; + rowIdObj["metadataVersion"] = rowIdProperties->metadataVersion; + rowIdObj["partitionId"] = rowIdProperties->partitionId; + rowIdObj["tableGuid"] = rowIdProperties->tableGuid; + obj["rowIdProperties"] = rowIdObj; } return obj; @@ -118,8 +125,9 @@ std::shared_ptr HiveConnectorSplit::create( customSplitInfo[key.asString()] = value.asString(); } - std::shared_ptr extraFileInfo = - std::make_shared(obj["extraFileInfo"].asString()); + std::shared_ptr extraFileInfo = obj["extraFileInfo"].isNull() + ? nullptr + : std::make_shared(obj["extraFileInfo"].asString()); std::unordered_map serdeParameters; for (const auto& [key, value] : obj["serdeParameters"].items()) { serdeParameters[key.asString()] = value.asString(); @@ -131,8 +139,8 @@ std::shared_ptr HiveConnectorSplit::create( } std::optional properties = std::nullopt; - const auto propertiesObj = obj["properties"]; - if (!propertiesObj.isNull()) { + const auto& propertiesObj = obj.getDefault("properties", nullptr); + if (propertiesObj != nullptr) { properties = FileProperties{ propertiesObj["fileSize"].isNull() ? std::nullopt @@ -142,6 +150,15 @@ std::shared_ptr HiveConnectorSplit::create( : std::optional(propertiesObj["modificationTime"].asInt())}; } + std::optional rowIdProperties = std::nullopt; + const auto& rowIdObj = obj.getDefault("rowIdProperties", nullptr); + if (rowIdObj != nullptr) { + rowIdProperties = RowIdProperties{ + .metadataVersion = rowIdObj["metadataVersion"].asInt(), + .partitionId = rowIdObj["partitionId"].asInt(), + .tableGuid = rowIdObj["tableGuid"].asString()}; + } + return std::make_shared( connectorId, filePath, @@ -155,7 +172,8 @@ std::shared_ptr HiveConnectorSplit::create( serdeParameters, splitWeight, infoColumns, - properties); + properties, + rowIdProperties); } // static diff --git a/velox/connectors/hive/HiveConnectorSplit.h b/velox/connectors/hive/HiveConnectorSplit.h index 9af4d7ef357a..a06ffa668e11 100644 --- a/velox/connectors/hive/HiveConnectorSplit.h +++ b/velox/connectors/hive/HiveConnectorSplit.h @@ -84,7 +84,8 @@ struct HiveConnectorSplit : public connector::ConnectorSplit { const std::unordered_map& _serdeParameters = {}, int64_t _splitWeight = 0, const std::unordered_map& _infoColumns = {}, - std::optional _properties = std::nullopt) + std::optional _properties = std::nullopt, + std::optional _rowIdProperties = std::nullopt) : ConnectorSplit(connectorId, _splitWeight), filePath(_filePath), fileFormat(_fileFormat), @@ -96,7 +97,8 @@ struct HiveConnectorSplit : public connector::ConnectorSplit { extraFileInfo(_extraFileInfo), serdeParameters(_serdeParameters), infoColumns(_infoColumns), - properties(_properties) {} + properties(_properties), + rowIdProperties(_rowIdProperties) {} std::string toString() const override; diff --git a/velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp b/velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp index ab65b9099556..7d022ff550bf 100644 --- a/velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp +++ b/velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp @@ -81,7 +81,11 @@ class HiveConnectorSerDeTest : public exec::test::HiveConnectorTestBase { ASSERT_EQ(value, clone->customSplitInfo.at(key)); } - ASSERT_EQ(*split.extraFileInfo, *clone->extraFileInfo); + if (split.extraFileInfo != nullptr) { + ASSERT_EQ(*split.extraFileInfo, *clone->extraFileInfo); + } else { + ASSERT_EQ(clone->extraFileInfo, nullptr); + } ASSERT_EQ(split.serdeParameters.size(), clone->serdeParameters.size()); for (const auto& [key, value] : split.serdeParameters) { ASSERT_EQ(value, clone->serdeParameters.at(key)); @@ -235,7 +239,9 @@ TEST_F(HiveConnectorSerDeTest, hiveConnectorSplit) { FileProperties fileProperties{ .fileSize = 2048, .modificationTime = std::nullopt}; const auto properties = std::optional(fileProperties); - const auto split = HiveConnectorSplit( + RowIdProperties rowIdProperties{ + .metadataVersion = 2, .partitionId = 3, .tableGuid = "test"}; + const auto split1 = HiveConnectorSplit( connectorId, filePath, fileFormat, @@ -248,8 +254,26 @@ TEST_F(HiveConnectorSerDeTest, hiveConnectorSplit) { serdeParameters, splitWeight, infoColumns, + properties, + rowIdProperties); + testSerde(split1); + + const auto split2 = HiveConnectorSplit( + connectorId, + filePath, + fileFormat, + start, + length, + {}, + tableBucketNumber, + customSplitInfo, + nullptr, + {}, + splitWeight, + {}, + std::nullopt, std::nullopt); - testSerde(split); + testSerde(split2); } } // namespace diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 577b00ab66d1..4ed954435292 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -138,11 +138,12 @@ void Operator::maybeSetTracer() { const auto opTraceDirPath = trace::getOpTraceDirectory( traceConfig->queryTraceDir, planNodeId(), pipelineId, driverId); trace::createTraceDirectory(opTraceDirPath); - inputTracer_ = std::make_unique( - this, - opTraceDirPath, - memory::traceMemoryPool(), - traceConfig->updateAndCheckTraceLimitCB); + + if (operatorType() == "TableScan") { + setupSplitTracer(opTraceDirPath); + } else { + setupInputTracer(opTraceDirPath); + } } void Operator::traceInput(const RowVectorPtr& input) { @@ -152,9 +153,14 @@ void Operator::traceInput(const RowVectorPtr& input) { } void Operator::finishTrace() { + VELOX_CHECK(inputTracer_ == nullptr || splitTracer_ == nullptr); if (inputTracer_ != nullptr) { inputTracer_->finish(); } + + if (splitTracer_ != nullptr) { + splitTracer_->finish(); + } } std::vector>& @@ -163,6 +169,19 @@ Operator::translators() { return translators; } +void Operator::setupInputTracer(const std::string& opTraceDirPath) { + inputTracer_ = std::make_unique( + this, + opTraceDirPath, + memory::traceMemoryPool(), + operatorCtx_->driverCtx()->traceConfig()->updateAndCheckTraceLimitCB); +} + +void Operator::setupSplitTracer(const std::string& opTraceDirPath) { + splitTracer_ = + std::make_unique(this, opTraceDirPath); +} + // static std::unique_ptr Operator::fromPlanNode( DriverCtx* ctx, diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index 6d0fa34c3af8..25d355802d25 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -737,8 +737,8 @@ class Operator : public BaseRuntimeStatWriter { return spillConfig_.has_value() ? &spillConfig_.value() : nullptr; } - /// Invoked to setup query data writer for this operator if the associated - /// query plan node is configured to collect trace. + /// Invoked to setup query data or split writer for this operator if the + /// associated query plan node is configured to collect trace. void maybeSetTracer(); /// Creates output vector from 'input_' and 'results' according to @@ -778,7 +778,12 @@ class Operator : public BaseRuntimeStatWriter { folly::Synchronized stats_; folly::Synchronized spillStats_; - std::unique_ptr inputTracer_; + + /// NOTE: only one of the two could be set for an operator for tracing . + /// 'splitTracer_' is only set for table scan to record the processed split + /// for now. + std::unique_ptr inputTracer_{nullptr}; + std::unique_ptr splitTracer_{nullptr}; /// Indicates if an operator is under a non-reclaimable execution section. /// This prevents the memory arbitrator from reclaiming memory from this @@ -803,6 +808,12 @@ class Operator : public BaseRuntimeStatWriter { std::unordered_map> dynamicFilters_; + + private: + // Setup 'inputTracer_' to record the processed input vectors. + void setupInputTracer(const std::string& traceDir); + // Setup 'splitTracer_' for table scan to record the processed split. + void setupSplitTracer(const std::string& traceDir); }; /// Given a row type returns indices for the specified subset of columns. diff --git a/velox/exec/OperatorTraceReader.cpp b/velox/exec/OperatorTraceReader.cpp index e6376090d807..e08c6cf4d1a2 100644 --- a/velox/exec/OperatorTraceReader.cpp +++ b/velox/exec/OperatorTraceReader.cpp @@ -16,12 +16,12 @@ #include +#include +#include "velox/common/file/FileInputStream.h" #include "velox/exec/OperatorTraceReader.h" - #include "velox/exec/TraceUtil.h" namespace facebook::velox::exec::trace { - OperatorTraceInputReader::OperatorTraceInputReader( std::string traceDir, RowTypePtr dataType, @@ -84,4 +84,68 @@ OperatorTraceSummary OperatorTraceSummaryReader::read() const { summary.inputRows = summaryObj[OperatorTraceTraits::kInputRowsKey].asInt(); return summary; } + +OperatorTraceSplitReader::OperatorTraceSplitReader( + std::vector traceDirs, + memory::MemoryPool* pool) + : traceDirs_(std::move(traceDirs)), + fs_(filesystems::getFileSystem(traceDirs_[0], nullptr)), + pool_(pool) { + VELOX_CHECK_NOT_NULL(fs_); +} + +std::vector OperatorTraceSplitReader::read() const { + std::vector splits; + for (const auto& traceDir : traceDirs_) { + auto stream = getSplitInputStream(traceDir); + if (stream == nullptr) { + continue; + } + auto curSplits = deserialize(stream.get()); + splits.insert( + splits.end(), + std::make_move_iterator(curSplits.begin()), + std::make_move_iterator(curSplits.end())); + } + return splits; +} + +std::unique_ptr +OperatorTraceSplitReader::getSplitInputStream( + const std::string& traceDir) const { + auto splitInfoFile = fs_->openFileForRead(getOpTraceSplitFilePath(traceDir)); + if (splitInfoFile->size() == 0) { + LOG(WARNING) << "Split info is empty in " << traceDir; + return nullptr; + } + // TODO: Make the buffer size configurable. + return std::make_unique( + std::move(splitInfoFile), 1 << 20, pool_); +} + +// static +std::vector OperatorTraceSplitReader::deserialize( + common::FileInputStream* stream) { + std::vector splits; + try { + while (!stream->atEnd()) { + const auto length = stream->read(); + std::string splitStr(length, '\0'); + stream->readBytes(reinterpret_cast(splitStr.data()), length); + const auto crc32 = stream->read(); + const auto actualCrc32 = folly::crc32( + reinterpret_cast(splitStr.data()), splitStr.size()); + if (crc32 != actualCrc32) { + LOG(ERROR) << "Failed to verify the split checksum " << crc32 + << " which does not equal to the actual computed checksum " + << actualCrc32; + break; + } + splits.push_back(std::move(splitStr)); + } + } catch (const VeloxException& e) { + LOG(ERROR) << "Failed to deserialize split: " << e.message(); + } + return splits; +} } // namespace facebook::velox::exec::trace diff --git a/velox/exec/OperatorTraceReader.h b/velox/exec/OperatorTraceReader.h index 6888d5a600b8..60860e0e691e 100644 --- a/velox/exec/OperatorTraceReader.h +++ b/velox/exec/OperatorTraceReader.h @@ -18,11 +18,11 @@ #include "velox/common/file/FileInputStream.h" #include "velox/common/file/FileSystems.h" +#include "velox/exec/Split.h" #include "velox/exec/Trace.h" #include "velox/serializers/PrestoSerializer.h" namespace facebook::velox::exec::trace { - /// Used to read an operator trace input. class OperatorTraceInputReader { public: @@ -67,4 +67,32 @@ class OperatorTraceSummaryReader { memory::MemoryPool* const pool_; const std::unique_ptr summaryFile_; }; + +/// Used to load the input splits from a set of traced 'TableScan' operators for +/// replay. +/// +/// Currently, it only works with 'HiveConnectorSplit'. In the future, it will +/// be extended to handle more types of splits, such as +/// 'IcebergHiveConnectorSplit'. +class OperatorTraceSplitReader { + public: + /// 'traceDirs' provides a list of directories with each one containing the + /// traced split info file for one table scan operator. + explicit OperatorTraceSplitReader( + std::vector traceDirs, + memory::MemoryPool* pool); + + /// Reads and deserializes all the traced split strings. + std::vector read() const; + + private: + static std::vector deserialize(common::FileInputStream* stream); + + std::unique_ptr getSplitInputStream( + const std::string& traceDir) const; + + const std::vector traceDirs_; + const std::shared_ptr fs_; + memory::MemoryPool* const pool_; +}; } // namespace facebook::velox::exec::trace diff --git a/velox/exec/OperatorTraceWriter.cpp b/velox/exec/OperatorTraceWriter.cpp index 38987226ccd8..3c129712a7e7 100644 --- a/velox/exec/OperatorTraceWriter.cpp +++ b/velox/exec/OperatorTraceWriter.cpp @@ -16,16 +16,20 @@ #include "velox/exec/OperatorTraceWriter.h" +#include +#include #include + #include "velox/common/file/File.h" #include "velox/common/file/FileSystems.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/exec/Operator.h" #include "velox/exec/Trace.h" #include "velox/exec/TraceUtil.h" namespace facebook::velox::exec::trace { -OperatorTraceWriter::OperatorTraceWriter( +OperatorTraceInputWriter::OperatorTraceInputWriter( Operator* traceOp, std::string traceDir, memory::MemoryPool* pool, @@ -40,7 +44,7 @@ OperatorTraceWriter::OperatorTraceWriter( VELOX_CHECK_NOT_NULL(traceFile_); } -void OperatorTraceWriter::write(const RowVectorPtr& rows) { +void OperatorTraceInputWriter::write(const RowVectorPtr& rows) { if (FOLLY_UNLIKELY(finished_)) { return; } @@ -67,7 +71,7 @@ void OperatorTraceWriter::write(const RowVectorPtr& rows) { traceFile_->append(std::move(iobuf)); } -void OperatorTraceWriter::finish() { +void OperatorTraceInputWriter::finish() { if (finished_) { return; } @@ -81,7 +85,7 @@ void OperatorTraceWriter::finish() { finished_ = true; } -void OperatorTraceWriter::writeSummary() const { +void OperatorTraceInputWriter::writeSummary() const { const auto summaryFilePath = getOpTraceSummaryFilePath(traceDir_); const auto file = fs_->openFileForWrite(summaryFilePath); folly::dynamic obj = folly::dynamic::object; @@ -97,4 +101,63 @@ void OperatorTraceWriter::writeSummary() const { file->close(); } +OperatorTraceSplitWriter::OperatorTraceSplitWriter( + Operator* traceOp, + std::string traceDir) + : traceOp_(traceOp), + traceDir_(std::move(traceDir)), + fs_(filesystems::getFileSystem(traceDir_, nullptr)), + splitFile_(fs_->openFileForWrite(getOpTraceSplitFilePath(traceDir_))) { + VELOX_CHECK_NOT_NULL(splitFile_); +} + +void OperatorTraceSplitWriter::write(const exec::Split& split) const { + // TODO: Supports group later once we have driver id mapping in trace node. + VELOX_CHECK(!split.hasGroup(), "Do not support grouped execution"); + VELOX_CHECK(split.hasConnectorSplit()); + const auto splitObj = split.connectorSplit->serialize(); + const auto splitJson = folly::toJson(splitObj); + auto ioBuf = serialize(splitJson); + splitFile_->append(std::move(ioBuf)); +} + +void OperatorTraceSplitWriter::finish() { + if (finished_) { + return; + } + + VELOX_CHECK_NOT_NULL( + splitFile_, "The query data writer has already been finished"); + splitFile_->close(); + writeSummary(); + finished_ = true; +} + +// static +std::unique_ptr OperatorTraceSplitWriter::serialize( + const std::string& split) { + const uint32_t length = split.length(); + const uint32_t crc32 = folly::crc32( + reinterpret_cast(split.data()), split.size()); + auto ioBuf = + folly::IOBuf::create(sizeof(length) + split.size() + sizeof(crc32)); + folly::io::Appender appender(ioBuf.get(), 0); + appender.writeLE(length); + appender.push(reinterpret_cast(split.data()), length); + appender.writeLE(crc32); + return ioBuf; +} + +void OperatorTraceSplitWriter::writeSummary() const { + const auto summaryFilePath = getOpTraceSummaryFilePath(traceDir_); + const auto file = fs_->openFileForWrite(summaryFilePath); + folly::dynamic obj = folly::dynamic::object; + obj[OperatorTraceTraits::kOpTypeKey] = traceOp_->operatorType(); + const auto stats = traceOp_->stats(/*clear=*/false); + obj[OperatorTraceTraits::kPeakMemoryKey] = + stats.memoryStats.peakTotalMemoryReservation; + obj[OperatorTraceTraits::kNumSplits] = stats.numSplits; + file->append(folly::toJson(obj)); + file->close(); +} } // namespace facebook::velox::exec::trace diff --git a/velox/exec/OperatorTraceWriter.h b/velox/exec/OperatorTraceWriter.h index c0b818e93acb..451b87f792bb 100644 --- a/velox/exec/OperatorTraceWriter.h +++ b/velox/exec/OperatorTraceWriter.h @@ -19,6 +19,7 @@ #include "TraceConfig.h" #include "velox/common/file/File.h" #include "velox/common/file/FileSystems.h" +#include "velox/exec/Split.h" #include "velox/serializers/PrestoSerializer.h" #include "velox/vector/VectorStream.h" @@ -28,13 +29,14 @@ class Operator; namespace facebook::velox::exec::trace { -/// Used to serialize and write the input vectors from a given operator into a -/// file. -class OperatorTraceWriter { +/// Used to serialize and write the input vectors from a particular operator +/// into a data file. Additionally, it creates a corresponding summary file that +/// contains information such as peak memory, input rows, operator type, etc. +class OperatorTraceInputWriter { public: /// 'traceOp' is the operator to trace. 'traceDir' specifies the trace /// directory for the operator. - explicit OperatorTraceWriter( + explicit OperatorTraceInputWriter( Operator* traceOp, std::string traceDir, memory::MemoryPool* pool, @@ -48,8 +50,6 @@ class OperatorTraceWriter { private: // Flushes the trace data summaries to the disk. - // - // TODO: add more summaries such as number of rows etc. void writeSummary() const; Operator* const traceOp_; @@ -71,4 +71,34 @@ class OperatorTraceWriter { bool finished_{false}; }; +/// Used to write the input splits during the execution of a traced 'TableScan' +/// operator. Additionally, it creates a corresponding summary file that +/// contains information such as peak memory, number of splits, etc. +/// +/// Currently, it only works with 'HiveConnectorSplit'. In the future, it will +/// be extended to handle more types of splits, such as +/// 'IcebergHiveConnectorSplit'. +class OperatorTraceSplitWriter { + public: + explicit OperatorTraceSplitWriter(Operator* traceOp, std::string traceDir); + + /// Serializes and writes out each split. Each serialized split is immediately + /// flushed to ensure that we can still replay a traced operator even if a + /// crash occurs during execution. + void write(const exec::Split& split) const; + + void finish(); + + private: + static std::unique_ptr serialize(const std::string& split); + + // Flushes the trace data summaries to the disk. + void writeSummary() const; + + Operator* const traceOp_; + const std::string traceDir_; + const std::shared_ptr fs_; + const std::unique_ptr splitFile_; + bool finished_{false}; +}; } // namespace facebook::velox::exec::trace diff --git a/velox/exec/TableScan.cpp b/velox/exec/TableScan.cpp index f93d4a718afb..7586a52f763f 100644 --- a/velox/exec/TableScan.cpp +++ b/velox/exec/TableScan.cpp @@ -17,6 +17,7 @@ #include "velox/common/testutil/TestValue.h" #include "velox/common/time/Timer.h" #include "velox/exec/Task.h" +#include "velox/exec/TraceUtil.h" #include "velox/expression/Expr.h" using facebook::velox::common::testutil::TestValue; @@ -135,6 +136,9 @@ RowVectorPtr TableScan::getOutput() { return nullptr; } + if (FOLLY_UNLIKELY(splitTracer_ != nullptr)) { + splitTracer_->write(split); + } const auto& connectorSplit = split.connectorSplit; currentSplitWeight_ = connectorSplit->splitWeight; needNewSplit_ = false; diff --git a/velox/exec/Trace.h b/velox/exec/Trace.h index 6bf7c6103b84..cdbe2a1384da 100644 --- a/velox/exec/Trace.h +++ b/velox/exec/Trace.h @@ -33,11 +33,13 @@ struct TraceTraits { struct OperatorTraceTraits { static inline const std::string kSummaryFileName = "op_trace_summary.json"; static inline const std::string kInputFileName = "op_input_trace.data"; + static inline const std::string kSplitFileName = "op_split_trace.split"; /// Keys for operator trace summary file. static inline const std::string kOpTypeKey = "opType"; static inline const std::string kPeakMemoryKey = "peakMemory"; - static inline const std::string kInputRowsKey = "inputhRows"; + static inline const std::string kInputRowsKey = "inputRows"; + static inline const std::string kNumSplits = "numSplits"; }; /// Contains the summary of an operator trace. diff --git a/velox/exec/TraceUtil.cpp b/velox/exec/TraceUtil.cpp index 2817f1640f46..8dd8cecb7f59 100644 --- a/velox/exec/TraceUtil.cpp +++ b/velox/exec/TraceUtil.cpp @@ -108,6 +108,10 @@ std::string getOpTraceInputFilePath(const std::string& opTraceDir) { return fmt::format("{}/{}", opTraceDir, OperatorTraceTraits::kInputFileName); } +std::string getOpTraceSplitFilePath(const std::string& opTraceDir) { + return fmt::format("{}/{}", opTraceDir, OperatorTraceTraits::kSplitFileName); +} + std::string getOpTraceSummaryFilePath(const std::string& opTraceDir) { return fmt::format( "{}/{}", opTraceDir, OperatorTraceTraits::kSummaryFileName); @@ -182,13 +186,14 @@ size_t getNumDrivers( bool canTrace(const std::string& operatorType) { static const std::unordered_set kSupportedOperatorTypes{ - "FilterProject", - "TableWrite", "Aggregation", + "FilterProject", + "HashBuild", + "HashProbe", "PartialAggregation", "PartitionedOutput", - "HashBuild", - "HashProbe"}; + "TableScan", + "TableWrite"}; return kSupportedOperatorTypes.count(operatorType) > 0; } } // namespace facebook::velox::exec::trace diff --git a/velox/exec/TraceUtil.h b/velox/exec/TraceUtil.h index 807b51c0eddb..69f26c7c7dbd 100644 --- a/velox/exec/TraceUtil.h +++ b/velox/exec/TraceUtil.h @@ -73,6 +73,9 @@ std::string getOpTraceDirectory( /// Returns the file path for a given operator's traced input file. std::string getOpTraceInputFilePath(const std::string& opTraceDir); +/// Returns the file path for a given operator's traced split file. +std::string getOpTraceSplitFilePath(const std::string& opTraceDir); + /// Returns the file path for a given operator's traced input file. std::string getOpTraceSummaryFilePath(const std::string& opTraceDir); diff --git a/velox/exec/tests/OperatorTraceTest.cpp b/velox/exec/tests/OperatorTraceTest.cpp index 2a8f075511a1..d4113adad8bc 100644 --- a/velox/exec/tests/OperatorTraceTest.cpp +++ b/velox/exec/tests/OperatorTraceTest.cpp @@ -15,16 +15,21 @@ */ #include +#include #include #include #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/file/FileSystems.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/exec/OperatorTraceReader.h" +#include "velox/exec/OperatorTraceWriter.h" #include "velox/exec/PartitionFunction.h" +#include "velox/exec/Split.h" #include "velox/exec/TaskTraceReader.h" #include "velox/exec/Trace.h" #include "velox/exec/TraceUtil.h" +#include "velox/exec/tests/utils/ArbitratorTestUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" #include "velox/exec/tests/utils/PlanBuilder.h" @@ -49,6 +54,7 @@ class OperatorTraceTest : public HiveConnectorTestBase { connector::hive::LocationHandle::registerSerDe(); connector::hive::HiveColumnHandle::registerSerDe(); connector::hive::HiveInsertTableHandle::registerSerDe(); + connector::hive::HiveConnectorSplit::registerSerDe(); core::PlanNode::registerSerDe(); core::ITypedExpr::registerSerDe(); registerPartitionFunctionSerDe(); @@ -73,17 +79,12 @@ class OperatorTraceTest : public HiveConnectorTestBase { filesystems::registerLocalFileSystem(); } - RowTypePtr generateTypes(size_t numColumns) { - std::vector names; - names.reserve(numColumns); - std::vector types; - types.reserve(numColumns); - for (auto i = 0; i < numColumns; ++i) { - names.push_back(fmt::format("c{}", i)); - types.push_back(vectorFuzzer_.randType((2))); - } - return ROW(std::move(names), std::move(types)); - ; + std::vector makeVectors( + int32_t count, + int32_t rowsPerVector, + const RowTypePtr& rowType = nullptr) { + auto inputs = rowType ? rowType : dataType_; + return HiveConnectorTestBase::makeVectors(inputs, count, rowsPerVector); } bool isSamePlan( @@ -727,6 +728,252 @@ TEST_F(OperatorTraceTest, filterProject) { } } +TEST_F(OperatorTraceTest, traceSplitRoundTrip) { + constexpr auto numSplits = 5; + const auto vectors = makeVectors(10, 100); + const auto testDir = TempDirectoryPath::create(); + const auto traceRoot = fmt::format("{}/{}", testDir->getPath(), "traceRoot"); + const auto fs = filesystems::getFileSystem(testDir->getPath(), nullptr); + std::vector> splitFiles; + for (int i = 0; i < numSplits; ++i) { + auto filePath = TempFilePath::create(); + writeToFile(filePath->getPath(), vectors); + splitFiles.push_back(std::move(filePath)); + } + auto splits = makeHiveConnectorSplits(splitFiles); + std::sort(splits.begin(), splits.end()); + + auto traceDirPath = TempDirectoryPath::create(); + auto plan = PlanBuilder().tableScan(dataType_).planNode(); + std::shared_ptr task; + AssertQueryBuilder(plan) + .maxDrivers(3) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceDirPath->getPath()) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, "0") + .splits(splits) + .copyResults(pool(), task); + + const auto taskTraceDir = + getTaskTraceDirectory(traceDirPath->getPath(), *task); + std::vector traceDirs; + for (int i = 0; i < 3; ++i) { + const auto opTraceDir = getOpTraceDirectory( + taskTraceDir, + /*planNodeId=*/"0", + /*pipelineId=*/0, + /*driverId=*/i); + const auto summaryFilePath = getOpTraceSummaryFilePath(opTraceDir); + const auto splitFilePath = getOpTraceSplitFilePath(opTraceDir); + ASSERT_TRUE(fs->exists(summaryFilePath)); + ASSERT_TRUE(fs->exists(splitFilePath)); + + traceDirs.push_back(opTraceDir); + } + + const auto reader = exec::trace::OperatorTraceSplitReader( + traceDirs, memory::MemoryManager::getInstance()->tracePool()); + auto actualSplits = reader.read(); + std::unordered_set splitStrs; + std::transform( + splits.begin(), + splits.end(), + std::inserter(splitStrs, splitStrs.begin()), + [](const auto& s) { return s->toString(); }); + ASSERT_EQ(actualSplits.size(), splits.size()); + for (int i = 0; i < numSplits; ++i) { + folly::dynamic splitInfoObj = folly::parseJson(actualSplits[i]); + const auto actualSplit = exec::Split{ + std::const_pointer_cast( + ISerializable::deserialize( + splitInfoObj))}; + ASSERT_FALSE(actualSplit.hasGroup()); + ASSERT_TRUE(actualSplit.hasConnectorSplit()); + const auto actualConnectorSplit = actualSplit.connectorSplit; + ASSERT_EQ(splitStrs.count(actualConnectorSplit->toString()), 1); + } +} + +TEST_F(OperatorTraceTest, traceSplitPartial) { + constexpr auto numSplits = 3; + const auto vectors = makeVectors(2, 100); + const auto testDir = TempDirectoryPath::create(); + const auto traceRoot = fmt::format("{}/{}", testDir->getPath(), "traceRoot"); + const auto fs = filesystems::getFileSystem(testDir->getPath(), nullptr); + std::vector> splitFiles; + for (int i = 0; i < numSplits; ++i) { + auto filePath = TempFilePath::create(); + writeToFile(filePath->getPath(), vectors); + splitFiles.push_back(std::move(filePath)); + } + auto splits = makeHiveConnectorSplits(splitFiles); + + auto traceDirPath = TempDirectoryPath::create(); + auto plan = PlanBuilder().tableScan(dataType_).planNode(); + std::shared_ptr task; + AssertQueryBuilder(plan) + .maxDrivers(3) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceDirPath->getPath()) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, "0") + .splits(splits) + .copyResults(pool(), task); + + const auto taskTraceDir = + getTaskTraceDirectory(traceDirPath->getPath(), *task); + std::vector traceDirs; + for (int i = 0; i < 3; ++i) { + const auto opTraceDir = getOpTraceDirectory( + taskTraceDir, + /*planNodeId=*/"0", + /*pipelineId=*/0, + /*driverId=*/i); + const auto summaryFilePath = getOpTraceSummaryFilePath(opTraceDir); + const auto splitFilePath = getOpTraceSplitFilePath(opTraceDir); + ASSERT_TRUE(fs->exists(summaryFilePath)); + ASSERT_TRUE(fs->exists(splitFilePath)); + + traceDirs.push_back(opTraceDir); + } + + // Append a partial split to the split info file. + const std::string split = "split123"; + const uint32_t length = split.length(); + const uint32_t crc32 = folly::crc32( + reinterpret_cast(split.data()), split.size()); + const auto splitInfoFile = fs->openFileForWrite( + fmt::format( + "{}/{}", + traceDirPath->getPath(), + OperatorTraceTraits::kSplitFileName), + filesystems::FileOptions{.shouldThrowOnFileAlreadyExists = false}); + auto ioBuf = folly::IOBuf::create(12 + 16); + folly::io::Appender appender(ioBuf.get(), 0); + // Writes an invalid split without crc. + appender.writeLE(length); + appender.push(reinterpret_cast(split.data()), length); + // Writes a valid spilt. + appender.writeLE(length); + appender.push(reinterpret_cast(split.data()), length); + appender.writeLE(crc32); + splitInfoFile->append(std::move(ioBuf)); + splitInfoFile->close(); + + const auto reader = exec::trace::OperatorTraceSplitReader( + traceDirs, memory::MemoryManager::getInstance()->tracePool()); + auto actualSplits = reader.read(); + std::unordered_set splitStrs; + std::transform( + splits.begin(), + splits.end(), + std::inserter(splitStrs, splitStrs.begin()), + [](const auto& s) { return s->toString(); }); + ASSERT_EQ(actualSplits.size(), splits.size()); + for (int i = 0; i < numSplits; ++i) { + folly::dynamic splitInfoObj = folly::parseJson(actualSplits[i]); + const auto actualSplit = exec::Split{ + std::const_pointer_cast( + ISerializable::deserialize( + splitInfoObj))}; + ASSERT_FALSE(actualSplit.hasGroup()); + ASSERT_TRUE(actualSplit.hasConnectorSplit()); + const auto actualConnectorSplit = actualSplit.connectorSplit; + ASSERT_EQ(splitStrs.count(actualConnectorSplit->toString()), 1); + } +} + +TEST_F(OperatorTraceTest, traceSplitCorrupted) { + constexpr auto numSplits = 3; + const auto vectors = makeVectors(2, 100); + const auto testDir = TempDirectoryPath::create(); + const auto traceRoot = fmt::format("{}/{}", testDir->getPath(), "traceRoot"); + const auto fs = filesystems::getFileSystem(testDir->getPath(), nullptr); + std::vector> splitFiles; + for (int i = 0; i < numSplits; ++i) { + auto filePath = TempFilePath::create(); + writeToFile(filePath->getPath(), vectors); + splitFiles.push_back(std::move(filePath)); + } + auto splits = makeHiveConnectorSplits(splitFiles); + + auto traceDirPath = TempDirectoryPath::create(); + auto plan = PlanBuilder().tableScan(dataType_).planNode(); + std::shared_ptr task; + AssertQueryBuilder(plan) + .maxDrivers(3) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceDirPath->getPath()) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, "0") + .splits(splits) + .copyResults(pool(), task); + + const auto taskTraceDir = + getTaskTraceDirectory(traceDirPath->getPath(), *task); + std::vector traceDirs; + for (int i = 0; i < 3; ++i) { + const auto opTraceDir = getOpTraceDirectory( + taskTraceDir, + /*planNodeId=*/"0", + /*pipelineId=*/0, + /*driverId=*/i); + const auto summaryFilePath = getOpTraceSummaryFilePath(opTraceDir); + const auto splitFilePath = getOpTraceSplitFilePath(opTraceDir); + ASSERT_TRUE(fs->exists(summaryFilePath)); + ASSERT_TRUE(fs->exists(splitFilePath)); + + traceDirs.push_back(opTraceDir); + } + + // Append a split with wrong checksum to the split info file. + const std::string split = "split123"; + const uint32_t length = split.length(); + const uint32_t crc32 = folly::crc32( + reinterpret_cast(split.data()), split.size()); + const auto splitInfoFile = fs->openFileForWrite( + fmt::format( + "{}/{}", + traceDirPath->getPath(), + OperatorTraceTraits::kSplitFileName), + filesystems::FileOptions{.shouldThrowOnFileAlreadyExists = false}); + auto ioBuf = folly::IOBuf::create(16 * 2); + folly::io::Appender appender(ioBuf.get(), 0); + // Writes an invalid split with a wrong checksum. + appender.writeLE(length); + appender.push(reinterpret_cast(split.data()), length); + appender.writeLE(crc32 - 1); + // Writes a valid split. + appender.writeLE(length); + appender.push(reinterpret_cast(split.data()), length); + appender.writeLE(crc32); + splitInfoFile->append(std::move(ioBuf)); + splitInfoFile->close(); + + const auto reader = exec::trace::OperatorTraceSplitReader( + traceDirs, memory::MemoryManager::getInstance()->tracePool()); + auto actualSplits = reader.read(); + std::unordered_set splitStrs; + std::transform( + splits.begin(), + splits.end(), + std::inserter(splitStrs, splitStrs.begin()), + [](const auto& s) { return s->toString(); }); + ASSERT_EQ(actualSplits.size(), splits.size()); + for (int i = 0; i < numSplits; ++i) { + folly::dynamic splitInfoObj = folly::parseJson(actualSplits[i]); + const auto actualSplit = exec::Split{ + std::const_pointer_cast( + ISerializable::deserialize( + splitInfoObj))}; + ASSERT_FALSE(actualSplit.hasGroup()); + ASSERT_TRUE(actualSplit.hasConnectorSplit()); + const auto actualConnectorSplit = actualSplit.connectorSplit; + ASSERT_EQ(splitStrs.count(actualConnectorSplit->toString()), 1); + } +} + TEST_F(OperatorTraceTest, hashJoin) { std::vector probeInput; RowTypePtr probeType = @@ -882,6 +1129,7 @@ TEST_F(OperatorTraceTest, canTrace) { {"PartialAggregation", true}, {"Aggregation", true}, {"TableWrite", true}, + {"TableScan", true}, {"FilterProject", true}}; for (const auto& testData : testSettings) { SCOPED_TRACE(testData.debugString()); diff --git a/velox/exec/tests/TraceUtilTest.cpp b/velox/exec/tests/TraceUtilTest.cpp index 3afae1b9db42..7220591378cb 100644 --- a/velox/exec/tests/TraceUtilTest.cpp +++ b/velox/exec/tests/TraceUtilTest.cpp @@ -124,6 +124,9 @@ TEST_F(TraceUtilTest, traceDirectoryLayoutUtilities) { ASSERT_EQ( getOpTraceSummaryFilePath(opTraceDir), "/traceRoot/queryId/taskId/1/1/1/op_trace_summary.json"); + ASSERT_EQ( + getOpTraceSplitFilePath(opTraceDir), + "/traceRoot/queryId/taskId/1/1/1/op_split_trace.split"); } TEST_F(TraceUtilTest, getTaskIds) { diff --git a/velox/tool/trace/CMakeLists.txt b/velox/tool/trace/CMakeLists.txt index dc5138e1bbc5..d179b26c340e 100644 --- a/velox/tool/trace/CMakeLists.txt +++ b/velox/tool/trace/CMakeLists.txt @@ -19,6 +19,7 @@ velox_add_library( HashJoinReplayer.cpp OperatorReplayerBase.cpp PartitionedOutputReplayer.cpp + TableScanReplayer.cpp TableWriterReplayer.cpp TraceReplayRunner.cpp) @@ -30,6 +31,7 @@ velox_link_libraries( velox_vector_test_lib velox_exec velox_exec_test_lib + velox_hive_connector velox_tpch_connector velox_memory Folly::folly diff --git a/velox/tool/trace/OperatorReplayerBase.cpp b/velox/tool/trace/OperatorReplayerBase.cpp index 463752732dae..1c70075c7917 100644 --- a/velox/tool/trace/OperatorReplayerBase.cpp +++ b/velox/tool/trace/OperatorReplayerBase.cpp @@ -67,6 +67,13 @@ core::PlanNodePtr OperatorReplayerBase::createPlan() const { const auto* replayNode = core::PlanNode::findFirstNode( planFragment_.get(), [this](const core::PlanNode* node) { return node->id() == nodeId_; }); + + if (replayNode->name() == "TableScan") { + return exec::test::PlanBuilder() + .addNode(replayNodeFactory(replayNode)) + .planNode(); + } + return exec::test::PlanBuilder(planNodeIdGenerator_) .traceScan( nodeTraceDir_, diff --git a/velox/tool/trace/TableScanReplayer.cpp b/velox/tool/trace/TableScanReplayer.cpp new file mode 100644 index 000000000000..e4076157f42e --- /dev/null +++ b/velox/tool/trace/TableScanReplayer.cpp @@ -0,0 +1,80 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/tool/trace/TableScanReplayer.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/exec/OperatorTraceReader.h" +#include "velox/exec/TraceUtil.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/PlanBuilder.h" + +using namespace facebook::velox; +using namespace facebook::velox::exec; +using namespace facebook::velox::exec::test; + +namespace facebook::velox::tool::trace { + +RowVectorPtr TableScanReplayer::run() { + const auto plan = createPlan(); + return exec::test::AssertQueryBuilder(plan) + .maxDrivers(maxDrivers_) + .configs(queryConfigs_) + .connectorSessionProperties(connectorConfigs_) + .splits(getSplits()) + .copyResults(memory::MemoryManager::getInstance()->tracePool()); +} + +core::PlanNodePtr TableScanReplayer::createPlanNode( + const core::PlanNode* node, + const core::PlanNodeId& nodeId, + const core::PlanNodePtr& /*source*/) const { + const auto scanNode = dynamic_cast(node); + VELOX_CHECK_NOT_NULL(scanNode); + return std::make_shared( + nodeId, + scanNode->outputType(), + scanNode->tableHandle(), + scanNode->assignments()); +} + +std::vector TableScanReplayer::getSplits() const { + std::vector splitInfoDirs; + if (driverId_ != -1) { + splitInfoDirs.push_back(exec::trace::getOpTraceDirectory( + nodeTraceDir_, pipelineId_, driverId_)); + } else { + for (auto i = 0; i < maxDrivers_; ++i) { + splitInfoDirs.push_back( + exec::trace::getOpTraceDirectory(nodeTraceDir_, pipelineId_, i)); + } + } + const auto splitStrs = + exec::trace::OperatorTraceSplitReader( + splitInfoDirs, memory::MemoryManager::getInstance()->tracePool()) + .read(); + + std::vector splits; + for (const auto& splitStr : splitStrs) { + folly::dynamic splitInfoObj = folly::parseJson(splitStr); + const auto split = + ISerializable::deserialize( + splitInfoObj); + splits.emplace_back( + std::const_pointer_cast(split)); + } + return splits; +} +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/TableScanReplayer.h b/velox/tool/trace/TableScanReplayer.h new file mode 100644 index 000000000000..ea7d0d90ef93 --- /dev/null +++ b/velox/tool/trace/TableScanReplayer.h @@ -0,0 +1,60 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "velox/core/PlanNode.h" +#include "velox/exec/Split.h" +#include "velox/tool/trace/OperatorReplayerBase.h" + +namespace facebook::velox::tool::trace { + +/// The replayer to replay the traced 'TableScan' operators. +class TableScanReplayer final : public OperatorReplayerBase { + public: + TableScanReplayer( + const std::string& traceDir, + const std::string& queryId, + const std::string& taskId, + const std::string& nodeId, + const int32_t pipelineId, + const std::string& operatorType, + const int32_t driverId = -1) + : OperatorReplayerBase( + traceDir, + queryId, + taskId, + nodeId, + pipelineId, + operatorType), + driverId_(driverId) {} + + RowVectorPtr run() override; + + private: + core::PlanNodePtr createPlanNode( + const core::PlanNode* node, + const core::PlanNodeId& nodeId, + const core::PlanNodePtr& /*source*/) const override; + + std::vector getSplits() const; + + const int32_t driverId_; +}; + +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/TraceReplayRunner.cpp b/velox/tool/trace/TraceReplayRunner.cpp index 1a314f903ab1..ccc444a2aa4a 100644 --- a/velox/tool/trace/TraceReplayRunner.cpp +++ b/velox/tool/trace/TraceReplayRunner.cpp @@ -47,6 +47,7 @@ #include "velox/tool/trace/HashJoinReplayer.h" #include "velox/tool/trace/OperatorReplayerBase.h" #include "velox/tool/trace/PartitionedOutputReplayer.h" +#include "velox/tool/trace/TableScanReplayer.h" #include "velox/tool/trace/TableWriterReplayer.h" #include "velox/type/Type.h" @@ -70,6 +71,7 @@ DEFINE_string( "query task."); DEFINE_string(node_id, "", "Specify the target node id."); DEFINE_int32(pipeline_id, 0, "Specify the target pipeline id."); +DEFINE_int32(driver_id, -1, "Specify the target driver id."); DEFINE_string(operator_type, "", "Specify the target operator type."); DEFINE_string( table_writer_output_dir, @@ -132,6 +134,14 @@ std::unique_ptr createReplayer() { FLAGS_pipeline_id, getVectorSerdeKind(), FLAGS_operator_type); + } else if (FLAGS_operator_type == "TableScan") { + replayer = std::make_unique( + FLAGS_root_dir, + FLAGS_query_id, + FLAGS_task_id, + FLAGS_node_id, + FLAGS_pipeline_id, + FLAGS_operator_type); } else if (FLAGS_operator_type == "FilterProject") { replayer = std::make_unique( FLAGS_root_dir, diff --git a/velox/tool/trace/TraceReplayRunner.h b/velox/tool/trace/TraceReplayRunner.h index a0bf1f15e3ae..e1c0906fc4e8 100644 --- a/velox/tool/trace/TraceReplayRunner.h +++ b/velox/tool/trace/TraceReplayRunner.h @@ -27,6 +27,7 @@ DECLARE_string(query_id); DECLARE_string(task_id); DECLARE_string(node_id); DECLARE_int32(pipeline_id); +DECLARE_int32(driver_id); DECLARE_string(operator_type); DECLARE_string(table_writer_output_dir); DECLARE_double(hiveConnectorExecutorHwMultiplier); diff --git a/velox/tool/trace/tests/CMakeLists.txt b/velox/tool/trace/tests/CMakeLists.txt index fe2c66a5fd3c..f6241119110d 100644 --- a/velox/tool/trace/tests/CMakeLists.txt +++ b/velox/tool/trace/tests/CMakeLists.txt @@ -18,6 +18,7 @@ add_executable( FilterProjectReplayerTest.cpp HashJoinReplayerTest.cpp PartitionedOutputReplayerTest.cpp + TableScanReplayerTest.cpp TableWriterReplayerTest.cpp) add_test( diff --git a/velox/tool/trace/tests/TableScanReplayerTest.cpp b/velox/tool/trace/tests/TableScanReplayerTest.cpp new file mode 100644 index 000000000000..5cf1db5f21ea --- /dev/null +++ b/velox/tool/trace/tests/TableScanReplayerTest.cpp @@ -0,0 +1,251 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include + +#include "folly/dynamic.h" +#include "velox/common/base/Fs.h" +#include "velox/common/file/FileSystems.h" +#include "velox/common/hyperloglog/SparseHll.h" +#include "velox/exec/PartitionFunction.h" +#include "velox/exec/TableWriter.h" +#include "velox/exec/TraceUtil.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" +#include "velox/serializers/PrestoSerializer.h" +#include "velox/tool/trace/TableScanReplayer.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +using namespace facebook::velox; +using namespace facebook::velox::core; +using namespace facebook::velox::common; +using namespace facebook::velox::exec; +using namespace facebook::velox::exec::test; +using namespace facebook::velox::connector; +using namespace facebook::velox::connector::hive; +using namespace facebook::velox::dwio::common; +using namespace facebook::velox::common::testutil; +using namespace facebook::velox::common::hll; + +namespace facebook::velox::tool::trace::test { +class TableScanReplayerTest : public HiveConnectorTestBase { + protected: + static void SetUpTestCase() { + memory::MemoryManager::testingSetInstance({}); + HiveConnectorTestBase::SetUpTestCase(); + filesystems::registerLocalFileSystem(); + if (!isRegisteredVectorSerde()) { + serializer::presto::PrestoVectorSerde::registerVectorSerde(); + } + Type::registerSerDe(); + common::Filter::registerSerDe(); + connector::hive::HiveTableHandle::registerSerDe(); + connector::hive::LocationHandle::registerSerDe(); + connector::hive::HiveColumnHandle::registerSerDe(); + connector::hive::HiveInsertTableHandle::registerSerDe(); + connector::hive::HiveConnectorSplit::registerSerDe(); + core::PlanNode::registerSerDe(); + core::ITypedExpr::registerSerDe(); + registerPartitionFunctionSerDe(); + } + + std::vector makeVectors( + int32_t count, + int32_t rowsPerVector, + const RowTypePtr& rowType = nullptr) { + auto inputs = rowType ? rowType : rowType_; + return HiveConnectorTestBase::makeVectors(inputs, count, rowsPerVector); + } + + core::PlanNodePtr tableScanNode() { + return tableScanNode(rowType_); + } + + core::PlanNodePtr tableScanNode(const RowTypePtr& outputType) { + return PlanBuilder(pool_.get()) + .tableScan(outputType) + .capturePlanNodeId(traceNodeId_) + .planNode(); + } + + const RowTypePtr rowType_{ + ROW({"c0", "c1", "c2", "c3", "c4", "c5", "c6"}, + {BIGINT(), + INTEGER(), + SMALLINT(), + REAL(), + DOUBLE(), + VARCHAR(), + TINYINT()})}; + core::PlanNodeId traceNodeId_; +}; + +TEST_F(TableScanReplayerTest, basic) { + const auto vectors = makeVectors(10, 100); + const auto testDir = TempDirectoryPath::create(); + const auto traceRoot = fmt::format("{}/{}", testDir->getPath(), "traceRoot"); + const auto fs = filesystems::getFileSystem(testDir->getPath(), nullptr); + std::vector> splitFiles; + for (int i = 0; i < 5; ++i) { + auto filePath = TempFilePath::create(); + writeToFile(filePath->getPath(), vectors); + splitFiles.push_back(std::move(filePath)); + } + + const auto plan = tableScanNode(); + auto results = AssertQueryBuilder(plan) + .splits(makeHiveConnectorSplits(splitFiles)) + .copyResults(pool()); + + std::shared_ptr task; + auto traceResult = + AssertQueryBuilder(plan) + .maxDrivers(4) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceRoot) + .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, traceNodeId_) + .splits(makeHiveConnectorSplits(splitFiles)) + .copyResults(pool(), task); + + assertEqualResults({results}, {traceResult}); + + const auto taskId = task->taskId(); + const auto replayingResult = TableScanReplayer( + traceRoot, + task->queryCtx()->queryId(), + task->taskId(), + traceNodeId_, + 0, + "TableScan") + .run(); + assertEqualResults({results}, {replayingResult}); +} + +TEST_F(TableScanReplayerTest, columnPrunning) { + const auto vectors = makeVectors(10, 100); + const auto testDir = TempDirectoryPath::create(); + const auto traceRoot = fmt::format("{}/{}", testDir->getPath(), "traceRoot"); + const auto fs = filesystems::getFileSystem(testDir->getPath(), nullptr); + std::vector> splitFiles; + for (int i = 0; i < 5; ++i) { + auto filePath = TempFilePath::create(); + writeToFile(filePath->getPath(), vectors); + splitFiles.push_back(std::move(filePath)); + } + + const auto plan = + tableScanNode(ROW({"c0", "c3", "c5"}, {BIGINT(), REAL(), VARCHAR()})); + + const auto results = AssertQueryBuilder(plan) + .splits(makeHiveConnectorSplits(splitFiles)) + .copyResults(pool()); + + std::shared_ptr task; + auto traceResult = + AssertQueryBuilder(plan) + .maxDrivers(4) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceRoot) + .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, traceNodeId_) + .splits(makeHiveConnectorSplits(splitFiles)) + .copyResults(pool(), task); + + assertEqualResults({results}, {traceResult}); + + const auto taskId = task->taskId(); + const auto replayingResult = TableScanReplayer( + traceRoot, + task->queryCtx()->queryId(), + task->taskId(), + traceNodeId_, + 0, + "TableScan") + .run(); + assertEqualResults({results}, {replayingResult}); +} + +TEST_F(TableScanReplayerTest, subfieldPrunning) { + // rowType: ROW + // └── "e": ROW + // ├── "c": ROW + // │ ├── "a": BIGINT + // │ └── "b": DOUBLE + // └── "d": BIGINT + auto innerType = ROW({"a", "b"}, {BIGINT(), DOUBLE()}); + auto columnType = ROW({"c", "d"}, {innerType, BIGINT()}); + auto rowType = ROW({"e"}, {columnType}); + auto vectors = makeVectors(10, 1'000, rowType); + auto filePath = TempFilePath::create(); + writeToFile(filePath->getPath(), vectors); + std::vector requiredSubfields; + requiredSubfields.emplace_back("e.c"); + std::unordered_map> + assignments; + assignments["e"] = std::make_shared( + "e", + HiveColumnHandle::ColumnType::kRegular, + columnType, + columnType, + std::move(requiredSubfields)); + const auto plan = PlanBuilder() + .startTableScan() + .outputType(rowType) + .assignments(assignments) + .endTableScan() + .capturePlanNodeId(traceNodeId_) + .planNode(); + const auto split = makeHiveConnectorSplit(filePath->getPath()); + const auto results = + AssertQueryBuilder(plan).split(split).copyResults(pool()); + + const auto testDir = TempDirectoryPath::create(); + const auto traceRoot = fmt::format("{}/{}", testDir->getPath(), "traceRoot"); + std::shared_ptr task; + auto traceResult = + AssertQueryBuilder(plan) + .config(core::QueryConfig::kQueryTraceEnabled, true) + .config(core::QueryConfig::kQueryTraceDir, traceRoot) + .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30) + .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*") + .config(core::QueryConfig::kQueryTraceNodeIds, traceNodeId_) + .split(split) + .copyResults(pool(), task); + + assertEqualResults({results}, {traceResult}); + + const auto taskId = task->taskId(); + const auto replayingResult = TableScanReplayer( + traceRoot, + task->queryCtx()->queryId(), + task->taskId(), + traceNodeId_, + 0, + "TableScan") + .run(); + assertEqualResults({results}, {replayingResult}); +} +} // namespace facebook::velox::tool::trace::test