From 66165e3627336fe260b03228e862bd2109213a48 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Wed, 24 Jul 2024 04:14:19 +0000 Subject: [PATCH 01/13] add shuffle write wall time metrics --- .../gluten/backendsapi/velox/VeloxMetricsApi.scala | 1 + .../VeloxCelebornColumnarShuffleWriter.scala | 14 ++++++++++---- .../spark/shuffle/ColumnarShuffleWriter.scala | 7 +++---- .../writer/VeloxUniffleColumnarShuffleWriter.java | 6 +++--- 4 files changed, 17 insertions(+), 11 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala index 30b08749e907..c05ce7cdcd4f 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala @@ -272,6 +272,7 @@ class VeloxMetricsApi extends MetricsApi with Logging { "compressTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to compress"), "decompressTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to decompress"), "deserializeTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to deserialize"), + "shuffleWallTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle wall time"), // For hash shuffle writer, the peak bytes represents the maximum split buffer size. // For sort shuffle writer, the peak bytes represents the maximum // row buffer + sort buffer size. diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala index baf61b8a14f2..f13ef20a59ff 100644 --- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala +++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala @@ -48,6 +48,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V]( celebornConf, client, writeMetrics) { + private val isSort = !GlutenConfig.GLUTEN_HASH_SHUFFLE_WRITER.equals(shuffleWriterType) private val runtime = Runtimes.contextInstance("CelebornShuffleWriter") @@ -72,7 +73,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V]( val handle = ColumnarBatches.getNativeHandle(cb) val startTime = System.nanoTime() jniWrapper.write(nativeShuffleWriter, cb.numRows, handle, availableOffHeapPerTask()) - dep.metrics("splitTime").add(System.nanoTime() - startTime) + dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime) dep.metrics("numInputRows").add(cb.numRows) dep.metrics("inputBatches").add(1) // This metric is important, AQE use it to decide if EliminateLimit @@ -84,10 +85,15 @@ class VeloxCelebornColumnarShuffleWriter[K, V]( val startTime = System.nanoTime() splitResult = jniWrapper.stop(nativeShuffleWriter) - dep - .metrics("splitTime") + dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime) + val nativeMetrics = if (isSort) { + dep.metrics("splitTime") + } else { + dep.metrics("sortTime") + } + nativeMetrics .add( - System.nanoTime() - startTime - splitResult.getTotalPushTime - + dep.metrics("shuffleWallTime").value - splitResult.getTotalPushTime - splitResult.getTotalWriteTime - splitResult.getTotalCompressTime) dep.metrics("dataSize").add(splitResult.getRawPartitionLengths.sum) diff --git a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala index 8a8248f2397f..5274ec94c8d5 100644 --- a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala +++ b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala @@ -176,9 +176,7 @@ class ColumnarShuffleWriter[K, V]( } val startTime = System.nanoTime() jniWrapper.write(nativeShuffleWriter, rows, handle, availableOffHeapPerTask()) - if (!isSort) { - dep.metrics("splitTime").add(System.nanoTime() - startTime) - } + dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime) dep.metrics("numInputRows").add(rows) dep.metrics("inputBatches").add(1) // This metric is important, AQE use it to decide if EliminateLimit @@ -191,11 +189,12 @@ class ColumnarShuffleWriter[K, V]( assert(nativeShuffleWriter != -1L) splitResult = jniWrapper.stop(nativeShuffleWriter) closeShuffleWriter() + dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime) if (!isSort) { dep .metrics("splitTime") .add( - System.nanoTime() - startTime - splitResult.getTotalSpillTime - + dep.metrics("shuffleWallTime").value - splitResult.getTotalSpillTime - splitResult.getTotalWriteTime - splitResult.getTotalCompressTime) } else { diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java index 62e1971635ce..32c672cec6be 100644 --- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java +++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java @@ -180,7 +180,7 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) { LOG.debug("jniWrapper.write rows {}, split bytes {}", cb.numRows(), bytes); columnarDep.metrics().get("dataSize").get().add(bytes); // this metric replace part of uniffle shuffle write time - columnarDep.metrics().get("splitTime").get().add(System.nanoTime() - startTime); + columnarDep.metrics().get("shuffleWallTime").get().add(System.nanoTime() - startTime); columnarDep.metrics().get("numInputRows").get().add(cb.numRows()); columnarDep.metrics().get("inputBatches").get().add(1); shuffleWriteMetrics.incRecordsWritten(cb.numRows()); @@ -193,13 +193,13 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) { throw new IllegalStateException("nativeShuffleWriter should not be -1L"); } splitResult = jniWrapper.stop(nativeShuffleWriter); + columnarDep.metrics().get("shuffleWallTime").get().add(System.nanoTime() - startTime); columnarDep .metrics() .get("splitTime") .get() .add( - System.nanoTime() - - startTime + columnarDep.metrics().get("shuffleWallTime").get().value() - splitResult.getTotalPushTime() - splitResult.getTotalWriteTime() - splitResult.getTotalCompressTime()); From 04f19bf81c9fc42314ff785a6be7b91133270f26 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Wed, 24 Jul 2024 04:14:40 +0000 Subject: [PATCH 02/13] radix sort --- cpp/velox/shuffle/RadixSort.h | 157 ++++++++++++++++++++ cpp/velox/shuffle/VeloxShuffleReader.cc | 5 +- cpp/velox/shuffle/VeloxShuffleReader.h | 3 + cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 53 ++++--- cpp/velox/shuffle/VeloxSortShuffleWriter.h | 13 +- 5 files changed, 204 insertions(+), 27 deletions(-) create mode 100644 cpp/velox/shuffle/RadixSort.h diff --git a/cpp/velox/shuffle/RadixSort.h b/cpp/velox/shuffle/RadixSort.h new file mode 100644 index 000000000000..49a40d2add0a --- /dev/null +++ b/cpp/velox/shuffle/RadixSort.h @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include + +namespace gluten { + +template +class RadixSort { + public: + /** + * Sorts a given array of longs using least-significant-digit radix sort. This routine assumes + * you have extra space at the end of the array at least equal to the number of records. The + * sort is destructive and may relocate the data positioned within the array. + * + * @param array array of long elements followed by at least that many empty slots. + * @param numRecords number of data records in the array. + * @param startByteIndex the first byte (in range [0, 7]) to sort each long by, counting from the + * least significant byte. + * @param endByteIndex the last byte (in range [0, 7]) to sort each long by, counting from the + * least significant byte. Must be greater than startByteIndex. + * + * @return The starting index of the sorted data within the given array. We return this instead + * of always copying the data back to position zero for efficiency. + */ + static int32_t sort(SortArray& array, int64_t numRecords, int32_t startByteIndex, int32_t endByteIndex) { + assert(startByteIndex >= 0 && "startByteIndex should >= 0"); + assert(endByteIndex <= 7 && "endByteIndex should <= 7"); + assert(endByteIndex > startByteIndex); + assert(numRecords * 2 <= array.size()); + + int64_t inIndex = 0; + int64_t outIndex = numRecords; + + if (numRecords > 0) { + auto counts = getCounts(array, numRecords, startByteIndex, endByteIndex); + + for (auto i = startByteIndex; i <= endByteIndex; i++) { + if (!counts[i].empty()) { + sortAtByte(array, numRecords, counts[i], i, inIndex, outIndex); + std::swap(inIndex, outIndex); + } + } + } + + return static_cast(inIndex); + } + + private: + /** + * Performs a partial sort by copying data into destination offsets for each byte value at the + * specified byte offset. + * + * @param array array to partially sort. + * @param numRecords number of data records in the array. + * @param counts counts for each byte value. This routine destructively modifies this array. + * @param byteIdx the byte in a long to sort at, counting from the least significant byte. + * @param inIndex the starting index in the array where input data is located. + * @param outIndex the starting index where sorted output data should be written. + */ + static void sortAtByte( + SortArray& array, + int64_t numRecords, + std::vector& counts, + int32_t byteIdx, + int64_t inIndex, + int64_t outIndex) { + assert(counts.size() == 256); + + auto offsets = transformCountsToOffsets(counts, outIndex); + + for (auto offset = inIndex; offset < inIndex + numRecords; ++offset) { + auto bucket = (array[offset] >> (byteIdx * 8)) & 0xff; + array[offsets[bucket]++] = array[offset]; + } + } + + /** + * Computes a value histogram for each byte in the given array. + * + * @param array array to count records in. + * @param numRecords number of data records in the array. + * @param startByteIndex the first byte to compute counts for (the prior are skipped). + * @param endByteIndex the last byte to compute counts for. + * + * @return a vector of eight 256-element count arrays, one for each byte starting from the least + * significant byte. If the byte does not need sorting the vector entry will be empty. + */ + static std::vector> + getCounts(SortArray& array, int64_t numRecords, int32_t startByteIndex, int32_t endByteIndex) { + std::vector> counts; + counts.resize(8); + + // Optimization: do a fast pre-pass to determine which byte indices we can skip for sorting. + // If all the byte values at a particular index are the same we don't need to count it. + int64_t bitwiseMax = 0; + int64_t bitwiseMin = -1L; + for (auto offset = 0; offset < numRecords; ++offset) { + auto value = array[offset]; + bitwiseMax |= value; + bitwiseMin &= value; + } + auto bitsChanged = bitwiseMin ^ bitwiseMax; + + // Compute counts for each byte index. + for (auto i = startByteIndex; i <= endByteIndex; i++) { + if (((bitsChanged >> (i * 8)) & 0xff) != 0) { + counts[i].resize(256); + for (auto offset = 0; offset < numRecords; ++offset) { + counts[i][(array[offset] >> (i * 8)) & 0xff]++; + } + } + } + + return counts; + } + + /** + * Transforms counts into the proper output offsets for the sort type. + * + * @param counts counts for each byte value. This routine destructively modifies this vector. + * @param numRecords number of data records in the original data array. + * @param outputOffset output offset in bytes from the base array object. + * + * @return the input counts vector. + */ + static std::vector& transformCountsToOffsets(std::vector& counts, int64_t outputOffset) { + assert(counts.size() == 256); + + int64_t pos = outputOffset; + for (auto i = 0; i < 256; i++) { + auto tmp = counts[i & 0xff]; + counts[i & 0xff] = pos; + pos += tmp; + } + + return counts; + } +}; + +} // namespace gluten diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc index e55f4d01de82..10673cceeaaf 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.cc +++ b/cpp/velox/shuffle/VeloxShuffleReader.cc @@ -428,9 +428,8 @@ std::shared_ptr VeloxSortShuffleReaderDeserializer::deserializeTo auto buffer = cur->second; const auto* rawBuffer = buffer->as(); while (rowOffset_ < cur->first && readRows < batchSize_) { - auto rowSize = *(uint32_t*)(rawBuffer + byteOffset_); - byteOffset_ += sizeof(uint32_t); - data.push_back(std::string_view(rawBuffer + byteOffset_, rowSize)); + auto rowSize = *(RowSizeType*)(rawBuffer + byteOffset_); + data.push_back(std::string_view(rawBuffer + byteOffset_ + sizeof(RowSizeType), rowSize - sizeof(RowSizeType))); byteOffset_ += rowSize; ++rowOffset_; ++readRows; diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h b/cpp/velox/shuffle/VeloxShuffleReader.h index 96273948526d..2be913aa13a7 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.h +++ b/cpp/velox/shuffle/VeloxShuffleReader.h @@ -20,6 +20,7 @@ #include "operators/serializer/VeloxColumnarBatchSerializer.h" #include "shuffle/Payload.h" #include "shuffle/ShuffleReader.h" +#include "shuffle/VeloxSortShuffleWriter.h" #include "velox/type/Type.h" #include "velox/vector/ComplexVector.h" @@ -64,6 +65,8 @@ class VeloxHashShuffleReaderDeserializer final : public ColumnarBatchIterator { class VeloxSortShuffleReaderDeserializer final : public ColumnarBatchIterator { public: + using RowSizeType = VeloxSortShuffleWriter::RowSizeType; + VeloxSortShuffleReaderDeserializer( std::shared_ptr in, const std::shared_ptr& schema, diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index 7c033fb98bf8..a2201dd9d95c 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -29,6 +29,8 @@ namespace gluten { namespace { constexpr uint32_t kMaskLower27Bits = (1 << 27) - 1; constexpr uint64_t kMaskLower40Bits = (1UL << 40) - 1; +constexpr uint32_t kPartitionIdStartByteIndex = 5; +constexpr uint32_t kPartitionIdEndByteIndex = 7; uint64_t toCompactRowId(uint32_t partitionId, uint32_t pageNumber, uint32_t offsetInPage) { // |63 partitionId(24) |39 inputIndex(13) |26 rowIndex(27) | @@ -101,6 +103,7 @@ arrow::Status VeloxSortShuffleWriter::init() { options_.partitioning == Partitioning::kSingle, arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single partition.")); array_.resize(initialSize_); + partitionRawSize_.resize(numPartitions_, 0); return arrow::Status::OK(); } @@ -108,6 +111,9 @@ void VeloxSortShuffleWriter::initRowType(const facebook::velox::RowVectorPtr& rv if (UNLIKELY(!rowType_)) { rowType_ = facebook::velox::asRowType(rv->type()); fixedRowSize_ = facebook::velox::row::CompactRow::fixedRowSize(rowType_); + if (fixedRowSize_) { + *fixedRowSize_ += sizeof(RowSizeType); + } } } @@ -151,7 +157,7 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr rowSizes_.resize(inputRows + 1); rowSizes_[0] = 0; for (auto i = 0; i < inputRows; ++i) { - rowSizes_[i + 1] = rowSizes_[i] + row.rowSize(i); + rowSizes_[i + 1] = rowSizes_[i] + row.rowSize(i) + sizeof(RowSizeType); } } @@ -177,8 +183,15 @@ void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, u // Allocate newArray can trigger spill. growArrayIfNecessary(rows); for (auto i = offset; i < offset + rows; ++i) { - auto size = row.serialize(i, currentPage_ + pageCursor_); - array_[offset_++] = {toCompactRowId(row2Partition_[i], pageNumber_, pageCursor_), size}; + auto pid = row2Partition_[i]; + array_[offset_++] = toCompactRowId(pid, pageNumber_, pageCursor_); + + // (RowSizeType)size | serialized row + auto dest = currentPage_ + pageCursor_; + RowSizeType size = sizeof(RowSizeType) + row.serialize(i, dest + sizeof(RowSizeType)); + memcpy(dest, &size, sizeof(RowSizeType)); + + partitionRawSize_[pid] += size; pageCursor_ += size; } } @@ -192,17 +205,23 @@ arrow::Status VeloxSortShuffleWriter::maybeSpill(int32_t nextRows) { } arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { + auto numRecords = offset_; + int32_t begin = 0; { ScopedTimer timer(&sortTime_); // TODO: Add radix sort to align with Spark. - std::sort(array_.begin(), array_.begin() + offset_); + if (useRadixSort_) { + begin = RadixSort::sort(array_, numRecords, kPartitionIdStartByteIndex, kPartitionIdEndByteIndex); + } else { + std::sort(array_.begin(), array_.begin() + numRecords); + } } - size_t begin = 0; - size_t cur = 0; - auto pid = extractPartitionId(array_[begin].first); - while (++cur < offset_) { - auto curPid = extractPartitionId(array_[cur].first); + auto end = begin + numRecords; + auto cur = begin; + auto pid = extractPartitionId(array_[begin]); + while (++cur < end) { + auto curPid = extractPartitionId(array_[cur]); if (curPid != pid) { RETURN_NOT_OK(evictPartition(pid, begin, cur)); pid = curPid; @@ -230,10 +249,8 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_t begin, size_t end) { // Serialize [begin, end) uint32_t numRows = end - begin; - uint64_t rawSize = numRows * sizeof(RowSizeType); - for (auto i = begin; i < end; ++i) { - rawSize += array_[i].second; - } + uint64_t rawSize = partitionRawSize_[partitionId]; + partitionRawSize_[partitionId] = 0; if (sortedBuffer_ == nullptr || sortedBuffer_->size() < rawSize) { sortedBuffer_ = nullptr; @@ -243,12 +260,10 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_ uint64_t offset = 0; for (auto i = begin; i < end; ++i) { - // size(size_t) | bytes - auto size = array_[i].second; - memcpy(rawBuffer + offset, &size, sizeof(RowSizeType)); - offset += sizeof(RowSizeType); - auto index = extractPageNumberAndOffset(array_[i].first); - memcpy(rawBuffer + offset, pageAddresses_[index.first] + index.second, size); + auto index = extractPageNumberAndOffset(array_[i]); + const auto* src = pageAddresses_[index.first] + index.second; + auto size = *(RowSizeType*)(src); + memcpy(rawBuffer + offset, src, size); offset += size; } VELOX_CHECK_EQ(offset, rawSize); diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index 6ac5308d0bd8..0b72bba6768e 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -17,6 +17,7 @@ #pragma once +#include "shuffle/RadixSort.h" #include "shuffle/VeloxShuffleWriter.h" #include @@ -31,6 +32,8 @@ namespace gluten { class VeloxSortShuffleWriter final : public VeloxShuffleWriter { public: + using RowSizeType = uint32_t; + static arrow::Result> create( uint32_t numPartitions, std::unique_ptr partitionWriter, @@ -80,10 +83,8 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { void growArrayIfNecessary(uint32_t rows); - using RowSizeType = uint32_t; - using ElementType = std::pair; - using Allocator = facebook::velox::StlAllocator; - using SortArray = std::vector; + using Allocator = facebook::velox::StlAllocator; + using SortArray = std::vector; std::unique_ptr allocator_; // Stores compact row id -> row @@ -98,7 +99,7 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { // FIXME: Use configuration to replace hardcode. uint32_t initialSize_ = 4096; - bool useRadixSort_ = false; + bool useRadixSort_ = true; facebook::velox::BufferPtr sortedBuffer_; @@ -108,6 +109,8 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { // Updated for each input RowVector. std::vector row2Partition_; + std::vector partitionRawSize_; + std::shared_ptr rowType_; std::optional fixedRowSize_; std::vector rowSizes_; From e1979713f26cca82e4f5871a4b20042a442488b7 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Wed, 24 Jul 2024 04:26:46 +0000 Subject: [PATCH 03/13] partial revert --- cpp/velox/shuffle/RadixSort.h | 6 ++-- cpp/velox/shuffle/VeloxShuffleReader.cc | 5 +-- cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 38 ++++++++------------- cpp/velox/shuffle/VeloxSortShuffleWriter.h | 7 ++-- 4 files changed, 24 insertions(+), 32 deletions(-) diff --git a/cpp/velox/shuffle/RadixSort.h b/cpp/velox/shuffle/RadixSort.h index 49a40d2add0a..06d7a89a492b 100644 --- a/cpp/velox/shuffle/RadixSort.h +++ b/cpp/velox/shuffle/RadixSort.h @@ -86,7 +86,7 @@ class RadixSort { auto offsets = transformCountsToOffsets(counts, outIndex); for (auto offset = inIndex; offset < inIndex + numRecords; ++offset) { - auto bucket = (array[offset] >> (byteIdx * 8)) & 0xff; + auto bucket = (array[offset].first >> (byteIdx * 8)) & 0xff; array[offsets[bucket]++] = array[offset]; } } @@ -112,7 +112,7 @@ class RadixSort { int64_t bitwiseMax = 0; int64_t bitwiseMin = -1L; for (auto offset = 0; offset < numRecords; ++offset) { - auto value = array[offset]; + auto value = array[offset].first; bitwiseMax |= value; bitwiseMin &= value; } @@ -123,7 +123,7 @@ class RadixSort { if (((bitsChanged >> (i * 8)) & 0xff) != 0) { counts[i].resize(256); for (auto offset = 0; offset < numRecords; ++offset) { - counts[i][(array[offset] >> (i * 8)) & 0xff]++; + counts[i][(array[offset].first >> (i * 8)) & 0xff]++; } } } diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc index 10673cceeaaf..e55f4d01de82 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.cc +++ b/cpp/velox/shuffle/VeloxShuffleReader.cc @@ -428,8 +428,9 @@ std::shared_ptr VeloxSortShuffleReaderDeserializer::deserializeTo auto buffer = cur->second; const auto* rawBuffer = buffer->as(); while (rowOffset_ < cur->first && readRows < batchSize_) { - auto rowSize = *(RowSizeType*)(rawBuffer + byteOffset_); - data.push_back(std::string_view(rawBuffer + byteOffset_ + sizeof(RowSizeType), rowSize - sizeof(RowSizeType))); + auto rowSize = *(uint32_t*)(rawBuffer + byteOffset_); + byteOffset_ += sizeof(uint32_t); + data.push_back(std::string_view(rawBuffer + byteOffset_, rowSize)); byteOffset_ += rowSize; ++rowOffset_; ++readRows; diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index a2201dd9d95c..17cd9e5c7baa 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -103,7 +103,6 @@ arrow::Status VeloxSortShuffleWriter::init() { options_.partitioning == Partitioning::kSingle, arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single partition.")); array_.resize(initialSize_); - partitionRawSize_.resize(numPartitions_, 0); return arrow::Status::OK(); } @@ -111,9 +110,6 @@ void VeloxSortShuffleWriter::initRowType(const facebook::velox::RowVectorPtr& rv if (UNLIKELY(!rowType_)) { rowType_ = facebook::velox::asRowType(rv->type()); fixedRowSize_ = facebook::velox::row::CompactRow::fixedRowSize(rowType_); - if (fixedRowSize_) { - *fixedRowSize_ += sizeof(RowSizeType); - } } } @@ -157,7 +153,7 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr rowSizes_.resize(inputRows + 1); rowSizes_[0] = 0; for (auto i = 0; i < inputRows; ++i) { - rowSizes_[i + 1] = rowSizes_[i] + row.rowSize(i) + sizeof(RowSizeType); + rowSizes_[i + 1] = rowSizes_[i] + row.rowSize(i); } } @@ -183,15 +179,8 @@ void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, u // Allocate newArray can trigger spill. growArrayIfNecessary(rows); for (auto i = offset; i < offset + rows; ++i) { - auto pid = row2Partition_[i]; - array_[offset_++] = toCompactRowId(pid, pageNumber_, pageCursor_); - - // (RowSizeType)size | serialized row - auto dest = currentPage_ + pageCursor_; - RowSizeType size = sizeof(RowSizeType) + row.serialize(i, dest + sizeof(RowSizeType)); - memcpy(dest, &size, sizeof(RowSizeType)); - - partitionRawSize_[pid] += size; + auto size = row.serialize(i, currentPage_ + pageCursor_); + array_[offset_++] = {toCompactRowId(row2Partition_[i], pageNumber_, pageCursor_), size}; pageCursor_ += size; } } @@ -209,7 +198,6 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { int32_t begin = 0; { ScopedTimer timer(&sortTime_); - // TODO: Add radix sort to align with Spark. if (useRadixSort_) { begin = RadixSort::sort(array_, numRecords, kPartitionIdStartByteIndex, kPartitionIdEndByteIndex); } else { @@ -219,9 +207,9 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { auto end = begin + numRecords; auto cur = begin; - auto pid = extractPartitionId(array_[begin]); + auto pid = extractPartitionId(array_[begin].first); while (++cur < end) { - auto curPid = extractPartitionId(array_[cur]); + auto curPid = extractPartitionId(array_[cur].first); if (curPid != pid) { RETURN_NOT_OK(evictPartition(pid, begin, cur)); pid = curPid; @@ -249,8 +237,10 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_t begin, size_t end) { // Serialize [begin, end) uint32_t numRows = end - begin; - uint64_t rawSize = partitionRawSize_[partitionId]; - partitionRawSize_[partitionId] = 0; + uint64_t rawSize = numRows * sizeof(RowSizeType); + for (auto i = begin; i < end; ++i) { + rawSize += array_[i].second; + } if (sortedBuffer_ == nullptr || sortedBuffer_->size() < rawSize) { sortedBuffer_ = nullptr; @@ -260,10 +250,12 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_ uint64_t offset = 0; for (auto i = begin; i < end; ++i) { - auto index = extractPageNumberAndOffset(array_[i]); - const auto* src = pageAddresses_[index.first] + index.second; - auto size = *(RowSizeType*)(src); - memcpy(rawBuffer + offset, src, size); + // size(size_t) | bytes + auto size = array_[i].second; + memcpy(rawBuffer + offset, &size, sizeof(RowSizeType)); + offset += sizeof(RowSizeType); + auto index = extractPageNumberAndOffset(array_[i].first); + memcpy(rawBuffer + offset, pageAddresses_[index.first] + index.second, size); offset += size; } VELOX_CHECK_EQ(offset, rawSize); diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index 0b72bba6768e..43993803fcdd 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -83,8 +83,9 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { void growArrayIfNecessary(uint32_t rows); - using Allocator = facebook::velox::StlAllocator; - using SortArray = std::vector; + using ElementType = std::pair; + using Allocator = facebook::velox::StlAllocator; + using SortArray = std::vector; std::unique_ptr allocator_; // Stores compact row id -> row @@ -109,8 +110,6 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { // Updated for each input RowVector. std::vector row2Partition_; - std::vector partitionRawSize_; - std::shared_ptr rowType_; std::optional fixedRowSize_; std::vector rowSizes_; From 71e5da807fcff8c6d7f1edd8a20eec523ce70566 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Wed, 24 Jul 2024 08:03:51 +0000 Subject: [PATCH 04/13] update metrics --- cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 12 ++++++++---- cpp/velox/shuffle/VeloxSortShuffleWriter.h | 2 ++ 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index 17cd9e5c7baa..2c42f27cf928 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -235,6 +235,13 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { } arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_t begin, size_t end) { + auto payload = prepareToEvict(begin, end); + RETURN_NOT_OK(partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_)); + return arrow::Status::OK(); +} + +std::unique_ptr VeloxSortShuffleWriter::prepareToEvict(size_t begin, size_t end) { + ScopedTimer timer(&sortTime_); // Serialize [begin, end) uint32_t numRows = end - begin; uint64_t rawSize = numRows * sizeof(RowSizeType); @@ -264,10 +271,7 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_ std::vector> buffers; buffers.push_back(std::make_shared(rawData, rawSize)); - auto payload = std::make_unique(numRows, nullptr, std::move(buffers)); - RETURN_NOT_OK( - partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_)); - return arrow::Status::OK(); + return std::make_unique(numRows, nullptr, std::move(buffers)); } uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t rows) { diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index 43993803fcdd..a6a7f5e3726b 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -77,6 +77,8 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { arrow::Status evictPartition(uint32_t partitionId, size_t begin, size_t end); + std::unique_ptr prepareToEvict(size_t begin, size_t end); + uint32_t maxRowsToInsert(uint32_t offset, uint32_t rows); void acquireNewBuffer(uint64_t memLimit, uint64_t minSizeRequired); From b77599bc523386bb887acdc9dfed9c77e2d21fe9 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Wed, 24 Jul 2024 15:45:39 +0000 Subject: [PATCH 05/13] address comments & fix --- cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 9 +++++++-- .../shuffle/VeloxCelebornColumnarBatchSerializer.scala | 2 ++ .../src/main/scala/org/apache/gluten/GlutenConfig.scala | 1 - 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index 2c42f27cf928..21bf3402a3a6 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -236,7 +236,8 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_t begin, size_t end) { auto payload = prepareToEvict(begin, end); - RETURN_NOT_OK(partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_)); + RETURN_NOT_OK( + partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_)); return arrow::Status::OK(); } @@ -307,7 +308,11 @@ void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) { usableCapacity = useRadixSort_ ? arraySize / 2 : arraySize; } if (arraySize != array_.size()) { - array_.resize(arraySize); + auto newArray{SortArray{Allocator(allocator_.get())}}; + newArray.resize(arraySize); + std::copy(array_.begin(), array_.begin() + offset_, newArray.begin()); + array_.clear(); + array_ = std::move(newArray); } } diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala index dd4904964957..6f21b528f1c2 100644 --- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala +++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala @@ -17,6 +17,7 @@ package org.apache.spark.shuffle import org.apache.gluten.GlutenConfig +import org.apache.gluten.GlutenConfig.{GLUTEN_RSS_SORT_SHUFFLE_WRITER, GLUTEN_SORT_SHUFFLE_WRITER} import org.apache.gluten.exec.Runtimes import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.utils.ArrowAbiUtil @@ -84,6 +85,7 @@ private class CelebornColumnarBatchSerializerInstance( val compressionCodecBackend = GlutenConfig.getConf.columnarShuffleCodecBackend.orNull val shuffleWriterType = GlutenConfig.getConf.celebornShuffleWriterType + .replace(GLUTEN_SORT_SHUFFLE_WRITER, GLUTEN_RSS_SORT_SHUFFLE_WRITER) val jniWrapper = ShuffleReaderJniWrapper.create(runtime) val batchSize = GlutenConfig.getConf.maxBatchSize val handle = jniWrapper diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index 5547feafe331..586a277f7855 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -137,7 +137,6 @@ class GlutenConfig(conf: SQLConf) extends Logging { conf .getConfString("spark.celeborn.client.spark.shuffle.writer", GLUTEN_HASH_SHUFFLE_WRITER) .toLowerCase(Locale.ROOT) - .replace(GLUTEN_SORT_SHUFFLE_WRITER, GLUTEN_RSS_SORT_SHUFFLE_WRITER) def enableColumnarShuffle: Boolean = conf.getConf(COLUMNAR_SHUFFLE_ENABLED) From 94b32cb6dee49ec0c60ce4eac4973d8a2d70274b Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Thu, 25 Jul 2024 02:45:03 +0000 Subject: [PATCH 06/13] fix --- cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 27 +++++++++++++------ cpp/velox/shuffle/VeloxSortShuffleWriter.h | 2 +- .../VeloxCelebornColumnarShuffleWriter.scala | 4 +-- 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index 21bf3402a3a6..a2200e459928 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -168,7 +168,10 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr ARROW_RETURN_IF( rows == 0, arrow::Status::Invalid("Failed to insert rows. Remaining rows: " + std::to_string(remainingRows))); } + // Spill to avoid offset_ overflow. RETURN_NOT_OK(maybeSpill(rows)); + // Allocate newArray can trigger spill. + growArrayIfNecessary(rows); insertRows(row, rowOffset, rows); rowOffset += rows; } @@ -176,8 +179,7 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr } void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, uint32_t offset, uint32_t rows) { - // Allocate newArray can trigger spill. - growArrayIfNecessary(rows); + VELOX_CHECK(!pages_.empty()); for (auto i = offset; i < offset + rows; ++i) { auto size = row.serialize(i, currentPage_ + pageCursor_); array_[offset_++] = {toCompactRowId(row2Partition_[i], pageNumber_, pageCursor_), size}; @@ -218,18 +220,27 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { } RETURN_NOT_OK(evictPartition(pid, begin, cur)); - pageCursor_ = 0; - pages_.clear(); - pageAddresses_.clear(); - + sortedBuffer_ = nullptr; offset_ = 0; array_.clear(); - sortedBuffer_ = nullptr; - if (!stopped_) { + auto numPages = pages_.size(); + while (--numPages) { + pages_.pop_front(); + } + auto& page = pages_.front(); + memset(page->asMutable(), 0, page->size()); + pageAddresses_.resize(1); + pageAddresses_[0] = currentPage_; + pageNumber_ = 0; + pageCursor_ = 0; + // Allocate array_ can trigger spill. array_.resize(initialSize_); + } else { + pages_.clear(); + pageAddresses_.clear(); } return arrow::Status::OK(); } diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index a6a7f5e3726b..706d306cfa9f 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -94,7 +94,7 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { SortArray array_; uint32_t offset_{0}; - std::vector pages_; + std::list pages_; std::vector pageAddresses_; char* currentPage_; uint32_t pageNumber_; diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala index f13ef20a59ff..4069f1b44324 100644 --- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala +++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala @@ -87,9 +87,9 @@ class VeloxCelebornColumnarShuffleWriter[K, V]( dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime) val nativeMetrics = if (isSort) { - dep.metrics("splitTime") - } else { dep.metrics("sortTime") + } else { + dep.metrics("splitTime") } nativeMetrics .add( From cd6eb1ded8f8dda80309b9c87fe2021ea22d873d Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Thu, 25 Jul 2024 03:52:12 +0000 Subject: [PATCH 07/13] fix --- cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index a2200e459928..8c59a49987be 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -78,10 +78,13 @@ arrow::Status VeloxSortShuffleWriter::write(std::shared_ptr cb, i arrow::Status VeloxSortShuffleWriter::stop() { ARROW_RETURN_IF(evictState_ == EvictState::kUnevictable, arrow::Status::Invalid("Unevictable state in stop.")); - EvictGuard evictGuard{evictState_}; - stopped_ = true; - RETURN_NOT_OK(evictAllPartitions()); + if (offset_ > 0) { + RETURN_NOT_OK(evictAllPartitions()); + } + array_.clear(); + pages_.clear(); + pageAddresses_.clear(); RETURN_NOT_OK(partitionWriter_->stop(&metrics_)); return arrow::Status::OK(); } @@ -91,7 +94,6 @@ arrow::Status VeloxSortShuffleWriter::reclaimFixedSize(int64_t size, int64_t* ac *actual = 0; return arrow::Status::OK(); } - EvictGuard evictGuard{evictState_}; auto beforeReclaim = veloxPool_->usedBytes(); RETURN_NOT_OK(evictAllPartitions()); *actual = beforeReclaim - veloxPool_->usedBytes(); @@ -189,13 +191,14 @@ void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, u arrow::Status VeloxSortShuffleWriter::maybeSpill(int32_t nextRows) { if ((uint64_t)offset_ + nextRows > std::numeric_limits::max()) { - EvictGuard evictGuard{evictState_}; RETURN_NOT_OK(evictAllPartitions()); } return arrow::Status::OK(); } arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { + EvictGuard evictGuard{evictState_}; + auto numRecords = offset_; int32_t begin = 0; { @@ -221,10 +224,9 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { RETURN_NOT_OK(evictPartition(pid, begin, cur)); sortedBuffer_ = nullptr; - offset_ = 0; - array_.clear(); if (!stopped_) { + // Preserve the last page for use. auto numPages = pages_.size(); while (--numPages) { pages_.pop_front(); @@ -236,11 +238,10 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { pageNumber_ = 0; pageCursor_ = 0; + offset_ = 0; + array_.clear(); // Allocate array_ can trigger spill. array_.resize(initialSize_); - } else { - pages_.clear(); - pageAddresses_.clear(); } return arrow::Status::OK(); } From 1a30342fb866ad812a86ac7ec6235eedb28d9acd Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Thu, 25 Jul 2024 01:14:33 +0000 Subject: [PATCH 08/13] fix --- cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index 8c59a49987be..51ed820d3558 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -322,9 +322,11 @@ void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) { if (arraySize != array_.size()) { auto newArray{SortArray{Allocator(allocator_.get())}}; newArray.resize(arraySize); - std::copy(array_.begin(), array_.begin() + offset_, newArray.begin()); - array_.clear(); - array_ = std::move(newArray); + if (offset_ > 0) { + std::copy(array_.begin(), array_.begin() + offset_, newArray.begin()); + array_.clear(); + array_ = std::move(newArray); + } } } From 8b03f02d2a11ec86af05fb393e15794009d56eeb Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Thu, 25 Jul 2024 08:09:49 +0000 Subject: [PATCH 09/13] fake array_ allocation --- cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 61 +++++++++++++++------ cpp/velox/shuffle/VeloxSortShuffleWriter.h | 9 ++- 2 files changed, 49 insertions(+), 21 deletions(-) diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index 51ed820d3558..6bb14c69c3a2 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -64,9 +64,7 @@ VeloxSortShuffleWriter::VeloxSortShuffleWriter( ShuffleWriterOptions options, std::shared_ptr veloxPool, arrow::MemoryPool* pool) - : VeloxShuffleWriter(numPartitions, std::move(partitionWriter), std::move(options), std::move(veloxPool), pool), - allocator_{std::make_unique(veloxPool_.get())}, - array_{SortArray{Allocator(allocator_.get())}} {} + : VeloxShuffleWriter(numPartitions, std::move(partitionWriter), std::move(options), std::move(veloxPool), pool) {} arrow::Status VeloxSortShuffleWriter::write(std::shared_ptr cb, int64_t memLimit) { ARROW_ASSIGN_OR_RAISE(auto rv, getPeeledRowVector(cb)); @@ -105,6 +103,7 @@ arrow::Status VeloxSortShuffleWriter::init() { options_.partitioning == Partitioning::kSingle, arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single partition.")); array_.resize(initialSize_); + tmp_ = facebook::velox::AlignedBuffer::allocate(initialSize_ * sizeof(ElementType), veloxPool_.get()); return arrow::Status::OK(); } @@ -186,6 +185,7 @@ void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, u auto size = row.serialize(i, currentPage_ + pageCursor_); array_[offset_++] = {toCompactRowId(row2Partition_[i], pageNumber_, pageCursor_), size}; pageCursor_ += size; + VELOX_DCHECK_LE(pageCursor_, currenPageSize_); } } @@ -231,17 +231,23 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { while (--numPages) { pages_.pop_front(); } - auto& page = pages_.front(); + auto& page = pages_.back(); + // Clear page for serialization. memset(page->asMutable(), 0, page->size()); + // currentPage_ should always point to the last page. + VELOX_CHECK(currentPage_ == page->asMutable()); + pageAddresses_.resize(1); pageAddresses_[0] = currentPage_; pageNumber_ = 0; pageCursor_ = 0; + // Reset and reallocate array_ to minimal size. offset_ = 0; array_.clear(); // Allocate array_ can trigger spill. array_.resize(initialSize_); + tmp_ = facebook::velox::AlignedBuffer::allocate(initialSize_ * sizeof(ElementType), veloxPool_.get()); } return arrow::Status::OK(); } @@ -263,14 +269,13 @@ std::unique_ptr VeloxSortShuffleWriter::prepareToEvict(size_t b } if (sortedBuffer_ == nullptr || sortedBuffer_->size() < rawSize) { - sortedBuffer_ = nullptr; sortedBuffer_ = facebook::velox::AlignedBuffer::allocate(rawSize, veloxPool_.get()); } auto* rawBuffer = sortedBuffer_->asMutable(); uint64_t offset = 0; for (auto i = begin; i < end; ++i) { - // size(size_t) | bytes + // size(RowSize) | bytes auto size = array_[i].second; memcpy(rawBuffer + offset, &size, sizeof(RowSizeType)); offset += sizeof(RowSizeType); @@ -305,31 +310,51 @@ void VeloxSortShuffleWriter::acquireNewBuffer(uint64_t memLimit, uint64_t minSiz auto size = std::max(std::min(memLimit >> 2, 64UL * 1024 * 1024), minSizeRequired); // Allocating new buffer can trigger spill. auto newBuffer = facebook::velox::AlignedBuffer::allocate(size, veloxPool_.get(), 0); + // If spill triggered, clear pages_. + if (offset_ == 0 && pages_.size() > 0) { + pageAddresses_.clear(); + pages_.clear(); + } + currentPage_ = newBuffer->asMutable(); + pageAddresses_.emplace_back(currentPage_); pages_.emplace_back(std::move(newBuffer)); + pageCursor_ = 0; pageNumber_ = pages_.size() - 1; - currentPage_ = pages_.back()->asMutable(); - pageAddresses_.emplace_back(currentPage_); + currenPageSize_ = pages_.back()->size(); } void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) { auto arraySize = (uint32_t)array_.size(); - auto usableCapacity = useRadixSort_ ? arraySize / 2 : arraySize; - while (offset_ + rows > usableCapacity) { - arraySize <<= 1; - usableCapacity = useRadixSort_ ? arraySize / 2 : arraySize; - } - if (arraySize != array_.size()) { - auto newArray{SortArray{Allocator(allocator_.get())}}; - newArray.resize(arraySize); - if (offset_ > 0) { - std::copy(array_.begin(), array_.begin() + offset_, newArray.begin()); + auto newSize = newArraySize(arraySize, rows); + if (newSize > arraySize) { + SortArray newArray; + // May trigger spill. + newArray.resize(newSize); + auto tmp = facebook::velox::AlignedBuffer::allocate(newSize * sizeof(ElementType), veloxPool_.get()); + // Check if already satisfies. + arraySize = (uint32_t)array_.size(); + if (newArraySize(arraySize, rows) > arraySize) { + if (offset_ > 0) { + std::copy(array_.begin(), array_.begin() + offset_, newArray.begin()); + } array_.clear(); array_ = std::move(newArray); + tmp_ = tmp; } } } +uint32_t VeloxSortShuffleWriter::newArraySize(uint32_t oldSize, uint32_t rows) { + auto newSize = oldSize; + auto usableCapacity = useRadixSort_ ? newSize / 2 : newSize; + while (offset_ + rows > usableCapacity) { + newSize <<= 1; + usableCapacity = useRadixSort_ ? newSize / 2 : newSize; + } + return newSize; +} + int64_t VeloxSortShuffleWriter::peakBytesAllocated() const { return veloxPool_->peakBytes(); } diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index 706d306cfa9f..63136ff14cde 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -85,13 +85,14 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { void growArrayIfNecessary(uint32_t rows); + uint32_t newArraySize(uint32_t oldSize, uint32_t rows); + using ElementType = std::pair; - using Allocator = facebook::velox::StlAllocator; - using SortArray = std::vector; + using SortArray = std::vector; - std::unique_ptr allocator_; // Stores compact row id -> row SortArray array_; + facebook::velox::BufferPtr tmp_; uint32_t offset_{0}; std::list pages_; @@ -99,6 +100,8 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { char* currentPage_; uint32_t pageNumber_; uint32_t pageCursor_; + // For debug. + uint32_t currenPageSize_; // FIXME: Use configuration to replace hardcode. uint32_t initialSize_ = 4096; From 29b1acb191a2b59a5ac4400017b388c7b3a511e5 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Thu, 25 Jul 2024 13:17:51 +0000 Subject: [PATCH 10/13] use raw buffer for array_ --- cpp/velox/shuffle/RadixSort.h | 18 +++--- cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 61 +++++++++++---------- cpp/velox/shuffle/VeloxSortShuffleWriter.h | 19 +++++-- 3 files changed, 55 insertions(+), 43 deletions(-) diff --git a/cpp/velox/shuffle/RadixSort.h b/cpp/velox/shuffle/RadixSort.h index 06d7a89a492b..89b4a45cb884 100644 --- a/cpp/velox/shuffle/RadixSort.h +++ b/cpp/velox/shuffle/RadixSort.h @@ -21,7 +21,7 @@ namespace gluten { -template +template class RadixSort { public: /** @@ -39,11 +39,11 @@ class RadixSort { * @return The starting index of the sorted data within the given array. We return this instead * of always copying the data back to position zero for efficiency. */ - static int32_t sort(SortArray& array, int64_t numRecords, int32_t startByteIndex, int32_t endByteIndex) { + static int32_t sort(Element* array, size_t size, int64_t numRecords, int32_t startByteIndex, int32_t endByteIndex) { assert(startByteIndex >= 0 && "startByteIndex should >= 0"); assert(endByteIndex <= 7 && "endByteIndex should <= 7"); assert(endByteIndex > startByteIndex); - assert(numRecords * 2 <= array.size()); + assert(numRecords * 2 <= size); int64_t inIndex = 0; int64_t outIndex = numRecords; @@ -59,7 +59,7 @@ class RadixSort { } } - return static_cast(inIndex); + return static_cast(inIndex); } private: @@ -75,7 +75,7 @@ class RadixSort { * @param outIndex the starting index where sorted output data should be written. */ static void sortAtByte( - SortArray& array, + Element* array, int64_t numRecords, std::vector& counts, int32_t byteIdx, @@ -86,7 +86,7 @@ class RadixSort { auto offsets = transformCountsToOffsets(counts, outIndex); for (auto offset = inIndex; offset < inIndex + numRecords; ++offset) { - auto bucket = (array[offset].first >> (byteIdx * 8)) & 0xff; + auto bucket = (array[offset].value >> (byteIdx * 8)) & 0xff; array[offsets[bucket]++] = array[offset]; } } @@ -103,7 +103,7 @@ class RadixSort { * significant byte. If the byte does not need sorting the vector entry will be empty. */ static std::vector> - getCounts(SortArray& array, int64_t numRecords, int32_t startByteIndex, int32_t endByteIndex) { + getCounts(Element* array, int64_t numRecords, int32_t startByteIndex, int32_t endByteIndex) { std::vector> counts; counts.resize(8); @@ -112,7 +112,7 @@ class RadixSort { int64_t bitwiseMax = 0; int64_t bitwiseMin = -1L; for (auto offset = 0; offset < numRecords; ++offset) { - auto value = array[offset].first; + auto value = array[offset].value; bitwiseMax |= value; bitwiseMin &= value; } @@ -123,7 +123,7 @@ class RadixSort { if (((bitsChanged >> (i * 8)) & 0xff) != 0) { counts[i].resize(256); for (auto offset = 0; offset < numRecords; ++offset) { - counts[i][(array[offset].first >> (i * 8)) & 0xff]++; + counts[i][(array[offset].value >> (i * 8)) & 0xff]++; } } } diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index 6bb14c69c3a2..c89c028c8d82 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -80,7 +80,7 @@ arrow::Status VeloxSortShuffleWriter::stop() { if (offset_ > 0) { RETURN_NOT_OK(evictAllPartitions()); } - array_.clear(); + array_ = nullptr; pages_.clear(); pageAddresses_.clear(); RETURN_NOT_OK(partitionWriter_->stop(&metrics_)); @@ -102,8 +102,7 @@ arrow::Status VeloxSortShuffleWriter::init() { ARROW_RETURN_IF( options_.partitioning == Partitioning::kSingle, arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single partition.")); - array_.resize(initialSize_); - tmp_ = facebook::velox::AlignedBuffer::allocate(initialSize_ * sizeof(ElementType), veloxPool_.get()); + initArray(); return arrow::Status::OK(); } @@ -183,7 +182,9 @@ void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, u VELOX_CHECK(!pages_.empty()); for (auto i = offset; i < offset + rows; ++i) { auto size = row.serialize(i, currentPage_ + pageCursor_); - array_[offset_++] = {toCompactRowId(row2Partition_[i], pageNumber_, pageCursor_), size}; + arrayPtr_[offset_].value = toCompactRowId(row2Partition_[i], pageNumber_, pageCursor_); + arrayPtr_[offset_].rowSize = size; + ++offset_; pageCursor_ += size; VELOX_DCHECK_LE(pageCursor_, currenPageSize_); } @@ -204,17 +205,18 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { { ScopedTimer timer(&sortTime_); if (useRadixSort_) { - begin = RadixSort::sort(array_, numRecords, kPartitionIdStartByteIndex, kPartitionIdEndByteIndex); + begin = RadixSort::sort( + arrayPtr_, arraySize_, numRecords, kPartitionIdStartByteIndex, kPartitionIdEndByteIndex); } else { - std::sort(array_.begin(), array_.begin() + numRecords); + qsort(arrayPtr_, arraySize_, sizeof(Element), Element::compare); } } auto end = begin + numRecords; auto cur = begin; - auto pid = extractPartitionId(array_[begin].first); + auto pid = extractPartitionId(arrayPtr_[begin].value); while (++cur < end) { - auto curPid = extractPartitionId(array_[cur].first); + auto curPid = extractPartitionId(arrayPtr_[cur].value); if (curPid != pid) { RETURN_NOT_OK(evictPartition(pid, begin, cur)); pid = curPid; @@ -242,12 +244,9 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { pageNumber_ = 0; pageCursor_ = 0; - // Reset and reallocate array_ to minimal size. + // Reset and reallocate array_ to minimal size. Allocate array_ can trigger spill. offset_ = 0; - array_.clear(); - // Allocate array_ can trigger spill. - array_.resize(initialSize_); - tmp_ = facebook::velox::AlignedBuffer::allocate(initialSize_ * sizeof(ElementType), veloxPool_.get()); + initArray(); } return arrow::Status::OK(); } @@ -265,7 +264,7 @@ std::unique_ptr VeloxSortShuffleWriter::prepareToEvict(size_t b uint32_t numRows = end - begin; uint64_t rawSize = numRows * sizeof(RowSizeType); for (auto i = begin; i < end; ++i) { - rawSize += array_[i].second; + rawSize += arrayPtr_[i].rowSize; } if (sortedBuffer_ == nullptr || sortedBuffer_->size() < rawSize) { @@ -276,10 +275,10 @@ std::unique_ptr VeloxSortShuffleWriter::prepareToEvict(size_t b uint64_t offset = 0; for (auto i = begin; i < end; ++i) { // size(RowSize) | bytes - auto size = array_[i].second; + auto size = arrayPtr_[i].rowSize; memcpy(rawBuffer + offset, &size, sizeof(RowSizeType)); offset += sizeof(RowSizeType); - auto index = extractPageNumberAndOffset(array_[i].first); + auto index = extractPageNumberAndOffset(arrayPtr_[i].value); memcpy(rawBuffer + offset, pageAddresses_[index.first] + index.second, size); offset += size; } @@ -325,28 +324,26 @@ void VeloxSortShuffleWriter::acquireNewBuffer(uint64_t memLimit, uint64_t minSiz } void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) { - auto arraySize = (uint32_t)array_.size(); - auto newSize = newArraySize(arraySize, rows); - if (newSize > arraySize) { - SortArray newArray; + auto newSize = newArraySize(rows); + if (newSize > arraySize_) { // May trigger spill. - newArray.resize(newSize); - auto tmp = facebook::velox::AlignedBuffer::allocate(newSize * sizeof(ElementType), veloxPool_.get()); + auto newSizeBytes = newSize * sizeof(Element); + auto newArray = facebook::velox::AlignedBuffer::allocate(newSizeBytes, veloxPool_.get()); // Check if already satisfies. - arraySize = (uint32_t)array_.size(); - if (newArraySize(arraySize, rows) > arraySize) { + if (newArraySize(rows) > arraySize_) { + auto newPtr = newArray->asMutable(); if (offset_ > 0) { - std::copy(array_.begin(), array_.begin() + offset_, newArray.begin()); + gluten::fastCopy(newPtr, arrayPtr_, arraySize_ * sizeof(Element)); } - array_.clear(); + arraySize_ = newSize; + arrayPtr_ = newPtr; array_ = std::move(newArray); - tmp_ = tmp; } } } -uint32_t VeloxSortShuffleWriter::newArraySize(uint32_t oldSize, uint32_t rows) { - auto newSize = oldSize; +uint32_t VeloxSortShuffleWriter::newArraySize(uint32_t rows) { + auto newSize = arraySize_; auto usableCapacity = useRadixSort_ ? newSize / 2 : newSize; while (offset_ + rows > usableCapacity) { newSize <<= 1; @@ -355,6 +352,12 @@ uint32_t VeloxSortShuffleWriter::newArraySize(uint32_t oldSize, uint32_t rows) { return newSize; } +void VeloxSortShuffleWriter::initArray() { + arraySize_ = initialSize_; + array_ = facebook::velox::AlignedBuffer::allocate(arraySize_ * sizeof(Element), veloxPool_.get()); + arrayPtr_ = array_->asMutable(); +} + int64_t VeloxSortShuffleWriter::peakBytesAllocated() const { return veloxPool_->peakBytes(); } diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index 63136ff14cde..5af9c6f726aa 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -85,14 +85,23 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { void growArrayIfNecessary(uint32_t rows); - uint32_t newArraySize(uint32_t oldSize, uint32_t rows); + uint32_t newArraySize(uint32_t rows); - using ElementType = std::pair; - using SortArray = std::vector; + void initArray(); + + struct Element { + uint64_t value; + uint32_t rowSize; + + static int compare(const void* a, const void* b) { + return ((Element*)a)->value - ((Element*)b)->value; + } + }; // Stores compact row id -> row - SortArray array_; - facebook::velox::BufferPtr tmp_; + facebook::velox::BufferPtr array_; + Element* arrayPtr_; + uint32_t arraySize_; uint32_t offset_{0}; std::list pages_; From 18c4e54625f158928af6449dbd5198ba84feb4fa Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Fri, 26 Jul 2024 02:48:30 +0000 Subject: [PATCH 11/13] use spark conf for radix sort and buffer size --- cpp/core/CMakeLists.txt | 1 - cpp/core/jni/JniWrapper.cc | 6 +++++- cpp/core/shuffle/Options.cc | 18 ------------------ cpp/core/shuffle/Options.h | 7 +++++++ cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 8 ++++---- cpp/velox/shuffle/VeloxSortShuffleWriter.h | 4 ---- .../VeloxCelebornColumnarShuffleWriter.scala | 3 +++ .../vectorized/ShuffleWriterJniWrapper.java | 10 ++++++++++ .../spark/shuffle/ColumnarShuffleWriter.scala | 4 +++- .../VeloxUniffleColumnarShuffleWriter.java | 2 ++ 10 files changed, 34 insertions(+), 29 deletions(-) delete mode 100644 cpp/core/shuffle/Options.cc diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt index caad8db1eb9f..ef21ccbe855a 100644 --- a/cpp/core/CMakeLists.txt +++ b/cpp/core/CMakeLists.txt @@ -191,7 +191,6 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS shuffle/FallbackRangePartitioner.cc shuffle/HashPartitioner.cc shuffle/LocalPartitionWriter.cc - shuffle/Options.cc shuffle/Partitioner.cc shuffle/Partitioning.cc shuffle/Payload.cc diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 3d6da31c7e75..f39f9c92333e 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -755,6 +755,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe jint compressionLevel, jint compressionThreshold, jstring compressionModeJstr, + jint sortBufferInitialSize, + jboolean useRadixSort, jstring dataFileJstr, jint numSubDirs, jstring localDirsJstr, @@ -780,7 +782,9 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe .partitioning = gluten::toPartitioning(jStringToCString(env, partitioningNameJstr)), .taskAttemptId = (int64_t)taskAttemptId, .startPartitionId = startPartitionId, - .shuffleWriterType = gluten::ShuffleWriter::stringToType(jStringToCString(env, shuffleWriterTypeJstr))}; + .shuffleWriterType = gluten::ShuffleWriter::stringToType(jStringToCString(env, shuffleWriterTypeJstr)), + .sortBufferInitialSize = sortBufferInitialSize, + .useRadixSort = static_cast(useRadixSort)}; // Build PartitionWriterOptions. auto partitionWriterOptions = PartitionWriterOptions{ diff --git a/cpp/core/shuffle/Options.cc b/cpp/core/shuffle/Options.cc deleted file mode 100644 index 8e05a10d6859..000000000000 --- a/cpp/core/shuffle/Options.cc +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "shuffle/Options.h" diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h index 757950d03443..11fa037eb5a6 100644 --- a/cpp/core/shuffle/Options.h +++ b/cpp/core/shuffle/Options.h @@ -35,9 +35,12 @@ static constexpr int32_t kDefaultBufferAlignment = 64; static constexpr double kDefaultBufferReallocThreshold = 0.25; static constexpr double kDefaultMergeBufferThreshold = 0.25; static constexpr bool kEnableBufferedWrite = true; +static constexpr bool kDefaultUseRadixSort = true; +static constexpr int32_t kDefaultSortBufferSize = 4096; enum ShuffleWriterType { kHashShuffle, kSortShuffle, kRssSortShuffle }; enum PartitionWriterType { kLocal, kRss }; +enum SortAlgorithm { kRadixSort, kQuickSort }; struct ShuffleReaderOptions { arrow::Compression::type compressionType = arrow::Compression::type::LZ4_FRAME; @@ -56,6 +59,10 @@ struct ShuffleWriterOptions { int32_t startPartitionId = 0; int64_t threadId = -1; ShuffleWriterType shuffleWriterType = kHashShuffle; + + // Sort shuffle writer. + int32_t sortBufferInitialSize = kDefaultSortBufferSize; + bool useRadixSort = kDefaultUseRadixSort; }; struct PartitionWriterOptions { diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index c89c028c8d82..48db90515d90 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -204,7 +204,7 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { int32_t begin = 0; { ScopedTimer timer(&sortTime_); - if (useRadixSort_) { + if (options_.useRadixSort) { begin = RadixSort::sort( arrayPtr_, arraySize_, numRecords, kPartitionIdStartByteIndex, kPartitionIdEndByteIndex); } else { @@ -344,16 +344,16 @@ void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) { uint32_t VeloxSortShuffleWriter::newArraySize(uint32_t rows) { auto newSize = arraySize_; - auto usableCapacity = useRadixSort_ ? newSize / 2 : newSize; + auto usableCapacity = options_.useRadixSort ? newSize / 2 : newSize; while (offset_ + rows > usableCapacity) { newSize <<= 1; - usableCapacity = useRadixSort_ ? newSize / 2 : newSize; + usableCapacity = options_.useRadixSort ? newSize / 2 : newSize; } return newSize; } void VeloxSortShuffleWriter::initArray() { - arraySize_ = initialSize_; + arraySize_ = options_.sortBufferInitialSize; array_ = facebook::velox::AlignedBuffer::allocate(arraySize_ * sizeof(Element), veloxPool_.get()); arrayPtr_ = array_->asMutable(); } diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index 5af9c6f726aa..82d86e181fc8 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -112,10 +112,6 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { // For debug. uint32_t currenPageSize_; - // FIXME: Use configuration to replace hardcode. - uint32_t initialSize_ = 4096; - bool useRadixSort_ = true; - facebook::velox::BufferPtr sortedBuffer_; // Row ID -> Partition ID diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala index 4069f1b44324..8f613c72835a 100644 --- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala +++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala @@ -23,6 +23,7 @@ import org.apache.gluten.memory.memtarget.{MemoryTarget, Spiller, Spillers} import org.apache.gluten.vectorized._ import org.apache.spark._ +import org.apache.spark.internal.config.{SHUFFLE_SORT_INIT_BUFFER_SIZE, SHUFFLE_SORT_USE_RADIXSORT} import org.apache.spark.memory.SparkMemoryUtil import org.apache.spark.scheduler.MapStatus import org.apache.spark.shuffle.celeborn.CelebornShuffleHandle @@ -114,6 +115,8 @@ class VeloxCelebornColumnarShuffleWriter[K, V]( compressionLevel, bufferCompressThreshold, GlutenConfig.getConf.columnarShuffleCompressionMode, + conf.get(SHUFFLE_SORT_INIT_BUFFER_SIZE).toInt, + conf.get(SHUFFLE_SORT_USE_RADIXSORT), clientPushBufferMaxSize, clientPushSortMemoryThreshold, celebornPartitionPusher, diff --git a/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java b/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java index 883fc600171f..1d622d491eb5 100644 --- a/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java +++ b/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java @@ -61,6 +61,8 @@ public long make( int compressionLevel, int bufferCompressThreshold, String compressionMode, + int sortBufferInitialSize, + boolean useRadixSort, String dataFile, int subDirsPerLocalDir, String localDirs, @@ -80,6 +82,8 @@ public long make( compressionLevel, bufferCompressThreshold, compressionMode, + sortBufferInitialSize, + useRadixSort, dataFile, subDirsPerLocalDir, localDirs, @@ -109,6 +113,8 @@ public long makeForRSS( int compressionLevel, int bufferCompressThreshold, String compressionMode, + int sortBufferInitialSize, + boolean useRadixSort, int pushBufferMaxSize, long sortBufferMaxSize, Object pusher, @@ -129,6 +135,8 @@ public long makeForRSS( compressionLevel, bufferCompressThreshold, compressionMode, + sortBufferInitialSize, + useRadixSort, null, 0, null, @@ -154,6 +162,8 @@ public native long nativeMake( int compressionLevel, int bufferCompressThreshold, String compressionMode, + int sortBufferInitialSize, + boolean useRadixSort, String dataFile, int subDirsPerLocalDir, String localDirs, diff --git a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala index 5274ec94c8d5..d62ff1d68d6d 100644 --- a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala +++ b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala @@ -24,7 +24,7 @@ import org.apache.gluten.vectorized._ import org.apache.spark._ import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.SHUFFLE_COMPRESS +import org.apache.spark.internal.config.{SHUFFLE_COMPRESS, SHUFFLE_SORT_INIT_BUFFER_SIZE, SHUFFLE_SORT_USE_RADIXSORT} import org.apache.spark.memory.SparkMemoryUtil import org.apache.spark.scheduler.MapStatus import org.apache.spark.sql.vectorized.ColumnarBatch @@ -151,6 +151,8 @@ class ColumnarShuffleWriter[K, V]( compressionLevel, bufferCompressThreshold, GlutenConfig.getConf.columnarShuffleCompressionMode, + conf.get(SHUFFLE_SORT_INIT_BUFFER_SIZE).toInt, + conf.get(SHUFFLE_SORT_USE_RADIXSORT), dataTmp.getAbsolutePath, blockManager.subDirsPerLocalDir, localDirs, diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java index 32c672cec6be..b84c9d4ee601 100644 --- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java +++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java @@ -148,6 +148,8 @@ protected void writeImpl(Iterator> records) throws IOException { compressionLevel, compressThreshold, GlutenConfig.getConf().columnarShuffleCompressionMode(), + (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()), + (boolean) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_USE_RADIXSORT()), bufferSize, bufferSize, partitionPusher, From d2d08f995d7f7ca8f75d766625e795ae7a45008b Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Fri, 26 Jul 2024 03:54:54 +0000 Subject: [PATCH 12/13] fix qsort --- cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 8 +++++++- cpp/velox/shuffle/VeloxSortShuffleWriter.h | 6 ++---- cpp/velox/tests/VeloxShuffleWriterTest.cc | 11 +++++++---- cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h | 4 +++- 4 files changed, 19 insertions(+), 10 deletions(-) diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index 48db90515d90..7da5a3db186f 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -208,7 +208,8 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { begin = RadixSort::sort( arrayPtr_, arraySize_, numRecords, kPartitionIdStartByteIndex, kPartitionIdEndByteIndex); } else { - qsort(arrayPtr_, arraySize_, sizeof(Element), Element::compare); + auto ptr = arrayPtr_; + qsort(ptr, numRecords, sizeof(Element), compare); } } @@ -369,4 +370,9 @@ int64_t VeloxSortShuffleWriter::totalSortTime() const { int64_t VeloxSortShuffleWriter::totalC2RTime() const { return c2rTime_; } + +int VeloxSortShuffleWriter::compare(const void* a, const void* b) { + // No same values. + return ((Element*)a)->value > ((Element*)b)->value ? 1 : -1; +} } // namespace gluten diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index 82d86e181fc8..0c555b929cba 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -92,12 +92,10 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { struct Element { uint64_t value; uint32_t rowSize; - - static int compare(const void* a, const void* b) { - return ((Element*)a)->value - ((Element*)b)->value; - } }; + static int compare(const void* a, const void* b); + // Stores compact row id -> row facebook::velox::BufferPtr array_; Element* arrayPtr_; diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc index 6b86f6a0a15c..af9d5a58db0d 100644 --- a/cpp/velox/tests/VeloxShuffleWriterTest.cc +++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc @@ -70,10 +70,12 @@ std::vector createShuffleTestParams() { std::vector mergeBufferSizes = {0, 3, 4, 10, 4096}; for (const auto& compression : compressions) { + for (auto useRadixSort : {true, false}) { + params.push_back(ShuffleTestParams{ + ShuffleWriterType::kSortShuffle, PartitionWriterType::kLocal, compression, 0, 0, useRadixSort}); + } params.push_back( - ShuffleTestParams{ShuffleWriterType::kSortShuffle, PartitionWriterType::kLocal, compression, 0, 0}); - params.push_back( - ShuffleTestParams{ShuffleWriterType::kRssSortShuffle, PartitionWriterType::kRss, compression, 0, 0}); + ShuffleTestParams{ShuffleWriterType::kRssSortShuffle, PartitionWriterType::kRss, compression, 0, 0, false}); for (const auto compressionThreshold : compressionThresholds) { for (const auto mergeBufferSize : mergeBufferSizes) { params.push_back(ShuffleTestParams{ @@ -81,7 +83,8 @@ std::vector createShuffleTestParams() { PartitionWriterType::kLocal, compression, compressionThreshold, - mergeBufferSize}); + mergeBufferSize, + false /* unused */}); } params.push_back(ShuffleTestParams{ ShuffleWriterType::kHashShuffle, PartitionWriterType::kRss, compression, compressionThreshold, 0}); diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h index f9c2b1d07339..d32e3272186b 100644 --- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h +++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h @@ -68,12 +68,13 @@ struct ShuffleTestParams { arrow::Compression::type compressionType; int32_t compressionThreshold; int32_t mergeBufferSize; + bool useRadixSort; std::string toString() const { std::ostringstream out; out << "shuffleWriterType = " << shuffleWriterType << ", partitionWriterType = " << partitionWriterType << ", compressionType = " << compressionType << ", compressionThreshold = " << compressionThreshold - << ", mergeBufferSize = " << mergeBufferSize; + << ", mergeBufferSize = " << mergeBufferSize << ", useRadixSort = " << (useRadixSort ? "true" : "false"); return out.str(); } }; @@ -250,6 +251,7 @@ class VeloxShuffleWriterTest : public ::testing::TestWithParam Date: Fri, 26 Jul 2024 08:38:29 +0000 Subject: [PATCH 13/13] prealloc sortedBuffer --- cpp/velox/shuffle/RadixSort.h | 6 +- cpp/velox/shuffle/VeloxShuffleReader.cc | 4 +- cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 121 +++++++++++--------- cpp/velox/shuffle/VeloxSortShuffleWriter.h | 13 +-- 4 files changed, 75 insertions(+), 69 deletions(-) diff --git a/cpp/velox/shuffle/RadixSort.h b/cpp/velox/shuffle/RadixSort.h index 89b4a45cb884..17f05d349c8e 100644 --- a/cpp/velox/shuffle/RadixSort.h +++ b/cpp/velox/shuffle/RadixSort.h @@ -86,7 +86,7 @@ class RadixSort { auto offsets = transformCountsToOffsets(counts, outIndex); for (auto offset = inIndex; offset < inIndex + numRecords; ++offset) { - auto bucket = (array[offset].value >> (byteIdx * 8)) & 0xff; + auto bucket = (array[offset] >> (byteIdx * 8)) & 0xff; array[offsets[bucket]++] = array[offset]; } } @@ -112,7 +112,7 @@ class RadixSort { int64_t bitwiseMax = 0; int64_t bitwiseMin = -1L; for (auto offset = 0; offset < numRecords; ++offset) { - auto value = array[offset].value; + auto value = array[offset]; bitwiseMax |= value; bitwiseMin &= value; } @@ -123,7 +123,7 @@ class RadixSort { if (((bitsChanged >> (i * 8)) & 0xff) != 0) { counts[i].resize(256); for (auto offset = 0; offset < numRecords; ++offset) { - counts[i][(array[offset].value >> (i * 8)) & 0xff]++; + counts[i][(array[offset] >> (i * 8)) & 0xff]++; } } } diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc index e55f4d01de82..ab93d9a33d04 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.cc +++ b/cpp/velox/shuffle/VeloxShuffleReader.cc @@ -428,8 +428,8 @@ std::shared_ptr VeloxSortShuffleReaderDeserializer::deserializeTo auto buffer = cur->second; const auto* rawBuffer = buffer->as(); while (rowOffset_ < cur->first && readRows < batchSize_) { - auto rowSize = *(uint32_t*)(rawBuffer + byteOffset_); - byteOffset_ += sizeof(uint32_t); + auto rowSize = *(RowSizeType*)(rawBuffer + byteOffset_) - sizeof(RowSizeType); + byteOffset_ += sizeof(RowSizeType); data.push_back(std::string_view(rawBuffer + byteOffset_, rowSize)); byteOffset_ += rowSize; ++rowOffset_; diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index 7da5a3db186f..0015ba9d36c3 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -32,6 +32,8 @@ constexpr uint64_t kMaskLower40Bits = (1UL << 40) - 1; constexpr uint32_t kPartitionIdStartByteIndex = 5; constexpr uint32_t kPartitionIdEndByteIndex = 7; +constexpr uint32_t kSortedBufferSize = 1 * 1024 * 1024; + uint64_t toCompactRowId(uint32_t partitionId, uint32_t pageNumber, uint32_t offsetInPage) { // |63 partitionId(24) |39 inputIndex(13) |26 rowIndex(27) | return (uint64_t)partitionId << 40 | (uint64_t)pageNumber << 27 | offsetInPage; @@ -80,7 +82,8 @@ arrow::Status VeloxSortShuffleWriter::stop() { if (offset_ > 0) { RETURN_NOT_OK(evictAllPartitions()); } - array_ = nullptr; + array_.reset(); + sortedBuffer_.reset(); pages_.clear(); pageAddresses_.clear(); RETURN_NOT_OK(partitionWriter_->stop(&metrics_)); @@ -103,6 +106,8 @@ arrow::Status VeloxSortShuffleWriter::init() { options_.partitioning == Partitioning::kSingle, arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single partition.")); initArray(); + sortedBuffer_ = facebook::velox::AlignedBuffer::allocate(kSortedBufferSize, veloxPool_.get()); + rawBuffer_ = sortedBuffer_->asMutable(); return arrow::Status::OK(); } @@ -110,6 +115,9 @@ void VeloxSortShuffleWriter::initRowType(const facebook::velox::RowVectorPtr& rv if (UNLIKELY(!rowType_)) { rowType_ = facebook::velox::asRowType(rv->type()); fixedRowSize_ = facebook::velox::row::CompactRow::fixedRowSize(rowType_); + if (fixedRowSize_) { + *fixedRowSize_ += sizeof(RowSizeType); + } } } @@ -150,11 +158,16 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr facebook::velox::row::CompactRow row(vector); if (!fixedRowSize_) { - rowSizes_.resize(inputRows + 1); - rowSizes_[0] = 0; + rowSize_.resize(inputRows); + rowSizePrefixSum_.resize(inputRows + 1); + rowSizePrefixSum_[0] = 0; for (auto i = 0; i < inputRows; ++i) { - rowSizes_[i + 1] = rowSizes_[i] + row.rowSize(i); + auto rowSize = row.rowSize(i) + sizeof(RowSizeType); + rowSize_[i] = rowSize; + rowSizePrefixSum_[i + 1] = rowSizePrefixSum_[i] + rowSize; } + } else { + rowSize_.resize(inputRows, *fixedRowSize_); } uint32_t rowOffset = 0; @@ -162,7 +175,7 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr auto remainingRows = inputRows - rowOffset; auto rows = maxRowsToInsert(rowOffset, remainingRows); if (rows == 0) { - auto minSizeRequired = fixedRowSize_ ? fixedRowSize_.value() : rowSizes_[rowOffset + 1] - rowSizes_[rowOffset]; + auto minSizeRequired = fixedRowSize_ ? fixedRowSize_.value() : rowSize_[rowOffset]; acquireNewBuffer((uint64_t)memLimit, minSizeRequired); rows = maxRowsToInsert(rowOffset, remainingRows); ARROW_RETURN_IF( @@ -181,10 +194,12 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, uint32_t offset, uint32_t rows) { VELOX_CHECK(!pages_.empty()); for (auto i = offset; i < offset + rows; ++i) { + auto pid = row2Partition_[i]; + arrayPtr_[offset_++] = toCompactRowId(pid, pageNumber_, pageCursor_); + // size(RowSize) | bytes + memcpy(currentPage_ + pageCursor_, &rowSize_[i], sizeof(RowSizeType)); + pageCursor_ += sizeof(RowSizeType); auto size = row.serialize(i, currentPage_ + pageCursor_); - arrayPtr_[offset_].value = toCompactRowId(row2Partition_[i], pageNumber_, pageCursor_); - arrayPtr_[offset_].rowSize = size; - ++offset_; pageCursor_ += size; VELOX_DCHECK_LE(pageCursor_, currenPageSize_); } @@ -205,19 +220,20 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { { ScopedTimer timer(&sortTime_); if (options_.useRadixSort) { - begin = RadixSort::sort( + begin = RadixSort::sort( arrayPtr_, arraySize_, numRecords, kPartitionIdStartByteIndex, kPartitionIdEndByteIndex); } else { auto ptr = arrayPtr_; - qsort(ptr, numRecords, sizeof(Element), compare); + qsort(ptr, numRecords, sizeof(uint64_t), compare); + (void)ptr; } } auto end = begin + numRecords; auto cur = begin; - auto pid = extractPartitionId(arrayPtr_[begin].value); + auto pid = extractPartitionId(arrayPtr_[begin]); while (++cur < end) { - auto curPid = extractPartitionId(arrayPtr_[cur].value); + auto curPid = extractPartitionId(arrayPtr_[cur]); if (curPid != pid) { RETURN_NOT_OK(evictPartition(pid, begin, cur)); pid = curPid; @@ -226,8 +242,6 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { } RETURN_NOT_OK(evictPartition(pid, begin, cur)); - sortedBuffer_ = nullptr; - if (!stopped_) { // Preserve the last page for use. auto numPages = pages_.size(); @@ -253,43 +267,39 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { } arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_t begin, size_t end) { - auto payload = prepareToEvict(begin, end); - RETURN_NOT_OK( - partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_)); - return arrow::Status::OK(); -} - -std::unique_ptr VeloxSortShuffleWriter::prepareToEvict(size_t begin, size_t end) { ScopedTimer timer(&sortTime_); // Serialize [begin, end) - uint32_t numRows = end - begin; - uint64_t rawSize = numRows * sizeof(RowSizeType); - for (auto i = begin; i < end; ++i) { - rawSize += arrayPtr_[i].rowSize; - } - - if (sortedBuffer_ == nullptr || sortedBuffer_->size() < rawSize) { - sortedBuffer_ = facebook::velox::AlignedBuffer::allocate(rawSize, veloxPool_.get()); - } - auto* rawBuffer = sortedBuffer_->asMutable(); - uint64_t offset = 0; - for (auto i = begin; i < end; ++i) { - // size(RowSize) | bytes - auto size = arrayPtr_[i].rowSize; - memcpy(rawBuffer + offset, &size, sizeof(RowSizeType)); - offset += sizeof(RowSizeType); - auto index = extractPageNumberAndOffset(arrayPtr_[i].value); - memcpy(rawBuffer + offset, pageAddresses_[index.first] + index.second, size); + char* addr; + uint32_t size; + + auto index = begin; + while (index < end) { + auto pageIndex = extractPageNumberAndOffset(arrayPtr_[index]); + addr = pageAddresses_[pageIndex.first] + pageIndex.second; + size = *(RowSizeType*)addr; + if (offset + size > kSortedBufferSize) { + VELOX_CHECK(offset > 0); + auto payload = std::make_unique( + index - begin, + nullptr, + std::vector>{std::make_shared(rawBuffer_, offset)}); + RETURN_NOT_OK( + partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_)); + begin = index; + offset = 0; + } + gluten::fastCopy(rawBuffer_ + offset, addr, size); offset += size; + index++; } - VELOX_CHECK_EQ(offset, rawSize); - - auto rawData = sortedBuffer_->as(); - std::vector> buffers; - buffers.push_back(std::make_shared(rawData, rawSize)); - - return std::make_unique(numRows, nullptr, std::move(buffers)); + auto payload = std::make_unique( + end - begin, + nullptr, + std::vector>{std::make_shared(rawBuffer_, offset)}); + RETURN_NOT_OK( + partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_)); + return arrow::Status::OK(); } uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t rows) { @@ -301,8 +311,8 @@ uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t rows) if (fixedRowSize_) { return std::min((uint32_t)(remainingBytes / (fixedRowSize_.value())), rows); } - auto beginIter = rowSizes_.begin() + 1 + offset; - auto iter = std::upper_bound(beginIter, rowSizes_.end(), remainingBytes); + auto beginIter = rowSizePrefixSum_.begin() + 1 + offset; + auto iter = std::upper_bound(beginIter, rowSizePrefixSum_.end(), remainingBytes); return iter - beginIter; } @@ -328,17 +338,18 @@ void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) { auto newSize = newArraySize(rows); if (newSize > arraySize_) { // May trigger spill. - auto newSizeBytes = newSize * sizeof(Element); + auto newSizeBytes = newSize * sizeof(uint64_t); auto newArray = facebook::velox::AlignedBuffer::allocate(newSizeBytes, veloxPool_.get()); // Check if already satisfies. if (newArraySize(rows) > arraySize_) { - auto newPtr = newArray->asMutable(); + auto newPtr = newArray->asMutable(); if (offset_ > 0) { - gluten::fastCopy(newPtr, arrayPtr_, arraySize_ * sizeof(Element)); + gluten::fastCopy(newPtr, arrayPtr_, offset_ * sizeof(uint64_t)); } arraySize_ = newSize; arrayPtr_ = newPtr; - array_ = std::move(newArray); + array_.reset(); + array_.swap(newArray); } } } @@ -355,8 +366,8 @@ uint32_t VeloxSortShuffleWriter::newArraySize(uint32_t rows) { void VeloxSortShuffleWriter::initArray() { arraySize_ = options_.sortBufferInitialSize; - array_ = facebook::velox::AlignedBuffer::allocate(arraySize_ * sizeof(Element), veloxPool_.get()); - arrayPtr_ = array_->asMutable(); + array_ = facebook::velox::AlignedBuffer::allocate(arraySize_ * sizeof(uint64_t), veloxPool_.get()); + arrayPtr_ = array_->asMutable(); } int64_t VeloxSortShuffleWriter::peakBytesAllocated() const { @@ -373,6 +384,6 @@ int64_t VeloxSortShuffleWriter::totalC2RTime() const { int VeloxSortShuffleWriter::compare(const void* a, const void* b) { // No same values. - return ((Element*)a)->value > ((Element*)b)->value ? 1 : -1; + return *(uint64_t*)a > *(uint64_t*)b ? 1 : -1; } } // namespace gluten diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index 0c555b929cba..747593ae457d 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -77,8 +77,6 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { arrow::Status evictPartition(uint32_t partitionId, size_t begin, size_t end); - std::unique_ptr prepareToEvict(size_t begin, size_t end); - uint32_t maxRowsToInsert(uint32_t offset, uint32_t rows); void acquireNewBuffer(uint64_t memLimit, uint64_t minSizeRequired); @@ -89,16 +87,11 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { void initArray(); - struct Element { - uint64_t value; - uint32_t rowSize; - }; - static int compare(const void* a, const void* b); // Stores compact row id -> row facebook::velox::BufferPtr array_; - Element* arrayPtr_; + uint64_t* arrayPtr_; uint32_t arraySize_; uint32_t offset_{0}; @@ -111,6 +104,7 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { uint32_t currenPageSize_; facebook::velox::BufferPtr sortedBuffer_; + uint8_t* rawBuffer_; // Row ID -> Partition ID // subscript: The index of row in the current input RowVector @@ -120,7 +114,8 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { std::shared_ptr rowType_; std::optional fixedRowSize_; - std::vector rowSizes_; + std::vector rowSize_; + std::vector rowSizePrefixSum_; int64_t c2rTime_{0}; int64_t sortTime_{0};