Commit a971e69

Merge branch 'oap-project:main' into gayangya/disable_columnar_table_cache_default

gaoyangxiaozhu authored Nov 3, 2023
2 parents 1debec0 + 07ba657
Showing 39 changed files with 178 additions and 122 deletions.
BlockSplitIterator.java
@@ -33,7 +33,8 @@ public BlockSplitIterator(Iterator<Long> in, IteratorOptions options) {
             options.getExpr(),
             options.getRequiredFields(),
             options.getPartitionNum(),
-            options.getBufferSize());
+            options.getBufferSize(),
+            options.getHashAlgorithm());
   }

   private native long nativeCreate(
@@ -42,7 +43,8 @@ private native long nativeCreate(
       String expr,
       String schema,
       int partitionNum,
-      int bufferSize);
+      int bufferSize,
+      String hashAlgorithm);

   private native void nativeClose(long instance);

@@ -80,6 +82,8 @@ public static class IteratorOptions implements Serializable {
     private String expr;
     private String requiredFields;

+    private String hashAlgorithm;
+
     public int getPartitionNum() {
       return partitionNum;
     }
@@ -119,5 +123,13 @@ public String getRequiredFields() {
     public void setRequiredFields(String requiredFields) {
       this.requiredFields = requiredFields;
     }
+
+    public String getHashAlgorithm() {
+      return hashAlgorithm;
+    }
+
+    public void setHashAlgorithm(String hashAlgorithm) {
+      this.hashAlgorithm = hashAlgorithm;
+    }
   }
 }
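
A minimal usage sketch of the extended options. The short name and expression
format are assumptions inferred from CHExecUtil and the native splitters below
(which parse the expression list as hash-key column indices via std::stoi);
the other values are illustrative:

    BlockSplitIterator.IteratorOptions options = new BlockSplitIterator.IteratorOptions();
    options.setName("hash");                // assumed partitioning short name
    options.setPartitionNum(200);           // illustrative
    options.setExpr("0,2");                 // assumed: indices of the hash key columns
    options.setHashAlgorithm("cityHash64"); // or "murmurHash3_32"

The new hashAlgorithm field rides along with the existing options so the
native splitter no longer needs a hard-coded hash function.
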
CHNativeExpressionEvaluator.java
@@ -27,7 +27,6 @@
 import io.glutenproject.substrait.plan.PlanNode;

 import com.google.protobuf.Any;
-import io.substrait.proto.Plan;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.internal.SQLConf;

@@ -80,12 +79,12 @@ private PlanNode buildNativeConfNode(Map<String, String> confs) {
   // Used by WholeStageTransform to create the native computing pipeline and
   // return a columnar result iterator.
   public GeneralOutIterator createKernelWithBatchIterator(
-      Plan wsPlan, List<GeneralInIterator> iterList, boolean materializeInput) {
+      byte[] wsPlan, List<GeneralInIterator> iterList, boolean materializeInput) {
     long allocId = CHNativeMemoryAllocators.contextInstance().getNativeInstanceId();
     long handle =
         jniWrapper.nativeCreateKernelWithIterator(
             allocId,
-            getPlanBytesBuf(wsPlan),
+            wsPlan,
             iterList.toArray(new GeneralInIterator[0]),
             buildNativeConfNode(
                 GlutenConfig.getNativeBackendConf(
@@ -115,10 +114,6 @@
     return createOutIterator(handle);
   }

-  private byte[] getPlanBytesBuf(Plan planNode) {
-    return planNode.toByteArray();
-  }
-
   private GeneralOutIterator createOutIterator(long nativeHandle) {
     return new BatchIterator(nativeHandle);
   }
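
With this change the wrapper accepts a pre-serialized Substrait plan, so the
io.substrait.proto.Plan type no longer leaks into its API and the plan is
serialized exactly once at the call site. A sketch of the new calling
convention (the evaluator and iterator-list names are placeholders):

    byte[] wsPlan = rootNode.toProtobuf().toByteArray(); // serialize once, at the caller
    GeneralOutIterator out =
        evaluator.createKernelWithBatchIterator(wsPlan, inputIterators, false);
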
CHShuffleSplitterJniWrapper.java
@@ -31,7 +31,8 @@ public long make(
       String localDirs,
       int subDirsPerLocalDir,
       boolean preferSpill,
-      long spillThreshold) {
+      long spillThreshold,
+      String hashAlgorithm) {
     return nativeMake(
         part.getShortName(),
         part.getNumPartitions(),
@@ -45,7 +46,8 @@
         localDirs,
         subDirsPerLocalDir,
         preferSpill,
-        spillThreshold);
+        spillThreshold,
+        hashAlgorithm);
   }

   public long makeForRSS(
@@ -55,6 +57,7 @@
       int bufferSize,
       String codec,
       long spillThreshold,
+      String hashAlgorithm,
       Object pusher) {
     return nativeMakeForRSS(
         part.getShortName(),
@@ -66,6 +69,7 @@
         bufferSize,
         codec,
         spillThreshold,
+        hashAlgorithm,
         pusher);
   }

@@ -82,7 +86,8 @@ public native long nativeMake(
       String localDirs,
       int subDirsPerLocalDir,
       boolean preferSpill,
-      long spillThreshold);
+      long spillThreshold,
+      String hashAlgorithm);

   public native long nativeMakeForRSS(
       String shortName,
@@ -94,6 +99,7 @@ public native long nativeMakeForRSS(
       int bufferSize,
       String codec,
       long spillThreshold,
+      String hashAlgorithm,
       Object pusher);

   public native void split(long splitterId, long block);
CHBackendSettings.scala
@@ -90,6 +90,24 @@ object CHBackendSettings extends BackendSettingsApi with Logging {

   private val GLUTEN_CLICKHOUSE_SHUFFLE_SUPPORTED_CODEC: Set[String] = Set("lz4", "zstd", "snappy")

+  // The algorithm for hash partition of the shuffle
+  private val GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM: String =
+    GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
+      ".shuffle.hash.algorithm"
+  // valid values are: cityHash64 or murmurHash3_32
+  private val GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM_DEFAULT = "cityHash64"
+  lazy val shuffleHashAlgorithm: String = {
+    val algorithm = SparkEnv.get.conf.get(
+      CHBackendSettings.GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM,
+      CHBackendSettings.GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM_DEFAULT
+    )
+    if (!algorithm.equals("cityHash64") && !algorithm.equals("murmurHash3_32")) {
+      CHBackendSettings.GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM_DEFAULT
+    } else {
+      algorithm
+    }
+  }
+
   override def supportFileFormatRead(
       format: ReadFileFormat,
       fields: Array[StructField],
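
The new setting resolves through CHBackendSettings.shuffleHashAlgorithm, and
any unrecognized value silently falls back to the cityHash64 default. A
minimal sketch of selecting the alternative algorithm (mirroring the test
suites below):

    SparkConf conf = new SparkConf()
        .set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "murmurHash3_32");
    // Anything other than "cityHash64" or "murmurHash3_32" resolves to "cityHash64".
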
CHIteratorApi.scala
@@ -91,19 +91,19 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
               fileFormats(i)),
             SoftAffinityUtil.getFilePartitionLocations(f))
         case _ =>
-          throw new UnsupportedOperationException(s"Unsupport operators.")
+          throw new UnsupportedOperationException(s"Unsupported input partition.")
       })
     wsCxt.substraitContext.initLocalFilesNodesIndex(0)
     wsCxt.substraitContext.setLocalFilesNodes(localFilesNodesWithLocations.map(_._1))
     val substraitPlan = wsCxt.root.toProtobuf
-    if (index < 3) {
+    if (index == 0) {
       logOnLevel(
         GlutenConfig.getConf.substraitPlanLogLevel,
         s"The substrait plan for partition $index:\n${SubstraitPlanPrinterUtil
             .substraitPlanToJson(substraitPlan)}"
       )
     }
-    GlutenPartition(index, substraitPlan, localFilesNodesWithLocations.head._2)
+    GlutenPartition(index, substraitPlan.toByteArray, localFilesNodesWithLocations.head._2)
   }

   /**
@@ -185,7 +185,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
       }.asJava)
       // we need to complete dependency RDD's firstly
       transKernel.createKernelWithBatchIterator(
-        rootNode.toProtobuf,
+        rootNode.toProtobuf.toByteArray,
         columnarNativeIterator,
         materializeInput)
     }
CHColumnarShuffleWriter.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.shuffle

 import io.glutenproject.GlutenConfig
+import io.glutenproject.backendsapi.clickhouse.CHBackendSettings
 import io.glutenproject.memory.alloc.CHNativeMemoryAllocators
 import io.glutenproject.memory.memtarget.{MemoryTarget, Spiller}
 import io.glutenproject.vectorized._
@@ -100,7 +101,8 @@ class CHColumnarShuffleWriter[K, V](
         localDirs,
         subDirsPerLocalDir,
         preferSpill,
-        spillThreshold
+        spillThreshold,
+        CHBackendSettings.shuffleHashAlgorithm
       )
     CHNativeMemoryAllocators.createSpillable(
       "ShuffleWriter",
CHExecUtil.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.execution.utils

 import io.glutenproject.GlutenConfig
+import io.glutenproject.backendsapi.clickhouse.CHBackendSettings
 import io.glutenproject.expression.ConverterUtils
 import io.glutenproject.row.SparkRowInfo
 import io.glutenproject.vectorized._
@@ -210,6 +211,7 @@ object CHExecUtil extends Logging {
     options.setName(nativePartitioning.getShortName)
     options.setPartitionNum(nativePartitioning.getNumPartitions)
     options.setExpr(new String(nativePartitioning.getExprList))
+    options.setHashAlgorithm(CHBackendSettings.shuffleHashAlgorithm)
     options.setRequiredFields(if (nativePartitioning.getRequiredFields != null) {
       new String(nativePartitioning.getRequiredFields)
     } else {
GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
@@ -43,6 +43,7 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
       .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
       .set("spark.gluten.sql.columnar.backend.ch.use.v2", "false")
       .set("spark.sql.adaptive.enabled", "true")
+      .set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "murmurHash3_32")
   }

   override protected def createTPCHNotNullTables(): Unit = {
GlutenClickHouseTPCHParquetAQESuite.scala
@@ -46,6 +46,7 @@ class GlutenClickHouseTPCHParquetAQESuite
       .set("spark.gluten.sql.columnar.backend.ch.use.v2", "false")
       .set("spark.sql.adaptive.enabled", "true")
       .set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", "true")
+      .set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "murmurHash3_32")
   }

   override protected def createTPCHNotNullTables(): Unit = {
IteratorApiImpl.scala
@@ -122,7 +122,7 @@ class IteratorApiImpl extends IteratorApi with Logging {
     wsCxt.substraitContext.initLocalFilesNodesIndex(0)
     wsCxt.substraitContext.setLocalFilesNodes(localFilesNodesWithLocations.map(_._1))
     val substraitPlan = wsCxt.root.toProtobuf
-    GlutenPartition(index, substraitPlan, localFilesNodesWithLocations.head._2)
+    GlutenPartition(index, substraitPlan.toByteArray, localFilesNodesWithLocations.head._2)
   }

   /**
@@ -187,7 +187,9 @@ class IteratorApiImpl extends IteratorApi with Logging {
         iter => new ColumnarBatchInIterator(iter.asJava)
       }.asJava)
     val nativeResultIterator =
-      transKernel.createKernelWithBatchIterator(rootNode.toProtobuf, columnarNativeIterator)
+      transKernel.createKernelWithBatchIterator(
+        rootNode.toProtobuf.toByteArray,
+        columnarNativeIterator)

     pipelineTime += TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beforeBuild)
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp
@@ -50,7 +50,7 @@ CachedShuffleWriter::CachedShuffleWriter(const String & short_name, SplitOptions
         {
             hash_fields.push_back(std::stoi(expr));
         }
-        partitioner = std::make_unique<HashSelectorBuilder>(options.partition_nums, hash_fields, "cityHash64");
+        partitioner = std::make_unique<HashSelectorBuilder>(options.partition_nums, hash_fields, options_.hash_algorithm);
     }
     else if (short_name == "single")
     {
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Shuffle/NativeSplitter.cpp
@@ -209,7 +209,7 @@ HashNativeSplitter::HashNativeSplitter(NativeSplitter::Options options_, jobject
         output_columns_indicies.push_back(std::stoi(*iter));
     }

-    selector_builder = std::make_unique<HashSelectorBuilder>(options.partition_nums, hash_fields, "cityHash64");
+    selector_builder = std::make_unique<HashSelectorBuilder>(options.partition_nums, hash_fields, options_.hash_algorithm);
 }

 void HashNativeSplitter::computePartitionId(Block & block)
1 change: 1 addition & 0 deletions cpp-ch/local-engine/Shuffle/NativeSplitter.h
@@ -41,6 +41,7 @@ class NativeSplitter : BlockIterator
         size_t partition_nums;
         std::string exprs_buffer;
         std::string schema_buffer;
+        std::string hash_algorithm;
     };

     struct Holder
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp
@@ -383,7 +383,7 @@ HashSplitter::HashSplitter(SplitOptions options_) : ShuffleSplitter(std::move(op
         output_columns_indicies.push_back(std::stoi(*iter));
     }

-    selector_builder = std::make_unique<HashSelectorBuilder>(options.partition_nums, hash_fields, "cityHash64");
+    selector_builder = std::make_unique<HashSelectorBuilder>(options.partition_nums, hash_fields, options_.hash_algorithm);
 }
 std::unique_ptr<ShuffleSplitter> HashSplitter::create(SplitOptions && options_)
 {
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Shuffle/ShuffleSplitter.h
@@ -47,7 +47,7 @@ struct SplitOptions
     std::string compress_method = "zstd";
     int compress_level;
     size_t spill_threshold = 300 * 1024 * 1024;
-
+    std::string hash_algorithm;
 };

 class ColumnsBuffer
14 changes: 10 additions & 4 deletions cpp-ch/local-engine/local_engine_jni.cpp
@@ -653,7 +653,8 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nativeMake(
     jstring local_dirs,
     jint num_sub_dirs,
     jboolean prefer_spill,
-    jlong spill_threshold)
+    jlong spill_threshold,
+    jstring hash_algorithm)
 {
     LOCAL_ENGINE_JNI_METHOD_START
     std::string hash_exprs;
@@ -694,7 +695,8 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nativeMake(
         .hash_exprs = hash_exprs,
         .out_exprs = out_exprs,
         .compress_method = jstring2string(env, codec),
-        .spill_threshold = static_cast<size_t>(spill_threshold)};
+        .spill_threshold = static_cast<size_t>(spill_threshold),
+        .hash_algorithm = jstring2string(env, hash_algorithm)};
     auto name = jstring2string(env, short_name);
     local_engine::SplitterHolder * splitter;
     if (prefer_spill)
@@ -721,6 +723,7 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nativeMakeForRSS(
     jint split_size,
     jstring codec,
     jlong spill_threshold,
+    jstring hash_algorithm,
     jobject pusher)
 {
     LOCAL_ENGINE_JNI_METHOD_START
@@ -753,7 +756,8 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nativeMakeForRSS(
         .hash_exprs = hash_exprs,
         .out_exprs = out_exprs,
         .compress_method = jstring2string(env, codec),
-        .spill_threshold = static_cast<size_t>(spill_threshold)};
+        .spill_threshold = static_cast<size_t>(spill_threshold),
+        .hash_algorithm = jstring2string(env, hash_algorithm)};
     auto name = jstring2string(env, short_name);
     local_engine::SplitterHolder * splitter;
     splitter = new local_engine::SplitterHolder{.splitter = std::make_unique<local_engine::CachedShuffleWriter>(name, options, pusher)};
@@ -1060,12 +1064,14 @@ Java_io_glutenproject_vectorized_StorageJoinBuilder_nativeCleanBuildHashTable

 // BlockSplitIterator
 JNIEXPORT jlong Java_io_glutenproject_vectorized_BlockSplitIterator_nativeCreate(
-    JNIEnv * env, jobject, jobject in, jstring name, jstring expr, jstring schema, jint partition_num, jint buffer_size)
+    JNIEnv * env, jobject, jobject in, jstring name, jstring expr, jstring schema, jint partition_num, jint buffer_size, jstring hash_algorithm)
 {
     LOCAL_ENGINE_JNI_METHOD_START
     local_engine::NativeSplitter::Options options;
     options.partition_nums = partition_num;
     options.buffer_size = buffer_size;
+    auto hash_algorithm_str = jstring2string(env, hash_algorithm);
+    options.hash_algorithm.swap(hash_algorithm_str);
     auto expr_str = jstring2string(env, expr);
     std::string schema_str;
     if (schema)
3 changes: 2 additions & 1 deletion cpp/core/jni/JniWrapper.cc
@@ -834,8 +834,9 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper
     env->ReleaseStringUTFChars(dataFileJstr, dataFileC);

     auto localDirs = env->GetStringUTFChars(localDirsJstr, JNI_FALSE);
-    setenv(gluten::kGlutenSparkLocalDirs.c_str(), localDirs, 1);
+    shuffleWriterOptions.local_dirs = std::string(localDirs);
     env->ReleaseStringUTFChars(localDirsJstr, localDirs);
+
     partitionWriterCreator = std::make_shared<LocalPartitionWriterCreator>();
   } else if (partitionWriterType == "celeborn") {
     shuffleWriterOptions.partition_writer_type = PartitionWriterType::kCeleborn;
13 changes: 2 additions & 11 deletions cpp/core/shuffle/LocalPartitionWriter.cc
@@ -20,6 +20,7 @@
 #include <thread>
 #include "shuffle/Utils.h"
 #include "utils/DebugOut.h"
+#include "utils/StringUtil.h"
 #include "utils/Timer.h"

 namespace gluten {
@@ -169,22 +170,12 @@ std::string LocalPartitionWriter::nextSpilledFileDir() {
 }

 arrow::Status LocalPartitionWriter::setLocalDirs() {
-  ARROW_ASSIGN_OR_RAISE(configuredDirs_, getConfiguredLocalDirs());
+  configuredDirs_ = splitPaths(shuffleWriter_->options().local_dirs);
   // Shuffle the configured local directories. This prevents each task from using the same directory for spilled files.
   std::random_device rd;
   std::default_random_engine engine(rd());
   std::shuffle(configuredDirs_.begin(), configuredDirs_.end(), engine);

   subDirSelection_.assign(configuredDirs_.size(), 0);

-  // Both data_file and shuffle_index_file should be set through jni.
-  // For test purpose, Create a temporary subdirectory in the system temporary
-  // dir with prefix "columnar-shuffle"
-  if (shuffleWriter_->options().data_file.length() == 0) {
-    std::string dataFileTemp;
-    size_t id = std::hash<std::thread::id>{}(std::this_thread::get_id()) % configuredDirs_.size();
-    ARROW_ASSIGN_OR_RAISE(shuffleWriter_->options().data_file, createTempShuffleFile(configuredDirs_[id]));
-  }
   return arrow::Status::OK();
 }
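
Together with the JniWrapper.cc change above, spill directories now travel in
the shuffle writer options rather than a process-wide environment variable
(setenv is also not thread-safe, so per-writer options are the safer channel).
A sketch of the assumed JVM-side counterpart; the comma delimiter and the
example paths are assumptions, not shown in this diff:

    // Assumed: Spark's configured local dirs are joined into a single string
    // for the JNI call; splitPaths(...) above recovers the list natively.
    String localDirs = String.join(",", "/tmp/spill-0", "/tmp/spill-1"); // illustrative
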
[Diffs for the remaining changed files were not loaded.]