Skip to content

Commit

Permalink
[GLUTEN-6822][VL] Fix wrong maxRowsToInsert and sort time metrics (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
Rong Ma authored and shamirchen committed Oct 14, 2024
1 parent 45457d8 commit 1f04cf9
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 30 deletions.
36 changes: 21 additions & 15 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,10 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
}

arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_t begin, size_t end) {
ScopedTimer timer(&sortTime_);
// Count copy row time into sortTime_.
Timer sortTime{};
// Serialize [begin, end)
uint64_t offset = 0;
int64_t offset = 0;
char* addr;
uint32_t size;

Expand All @@ -278,42 +279,47 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
addr = pageAddresses_[pageIndex.first] + pageIndex.second;
size = *(RowSizeType*)addr;
if (offset + size > kSortedBufferSize) {
VELOX_CHECK(offset > 0);
auto payload = std::make_unique<InMemoryPayload>(
index - begin,
nullptr,
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(rawBuffer_, offset)});
updateSpillMetrics(payload);
RETURN_NOT_OK(
partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_));
sortTime.stop();
RETURN_NOT_OK(evictPartition0(partitionId, index - begin, offset));
sortTime.start();
begin = index;
offset = 0;
}
gluten::fastCopy(rawBuffer_ + offset, addr, size);
offset += size;
index++;
}
sortTime.stop();
RETURN_NOT_OK(evictPartition0(partitionId, index - begin, offset));

sortTime_ += sortTime.realTimeUsed();
return arrow::Status::OK();
}

arrow::Status VeloxSortShuffleWriter::evictPartition0(uint32_t partitionId, uint32_t numRows, int64_t rawLength) {
VELOX_CHECK(rawLength > 0);
auto payload = std::make_unique<InMemoryPayload>(
end - begin,
numRows,
nullptr,
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(rawBuffer_, offset)});
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(rawBuffer_, rawLength)});
updateSpillMetrics(payload);
RETURN_NOT_OK(
partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_));
return arrow::Status::OK();
}

uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t rows) {
uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t remainingRows) {
// Check how many rows can be handled.
if (pages_.empty()) {
return 0;
}
auto remainingBytes = pages_.back()->size() - pageCursor_;
if (fixedRowSize_) {
return std::min((uint32_t)(remainingBytes / (fixedRowSize_.value())), rows);
return std::min((uint32_t)(remainingBytes / (fixedRowSize_.value())), remainingRows);
}
auto beginIter = rowSizePrefixSum_.begin() + 1 + offset;
auto iter = std::upper_bound(beginIter, rowSizePrefixSum_.end(), remainingBytes);
auto bytesWritten = rowSizePrefixSum_[offset];
auto iter = std::upper_bound(beginIter, rowSizePrefixSum_.end(), remainingBytes + bytesWritten);
return iter - beginIter;
}

Expand Down
4 changes: 3 additions & 1 deletion cpp/velox/shuffle/VeloxSortShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {

arrow::Status evictPartition(uint32_t partitionId, size_t begin, size_t end);

uint32_t maxRowsToInsert(uint32_t offset, uint32_t rows);
arrow::Status evictPartition0(uint32_t partitionId, uint32_t numRows, int64_t rawLength);

uint32_t maxRowsToInsert(uint32_t offset, uint32_t remainingRows);

void acquireNewBuffer(uint64_t memLimit, uint64_t minSizeRequired);

Expand Down
37 changes: 26 additions & 11 deletions cpp/velox/tests/VeloxShuffleWriterTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,22 @@ TEST_P(RoundRobinPartitioningShuffleWriter, spillVerifyResult) {
shuffleWriteReadMultiBlocks(*shuffleWriter, 2, inputVector1_->type(), {{blockPid1}, {blockPid2}});
}

TEST_F(VeloxShuffleWriterMemoryTest, memoryLeak) {
TEST_P(RoundRobinPartitioningShuffleWriter, sortMaxRows) {
if (GetParam().shuffleWriterType != kSortShuffle) {
return;
}
ASSERT_NOT_OK(initShuffleWriterOptions());
auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());

// Set memLimit to 0 to force allocate a new buffer for each row.
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_, 0));

auto blockPid1 = takeRows({inputVector1_}, {{0, 2, 4, 6, 8}});
auto blockPid2 = takeRows({inputVector1_}, {{1, 3, 5, 7, 9}});
shuffleWriteReadMultiBlocks(*shuffleWriter, 2, inputVector1_->type(), {{blockPid1}, {blockPid2}});
}

TEST_F(VeloxHashShuffleWriterMemoryTest, memoryLeak) {
ASSERT_NOT_OK(initShuffleWriterOptions());
std::shared_ptr<arrow::MemoryPool> pool = std::make_shared<LimitedMemoryPool>();
shuffleWriterOptions_.bufferSize = 4;
Expand All @@ -425,7 +440,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, memoryLeak) {
ASSERT_TRUE(pool->bytes_allocated() == 0);
}

