From bf5ad1ebac3ac7d8bf2d25f5dbf0730805d1762f Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Wed, 24 Jul 2024 04:14:40 +0000 Subject: [PATCH] radix sort --- cpp/velox/shuffle/RadixSort.h | 157 ++++++++++++++++++++ cpp/velox/shuffle/VeloxShuffleReader.cc | 5 +- cpp/velox/shuffle/VeloxShuffleReader.h | 3 + cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 53 ++++--- cpp/velox/shuffle/VeloxSortShuffleWriter.h | 13 +- 5 files changed, 204 insertions(+), 27 deletions(-) create mode 100644 cpp/velox/shuffle/RadixSort.h diff --git a/cpp/velox/shuffle/RadixSort.h b/cpp/velox/shuffle/RadixSort.h new file mode 100644 index 000000000000..49a40d2add0a --- /dev/null +++ b/cpp/velox/shuffle/RadixSort.h @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include + +namespace gluten { + +template +class RadixSort { + public: + /** + * Sorts a given array of longs using least-significant-digit radix sort. This routine assumes + * you have extra space at the end of the array at least equal to the number of records. The + * sort is destructive and may relocate the data positioned within the array. + * + * @param array array of long elements followed by at least that many empty slots. + * @param numRecords number of data records in the array. + * @param startByteIndex the first byte (in range [0, 7]) to sort each long by, counting from the + * least significant byte. + * @param endByteIndex the last byte (in range [0, 7]) to sort each long by, counting from the + * least significant byte. Must be greater than startByteIndex. + * + * @return The starting index of the sorted data within the given array. We return this instead + * of always copying the data back to position zero for efficiency. + */ + static int32_t sort(SortArray& array, int64_t numRecords, int32_t startByteIndex, int32_t endByteIndex) { + assert(startByteIndex >= 0 && "startByteIndex should >= 0"); + assert(endByteIndex <= 7 && "endByteIndex should <= 7"); + assert(endByteIndex > startByteIndex); + assert(numRecords * 2 <= array.size()); + + int64_t inIndex = 0; + int64_t outIndex = numRecords; + + if (numRecords > 0) { + auto counts = getCounts(array, numRecords, startByteIndex, endByteIndex); + + for (auto i = startByteIndex; i <= endByteIndex; i++) { + if (!counts[i].empty()) { + sortAtByte(array, numRecords, counts[i], i, inIndex, outIndex); + std::swap(inIndex, outIndex); + } + } + } + + return static_cast(inIndex); + } + + private: + /** + * Performs a partial sort by copying data into destination offsets for each byte value at the + * specified byte offset. + * + * @param array array to partially sort. + * @param numRecords number of data records in the array. + * @param counts counts for each byte value. This routine destructively modifies this array. + * @param byteIdx the byte in a long to sort at, counting from the least significant byte. + * @param inIndex the starting index in the array where input data is located. + * @param outIndex the starting index where sorted output data should be written. + */ + static void sortAtByte( + SortArray& array, + int64_t numRecords, + std::vector& counts, + int32_t byteIdx, + int64_t inIndex, + int64_t outIndex) { + assert(counts.size() == 256); + + auto offsets = transformCountsToOffsets(counts, outIndex); + + for (auto offset = inIndex; offset < inIndex + numRecords; ++offset) { + auto bucket = (array[offset] >> (byteIdx * 8)) & 0xff; + array[offsets[bucket]++] = array[offset]; + } + } + + /** + * Computes a value histogram for each byte in the given array. + * + * @param array array to count records in. + * @param numRecords number of data records in the array. + * @param startByteIndex the first byte to compute counts for (the prior are skipped). + * @param endByteIndex the last byte to compute counts for. + * + * @return a vector of eight 256-element count arrays, one for each byte starting from the least + * significant byte. If the byte does not need sorting the vector entry will be empty. + */ + static std::vector> + getCounts(SortArray& array, int64_t numRecords, int32_t startByteIndex, int32_t endByteIndex) { + std::vector> counts; + counts.resize(8); + + // Optimization: do a fast pre-pass to determine which byte indices we can skip for sorting. + // If all the byte values at a particular index are the same we don't need to count it. + int64_t bitwiseMax = 0; + int64_t bitwiseMin = -1L; + for (auto offset = 0; offset < numRecords; ++offset) { + auto value = array[offset]; + bitwiseMax |= value; + bitwiseMin &= value; + } + auto bitsChanged = bitwiseMin ^ bitwiseMax; + + // Compute counts for each byte index. + for (auto i = startByteIndex; i <= endByteIndex; i++) { + if (((bitsChanged >> (i * 8)) & 0xff) != 0) { + counts[i].resize(256); + for (auto offset = 0; offset < numRecords; ++offset) { + counts[i][(array[offset] >> (i * 8)) & 0xff]++; + } + } + } + + return counts; + } + + /** + * Transforms counts into the proper output offsets for the sort type. + * + * @param counts counts for each byte value. This routine destructively modifies this vector. + * @param numRecords number of data records in the original data array. + * @param outputOffset output offset in bytes from the base array object. + * + * @return the input counts vector. + */ + static std::vector& transformCountsToOffsets(std::vector& counts, int64_t outputOffset) { + assert(counts.size() == 256); + + int64_t pos = outputOffset; + for (auto i = 0; i < 256; i++) { + auto tmp = counts[i & 0xff]; + counts[i & 0xff] = pos; + pos += tmp; + } + + return counts; + } +}; + +} // namespace gluten diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc index e55f4d01de82..10673cceeaaf 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.cc +++ b/cpp/velox/shuffle/VeloxShuffleReader.cc @@ -428,9 +428,8 @@ std::shared_ptr VeloxSortShuffleReaderDeserializer::deserializeTo auto buffer = cur->second; const auto* rawBuffer = buffer->as(); while (rowOffset_ < cur->first && readRows < batchSize_) { - auto rowSize = *(uint32_t*)(rawBuffer + byteOffset_); - byteOffset_ += sizeof(uint32_t); - data.push_back(std::string_view(rawBuffer + byteOffset_, rowSize)); + auto rowSize = *(RowSizeType*)(rawBuffer + byteOffset_); + data.push_back(std::string_view(rawBuffer + byteOffset_ + sizeof(RowSizeType), rowSize - sizeof(RowSizeType))); byteOffset_ += rowSize; ++rowOffset_; ++readRows; diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h b/cpp/velox/shuffle/VeloxShuffleReader.h index 96273948526d..2be913aa13a7 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.h +++ b/cpp/velox/shuffle/VeloxShuffleReader.h @@ -20,6 +20,7 @@ #include "operators/serializer/VeloxColumnarBatchSerializer.h" #include "shuffle/Payload.h" #include "shuffle/ShuffleReader.h" +#include "shuffle/VeloxSortShuffleWriter.h" #include "velox/type/Type.h" #include "velox/vector/ComplexVector.h" @@ -64,6 +65,8 @@ class VeloxHashShuffleReaderDeserializer final : public ColumnarBatchIterator { class VeloxSortShuffleReaderDeserializer final : public ColumnarBatchIterator { public: + using RowSizeType = VeloxSortShuffleWriter::RowSizeType; + VeloxSortShuffleReaderDeserializer( std::shared_ptr in, const std::shared_ptr& schema, diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index cb81ea5f6c8a..f274cb1a291d 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -29,6 +29,8 @@ namespace gluten { namespace { constexpr uint32_t kMaskLower27Bits = (1 << 27) - 1; constexpr uint64_t kMaskLower40Bits = (1UL << 40) - 1; +constexpr uint32_t kPartitionIdStartByteIndex = 5; +constexpr uint32_t kPartitionIdEndByteIndex = 7; uint64_t toCompactRowId(uint32_t partitionId, uint32_t pageNumber, uint32_t offsetInPage) { // |63 partitionId(24) |39 inputIndex(13) |26 rowIndex(27) | @@ -101,6 +103,7 @@ arrow::Status VeloxSortShuffleWriter::init() { options_.partitioning == Partitioning::kSingle, arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single partition.")); array_.resize(initialSize_); + partitionRawSize_.resize(numPartitions_, 0); return arrow::Status::OK(); } @@ -108,6 +111,9 @@ void VeloxSortShuffleWriter::initRowType(const facebook::velox::RowVectorPtr& rv if (UNLIKELY(!rowType_)) { rowType_ = facebook::velox::asRowType(rv->type()); fixedRowSize_ = facebook::velox::row::CompactRow::fixedRowSize(rowType_); + if (fixedRowSize_) { + *fixedRowSize_ += sizeof(RowSizeType); + } } } @@ -151,7 +157,7 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr rowSizes_.resize(inputRows + 1); rowSizes_[0] = 0; for (auto i = 0; i < inputRows; ++i) { - rowSizes_[i + 1] = rowSizes_[i] + row.rowSize(i); + rowSizes_[i + 1] = rowSizes_[i] + row.rowSize(i) + sizeof(RowSizeType); } } @@ -177,8 +183,15 @@ void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, u // Allocate newArray can trigger spill. growArrayIfNecessary(rows); for (auto i = offset; i < offset + rows; ++i) { - auto size = row.serialize(i, currentPage_ + pageCursor_); - array_[offset_++] = {toCompactRowId(row2Partition_[i], pageNumber_, pageCursor_), size}; + auto pid = row2Partition_[i]; + array_[offset_++] = toCompactRowId(pid, pageNumber_, pageCursor_); + + // (RowSizeType)size | serialized row + auto dest = currentPage_ + pageCursor_; + RowSizeType size = sizeof(RowSizeType) + row.serialize(i, dest + sizeof(RowSizeType)); + memcpy(dest, &size, sizeof(RowSizeType)); + + partitionRawSize_[pid] += size; pageCursor_ += size; } } @@ -192,17 +205,23 @@ arrow::Status VeloxSortShuffleWriter::maybeSpill(int32_t nextRows) { } arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { + auto numRecords = offset_; + int32_t begin = 0; { ScopedTimer timer(&sortTime_); // TODO: Add radix sort to align with Spark. - std::sort(array_.begin(), array_.begin() + offset_); + if (useRadixSort_) { + begin = RadixSort::sort(array_, numRecords, kPartitionIdStartByteIndex, kPartitionIdEndByteIndex); + } else { + std::sort(array_.begin(), array_.begin() + numRecords); + } } - size_t begin = 0; - size_t cur = 0; - auto pid = extractPartitionId(array_[begin].first); - while (++cur < offset_) { - auto curPid = extractPartitionId(array_[cur].first); + auto end = begin + numRecords; + auto cur = begin; + auto pid = extractPartitionId(array_[begin]); + while (++cur < end) { + auto curPid = extractPartitionId(array_[cur]); if (curPid != pid) { RETURN_NOT_OK(evictPartition(pid, begin, cur)); pid = curPid; @@ -230,10 +249,8 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_t begin, size_t end) { // Serialize [begin, end) uint32_t numRows = end - begin; - uint64_t rawSize = numRows * sizeof(RowSizeType); - for (auto i = begin; i < end; ++i) { - rawSize += array_[i].second; - } + uint64_t rawSize = partitionRawSize_[partitionId]; + partitionRawSize_[partitionId] = 0; if (sortedBuffer_ == nullptr || sortedBuffer_->size() < rawSize) { sortedBuffer_ = nullptr; @@ -243,12 +260,10 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_ uint64_t offset = 0; for (auto i = begin; i < end; ++i) { - // size(size_t) | bytes - auto size = array_[i].second; - memcpy(rawBuffer + offset, &size, sizeof(RowSizeType)); - offset += sizeof(RowSizeType); - auto index = extractPageNumberAndOffset(array_[i].first); - memcpy(rawBuffer + offset, pageAddresses_[index.first] + index.second, size); + auto index = extractPageNumberAndOffset(array_[i]); + const auto* src = pageAddresses_[index.first] + index.second; + auto size = *(RowSizeType*)(src); + memcpy(rawBuffer + offset, src, size); offset += size; } VELOX_CHECK_EQ(offset, rawSize); diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index 8f01997b11e1..cb393ea5474f 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -17,6 +17,7 @@ #pragma once +#include "shuffle/RadixSort.h" #include "shuffle/VeloxShuffleWriter.h" #include @@ -31,6 +32,8 @@ namespace gluten { class VeloxSortShuffleWriter final : public VeloxShuffleWriter { public: + using RowSizeType = uint32_t; + static arrow::Result> create( uint32_t numPartitions, std::unique_ptr partitionWriter, @@ -80,10 +83,8 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { void growArrayIfNecessary(uint32_t rows); - using RowSizeType = uint32_t; - using ElementType = std::pair; - using Allocator = facebook::velox::StlAllocator; - using SortArray = std::vector; + using Allocator = facebook::velox::StlAllocator; + using SortArray = std::vector; std::unique_ptr allocator_; // Stores compact row id -> row @@ -98,7 +99,7 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { // FIXME: Use configuration to replace hardcode. uint32_t initialSize_ = 4096; - bool useRadixSort_ = false; + bool useRadixSort_ = true; facebook::velox::BufferPtr sortedBuffer_; @@ -108,6 +109,8 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { // Updated for each input RowVector. std::vector row2Partition_; + std::vector partitionRawSize_; + std::shared_ptr rowType_; std::optional fixedRowSize_; std::vector rowSizes_;