Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-3598][CH] Support to config the hash algorithm for the ch shuffle hash partitioner #3604

Merged
merged 1 commit into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public BlockSplitIterator(Iterator<Long> in, IteratorOptions options) {
options.getExpr(),
options.getRequiredFields(),
options.getPartitionNum(),
options.getBufferSize());
options.getBufferSize(),
options.getHashAlgorithm());
}

private native long nativeCreate(
Expand All @@ -42,7 +43,8 @@ private native long nativeCreate(
String expr,
String schema,
int partitionNum,
int bufferSize);
int bufferSize,
String hashAlgorithm);

private native void nativeClose(long instance);

Expand Down Expand Up @@ -80,6 +82,8 @@ public static class IteratorOptions implements Serializable {
private String expr;
private String requiredFields;

private String hashAlgorithm;

/**
 * Returns the partition count currently stored in these options.
 *
 * @return the value of the {@code partitionNum} field
 */
public int getPartitionNum() {
  return this.partitionNum;
}
Expand Down Expand Up @@ -119,5 +123,13 @@ public String getRequiredFields() {
/**
 * Stores the required-fields string in these options.
 *
 * @param requiredFields the value to keep in the {@code requiredFields} field; it is stored
 *     as given, without validation
 */
public void setRequiredFields(String requiredFields) {
this.requiredFields = requiredFields;
}

/**
 * Returns the hash algorithm name stored in these options.
 *
 * @return the value of the {@code hashAlgorithm} field; {@code null} if it was never set
 */
public String getHashAlgorithm() {
  return this.hashAlgorithm;
}

/**
 * Stores the hash algorithm name in these options.
 *
 * @param hashAlgorithm the value to keep in the {@code hashAlgorithm} field; it is stored as
 *     given, without validation
 */
public void setHashAlgorithm(String hashAlgorithm) {
this.hashAlgorithm = hashAlgorithm;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public long make(
String localDirs,
int subDirsPerLocalDir,
boolean preferSpill,
long spillThreshold) {
long spillThreshold,
String hashAlgorithm) {
return nativeMake(
part.getShortName(),
part.getNumPartitions(),
Expand All @@ -45,7 +46,8 @@ public long make(
localDirs,
subDirsPerLocalDir,
preferSpill,
spillThreshold);
spillThreshold,
hashAlgorithm);
}

public long makeForRSS(
Expand All @@ -55,6 +57,7 @@ public long makeForRSS(
int bufferSize,
String codec,
long spillThreshold,
String hashAlgorithm,
Object pusher) {
return nativeMakeForRSS(
part.getShortName(),
Expand All @@ -66,6 +69,7 @@ public long makeForRSS(
bufferSize,
codec,
spillThreshold,
hashAlgorithm,
pusher);
}

Expand All @@ -82,7 +86,8 @@ public native long nativeMake(
String localDirs,
int subDirsPerLocalDir,
boolean preferSpill,
long spillThreshold);
long spillThreshold,
String hashAlgorithm);

public native long nativeMakeForRSS(
String shortName,
Expand All @@ -94,6 +99,7 @@ public native long nativeMakeForRSS(
int bufferSize,
String codec,
long spillThreshold,
String hashAlgorithm,
Object pusher);

public native void split(long splitterId, long block);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,24 @@ object CHBackendSettings extends BackendSettingsApi with Logging {

private val GLUTEN_CLICKHOUSE_SHUFFLE_SUPPORTED_CODEC: Set[String] = Set("lz4", "zstd", "snappy")

// Config key selecting the algorithm used for hash partitioning in the shuffle.
private val GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM: String =
  GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
    ".shuffle.hash.algorithm"
// Used when the key is unset or carries an unsupported value.
private val GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM_DEFAULT = "cityHash64"
// The only accepted values; matching is exact (case-sensitive).
private val GLUTEN_CLICKHOUSE_SHUFFLE_SUPPORTED_HASH_ALGORITHM: Set[String] =
  Set("cityHash64", "murmurHash3_32")

/**
 * Hash algorithm name handed to the native shuffle hash partitioner.
 *
 * Read once, on first access, from the Spark conf of the current `SparkEnv`. A value
 * outside the supported set is replaced by the default, and a warning is logged so that a
 * mistyped setting does not go unnoticed.
 */
lazy val shuffleHashAlgorithm: String = {
  val configured = SparkEnv.get.conf.get(
    CHBackendSettings.GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM,
    CHBackendSettings.GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM_DEFAULT
  )
  if (CHBackendSettings.GLUTEN_CLICKHOUSE_SHUFFLE_SUPPORTED_HASH_ALGORITHM.contains(configured)) {
    configured
  } else {
    val supported =
      CHBackendSettings.GLUTEN_CLICKHOUSE_SHUFFLE_SUPPORTED_HASH_ALGORITHM.mkString(", ")
    logWarning(
      s"Unsupported value '$configured' for " +
        s"${CHBackendSettings.GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM}; supported values " +
        s"are: $supported. Falling back to " +
        s"'${CHBackendSettings.GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM_DEFAULT}'.")
    CHBackendSettings.GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM_DEFAULT
  }
}

override def supportFileFormatRead(
format: ReadFileFormat,
fields: Array[StructField],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.spark.shuffle

import io.glutenproject.GlutenConfig
import io.glutenproject.backendsapi.clickhouse.CHBackendSettings
import io.glutenproject.memory.alloc.CHNativeMemoryAllocators
import io.glutenproject.memory.memtarget.{MemoryTarget, Spiller}
import io.glutenproject.vectorized._
Expand Down Expand Up @@ -100,7 +101,8 @@ class CHColumnarShuffleWriter[K, V](
localDirs,
subDirsPerLocalDir,
preferSpill,
spillThreshold
spillThreshold,
CHBackendSettings.shuffleHashAlgorithm
)
CHNativeMemoryAllocators.createSpillable(
"ShuffleWriter",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.utils

import io.glutenproject.GlutenConfig
import io.glutenproject.backendsapi.clickhouse.CHBackendSettings
import io.glutenproject.expression.ConverterUtils
import io.glutenproject.row.SparkRowInfo
import io.glutenproject.vectorized._
Expand Down Expand Up @@ -210,6 +211,7 @@ object CHExecUtil extends Logging {
options.setName(nativePartitioning.getShortName)
options.setPartitionNum(nativePartitioning.getNumPartitions)
options.setExpr(new String(nativePartitioning.getExprList))
options.setHashAlgorithm(CHBackendSettings.shuffleHashAlgorithm)
options.setRequiredFields(if (nativePartitioning.getRequiredFields != null) {
new String(nativePartitioning.getRequiredFields)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.gluten.sql.columnar.backend.ch.use.v2", "false")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "murmurHash3_32")
}

override protected def createTPCHNotNullTables(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class GlutenClickHouseTPCHParquetAQESuite
.set("spark.gluten.sql.columnar.backend.ch.use.v2", "false")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", "true")
.set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "murmurHash3_32")
}

override protected def createTPCHNotNullTables(): Unit = {
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ CachedShuffleWriter::CachedShuffleWriter(const String & short_name, SplitOptions
{
hash_fields.push_back(std::stoi(expr));
}
partitioner = std::make_unique<HashSelectorBuilder>(options.partition_nums, hash_fields, "cityHash64");
partitioner = std::make_unique<HashSelectorBuilder>(options.partition_nums, hash_fields, options_.hash_algorithm);
}
else if (short_name == "single")
{
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Shuffle/NativeSplitter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ HashNativeSplitter::HashNativeSplitter(NativeSplitter::Options options_, jobject
output_columns_indicies.push_back(std::stoi(*iter));
}

selector_builder = std::make_unique<HashSelectorBuilder>(options.partition_nums, hash_fields, "cityHash64");
selector_builder = std::make_unique<HashSelectorBuilder>(options.partition_nums, hash_fields, options_.hash_algorithm);
}

void HashNativeSplitter::computePartitionId(Block & block)
Expand Down
1 change: 1 addition & 0 deletions cpp-ch/local-engine/Shuffle/NativeSplitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class NativeSplitter : BlockIterator
size_t partition_nums;
std::string exprs_buffer;
std::string schema_buffer;
std::string hash_algorithm;
};

struct Holder
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ HashSplitter::HashSplitter(SplitOptions options_) : ShuffleSplitter(std::move(op
output_columns_indicies.push_back(std::stoi(*iter));
}

selector_builder = std::make_unique<HashSelectorBuilder>(options.partition_nums, hash_fields, "cityHash64");
selector_builder = std::make_unique<HashSelectorBuilder>(options.partition_nums, hash_fields, options_.hash_algorithm);
}
std::unique_ptr<ShuffleSplitter> HashSplitter::create(SplitOptions && options_)
{
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Shuffle/ShuffleSplitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ struct SplitOptions
std::string compress_method = "zstd";
int compress_level;
size_t spill_threshold = 300 * 1024 * 1024;

std::string hash_algorithm;
};

class ColumnsBuffer
Expand Down
14 changes: 10 additions & 4 deletions cpp-ch/local-engine/local_engine_jni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,8 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat
jstring local_dirs,
jint num_sub_dirs,
jboolean prefer_spill,
jlong spill_threshold)
jlong spill_threshold,
jstring hash_algorithm)
{
LOCAL_ENGINE_JNI_METHOD_START
std::string hash_exprs;
Expand Down Expand Up @@ -694,7 +695,8 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat
.hash_exprs = hash_exprs,
.out_exprs = out_exprs,
.compress_method = jstring2string(env, codec),
.spill_threshold = static_cast<size_t>(spill_threshold)};
.spill_threshold = static_cast<size_t>(spill_threshold),
.hash_algorithm = jstring2string(env, hash_algorithm)};
auto name = jstring2string(env, short_name);
local_engine::SplitterHolder * splitter;
if (prefer_spill)
Expand All @@ -721,6 +723,7 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat
jint split_size,
jstring codec,
jlong spill_threshold,
jstring hash_algorithm,
jobject pusher)
{
LOCAL_ENGINE_JNI_METHOD_START
Expand Down Expand Up @@ -753,7 +756,8 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat
.hash_exprs = hash_exprs,
.out_exprs = out_exprs,
.compress_method = jstring2string(env, codec),
.spill_threshold = static_cast<size_t>(spill_threshold)};
.spill_threshold = static_cast<size_t>(spill_threshold),
.hash_algorithm = jstring2string(env, hash_algorithm)};
auto name = jstring2string(env, short_name);
local_engine::SplitterHolder * splitter;
splitter = new local_engine::SplitterHolder{.splitter = std::make_unique<local_engine::CachedShuffleWriter>(name, options, pusher)};
Expand Down Expand Up @@ -1060,12 +1064,14 @@ Java_io_glutenproject_vectorized_StorageJoinBuilder_nativeCleanBuildHashTable(JN

// BlockSplitIterator
JNIEXPORT jlong Java_io_glutenproject_vectorized_BlockSplitIterator_nativeCreate(
JNIEnv * env, jobject, jobject in, jstring name, jstring expr, jstring schema, jint partition_num, jint buffer_size)
JNIEnv * env, jobject, jobject in, jstring name, jstring expr, jstring schema, jint partition_num, jint buffer_size, jstring hash_algorithm)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::NativeSplitter::Options options;
options.partition_nums = partition_num;
options.buffer_size = buffer_size;
auto hash_algorithm_str = jstring2string(env, hash_algorithm);
options.hash_algorithm.swap(hash_algorithm_str);
auto expr_str = jstring2string(env, expr);
std::string schema_str;
if (schema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import io.glutenproject.memory.alloc.CHNativeMemoryAllocators
import io.glutenproject.memory.memtarget.MemoryTarget
import io.glutenproject.memory.memtarget.Spiller
import io.glutenproject.vectorized._
import io.glutenproject.backendsapi.clickhouse.CHBackendSettings

import org.apache.spark._
import org.apache.spark.scheduler.MapStatus
Expand Down Expand Up @@ -68,6 +69,7 @@ class CHCelebornHashBasedColumnarShuffleWriter[K, V](
nativeBufferSize,
customizedCompressCodec,
GlutenConfig.getConf.chColumnarShuffleSpillThreshold,
CHBackendSettings.shuffleHashAlgorithm,
celebornPartitionPusher
)
CHNativeMemoryAllocators.createSpillable(
Expand Down
Loading