From 962fc8068c5e3bdfc9112b438b336a4bc764e5b3 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Fri, 26 Jul 2024 08:38:29 +0000 Subject: [PATCH] prealloc sortedBuffer --- cpp/velox/shuffle/RadixSort.h | 6 +- cpp/velox/shuffle/VeloxShuffleReader.cc | 4 +- cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 121 +++++++++++--------- cpp/velox/shuffle/VeloxSortShuffleWriter.h | 13 +-- 4 files changed, 75 insertions(+), 69 deletions(-) diff --git a/cpp/velox/shuffle/RadixSort.h b/cpp/velox/shuffle/RadixSort.h index 89b4a45cb884a..17f05d349c8e2 100644 --- a/cpp/velox/shuffle/RadixSort.h +++ b/cpp/velox/shuffle/RadixSort.h @@ -86,7 +86,7 @@ class RadixSort { auto offsets = transformCountsToOffsets(counts, outIndex); for (auto offset = inIndex; offset < inIndex + numRecords; ++offset) { - auto bucket = (array[offset].value >> (byteIdx * 8)) & 0xff; + auto bucket = (array[offset] >> (byteIdx * 8)) & 0xff; array[offsets[bucket]++] = array[offset]; } } @@ -112,7 +112,7 @@ class RadixSort { int64_t bitwiseMax = 0; int64_t bitwiseMin = -1L; for (auto offset = 0; offset < numRecords; ++offset) { - auto value = array[offset].value; + auto value = array[offset]; bitwiseMax |= value; bitwiseMin &= value; } @@ -123,7 +123,7 @@ class RadixSort { if (((bitsChanged >> (i * 8)) & 0xff) != 0) { counts[i].resize(256); for (auto offset = 0; offset < numRecords; ++offset) { - counts[i][(array[offset].value >> (i * 8)) & 0xff]++; + counts[i][(array[offset] >> (i * 8)) & 0xff]++; } } } diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc index e55f4d01de821..ab93d9a33d04a 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.cc +++ b/cpp/velox/shuffle/VeloxShuffleReader.cc @@ -428,8 +428,8 @@ std::shared_ptr VeloxSortShuffleReaderDeserializer::deserializeTo auto buffer = cur->second; const auto* rawBuffer = buffer->as(); while (rowOffset_ < cur->first && readRows < batchSize_) { - auto rowSize = *(uint32_t*)(rawBuffer + byteOffset_); - byteOffset_ += sizeof(uint32_t); + auto rowSize = *(RowSizeType*)(rawBuffer + byteOffset_) - sizeof(RowSizeType); + byteOffset_ += sizeof(RowSizeType); data.push_back(std::string_view(rawBuffer + byteOffset_, rowSize)); byteOffset_ += rowSize; ++rowOffset_; diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index 3f4181286bb7a..dfb0ff382b674 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -32,6 +32,8 @@ constexpr uint64_t kMaskLower40Bits = (1UL << 40) - 1; constexpr uint32_t kPartitionIdStartByteIndex = 5; constexpr uint32_t kPartitionIdEndByteIndex = 7; +constexpr uint32_t kSortedBufferSize = 1 * 1024 * 1024; + uint64_t toCompactRowId(uint32_t partitionId, uint32_t pageNumber, uint32_t offsetInPage) { // |63 partitionId(24) |39 inputIndex(13) |26 rowIndex(27) | return (uint64_t)partitionId << 40 | (uint64_t)pageNumber << 27 | offsetInPage; @@ -80,7 +82,8 @@ arrow::Status VeloxSortShuffleWriter::stop() { if (offset_ > 0) { RETURN_NOT_OK(evictAllPartitions()); } - array_ = nullptr; + array_.reset(); + sortedBuffer_.reset(); pages_.clear(); pageAddresses_.clear(); RETURN_NOT_OK(partitionWriter_->stop(&metrics_)); @@ -103,6 +106,8 @@ arrow::Status VeloxSortShuffleWriter::init() { options_.partitioning == Partitioning::kSingle, arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single partition.")); initArray(); + sortedBuffer_ = facebook::velox::AlignedBuffer::allocate(kSortedBufferSize, veloxPool_.get()); + rawBuffer_ = sortedBuffer_->asMutable(); return arrow::Status::OK(); } @@ -110,6 +115,9 @@ void VeloxSortShuffleWriter::initRowType(const facebook::velox::RowVectorPtr& rv if (UNLIKELY(!rowType_)) { rowType_ = facebook::velox::asRowType(rv->type()); fixedRowSize_ = facebook::velox::row::CompactRow::fixedRowSize(rowType_); + if (fixedRowSize_) { + *fixedRowSize_ += sizeof(RowSizeType); + } } } @@ -150,11 +158,16 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr facebook::velox::row::CompactRow row(vector); if (!fixedRowSize_) { - rowSizes_.resize(inputRows + 1); - rowSizes_[0] = 0; + rowSize_.resize(inputRows); + rowSizePrefixSum_.resize(inputRows + 1); + rowSizePrefixSum_[0] = 0; for (auto i = 0; i < inputRows; ++i) { - rowSizes_[i + 1] = rowSizes_[i] + row.rowSize(i); + auto rowSize = row.rowSize(i) + sizeof(RowSizeType); + rowSize_[i] = rowSize; + rowSizePrefixSum_[i + 1] = rowSizePrefixSum_[i] + rowSize; } + } else { + rowSize_.resize(inputRows, *fixedRowSize_); } uint32_t rowOffset = 0; @@ -162,7 +175,7 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr auto remainingRows = inputRows - rowOffset; auto rows = maxRowsToInsert(rowOffset, remainingRows); if (rows == 0) { - auto minSizeRequired = fixedRowSize_ ? fixedRowSize_.value() : rowSizes_[rowOffset + 1] - rowSizes_[rowOffset]; + auto minSizeRequired = fixedRowSize_ ? fixedRowSize_.value() : rowSize_[rowOffset]; acquireNewBuffer(memLimit, minSizeRequired); rows = maxRowsToInsert(rowOffset, remainingRows); ARROW_RETURN_IF( @@ -181,10 +194,12 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, uint32_t offset, uint32_t rows) { VELOX_CHECK(!pages_.empty()); for (auto i = offset; i < offset + rows; ++i) { + auto pid = row2Partition_[i]; + arrayPtr_[offset_++] = toCompactRowId(pid, pageNumber_, pageCursor_); + // size(RowSize) | bytes + memcpy(currentPage_ + pageCursor_, &rowSize_[i], sizeof(RowSizeType)); + pageCursor_ += sizeof(RowSizeType); auto size = row.serialize(i, currentPage_ + pageCursor_); - arrayPtr_[offset_].value = toCompactRowId(row2Partition_[i], pageNumber_, pageCursor_); - arrayPtr_[offset_].rowSize = size; - ++offset_; pageCursor_ += size; VELOX_DCHECK_LE(pageCursor_, currenPageSize_); } @@ -205,19 +220,20 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { { ScopedTimer timer(&sortTime_); if (options_.useRadixSort) { - begin = RadixSort::sort( + begin = RadixSort::sort( arrayPtr_, arraySize_, numRecords, kPartitionIdStartByteIndex, kPartitionIdEndByteIndex); } else { auto ptr = arrayPtr_; - qsort(ptr, numRecords, sizeof(Element), compare); + qsort(ptr, numRecords, sizeof(uint64_t), compare); + (void)ptr; } } auto end = begin + numRecords; auto cur = begin; - auto pid = extractPartitionId(arrayPtr_[begin].value); + auto pid = extractPartitionId(arrayPtr_[begin]); while (++cur < end) { - auto curPid = extractPartitionId(arrayPtr_[cur].value); + auto curPid = extractPartitionId(arrayPtr_[cur]); if (curPid != pid) { RETURN_NOT_OK(evictPartition(pid, begin, cur)); pid = curPid; @@ -226,8 +242,6 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { } RETURN_NOT_OK(evictPartition(pid, begin, cur)); - sortedBuffer_ = nullptr; - if (!stopped_) { // Preserve the last page for use. auto numPages = pages_.size(); @@ -253,43 +267,39 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { } arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_t begin, size_t end) { - auto payload = prepareToEvict(begin, end); - RETURN_NOT_OK( - partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_)); - return arrow::Status::OK(); -} - -std::unique_ptr VeloxSortShuffleWriter::prepareToEvict(size_t begin, size_t end) { ScopedTimer timer(&sortTime_); // Serialize [begin, end) - uint32_t numRows = end - begin; - uint64_t rawSize = numRows * sizeof(RowSizeType); - for (auto i = begin; i < end; ++i) { - rawSize += arrayPtr_[i].rowSize; - } - - if (sortedBuffer_ == nullptr || sortedBuffer_->size() < rawSize) { - sortedBuffer_ = facebook::velox::AlignedBuffer::allocate(rawSize, veloxPool_.get()); - } - auto* rawBuffer = sortedBuffer_->asMutable(); - uint64_t offset = 0; - for (auto i = begin; i < end; ++i) { - // size(RowSize) | bytes - auto size = arrayPtr_[i].rowSize; - memcpy(rawBuffer + offset, &size, sizeof(RowSizeType)); - offset += sizeof(RowSizeType); - auto index = extractPageNumberAndOffset(arrayPtr_[i].value); - memcpy(rawBuffer + offset, pageAddresses_[index.first] + index.second, size); + char* addr; + uint32_t size; + + auto index = begin; + while (index < end) { + auto pageIndex = extractPageNumberAndOffset(arrayPtr_[index]); + addr = pageAddresses_[pageIndex.first] + pageIndex.second; + size = *(RowSizeType*)addr; + if (offset + size > kSortedBufferSize) { + VELOX_CHECK(offset > 0); + auto payload = std::make_unique( + index - begin, + nullptr, + std::vector>{std::make_shared(rawBuffer_, offset)}); + RETURN_NOT_OK( + partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_)); + begin = index; + offset = 0; + } + gluten::fastCopy(rawBuffer_ + offset, addr, size); offset += size; + index++; } - VELOX_CHECK_EQ(offset, rawSize); - - auto rawData = sortedBuffer_->as(); - std::vector> buffers; - buffers.push_back(std::make_shared(rawData, rawSize)); - - return std::make_unique(numRows, nullptr, std::move(buffers)); + auto payload = std::make_unique( + end - begin, + nullptr, + std::vector>{std::make_shared(rawBuffer_, offset)}); + RETURN_NOT_OK( + partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_)); + return arrow::Status::OK(); } uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t rows) { @@ -301,8 +311,8 @@ uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t rows) if (fixedRowSize_) { return std::min((uint32_t)(remainingBytes / (fixedRowSize_.value())), rows); } - auto beginIter = rowSizes_.begin() + 1 + offset; - auto iter = std::upper_bound(beginIter, rowSizes_.end(), remainingBytes); + auto beginIter = rowSizePrefixSum_.begin() + 1 + offset; + auto iter = std::upper_bound(beginIter, rowSizePrefixSum_.end(), remainingBytes); return iter - beginIter; } @@ -328,17 +338,18 @@ void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) { auto newSize = newArraySize(rows); if (newSize > arraySize_) { // May trigger spill. - auto newSizeBytes = newSize * sizeof(Element); + auto newSizeBytes = newSize * sizeof(uint64_t); auto newArray = facebook::velox::AlignedBuffer::allocate(newSizeBytes, veloxPool_.get()); // Check if already satisfies. if (newArraySize(rows) > arraySize_) { - auto newPtr = newArray->asMutable(); + auto newPtr = newArray->asMutable(); if (offset_ > 0) { - gluten::fastCopy(newPtr, arrayPtr_, arraySize_ * sizeof(Element)); + gluten::fastCopy(newPtr, arrayPtr_, offset_ * sizeof(uint64_t)); } arraySize_ = newSize; arrayPtr_ = newPtr; - array_ = std::move(newArray); + array_.reset(); + array_.swap(newArray); } } } @@ -355,8 +366,8 @@ uint32_t VeloxSortShuffleWriter::newArraySize(uint32_t rows) { void VeloxSortShuffleWriter::initArray() { arraySize_ = options_.sortBufferInitialSize; - array_ = facebook::velox::AlignedBuffer::allocate(arraySize_ * sizeof(Element), veloxPool_.get()); - arrayPtr_ = array_->asMutable(); + array_ = facebook::velox::AlignedBuffer::allocate(arraySize_ * sizeof(uint64_t), veloxPool_.get()); + arrayPtr_ = array_->asMutable(); } int64_t VeloxSortShuffleWriter::peakBytesAllocated() const { @@ -373,6 +384,6 @@ int64_t VeloxSortShuffleWriter::totalC2RTime() const { int VeloxSortShuffleWriter::compare(const void* a, const void* b) { // No same values. - return ((Element*)a)->value > ((Element*)b)->value ? 1 : -1; + return *(uint64_t*)a > *(uint64_t*)b ? 1 : -1; } } // namespace gluten diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index 961700b6da078..7d83d2f347aaf 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -77,8 +77,6 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { arrow::Status evictPartition(uint32_t partitionId, size_t begin, size_t end); - std::unique_ptr prepareToEvict(size_t begin, size_t end); - uint32_t maxRowsToInsert(uint32_t offset, uint32_t rows); void acquireNewBuffer(int64_t memLimit, uint64_t minSizeRequired); @@ -89,16 +87,11 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { void initArray(); - struct Element { - uint64_t value; - uint32_t rowSize; - }; - static int compare(const void* a, const void* b); // Stores compact row id -> row facebook::velox::BufferPtr array_; - Element* arrayPtr_; + uint64_t* arrayPtr_; uint32_t arraySize_; uint32_t offset_{0}; @@ -111,6 +104,7 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { uint32_t currenPageSize_; facebook::velox::BufferPtr sortedBuffer_; + uint8_t* rawBuffer_; // Row ID -> Partition ID // subscript: The index of row in the current input RowVector @@ -120,7 +114,8 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { std::shared_ptr rowType_; std::optional fixedRowSize_; - std::vector rowSizes_; + std::vector rowSize_; + std::vector rowSizePrefixSum_; int64_t c2rTime_{0}; int64_t sortTime_{0};