TEST_F(VeloxShuffleWriterMemoryTest, spillFailWithOutOfMemory) {
TEST_F(VeloxHashShuffleWriterMemoryTest, spillFailWithOutOfMemory) {
ASSERT_NOT_OK(initShuffleWriterOptions());
std::shared_ptr<arrow::MemoryPool> pool = std::make_shared<LimitedMemoryPool>(0);
shuffleWriterOptions_.bufferSize = 4;
Expand All @@ -438,7 +453,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, spillFailWithOutOfMemory) {
ASSERT_TRUE(status.IsOutOfMemory());
}

TEST_F(VeloxShuffleWriterMemoryTest, kInit) {
TEST_F(VeloxHashShuffleWriterMemoryTest, kInit) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.bufferSize = 4;
auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
Expand Down Expand Up @@ -508,7 +523,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kInit) {
ASSERT_NOT_OK(shuffleWriter->stop());
}

TEST_F(VeloxShuffleWriterMemoryTest, kInitSingle) {
TEST_F(VeloxHashShuffleWriterMemoryTest, kInitSingle) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.partitioning = Partitioning::kSingle;
shuffleWriterOptions_.bufferSize = 4;
Expand All @@ -530,7 +545,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kInitSingle) {
ASSERT_NOT_OK(shuffleWriter->stop());
}

TEST_F(VeloxShuffleWriterMemoryTest, kSplit) {
TEST_F(VeloxHashShuffleWriterMemoryTest, kSplit) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.bufferSize = 4;
auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
Expand All @@ -552,7 +567,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kSplit) {
ASSERT_NOT_OK(shuffleWriter->stop());
}

TEST_F(VeloxShuffleWriterMemoryTest, kSplitSingle) {
TEST_F(VeloxHashShuffleWriterMemoryTest, kSplitSingle) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.partitioning = Partitioning::kSingle;
auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
Expand All @@ -570,7 +585,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kSplitSingle) {
ASSERT_NOT_OK(shuffleWriter->stop());
}

TEST_F(VeloxShuffleWriterMemoryTest, kStop) {
TEST_F(VeloxHashShuffleWriterMemoryTest, kStop) {
for (const auto partitioning : {Partitioning::kSingle, Partitioning::kRoundRobin}) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.partitioning = partitioning;
Expand All @@ -592,7 +607,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kStop) {
}
}

TEST_F(VeloxShuffleWriterMemoryTest, kStopComplex) {
TEST_F(VeloxHashShuffleWriterMemoryTest, kStopComplex) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.bufferSize = 4;
auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
Expand All @@ -613,7 +628,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kStopComplex) {
ASSERT_TRUE(pool.checkEvict(pool.bytes_allocated(), [&] { ASSERT_NOT_OK(shuffleWriter->stop()); }));
}

TEST_F(VeloxShuffleWriterMemoryTest, evictPartitionBuffers) {
TEST_F(VeloxHashShuffleWriterMemoryTest, evictPartitionBuffers) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.bufferSize = 4;
auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
Expand All @@ -635,7 +650,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, evictPartitionBuffers) {
ASSERT_EQ(shuffleWriter->partitionBufferSize(), 0);
}

TEST_F(VeloxShuffleWriterMemoryTest, kUnevictableSingle) {
TEST_F(VeloxHashShuffleWriterMemoryTest, kUnevictableSingle) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.partitioning = Partitioning::kSingle;
auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get());
Expand All @@ -657,7 +672,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kUnevictableSingle) {
ASSERT_EQ(evicted, 0);
}

TEST_F(VeloxShuffleWriterMemoryTest, resizeBinaryBufferTriggerSpill) {
TEST_F(VeloxHashShuffleWriterMemoryTest, resizeBinaryBufferTriggerSpill) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.bufferReallocThreshold = 1;
auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
Expand Down
9 changes: 6 additions & 3 deletions cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,12 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase
inputVectorComplex_ = makeRowVector(childrenComplex_);
}

arrow::Status splitRowVector(VeloxShuffleWriter& shuffleWriter, facebook::velox::RowVectorPtr vector) {
arrow::Status splitRowVector(
VeloxShuffleWriter& shuffleWriter,
facebook::velox::RowVectorPtr vector,
int64_t memLimit = ShuffleWriter::kMinMemLimit) {
std::shared_ptr<ColumnarBatch> cb = std::make_shared<VeloxColumnarBatch>(vector);
return shuffleWriter.write(cb, ShuffleWriter::kMinMemLimit);
return shuffleWriter.write(cb, memLimit);
}

// Create multiple local dirs and join with comma.
Expand Down Expand Up @@ -533,7 +536,7 @@ class RoundRobinPartitioningShuffleWriter : public MultiplePartitioningShuffleWr
}
};

class VeloxShuffleWriterMemoryTest : public VeloxShuffleWriterTestBase, public testing::Test {
class VeloxHashShuffleWriterMemoryTest : public VeloxShuffleWriterTestBase, public testing::Test {
protected:
static void SetUpTestCase() {
facebook::velox::memory::MemoryManager::testingSetInstance({});
Expand Down

0 comments on commit 1f04cf9

Please sign in to comment.