Skip to content

Commit

Permalink
misc: Add toString() to OutputBuffer stats
Browse files Browse the repository at this point in the history
  • Loading branch information
yingsu00 committed Nov 28, 2024
1 parent b3faec3 commit a5e42e1
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 2 deletions.
37 changes: 36 additions & 1 deletion velox/exec/OutputBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,19 @@ class DestinationBuffer {
int64_t bytesSent{0};
int64_t rowsSent{0};
int64_t pagesSent{0};

std::string toString() const {
return fmt::format(
"[finished: {}, bytesBuffered: {}, rowsBuffered: {}, pagesBuffered: {}, bytesSent: {}, rowsSent: {}, pagesSent:{}]",
// "[{}, {}, {}, {}, {}, {}, {}]",
finished,
succinctBytes(bytesBuffered),
rowsBuffered,
pagesBuffered,
succinctBytes(bytesSent),
rowsSent,
pagesSent);
}
};

void enqueue(std::shared_ptr<SerializedPage> data);
Expand Down Expand Up @@ -254,7 +267,29 @@ class OutputBuffer {
/// Stats of the OutputBuffer's destinations.
std::vector<DestinationBuffer::Stats> buffersStats;

std::string toString() const;
std::string toString() const {
std::string destinationBufferStats;
if (!buffersStats.empty()) {
for (int i = 0; i < buffersStats.size(); i++) {
auto& destinationBufferStat = buffersStats[i];
destinationBufferStats +=
fmt::format(" D{}: {}\n", i, destinationBufferStat.toString());
}
}

return fmt::format(
"[bufferedBytes: {}, bufferedPages: {}, "
"totalBytesSent: {}, totalRowsSent: {}, totalPagesSent: {}, "
"averageBufferTimeMs: {}, numTopBuffers: {}]\n{}",
succinctBytes(bufferedBytes),
bufferedPages,
succinctBytes(totalBytesSent),
totalRowsSent,
totalPagesSent,
averageBufferTimeMs,
numTopBuffers,
destinationBufferStats);
}
};

OutputBuffer(
Expand Down
41 changes: 40 additions & 1 deletion velox/exec/tests/OutputBufferManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
#include <gtest/gtest.h>
#include "folly/experimental/EventCount.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/testutil/OutputMatcher.h"
#include "velox/dwio/common/tests/utils/BatchMaker.h"
#include "velox/exec/Task.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/SerializedPageUtil.h"
#include "velox/serializers/CompactRowSerializer.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/serializers/UnsafeRowSerializer.h"

using namespace facebook::velox;
Expand Down Expand Up @@ -1324,6 +1324,44 @@ TEST_P(AllOutputBufferManagerTest, outputBufferUtilization) {
verifyOutputBuffer(task, OutputBufferStatus::kFinished);
}

TEST_P(AllOutputBufferManagerTest, printOutputBufferStats) {
const vector_size_t vectorSize = 100;
const std::string taskId = std::to_string(folly::Random::rand32());
const int numDestinations = 4;
initializeTask(
taskId,
rowType_,
core::PartitionedOutputNode::Kind::kPartitioned,
numDestinations,
1);

const int numPages = numDestinations;
int totalNumRows = 0;
int totalBytes = 0;
for (int pageId = 0; pageId < numPages; ++pageId) {
const auto pageBytes = enqueue(taskId, pageId, rowType_, vectorSize);
fetchOneAndAck(taskId, pageId, 0);
}

const auto statsEnqueue = getStats(taskId);
OutputMatcher::compareOutputs(
::testing::UnitTest::GetInstance()->current_test_info()->name(),
statsEnqueue.toString(),
{{"\\[bufferedBytes: ([\\d.]+[KMGT]?[B]?), bufferedPages: (\\d+), totalBytesSent: ([\\d.]+[KMGT]?[B]?), totalRowsSent: (\\d+), totalPagesSent: (\\d+), averageBufferTimeMs: (\\d+), numTopBuffers: (\\d+)\\]"},
{"\\s*D0: \\[finished: (true|false), bytesBuffered: ([\\d.]+[KMGT]?[B]?), rowsBuffered: (\\d+), pagesBuffered: (\\d+), bytesSent: ([\\d.]+[KMGT]?[B]?), rowsSent: (\\d+), pagesSent:(\\d+)\\]"},
{"\\s*D1: \\[finished: (true|false), bytesBuffered: ([\\d.]+[KMGT]?[B]?), rowsBuffered: (\\d+), pagesBuffered: (\\d+), bytesSent: ([\\d.]+[KMGT]?[B]?), rowsSent: (\\d+), pagesSent:(\\d+)\\]"},
{"\\s*D2: \\[finished: (true|false), bytesBuffered: ([\\d.]+[KMGT]?[B]?), rowsBuffered: (\\d+), pagesBuffered: (\\d+), bytesSent: ([\\d.]+[KMGT]?[B]?), rowsSent: (\\d+), pagesSent:(\\d+)\\]"},
{"\\s*D3: \\[finished: (true|false), bytesBuffered: ([\\d.]+[KMGT]?[B]?), rowsBuffered: (\\d+), pagesBuffered: (\\d+), bytesSent: ([\\d.]+[KMGT]?[B]?), rowsSent: (\\d+), pagesSent:(\\d+)\\]"}});

bufferManager_->updateOutputBuffers(taskId, numDestinations, true);
noMoreData(taskId);
for (int pageId = 0; pageId < numPages; ++pageId) {
fetchEndMarker(taskId, pageId, 1);
deleteResults(taskId, pageId);
}
bufferManager_->removeTask(taskId);
}

TEST_P(AllOutputBufferManagerTest, outputBufferStats) {
const vector_size_t vectorSize = 100;
const std::string taskId = std::to_string(folly::Random::rand32());
Expand Down Expand Up @@ -1355,6 +1393,7 @@ TEST_P(AllOutputBufferManagerTest, outputBufferStats) {
fetchOne(taskId, 0, pageId);
}
const auto statsEnqueue = getStats(taskId);
std::cout << statsEnqueue.toString();
ASSERT_EQ(statsEnqueue.buffersStats[0].pagesBuffered, 1);
ASSERT_EQ(statsEnqueue.buffersStats[0].rowsBuffered, vectorSize);
if (outputKind_ == core::PartitionedOutputNode::Kind::kBroadcast) {
Expand Down

0 comments on commit a5e42e1

Please sign in to comment.