From ece1d4f8ce7db0426219701bc23014c526c4c4fe Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Wed, 24 Jul 2024 04:14:19 +0000 Subject: [PATCH 01/12] add shuffle write wall time metrics --- .../gluten/backendsapi/velox/VeloxMetricsApi.scala | 1 + .../VeloxCelebornColumnarShuffleWriter.scala | 14 ++++++++++---- .../spark/shuffle/ColumnarShuffleWriter.scala | 7 +++---- .../writer/VeloxUniffleColumnarShuffleWriter.java | 6 +++--- 4 files changed, 17 insertions(+), 11 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala index 30b08749e907..c05ce7cdcd4f 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala @@ -272,6 +272,7 @@ class VeloxMetricsApi extends MetricsApi with Logging { "compressTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to compress"), "decompressTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to decompress"), "deserializeTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to deserialize"), + "shuffleWallTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle wall time"), // For hash shuffle writer, the peak bytes represents the maximum split buffer size. // For sort shuffle writer, the peak bytes represents the maximum // row buffer + sort buffer size. diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala index baf61b8a14f2..f13ef20a59ff 100644 --- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala +++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala @@ -48,6 +48,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V]( celebornConf, client, writeMetrics) { + private val isSort = !GlutenConfig.GLUTEN_HASH_SHUFFLE_WRITER.equals(shuffleWriterType) private val runtime = Runtimes.contextInstance("CelebornShuffleWriter") @@ -72,7 +73,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V]( val handle = ColumnarBatches.getNativeHandle(cb) val startTime = System.nanoTime() jniWrapper.write(nativeShuffleWriter, cb.numRows, handle, availableOffHeapPerTask()) - dep.metrics("splitTime").add(System.nanoTime() - startTime) + dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime) dep.metrics("numInputRows").add(cb.numRows) dep.metrics("inputBatches").add(1) // This metric is important, AQE use it to decide if EliminateLimit @@ -84,10 +85,15 @@ class VeloxCelebornColumnarShuffleWriter[K, V]( val startTime = System.nanoTime() splitResult = jniWrapper.stop(nativeShuffleWriter) - dep - .metrics("splitTime") + dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime) + val nativeMetrics = if (isSort) { + dep.metrics("splitTime") + } else { + dep.metrics("sortTime") + } + nativeMetrics .add( - System.nanoTime() - startTime - splitResult.getTotalPushTime - + dep.metrics("shuffleWallTime").value - splitResult.getTotalPushTime - splitResult.getTotalWriteTime - splitResult.getTotalCompressTime) dep.metrics("dataSize").add(splitResult.getRawPartitionLengths.sum) diff --git a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala index 8a8248f2397f..5274ec94c8d5 100644 --- a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala +++ b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala @@ -176,9 +176,7 @@ class ColumnarShuffleWriter[K, V]( } val startTime = System.nanoTime() jniWrapper.write(nativeShuffleWriter, rows, handle, availableOffHeapPerTask()) - if (!isSort) { - dep.metrics("splitTime").add(System.nanoTime() - startTime) - } + dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime) dep.metrics("numInputRows").add(rows) dep.metrics("inputBatches").add(1) // This metric is important, AQE use it to decide if EliminateLimit @@ -191,11 +189,12 @@ class ColumnarShuffleWriter[K, V]( assert(nativeShuffleWriter != -1L) splitResult = jniWrapper.stop(nativeShuffleWriter) closeShuffleWriter() + dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime) if (!isSort) { dep .metrics("splitTime") .add( - System.nanoTime() - startTime - splitResult.getTotalSpillTime - + dep.metrics("shuffleWallTime").value - splitResult.getTotalSpillTime - splitResult.getTotalWriteTime - splitResult.getTotalCompressTime) } else { diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java index 62e1971635ce..32c672cec6be 100644 --- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java +++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java @@ -180,7 +180,7 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) { LOG.debug("jniWrapper.write rows {}, split bytes {}", cb.numRows(), bytes); columnarDep.metrics().get("dataSize").get().add(bytes); // this metric replace part of uniffle shuffle write time - columnarDep.metrics().get("splitTime").get().add(System.nanoTime() - startTime); + columnarDep.metrics().get("shuffleWallTime").get().add(System.nanoTime() - startTime); columnarDep.metrics().get("numInputRows").get().add(cb.numRows()); columnarDep.metrics().get("inputBatches").get().add(1); shuffleWriteMetrics.incRecordsWritten(cb.numRows()); @@ -193,13 +193,13 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) { throw new IllegalStateException("nativeShuffleWriter should not be -1L"); } splitResult = jniWrapper.stop(nativeShuffleWriter); + columnarDep.metrics().get("shuffleWallTime").get().add(System.nanoTime() - startTime); columnarDep .metrics() .get("splitTime") .get() .add( - System.nanoTime() - - startTime + columnarDep.metrics().get("shuffleWallTime").get().value() - splitResult.getTotalPushTime() - splitResult.getTotalWriteTime() - splitResult.getTotalCompressTime()); From bf5ad1ebac3ac7d8bf2d25f5dbf0730805d1762f Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Wed, 24 Jul 2024 04:14:40 +0000 Subject: [PATCH 02/12] radix sort --- cpp/velox/shuffle/RadixSort.h | 157 ++++++++++++++++++++ cpp/velox/shuffle/VeloxShuffleReader.cc | 5 +- cpp/velox/shuffle/VeloxShuffleReader.h | 3 + cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 53 ++++--- cpp/velox/shuffle/VeloxSortShuffleWriter.h | 13 +- 5 files changed, 204 insertions(+), 27 deletions(-) create mode 100644 cpp/velox/shuffle/RadixSort.h diff --git a/cpp/velox/shuffle/RadixSort.h b/cpp/velox/shuffle/RadixSort.h new file mode 100644 index 000000000000..49a40d2add0a --- /dev/null +++ b/cpp/velox/shuffle/RadixSort.h @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include + +namespace gluten { + +template +class RadixSort { + public: + /** + * Sorts a given array of longs using least-significant-digit radix sort. This routine assumes + * you have extra space at the end of the array at least equal to the number of records. The + * sort is destructive and may relocate the data positioned within the array. + * + * @param array array of long elements followed by at least that many empty slots. + * @param numRecords number of data records in the array. + * @param startByteIndex the first byte (in range [0, 7]) to sort each long by, counting from the + * least significant byte. + * @param endByteIndex the last byte (in range [0, 7]) to sort each long by, counting from the + * least significant byte. Must be greater than startByteIndex. + * + * @return The starting index of the sorted data within the given array. We return this instead + * of always copying the data back to position zero for efficiency. + */ + static int32_t sort(SortArray& array, int64_t numRecords, int32_t startByteIndex, int32_t endByteIndex) { + assert(startByteIndex >= 0 && "startByteIndex should >= 0"); + assert(endByteIndex <= 7 && "endByteIndex should <= 7"); + assert(endByteIndex > startByteIndex); + assert(numRecords * 2 <= array.size()); + + int64_t inIndex = 0; + int64_t outIndex = numRecords; + + if (numRecords > 0) { + auto counts = getCounts(array, numRecords, startByteIndex, endByteIndex); + + for (auto i = startByteIndex; i <= endByteIndex; i++) { + if (!counts[i].empty()) { + sortAtByte(array, numRecords, counts[i], i, inIndex, outIndex); + std::swap(inIndex, outIndex); + } + } + } + + return static_cast(inIndex); + } + + private: + /** + * Performs a partial sort by copying data into destination offsets for each byte value at the + * specified byte offset. + * + * @param array array to partially sort. + * @param numRecords number of data records in the array. + * @param counts counts for each byte value. This routine destructively modifies this array. + * @param byteIdx the byte in a long to sort at, counting from the least significant byte. + * @param inIndex the starting index in the array where input data is located. + * @param outIndex the starting index where sorted output data should be written. + */ + static void sortAtByte( + SortArray& array, + int64_t numRecords, + std::vector& counts, + int32_t byteIdx, + int64_t inIndex, + int64_t outIndex) { + assert(counts.size() == 256); + + auto offsets = transformCountsToOffsets(counts, outIndex); + + for (auto offset = inIndex; offset < inIndex + numRecords; ++offset) { + auto bucket = (array[offset] >> (byteIdx * 8)) & 0xff; + array[offsets[bucket]++] = array[offset]; + } + } + + /** + * Computes a value histogram for each byte in the given array. + * + * @param array array to count records in. + * @param numRecords number of data records in the array. + * @param startByteIndex the first byte to compute counts for (the prior are skipped). + * @param endByteIndex the last byte to compute counts for. + * + * @return a vector of eight 256-element count arrays, one for each byte starting from the least + * significant byte. If the byte does not need sorting the vector entry will be empty. + */ + static std::vector> + getCounts(SortArray& array, int64_t numRecords, int32_t startByteIndex, int32_t endByteIndex) { + std::vector> counts; + counts.resize(8); + + // Optimization: do a fast pre-pass to determine which byte indices we can skip for sorting. + // If all the byte values at a particular index are the same we don't need to count it. + int64_t bitwiseMax = 0; + int64_t bitwiseMin = -1L; + for (auto offset = 0; offset < numRecords; ++offset) { + auto value = array[offset]; + bitwiseMax |= value; + bitwiseMin &= value; + } + auto bitsChanged = bitwiseMin ^ bitwiseMax; + + // Compute counts for each byte index. + for (auto i = startByteIndex; i <= endByteIndex; i++) { + if (((bitsChanged >> (i * 8)) & 0xff) != 0) { + counts[i].resize(256); + for (auto offset = 0; offset < numRecords; ++offset) { + counts[i][(array[offset] >> (i * 8)) & 0xff]++; + } + } + } + + return counts; + } + + /** + * Transforms counts into the proper output offsets for the sort type. + * + * @param counts counts for each byte value. This routine destructively modifies this vector. + * @param numRecords number of data records in the original data array. + * @param outputOffset output offset in bytes from the base array object. + * + * @return the input counts vector. + */ + static std::vector& transformCountsToOffsets(std::vector& counts, int64_t outputOffset) { + assert(counts.size() == 256); + + int64_t pos = outputOffset; + for (auto i = 0; i < 256; i++) { + auto tmp = counts[i & 0xff]; + counts[i & 0xff] = pos; + pos += tmp; + } + + return counts; + } +}; + +} // namespace gluten diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc index e55f4d01de82..10673cceeaaf 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.cc +++ b/cpp/velox/shuffle/VeloxShuffleReader.cc @@ -428,9 +428,8 @@ std::shared_ptr VeloxSortShuffleReaderDeserializer::deserializeTo auto buffer = cur->second; const auto* rawBuffer = buffer->as(); while (rowOffset_ < cur->first && readRows < batchSize_) { - auto rowSize = *(uint32_t*)(rawBuffer + byteOffset_); - byteOffset_ += sizeof(uint32_t); - data.push_back(std::string_view(rawBuffer + byteOffset_, rowSize)); + auto rowSize = *(RowSizeType*)(rawBuffer + byteOffset_); + data.push_back(std::string_view(rawBuffer + byteOffset_ + sizeof(RowSizeType), rowSize - sizeof(RowSizeType))); byteOffset_ += rowSize; ++rowOffset_; ++readRows; diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h b/cpp/velox/shuffle/VeloxShuffleReader.h index 96273948526d..2be913aa13a7 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.h +++ b/cpp/velox/shuffle/VeloxShuffleReader.h @@ -20,6 +20,7 @@ #include "operators/serializer/VeloxColumnarBatchSerializer.h" #include "shuffle/Payload.h" #include "shuffle/ShuffleReader.h" +#include "shuffle/VeloxSortShuffleWriter.h" #include "velox/type/Type.h" #include "velox/vector/ComplexVector.h" @@ -64,6 +65,8 @@ class VeloxHashShuffleReaderDeserializer final : public ColumnarBatchIterator { class VeloxSortShuffleReaderDeserializer final : public ColumnarBatchIterator { public: + using RowSizeType = VeloxSortShuffleWriter::RowSizeType; + VeloxSortShuffleReaderDeserializer( std::shared_ptr in, const std::shared_ptr& schema, diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index cb81ea5f6c8a..f274cb1a291d 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -29,6 +29,8 @@ namespace gluten { namespace { constexpr uint32_t kMaskLower27Bits = (1 << 27) - 1; constexpr uint64_t kMaskLower40Bits = (1UL << 40) - 1; +constexpr uint32_t kPartitionIdStartByteIndex = 5; +constexpr uint32_t kPartitionIdEndByteIndex = 7; uint64_t toCompactRowId(uint32_t partitionId, uint32_t pageNumber, uint32_t offsetInPage) { // |63 partitionId(24) |39 inputIndex(13) |26 rowIndex(27) | @@ -101,6 +103,7 @@ arrow::Status VeloxSortShuffleWriter::init() { options_.partitioning == Partitioning::kSingle, arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single partition.")); array_.resize(initialSize_); + partitionRawSize_.resize(numPartitions_, 0); return arrow::Status::OK(); } @@ -108,6 +111,9 @@ void VeloxSortShuffleWriter::initRowType(const facebook::velox::RowVectorPtr& rv if (UNLIKELY(!rowType_)) { rowType_ = facebook::velox::asRowType(rv->type()); fixedRowSize_ = facebook::velox::row::CompactRow::fixedRowSize(rowType_); + if (fixedRowSize_) { + *fixedRowSize_ += sizeof(RowSizeType); + } } } @@ -151,7 +157,7 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr rowSizes_.resize(inputRows + 1); rowSizes_[0] = 0; for (auto i = 0; i < inputRows; ++i) { - rowSizes_[i + 1] = rowSizes_[i] + row.rowSize(i); + rowSizes_[i + 1] = rowSizes_[i] + row.rowSize(i) + sizeof(RowSizeType); } } @@ -177,8 +183,15 @@ void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, u // Allocate newArray can trigger spill. growArrayIfNecessary(rows); for (auto i = offset; i < offset + rows; ++i) { - auto size = row.serialize(i, currentPage_ + pageCursor_); - array_[offset_++] = {toCompactRowId(row2Partition_[i], pageNumber_, pageCursor_), size}; + auto pid = row2Partition_[i]; + array_[offset_++] = toCompactRowId(pid, pageNumber_, pageCursor_); + + // (RowSizeType)size | serialized row + auto dest = currentPage_ + pageCursor_; + RowSizeType size = sizeof(RowSizeType) + row.serialize(i, dest + sizeof(RowSizeType)); + memcpy(dest, &size, sizeof(RowSizeType)); + + partitionRawSize_[pid] += size; pageCursor_ += size; } } @@ -192,17 +205,23 @@ arrow::Status VeloxSortShuffleWriter::maybeSpill(int32_t nextRows) { } arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { + auto numRecords = offset_; + int32_t begin = 0; { ScopedTimer timer(&sortTime_); // TODO: Add radix sort to align with Spark. - std::sort(array_.begin(), array_.begin() + offset_); + if (useRadixSort_) { + begin = RadixSort::sort(array_, numRecords, kPartitionIdStartByteIndex, kPartitionIdEndByteIndex); + } else { + std::sort(array_.begin(), array_.begin() + numRecords); + } } - size_t begin = 0; - size_t cur = 0; - auto pid = extractPartitionId(array_[begin].first); - while (++cur < offset_) { - auto curPid = extractPartitionId(array_[cur].first); + auto end = begin + numRecords; + auto cur = begin; + auto pid = extractPartitionId(array_[begin]); + while (++cur < end) { + auto curPid = extractPartitionId(array_[cur]); if (curPid != pid) { RETURN_NOT_OK(evictPartition(pid, begin, cur)); pid = curPid; @@ -230,10 +249,8 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_t begin, size_t end) { // Serialize [begin, end) uint32_t numRows = end - begin; - uint64_t rawSize = numRows * sizeof(RowSizeType); - for (auto i = begin; i < end; ++i) { - rawSize += array_[i].second; - } + uint64_t rawSize = partitionRawSize_[partitionId]; + partitionRawSize_[partitionId] = 0; if (sortedBuffer_ == nullptr || sortedBuffer_->size() < rawSize) { sortedBuffer_ = nullptr; @@ -243,12 +260,10 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_ uint64_t offset = 0; for (auto i = begin; i < end; ++i) { - // size(size_t) | bytes - auto size = array_[i].second; - memcpy(rawBuffer + offset, &size, sizeof(RowSizeType)); - offset += sizeof(RowSizeType); - auto index = extractPageNumberAndOffset(array_[i].first); - memcpy(rawBuffer + offset, pageAddresses_[index.first] + index.second, size); + auto index = extractPageNumberAndOffset(array_[i]); + const auto* src = pageAddresses_[index.first] + index.second; + auto size = *(RowSizeType*)(src); + memcpy(rawBuffer + offset, src, size); offset += size; } VELOX_CHECK_EQ(offset, rawSize); diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index 8f01997b11e1..cb393ea5474f 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -17,6 +17,7 @@ #pragma once +#include "shuffle/RadixSort.h" #include "shuffle/VeloxShuffleWriter.h" #include @@ -31,6 +32,8 @@ namespace gluten { class VeloxSortShuffleWriter final : public VeloxShuffleWriter { public: + using RowSizeType = uint32_t; + static arrow::Result> create( uint32_t numPartitions, std::unique_ptr partitionWriter, @@ -80,10 +83,8 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { void growArrayIfNecessary(uint32_t rows); - using RowSizeType = uint32_t; - using ElementType = std::pair; - using Allocator = facebook::velox::StlAllocator; - using SortArray = std::vector; + using Allocator = facebook::velox::StlAllocator; + using SortArray = std::vector; std::unique_ptr allocator_; // Stores compact row id -> row @@ -98,7 +99,7 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { // FIXME: Use configuration to replace hardcode. uint32_t initialSize_ = 4096; - bool useRadixSort_ = false; + bool useRadixSort_ = true; facebook::velox::BufferPtr sortedBuffer_; @@ -108,6 +109,8 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { // Updated for each input RowVector. std::vector row2Partition_; + std::vector partitionRawSize_; + std::shared_ptr rowType_; std::optional fixedRowSize_; std::vector rowSizes_; From d892f0929b790abf77f14e5f062fb0e9a4e896c0 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Wed, 24 Jul 2024 04:26:46 +0000 Subject: [PATCH 03/12] partial revert --- cpp/velox/shuffle/RadixSort.h | 6 ++-- cpp/velox/shuffle/VeloxShuffleReader.cc | 5 +-- cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 38 ++++++++------------- cpp/velox/shuffle/VeloxSortShuffleWriter.h | 7 ++-- 4 files changed, 24 insertions(+), 32 deletions(-) diff --git a/cpp/velox/shuffle/RadixSort.h b/cpp/velox/shuffle/RadixSort.h index 49a40d2add0a..06d7a89a492b 100644 --- a/cpp/velox/shuffle/RadixSort.h +++ b/cpp/velox/shuffle/RadixSort.h @@ -86,7 +86,7 @@ class RadixSort { auto offsets = transformCountsToOffsets(counts, outIndex); for (auto offset = inIndex; offset < inIndex + numRecords; ++offset) { - auto bucket = (array[offset] >> (byteIdx * 8)) & 0xff; + auto bucket = (array[offset].first >> (byteIdx * 8)) & 0xff; array[offsets[bucket]++] = array[offset]; } } @@ -112,7 +112,7 @@ class RadixSort { int64_t bitwiseMax = 0; int64_t bitwiseMin = -1L; for (auto offset = 0; offset < numRecords; ++offset) { - auto value = array[offset]; + auto value = array[offset].first; bitwiseMax |= value; bitwiseMin &= value; } @@ -123,7 +123,7 @@ class RadixSort { if (((bitsChanged >> (i * 8)) & 0xff) != 0) { counts[i].resize(256); for (auto offset = 0; offset < numRecords; ++offset) { - counts[i][(array[offset] >> (i * 8)) & 0xff]++; + counts[i][(array[offset].first >> (i * 8)) & 0xff]++; } } } diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc index 10673cceeaaf..e55f4d01de82 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.cc +++ b/cpp/velox/shuffle/VeloxShuffleReader.cc @@ -428,8 +428,9 @@ std::shared_ptr VeloxSortShuffleReaderDeserializer::deserializeTo auto buffer = cur->second; const auto* rawBuffer = buffer->as(); while (rowOffset_ < cur->first && readRows < batchSize_) { - auto rowSize = *(RowSizeType*)(rawBuffer + byteOffset_); - data.push_back(std::string_view(rawBuffer + byteOffset_ + sizeof(RowSizeType), rowSize - sizeof(RowSizeType))); + auto rowSize = *(uint32_t*)(rawBuffer + byteOffset_); + byteOffset_ += sizeof(uint32_t); + data.push_back(std::string_view(rawBuffer + byteOffset_, rowSize)); byteOffset_ += rowSize; ++rowOffset_; ++readRows; diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index f274cb1a291d..6bb83bb1e931 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -103,7 +103,6 @@ arrow::Status VeloxSortShuffleWriter::init() { options_.partitioning == Partitioning::kSingle, arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single partition.")); array_.resize(initialSize_); - partitionRawSize_.resize(numPartitions_, 0); return arrow::Status::OK(); } @@ -111,9 +110,6 @@ void VeloxSortShuffleWriter::initRowType(const facebook::velox::RowVectorPtr& rv if (UNLIKELY(!rowType_)) { rowType_ = facebook::velox::asRowType(rv->type()); fixedRowSize_ = facebook::velox::row::CompactRow::fixedRowSize(rowType_); - if (fixedRowSize_) { - *fixedRowSize_ += sizeof(RowSizeType); - } } } @@ -157,7 +153,7 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr rowSizes_.resize(inputRows + 1); rowSizes_[0] = 0; for (auto i = 0; i < inputRows; ++i) { - rowSizes_[i + 1] = rowSizes_[i] + row.rowSize(i) + sizeof(RowSizeType); + rowSizes_[i + 1] = rowSizes_[i] + row.rowSize(i); } } @@ -183,15 +179,8 @@ void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, u // Allocate newArray can trigger spill. growArrayIfNecessary(rows); for (auto i = offset; i < offset + rows; ++i) { - auto pid = row2Partition_[i]; - array_[offset_++] = toCompactRowId(pid, pageNumber_, pageCursor_); - - // (RowSizeType)size | serialized row - auto dest = currentPage_ + pageCursor_; - RowSizeType size = sizeof(RowSizeType) + row.serialize(i, dest + sizeof(RowSizeType)); - memcpy(dest, &size, sizeof(RowSizeType)); - - partitionRawSize_[pid] += size; + auto size = row.serialize(i, currentPage_ + pageCursor_); + array_[offset_++] = {toCompactRowId(row2Partition_[i], pageNumber_, pageCursor_), size}; pageCursor_ += size; } } @@ -209,7 +198,6 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { int32_t begin = 0; { ScopedTimer timer(&sortTime_); - // TODO: Add radix sort to align with Spark. if (useRadixSort_) { begin = RadixSort::sort(array_, numRecords, kPartitionIdStartByteIndex, kPartitionIdEndByteIndex); } else { @@ -219,9 +207,9 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { auto end = begin + numRecords; auto cur = begin; - auto pid = extractPartitionId(array_[begin]); + auto pid = extractPartitionId(array_[begin].first); while (++cur < end) { - auto curPid = extractPartitionId(array_[cur]); + auto curPid = extractPartitionId(array_[cur].first); if (curPid != pid) { RETURN_NOT_OK(evictPartition(pid, begin, cur)); pid = curPid; @@ -249,8 +237,10 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_t begin, size_t end) { // Serialize [begin, end) uint32_t numRows = end - begin; - uint64_t rawSize = partitionRawSize_[partitionId]; - partitionRawSize_[partitionId] = 0; + uint64_t rawSize = numRows * sizeof(RowSizeType); + for (auto i = begin; i < end; ++i) { + rawSize += array_[i].second; + } if (sortedBuffer_ == nullptr || sortedBuffer_->size() < rawSize) { sortedBuffer_ = nullptr; @@ -260,10 +250,12 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_ uint64_t offset = 0; for (auto i = begin; i < end; ++i) { - auto index = extractPageNumberAndOffset(array_[i]); - const auto* src = pageAddresses_[index.first] + index.second; - auto size = *(RowSizeType*)(src); - memcpy(rawBuffer + offset, src, size); + // size(size_t) | bytes + auto size = array_[i].second; + memcpy(rawBuffer + offset, &size, sizeof(RowSizeType)); + offset += sizeof(RowSizeType); + auto index = extractPageNumberAndOffset(array_[i].first); + memcpy(rawBuffer + offset, pageAddresses_[index.first] + index.second, size); offset += size; } VELOX_CHECK_EQ(offset, rawSize); diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index cb393ea5474f..d866dfc419a0 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -83,8 +83,9 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { void growArrayIfNecessary(uint32_t rows); - using Allocator = facebook::velox::StlAllocator; - using SortArray = std::vector; + using ElementType = std::pair; + using Allocator = facebook::velox::StlAllocator; + using SortArray = std::vector; std::unique_ptr allocator_; // Stores compact row id -> row @@ -109,8 +110,6 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { // Updated for each input RowVector. std::vector row2Partition_; - std::vector partitionRawSize_; - std::shared_ptr rowType_; std::optional fixedRowSize_; std::vector rowSizes_; From 4655d31f706913288d55bf32d7df785cbd1c5dd5 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Wed, 24 Jul 2024 08:03:51 +0000 Subject: [PATCH 04/12] update metrics --- cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 12 ++++++++---- cpp/velox/shuffle/VeloxSortShuffleWriter.h | 2 ++ 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index 6bb83bb1e931..1ab6c6c51e6c 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -235,6 +235,13 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { } arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_t begin, size_t end) { + auto payload = prepareToEvict(begin, end); + RETURN_NOT_OK(partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_)); + return arrow::Status::OK(); +} + +std::unique_ptr VeloxSortShuffleWriter::prepareToEvict(size_t begin, size_t end) { + ScopedTimer timer(&sortTime_); // Serialize [begin, end) uint32_t numRows = end - begin; uint64_t rawSize = numRows * sizeof(RowSizeType); @@ -264,10 +271,7 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_ std::vector> buffers; buffers.push_back(std::make_shared(rawData, rawSize)); - auto payload = std::make_unique(numRows, nullptr, std::move(buffers)); - RETURN_NOT_OK( - partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_)); - return arrow::Status::OK(); + return std::make_unique(numRows, nullptr, std::move(buffers)); } uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t rows) { diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index d866dfc419a0..0185662cc0b4 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -77,6 +77,8 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { arrow::Status evictPartition(uint32_t partitionId, size_t begin, size_t end); + std::unique_ptr prepareToEvict(size_t begin, size_t end); + uint32_t maxRowsToInsert(uint32_t offset, uint32_t rows); void acquireNewBuffer(int64_t memLimit, uint64_t minSizeRequired); From 2c46dbc41790976d14231640c9cf875e58a50c8b Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Wed, 24 Jul 2024 15:45:39 +0000 Subject: [PATCH 05/12] address comments & fix --- cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 9 +++++++-- .../shuffle/VeloxCelebornColumnarBatchSerializer.scala | 2 ++ .../src/main/scala/org/apache/gluten/GlutenConfig.scala | 1 - 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index 1ab6c6c51e6c..5439b73402f2 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -236,7 +236,8 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_t begin, size_t end) { auto payload = prepareToEvict(begin, end); - RETURN_NOT_OK(partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_)); + RETURN_NOT_OK( + partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_)); return arrow::Status::OK(); } @@ -307,7 +308,11 @@ void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) { usableCapacity = useRadixSort_ ? arraySize / 2 : arraySize; } if (arraySize != array_.size()) { - array_.resize(arraySize); + auto newArray{SortArray{Allocator(allocator_.get())}}; + newArray.resize(arraySize); + std::copy(array_.begin(), array_.begin() + offset_, newArray.begin()); + array_.clear(); + array_ = std::move(newArray); } } diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala index dd4904964957..6f21b528f1c2 100644 --- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala +++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala @@ -17,6 +17,7 @@ package org.apache.spark.shuffle import org.apache.gluten.GlutenConfig +import org.apache.gluten.GlutenConfig.{GLUTEN_RSS_SORT_SHUFFLE_WRITER, GLUTEN_SORT_SHUFFLE_WRITER} import org.apache.gluten.exec.Runtimes import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.utils.ArrowAbiUtil @@ -84,6 +85,7 @@ private class CelebornColumnarBatchSerializerInstance( val compressionCodecBackend = GlutenConfig.getConf.columnarShuffleCodecBackend.orNull val shuffleWriterType = GlutenConfig.getConf.celebornShuffleWriterType + .replace(GLUTEN_SORT_SHUFFLE_WRITER, GLUTEN_RSS_SORT_SHUFFLE_WRITER) val jniWrapper = ShuffleReaderJniWrapper.create(runtime) val batchSize = GlutenConfig.getConf.maxBatchSize val handle = jniWrapper diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index b2aa176fd205..5311e15f173e 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -137,7 +137,6 @@ class GlutenConfig(conf: SQLConf) extends Logging { conf .getConfString("spark.celeborn.client.spark.shuffle.writer", GLUTEN_HASH_SHUFFLE_WRITER) .toLowerCase(Locale.ROOT) - .replace(GLUTEN_SORT_SHUFFLE_WRITER, GLUTEN_RSS_SORT_SHUFFLE_WRITER) def enableColumnarShuffle: Boolean = conf.getConf(COLUMNAR_SHUFFLE_ENABLED) From aeeb632e982a18e289c8931b81ef9bdecfb8a0c8 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Thu, 25 Jul 2024 02:45:03 +0000 Subject: [PATCH 06/12] fix --- cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 27 +++++++++++++------ cpp/velox/shuffle/VeloxSortShuffleWriter.h | 2 +- .../VeloxCelebornColumnarShuffleWriter.scala | 4 +-- 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index 5439b73402f2..f03065eeed39 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -168,7 +168,10 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr ARROW_RETURN_IF( rows == 0, arrow::Status::Invalid("Failed to insert rows. Remaining rows: " + std::to_string(remainingRows))); } + // Spill to avoid offset_ overflow. RETURN_NOT_OK(maybeSpill(rows)); + // Allocate newArray can trigger spill. + growArrayIfNecessary(rows); insertRows(row, rowOffset, rows); rowOffset += rows; } @@ -176,8 +179,7 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr } void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, uint32_t offset, uint32_t rows) { - // Allocate newArray can trigger spill. - growArrayIfNecessary(rows); + VELOX_CHECK(!pages_.empty()); for (auto i = offset; i < offset + rows; ++i) { auto size = row.serialize(i, currentPage_ + pageCursor_); array_[offset_++] = {toCompactRowId(row2Partition_[i], pageNumber_, pageCursor_), size}; @@ -218,18 +220,27 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { } RETURN_NOT_OK(evictPartition(pid, begin, cur)); - pageCursor_ = 0; - pages_.clear(); - pageAddresses_.clear(); - + sortedBuffer_ = nullptr; offset_ = 0; array_.clear(); - sortedBuffer_ = nullptr; - if (!stopped_) { + auto numPages = pages_.size(); + while (--numPages) { + pages_.pop_front(); + } + auto& page = pages_.front(); + memset(page->asMutable(), 0, page->size()); + pageAddresses_.resize(1); + pageAddresses_[0] = currentPage_; + pageNumber_ = 0; + pageCursor_ = 0; + // Allocate array_ can trigger spill. array_.resize(initialSize_); + } else { + pages_.clear(); + pageAddresses_.clear(); } return arrow::Status::OK(); } diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index 0185662cc0b4..45f54556081d 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -94,7 +94,7 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { SortArray array_; uint32_t offset_{0}; - std::vector pages_; + std::list pages_; std::vector pageAddresses_; char* currentPage_; uint32_t pageNumber_; diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala index f13ef20a59ff..4069f1b44324 100644 --- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala +++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala @@ -87,9 +87,9 @@ class VeloxCelebornColumnarShuffleWriter[K, V]( dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime) val nativeMetrics = if (isSort) { - dep.metrics("splitTime") - } else { dep.metrics("sortTime") + } else { + dep.metrics("splitTime") } nativeMetrics .add( From 42062d8fc7b9fca8977503a77cc81ed9ba65f914 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Thu, 25 Jul 2024 03:52:12 +0000 Subject: [PATCH 07/12] fix --- cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index f03065eeed39..6293061fb506 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -78,10 +78,13 @@ arrow::Status VeloxSortShuffleWriter::write(std::shared_ptr cb, i arrow::Status VeloxSortShuffleWriter::stop() { ARROW_RETURN_IF(evictState_ == EvictState::kUnevictable, arrow::Status::Invalid("Unevictable state in stop.")); - EvictGuard evictGuard{evictState_}; - stopped_ = true; - RETURN_NOT_OK(evictAllPartitions()); + if (offset_ > 0) { + RETURN_NOT_OK(evictAllPartitions()); + } + array_.clear(); + pages_.clear(); + pageAddresses_.clear(); RETURN_NOT_OK(partitionWriter_->stop(&metrics_)); return arrow::Status::OK(); } @@ -91,7 +94,6 @@ arrow::Status VeloxSortShuffleWriter::reclaimFixedSize(int64_t size, int64_t* ac *actual = 0; return arrow::Status::OK(); } - EvictGuard evictGuard{evictState_}; auto beforeReclaim = veloxPool_->usedBytes(); RETURN_NOT_OK(evictAllPartitions()); *actual = beforeReclaim - veloxPool_->usedBytes(); @@ -189,13 +191,14 @@ void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, u arrow::Status VeloxSortShuffleWriter::maybeSpill(int32_t nextRows) { if ((uint64_t)offset_ + nextRows > std::numeric_limits::max()) { - EvictGuard evictGuard{evictState_}; RETURN_NOT_OK(evictAllPartitions()); } return arrow::Status::OK(); } arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { + EvictGuard evictGuard{evictState_}; + auto numRecords = offset_; int32_t begin = 0; { @@ -221,10 +224,9 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { RETURN_NOT_OK(evictPartition(pid, begin, cur)); sortedBuffer_ = nullptr; - offset_ = 0; - array_.clear(); if (!stopped_) { + // Preserve the last page for use. auto numPages = pages_.size(); while (--numPages) { pages_.pop_front(); @@ -236,11 +238,10 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { pageNumber_ = 0; pageCursor_ = 0; + offset_ = 0; + array_.clear(); // Allocate array_ can trigger spill. array_.resize(initialSize_); - } else { - pages_.clear(); - pageAddresses_.clear(); } return arrow::Status::OK(); } From f24c410549929bf7ffd50f22d9cc1b964c564690 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Thu, 25 Jul 2024 01:14:33 +0000 Subject: [PATCH 08/12] fix --- cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index 6293061fb506..1310d0227b7f 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -322,9 +322,11 @@ void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) { if (arraySize != array_.size()) { auto newArray{SortArray{Allocator(allocator_.get())}}; newArray.resize(arraySize); - std::copy(array_.begin(), array_.begin() + offset_, newArray.begin()); - array_.clear(); - array_ = std::move(newArray); + if (offset_ > 0) { + std::copy(array_.begin(), array_.begin() + offset_, newArray.begin()); + array_.clear(); + array_ = std::move(newArray); + } } } From 501a1160cbd0f9fe40a5c6f000c2fecfe5fdf048 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Thu, 25 Jul 2024 08:09:49 +0000 Subject: [PATCH 09/12] fake array_ allocation --- cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 61 +++++++++++++++------ cpp/velox/shuffle/VeloxSortShuffleWriter.h | 9 ++- 2 files changed, 49 insertions(+), 21 deletions(-) diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index 1310d0227b7f..d014ddf6315c 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -64,9 +64,7 @@ VeloxSortShuffleWriter::VeloxSortShuffleWriter( ShuffleWriterOptions options, std::shared_ptr veloxPool, arrow::MemoryPool* pool) - : VeloxShuffleWriter(numPartitions, std::move(partitionWriter), std::move(options), std::move(veloxPool), pool), - allocator_{std::make_unique(veloxPool_.get())}, - array_{SortArray{Allocator(allocator_.get())}} {} + : VeloxShuffleWriter(numPartitions, std::move(partitionWriter), std::move(options), std::move(veloxPool), pool) {} arrow::Status VeloxSortShuffleWriter::write(std::shared_ptr cb, int64_t memLimit) { ARROW_ASSIGN_OR_RAISE(auto rv, getPeeledRowVector(cb)); @@ -105,6 +103,7 @@ arrow::Status VeloxSortShuffleWriter::init() { options_.partitioning == Partitioning::kSingle, arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single partition.")); array_.resize(initialSize_); + tmp_ = facebook::velox::AlignedBuffer::allocate(initialSize_ * sizeof(ElementType), veloxPool_.get()); return arrow::Status::OK(); } @@ -186,6 +185,7 @@ void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, u auto size = row.serialize(i, currentPage_ + pageCursor_); array_[offset_++] = {toCompactRowId(row2Partition_[i], pageNumber_, pageCursor_), size}; pageCursor_ += size; + VELOX_DCHECK_LE(pageCursor_, currenPageSize_); } } @@ -231,17 +231,23 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { while (--numPages) { pages_.pop_front(); } - auto& page = pages_.front(); + auto& page = pages_.back(); + // Clear page for serialization. memset(page->asMutable(), 0, page->size()); + // currentPage_ should always point to the last page. + VELOX_CHECK(currentPage_ == page->asMutable()); + pageAddresses_.resize(1); pageAddresses_[0] = currentPage_; pageNumber_ = 0; pageCursor_ = 0; + // Reset and reallocate array_ to minimal size. offset_ = 0; array_.clear(); // Allocate array_ can trigger spill. array_.resize(initialSize_); + tmp_ = facebook::velox::AlignedBuffer::allocate(initialSize_ * sizeof(ElementType), veloxPool_.get()); } return arrow::Status::OK(); } @@ -263,14 +269,13 @@ std::unique_ptr VeloxSortShuffleWriter::prepareToEvict(size_t b } if (sortedBuffer_ == nullptr || sortedBuffer_->size() < rawSize) { - sortedBuffer_ = nullptr; sortedBuffer_ = facebook::velox::AlignedBuffer::allocate(rawSize, veloxPool_.get()); } auto* rawBuffer = sortedBuffer_->asMutable(); uint64_t offset = 0; for (auto i = begin; i < end; ++i) { - // size(size_t) | bytes + // size(RowSize) | bytes auto size = array_[i].second; memcpy(rawBuffer + offset, &size, sizeof(RowSizeType)); offset += sizeof(RowSizeType); @@ -305,31 +310,51 @@ void VeloxSortShuffleWriter::acquireNewBuffer(int64_t memLimit, uint64_t minSize auto size = std::max(std::min((uint64_t)memLimit >> 2, 64UL * 1024 * 1024), minSizeRequired); // Allocating new buffer can trigger spill. auto newBuffer = facebook::velox::AlignedBuffer::allocate(size, veloxPool_.get(), 0); + // If spill triggered, clear pages_. + if (offset_ == 0 && pages_.size() > 0) { + pageAddresses_.clear(); + pages_.clear(); + } + currentPage_ = newBuffer->asMutable(); + pageAddresses_.emplace_back(currentPage_); pages_.emplace_back(std::move(newBuffer)); + pageCursor_ = 0; pageNumber_ = pages_.size() - 1; - currentPage_ = pages_.back()->asMutable(); - pageAddresses_.emplace_back(currentPage_); + currenPageSize_ = pages_.back()->size(); } void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) { auto arraySize = (uint32_t)array_.size(); - auto usableCapacity = useRadixSort_ ? arraySize / 2 : arraySize; - while (offset_ + rows > usableCapacity) { - arraySize <<= 1; - usableCapacity = useRadixSort_ ? arraySize / 2 : arraySize; - } - if (arraySize != array_.size()) { - auto newArray{SortArray{Allocator(allocator_.get())}}; - newArray.resize(arraySize); - if (offset_ > 0) { - std::copy(array_.begin(), array_.begin() + offset_, newArray.begin()); + auto newSize = newArraySize(arraySize, rows); + if (newSize > arraySize) { + SortArray newArray; + // May trigger spill. + newArray.resize(newSize); + auto tmp = facebook::velox::AlignedBuffer::allocate(newSize * sizeof(ElementType), veloxPool_.get()); + // Check if already satisfies. + arraySize = (uint32_t)array_.size(); + if (newArraySize(arraySize, rows) > arraySize) { + if (offset_ > 0) { + std::copy(array_.begin(), array_.begin() + offset_, newArray.begin()); + } array_.clear(); array_ = std::move(newArray); + tmp_ = tmp; } } } +uint32_t VeloxSortShuffleWriter::newArraySize(uint32_t oldSize, uint32_t rows) { + auto newSize = oldSize; + auto usableCapacity = useRadixSort_ ? newSize / 2 : newSize; + while (offset_ + rows > usableCapacity) { + newSize <<= 1; + usableCapacity = useRadixSort_ ? newSize / 2 : newSize; + } + return newSize; +} + int64_t VeloxSortShuffleWriter::peakBytesAllocated() const { return veloxPool_->peakBytes(); } diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index 45f54556081d..6d8ec47d7c92 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -85,13 +85,14 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { void growArrayIfNecessary(uint32_t rows); + uint32_t newArraySize(uint32_t oldSize, uint32_t rows); + using ElementType = std::pair; - using Allocator = facebook::velox::StlAllocator; - using SortArray = std::vector; + using SortArray = std::vector; - std::unique_ptr allocator_; // Stores compact row id -> row SortArray array_; + facebook::velox::BufferPtr tmp_; uint32_t offset_{0}; std::list pages_; @@ -99,6 +100,8 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { char* currentPage_; uint32_t pageNumber_; uint32_t pageCursor_; + // For debug. + uint32_t currenPageSize_; // FIXME: Use configuration to replace hardcode. uint32_t initialSize_ = 4096; From bd221e8ba809b889849662916b318f89f260ec16 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Thu, 25 Jul 2024 13:17:51 +0000 Subject: [PATCH 10/12] use raw buffer for array_ --- cpp/velox/shuffle/RadixSort.h | 18 +++--- cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 61 +++++++++++---------- cpp/velox/shuffle/VeloxSortShuffleWriter.h | 19 +++++-- 3 files changed, 55 insertions(+), 43 deletions(-) diff --git a/cpp/velox/shuffle/RadixSort.h b/cpp/velox/shuffle/RadixSort.h index 06d7a89a492b..89b4a45cb884 100644 --- a/cpp/velox/shuffle/RadixSort.h +++ b/cpp/velox/shuffle/RadixSort.h @@ -21,7 +21,7 @@ namespace gluten { -template +template class RadixSort { public: /** @@ -39,11 +39,11 @@ class RadixSort { * @return The starting index of the sorted data within the given array. We return this instead * of always copying the data back to position zero for efficiency. */ - static int32_t sort(SortArray& array, int64_t numRecords, int32_t startByteIndex, int32_t endByteIndex) { + static int32_t sort(Element* array, size_t size, int64_t numRecords, int32_t startByteIndex, int32_t endByteIndex) { assert(startByteIndex >= 0 && "startByteIndex should >= 0"); assert(endByteIndex <= 7 && "endByteIndex should <= 7"); assert(endByteIndex > startByteIndex); - assert(numRecords * 2 <= array.size()); + assert(numRecords * 2 <= size); int64_t inIndex = 0; int64_t outIndex = numRecords; @@ -59,7 +59,7 @@ class RadixSort { } } - return static_cast(inIndex); + return static_cast(inIndex); } private: @@ -75,7 +75,7 @@ class RadixSort { * @param outIndex the starting index where sorted output data should be written. */ static void sortAtByte( - SortArray& array, + Element* array, int64_t numRecords, std::vector& counts, int32_t byteIdx, @@ -86,7 +86,7 @@ class RadixSort { auto offsets = transformCountsToOffsets(counts, outIndex); for (auto offset = inIndex; offset < inIndex + numRecords; ++offset) { - auto bucket = (array[offset].first >> (byteIdx * 8)) & 0xff; + auto bucket = (array[offset].value >> (byteIdx * 8)) & 0xff; array[offsets[bucket]++] = array[offset]; } } @@ -103,7 +103,7 @@ class RadixSort { * significant byte. If the byte does not need sorting the vector entry will be empty. */ static std::vector> - getCounts(SortArray& array, int64_t numRecords, int32_t startByteIndex, int32_t endByteIndex) { + getCounts(Element* array, int64_t numRecords, int32_t startByteIndex, int32_t endByteIndex) { std::vector> counts; counts.resize(8); @@ -112,7 +112,7 @@ class RadixSort { int64_t bitwiseMax = 0; int64_t bitwiseMin = -1L; for (auto offset = 0; offset < numRecords; ++offset) { - auto value = array[offset].first; + auto value = array[offset].value; bitwiseMax |= value; bitwiseMin &= value; } @@ -123,7 +123,7 @@ class RadixSort { if (((bitsChanged >> (i * 8)) & 0xff) != 0) { counts[i].resize(256); for (auto offset = 0; offset < numRecords; ++offset) { - counts[i][(array[offset].first >> (i * 8)) & 0xff]++; + counts[i][(array[offset].value >> (i * 8)) & 0xff]++; } } } diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index d014ddf6315c..147b13cfd03e 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -80,7 +80,7 @@ arrow::Status VeloxSortShuffleWriter::stop() { if (offset_ > 0) { RETURN_NOT_OK(evictAllPartitions()); } - array_.clear(); + array_ = nullptr; pages_.clear(); pageAddresses_.clear(); RETURN_NOT_OK(partitionWriter_->stop(&metrics_)); @@ -102,8 +102,7 @@ arrow::Status VeloxSortShuffleWriter::init() { ARROW_RETURN_IF( options_.partitioning == Partitioning::kSingle, arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single partition.")); - array_.resize(initialSize_); - tmp_ = facebook::velox::AlignedBuffer::allocate(initialSize_ * sizeof(ElementType), veloxPool_.get()); + initArray(); return arrow::Status::OK(); } @@ -183,7 +182,9 @@ void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, u VELOX_CHECK(!pages_.empty()); for (auto i = offset; i < offset + rows; ++i) { auto size = row.serialize(i, currentPage_ + pageCursor_); - array_[offset_++] = {toCompactRowId(row2Partition_[i], pageNumber_, pageCursor_), size}; + arrayPtr_[offset_].value = toCompactRowId(row2Partition_[i], pageNumber_, pageCursor_); + arrayPtr_[offset_].rowSize = size; + ++offset_; pageCursor_ += size; VELOX_DCHECK_LE(pageCursor_, currenPageSize_); } @@ -204,17 +205,18 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { { ScopedTimer timer(&sortTime_); if (useRadixSort_) { - begin = RadixSort::sort(array_, numRecords, kPartitionIdStartByteIndex, kPartitionIdEndByteIndex); + begin = RadixSort::sort( + arrayPtr_, arraySize_, numRecords, kPartitionIdStartByteIndex, kPartitionIdEndByteIndex); } else { - std::sort(array_.begin(), array_.begin() + numRecords); + qsort(arrayPtr_, arraySize_, sizeof(Element), Element::compare); } } auto end = begin + numRecords; auto cur = begin; - auto pid = extractPartitionId(array_[begin].first); + auto pid = extractPartitionId(arrayPtr_[begin].value); while (++cur < end) { - auto curPid = extractPartitionId(array_[cur].first); + auto curPid = extractPartitionId(arrayPtr_[cur].value); if (curPid != pid) { RETURN_NOT_OK(evictPartition(pid, begin, cur)); pid = curPid; @@ -242,12 +244,9 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { pageNumber_ = 0; pageCursor_ = 0; - // Reset and reallocate array_ to minimal size. + // Reset and reallocate array_ to minimal size. Allocate array_ can trigger spill. offset_ = 0; - array_.clear(); - // Allocate array_ can trigger spill. - array_.resize(initialSize_); - tmp_ = facebook::velox::AlignedBuffer::allocate(initialSize_ * sizeof(ElementType), veloxPool_.get()); + initArray(); } return arrow::Status::OK(); } @@ -265,7 +264,7 @@ std::unique_ptr VeloxSortShuffleWriter::prepareToEvict(size_t b uint32_t numRows = end - begin; uint64_t rawSize = numRows * sizeof(RowSizeType); for (auto i = begin; i < end; ++i) { - rawSize += array_[i].second; + rawSize += arrayPtr_[i].rowSize; } if (sortedBuffer_ == nullptr || sortedBuffer_->size() < rawSize) { @@ -276,10 +275,10 @@ std::unique_ptr VeloxSortShuffleWriter::prepareToEvict(size_t b uint64_t offset = 0; for (auto i = begin; i < end; ++i) { // size(RowSize) | bytes - auto size = array_[i].second; + auto size = arrayPtr_[i].rowSize; memcpy(rawBuffer + offset, &size, sizeof(RowSizeType)); offset += sizeof(RowSizeType); - auto index = extractPageNumberAndOffset(array_[i].first); + auto index = extractPageNumberAndOffset(arrayPtr_[i].value); memcpy(rawBuffer + offset, pageAddresses_[index.first] + index.second, size); offset += size; } @@ -325,28 +324,26 @@ void VeloxSortShuffleWriter::acquireNewBuffer(int64_t memLimit, uint64_t minSize } void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) { - auto arraySize = (uint32_t)array_.size(); - auto newSize = newArraySize(arraySize, rows); - if (newSize > arraySize) { - SortArray newArray; + auto newSize = newArraySize(rows); + if (newSize > arraySize_) { // May trigger spill. - newArray.resize(newSize); - auto tmp = facebook::velox::AlignedBuffer::allocate(newSize * sizeof(ElementType), veloxPool_.get()); + auto newSizeBytes = newSize * sizeof(Element); + auto newArray = facebook::velox::AlignedBuffer::allocate(newSizeBytes, veloxPool_.get()); // Check if already satisfies. - arraySize = (uint32_t)array_.size(); - if (newArraySize(arraySize, rows) > arraySize) { + if (newArraySize(rows) > arraySize_) { + auto newPtr = newArray->asMutable(); if (offset_ > 0) { - std::copy(array_.begin(), array_.begin() + offset_, newArray.begin()); + gluten::fastCopy(newPtr, arrayPtr_, arraySize_ * sizeof(Element)); } - array_.clear(); + arraySize_ = newSize; + arrayPtr_ = newPtr; array_ = std::move(newArray); - tmp_ = tmp; } } } -uint32_t VeloxSortShuffleWriter::newArraySize(uint32_t oldSize, uint32_t rows) { - auto newSize = oldSize; +uint32_t VeloxSortShuffleWriter::newArraySize(uint32_t rows) { + auto newSize = arraySize_; auto usableCapacity = useRadixSort_ ? newSize / 2 : newSize; while (offset_ + rows > usableCapacity) { newSize <<= 1; @@ -355,6 +352,12 @@ uint32_t VeloxSortShuffleWriter::newArraySize(uint32_t oldSize, uint32_t rows) { return newSize; } +void VeloxSortShuffleWriter::initArray() { + arraySize_ = initialSize_; + array_ = facebook::velox::AlignedBuffer::allocate(arraySize_ * sizeof(Element), veloxPool_.get()); + arrayPtr_ = array_->asMutable(); +} + int64_t VeloxSortShuffleWriter::peakBytesAllocated() const { return veloxPool_->peakBytes(); } diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index 6d8ec47d7c92..9c92f26c1924 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -85,14 +85,23 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { void growArrayIfNecessary(uint32_t rows); - uint32_t newArraySize(uint32_t oldSize, uint32_t rows); + uint32_t newArraySize(uint32_t rows); - using ElementType = std::pair; - using SortArray = std::vector; + void initArray(); + + struct Element { + uint64_t value; + uint32_t rowSize; + + static int compare(const void* a, const void* b) { + return ((Element*)a)->value - ((Element*)b)->value; + } + }; // Stores compact row id -> row - SortArray array_; - facebook::velox::BufferPtr tmp_; + facebook::velox::BufferPtr array_; + Element* arrayPtr_; + uint32_t arraySize_; uint32_t offset_{0}; std::list pages_; From 8eb89dda6b3e5e49db86bee89ef1012a3b08e2e9 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Fri, 26 Jul 2024 02:48:30 +0000 Subject: [PATCH 11/12] use spark conf for radix sort and buffer size --- cpp/core/CMakeLists.txt | 1 - cpp/core/jni/JniWrapper.cc | 6 +++++- cpp/core/shuffle/Options.cc | 18 ------------------ cpp/core/shuffle/Options.h | 7 +++++++ cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 8 ++++---- cpp/velox/shuffle/VeloxSortShuffleWriter.h | 4 ---- .../VeloxCelebornColumnarShuffleWriter.scala | 3 +++ .../vectorized/ShuffleWriterJniWrapper.java | 10 ++++++++++ .../spark/shuffle/ColumnarShuffleWriter.scala | 4 +++- .../VeloxUniffleColumnarShuffleWriter.java | 2 ++ 10 files changed, 34 insertions(+), 29 deletions(-) delete mode 100644 cpp/core/shuffle/Options.cc diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt index caad8db1eb9f..ef21ccbe855a 100644 --- a/cpp/core/CMakeLists.txt +++ b/cpp/core/CMakeLists.txt @@ -191,7 +191,6 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS shuffle/FallbackRangePartitioner.cc shuffle/HashPartitioner.cc shuffle/LocalPartitionWriter.cc - shuffle/Options.cc shuffle/Partitioner.cc shuffle/Partitioning.cc shuffle/Payload.cc diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 3d6da31c7e75..f39f9c92333e 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -755,6 +755,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe jint compressionLevel, jint compressionThreshold, jstring compressionModeJstr, + jint sortBufferInitialSize, + jboolean useRadixSort, jstring dataFileJstr, jint numSubDirs, jstring localDirsJstr, @@ -780,7 +782,9 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe .partitioning = gluten::toPartitioning(jStringToCString(env, partitioningNameJstr)), .taskAttemptId = (int64_t)taskAttemptId, .startPartitionId = startPartitionId, - .shuffleWriterType = gluten::ShuffleWriter::stringToType(jStringToCString(env, shuffleWriterTypeJstr))}; + .shuffleWriterType = gluten::ShuffleWriter::stringToType(jStringToCString(env, shuffleWriterTypeJstr)), + .sortBufferInitialSize = sortBufferInitialSize, + .useRadixSort = static_cast(useRadixSort)}; // Build PartitionWriterOptions. auto partitionWriterOptions = PartitionWriterOptions{ diff --git a/cpp/core/shuffle/Options.cc b/cpp/core/shuffle/Options.cc deleted file mode 100644 index 8e05a10d6859..000000000000 --- a/cpp/core/shuffle/Options.cc +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "shuffle/Options.h" diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h index 757950d03443..11fa037eb5a6 100644 --- a/cpp/core/shuffle/Options.h +++ b/cpp/core/shuffle/Options.h @@ -35,9 +35,12 @@ static constexpr int32_t kDefaultBufferAlignment = 64; static constexpr double kDefaultBufferReallocThreshold = 0.25; static constexpr double kDefaultMergeBufferThreshold = 0.25; static constexpr bool kEnableBufferedWrite = true; +static constexpr bool kDefaultUseRadixSort = true; +static constexpr int32_t kDefaultSortBufferSize = 4096; enum ShuffleWriterType { kHashShuffle, kSortShuffle, kRssSortShuffle }; enum PartitionWriterType { kLocal, kRss }; +enum SortAlgorithm { kRadixSort, kQuickSort }; struct ShuffleReaderOptions { arrow::Compression::type compressionType = arrow::Compression::type::LZ4_FRAME; @@ -56,6 +59,10 @@ struct ShuffleWriterOptions { int32_t startPartitionId = 0; int64_t threadId = -1; ShuffleWriterType shuffleWriterType = kHashShuffle; + + // Sort shuffle writer. + int32_t sortBufferInitialSize = kDefaultSortBufferSize; + bool useRadixSort = kDefaultUseRadixSort; }; struct PartitionWriterOptions { diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index 147b13cfd03e..09b0a783d99a 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -204,7 +204,7 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { int32_t begin = 0; { ScopedTimer timer(&sortTime_); - if (useRadixSort_) { + if (options_.useRadixSort) { begin = RadixSort::sort( arrayPtr_, arraySize_, numRecords, kPartitionIdStartByteIndex, kPartitionIdEndByteIndex); } else { @@ -344,16 +344,16 @@ void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) { uint32_t VeloxSortShuffleWriter::newArraySize(uint32_t rows) { auto newSize = arraySize_; - auto usableCapacity = useRadixSort_ ? newSize / 2 : newSize; + auto usableCapacity = options_.useRadixSort ? newSize / 2 : newSize; while (offset_ + rows > usableCapacity) { newSize <<= 1; - usableCapacity = useRadixSort_ ? newSize / 2 : newSize; + usableCapacity = options_.useRadixSort ? newSize / 2 : newSize; } return newSize; } void VeloxSortShuffleWriter::initArray() { - arraySize_ = initialSize_; + arraySize_ = options_.sortBufferInitialSize; array_ = facebook::velox::AlignedBuffer::allocate(arraySize_ * sizeof(Element), veloxPool_.get()); arrayPtr_ = array_->asMutable(); } diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index 9c92f26c1924..cbf7f32ad3bb 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -112,10 +112,6 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { // For debug. uint32_t currenPageSize_; - // FIXME: Use configuration to replace hardcode. - uint32_t initialSize_ = 4096; - bool useRadixSort_ = true; - facebook::velox::BufferPtr sortedBuffer_; // Row ID -> Partition ID diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala index 4069f1b44324..8f613c72835a 100644 --- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala +++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala @@ -23,6 +23,7 @@ import org.apache.gluten.memory.memtarget.{MemoryTarget, Spiller, Spillers} import org.apache.gluten.vectorized._ import org.apache.spark._ +import org.apache.spark.internal.config.{SHUFFLE_SORT_INIT_BUFFER_SIZE, SHUFFLE_SORT_USE_RADIXSORT} import org.apache.spark.memory.SparkMemoryUtil import org.apache.spark.scheduler.MapStatus import org.apache.spark.shuffle.celeborn.CelebornShuffleHandle @@ -114,6 +115,8 @@ class VeloxCelebornColumnarShuffleWriter[K, V]( compressionLevel, bufferCompressThreshold, GlutenConfig.getConf.columnarShuffleCompressionMode, + conf.get(SHUFFLE_SORT_INIT_BUFFER_SIZE).toInt, + conf.get(SHUFFLE_SORT_USE_RADIXSORT), clientPushBufferMaxSize, clientPushSortMemoryThreshold, celebornPartitionPusher, diff --git a/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java b/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java index 883fc600171f..1d622d491eb5 100644 --- a/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java +++ b/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java @@ -61,6 +61,8 @@ public long make( int compressionLevel, int bufferCompressThreshold, String compressionMode, + int sortBufferInitialSize, + boolean useRadixSort, String dataFile, int subDirsPerLocalDir, String localDirs, @@ -80,6 +82,8 @@ public long make( compressionLevel, bufferCompressThreshold, compressionMode, + sortBufferInitialSize, + useRadixSort, dataFile, subDirsPerLocalDir, localDirs, @@ -109,6 +113,8 @@ public long makeForRSS( int compressionLevel, int bufferCompressThreshold, String compressionMode, + int sortBufferInitialSize, + boolean useRadixSort, int pushBufferMaxSize, long sortBufferMaxSize, Object pusher, @@ -129,6 +135,8 @@ public long makeForRSS( compressionLevel, bufferCompressThreshold, compressionMode, + sortBufferInitialSize, + useRadixSort, null, 0, null, @@ -154,6 +162,8 @@ public native long nativeMake( int compressionLevel, int bufferCompressThreshold, String compressionMode, + int sortBufferInitialSize, + boolean useRadixSort, String dataFile, int subDirsPerLocalDir, String localDirs, diff --git a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala index 5274ec94c8d5..d62ff1d68d6d 100644 --- a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala +++ b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala @@ -24,7 +24,7 @@ import org.apache.gluten.vectorized._ import org.apache.spark._ import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.SHUFFLE_COMPRESS +import org.apache.spark.internal.config.{SHUFFLE_COMPRESS, SHUFFLE_SORT_INIT_BUFFER_SIZE, SHUFFLE_SORT_USE_RADIXSORT} import org.apache.spark.memory.SparkMemoryUtil import org.apache.spark.scheduler.MapStatus import org.apache.spark.sql.vectorized.ColumnarBatch @@ -151,6 +151,8 @@ class ColumnarShuffleWriter[K, V]( compressionLevel, bufferCompressThreshold, GlutenConfig.getConf.columnarShuffleCompressionMode, + conf.get(SHUFFLE_SORT_INIT_BUFFER_SIZE).toInt, + conf.get(SHUFFLE_SORT_USE_RADIXSORT), dataTmp.getAbsolutePath, blockManager.subDirsPerLocalDir, localDirs, diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java index 32c672cec6be..b84c9d4ee601 100644 --- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java +++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java @@ -148,6 +148,8 @@ protected void writeImpl(Iterator> records) throws IOException { compressionLevel, compressThreshold, GlutenConfig.getConf().columnarShuffleCompressionMode(), + (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()), + (boolean) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_USE_RADIXSORT()), bufferSize, bufferSize, partitionPusher, From 42e1c1aa921d7918873d0d08c3969c96219f9435 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Fri, 26 Jul 2024 03:54:54 +0000 Subject: [PATCH 12/12] fix qsort --- cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 8 +++++++- cpp/velox/shuffle/VeloxSortShuffleWriter.h | 6 ++---- cpp/velox/tests/VeloxShuffleWriterTest.cc | 11 +++++++---- cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h | 4 +++- 4 files changed, 19 insertions(+), 10 deletions(-) diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index 09b0a783d99a..3f4181286bb7 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -208,7 +208,8 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { begin = RadixSort::sort( arrayPtr_, arraySize_, numRecords, kPartitionIdStartByteIndex, kPartitionIdEndByteIndex); } else { - qsort(arrayPtr_, arraySize_, sizeof(Element), Element::compare); + auto ptr = arrayPtr_; + qsort(ptr, numRecords, sizeof(Element), compare); } } @@ -369,4 +370,9 @@ int64_t VeloxSortShuffleWriter::totalSortTime() const { int64_t VeloxSortShuffleWriter::totalC2RTime() const { return c2rTime_; } + +int VeloxSortShuffleWriter::compare(const void* a, const void* b) { + // No same values. + return ((Element*)a)->value > ((Element*)b)->value ? 1 : -1; +} } // namespace gluten diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index cbf7f32ad3bb..961700b6da07 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -92,12 +92,10 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { struct Element { uint64_t value; uint32_t rowSize; - - static int compare(const void* a, const void* b) { - return ((Element*)a)->value - ((Element*)b)->value; - } }; + static int compare(const void* a, const void* b); + // Stores compact row id -> row facebook::velox::BufferPtr array_; Element* arrayPtr_; diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc index 6b86f6a0a15c..af9d5a58db0d 100644 --- a/cpp/velox/tests/VeloxShuffleWriterTest.cc +++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc @@ -70,10 +70,12 @@ std::vector createShuffleTestParams() { std::vector mergeBufferSizes = {0, 3, 4, 10, 4096}; for (const auto& compression : compressions) { + for (auto useRadixSort : {true, false}) { + params.push_back(ShuffleTestParams{ + ShuffleWriterType::kSortShuffle, PartitionWriterType::kLocal, compression, 0, 0, useRadixSort}); + } params.push_back( - ShuffleTestParams{ShuffleWriterType::kSortShuffle, PartitionWriterType::kLocal, compression, 0, 0}); - params.push_back( - ShuffleTestParams{ShuffleWriterType::kRssSortShuffle, PartitionWriterType::kRss, compression, 0, 0}); + ShuffleTestParams{ShuffleWriterType::kRssSortShuffle, PartitionWriterType::kRss, compression, 0, 0, false}); for (const auto compressionThreshold : compressionThresholds) { for (const auto mergeBufferSize : mergeBufferSizes) { params.push_back(ShuffleTestParams{ @@ -81,7 +83,8 @@ std::vector createShuffleTestParams() { PartitionWriterType::kLocal, compression, compressionThreshold, - mergeBufferSize}); + mergeBufferSize, + false /* unused */}); } params.push_back(ShuffleTestParams{ ShuffleWriterType::kHashShuffle, PartitionWriterType::kRss, compression, compressionThreshold, 0}); diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h index f9c2b1d07339..d32e3272186b 100644 --- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h +++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h @@ -68,12 +68,13 @@ struct ShuffleTestParams { arrow::Compression::type compressionType; int32_t compressionThreshold; int32_t mergeBufferSize; + bool useRadixSort; std::string toString() const { std::ostringstream out; out << "shuffleWriterType = " << shuffleWriterType << ", partitionWriterType = " << partitionWriterType << ", compressionType = " << compressionType << ", compressionThreshold = " << compressionThreshold - << ", mergeBufferSize = " << mergeBufferSize; + << ", mergeBufferSize = " << mergeBufferSize << ", useRadixSort = " << (useRadixSort ? "true" : "false"); return out.str(); } }; @@ -250,6 +251,7 @@ class VeloxShuffleWriterTest : public ::testing::TestWithParam