Skip to content

Commit

Permalink
reduce memory waste
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Aug 6, 2024
1 parent d746f0f commit 0083a40
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 19 deletions.
40 changes: 23 additions & 17 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ arrow::Status VeloxSortShuffleWriter::init() {
ARROW_RETURN_IF(
options_.partitioning == Partitioning::kSingle,
arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single partition."));
initArray();
allocateMinimalArray();
sortedBuffer_ = facebook::velox::AlignedBuffer::allocate<char>(kSortedBufferSize, veloxPool_.get());
rawBuffer_ = sortedBuffer_->asMutable<uint8_t>();
return arrow::Status::OK();
Expand Down Expand Up @@ -260,7 +260,7 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
pageCursor_ = 0;

// Reset and reallocate array_ to its minimal size. Allocating array_ can trigger a spill.
initArray();
allocateMinimalArray();
}
return arrow::Status::OK();
}
Expand Down Expand Up @@ -318,19 +318,24 @@ uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t rows)
// Allocates a fresh page buffer and makes it the current page.
//
// The requested size is min(memLimit / 4, 64 MiB), raised to minSizeRequired when that is
// larger. The new buffer is appended to pages_ and its data pointer to pageAddresses_;
// pageCursor_ is reset to 0 and pageNumber_ is set to the index of the newly added page.
//
// NOTE(review): this span looks like a diff view with removed and added lines interleaved
// (two `newBuffer` declarations, repeated currentPage_/currenPageSize_ assignments) --
// confirm against the repository before treating it as the final function body.
void VeloxSortShuffleWriter::acquireNewBuffer(uint64_t memLimit, uint64_t minSizeRequired) {
// A quarter of the memory limit, capped at 64 MiB, but never less than minSizeRequired.
auto size = std::max(std::min<uint64_t>(memLimit >> 2, 64UL * 1024 * 1024), minSizeRequired);
// Allocating new buffer can trigger spill.
auto newBuffer = facebook::velox::AlignedBuffer::allocate<char>(size, veloxPool_.get(), 0);
auto newBuffer = facebook::velox::AlignedBuffer::allocate<char>(size, veloxPool_.get());
// Expose the buffer's whole capacity as its size (capacity() is presumably >= the
// requested size -- TODO confirm against the Velox Buffer API).
auto newBufferSize = newBuffer->capacity();
newBuffer->setSize(newBufferSize);

currentPage_ = newBuffer->asMutable<char>();
currenPageSize_ = newBufferSize;
// Zero the entire page before it is used.
memset(currentPage_, 0, newBufferSize);

// If spill triggered, clear pages_.
// offset_ == 0 while pages_ is non-empty is used as the indicator here; presumably the
// spill path resets offset_ -- verify against evictAllPartitions().
if (offset_ == 0 && pages_.size() > 0) {
pageAddresses_.clear();
pages_.clear();
}
currentPage_ = newBuffer->asMutable<char>();
// Record the raw page address and hand ownership of the buffer to pages_.
pageAddresses_.emplace_back(currentPage_);
pages_.emplace_back(std::move(newBuffer));

// Start writing at the beginning of the page that was just appended.
pageCursor_ = 0;
pageNumber_ = pages_.size() - 1;
currenPageSize_ = pages_.back()->size();
}

void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) {
Expand All @@ -341,14 +346,10 @@ void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) {
auto newArray = facebook::velox::AlignedBuffer::allocate<char>(newSizeBytes, veloxPool_.get());
// Check if already satisfies.
if (newArraySize(rows) > arraySize_) {
auto newPtr = newArray->asMutable<uint64_t>();
if (offset_ > 0) {
gluten::fastCopy(newPtr, arrayPtr_, offset_ * sizeof(uint64_t));
gluten::fastCopy(newArray->asMutable<void>(), arrayPtr_, offset_ * sizeof(uint64_t));
}
arraySize_ = newSize;
arrayPtr_ = newPtr;
array_.reset();
array_.swap(newArray);
setUpArray(std::move(newArray));
}
}
}
Expand All @@ -363,9 +364,13 @@ uint32_t VeloxSortShuffleWriter::newArraySize(uint32_t rows) {
return newSize;
}

void VeloxSortShuffleWriter::initArray() {
arraySize_ = options_.sortBufferInitialSize;
array_ = facebook::velox::AlignedBuffer::allocate<char>(arraySize_ * sizeof(uint64_t), veloxPool_.get());
// Adopts `array` as the backing storage of the sort array and refreshes the cached
// size and pointer members derived from it.
//
// The buffer's capacity is rounded down to a whole number of 8-byte (uint64_t) slots.
// That rounded byte count becomes the buffer's size, arraySize_ becomes the number of
// slots, and arrayPtr_ points at the first slot.
//
// @param array Buffer to take over; it is moved from.
void VeloxSortShuffleWriter::setUpArray(facebook::velox::BufferPtr&& array) {
  // Drop the reference to the previous array before taking over the new one.
  array_.reset();
  array_ = std::move(array);
  // Round the capacity down to a multiple of 8 bytes. The mask is built at 64-bit width
  // on purpose: a plain 0xfffffff8 literal is only 32 bits wide, so after promotion it
  // would also clear bits 32..63 and silently shrink any capacity of 4 GiB or more.
  const uint64_t usableBytes = array_->capacity() & ~static_cast<uint64_t>(7);
  array_->setSize(usableBytes);
  arraySize_ = usableBytes >> 3;
  arrayPtr_ = array_->asMutable<uint64_t>();
}

Expand All @@ -381,8 +386,9 @@ int64_t VeloxSortShuffleWriter::totalC2RTime() const {
return c2rTime_;
}

int VeloxSortShuffleWriter::compare(const void* a, const void* b) {
// No same values.
return *(uint64_t*)a > *(uint64_t*)b ? 1 : -1;
void VeloxSortShuffleWriter::allocateMinimalArray() {
auto array = facebook::velox::AlignedBuffer::allocate<char>(
options_.sortBufferInitialSize * sizeof(uint64_t), veloxPool_.get());
setUpArray(std::move(array));
}
} // namespace gluten
4 changes: 2 additions & 2 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {

uint32_t newArraySize(uint32_t rows);

void initArray();
void setUpArray(facebook::velox::BufferPtr&& array);

static int compare(const void* a, const void* b);
void allocateMinimalArray();

// Stores compact row id -> row
facebook::velox::BufferPtr array_;
Expand Down

0 comments on commit 0083a40

Please sign in to comment.