Skip to content

Commit

Permalink
[VL] Reduce memory waste in sort based shuffle (#6727)
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma authored Aug 8, 2024
1 parent 54d532e commit 531e82a
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 21 deletions.
52 changes: 33 additions & 19 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ arrow::Status VeloxSortShuffleWriter::init() {
ARROW_RETURN_IF(
options_.partitioning == Partitioning::kSingle,
arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single partition."));
initArray();
allocateMinimalArray();
sortedBuffer_ = facebook::velox::AlignedBuffer::allocate<char>(kSortedBufferSize, veloxPool_.get());
rawBuffer_ = sortedBuffer_->asMutable<uint8_t>();
return arrow::Status::OK();
Expand Down Expand Up @@ -260,7 +260,7 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
pageCursor_ = 0;

// Reset and reallocate array_ to minimal size. Allocate array_ can trigger spill.
initArray();
allocateMinimalArray();
}
return arrow::Status::OK();
}
Expand Down Expand Up @@ -316,21 +316,34 @@ uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t rows)
}

void VeloxSortShuffleWriter::acquireNewBuffer(uint64_t memLimit, uint64_t minSizeRequired) {
auto size = std::max(std::min<uint64_t>(memLimit >> 2, 64UL * 1024 * 1024), minSizeRequired);
DLOG_IF(INFO, !pages_.empty()) << "Acquire new buffer. current capacity: " << pages_.back()->capacity()
<< ", size: " << pages_.back()->size() << ", pageCursor: " << pageCursor_
<< ", unused: " << pages_.back()->capacity() - pageCursor_;
auto size = std::max(
std::min<uint64_t>(
std::max<uint64_t>(memLimit >> 2, facebook::velox::AlignedBuffer::kPaddedSize), 64UL * 1024 * 1024) -
facebook::velox::AlignedBuffer::kPaddedSize,
minSizeRequired);
// Allocating new buffer can trigger spill.
auto newBuffer = facebook::velox::AlignedBuffer::allocate<char>(size, veloxPool_.get(), 0);
auto newBuffer = facebook::velox::AlignedBuffer::allocate<char>(size, veloxPool_.get());
DLOG(INFO) << "Allocated new buffer. capacity: " << newBuffer->capacity() << ", size: " << newBuffer->size();
auto newBufferSize = newBuffer->capacity();
newBuffer->setSize(newBufferSize);

currentPage_ = newBuffer->asMutable<char>();
currenPageSize_ = newBufferSize;
memset(currentPage_, 0, newBufferSize);

// If spill triggered, clear pages_.
if (offset_ == 0 && pages_.size() > 0) {
pageAddresses_.clear();
pages_.clear();
}
currentPage_ = newBuffer->asMutable<char>();
pageAddresses_.emplace_back(currentPage_);
pages_.emplace_back(std::move(newBuffer));

pageCursor_ = 0;
pageNumber_ = pages_.size() - 1;
currenPageSize_ = pages_.back()->size();
}

void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) {
Expand All @@ -339,16 +352,12 @@ void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) {
// May trigger spill.
auto newSizeBytes = newSize * sizeof(uint64_t);
auto newArray = facebook::velox::AlignedBuffer::allocate<char>(newSizeBytes, veloxPool_.get());
// Check if already satisfies.
// Check if already satisfies (spill has been triggered).
if (newArraySize(rows) > arraySize_) {
auto newPtr = newArray->asMutable<uint64_t>();
if (offset_ > 0) {
gluten::fastCopy(newPtr, arrayPtr_, offset_ * sizeof(uint64_t));
gluten::fastCopy(newArray->asMutable<void>(), arrayPtr_, offset_ * sizeof(uint64_t));
}
arraySize_ = newSize;
arrayPtr_ = newPtr;
array_.reset();
array_.swap(newArray);
setUpArray(std::move(newArray));
}
}
}
Expand All @@ -363,9 +372,13 @@ uint32_t VeloxSortShuffleWriter::newArraySize(uint32_t rows) {
return newSize;
}

void VeloxSortShuffleWriter::initArray() {
arraySize_ = options_.sortBufferInitialSize;
array_ = facebook::velox::AlignedBuffer::allocate<char>(arraySize_ * sizeof(uint64_t), veloxPool_.get());
void VeloxSortShuffleWriter::setUpArray(facebook::velox::BufferPtr&& array) {
array_.reset();
array_ = std::move(array);
// Capacity is a multiple of 8 (bytes).
auto capacity = array_->capacity() & 0xfffffff8;
array_->setSize(capacity);
arraySize_ = capacity >> 3;
arrayPtr_ = array_->asMutable<uint64_t>();
}

Expand All @@ -381,8 +394,9 @@ int64_t VeloxSortShuffleWriter::totalC2RTime() const {
return c2rTime_;
}

int VeloxSortShuffleWriter::compare(const void* a, const void* b) {
// No same values.
return *(uint64_t*)a > *(uint64_t*)b ? 1 : -1;
void VeloxSortShuffleWriter::allocateMinimalArray() {
auto array = facebook::velox::AlignedBuffer::allocate<char>(
options_.sortBufferInitialSize * sizeof(uint64_t), veloxPool_.get());
setUpArray(std::move(array));
}
} // namespace gluten
4 changes: 2 additions & 2 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {

uint32_t newArraySize(uint32_t rows);

void initArray();
void setUpArray(facebook::velox::BufferPtr&& array);

static int compare(const void* a, const void* b);
void allocateMinimalArray();

// Stores compact row id -> row
facebook::velox::BufferPtr array_;
Expand Down

0 comments on commit 531e82a

Please sign in to comment.