diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt index caad8db1eb9f..ef21ccbe855a 100644 --- a/cpp/core/CMakeLists.txt +++ b/cpp/core/CMakeLists.txt @@ -191,7 +191,6 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS shuffle/FallbackRangePartitioner.cc shuffle/HashPartitioner.cc shuffle/LocalPartitionWriter.cc - shuffle/Options.cc shuffle/Partitioner.cc shuffle/Partitioning.cc shuffle/Payload.cc diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 3d6da31c7e75..f39f9c92333e 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -755,6 +755,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe jint compressionLevel, jint compressionThreshold, jstring compressionModeJstr, + jint sortBufferInitialSize, + jboolean useRadixSort, jstring dataFileJstr, jint numSubDirs, jstring localDirsJstr, @@ -780,7 +782,9 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe .partitioning = gluten::toPartitioning(jStringToCString(env, partitioningNameJstr)), .taskAttemptId = (int64_t)taskAttemptId, .startPartitionId = startPartitionId, - .shuffleWriterType = gluten::ShuffleWriter::stringToType(jStringToCString(env, shuffleWriterTypeJstr))}; + .shuffleWriterType = gluten::ShuffleWriter::stringToType(jStringToCString(env, shuffleWriterTypeJstr)), + .sortBufferInitialSize = sortBufferInitialSize, + .useRadixSort = static_cast(useRadixSort)}; // Build PartitionWriterOptions. auto partitionWriterOptions = PartitionWriterOptions{ diff --git a/cpp/core/shuffle/Options.cc b/cpp/core/shuffle/Options.cc deleted file mode 100644 index 8e05a10d6859..000000000000 --- a/cpp/core/shuffle/Options.cc +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "shuffle/Options.h" diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h index 757950d03443..11fa037eb5a6 100644 --- a/cpp/core/shuffle/Options.h +++ b/cpp/core/shuffle/Options.h @@ -35,9 +35,12 @@ static constexpr int32_t kDefaultBufferAlignment = 64; static constexpr double kDefaultBufferReallocThreshold = 0.25; static constexpr double kDefaultMergeBufferThreshold = 0.25; static constexpr bool kEnableBufferedWrite = true; +static constexpr bool kDefaultUseRadixSort = true; +static constexpr int32_t kDefaultSortBufferSize = 4096; enum ShuffleWriterType { kHashShuffle, kSortShuffle, kRssSortShuffle }; enum PartitionWriterType { kLocal, kRss }; +enum SortAlgorithm { kRadixSort, kQuickSort }; struct ShuffleReaderOptions { arrow::Compression::type compressionType = arrow::Compression::type::LZ4_FRAME; @@ -56,6 +59,10 @@ struct ShuffleWriterOptions { int32_t startPartitionId = 0; int64_t threadId = -1; ShuffleWriterType shuffleWriterType = kHashShuffle; + + // Sort shuffle writer. + int32_t sortBufferInitialSize = kDefaultSortBufferSize; + bool useRadixSort = kDefaultUseRadixSort; }; struct PartitionWriterOptions { diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index 147b13cfd03e..09b0a783d99a 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -204,7 +204,7 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { int32_t begin = 0; { ScopedTimer timer(&sortTime_); - if (useRadixSort_) { + if (options_.useRadixSort) { begin = RadixSort::sort( arrayPtr_, arraySize_, numRecords, kPartitionIdStartByteIndex, kPartitionIdEndByteIndex); } else { @@ -344,16 +344,16 @@ void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) { uint32_t VeloxSortShuffleWriter::newArraySize(uint32_t rows) { auto newSize = arraySize_; - auto usableCapacity = useRadixSort_ ? newSize / 2 : newSize; + auto usableCapacity = options_.useRadixSort ? newSize / 2 : newSize; while (offset_ + rows > usableCapacity) { newSize <<= 1; - usableCapacity = useRadixSort_ ? newSize / 2 : newSize; + usableCapacity = options_.useRadixSort ? newSize / 2 : newSize; } return newSize; } void VeloxSortShuffleWriter::initArray() { - arraySize_ = initialSize_; + arraySize_ = options_.sortBufferInitialSize; array_ = facebook::velox::AlignedBuffer::allocate(arraySize_ * sizeof(Element), veloxPool_.get()); arrayPtr_ = array_->asMutable(); } diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index 9c92f26c1924..cbf7f32ad3bb 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -112,10 +112,6 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { // For debug. uint32_t currenPageSize_; - // FIXME: Use configuration to replace hardcode. - uint32_t initialSize_ = 4096; - bool useRadixSort_ = true; - facebook::velox::BufferPtr sortedBuffer_; // Row ID -> Partition ID diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala index 4069f1b44324..8f613c72835a 100644 --- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala +++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala @@ -23,6 +23,7 @@ import org.apache.gluten.memory.memtarget.{MemoryTarget, Spiller, Spillers} import org.apache.gluten.vectorized._ import org.apache.spark._ +import org.apache.spark.internal.config.{SHUFFLE_SORT_INIT_BUFFER_SIZE, SHUFFLE_SORT_USE_RADIXSORT} import org.apache.spark.memory.SparkMemoryUtil import org.apache.spark.scheduler.MapStatus import org.apache.spark.shuffle.celeborn.CelebornShuffleHandle @@ -114,6 +115,8 @@ class VeloxCelebornColumnarShuffleWriter[K, V]( compressionLevel, bufferCompressThreshold, GlutenConfig.getConf.columnarShuffleCompressionMode, + conf.get(SHUFFLE_SORT_INIT_BUFFER_SIZE).toInt, + conf.get(SHUFFLE_SORT_USE_RADIXSORT), clientPushBufferMaxSize, clientPushSortMemoryThreshold, celebornPartitionPusher, diff --git a/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java b/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java index 883fc600171f..1d622d491eb5 100644 --- a/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java +++ b/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java @@ -61,6 +61,8 @@ public long make( int compressionLevel, int bufferCompressThreshold, String compressionMode, + int sortBufferInitialSize, + boolean useRadixSort, String dataFile, int subDirsPerLocalDir, String localDirs, @@ -80,6 +82,8 @@ public long make( compressionLevel, bufferCompressThreshold, compressionMode, + sortBufferInitialSize, + useRadixSort, dataFile, subDirsPerLocalDir, localDirs, @@ -109,6 +113,8 @@ public long makeForRSS( int compressionLevel, int bufferCompressThreshold, String compressionMode, + int sortBufferInitialSize, + boolean useRadixSort, int pushBufferMaxSize, long sortBufferMaxSize, Object pusher, @@ -129,6 +135,8 @@ public long makeForRSS( compressionLevel, bufferCompressThreshold, compressionMode, + sortBufferInitialSize, + useRadixSort, null, 0, null, @@ -154,6 +162,8 @@ public native long nativeMake( int compressionLevel, int bufferCompressThreshold, String compressionMode, + int sortBufferInitialSize, + boolean useRadixSort, String dataFile, int subDirsPerLocalDir, String localDirs, diff --git a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala index 5274ec94c8d5..d62ff1d68d6d 100644 --- a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala +++ b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala @@ -24,7 +24,7 @@ import org.apache.gluten.vectorized._ import org.apache.spark._ import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.SHUFFLE_COMPRESS +import org.apache.spark.internal.config.{SHUFFLE_COMPRESS, SHUFFLE_SORT_INIT_BUFFER_SIZE, SHUFFLE_SORT_USE_RADIXSORT} import org.apache.spark.memory.SparkMemoryUtil import org.apache.spark.scheduler.MapStatus import org.apache.spark.sql.vectorized.ColumnarBatch @@ -151,6 +151,8 @@ class ColumnarShuffleWriter[K, V]( compressionLevel, bufferCompressThreshold, GlutenConfig.getConf.columnarShuffleCompressionMode, + conf.get(SHUFFLE_SORT_INIT_BUFFER_SIZE).toInt, + conf.get(SHUFFLE_SORT_USE_RADIXSORT), dataTmp.getAbsolutePath, blockManager.subDirsPerLocalDir, localDirs, diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java index 32c672cec6be..b84c9d4ee601 100644 --- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java +++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java @@ -148,6 +148,8 @@ protected void writeImpl(Iterator> records) throws IOException { compressionLevel, compressThreshold, GlutenConfig.getConf().columnarShuffleCompressionMode(), + (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()), + (boolean) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_USE_RADIXSORT()), bufferSize, bufferSize, partitionPusher,