From 1985b279b532e32ba98b68515dbd8ac3088dea66 Mon Sep 17 00:00:00 2001 From: "Ma, Rong" Date: Mon, 20 Nov 2023 17:33:57 +0800 Subject: [PATCH] add ut --- cpp/velox/shuffle/VeloxShuffleWriter.cc | 15 ++++-- cpp/velox/shuffle/VeloxShuffleWriter.h | 8 +-- cpp/velox/tests/VeloxShuffleWriterTest.cc | 26 ++++++++++ cpp/velox/utils/tests/MemoryPoolUtils.cc | 22 ++++---- cpp/velox/utils/tests/MemoryPoolUtils.h | 4 +- .../utils/tests/VeloxShuffleWriterTestBase.h | 52 +++++++++++++++++++ 6 files changed, 110 insertions(+), 17 deletions(-) diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.cc b/cpp/velox/shuffle/VeloxShuffleWriter.cc index 36550ca75020..1e21d6f3f1fb 100644 --- a/cpp/velox/shuffle/VeloxShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxShuffleWriter.cc @@ -924,7 +924,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel auto& binaryBuf = dst[pid]; // use 32bit offset - auto dstOffsetBase = (BinaryArrayLengthBufferType*)(binaryBuf.offsetPtr) + partitionBufferIdxBase_[pid]; + auto dstOffsetBase = (BinaryArrayLengthBufferType*)(binaryBuf.lengthPtr) + partitionBufferIdxBase_[pid]; auto valueOffset = binaryBuf.valueOffset; auto dstValuePtr = binaryBuf.valuePtr + valueOffset; @@ -1116,7 +1116,8 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel auto stringViewColumn = column->asFlatVector(); assert(stringViewColumn); - uint64_t binarySizeBytes = stringViewColumn->values()->size(); + // uint64_t binarySizeBytes = stringViewColumn->values()->size(); + uint64_t binarySizeBytes = 0; for (auto& buffer : stringViewColumn->stringBuffers()) { binarySizeBytes += buffer->size(); } @@ -1489,9 +1490,10 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel RETURN_NOT_OK(lengthBuffer->Resize(newSize * kSizeOfBinaryArrayLengthBuffer)); // Skip Resize value buffer if the spill is triggered by resizing this split binary buffer. + // Only update length buffer ptr. if (binaryArrayResizeState_.inResize && partitionId == binaryArrayResizeState_.partitionId && binaryIdx == binaryArrayResizeState_.binaryIdx) { - binaryBuf.offsetPtr = lengthBuffer->mutable_data(); + binaryBuf.lengthPtr = lengthBuffer->mutable_data(); break; } @@ -1500,6 +1502,13 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel ARROW_RETURN_IF(!valueBuffer, arrow::Status::Invalid("Value buffer of binary array is null.")); // Determine the new Size for value buffer. auto valueBufferSize = valueBufferSizeForBinaryArray(binaryIdx, newSize); + // If shrink is triggered by spill, and binary new size is larger, do not resize the buffer to avoid issuing + // another spill. Only update length buffer ptr. + if (evictState_ == EvictState::kUnevictable && newSize <= partition2BufferSize_[partitionId] && + valueBufferSize >= valueBuffer->size()) { + binaryBuf.lengthPtr = lengthBuffer->mutable_data(); + break; + } auto valueOffset = 0; // If preserve data, the new valueBufferSize should not be smaller than the current offset. if (preserveData) { diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h b/cpp/velox/shuffle/VeloxShuffleWriter.h index 23ba7d40673d..f257589fcd2d 100644 --- a/cpp/velox/shuffle/VeloxShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxShuffleWriter.h @@ -110,15 +110,15 @@ class VeloxShuffleWriter final : public ShuffleWriter { public: struct BinaryBuf { - BinaryBuf(uint8_t* value, uint8_t* offset, uint64_t valueCapacityIn, uint64_t valueOffsetIn) - : valuePtr(value), offsetPtr(offset), valueCapacity(valueCapacityIn), valueOffset(valueOffsetIn) {} + BinaryBuf(uint8_t* value, uint8_t* length, uint64_t valueCapacityIn, uint64_t valueOffsetIn) + : valuePtr(value), lengthPtr(length), valueCapacity(valueCapacityIn), valueOffset(valueOffsetIn) {} - BinaryBuf(uint8_t* value, uint8_t* offset, uint64_t valueCapacity) : BinaryBuf(value, offset, valueCapacity, 0) {} + BinaryBuf(uint8_t* value, uint8_t* length, uint64_t valueCapacity) : BinaryBuf(value, length, valueCapacity, 0) {} BinaryBuf() : BinaryBuf(nullptr, nullptr, 0) {} uint8_t* valuePtr; - uint8_t* offsetPtr; + uint8_t* lengthPtr; uint64_t valueCapacity; uint64_t valueOffset; }; diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc index 54d64beb2235..b8853ed93552 100644 --- a/cpp/velox/tests/VeloxShuffleWriterTest.cc +++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc @@ -18,6 +18,8 @@ #include #include +#include + #include "shuffle/LocalPartitionWriter.h" #include "shuffle/VeloxShuffleWriter.h" #include "shuffle/rss/CelebornPartitionWriter.h" @@ -636,6 +638,30 @@ TEST_F(VeloxShuffleWriterMemoryTest, kUnevictableSingle) { ASSERT_EQ(evicted, 0); } +TEST_F(VeloxShuffleWriterMemoryTest, resizeBinaryBufferTriggerSpill) { + shuffleWriterOptions_.buffer_realloc_threshold = 1; + auto delegated = shuffleWriterOptions_.memory_pool; + auto pool = SelfEvictedMemoryPool(delegated, false); + shuffleWriterOptions_.memory_pool = &pool; + auto shuffleWriter = createShuffleWriter(); + + pool.setEvictable(shuffleWriter.get()); + + // Split first input vector. Large average string length. + ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorLargeBinary1_)); + + // Evict cached payloads. + int64_t evicted; + ASSERT_NOT_OK(shuffleWriter->evictFixedSize(shuffleWriter->cachedPayloadSize(), &evicted)); + ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0); + // Set limited capacity. + ASSERT_TRUE(pool.checkEvict(pool.bytes_allocated(), [&] { + // Split second input vector. Large average string length. + ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorLargeBinary2_)); + })); + ASSERT_NOT_OK(shuffleWriter->stop()); +} + INSTANTIATE_TEST_SUITE_P( VeloxShuffleWriteParam, SinglePartitioningShuffleWriter, diff --git a/cpp/velox/utils/tests/MemoryPoolUtils.cc b/cpp/velox/utils/tests/MemoryPoolUtils.cc index 21ca464b47fd..4a917a4d43e9 100644 --- a/cpp/velox/utils/tests/MemoryPoolUtils.cc +++ b/cpp/velox/utils/tests/MemoryPoolUtils.cc @@ -133,15 +133,19 @@ arrow::Status SelfEvictedMemoryPool::evict(int64_t size) { int64_t actual; RETURN_NOT_OK(evictable_->evictFixedSize(size, &actual)); if (size > capacity_ - pool_->bytes_allocated()) { - return arrow::Status::OutOfMemory( - "Failed to allocate after evict. Capacity: ", - capacity_, - ", Requested: ", - size, - ", Evicted: ", - actual, - ", Allocated: ", - pool_->bytes_allocated()); + if (failIfOOM_) { + return arrow::Status::OutOfMemory( + "Failed to allocate after evict. Capacity: ", + capacity_, + ", Requested: ", + size, + ", Evicted: ", + actual, + ", Allocated: ", + pool_->bytes_allocated()); + } else { + capacity_ = size + pool_->bytes_allocated(); + } } bytesEvicted_ += actual; } diff --git a/cpp/velox/utils/tests/MemoryPoolUtils.h b/cpp/velox/utils/tests/MemoryPoolUtils.h index a49fc02d7981..4fb88a117f11 100644 --- a/cpp/velox/utils/tests/MemoryPoolUtils.h +++ b/cpp/velox/utils/tests/MemoryPoolUtils.h @@ -59,7 +59,7 @@ class LimitedMemoryPool final : public arrow::MemoryPool { */ class SelfEvictedMemoryPool : public arrow::MemoryPool { public: - explicit SelfEvictedMemoryPool(arrow::MemoryPool* pool) : pool_(pool) {} + explicit SelfEvictedMemoryPool(arrow::MemoryPool* pool, bool failIfOOM = true) : pool_(pool), failIfOOM_(failIfOOM) {} bool checkEvict(int64_t newCapacity, std::function block); @@ -89,6 +89,8 @@ class SelfEvictedMemoryPool : public arrow::MemoryPool { arrow::Status evict(int64_t size); arrow::MemoryPool* pool_; + bool failIfOOM_; + Evictable* evictable_; int64_t capacity_{std::numeric_limits::max()}; diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h index 0fb10faac655..812eb319c760 100644 --- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h +++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h @@ -30,6 +30,23 @@ namespace gluten { +namespace { +std::string makeString(uint32_t length) { + static const std::string kLargeStringOf128Bytes = + "thisisalaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaargestringlengthmorethan16bytes"; + std::string res{}; + auto repeats = length / kLargeStringOf128Bytes.length(); + while (repeats--) { + res.append(kLargeStringOf128Bytes); + } + if (auto remains = length % kLargeStringOf128Bytes.length()) { + res.append(kLargeStringOf128Bytes.substr(0, remains)); + } + return res; +} +} // namespace + struct ShuffleTestParams { PartitionWriterType partition_writer_type; arrow::Compression::type compression_type; @@ -110,9 +127,38 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase makeFlatVector({"alice", ""}), }; + largeString1_ = makeString(1024); + childrenLargeBinary1_ = { + makeFlatVector(std::vector(4096, 0)), + makeFlatVector(std::vector(4096, 0)), + makeFlatVector(std::vector(4096, 0)), + makeFlatVector(std::vector(4096, 0)), + makeFlatVector(std::vector(4096, 0)), + makeFlatVector(std::vector(4096, true)), + makeNullableFlatVector( + std::vector>(4096, largeString1_.c_str())), + makeNullableFlatVector( + std::vector>(4096, std::nullopt)), + }; + largeString2_ = makeString(4096); + auto vectorToSpill = childrenLargeBinary2_ = { + makeFlatVector(std::vector(2048, 0)), + makeFlatVector(std::vector(2048, 0)), + makeFlatVector(std::vector(2048, 0)), + makeFlatVector(std::vector(2048, 0)), + makeFlatVector(std::vector(2048, 0)), + makeFlatVector(std::vector(2048, true)), + makeNullableFlatVector( + std::vector>(2048, largeString2_.c_str())), + makeNullableFlatVector( + std::vector>(2048, std::nullopt)), + }; + inputVector1_ = makeRowVector(children1_); inputVector2_ = makeRowVector(children2_); inputVectorNoNull_ = makeRowVector(childrenNoNull_); + inputVectorLargeBinary1_ = makeRowVector(childrenLargeBinary1_); + inputVectorLargeBinary2_ = makeRowVector(childrenLargeBinary2_); } arrow::Status splitRowVector(VeloxShuffleWriter& shuffleWriter, facebook::velox::RowVectorPtr vector) { @@ -151,10 +197,16 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase std::vector children1_; std::vector children2_; std::vector childrenNoNull_; + std::vector childrenLargeBinary1_; + std::vector childrenLargeBinary2_; facebook::velox::RowVectorPtr inputVector1_; facebook::velox::RowVectorPtr inputVector2_; facebook::velox::RowVectorPtr inputVectorNoNull_; + std::string largeString1_; + std::string largeString2_; + facebook::velox::RowVectorPtr inputVectorLargeBinary1_; + facebook::velox::RowVectorPtr inputVectorLargeBinary2_; }; class VeloxShuffleWriterTest : public ::testing::TestWithParam, public VeloxShuffleWriterTestBase {