diff --git a/velox/exec/OutputBuffer.h b/velox/exec/OutputBuffer.h index 2620693e6900d..4000d605e6bc9 100644 --- a/velox/exec/OutputBuffer.h +++ b/velox/exec/OutputBuffer.h @@ -115,6 +115,19 @@ class DestinationBuffer { int64_t bytesSent{0}; int64_t rowsSent{0}; int64_t pagesSent{0}; + + std::string toString() const { + return fmt::format( + "[finished: {}, bytesBuffered: {}, rowsBuffered: {}, pagesBuffered: {}, bytesSent: {}, rowsSent: {}, pagesSent:{}]", + // "[{}, {}, {}, {}, {}, {}, {}]", + finished, + succinctBytes(bytesBuffered), + rowsBuffered, + pagesBuffered, + succinctBytes(bytesSent), + rowsSent, + pagesSent); + } }; void enqueue(std::shared_ptr data); @@ -254,7 +267,29 @@ class OutputBuffer { /// Stats of the OutputBuffer's destinations. std::vector buffersStats; - std::string toString() const; + std::string toString() const { + std::string destinationBufferStats; + if (!buffersStats.empty()) { + for (int i = 0; i < buffersStats.size(); i++) { + auto& destinationBufferStat = buffersStats[i]; + destinationBufferStats += + fmt::format(" D{}: {}\n", i, destinationBufferStat.toString()); + } + } + + return fmt::format( + "[bufferedBytes: {}, bufferedPages: {}, " + "totalBytesSent: {}, totalRowsSent: {}, totalPagesSent: {}, " + "averageBufferTimeMs: {}, numTopBuffers: {}]\n{}", + succinctBytes(bufferedBytes), + bufferedPages, + succinctBytes(totalBytesSent), + totalRowsSent, + totalPagesSent, + averageBufferTimeMs, + numTopBuffers, + destinationBufferStats); + } }; OutputBuffer( diff --git a/velox/exec/tests/OutputBufferManagerTest.cpp b/velox/exec/tests/OutputBufferManagerTest.cpp index e341ae4329fe0..8d691a279bfa0 100644 --- a/velox/exec/tests/OutputBufferManagerTest.cpp +++ b/velox/exec/tests/OutputBufferManagerTest.cpp @@ -17,12 +17,12 @@ #include #include "folly/experimental/EventCount.h" #include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/testutil/OutputMatcher.h" #include "velox/dwio/common/tests/utils/BatchMaker.h" #include "velox/exec/Task.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/SerializedPageUtil.h" #include "velox/serializers/CompactRowSerializer.h" -#include "velox/serializers/PrestoSerializer.h" #include "velox/serializers/UnsafeRowSerializer.h" using namespace facebook::velox; @@ -768,8 +768,8 @@ TEST_P(OutputBufferManagerWithDifferentSerdeKindsTest, destinationBuffer) { buffer.enqueue(makeSerializedPage(rowType_, 100)); } DestinationBuffer destinationBuffer; - auto buffers = - destinationBuffer.getData(1, 0, noNotify, [] { return true; }, &buffer); + auto buffers = destinationBuffer.getData( + 1, 0, noNotify, [] { return true; }, &buffer); ASSERT_TRUE(buffers.immediate); ASSERT_EQ(buffers.data.size(), 1); ASSERT_GT(buffers.data[0]->length(), 0); @@ -1324,6 +1324,44 @@ TEST_P(AllOutputBufferManagerTest, outputBufferUtilization) { verifyOutputBuffer(task, OutputBufferStatus::kFinished); } +TEST_P(AllOutputBufferManagerTest, printOutputBufferStats) { + const vector_size_t vectorSize = 100; + const std::string taskId = std::to_string(folly::Random::rand32()); + const int numDestinations = 4; + initializeTask( + taskId, + rowType_, + core::PartitionedOutputNode::Kind::kPartitioned, + numDestinations, + 1); + + const int numPages = numDestinations; + int totalNumRows = 0; + int totalBytes = 0; + for (int pageId = 0; pageId < numPages; ++pageId) { + const auto pageBytes = enqueue(taskId, pageId, rowType_, vectorSize); + fetchOneAndAck(taskId, pageId, 0); + } + + const auto statsEnqueue = getStats(taskId); + OutputMatcher::compareOutputs( + ::testing::UnitTest::GetInstance()->current_test_info()->name(), + statsEnqueue.toString(), + {{"\\[bufferedBytes: ([\\d.]+[KMGT]?[B]?), bufferedPages: (\\d+), totalBytesSent: ([\\d.]+[KMGT]?[B]?), totalRowsSent: (\\d+), totalPagesSent: (\\d+), averageBufferTimeMs: (\\d+), numTopBuffers: (\\d+)\\]"}, + {"\\s*D0: \\[finished: (true|false), bytesBuffered: ([\\d.]+[KMGT]?[B]?), rowsBuffered: (\\d+), pagesBuffered: (\\d+), bytesSent: ([\\d.]+[KMGT]?[B]?), rowsSent: (\\d+), pagesSent:(\\d+)\\]"}, + {"\\s*D1: \\[finished: (true|false), bytesBuffered: ([\\d.]+[KMGT]?[B]?), rowsBuffered: (\\d+), pagesBuffered: (\\d+), bytesSent: ([\\d.]+[KMGT]?[B]?), rowsSent: (\\d+), pagesSent:(\\d+)\\]"}, + {"\\s*D2: \\[finished: (true|false), bytesBuffered: ([\\d.]+[KMGT]?[B]?), rowsBuffered: (\\d+), pagesBuffered: (\\d+), bytesSent: ([\\d.]+[KMGT]?[B]?), rowsSent: (\\d+), pagesSent:(\\d+)\\]"}, + {"\\s*D3: \\[finished: (true|false), bytesBuffered: ([\\d.]+[KMGT]?[B]?), rowsBuffered: (\\d+), pagesBuffered: (\\d+), bytesSent: ([\\d.]+[KMGT]?[B]?), rowsSent: (\\d+), pagesSent:(\\d+)\\]"}}); + + bufferManager_->updateOutputBuffers(taskId, numDestinations, true); + noMoreData(taskId); + for (int pageId = 0; pageId < numPages; ++pageId) { + fetchEndMarker(taskId, pageId, 1); + deleteResults(taskId, pageId); + } + bufferManager_->removeTask(taskId); +} + TEST_P(AllOutputBufferManagerTest, outputBufferStats) { const vector_size_t vectorSize = 100; const std::string taskId = std::to_string(folly::Random::rand32()); @@ -1355,6 +1393,7 @@ TEST_P(AllOutputBufferManagerTest, outputBufferStats) { fetchOne(taskId, 0, pageId); } const auto statsEnqueue = getStats(taskId); + std::cout << statsEnqueue.toString(); ASSERT_EQ(statsEnqueue.buffersStats[0].pagesBuffered, 1); ASSERT_EQ(statsEnqueue.buffersStats[0].rowsBuffered, vectorSize); if (outputKind_ == core::PartitionedOutputNode::Kind::kBroadcast) {