Skip to content

Commit

Permalink
fix wrong maxRowsToInsert and sort time metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Aug 14, 2024
1 parent fc7f9cd commit 7977c7d
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 19 deletions.
10 changes: 6 additions & 4 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
}

arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_t begin, size_t end) {
ScopedTimer timer(&sortTime_);
// Serialize [begin, end)
uint64_t offset = 0;
char* addr;
Expand All @@ -289,6 +288,8 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
begin = index;
offset = 0;
}
// Count copy row time into sortTime_.
ScopedTimer timer(&sortTime_);
gluten::fastCopy(rawBuffer_ + offset, addr, size);
offset += size;
index++;
Expand All @@ -303,17 +304,18 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
return arrow::Status::OK();
}

uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t rows) {
uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t remainingRows) {
// Check how many rows can be handled.
if (pages_.empty()) {
return 0;
}
auto remainingBytes = pages_.back()->size() - pageCursor_;
if (fixedRowSize_) {
return std::min((uint32_t)(remainingBytes / (fixedRowSize_.value())), rows);
return std::min((uint32_t)(remainingBytes / (fixedRowSize_.value())), remainingRows);
}
auto beginIter = rowSizePrefixSum_.begin() + 1 + offset;
auto iter = std::upper_bound(beginIter, rowSizePrefixSum_.end(), remainingBytes);
auto bytesWritten = rowSizePrefixSum_[offset];
auto iter = std::upper_bound(beginIter, rowSizePrefixSum_.end(), remainingBytes + bytesWritten);
return iter - beginIter;
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/shuffle/VeloxSortShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {

arrow::Status evictPartition(uint32_t partitionId, size_t begin, size_t end);

uint32_t maxRowsToInsert(uint32_t offset, uint32_t rows);
uint32_t maxRowsToInsert(uint32_t offset, uint32_t remainingRows);

void acquireNewBuffer(uint64_t memLimit, uint64_t minSizeRequired);

Expand Down
37 changes: 26 additions & 11 deletions cpp/velox/tests/VeloxShuffleWriterTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,22 @@ TEST_P(RoundRobinPartitioningShuffleWriter, spillVerifyResult) {
shuffleWriteReadMultiBlocks(*shuffleWriter, 2, inputVector1_->type(), {{blockPid1}, {blockPid2}});
}

TEST_F(VeloxShuffleWriterMemoryTest, memoryLeak) {
TEST_P(RoundRobinPartitioningShuffleWriter, sortMaxRows) {
if (GetParam().shuffleWriterType != kSortShuffle) {
return;
}
ASSERT_NOT_OK(initShuffleWriterOptions());
auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());

// Set memLimit to 0 to force allocate a new buffer for each row.
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_, 0));

auto blockPid1 = takeRows({inputVector1_}, {{0, 2, 4, 6, 8}});
auto blockPid2 = takeRows({inputVector1_}, {{1, 3, 5, 7, 9}});
shuffleWriteReadMultiBlocks(*shuffleWriter, 2, inputVector1_->type(), {{blockPid1}, {blockPid2}});
}

TEST_F(VeloxHashShuffleWriterMemoryTest, memoryLeak) {
ASSERT_NOT_OK(initShuffleWriterOptions());
std::shared_ptr<arrow::MemoryPool> pool = std::make_shared<LimitedMemoryPool>();
shuffleWriterOptions_.bufferSize = 4;
Expand All @@ -425,7 +440,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, memoryLeak) {
ASSERT_TRUE(pool->bytes_allocated() == 0);
}

