Skip to content

Commit

Permalink
fake array_ allocation
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Jul 25, 2024
1 parent f24c410 commit 501a116
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 21 deletions.
61 changes: 43 additions & 18 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,7 @@ VeloxSortShuffleWriter::VeloxSortShuffleWriter(
ShuffleWriterOptions options,
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
arrow::MemoryPool* pool)
: VeloxShuffleWriter(numPartitions, std::move(partitionWriter), std::move(options), std::move(veloxPool), pool),
allocator_{std::make_unique<facebook::velox::HashStringAllocator>(veloxPool_.get())},
array_{SortArray{Allocator(allocator_.get())}} {}
: VeloxShuffleWriter(numPartitions, std::move(partitionWriter), std::move(options), std::move(veloxPool), pool) {}

arrow::Status VeloxSortShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) {
ARROW_ASSIGN_OR_RAISE(auto rv, getPeeledRowVector(cb));
Expand Down Expand Up @@ -105,6 +103,7 @@ arrow::Status VeloxSortShuffleWriter::init() {
options_.partitioning == Partitioning::kSingle,
arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single partition."));
array_.resize(initialSize_);
tmp_ = facebook::velox::AlignedBuffer::allocate<char>(initialSize_ * sizeof(ElementType), veloxPool_.get());
return arrow::Status::OK();
}

Expand Down Expand Up @@ -186,6 +185,7 @@ void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, u
auto size = row.serialize(i, currentPage_ + pageCursor_);
array_[offset_++] = {toCompactRowId(row2Partition_[i], pageNumber_, pageCursor_), size};
pageCursor_ += size;
VELOX_DCHECK_LE(pageCursor_, currenPageSize_);
}
}

Expand Down Expand Up @@ -231,17 +231,23 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
while (--numPages) {
pages_.pop_front();
}
auto& page = pages_.front();
auto& page = pages_.back();
// Clear page for serialization.
memset(page->asMutable<char>(), 0, page->size());
// currentPage_ should always point to the last page.
VELOX_CHECK(currentPage_ == page->asMutable<char>());

pageAddresses_.resize(1);
pageAddresses_[0] = currentPage_;
pageNumber_ = 0;
pageCursor_ = 0;

// Reset and reallocate array_ to minimal size.
offset_ = 0;
array_.clear();
// Allocate array_ can trigger spill.
array_.resize(initialSize_);
tmp_ = facebook::velox::AlignedBuffer::allocate<char>(initialSize_ * sizeof(ElementType), veloxPool_.get());
}
return arrow::Status::OK();
}
Expand All @@ -263,14 +269,13 @@ std::unique_ptr<InMemoryPayload> VeloxSortShuffleWriter::prepareToEvict(size_t b
}

if (sortedBuffer_ == nullptr || sortedBuffer_->size() < rawSize) {
sortedBuffer_ = nullptr;
sortedBuffer_ = facebook::velox::AlignedBuffer::allocate<char>(rawSize, veloxPool_.get());
}
auto* rawBuffer = sortedBuffer_->asMutable<char>();

uint64_t offset = 0;
for (auto i = begin; i < end; ++i) {
// size(size_t) | bytes
// size(RowSize) | bytes
auto size = array_[i].second;
memcpy(rawBuffer + offset, &size, sizeof(RowSizeType));
offset += sizeof(RowSizeType);
Expand Down Expand Up @@ -305,31 +310,51 @@ void VeloxSortShuffleWriter::acquireNewBuffer(int64_t memLimit, uint64_t minSize
auto size = std::max(std::min((uint64_t)memLimit >> 2, 64UL * 1024 * 1024), minSizeRequired);
// Allocating new buffer can trigger spill.
auto newBuffer = facebook::velox::AlignedBuffer::allocate<char>(size, veloxPool_.get(), 0);
// If spill triggered, clear pages_.
if (offset_ == 0 && pages_.size() > 0) {
pageAddresses_.clear();
pages_.clear();
}
currentPage_ = newBuffer->asMutable<char>();
pageAddresses_.emplace_back(currentPage_);
pages_.emplace_back(std::move(newBuffer));

pageCursor_ = 0;
pageNumber_ = pages_.size() - 1;
currentPage_ = pages_.back()->asMutable<char>();
pageAddresses_.emplace_back(currentPage_);
currenPageSize_ = pages_.back()->size();
}

void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) {
auto arraySize = (uint32_t)array_.size();
auto usableCapacity = useRadixSort_ ? arraySize / 2 : arraySize;
while (offset_ + rows > usableCapacity) {
arraySize <<= 1;
usableCapacity = useRadixSort_ ? arraySize / 2 : arraySize;
}
if (arraySize != array_.size()) {
auto newArray{SortArray{Allocator(allocator_.get())}};
newArray.resize(arraySize);
if (offset_ > 0) {
std::copy(array_.begin(), array_.begin() + offset_, newArray.begin());
auto newSize = newArraySize(arraySize, rows);
if (newSize > arraySize) {
SortArray newArray;
// May trigger spill.
newArray.resize(newSize);
auto tmp = facebook::velox::AlignedBuffer::allocate<char>(newSize * sizeof(ElementType), veloxPool_.get());
// Check if already satisfies.
arraySize = (uint32_t)array_.size();
if (newArraySize(arraySize, rows) > arraySize) {
if (offset_ > 0) {
std::copy(array_.begin(), array_.begin() + offset_, newArray.begin());
}
array_.clear();
array_ = std::move(newArray);
tmp_ = tmp;
}
}
}

uint32_t VeloxSortShuffleWriter::newArraySize(uint32_t oldSize, uint32_t rows) {
auto newSize = oldSize;
auto usableCapacity = useRadixSort_ ? newSize / 2 : newSize;
while (offset_ + rows > usableCapacity) {
newSize <<= 1;
usableCapacity = useRadixSort_ ? newSize / 2 : newSize;
}
return newSize;
}

int64_t VeloxSortShuffleWriter::peakBytesAllocated() const {
return veloxPool_->peakBytes();
}
Expand Down
9 changes: 6 additions & 3 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,23 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {

void growArrayIfNecessary(uint32_t rows);

uint32_t newArraySize(uint32_t oldSize, uint32_t rows);

using ElementType = std::pair<uint64_t, RowSizeType>;
using Allocator = facebook::velox::StlAllocator<ElementType>;
using SortArray = std::vector<ElementType, Allocator>;
using SortArray = std::vector<ElementType>;

std::unique_ptr<facebook::velox::HashStringAllocator> allocator_;
// Stores compact row id -> row
SortArray array_;
facebook::velox::BufferPtr tmp_;
uint32_t offset_{0};

std::list<facebook::velox::BufferPtr> pages_;
std::vector<char*> pageAddresses_;
char* currentPage_;
uint32_t pageNumber_;
uint32_t pageCursor_;
// For debug.
uint32_t currenPageSize_;

// FIXME: Use configuration to replace hardcode.
uint32_t initialSize_ = 4096;
Expand Down

0 comments on commit 501a116

Please sign in to comment.