Skip to content

Commit

Permalink
[GLUTEN-3598][CH] Support to config the hash algorithm for the ch shu…
Browse files Browse the repository at this point in the history
…ffle hash partitioner

Now the hash algorithm of the ch shuffle hash partitioner is cityHash64, which is different from vanilla spark, when there is one side shuffle of the join fallbacking, the hash id are different between the ch and vanilla spark, so add a configuration to control the hash algorithm for the ch shuffle hash partitioner.

Close #3598.
  • Loading branch information
zzcclp committed Nov 2, 2023
1 parent 805d909 commit 1d1dc58
Show file tree
Hide file tree
Showing 14 changed files with 65 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public BlockSplitIterator(Iterator<Long> in, IteratorOptions options) {
options.getExpr(),
options.getRequiredFields(),
options.getPartitionNum(),
options.getBufferSize());
options.getBufferSize(),
options.getHashAlgorithm());
}

private native long nativeCreate(
Expand All @@ -42,7 +43,8 @@ private native long nativeCreate(
String expr,
String schema,
int partitionNum,
int bufferSize);
int bufferSize,
String hashAlgorithm);

private native void nativeClose(long instance);

Expand Down Expand Up @@ -80,6 +82,8 @@ public static class IteratorOptions implements Serializable {
private String expr;
private String requiredFields;

private String hashAlgorithm;

public int getPartitionNum() {
return partitionNum;
}
Expand Down Expand Up @@ -119,5 +123,13 @@ public String getRequiredFields() {
public void setRequiredFields(String requiredFields) {
this.requiredFields = requiredFields;
}

public String getHashAlgorithm() {
return hashAlgorithm;
}

public void setHashAlgorithm(String hashAlgorithm) {
this.hashAlgorithm = hashAlgorithm;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public long make(
String localDirs,
int subDirsPerLocalDir,
boolean preferSpill,
long spillThreshold) {
long spillThreshold,
String hashAlgorithm) {
return nativeMake(
part.getShortName(),
part.getNumPartitions(),
Expand All @@ -45,7 +46,8 @@ public long make(
localDirs,
subDirsPerLocalDir,
preferSpill,
spillThreshold);
spillThreshold,
hashAlgorithm);
}

public long makeForRSS(
Expand All @@ -55,6 +57,7 @@ public long makeForRSS(
int bufferSize,
String codec,
long spillThreshold,
String hashAlgorithm,
Object pusher) {
return nativeMakeForRSS(
part.getShortName(),
Expand All @@ -66,6 +69,7 @@ public long makeForRSS(
bufferSize,
codec,
spillThreshold,
hashAlgorithm,
pusher);
}

Expand All @@ -82,7 +86,8 @@ public native long nativeMake(
String localDirs,
int subDirsPerLocalDir,
boolean preferSpill,
long spillThreshold);
long spillThreshold,
String hashAlgorithm);

public native long nativeMakeForRSS(
String shortName,
Expand All @@ -94,6 +99,7 @@ public native long nativeMakeForRSS(
int bufferSize,
String codec,
long spillThreshold,
String hashAlgorithm,
Object pusher);

public native void split(long splitterId, long block);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,24 @@ object CHBackendSettings extends BackendSettingsApi with Logging {

private val GLUTEN_CLICKHOUSE_SHUFFLE_SUPPORTED_CODEC: Set[String] = Set("lz4", "zstd", "snappy")

// The algorithm for hash partition of the shuffle
private val GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM: String =
GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
".shuffle.hash.algorithm"
// valid values are: cityHash64 or murmurHash3_32
private val GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM_DEFAULT = "cityHash64"
lazy val shuffleHashAlgorithm: String = {
val algorithm = SparkEnv.get.conf.get(
CHBackendSettings.GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM,
CHBackendSettings.GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM_DEFAULT
)
if (!algorithm.equals("cityHash64") && !algorithm.equals("murmurHash3_32")) {
CHBackendSettings.GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM_DEFAULT
} else {
algorithm
}
}

override def supportFileFormatRead(
format: ReadFileFormat,
fields: Array[StructField],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.spark.shuffle

import io.glutenproject.GlutenConfig
import io.glutenproject.backendsapi.clickhouse.CHBackendSettings
import io.glutenproject.memory.alloc.CHNativeMemoryAllocators
import io.glutenproject.memory.memtarget.{MemoryTarget, Spiller}
import io.glutenproject.vectorized._
Expand Down Expand Up @@ -100,7 +101,8 @@ class CHColumnarShuffleWriter[K, V](
localDirs,
subDirsPerLocalDir,
preferSpill,
spillThreshold
spillThreshold,
CHBackendSettings.shuffleHashAlgorithm
)
CHNativeMemoryAllocators.createSpillable(
"ShuffleWriter",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.utils

import io.glutenproject.GlutenConfig
import io.glutenproject.backendsapi.clickhouse.CHBackendSettings
import io.glutenproject.expression.ConverterUtils
import io.glutenproject.row.SparkRowInfo
import io.glutenproject.vectorized._
Expand Down Expand Up @@ -210,6 +211,7 @@ object CHExecUtil extends Logging {
options.setName(nativePartitioning.getShortName)
options.setPartitionNum(nativePartitioning.getNumPartitions)
options.setExpr(new String(nativePartitioning.getExprList))
options.setHashAlgorithm(CHBackendSettings.shuffleHashAlgorithm)
options.setRequiredFields(if (nativePartitioning.getRequiredFields != null) {
new String(nativePartitioning.getRequiredFields)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.gluten.sql.columnar.backend.ch.use.v2", "false")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "murmurHash3_32")
}

override protected def createTPCHNotNullTables(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class GlutenClickHouseTPCHParquetAQESuite
.set("spark.gluten.sql.columnar.backend.ch.use.v2", "false")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", "true")
.set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "murmurHash3_32")
}

override protected def createTPCHNotNullTables(): Unit = {
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ CachedShuffleWriter::CachedShuffleWriter(const String & short_name, SplitOptions
{
hash_fields.push_back(std::stoi(expr));
}
partitioner = std::make_unique<HashSelectorBuilder>(options.partition_nums, hash_fields, "cityHash64");
partitioner = std::make_unique<HashSelectorBuilder>(options.partition_nums, hash_fields, options_.hash_algorithm);
}
else if (short_name == "single")
{
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Shuffle/NativeSplitter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ HashNativeSplitter::HashNativeSplitter(NativeSplitter::Options options_, jobject
output_columns_indicies.push_back(std::stoi(*iter));
}

selector_builder = std::make_unique<HashSelectorBuilder>(options.partition_nums, hash_fields, "cityHash64");
selector_builder = std::make_unique<HashSelectorBuilder>(options.partition_nums, hash_fields, options_.hash_algorithm);
}

void HashNativeSplitter::computePartitionId(Block & block)
Expand Down
1 change: 1 addition & 0 deletions cpp-ch/local-engine/Shuffle/NativeSplitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class NativeSplitter : BlockIterator
size_t partition_nums;
std::string exprs_buffer;
std::string schema_buffer;
std::string hash_algorithm;
};

struct Holder
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ HashSplitter::HashSplitter(SplitOptions options_) : ShuffleSplitter(std::move(op
output_columns_indicies.push_back(std::stoi(*iter));
}

selector_builder = std::make_unique<HashSelectorBuilder>(options.partition_nums, hash_fields, "cityHash64");
selector_builder = std::make_unique<HashSelectorBuilder>(options.partition_nums, hash_fields, options_.hash_algorithm);
}
std::unique_ptr<ShuffleSplitter> HashSplitter::create(SplitOptions && options_)
{
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Shuffle/ShuffleSplitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ struct SplitOptions
std::string compress_method = "zstd";
int compress_level;
size_t spill_threshold = 300 * 1024 * 1024;

std::string hash_algorithm;
};

class ColumnsBuffer
Expand Down
14 changes: 10 additions & 4 deletions cpp-ch/local-engine/local_engine_jni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,8 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat
jstring local_dirs,
jint num_sub_dirs,
jboolean prefer_spill,
jlong spill_threshold)
jlong spill_threshold,
jstring hash_algorithm)
{
LOCAL_ENGINE_JNI_METHOD_START
std::string hash_exprs;
Expand Down Expand Up @@ -694,7 +695,8 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat
.hash_exprs = hash_exprs,
.out_exprs = out_exprs,
.compress_method = jstring2string(env, codec),
.spill_threshold = static_cast<size_t>(spill_threshold)};
.spill_threshold = static_cast<size_t>(spill_threshold),
.hash_algorithm = jstring2string(env, hash_algorithm)};
auto name = jstring2string(env, short_name);
local_engine::SplitterHolder * splitter;
if (prefer_spill)
Expand All @@ -721,6 +723,7 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat
jint split_size,
jstring codec,
jlong spill_threshold,
jstring hash_algorithm,
jobject pusher)
{
LOCAL_ENGINE_JNI_METHOD_START
Expand Down Expand Up @@ -753,7 +756,8 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat
.hash_exprs = hash_exprs,
.out_exprs = out_exprs,
.compress_method = jstring2string(env, codec),
.spill_threshold = static_cast<size_t>(spill_threshold)};
.spill_threshold = static_cast<size_t>(spill_threshold),
.hash_algorithm = jstring2string(env, hash_algorithm)};
auto name = jstring2string(env, short_name);
local_engine::SplitterHolder * splitter;
splitter = new local_engine::SplitterHolder{.splitter = std::make_unique<local_engine::CachedShuffleWriter>(name, options, pusher)};
Expand Down Expand Up @@ -1060,12 +1064,14 @@ Java_io_glutenproject_vectorized_StorageJoinBuilder_nativeCleanBuildHashTable(JN

// BlockSplitIterator
JNIEXPORT jlong Java_io_glutenproject_vectorized_BlockSplitIterator_nativeCreate(
JNIEnv * env, jobject, jobject in, jstring name, jstring expr, jstring schema, jint partition_num, jint buffer_size)
JNIEnv * env, jobject, jobject in, jstring name, jstring expr, jstring schema, jint partition_num, jint buffer_size, jstring hash_algorithm)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::NativeSplitter::Options options;
options.partition_nums = partition_num;
options.buffer_size = buffer_size;
auto hash_algorithm_str = jstring2string(env, hash_algorithm);
options.hash_algorithm.swap(hash_algorithm_str);
auto expr_str = jstring2string(env, expr);
std::string schema_str;
if (schema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import io.glutenproject.memory.alloc.CHNativeMemoryAllocators
import io.glutenproject.memory.memtarget.MemoryTarget
import io.glutenproject.memory.memtarget.Spiller
import io.glutenproject.vectorized._
import io.glutenproject.backendsapi.clickhouse.CHBackendSettings

import org.apache.spark._
import org.apache.spark.scheduler.MapStatus
Expand Down Expand Up @@ -68,6 +69,7 @@ class CHCelebornHashBasedColumnarShuffleWriter[K, V](
nativeBufferSize,
customizedCompressCodec,
GlutenConfig.getConf.chColumnarShuffleSpillThreshold,
CHBackendSettings.shuffleHashAlgorithm,
celebornPartitionPusher
)
CHNativeMemoryAllocators.createSpillable(
Expand Down

0 comments on commit 1d1dc58

Please sign in to comment.