Skip to content

Commit

Permalink
add ut
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Nov 20, 2023
1 parent 8bde588 commit 1985b27
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 17 deletions.
15 changes: 12 additions & 3 deletions cpp/velox/shuffle/VeloxShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
auto& binaryBuf = dst[pid];

// use 32bit offset
auto dstOffsetBase = (BinaryArrayLengthBufferType*)(binaryBuf.offsetPtr) + partitionBufferIdxBase_[pid];
auto dstOffsetBase = (BinaryArrayLengthBufferType*)(binaryBuf.lengthPtr) + partitionBufferIdxBase_[pid];

auto valueOffset = binaryBuf.valueOffset;
auto dstValuePtr = binaryBuf.valuePtr + valueOffset;
Expand Down Expand Up @@ -1116,7 +1116,8 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
auto stringViewColumn = column->asFlatVector<facebook::velox::StringView>();
assert(stringViewColumn);

uint64_t binarySizeBytes = stringViewColumn->values()->size();
// uint64_t binarySizeBytes = stringViewColumn->values()->size();
uint64_t binarySizeBytes = 0;
for (auto& buffer : stringViewColumn->stringBuffers()) {
binarySizeBytes += buffer->size();
}
Expand Down Expand Up @@ -1489,9 +1490,10 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
RETURN_NOT_OK(lengthBuffer->Resize(newSize * kSizeOfBinaryArrayLengthBuffer));

// Skip Resize value buffer if the spill is triggered by resizing this split binary buffer.
// Only update length buffer ptr.
if (binaryArrayResizeState_.inResize && partitionId == binaryArrayResizeState_.partitionId &&
binaryIdx == binaryArrayResizeState_.binaryIdx) {
binaryBuf.offsetPtr = lengthBuffer->mutable_data();
binaryBuf.lengthPtr = lengthBuffer->mutable_data();
break;
}

Expand All @@ -1500,6 +1502,13 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
ARROW_RETURN_IF(!valueBuffer, arrow::Status::Invalid("Value buffer of binary array is null."));
// Determine the new Size for value buffer.
auto valueBufferSize = valueBufferSizeForBinaryArray(binaryIdx, newSize);
// If shrink is triggered by spill, and binary new size is larger, do not resize the buffer to avoid issuing
// another spill. Only update length buffer ptr.
if (evictState_ == EvictState::kUnevictable && newSize <= partition2BufferSize_[partitionId] &&
valueBufferSize >= valueBuffer->size()) {
binaryBuf.lengthPtr = lengthBuffer->mutable_data();
break;
}
auto valueOffset = 0;
// If preserve data, the new valueBufferSize should not be smaller than the current offset.
if (preserveData) {
Expand Down
8 changes: 4 additions & 4 deletions cpp/velox/shuffle/VeloxShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,15 @@ class VeloxShuffleWriter final : public ShuffleWriter {

public:
struct BinaryBuf {
BinaryBuf(uint8_t* value, uint8_t* offset, uint64_t valueCapacityIn, uint64_t valueOffsetIn)
: valuePtr(value), offsetPtr(offset), valueCapacity(valueCapacityIn), valueOffset(valueOffsetIn) {}
BinaryBuf(uint8_t* value, uint8_t* length, uint64_t valueCapacityIn, uint64_t valueOffsetIn)
: valuePtr(value), lengthPtr(length), valueCapacity(valueCapacityIn), valueOffset(valueOffsetIn) {}

BinaryBuf(uint8_t* value, uint8_t* offset, uint64_t valueCapacity) : BinaryBuf(value, offset, valueCapacity, 0) {}
BinaryBuf(uint8_t* value, uint8_t* length, uint64_t valueCapacity) : BinaryBuf(value, length, valueCapacity, 0) {}

BinaryBuf() : BinaryBuf(nullptr, nullptr, 0) {}

uint8_t* valuePtr;
uint8_t* offsetPtr;
uint8_t* lengthPtr;
uint64_t valueCapacity;
uint64_t valueOffset;
};
Expand Down
26 changes: 26 additions & 0 deletions cpp/velox/tests/VeloxShuffleWriterTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include <arrow/c/bridge.h>
#include <arrow/io/api.h>

#include <boost/stacktrace.hpp>

#include "shuffle/LocalPartitionWriter.h"
#include "shuffle/VeloxShuffleWriter.h"
#include "shuffle/rss/CelebornPartitionWriter.h"
Expand Down Expand Up @@ -636,6 +638,30 @@ TEST_F(VeloxShuffleWriterMemoryTest, kUnevictableSingle) {
ASSERT_EQ(evicted, 0);
}

TEST_F(VeloxShuffleWriterMemoryTest, resizeBinaryBufferTriggerSpill) {
shuffleWriterOptions_.buffer_realloc_threshold = 1;
auto delegated = shuffleWriterOptions_.memory_pool;
auto pool = SelfEvictedMemoryPool(delegated, false);
shuffleWriterOptions_.memory_pool = &pool;
auto shuffleWriter = createShuffleWriter();

pool.setEvictable(shuffleWriter.get());

// Split first input vector. Large average string length.
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorLargeBinary1_));

// Evict cached payloads.
int64_t evicted;
ASSERT_NOT_OK(shuffleWriter->evictFixedSize(shuffleWriter->cachedPayloadSize(), &evicted));
ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
// Set limited capacity.
ASSERT_TRUE(pool.checkEvict(pool.bytes_allocated(), [&] {
// Split second input vector. Large average string length.
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorLargeBinary2_));
}));
ASSERT_NOT_OK(shuffleWriter->stop());
}

