diff --git a/backends-clickhouse/src/main/java/io/glutenproject/vectorized/BlockSplitIterator.java b/backends-clickhouse/src/main/java/io/glutenproject/vectorized/BlockSplitIterator.java index facb9f5eed20..3c0d919f6f5e 100644 --- a/backends-clickhouse/src/main/java/io/glutenproject/vectorized/BlockSplitIterator.java +++ b/backends-clickhouse/src/main/java/io/glutenproject/vectorized/BlockSplitIterator.java @@ -33,7 +33,8 @@ public BlockSplitIterator(Iterator in, IteratorOptions options) { options.getExpr(), options.getRequiredFields(), options.getPartitionNum(), - options.getBufferSize()); + options.getBufferSize(), + options.getHashAlgorithm()); } private native long nativeCreate( @@ -42,7 +43,8 @@ private native long nativeCreate( String expr, String schema, int partitionNum, - int bufferSize); + int bufferSize, + String hashAlgorithm); private native void nativeClose(long instance); @@ -80,6 +82,8 @@ public static class IteratorOptions implements Serializable { private String expr; private String requiredFields; + private String hashAlgorithm; + public int getPartitionNum() { return partitionNum; } @@ -119,5 +123,13 @@ public String getRequiredFields() { public void setRequiredFields(String requiredFields) { this.requiredFields = requiredFields; } + + public String getHashAlgorithm() { + return hashAlgorithm; + } + + public void setHashAlgorithm(String hashAlgorithm) { + this.hashAlgorithm = hashAlgorithm; + } } } diff --git a/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHShuffleSplitterJniWrapper.java b/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHShuffleSplitterJniWrapper.java index bd40b8fef5fc..ec773343a09c 100644 --- a/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHShuffleSplitterJniWrapper.java +++ b/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHShuffleSplitterJniWrapper.java @@ -31,7 +31,8 @@ public long make( String localDirs, int subDirsPerLocalDir, boolean preferSpill, - long spillThreshold) { + long spillThreshold, + String hashAlgorithm) { return nativeMake( part.getShortName(), part.getNumPartitions(), @@ -45,7 +46,8 @@ public long make( localDirs, subDirsPerLocalDir, preferSpill, - spillThreshold); + spillThreshold, + hashAlgorithm); } public long makeForRSS( @@ -55,6 +57,7 @@ public long makeForRSS( int bufferSize, String codec, long spillThreshold, + String hashAlgorithm, Object pusher) { return nativeMakeForRSS( part.getShortName(), @@ -66,6 +69,7 @@ public long makeForRSS( bufferSize, codec, spillThreshold, + hashAlgorithm, pusher); } @@ -82,7 +86,8 @@ public native long nativeMake( String localDirs, int subDirsPerLocalDir, boolean preferSpill, - long spillThreshold); + long spillThreshold, + String hashAlgorithm); public native long nativeMakeForRSS( String shortName, @@ -94,6 +99,7 @@ public native long nativeMakeForRSS( int bufferSize, String codec, long spillThreshold, + String hashAlgorithm, Object pusher); public native void split(long splitterId, long block); diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala index 62f784828444..5765e0293313 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala @@ -90,6 +90,24 @@ object CHBackendSettings extends BackendSettingsApi with Logging { private val GLUTEN_CLICKHOUSE_SHUFFLE_SUPPORTED_CODEC: Set[String] = Set("lz4", "zstd", "snappy") + // The algorithm for hash partition of the shuffle + private val GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM: String = + GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME + + ".shuffle.hash.algorithm" + // valid values are: cityHash64 or murmurHash3_32 + private val GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM_DEFAULT = "cityHash64" + lazy val shuffleHashAlgorithm: String = { + val algorithm = SparkEnv.get.conf.get( + CHBackendSettings.GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM, + CHBackendSettings.GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM_DEFAULT + ) + if (!algorithm.equals("cityHash64") && !algorithm.equals("murmurHash3_32")) { + CHBackendSettings.GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM_DEFAULT + } else { + algorithm + } + } + override def supportFileFormatRead( format: ReadFileFormat, fields: Array[StructField], diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala index 0248dec768f4..8af1a46875ec 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala @@ -17,6 +17,7 @@ package org.apache.spark.shuffle import io.glutenproject.GlutenConfig +import io.glutenproject.backendsapi.clickhouse.CHBackendSettings import io.glutenproject.memory.alloc.CHNativeMemoryAllocators import io.glutenproject.memory.memtarget.{MemoryTarget, Spiller} import io.glutenproject.vectorized._ @@ -100,7 +101,8 @@ class CHColumnarShuffleWriter[K, V]( localDirs, subDirsPerLocalDir, preferSpill, - spillThreshold + spillThreshold, + CHBackendSettings.shuffleHashAlgorithm ) CHNativeMemoryAllocators.createSpillable( "ShuffleWriter", diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala index 0544ad4b90b7..3144522c34e5 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.utils import io.glutenproject.GlutenConfig +import io.glutenproject.backendsapi.clickhouse.CHBackendSettings import io.glutenproject.expression.ConverterUtils import io.glutenproject.row.SparkRowInfo import io.glutenproject.vectorized._ @@ -210,6 +211,7 @@ object CHExecUtil extends Logging { options.setName(nativePartitioning.getShortName) options.setPartitionNum(nativePartitioning.getNumPartitions) options.setExpr(new String(nativePartitioning.getExprList)) + options.setHashAlgorithm(CHBackendSettings.shuffleHashAlgorithm) options.setRequiredFields(if (nativePartitioning.getRequiredFields != null) { new String(nativePartitioning.getRequiredFields) } else { diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala index 9b7cc2518d28..6d304ee0ba79 100644 --- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala @@ -43,6 +43,7 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite .set("spark.sql.autoBroadcastJoinThreshold", "10MB") .set("spark.gluten.sql.columnar.backend.ch.use.v2", "false") .set("spark.sql.adaptive.enabled", "true") + .set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "murmurHash3_32") } override protected def createTPCHNotNullTables(): Unit = { diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetAQESuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetAQESuite.scala index 292a78310470..6f70ac2c3065 100644 --- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetAQESuite.scala +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetAQESuite.scala @@ -46,6 +46,7 @@ class GlutenClickHouseTPCHParquetAQESuite .set("spark.gluten.sql.columnar.backend.ch.use.v2", "false") .set("spark.sql.adaptive.enabled", "true") .set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", "true") + .set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "murmurHash3_32") } override protected def createTPCHNotNullTables(): Unit = { diff --git a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp index 4f3ae465078e..676231e048b9 100644 --- a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp +++ b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp @@ -50,7 +50,7 @@ CachedShuffleWriter::CachedShuffleWriter(const String & short_name, SplitOptions { hash_fields.push_back(std::stoi(expr)); } - partitioner = std::make_unique(options.partition_nums, hash_fields, "cityHash64"); + partitioner = std::make_unique(options.partition_nums, hash_fields, options_.hash_algorithm); } else if (short_name == "single") { diff --git a/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp b/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp index 4b8b56f99c57..eec4e05fffb9 100644 --- a/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp +++ b/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp @@ -209,7 +209,7 @@ HashNativeSplitter::HashNativeSplitter(NativeSplitter::Options options_, jobject output_columns_indicies.push_back(std::stoi(*iter)); } - selector_builder = std::make_unique(options.partition_nums, hash_fields, "cityHash64"); + selector_builder = std::make_unique(options.partition_nums, hash_fields, options_.hash_algorithm); } void HashNativeSplitter::computePartitionId(Block & block) diff --git a/cpp-ch/local-engine/Shuffle/NativeSplitter.h b/cpp-ch/local-engine/Shuffle/NativeSplitter.h index c883da4bfb4a..c30f235b6550 100644 --- a/cpp-ch/local-engine/Shuffle/NativeSplitter.h +++ b/cpp-ch/local-engine/Shuffle/NativeSplitter.h @@ -41,6 +41,7 @@ class NativeSplitter : BlockIterator size_t partition_nums; std::string exprs_buffer; std::string schema_buffer; + std::string hash_algorithm; }; struct Holder diff --git a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp index 2f7d14410284..266c343a59e7 100644 --- a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp +++ b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp @@ -383,7 +383,7 @@ HashSplitter::HashSplitter(SplitOptions options_) : ShuffleSplitter(std::move(op output_columns_indicies.push_back(std::stoi(*iter)); } - selector_builder = std::make_unique(options.partition_nums, hash_fields, "cityHash64"); + selector_builder = std::make_unique(options.partition_nums, hash_fields, options_.hash_algorithm); } std::unique_ptr HashSplitter::create(SplitOptions && options_) { diff --git a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h index 49fa967fc03e..e9a59c339cae 100644 --- a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h +++ b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h @@ -47,7 +47,7 @@ struct SplitOptions std::string compress_method = "zstd"; int compress_level; size_t spill_threshold = 300 * 1024 * 1024; - + std::string hash_algorithm; }; class ColumnsBuffer diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index 5d6add257854..6813a5c9335f 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -653,7 +653,8 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat jstring local_dirs, jint num_sub_dirs, jboolean prefer_spill, - jlong spill_threshold) + jlong spill_threshold, + jstring hash_algorithm) { LOCAL_ENGINE_JNI_METHOD_START std::string hash_exprs; @@ -694,7 +695,8 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat .hash_exprs = hash_exprs, .out_exprs = out_exprs, .compress_method = jstring2string(env, codec), - .spill_threshold = static_cast(spill_threshold)}; + .spill_threshold = static_cast(spill_threshold), + .hash_algorithm = jstring2string(env, hash_algorithm)}; auto name = jstring2string(env, short_name); local_engine::SplitterHolder * splitter; if (prefer_spill) @@ -721,6 +723,7 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat jint split_size, jstring codec, jlong spill_threshold, + jstring hash_algorithm, jobject pusher) { LOCAL_ENGINE_JNI_METHOD_START @@ -753,7 +756,8 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat .hash_exprs = hash_exprs, .out_exprs = out_exprs, .compress_method = jstring2string(env, codec), - .spill_threshold = static_cast(spill_threshold)}; + .spill_threshold = static_cast(spill_threshold), + .hash_algorithm = jstring2string(env, hash_algorithm)}; auto name = jstring2string(env, short_name); local_engine::SplitterHolder * splitter; splitter = new local_engine::SplitterHolder{.splitter = std::make_unique(name, options, pusher)}; @@ -1060,12 +1064,14 @@ Java_io_glutenproject_vectorized_StorageJoinBuilder_nativeCleanBuildHashTable(JN // BlockSplitIterator JNIEXPORT jlong Java_io_glutenproject_vectorized_BlockSplitIterator_nativeCreate( - JNIEnv * env, jobject, jobject in, jstring name, jstring expr, jstring schema, jint partition_num, jint buffer_size) + JNIEnv * env, jobject, jobject in, jstring name, jstring expr, jstring schema, jint partition_num, jint buffer_size, jstring hash_algorithm) { LOCAL_ENGINE_JNI_METHOD_START local_engine::NativeSplitter::Options options; options.partition_nums = partition_num; options.buffer_size = buffer_size; + auto hash_algorithm_str = jstring2string(env, hash_algorithm); + options.hash_algorithm.swap(hash_algorithm_str); auto expr_str = jstring2string(env, expr); std::string schema_str; if (schema) diff --git a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala index 3cb52dcb4104..8d22624919c2 100644 --- a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala +++ b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala @@ -21,6 +21,7 @@ import io.glutenproject.memory.alloc.CHNativeMemoryAllocators import io.glutenproject.memory.memtarget.MemoryTarget import io.glutenproject.memory.memtarget.Spiller import io.glutenproject.vectorized._ +import io.glutenproject.backendsapi.clickhouse.CHBackendSettings import org.apache.spark._ import org.apache.spark.scheduler.MapStatus @@ -68,6 +69,7 @@ class CHCelebornHashBasedColumnarShuffleWriter[K, V]( nativeBufferSize, customizedCompressCodec, GlutenConfig.getConf.chColumnarShuffleSpillThreshold, + CHBackendSettings.shuffleHashAlgorithm, celebornPartitionPusher ) CHNativeMemoryAllocators.createSpillable(