diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 3b65536ba04cd..a75648a6cbfb4 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -131,21 +131,27 @@ void Operator::maybeSetTracer() { return; } tracedOpMap.emplace(operatorId(), operatorType()); - const auto pipelineId = operatorCtx_->driverCtx()->pipelineId; const auto driverId = operatorCtx_->driverCtx()->driverId; LOG(INFO) << "Trace data for operator type: " << operatorType() << ", operator id: " << operatorId() << ", pipeline: " << pipelineId << ", driver: " << driverId << ", task: " << taskId(); const auto opTraceDirPath = fmt::format( - "{}/{}/{}/{}/data", + "{}/{}/{}/{}", queryTraceConfig->queryTraceDir, planNodeId(), pipelineId, driverId); - trace::createTraceDirectory(opTraceDirPath); + setupTracer(opTraceDirPath); +} + +void Operator::setupTracer(const std::string& traceDir) { + const auto& queryTraceConfig = operatorCtx_->driverCtx()->traceConfig(); + const auto opTraceDataPath = fmt::format( + "{}/{}", traceDir, trace::QueryTraceTraits::kTraceDataDirName); + trace::createTraceDirectory(opTraceDataPath); inputTracer_ = std::make_unique( - opTraceDirPath, + opTraceDataPath, memory::traceMemoryPool(), queryTraceConfig->updateAndCheckTraceLimitCB); } diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index 20561b3e788de..19f8754f03b65 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -740,10 +740,12 @@ class Operator : public BaseRuntimeStatWriter { return spillConfig_.has_value() ? &spillConfig_.value() : nullptr; } - /// Invoked to setup query data writer for this operator if the associated - /// query plan node is configured to collect trace. + /// Invoked to setup query data/split writer for this operator if the + /// associated query plan node is configured to collect trace. 
void maybeSetTracer(); + virtual void setupTracer(const std::string& traceDir); + /// Creates output vector from 'input_' and 'results' according to /// 'identityProjections_' and 'resultProjections_'. If 'mapping' is set to /// nullptr, the children of the output vector will be identical to their diff --git a/velox/exec/QuerySplitReader.cpp b/velox/exec/QuerySplitReader.cpp index ed660c50df509..4b212a68cdc00 100644 --- a/velox/exec/QuerySplitReader.cpp +++ b/velox/exec/QuerySplitReader.cpp @@ -28,47 +28,53 @@ using namespace facebook::velox::connector::hive; namespace facebook::velox::exec::trace { QuerySplitReader::QuerySplitReader( - std::string traceDir, + std::vector traceDir, memory::MemoryPool* pool) - : traceDir_(std::move(traceDir)), - fs_(filesystems::getFileSystem(traceDir_, nullptr)), - pool_(pool), - splitInfoStream_(getSplitInputStream()) { + : traceDirs_(std::move(traceDir)), + fs_(filesystems::getFileSystem(traceDirs_[0], nullptr)), + pool_(pool) { VELOX_CHECK_NOT_NULL(fs_); - VELOX_CHECK_NOT_NULL(splitInfoStream_); } std::vector QuerySplitReader::read() const { - const auto splitStrings = getSplitInfos(splitInfoStream_.get()); std::vector splits; - for (const auto& splitString : splitStrings) { - folly::dynamic splitInfoObj = folly::parseJson(splitString); - const auto split = - ISerializable::deserialize(splitInfoObj); - splits.emplace_back( - std::make_shared( - split->connectorId, - split->filePath, - split->fileFormat, - split->start, - split->length, - split->partitionKeys, - split->tableBucketNumber, - split->customSplitInfo, - split->extraFileInfo, - split->serdeParameters, - split->splitWeight, - split->infoColumns, - split->properties), - -1); + for (const auto& traceDir : traceDirs_) { + auto splitInfoStream = getSplitInputStream(traceDir); + if (splitInfoStream == nullptr) { + continue; + } + const auto splitStrs = getSplitInfos(splitInfoStream.get()); + for (const auto& splitStr : splitStrs) { + folly::dynamic splitInfoObj = 
folly::parseJson(splitStr); + const auto split = + ISerializable::deserialize(splitInfoObj); + splits.emplace_back(std::make_shared( + split->connectorId, + split->filePath, + split->fileFormat, + split->start, + split->length, + split->partitionKeys, + split->tableBucketNumber, + split->customSplitInfo, + split->extraFileInfo, + split->serdeParameters, + split->splitWeight, + split->infoColumns, + split->properties)); + } } return splits; } -std::unique_ptr QuerySplitReader::getSplitInputStream() - const { +std::unique_ptr QuerySplitReader::getSplitInputStream( + const std::string& traceDir) const { auto splitInfoFile = fs_->openFileForRead( - fmt::format("{}/{}", traceDir_, QueryTraceTraits::kSplitInfoFileName)); + fmt::format("{}/{}", traceDir, QueryTraceTraits::kSplitInfoFileName)); + if (splitInfoFile->size() == 0) { + LOG(ERROR) << "Split info is empty in " << traceDir; + return nullptr; + } // TODO: Make the buffer size configurable. return std::make_unique( std::move(splitInfoFile), 1 << 20, pool_); diff --git a/velox/exec/QuerySplitReader.h b/velox/exec/QuerySplitReader.h index c45024e9d374b..97ac61bf78a6f 100644 --- a/velox/exec/QuerySplitReader.h +++ b/velox/exec/QuerySplitReader.h @@ -30,21 +30,23 @@ namespace facebook::velox::exec::trace { /// 'IcebergHiveConnectorSplit'. class QuerySplitReader { public: - explicit QuerySplitReader(std::string traceDir, memory::MemoryPool* pool); + explicit QuerySplitReader( + std::vector traceDir, + memory::MemoryPool* pool); /// Reads from 'splitInfoStream_' and deserializes to 'splitInfos'. Returns - /// all the correctly traced splits. + /// all the collected tracing splits. 
std::vector read() const; private: static std::vector getSplitInfos( common::FileInputStream* stream); - std::unique_ptr getSplitInputStream() const; + std::unique_ptr getSplitInputStream( + const std::string& traceDir) const; - const std::string traceDir_; + const std::vector traceDirs_; const std::shared_ptr fs_; memory::MemoryPool* const pool_; - const std::unique_ptr splitInfoStream_; }; } // namespace facebook::velox::exec::trace diff --git a/velox/exec/QuerySplitWriter.cpp b/velox/exec/QuerySplitWriter.cpp index f23930414231f..bfe3c5a0ab229 100644 --- a/velox/exec/QuerySplitWriter.cpp +++ b/velox/exec/QuerySplitWriter.cpp @@ -46,7 +46,7 @@ void QuerySplitWriter::write(const exec::Split& split) const { VELOX_CHECK(split.hasConnectorSplit()); const auto splitObj = split.connectorSplit->serialize(); const auto splitJson = folly::toJson(splitObj); - auto ioBuf = appendToBuffer(splitJson); + auto ioBuf = serialize(splitJson); splitInfoFile_->append(std::move(ioBuf)); } @@ -63,7 +63,7 @@ void QuerySplitWriter::finish() { } // static -std::unique_ptr QuerySplitWriter::appendToBuffer( +std::unique_ptr QuerySplitWriter::serialize( const std::string& split) { const uint32_t length = split.length(); const uint32_t crc32 = folly::crc32( diff --git a/velox/exec/QuerySplitWriter.h b/velox/exec/QuerySplitWriter.h index 4b1ad3ede47f2..4c1602766ee87 100644 --- a/velox/exec/QuerySplitWriter.h +++ b/velox/exec/QuerySplitWriter.h @@ -39,7 +39,8 @@ class QuerySplitWriter { void finish(); - static std::unique_ptr appendToBuffer(const std::string& split); + private: + static std::unique_ptr serialize(const std::string& split); const std::string traceDir_; const std::shared_ptr fs_; diff --git a/velox/exec/QueryTraceTraits.h b/velox/exec/QueryTraceTraits.h index 8c5100d380175..bc1b5f2b7c540 100644 --- a/velox/exec/QueryTraceTraits.h +++ b/velox/exec/QueryTraceTraits.h @@ -30,6 +30,8 @@ struct QueryTraceTraits { static inline const std::string kQueryMetaFileName = "query_meta.json"; 
static inline const std::string kDataSummaryFileName = "data_summary.json"; + static inline const std::string kTraceDataDirName = "data"; + static inline const std::string kTraceSplitDirName = "split"; static inline const std::string kDataFileName = "trace.data"; static inline const std::string kSplitInfoFileName = "trace.split"; }; diff --git a/velox/exec/QueryTraceUtil.cpp b/velox/exec/QueryTraceUtil.cpp index f3a339eec080a..ee0f413034134 100644 --- a/velox/exec/QueryTraceUtil.cpp +++ b/velox/exec/QueryTraceUtil.cpp @@ -105,7 +105,21 @@ uint8_t getNumDrivers( std::string getDataDir(const std::string& traceDir, int pipelineId, int driverId) { - return fmt::format("{}/{}/{}/data", traceDir, pipelineId, driverId); + return fmt::format( + "{}/{}/{}/{}", + traceDir, + pipelineId, + driverId, + trace::QueryTraceTraits::kTraceDataDirName); } +std::string +getSplitDir(const std::string& traceDir, int pipelineId, int driverId) { + return fmt::format( + "{}/{}/{}/{}", + traceDir, + pipelineId, + driverId, + trace::QueryTraceTraits::kTraceSplitDirName); +} } // namespace facebook::velox::exec::trace diff --git a/velox/exec/QueryTraceUtil.h b/velox/exec/QueryTraceUtil.h index 633c0bc27b2a8..3abc69dc4da59 100644 --- a/velox/exec/QueryTraceUtil.h +++ b/velox/exec/QueryTraceUtil.h @@ -72,4 +72,9 @@ folly::dynamic getMetadata( /// given plan node, which is $traceRoot/$taskId/$nodeId. std::string getDataDir(const std::string& traceDir, int pipelineId, int driverId); + +/// Gets the traced split directory. 'traceDir' is the trace directory for a +/// given plan node, which is $traceRoot/$taskId/$nodeId. 
+std::string +getSplitDir(const std::string& traceDir, int pipelineId, int driverId); } // namespace facebook::velox::exec::trace diff --git a/velox/exec/TableScan.cpp b/velox/exec/TableScan.cpp index f93d4a718afb2..954efab8bea31 100644 --- a/velox/exec/TableScan.cpp +++ b/velox/exec/TableScan.cpp @@ -16,6 +16,7 @@ #include "velox/exec/TableScan.h" #include "velox/common/testutil/TestValue.h" #include "velox/common/time/Timer.h" +#include "velox/exec/QueryTraceUtil.h" #include "velox/exec/Task.h" #include "velox/expression/Expr.h" @@ -117,6 +118,9 @@ RowVectorPtr TableScan::getOutput() { if (!split.hasConnectorSplit()) { noMoreSplits_ = true; dynamicFilters_.clear(); + if (splitTracer_ != nullptr) { + splitTracer_->finish(); + } if (dataSource_) { curStatus_ = "getOutput: noMoreSplits_=1, updating stats_"; const auto connectorStats = dataSource_->runtimeStats(); @@ -135,6 +139,9 @@ RowVectorPtr TableScan::getOutput() { return nullptr; } + if (FOLLY_UNLIKELY(splitTracer_ != nullptr)) { + splitTracer_->write(split); + } const auto& connectorSplit = split.connectorSplit; currentSplitWeight_ = connectorSplit->splitWeight; needNewSplit_ = false; @@ -376,4 +383,12 @@ void TableScan::addDynamicFilter( stats_.wlock()->dynamicFilterStats.producerNodeIds.emplace(producer); } +void TableScan::setupTracer(const std::string& traceDir) { + const auto& queryTraceConfig = operatorCtx_->driverCtx()->traceConfig(); + const auto opTraceSplitPath = fmt::format( + "{}/{}", traceDir, trace::QueryTraceTraits::kTraceSplitDirName); + trace::createTraceDirectory(opTraceSplitPath); + splitTracer_ = std::make_unique(opTraceSplitPath); +} + } // namespace facebook::velox::exec diff --git a/velox/exec/TableScan.h b/velox/exec/TableScan.h index deec109b02c1f..78b11131322f3 100644 --- a/velox/exec/TableScan.h +++ b/velox/exec/TableScan.h @@ -17,6 +17,7 @@ #include "velox/core/PlanNode.h" #include "velox/exec/Operator.h" +#include "velox/exec/QuerySplitWriter.h" namespace facebook::velox::exec { 
@@ -50,6 +51,8 @@ class TableScan : public SourceOperator { column_index_t outputChannel, const std::shared_ptr& filter) override; + void setupTracer(const std::string& traceDir) override; + private: // Checks if this table scan operator needs to yield before processing the // next split. @@ -120,5 +123,7 @@ class TableScan : public SourceOperator { // Holds the current status of the operator. Used when debugging to understand // what operator is doing. std::atomic curStatus_{""}; + + std::unique_ptr splitTracer_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/tests/QueryTraceTest.cpp b/velox/exec/tests/QueryTraceTest.cpp index 328246737b89b..ef72a4effb185 100644 --- a/velox/exec/tests/QueryTraceTest.cpp +++ b/velox/exec/tests/QueryTraceTest.cpp @@ -267,8 +267,11 @@ TEST_F(QueryTracerTest, traceMetadata) { } } -TEST_F(QueryTracerTest, traceSplit) { - constexpr auto numSplits = 13; +TEST_F(QueryTracerTest, traceSplitRoundTrip) { + const auto testDir = TempDirectoryPath::create(); + const auto fs = filesystems::getFileSystem(testDir->getPath(), nullptr); + std::vector splitInfoDirs; + constexpr auto numSplits = 12; std::vector splits; for (int i = 0; i < numSplits; ++i) { auto builder = HiveConnectorSplitBuilder(fmt::format("path-{}-{}", i, i)); @@ -287,68 +290,143 @@ TEST_F(QueryTracerTest, traceSplit) { -1); } - enum class TestMode { kNormal = 0, kPartial = 1, kChecksum = 2 }; + std::vector traceDirs; + for (int i = 0; i < 3; ++i) { + const auto traceDir = fmt::format("{}/{}", testDir->getPath(), i); + fs->mkdir(traceDir); + traceDirs.push_back(traceDir); + auto writer = exec::trace::QuerySplitWriter(traceDir); + for (int j = i * 4; j < (i + 1) * 4; ++j) { + writer.write(splits.at(j)); + } + writer.finish(); + } - const auto testDir = TempDirectoryPath::create(); - const auto fs = filesystems::getFileSystem(testDir->getPath(), nullptr); - const std::unordered_map traceDirs{ - {TestMode::kNormal, fmt::format("{}/normal", testDir->getPath())}, - 
{TestMode::kPartial, fmt::format("{}/partial", testDir->getPath())}, - {TestMode::kChecksum, fmt::format("{}/checksum", testDir->getPath())}}; + const auto reader = exec::trace::QuerySplitReader( + traceDirs, memory::MemoryManager::getInstance()->tracePool()); + auto actualSplits = reader.read(); + for (int i = 0; i < numSplits; ++i) { + ASSERT_FALSE(actualSplits[i].hasGroup()); + ASSERT_TRUE(actualSplits[i].hasConnectorSplit()); + const auto actualConnectorSplit = actualSplits[i].connectorSplit; + const auto expectedConnectorSplit = splits[i].connectorSplit; + ASSERT_EQ( + actualConnectorSplit->toString(), expectedConnectorSplit->toString()); + } +} + +TEST_F(QueryTracerTest, traceSplitPartial) { + constexpr auto numSplits = 3; + std::vector splits; + for (int i = 0; i < numSplits; ++i) { + auto builder = HiveConnectorSplitBuilder(fmt::format("path-{}-{}", i, i)); + const auto key = fmt::format("k{}", i); + const auto value = fmt::format("v{}", i); + splits.emplace_back( + builder.start(i) + .length(i) + .connectorId(fmt::format("{}", i)) + .fileFormat(dwio::common::FileFormat(i + 1)) + .infoColumn(key, value) + .partitionKey( + key, i > 1 ? std::nullopt : std::optional(value)) + .tableBucketNumber(i) + .build(), + -1); + } + + const auto traceDir = TempDirectoryPath::create(); + auto writer = exec::trace::QuerySplitWriter(traceDir->getPath()); + for (int i = 0; i < numSplits; ++i) { + writer.write(splits.at(i)); + } + writer.finish(); + + // Append a partial split to the split info file. 
+ const auto fs = filesystems::getFileSystem(traceDir->getPath(), nullptr); const std::string split = "deadbeaf"; const uint32_t length = split.length(); const uint32_t crc32 = folly::crc32( reinterpret_cast(split.data()), split.size()); - struct { - TestMode testMode; - uint32_t ioBufSize; - std::string debugString() const { - return fmt::format( - "Test mode: {}, ioBufSize: {}", - static_cast(testMode), - ioBufSize); - } - } testSettings[]{ - {TestMode::kNormal, 0}, - {TestMode::kPartial, 4 + length}, - {TestMode::kChecksum, 4 + length + 4}}; + const auto splitInfoFile = fs->openFileForWrite( + fmt::format( + "{}/{}", traceDir->getPath(), QueryTraceTraits::kSplitInfoFileName), + filesystems::FileOptions{.shouldThrowOnFileAlreadyExists = false}); + auto ioBuf = folly::IOBuf::create(12); + folly::io::Appender appender(ioBuf.get(), 0); + appender.writeLE(length); + appender.push(reinterpret_cast(split.data()), length); + splitInfoFile->append(std::move(ioBuf)); + splitInfoFile->close(); + + const auto reader = exec::trace::QuerySplitReader( + {traceDir->getPath()}, memory::MemoryManager::getInstance()->tracePool()); + auto actualSplits = reader.read(); + for (int i = 0; i < numSplits; ++i) { + ASSERT_FALSE(actualSplits[i].hasGroup()); + ASSERT_TRUE(actualSplits[i].hasConnectorSplit()); + const auto actualConnectorSplit = actualSplits[i].connectorSplit; + const auto expectedConnectorSplit = splits[i].connectorSplit; + ASSERT_EQ( + actualConnectorSplit->toString(), expectedConnectorSplit->toString()); + } +} - for (const auto& testData : testSettings) { - SCOPED_TRACE(testData.debugString()); - fs->mkdir(traceDirs.at(testData.testMode)); - const auto& traceDir = traceDirs.at(testData.testMode); - auto writer = exec::trace::QuerySplitWriter(traceDir); - for (int i = 0; i < numSplits; ++i) { - writer.write(splits.at(i)); - } - writer.finish(); +TEST_F(QueryTracerTest, traceSplitCorrupted) { + constexpr auto numSplits = 3; + std::vector splits; + for (int i = 0; i < 
numSplits; ++i) { + auto builder = HiveConnectorSplitBuilder(fmt::format("path-{}-{}", i, i)); + const auto key = fmt::format("k{}", i); + const auto value = fmt::format("v{}", i); + splits.emplace_back( + builder.start(i) + .length(i) + .connectorId(fmt::format("{}", i)) + .fileFormat(dwio::common::FileFormat(i + 1)) + .infoColumn(key, value) + .partitionKey( + key, i > 1 ? std::nullopt : std::optional(value)) + .tableBucketNumber(i) + .build(), + -1); + } - if (testData.testMode != TestMode::kNormal) { - const auto splitInfoFile = fs->openFileForWrite( - fmt::format("{}/{}", traceDir, QueryTraceTraits::kSplitInfoFileName), - filesystems::FileOptions{.shouldThrowOnFileAlreadyExists = false}); - auto ioBuf = folly::IOBuf::create(testData.ioBufSize); - folly::io::Appender appender(ioBuf.get(), 0); - appender.writeLE(length); - appender.push(reinterpret_cast(split.data()), length); - if (testData.testMode == TestMode::kChecksum) { - appender.writeLE(crc32 - 1); - } - splitInfoFile->append(std::move(ioBuf)); - splitInfoFile->close(); - } + const auto traceDir = TempDirectoryPath::create(); + auto writer = exec::trace::QuerySplitWriter(traceDir->getPath()); + for (int i = 0; i < numSplits; ++i) { + writer.write(splits.at(i)); + } + writer.finish(); - const auto reader = exec::trace::QuerySplitReader( - traceDir, memory::MemoryManager::getInstance()->tracePool()); - auto actualSplits = reader.read(); - for (int i = 0; i < numSplits; ++i) { - ASSERT_FALSE(actualSplits[i].hasGroup()); - ASSERT_TRUE(actualSplits[i].hasConnectorSplit()); - const auto actualConnectorSplit = actualSplits[i].connectorSplit; - const auto expectedConnectorSplit = splits[i].connectorSplit; - ASSERT_EQ( - actualConnectorSplit->toString(), expectedConnectorSplit->toString()); - } + // Append a split with wrong checksum to the split info file. 
+ const auto fs = filesystems::getFileSystem(traceDir->getPath(), nullptr); + const std::string split = "deadbeaf"; + const uint32_t length = split.length(); + const uint32_t crc32 = folly::crc32( + reinterpret_cast(split.data()), split.size()); + const auto splitInfoFile = fs->openFileForWrite( + fmt::format( + "{}/{}", traceDir->getPath(), QueryTraceTraits::kSplitInfoFileName), + filesystems::FileOptions{.shouldThrowOnFileAlreadyExists = false}); + auto ioBuf = folly::IOBuf::create(16); + folly::io::Appender appender(ioBuf.get(), 0); + appender.writeLE(length); + appender.push(reinterpret_cast(split.data()), length); + appender.writeLE(crc32 - 1); + splitInfoFile->append(std::move(ioBuf)); + splitInfoFile->close(); + + const auto reader = exec::trace::QuerySplitReader( + {traceDir->getPath()}, memory::MemoryManager::getInstance()->tracePool()); + auto actualSplits = reader.read(); + for (int i = 0; i < numSplits; ++i) { + ASSERT_FALSE(actualSplits[i].hasGroup()); + ASSERT_TRUE(actualSplits[i].hasConnectorSplit()); + const auto actualConnectorSplit = actualSplits[i].connectorSplit; + const auto expectedConnectorSplit = splits[i].connectorSplit; + ASSERT_EQ( + actualConnectorSplit->toString(), expectedConnectorSplit->toString()); } } diff --git a/velox/tool/trace/CMakeLists.txt b/velox/tool/trace/CMakeLists.txt index 3218b4ee30b0b..73ac9afd64817 100644 --- a/velox/tool/trace/CMakeLists.txt +++ b/velox/tool/trace/CMakeLists.txt @@ -17,6 +17,7 @@ velox_add_library( AggregationReplayer.cpp OperatorReplayerBase.cpp PartitionedOutputReplayer.cpp + TableScanReplayer.cpp TableWriterReplayer.cpp) velox_link_libraries( diff --git a/velox/tool/trace/OperatorReplayerBase.cpp b/velox/tool/trace/OperatorReplayerBase.cpp index b54ad323a4471..d526956c2916a 100644 --- a/velox/tool/trace/OperatorReplayerBase.cpp +++ b/velox/tool/trace/OperatorReplayerBase.cpp @@ -68,6 +68,13 @@ core::PlanNodePtr OperatorReplayerBase::createPlan() const { const auto* replayNode = 
core::PlanNode::findFirstNode( planFragment_.get(), [this](const core::PlanNode* node) { return node->id() == nodeId_; }); + // Leaf node. + if (replayNode->sources().empty()) { + return exec::test::PlanBuilder() + .addNode(replayNodeFactory(replayNode)) + .planNode(); + } + return exec::test::PlanBuilder() .traceScan(nodeDir_, exec::trace::getDataType(planFragment_, nodeId_)) .addNode(replayNodeFactory(replayNode)) diff --git a/velox/tool/trace/QueryReplayer.cpp b/velox/tool/trace/QueryReplayer.cpp index 5b135d5a69919..6afb2311951f4 100644 --- a/velox/tool/trace/QueryReplayer.cpp +++ b/velox/tool/trace/QueryReplayer.cpp @@ -20,6 +20,7 @@ #include "velox/common/file/FileSystems.h" #include "velox/common/memory/Memory.h" #include "velox/connectors/hive/HiveConnector.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/connectors/hive/HiveDataSink.h" #include "velox/connectors/hive/TableHandle.h" #include "velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h" @@ -32,6 +33,7 @@ #include "velox/tool/trace/AggregationReplayer.h" #include "velox/tool/trace/OperatorReplayerBase.h" #include "velox/tool/trace/PartitionedOutputReplayer.h" +#include "velox/tool/trace/TableScanReplayer.h" #include "velox/tool/trace/TableWriterReplayer.h" #include "velox/type/Type.h" @@ -54,6 +56,7 @@ DEFINE_string( "query task."); DEFINE_string(node_id, "", "Specify the target node id."); DEFINE_int32(pipeline_id, 0, "Specify the target pipeline id."); +DEFINE_int32(driver_id, -1, "Specify the target driver id."); DEFINE_string(operator_type, "", "Specify the target operator type."); DEFINE_string( table_writer_output_dir, @@ -83,6 +86,7 @@ void init() { connector::hive::LocationHandle::registerSerDe(); connector::hive::HiveColumnHandle::registerSerDe(); connector::hive::HiveInsertTableHandle::registerSerDe(); + connector::hive::HiveConnectorSplit::registerSerDe(); if (!isRegisteredVectorSerde()) { 
serializer::presto::PrestoVectorSerde::registerVectorSerde(); } @@ -125,6 +129,14 @@ std::unique_ptr createReplayer() { FLAGS_node_id, FLAGS_pipeline_id, FLAGS_operator_type); + } else if (FLAGS_operator_type == "TableScan") { + replayer = std::make_unique( + FLAGS_root_dir, + FLAGS_task_id, + FLAGS_node_id, + FLAGS_pipeline_id, + FLAGS_operator_type, + FLAGS_driver_id); } else { VELOX_UNSUPPORTED("Unsupported operator type: {}", FLAGS_operator_type); } @@ -179,12 +191,12 @@ void printSummary( } // namespace int main(int argc, char** argv) { - gflags::ParseCommandLineFlags(&argc, &argv, true); - if (argc == 1) { gflags::ShowUsageWithFlags(argv[0]); return -1; } + gflags::ParseCommandLineFlags(&argc, &argv, true); + if (FLAGS_root_dir.empty()) { gflags::SetUsageMessage("--root_dir must be provided."); gflags::ShowUsageWithFlags(argv[0]); diff --git a/velox/tool/trace/TableScanReplayer.cpp b/velox/tool/trace/TableScanReplayer.cpp new file mode 100644 index 0000000000000..8f654c1304b06 --- /dev/null +++ b/velox/tool/trace/TableScanReplayer.cpp @@ -0,0 +1,67 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/tool/trace/TableScanReplayer.h" +#include "velox/exec/QuerySplitReader.h" +#include "velox/exec/QueryTraceUtil.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/PlanBuilder.h" + +using namespace facebook::velox; +using namespace facebook::velox::exec; +using namespace facebook::velox::exec::test; + +namespace facebook::velox::tool::trace { + +RowVectorPtr TableScanReplayer::run() { + const auto plan = createPlan(); + return exec::test::AssertQueryBuilder(plan) + .maxDrivers(maxDrivers_) + .configs(queryConfigs_) + .connectorSessionProperties(connectorConfigs_) + .splits(getSplits()) + .copyResults(memory::MemoryManager::getInstance()->tracePool()); +} + +core::PlanNodePtr TableScanReplayer::createPlanNode( + const core::PlanNode* node, + const core::PlanNodeId& nodeId, + const core::PlanNodePtr& /*source*/) const { + const auto scanNode = dynamic_cast(node); + VELOX_CHECK_NOT_NULL(scanNode); + return std::make_shared( + nodeId, + scanNode->outputType(), + scanNode->tableHandle(), + scanNode->assignments()); +} + +std::vector TableScanReplayer::getSplits() const { + std::vector splitInfoDirs; + if (driverId_ != -1) { + splitInfoDirs.push_back( + exec::trace::getSplitDir(nodeDir_, pipelineId_, driverId_)); + } else { + for (auto i = 0; i < maxDrivers_; ++i) { + splitInfoDirs.push_back( + exec::trace::getSplitDir(nodeDir_, pipelineId_, i)); + } + } + const auto reader = exec::trace::QuerySplitReader( + splitInfoDirs, memory::MemoryManager::getInstance()->tracePool()); + return reader.read(); +} +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/TableScanReplayer.h b/velox/tool/trace/TableScanReplayer.h new file mode 100644 index 0000000000000..8fd3fc2a2fe10 --- /dev/null +++ b/velox/tool/trace/TableScanReplayer.h @@ -0,0 +1,53 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "velox/core/PlanNode.h" +#include "velox/exec/Split.h" +#include "velox/tool/trace/OperatorReplayerBase.h" + +namespace facebook::velox::tool::trace { + +/// The replayer to replay the traced 'TableScan' operator. +class TableScanReplayer final : public OperatorReplayerBase { + public: + TableScanReplayer( + const std::string& rootDir, + const std::string& taskId, + const std::string& nodeId, + const int32_t pipelineId, + const std::string& operatorType, + const int32_t driverId = -1) + : OperatorReplayerBase(rootDir, taskId, nodeId, pipelineId, operatorType), + driverId_(driverId) {} + + RowVectorPtr run() override; + + private: + core::PlanNodePtr createPlanNode( + const core::PlanNode* node, + const core::PlanNodeId& nodeId, + const core::PlanNodePtr& /*source*/) const override; + + std::vector getSplits() const; + + const int32_t driverId_; +}; + +} // namespace facebook::velox::tool::trace diff --git a/velox/tool/trace/tests/CMakeLists.txt b/velox/tool/trace/tests/CMakeLists.txt index 03d1705eebe76..7c06eccf89f34 100644 --- a/velox/tool/trace/tests/CMakeLists.txt +++ b/velox/tool/trace/tests/CMakeLists.txt @@ -15,7 +15,7 @@ add_executable( velox_tool_trace_test AggregationReplayerTest.cpp PartitionedOutputReplayerTest.cpp - TableWriterReplayerTest.cpp) + TableScanReplayerTest.cpp TableWriterReplayerTest.cpp) add_test( NAME velox_tool_trace_test diff --git 
a/velox/tool/trace/tests/TableScanReplayerTest.cpp b/velox/tool/trace/tests/TableScanReplayerTest.cpp new file mode 100644 index 0000000000000..4929082ff035b --- /dev/null +++ b/velox/tool/trace/tests/TableScanReplayerTest.cpp @@ -0,0 +1,238 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include +#include +#include + +#include "folly/dynamic.h" +#include "velox/common/base/Fs.h" +#include "velox/common/file/FileSystems.h" +#include "velox/common/hyperloglog/SparseHll.h" +#include "velox/exec/PartitionFunction.h" +#include "velox/exec/QueryDataReader.h" +#include "velox/exec/QueryTraceUtil.h" +#include "velox/exec/TableWriter.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" +#include "velox/serializers/PrestoSerializer.h" +#include "velox/tool/trace/TableWriterReplayer.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +using namespace facebook::velox; +using namespace facebook::velox::core; +using namespace facebook::velox::common; +using namespace facebook::velox::exec; +using namespace facebook::velox::exec::test; +using namespace facebook::velox::connector; +using namespace facebook::velox::connector::hive; +using namespace facebook::velox::dwio::common; +using namespace 
facebook::velox::common::testutil;
+using namespace facebook::velox::common::hll;
+
+namespace facebook::velox::tool::trace::test {
+// NOTE(review): the template argument lists in this file (on std::vector,
+// std::shared_ptr, std::unordered_map and std::make_shared) were missing from
+// the reviewed text and have been filled in from how each value is used.
+// Please confirm them against the Velox headers.
+
+/// Fixture for the TableScanReplayer tests. Each test runs a table scan with
+/// query tracing enabled and then replays the traced scan, expecting the same
+/// rows as an untraced run of the same plan.
+class TableScanReplayerTest : public HiveConnectorTestBase {
+ protected:
+  /// One-time suite setup: installs a memory manager for testing, runs the
+  /// base class setup, registers the local file system and the Presto vector
+  /// serde (only if no vector serde is registered yet), and registers the
+  /// SerDe hooks for types, filters, Hive handles/splits, plan nodes,
+  /// expressions and partition functions.
+  static void SetUpTestCase() {
+    memory::MemoryManager::testingSetInstance({});
+    HiveConnectorTestBase::SetUpTestCase();
+    filesystems::registerLocalFileSystem();
+    if (!isRegisteredVectorSerde()) {
+      serializer::presto::PrestoVectorSerde::registerVectorSerde();
+    }
+    Type::registerSerDe();
+    common::Filter::registerSerDe();
+    connector::hive::HiveTableHandle::registerSerDe();
+    connector::hive::LocationHandle::registerSerDe();
+    connector::hive::HiveColumnHandle::registerSerDe();
+    connector::hive::HiveInsertTableHandle::registerSerDe();
+    connector::hive::HiveConnectorSplit::registerSerDe();
+    core::PlanNode::registerSerDe();
+    core::ITypedExpr::registerSerDe();
+    registerPartitionFunctionSerDe();
+  }
+
+  /// Forwards to HiveConnectorTestBase::makeVectors with 'count' and
+  /// 'rowsPerVector'. Uses 'rowType' when it is set, otherwise the fixture's
+  /// default 'rowType_'.
+  std::vector<RowVectorPtr> makeVectors(
+      int32_t count,
+      int32_t rowsPerVector,
+      const RowTypePtr& rowType = nullptr) {
+    auto inputs = rowType ? rowType : rowType_;
+    return HiveConnectorTestBase::makeVectors(inputs, count, rowsPerVector);
+  }
+
+  /// Builds a table scan plan that outputs every column of 'rowType_'.
+  core::PlanNodePtr tableScanNode() {
+    return tableScanNode(rowType_);
+  }
+
+  /// Builds a table scan plan with the given output type and stores the id of
+  /// the scan node in 'traceNodeId_' so tests can select it for tracing.
+  core::PlanNodePtr tableScanNode(const RowTypePtr& outputType) {
+    return PlanBuilder(pool_.get())
+        .tableScan(outputType)
+        .capturePlanNodeId(traceNodeId_)
+        .planNode();
+  }
+
+  // Default schema of the generated test data.
+  const RowTypePtr rowType_{
+      ROW({"c0", "c1", "c2", "c3", "c4", "c5", "c6"},
+          {BIGINT(),
+           INTEGER(),
+           SMALLINT(),
+           REAL(),
+           DOUBLE(),
+           VARCHAR(),
+           TINYINT()})};
+  // Id of the most recently built table scan node; set by capturePlanNodeId().
+  core::PlanNodeId traceNodeId_;
+};
+
+// Scans all columns from five files. Compares an untraced run, a traced run
+// with up to four drivers, and the replay of the traced scan.
+TEST_F(TableScanReplayerTest, basic) {
+  const auto vectors = makeVectors(10, 100);
+  const auto testDir = TempDirectoryPath::create();
+  const auto traceRoot = fmt::format("{}/{}", testDir->getPath(), "traceRoot");
+  std::vector<std::shared_ptr<TempFilePath>> splitFiles;
+  for (int i = 0; i < 5; ++i) {
+    auto filePath = TempFilePath::create();
+    writeToFile(filePath->getPath(), vectors);
+    splitFiles.push_back(std::move(filePath));
+  }
+
+  const auto plan = tableScanNode();
+  // Reference results from a run without tracing.
+  const auto results = AssertQueryBuilder(plan)
+                           .splits(makeHiveConnectorSplits(splitFiles))
+                           .copyResults(pool());
+
+  // Same query with tracing enabled for the table scan node.
+  std::shared_ptr<exec::Task> task;
+  auto traceResult =
+      AssertQueryBuilder(plan)
+          .maxDrivers(4)
+          .config(core::QueryConfig::kQueryTraceEnabled, true)
+          .config(core::QueryConfig::kQueryTraceDir, traceRoot)
+          .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30)
+          .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*")
+          .config(core::QueryConfig::kQueryTraceNodeIds, traceNodeId_)
+          .splits(makeHiveConnectorSplits(splitFiles))
+          .copyResults(pool(), task);
+
+  // Enabling tracing must not change the query output.
+  assertEqualResults({results}, {traceResult});
+
+  // Replaying the traced scan must reproduce the reference results.
+  const auto taskId = task->taskId();
+  const auto replayingResult =
+      TableScanReplayer(traceRoot, taskId, traceNodeId_, 0, "TableScan").run();
+  assertEqualResults({results}, {replayingResult});
+}
+
+// Same flow as 'basic', but the scan only outputs columns c0, c3 and c5.
+TEST_F(TableScanReplayerTest, columnPrunning) {
+  const auto vectors = makeVectors(10, 100);
+  const auto testDir = TempDirectoryPath::create();
+  const auto traceRoot = fmt::format("{}/{}", testDir->getPath(), "traceRoot");
+  std::vector<std::shared_ptr<TempFilePath>> splitFiles;
+  for (int i = 0; i < 5; ++i) {
+    auto filePath = TempFilePath::create();
+    writeToFile(filePath->getPath(), vectors);
+    splitFiles.push_back(std::move(filePath));
+  }
+
+  const auto plan =
+      tableScanNode(ROW({"c0", "c3", "c5"}, {BIGINT(), REAL(), VARCHAR()}));
+
+  // Reference results from a run without tracing.
+  const auto results = AssertQueryBuilder(plan)
+                           .splits(makeHiveConnectorSplits(splitFiles))
+                           .copyResults(pool());
+
+  // Same query with tracing enabled for the table scan node.
+  std::shared_ptr<exec::Task> task;
+  auto traceResult =
+      AssertQueryBuilder(plan)
+          .maxDrivers(4)
+          .config(core::QueryConfig::kQueryTraceEnabled, true)
+          .config(core::QueryConfig::kQueryTraceDir, traceRoot)
+          .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30)
+          .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*")
+          .config(core::QueryConfig::kQueryTraceNodeIds, traceNodeId_)
+          .splits(makeHiveConnectorSplits(splitFiles))
+          .copyResults(pool(), task);
+
+  // Enabling tracing must not change the query output.
+  assertEqualResults({results}, {traceResult});
+
+  // Replaying the traced scan must reproduce the reference results.
+  const auto taskId = task->taskId();
+  const auto replayingResult =
+      TableScanReplayer(traceRoot, taskId, traceNodeId_, 0, "TableScan").run();
+  assertEqualResults({results}, {replayingResult});
+}
+
+// Scans a nested row column while requesting only the subfield "e.c", then
+// checks the untraced, traced and replayed results against each other.
+TEST_F(TableScanReplayerTest, subfieldPrunning) {
+  // rowType: ROW
+  // └── "e": ROW
+  //     ├── "c": ROW
+  //     │     ├── "a": BIGINT
+  //     │     └── "b": DOUBLE
+  //     └── "d": BIGINT
+  auto innerType = ROW({"a", "b"}, {BIGINT(), DOUBLE()});
+  auto columnType = ROW({"c", "d"}, {innerType, BIGINT()});
+  auto rowType = ROW({"e"}, {columnType});
+  auto vectors = makeVectors(10, 1'000, rowType);
+  auto filePath = TempFilePath::create();
+  writeToFile(filePath->getPath(), vectors);
+  std::vector<common::Subfield> requiredSubfields;
+  requiredSubfields.emplace_back("e.c");
+  std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
+      assignments;
+  assignments["e"] = std::make_shared<HiveColumnHandle>(
+      "e",
+      HiveColumnHandle::ColumnType::kRegular,
+      columnType,
+      columnType,
+      std::move(requiredSubfields));
+  const auto plan = PlanBuilder()
+                        .startTableScan()
+                        .outputType(rowType)
+                        .assignments(assignments)
+                        .endTableScan()
+                        .capturePlanNodeId(traceNodeId_)
+                        .planNode();
+  const auto split = makeHiveConnectorSplit(filePath->getPath());
+  // Reference results from a run without tracing.
+  const auto results =
+      AssertQueryBuilder(plan).split(split).copyResults(pool());
+
+  const auto testDir = TempDirectoryPath::create();
+  const auto traceRoot = fmt::format("{}/{}", testDir->getPath(), "traceRoot");
+  // Same query with tracing enabled for the table scan node.
+  std::shared_ptr<exec::Task> task;
+  auto traceResult =
+      AssertQueryBuilder(plan)
+          .config(core::QueryConfig::kQueryTraceEnabled, true)
+          .config(core::QueryConfig::kQueryTraceDir, traceRoot)
+          .config(core::QueryConfig::kQueryTraceMaxBytes, 100UL << 30)
+          .config(core::QueryConfig::kQueryTraceTaskRegExp, ".*")
+          .config(core::QueryConfig::kQueryTraceNodeIds, traceNodeId_)
+          .split(split)
+          .copyResults(pool(), task);
+
+  // Enabling tracing must not change the query output.
+  assertEqualResults({results}, {traceResult});
+
+  // Replaying the traced scan must reproduce the reference results.
+  const auto taskId = task->taskId();
+  const auto replayingResult =
+      TableScanReplayer(traceRoot, taskId, traceNodeId_, 0, "TableScan").run();
+  assertEqualResults({results}, {replayingResult});
+}
+} // namespace facebook::velox::tool::trace::test