INSTANTIATE_TEST_SUITE_P(
VeloxShuffleWriteParam,
SinglePartitioningShuffleWriter,
Expand Down
22 changes: 13 additions & 9 deletions cpp/velox/utils/tests/MemoryPoolUtils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,19 @@ arrow::Status SelfEvictedMemoryPool::evict(int64_t size) {
int64_t actual;
RETURN_NOT_OK(evictable_->evictFixedSize(size, &actual));
if (size > capacity_ - pool_->bytes_allocated()) {
return arrow::Status::OutOfMemory(
"Failed to allocate after evict. Capacity: ",
capacity_,
", Requested: ",
size,
", Evicted: ",
actual,
", Allocated: ",
pool_->bytes_allocated());
if (failIfOOM_) {
return arrow::Status::OutOfMemory(
"Failed to allocate after evict. Capacity: ",
capacity_,
", Requested: ",
size,
", Evicted: ",
actual,
", Allocated: ",
pool_->bytes_allocated());
} else {
capacity_ = size + pool_->bytes_allocated();
}
}
bytesEvicted_ += actual;
}
Expand Down
4 changes: 3 additions & 1 deletion cpp/velox/utils/tests/MemoryPoolUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class LimitedMemoryPool final : public arrow::MemoryPool {
*/
class SelfEvictedMemoryPool : public arrow::MemoryPool {
public:
explicit SelfEvictedMemoryPool(arrow::MemoryPool* pool) : pool_(pool) {}
explicit SelfEvictedMemoryPool(arrow::MemoryPool* pool, bool failIfOOM = true) : pool_(pool), failIfOOM_(failIfOOM) {}

bool checkEvict(int64_t newCapacity, std::function<void()> block);

Expand Down Expand Up @@ -89,6 +89,8 @@ class SelfEvictedMemoryPool : public arrow::MemoryPool {
arrow::Status evict(int64_t size);

arrow::MemoryPool* pool_;
bool failIfOOM_;

Evictable* evictable_;
int64_t capacity_{std::numeric_limits<int64_t>::max()};

Expand Down
52 changes: 52 additions & 0 deletions cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,23 @@

namespace gluten {

namespace {
std::string makeString(uint32_t length) {
static const std::string kLargeStringOf128Bytes =
"thisisalaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaargestringlengthmorethan16bytes";
std::string res{};
auto repeats = length / kLargeStringOf128Bytes.length();
while (repeats--) {
res.append(kLargeStringOf128Bytes);
}
if (auto remains = length % kLargeStringOf128Bytes.length()) {
res.append(kLargeStringOf128Bytes.substr(0, remains));
}
return res;
}
} // namespace

struct ShuffleTestParams {
PartitionWriterType partition_writer_type;
arrow::Compression::type compression_type;
Expand Down Expand Up @@ -110,9 +127,38 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase
makeFlatVector<facebook::velox::StringView>({"alice", ""}),
};

largeString1_ = makeString(1024);
childrenLargeBinary1_ = {
makeFlatVector<int8_t>(std::vector<int8_t>(4096, 0)),
makeFlatVector<int8_t>(std::vector<int8_t>(4096, 0)),
makeFlatVector<int32_t>(std::vector<int32_t>(4096, 0)),
makeFlatVector<int64_t>(std::vector<int64_t>(4096, 0)),
makeFlatVector<float>(std::vector<float>(4096, 0)),
makeFlatVector<bool>(std::vector<bool>(4096, true)),
makeNullableFlatVector<facebook::velox::StringView>(
std::vector<std::optional<facebook::velox::StringView>>(4096, largeString1_.c_str())),
makeNullableFlatVector<facebook::velox::StringView>(
std::vector<std::optional<facebook::velox::StringView>>(4096, std::nullopt)),
};
largeString2_ = makeString(4096);
auto vectorToSpill = childrenLargeBinary2_ = {
makeFlatVector<int8_t>(std::vector<int8_t>(2048, 0)),
makeFlatVector<int8_t>(std::vector<int8_t>(2048, 0)),
makeFlatVector<int32_t>(std::vector<int32_t>(2048, 0)),
makeFlatVector<int64_t>(std::vector<int64_t>(2048, 0)),
makeFlatVector<float>(std::vector<float>(2048, 0)),
makeFlatVector<bool>(std::vector<bool>(2048, true)),
makeNullableFlatVector<facebook::velox::StringView>(
std::vector<std::optional<facebook::velox::StringView>>(2048, largeString2_.c_str())),
makeNullableFlatVector<facebook::velox::StringView>(
std::vector<std::optional<facebook::velox::StringView>>(2048, std::nullopt)),
};

inputVector1_ = makeRowVector(children1_);
inputVector2_ = makeRowVector(children2_);
inputVectorNoNull_ = makeRowVector(childrenNoNull_);
inputVectorLargeBinary1_ = makeRowVector(childrenLargeBinary1_);
inputVectorLargeBinary2_ = makeRowVector(childrenLargeBinary2_);
}

arrow::Status splitRowVector(VeloxShuffleWriter& shuffleWriter, facebook::velox::RowVectorPtr vector) {
Expand Down Expand Up @@ -151,10 +197,16 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase
std::vector<facebook::velox::VectorPtr> children1_;
std::vector<facebook::velox::VectorPtr> children2_;
std::vector<facebook::velox::VectorPtr> childrenNoNull_;
std::vector<facebook::velox::VectorPtr> childrenLargeBinary1_;
std::vector<facebook::velox::VectorPtr> childrenLargeBinary2_;

facebook::velox::RowVectorPtr inputVector1_;
facebook::velox::RowVectorPtr inputVector2_;
facebook::velox::RowVectorPtr inputVectorNoNull_;
std::string largeString1_;
std::string largeString2_;
facebook::velox::RowVectorPtr inputVectorLargeBinary1_;
facebook::velox::RowVectorPtr inputVectorLargeBinary2_;
};

class VeloxShuffleWriterTest : public ::testing::TestWithParam<ShuffleTestParams>, public VeloxShuffleWriterTestBase {
Expand Down

0 comments on commit 1985b27

Please sign in to comment.