diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index 1310d0227b7f..d014ddf6315c 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -64,9 +64,7 @@ VeloxSortShuffleWriter::VeloxSortShuffleWriter( ShuffleWriterOptions options, std::shared_ptr veloxPool, arrow::MemoryPool* pool) - : VeloxShuffleWriter(numPartitions, std::move(partitionWriter), std::move(options), std::move(veloxPool), pool), - allocator_{std::make_unique(veloxPool_.get())}, - array_{SortArray{Allocator(allocator_.get())}} {} + : VeloxShuffleWriter(numPartitions, std::move(partitionWriter), std::move(options), std::move(veloxPool), pool) {} arrow::Status VeloxSortShuffleWriter::write(std::shared_ptr cb, int64_t memLimit) { ARROW_ASSIGN_OR_RAISE(auto rv, getPeeledRowVector(cb)); @@ -105,6 +103,7 @@ arrow::Status VeloxSortShuffleWriter::init() { options_.partitioning == Partitioning::kSingle, arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single partition.")); array_.resize(initialSize_); + tmp_ = facebook::velox::AlignedBuffer::allocate(initialSize_ * sizeof(ElementType), veloxPool_.get()); return arrow::Status::OK(); } @@ -186,6 +185,7 @@ void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, u auto size = row.serialize(i, currentPage_ + pageCursor_); array_[offset_++] = {toCompactRowId(row2Partition_[i], pageNumber_, pageCursor_), size}; pageCursor_ += size; + VELOX_DCHECK_LE(pageCursor_, currenPageSize_); } } @@ -231,17 +231,23 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { while (--numPages) { pages_.pop_front(); } - auto& page = pages_.front(); + auto& page = pages_.back(); + // Clear page for serialization. memset(page->asMutable(), 0, page->size()); + // currentPage_ should always point to the last page. + VELOX_CHECK(currentPage_ == page->asMutable()); + pageAddresses_.resize(1); pageAddresses_[0] = currentPage_; pageNumber_ = 0; pageCursor_ = 0; + // Reset and reallocate array_ to minimal size. offset_ = 0; array_.clear(); // Allocate array_ can trigger spill. array_.resize(initialSize_); + tmp_ = facebook::velox::AlignedBuffer::allocate(initialSize_ * sizeof(ElementType), veloxPool_.get()); } return arrow::Status::OK(); } @@ -263,14 +269,13 @@ std::unique_ptr VeloxSortShuffleWriter::prepareToEvict(size_t b } if (sortedBuffer_ == nullptr || sortedBuffer_->size() < rawSize) { - sortedBuffer_ = nullptr; sortedBuffer_ = facebook::velox::AlignedBuffer::allocate(rawSize, veloxPool_.get()); } auto* rawBuffer = sortedBuffer_->asMutable(); uint64_t offset = 0; for (auto i = begin; i < end; ++i) { - // size(size_t) | bytes + // size(RowSize) | bytes auto size = array_[i].second; memcpy(rawBuffer + offset, &size, sizeof(RowSizeType)); offset += sizeof(RowSizeType); @@ -305,31 +310,51 @@ void VeloxSortShuffleWriter::acquireNewBuffer(int64_t memLimit, uint64_t minSize auto size = std::max(std::min((uint64_t)memLimit >> 2, 64UL * 1024 * 1024), minSizeRequired); // Allocating new buffer can trigger spill. auto newBuffer = facebook::velox::AlignedBuffer::allocate(size, veloxPool_.get(), 0); + // If spill triggered, clear pages_. + if (offset_ == 0 && pages_.size() > 0) { + pageAddresses_.clear(); + pages_.clear(); + } + currentPage_ = newBuffer->asMutable(); + pageAddresses_.emplace_back(currentPage_); pages_.emplace_back(std::move(newBuffer)); + pageCursor_ = 0; pageNumber_ = pages_.size() - 1; - currentPage_ = pages_.back()->asMutable(); - pageAddresses_.emplace_back(currentPage_); + currenPageSize_ = pages_.back()->size(); } void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) { auto arraySize = (uint32_t)array_.size(); - auto usableCapacity = useRadixSort_ ? arraySize / 2 : arraySize; - while (offset_ + rows > usableCapacity) { - arraySize <<= 1; - usableCapacity = useRadixSort_ ? arraySize / 2 : arraySize; - } - if (arraySize != array_.size()) { - auto newArray{SortArray{Allocator(allocator_.get())}}; - newArray.resize(arraySize); - if (offset_ > 0) { - std::copy(array_.begin(), array_.begin() + offset_, newArray.begin()); + auto newSize = newArraySize(arraySize, rows); + if (newSize > arraySize) { + SortArray newArray; + // May trigger spill. + newArray.resize(newSize); + auto tmp = facebook::velox::AlignedBuffer::allocate(newSize * sizeof(ElementType), veloxPool_.get()); + // Check if already satisfies. + arraySize = (uint32_t)array_.size(); + if (newArraySize(arraySize, rows) > arraySize) { + if (offset_ > 0) { + std::copy(array_.begin(), array_.begin() + offset_, newArray.begin()); + } array_.clear(); array_ = std::move(newArray); + tmp_ = tmp; } } } +uint32_t VeloxSortShuffleWriter::newArraySize(uint32_t oldSize, uint32_t rows) { + auto newSize = oldSize; + auto usableCapacity = useRadixSort_ ? newSize / 2 : newSize; + while (offset_ + rows > usableCapacity) { + newSize <<= 1; + usableCapacity = useRadixSort_ ? newSize / 2 : newSize; + } + return newSize; +} + int64_t VeloxSortShuffleWriter::peakBytesAllocated() const { return veloxPool_->peakBytes(); } diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index 45f54556081d..6d8ec47d7c92 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -85,13 +85,14 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { void growArrayIfNecessary(uint32_t rows); + uint32_t newArraySize(uint32_t oldSize, uint32_t rows); + using ElementType = std::pair; - using Allocator = facebook::velox::StlAllocator; - using SortArray = std::vector; + using SortArray = std::vector; - std::unique_ptr allocator_; // Stores compact row id -> row SortArray array_; + facebook::velox::BufferPtr tmp_; uint32_t offset_{0}; std::list pages_; @@ -99,6 +100,8 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { char* currentPage_; uint32_t pageNumber_; uint32_t pageCursor_; + // For debug. + uint32_t currenPageSize_; // FIXME: Use configuration to replace hardcode. uint32_t initialSize_ = 4096;