Skip to content

Commit

Permalink
[VL] Avoid shrinking the binary buffer which triggers spill (#3663)
Browse files Browse the repository at this point in the history
If spill is triggered by resizing binary partition buffer, should avoid shrinking itself during reclaiming memory from shrinking partition buffers. Because shrinking the buffer can possibly invalid the original underlying ptr.
  • Loading branch information
marin-ma authored Nov 27, 2023
1 parent 1e65165 commit a192737
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 22 deletions.
48 changes: 40 additions & 8 deletions cpp/velox/shuffle/VeloxShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,20 @@ class EvictGuard {
EvictState& evictState_;
};

class BinaryArrayResizeGuard {
public:
explicit BinaryArrayResizeGuard(BinaryArrayResizeState& state) : state_(state) {
state_.inResize = true;
}

~BinaryArrayResizeGuard() {
state_.inResize = false;
}

private:
BinaryArrayResizeState& state_;
};

template <facebook::velox::TypeKind kind>
arrow::Status collectFlatVectorBuffer(
facebook::velox::BaseVector* vector,
Expand Down Expand Up @@ -910,7 +924,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
auto& binaryBuf = dst[pid];

// use 32bit offset
auto dstOffsetBase = (BinaryArrayLengthBufferType*)(binaryBuf.offsetPtr) + partitionBufferIdxBase_[pid];
auto dstOffsetBase = (BinaryArrayLengthBufferType*)(binaryBuf.lengthPtr) + partitionBufferIdxBase_[pid];

auto valueOffset = binaryBuf.valueOffset;
auto dstValuePtr = binaryBuf.valuePtr + valueOffset;
Expand All @@ -936,10 +950,12 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
capacity = capacity + std::max((capacity >> multiply), (uint64_t)stringLen);
multiply = std::min(3, multiply + 1);

auto valueBuffer = std::static_pointer_cast<arrow::ResizableBuffer>(
partitionBuffers_[fixedWidthColumnCount_ + binaryIdx][pid][kBinaryValueBufferIndex]);

RETURN_NOT_OK(valueBuffer->Reserve(capacity));
const auto& valueBuffer = partitionBuffers_[fixedWidthColumnCount_ + binaryIdx][pid][kBinaryValueBufferIndex];
{
binaryArrayResizeState_ = BinaryArrayResizeState{pid, binaryIdx};
BinaryArrayResizeGuard guard(binaryArrayResizeState_);
RETURN_NOT_OK(valueBuffer->Reserve(capacity));
}

binaryBuf.valuePtr = valueBuffer->mutable_data();
binaryBuf.valueCapacity = capacity;
Expand Down Expand Up @@ -1100,7 +1116,8 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
auto stringViewColumn = column->asFlatVector<facebook::velox::StringView>();
assert(stringViewColumn);

uint64_t binarySizeBytes = stringViewColumn->values()->size();
// uint64_t binarySizeBytes = stringViewColumn->values()->size();
uint64_t binarySizeBytes = 0;
for (auto& buffer : stringViewColumn->stringBuffers()) {
binarySizeBytes += buffer->size();
}
Expand Down Expand Up @@ -1466,17 +1483,32 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
case arrow::BinaryType::type_id:
case arrow::StringType::type_id: {
// Resize length buffer.
auto binaryIdx = i - fixedWidthColumnCount_;
auto& binaryBuf = partitionBinaryAddrs_[binaryIdx][partitionId];
auto& lengthBuffer = buffers[kBinaryLengthBufferIndex];
ARROW_RETURN_IF(!lengthBuffer, arrow::Status::Invalid("Offset buffer of binary array is null."));
RETURN_NOT_OK(lengthBuffer->Resize(newSize * kSizeOfBinaryArrayLengthBuffer));

// Skip Resize value buffer if the spill is triggered by resizing this split binary buffer.
// Only update length buffer ptr.
if (binaryArrayResizeState_.inResize && partitionId == binaryArrayResizeState_.partitionId &&
binaryIdx == binaryArrayResizeState_.binaryIdx) {
binaryBuf.lengthPtr = lengthBuffer->mutable_data();
break;
}

// Resize value buffer.
auto binaryIdx = i - fixedWidthColumnCount_;
auto& binaryBuf = partitionBinaryAddrs_[binaryIdx][partitionId];
auto& valueBuffer = buffers[kBinaryValueBufferIndex];
ARROW_RETURN_IF(!valueBuffer, arrow::Status::Invalid("Value buffer of binary array is null."));
// Determine the new Size for value buffer.
auto valueBufferSize = valueBufferSizeForBinaryArray(binaryIdx, newSize);
// If shrink is triggered by spill, and binary new size is larger, do not resize the buffer to avoid issuing
// another spill. Only update length buffer ptr.
if (evictState_ == EvictState::kUnevictable && newSize <= partition2BufferSize_[partitionId] &&
valueBufferSize >= valueBuffer->size()) {
binaryBuf.lengthPtr = lengthBuffer->mutable_data();
break;
}
auto valueOffset = 0;
// If preserve data, the new valueBufferSize should not be smaller than the current offset.
if (preserveData) {
Expand Down
20 changes: 16 additions & 4 deletions cpp/velox/shuffle/VeloxShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ namespace gluten {
enum SplitState { kInit, kPreAlloc, kSplit, kStop };
enum EvictState { kEvictable, kUnevictable };

struct BinaryArrayResizeState {
bool inResize;
uint32_t partitionId;
uint32_t binaryIdx;

BinaryArrayResizeState() : inResize(false) {}
BinaryArrayResizeState(uint32_t partitionId, uint32_t binaryIdx)
: inResize(false), partitionId(partitionId), binaryIdx(binaryIdx) {}
};

class VeloxShuffleWriter final : public ShuffleWriter {
enum {
kValidityBufferIndex = 0,
Expand All @@ -100,15 +110,15 @@ class VeloxShuffleWriter final : public ShuffleWriter {

public:
struct BinaryBuf {
BinaryBuf(uint8_t* value, uint8_t* offset, uint64_t valueCapacityIn, uint64_t valueOffsetIn)
: valuePtr(value), offsetPtr(offset), valueCapacity(valueCapacityIn), valueOffset(valueOffsetIn) {}
BinaryBuf(uint8_t* value, uint8_t* length, uint64_t valueCapacityIn, uint64_t valueOffsetIn)
: valuePtr(value), lengthPtr(length), valueCapacity(valueCapacityIn), valueOffset(valueOffsetIn) {}

BinaryBuf(uint8_t* value, uint8_t* offset, uint64_t valueCapacity) : BinaryBuf(value, offset, valueCapacity, 0) {}
BinaryBuf(uint8_t* value, uint8_t* length, uint64_t valueCapacity) : BinaryBuf(value, length, valueCapacity, 0) {}

BinaryBuf() : BinaryBuf(nullptr, nullptr, 0) {}

uint8_t* valuePtr;
uint8_t* offsetPtr;
uint8_t* lengthPtr;
uint64_t valueCapacity;
uint64_t valueOffset;
};
Expand Down Expand Up @@ -323,6 +333,8 @@ class VeloxShuffleWriter final : public ShuffleWriter {

EvictState evictState_{kEvictable};

BinaryArrayResizeState binaryArrayResizeState_{};

bool supportAvx512_ = false;

// store arrow column types
Expand Down
26 changes: 26 additions & 0 deletions cpp/velox/tests/VeloxShuffleWriterTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include <arrow/c/bridge.h>
#include <arrow/io/api.h>

#include <boost/stacktrace.hpp>

#include "shuffle/LocalPartitionWriter.h"
#include "shuffle/VeloxShuffleWriter.h"
#include "shuffle/rss/CelebornPartitionWriter.h"
Expand Down Expand Up @@ -636,6 +638,30 @@ TEST_F(VeloxShuffleWriterMemoryTest, kUnevictableSingle) {
ASSERT_EQ(evicted, 0);
}

TEST_F(VeloxShuffleWriterMemoryTest, resizeBinaryBufferTriggerSpill) {
shuffleWriterOptions_.buffer_realloc_threshold = 1;
auto delegated = shuffleWriterOptions_.memory_pool;
auto pool = SelfEvictedMemoryPool(delegated, false);
shuffleWriterOptions_.memory_pool = &pool;
auto shuffleWriter = createShuffleWriter();

pool.setEvictable(shuffleWriter.get());

// Split first input vector. Large average string length.
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorLargeBinary1_));

// Evict cached payloads.
int64_t evicted;
ASSERT_NOT_OK(shuffleWriter->evictFixedSize(shuffleWriter->cachedPayloadSize(), &evicted));
ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
// Set limited capacity.
ASSERT_TRUE(pool.checkEvict(pool.bytes_allocated(), [&] {
// Split second input vector. Large average string length.
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorLargeBinary2_));
}));
ASSERT_NOT_OK(shuffleWriter->stop());
}

INSTANTIATE_TEST_SUITE_P(
VeloxShuffleWriteParam,
SinglePartitioningShuffleWriter,
Expand Down
22 changes: 13 additions & 9 deletions cpp/velox/utils/tests/MemoryPoolUtils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,19 @@ arrow::Status SelfEvictedMemoryPool::evict(int64_t size) {
int64_t actual;
RETURN_NOT_OK(evictable_->evictFixedSize(size, &actual));
if (size > capacity_ - pool_->bytes_allocated()) {
return arrow::Status::OutOfMemory(
"Failed to allocate after evict. Capacity: ",
capacity_,
", Requested: ",
size,
", Evicted: ",
actual,
", Allocated: ",
pool_->bytes_allocated());
if (failIfOOM_) {
return arrow::Status::OutOfMemory(
"Failed to allocate after evict. Capacity: ",
capacity_,
", Requested: ",
size,
", Evicted: ",
actual,
", Allocated: ",
pool_->bytes_allocated());
} else {
capacity_ = size + pool_->bytes_allocated();
}
}
bytesEvicted_ += actual;
}
Expand Down
4 changes: 3 additions & 1 deletion cpp/velox/utils/tests/MemoryPoolUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class LimitedMemoryPool final : public arrow::MemoryPool {
*/
class SelfEvictedMemoryPool : public arrow::MemoryPool {
public:
explicit SelfEvictedMemoryPool(arrow::MemoryPool* pool) : pool_(pool) {}
explicit SelfEvictedMemoryPool(arrow::MemoryPool* pool, bool failIfOOM = true) : pool_(pool), failIfOOM_(failIfOOM) {}

bool checkEvict(int64_t newCapacity, std::function<void()> block);

Expand Down Expand Up @@ -89,6 +89,8 @@ class SelfEvictedMemoryPool : public arrow::MemoryPool {
arrow::Status evict(int64_t size);

arrow::MemoryPool* pool_;
bool failIfOOM_;

Evictable* evictable_;
int64_t capacity_{std::numeric_limits<int64_t>::max()};

Expand Down
52 changes: 52 additions & 0 deletions cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,23 @@

namespace gluten {

namespace {
std::string makeString(uint32_t length) {
static const std::string kLargeStringOf128Bytes =
"thisisalaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaargestringlengthmorethan16bytes";
std::string res{};
auto repeats = length / kLargeStringOf128Bytes.length();
while (repeats--) {
res.append(kLargeStringOf128Bytes);
}
if (auto remains = length % kLargeStringOf128Bytes.length()) {
res.append(kLargeStringOf128Bytes.substr(0, remains));
}
return res;
}
} // namespace

struct ShuffleTestParams {
PartitionWriterType partition_writer_type;
arrow::Compression::type compression_type;
Expand Down Expand Up @@ -110,9 +127,38 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase
makeFlatVector<facebook::velox::StringView>({"alice", ""}),
};

largeString1_ = makeString(1024);
childrenLargeBinary1_ = {
makeFlatVector<int8_t>(std::vector<int8_t>(4096, 0)),
makeFlatVector<int8_t>(std::vector<int8_t>(4096, 0)),
makeFlatVector<int32_t>(std::vector<int32_t>(4096, 0)),
makeFlatVector<int64_t>(std::vector<int64_t>(4096, 0)),
makeFlatVector<float>(std::vector<float>(4096, 0)),
makeFlatVector<bool>(std::vector<bool>(4096, true)),
makeNullableFlatVector<facebook::velox::StringView>(
std::vector<std::optional<facebook::velox::StringView>>(4096, largeString1_.c_str())),
makeNullableFlatVector<facebook::velox::StringView>(
std::vector<std::optional<facebook::velox::StringView>>(4096, std::nullopt)),
};
largeString2_ = makeString(4096);
auto vectorToSpill = childrenLargeBinary2_ = {
makeFlatVector<int8_t>(std::vector<int8_t>(2048, 0)),
makeFlatVector<int8_t>(std::vector<int8_t>(2048, 0)),
makeFlatVector<int32_t>(std::vector<int32_t>(2048, 0)),
makeFlatVector<int64_t>(std::vector<int64_t>(2048, 0)),
makeFlatVector<float>(std::vector<float>(2048, 0)),
makeFlatVector<bool>(std::vector<bool>(2048, true)),
makeNullableFlatVector<facebook::velox::StringView>(
std::vector<std::optional<facebook::velox::StringView>>(2048, largeString2_.c_str())),
makeNullableFlatVector<facebook::velox::StringView>(
std::vector<std::optional<facebook::velox::StringView>>(2048, std::nullopt)),
};

inputVector1_ = makeRowVector(children1_);
inputVector2_ = makeRowVector(children2_);
inputVectorNoNull_ = makeRowVector(childrenNoNull_);
inputVectorLargeBinary1_ = makeRowVector(childrenLargeBinary1_);
inputVectorLargeBinary2_ = makeRowVector(childrenLargeBinary2_);
}

arrow::Status splitRowVector(VeloxShuffleWriter& shuffleWriter, facebook::velox::RowVectorPtr vector) {
Expand Down Expand Up @@ -151,10 +197,16 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase
std::vector<facebook::velox::VectorPtr> children1_;
std::vector<facebook::velox::VectorPtr> children2_;
std::vector<facebook::velox::VectorPtr> childrenNoNull_;
std::vector<facebook::velox::VectorPtr> childrenLargeBinary1_;
std::vector<facebook::velox::VectorPtr> childrenLargeBinary2_;

facebook::velox::RowVectorPtr inputVector1_;
facebook::velox::RowVectorPtr inputVector2_;
facebook::velox::RowVectorPtr inputVectorNoNull_;
std::string largeString1_;
std::string largeString2_;
facebook::velox::RowVectorPtr inputVectorLargeBinary1_;
facebook::velox::RowVectorPtr inputVectorLargeBinary2_;
};

class VeloxShuffleWriterTest : public ::testing::TestWithParam<ShuffleTestParams>, public VeloxShuffleWriterTestBase {
Expand Down

0 comments on commit a192737

Please sign in to comment.