TEST_F(VeloxShuffleWriterMemoryTest, spillFailWithOutOfMemory) {
TEST_F(VeloxHashShuffleWriterMemoryTest, spillFailWithOutOfMemory) {
ASSERT_NOT_OK(initShuffleWriterOptions());
std::shared_ptr<arrow::MemoryPool> pool = std::make_shared<LimitedMemoryPool>(0);
shuffleWriterOptions_.bufferSize = 4;
Expand All @@ -438,7 +453,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, spillFailWithOutOfMemory) {
ASSERT_TRUE(status.IsOutOfMemory());
}

TEST_F(VeloxShuffleWriterMemoryTest, kInit) {
TEST_F(VeloxHashShuffleWriterMemoryTest, kInit) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.bufferSize = 4;
auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
Expand Down Expand Up @@ -508,7 +523,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kInit) {
ASSERT_NOT_OK(shuffleWriter->stop());
}

TEST_F(VeloxShuffleWriterMemoryTest, kInitSingle) {
TEST_F(VeloxHashShuffleWriterMemoryTest, kInitSingle) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.partitioning = Partitioning::kSingle;
shuffleWriterOptions_.bufferSize = 4;
Expand All @@ -530,7 +545,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kInitSingle) {
ASSERT_NOT_OK(shuffleWriter->stop());
}

TEST_F(VeloxShuffleWriterMemoryTest, kSplit) {
TEST_F(VeloxHashShuffleWriterMemoryTest, kSplit) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.bufferSize = 4;
auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
Expand All @@ -552,7 +567,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kSplit) {
ASSERT_NOT_OK(shuffleWriter->stop());
}

TEST_F(VeloxShuffleWriterMemoryTest, kSplitSingle) {
TEST_F(VeloxHashShuffleWriterMemoryTest, kSplitSingle) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.partitioning = Partitioning::kSingle;
auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
Expand All @@ -570,7 +585,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kSplitSingle) {
ASSERT_NOT_OK(shuffleWriter->stop());
}

TEST_F(VeloxShuffleWriterMemoryTest, kStop) {
TEST_F(VeloxHashShuffleWriterMemoryTest, kStop) {
for (const auto partitioning : {Partitioning::kSingle, Partitioning::kRoundRobin}) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.partitioning = partitioning;
Expand All @@ -592,7 +607,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kStop) {
}
}

TEST_F(VeloxShuffleWriterMemoryTest, kStopComplex) {
TEST_F(VeloxHashShuffleWriterMemoryTest, kStopComplex) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.bufferSize = 4;
auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
Expand All @@ -613,7 +628,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kStopComplex) {
ASSERT_TRUE(pool.checkEvict(pool.bytes_allocated(), [&] { ASSERT_NOT_OK(shuffleWriter->stop()); }));
}

TEST_F(VeloxShuffleWriterMemoryTest, evictPartitionBuffers) {
TEST_F(VeloxHashShuffleWriterMemoryTest, evictPartitionBuffers) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.bufferSize = 4;
auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
Expand All @@ -635,7 +650,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, evictPartitionBuffers) {
ASSERT_EQ(shuffleWriter->partitionBufferSize(), 0);
}

TEST_F(VeloxShuffleWriterMemoryTest, kUnevictableSingle) {
TEST_F(VeloxHashShuffleWriterMemoryTest, kUnevictableSingle) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.partitioning = Partitioning::kSingle;
auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get());
Expand All @@ -657,7 +672,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kUnevictableSingle) {
ASSERT_EQ(evicted, 0);
}

TEST_F(VeloxShuffleWriterMemoryTest, resizeBinaryBufferTriggerSpill) {
TEST_F(VeloxHashShuffleWriterMemoryTest, resizeBinaryBufferTriggerSpill) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.bufferReallocThreshold = 1;
auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
Expand Down
9 changes: 6 additions & 3 deletions cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,12 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase
inputVectorComplex_ = makeRowVector(childrenComplex_);
}

arrow::Status splitRowVector(VeloxShuffleWriter& shuffleWriter, facebook::velox::RowVectorPtr vector) {
arrow::Status splitRowVector(
VeloxShuffleWriter& shuffleWriter,
facebook::velox::RowVectorPtr vector,
int64_t memLimit = ShuffleWriter::kMinMemLimit) {
std::shared_ptr<ColumnarBatch> cb = std::make_shared<VeloxColumnarBatch>(vector);
return shuffleWriter.write(cb, ShuffleWriter::kMinMemLimit);
return shuffleWriter.write(cb, memLimit);
}

// Create multiple local dirs and join with comma.
Expand Down Expand Up @@ -533,7 +536,7 @@ class RoundRobinPartitioningShuffleWriter : public MultiplePartitioningShuffleWr
}
};

class VeloxShuffleWriterMemoryTest : public VeloxShuffleWriterTestBase, public testing::Test {
class VeloxHashShuffleWriterMemoryTest : public VeloxShuffleWriterTestBase, public testing::Test {
protected:
static void SetUpTestCase() {
facebook::velox::memory::MemoryManager::testingSetInstance({});
Expand Down

0 comments on commit 7977c7d

Please sign in to comment.