[GLUTEN-1632][CH]Daily Update Clickhouse Version (20240621) (#6170)
* [GLUTEN-1632][CH]Daily Update Clickhouse Version (20240621)

* Fix Build/UT due to ClickHouse/ClickHouse#65234

* fix style

* refactor test

---------

Co-authored-by: kyligence-git <[email protected]>
Co-authored-by: Chang Chen <[email protected]>
3 people authored Jun 21, 2024
1 parent e25ab2e commit ef7b2c5
Showing 9 changed files with 373 additions and 51 deletions.
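
Most of this diff adapts the local engine to ClickHouse/ClickHouse#65234, which reworked the ActionsDAG API: DAGs move from shared_ptr-based ActionsDAGPtr handles to value semantics, the explicit projectInput() call is gone, and code that still needs the unused input columns re-appends them with appendInputsForUnusedColumns(). A minimal before/after sketch of the migration pattern, with simplified declarations assumed (illustrative only, not the actual Gluten code):

    // Before ClickHouse/ClickHouse#65234: heap-allocated DAG, unused inputs projected away.
    auto dag = std::make_shared<DB::ActionsDAG>(name_and_types); // ActionsDAGPtr
    dag->project(required_columns);
    dag->projectInput(); // drop input columns not named in the projection

    // After #65234: DAG held by value and moved where needed; unused inputs are
    // re-appended explicitly when downstream steps still expect them.
    DB::ActionsDAG dag(name_and_types);
    dag.project(required_columns);
    dag.appendInputsForUnusedColumns(header); // keep the input block's remaining columns reachable

The same rework removes the bare projectInput() calls in JoinRelParser, MergeTreeRelParser, and ProjectRelParser below, and changes ASTParser::convertToActions from returning ActionsDAGPtr to returning ActionsDAG by value.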
4 changes: 2 additions & 2 deletions cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
 CH_ORG=Kyligence
-CH_BRANCH=rebase_ch/20240620
-CH_COMMIT=f9c3886a767
+CH_BRANCH=rebase_ch/20240621
+CH_COMMIT=acf666c1c4f
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Parser/JoinRelParser.cpp
@@ -459,7 +459,7 @@ void JoinRelParser::addConvertStep(TableJoin & table_join, DB::QueryPlan & left,
rename_dag->getOutputs()[pos] = &alias;
}
}
rename_dag->projectInput();

QueryPlanStepPtr project_step = std::make_unique<ExpressionStep>(right.getCurrentDataStream(), rename_dag);
project_step->setStepDescription("Right Table Rename");
steps.emplace_back(project_step.get());
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
@@ -211,7 +211,7 @@ PrewhereInfoPtr MergeTreeRelParser::parsePreWhereInfo(const substrait::Expressio
prewhere_info->prewhere_column_name = filter_name;
prewhere_info->need_filter = true;
prewhere_info->remove_prewhere_column = true;
prewhere_info->prewhere_actions->projectInput(false);

for (const auto & name : input.getNames())
prewhere_info->prewhere_actions->tryRestoreColumn(name);
return prewhere_info;
1 change: 0 additions & 1 deletion cpp-ch/local-engine/Parser/ProjectRelParser.cpp
@@ -99,7 +99,6 @@ ProjectRelParser::SplittedActionsDAGs ProjectRelParser::splitActionsDAGInGenerat
std::unordered_set<const ActionsDAG::Node *> first_split_nodes(array_join_node->children.begin(), array_join_node->children.end());
auto first_split_result = actions_dag->split(first_split_nodes);
res.before_array_join = first_split_result.first;
res.before_array_join->projectInput(true);

array_join_node = findArrayJoinNode(first_split_result.second);
std::unordered_set<const ActionsDAG::Node *> second_split_nodes = {array_join_node};
9 changes: 5 additions & 4 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -234,6 +234,7 @@ std::shared_ptr<ActionsDAG> SerializedPlanParser::expressionsToActionsDAG(
throw Exception(ErrorCodes::BAD_ARGUMENTS, "unsupported projection type {}.", magic_enum::enum_name(expr.rex_type_case()));
}
actions_dag->project(required_columns);
actions_dag->appendInputsForUnusedColumns(header);
return actions_dag;
}

@@ -1790,7 +1791,7 @@ QueryPlanPtr SerializedPlanParser::parse(const std::string & plan)
QueryPlanPtr SerializedPlanParser::parseJson(const std::string & json_plan)
{
auto plan_ptr = std::make_unique<substrait::Plan>();
auto s = google::protobuf::util::JsonStringToMessage(absl::string_view(json_plan.c_str()), plan_ptr.get());
auto s = google::protobuf::util::JsonStringToMessage(absl::string_view(json_plan), plan_ptr.get());
if (!s.ok())
throw Exception(ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse substrait::Plan from json string failed: {}", s.ToString());
return parse(std::move(plan_ptr));
@@ -1831,7 +1832,7 @@ void SerializedPlanParser::collectJoinKeys(
}
}

ActionsDAGPtr ASTParser::convertToActions(const NamesAndTypesList & name_and_types, const ASTPtr & ast)
ActionsDAG ASTParser::convertToActions(const NamesAndTypesList & name_and_types, const ASTPtr & ast) const
{
NamesAndTypesList aggregation_keys;
ColumnNumbersList aggregation_keys_indexes_list;
@@ -1840,9 +1841,9 @@ ActionsDAGPtr ASTParser::convertToActions(const NamesAndTypesList & name_and_typ
ActionsMatcher::Data visitor_data(
context,
size_limits_for_set,
size_t(0),
static_cast<size_t>(0),
name_and_types,
std::make_shared<ActionsDAG>(name_and_types),
ActionsDAG(name_and_types),
std::make_shared<PreparedSets>(),
false /* no_subqueries */,
false /* no_makeset */,
10 changes: 5 additions & 5 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.h
@@ -112,7 +112,7 @@ static const std::map<std::string, std::string> SCALAR_FUNCTIONS
{"rand", "randCanonical"},
{"isnan", "isNaN"},
{"bin", "sparkBin"},
{"rint", "sparkRint"},
{"rint", "sparkRint"},

/// string functions
{"like", "like"},
@@ -151,7 +151,7 @@ static const std::map<std::string, std::string> SCALAR_FUNCTIONS
{"initcap", "initcapUTF8"},
{"conv", "sparkConv"},
{"uuid", "generateUUIDv4"},
{"levenshteinDistance", "editDistanceUTF8"},
{"levenshteinDistance", "editDistanceUTF8"},

/// hash functions
{"crc32", "CRC32"},
@@ -278,7 +278,7 @@ class SerializedPlanParser
materialize_inputs.emplace_back(materialize_input);
}

void addSplitInfo(std::string & split_info) { split_infos.emplace_back(std::move(split_info)); }
void addSplitInfo(std::string && split_info) { split_infos.emplace_back(std::move(split_info)); }

int nextSplitInfoIndex()
{
@@ -419,6 +419,7 @@ class LocalExecutor : public BlockIterator
RelMetricPtr getMetric() const { return metric; }
void setMetric(RelMetricPtr metric_) { metric = metric_; }
void setExtraPlanHolder(std::vector<QueryPlanPtr> & extra_plan_holder_) { extra_plan_holder = std::move(extra_plan_holder_); }

private:
std::unique_ptr<SparkRowInfo> writeBlockToSparkRow(DB::Block & block);

@@ -434,7 +435,6 @@ class LocalExecutor : public BlockIterator
DB::QueryPlanPtr current_query_plan;
RelMetricPtr metric;
std::vector<QueryPlanPtr> extra_plan_holder;

};


@@ -450,7 +450,7 @@ class ASTParser
~ASTParser() = default;

ASTPtr parseToAST(const Names & names, const substrait::Expression & rel);
ActionsDAGPtr convertToActions(const NamesAndTypesList & name_and_types, const ASTPtr & ast);
ActionsDAG convertToActions(const NamesAndTypesList & name_and_types, const ASTPtr & ast) const;

private:
ContextPtr context;
72 changes: 37 additions & 35 deletions cpp-ch/local-engine/local_engine_jni.cpp
@@ -36,10 +36,14 @@
#include <Shuffle/ShuffleWriter.h>
#include <Shuffle/ShuffleWriterBase.h>
#include <Shuffle/WriteBufferFromJavaOutputStream.h>
#include <Storages/Mergetree/MergeSparkMergeTreeTask.h>
#include <Storages/Mergetree/MetaDataHelper.h>
#include <Storages/Mergetree/SparkMergeTreeWriter.h>
#include <Storages/Output/BlockStripeSplitter.h>
#include <Storages/Output/FileWriterWrappers.h>
#include <Storages/StorageMergeTreeFactory.h>
#include <Storages/SubstraitSource/ReadBufferBuilder.h>
#include <google/protobuf/wrappers.pb.h>
#include <jni/ReservationListenerWrapper.h>
#include <jni/SharedPointerWrapper.h>
#include <jni/jni_common.h>
@@ -51,10 +55,6 @@
#include <Common/ExceptionUtils.h>
#include <Common/JNIUtils.h>
#include <Common/QueryContext.h>
#include <google/protobuf/wrappers.pb.h>
#include <Storages/StorageMergeTreeFactory.h>
#include <Storages/Mergetree/MergeSparkMergeTreeTask.h>
#include <Storages/Mergetree/MetaDataHelper.h>


#ifdef __cplusplus
@@ -269,13 +269,12 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_
parser.addInputIter(iter, materialize_input);
}

for (jsize i = 0, split_info_arr_size = env->GetArrayLength(split_infos); i < split_info_arr_size; i++) {
for (jsize i = 0, split_info_arr_size = env->GetArrayLength(split_infos); i < split_info_arr_size; i++)
{
jbyteArray split_info = static_cast<jbyteArray>(env->GetObjectArrayElement(split_infos, i));
jsize split_info_size = env->GetArrayLength(split_info);
std::string::size_type split_info_size = env->GetArrayLength(split_info);
jbyte * split_info_addr = env->GetByteArrayElements(split_info, nullptr);
std::string split_info_str;
split_info_str.assign(reinterpret_cast<const char *>(split_info_addr), split_info_size);
parser.addSplitInfo(split_info_str);
parser.addSplitInfo(std::string{reinterpret_cast<const char *>(split_info_addr), split_info_size});
}

jsize plan_size = env->GetArrayLength(plan);
@@ -630,8 +629,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
.max_sort_buffer_size = static_cast<size_t>(max_sort_buffer_size),
.spill_firstly_before_stop = static_cast<bool>(spill_firstly_before_stop),
.force_external_sort = static_cast<bool>(force_external_sort),
.force_mermory_sort = static_cast<bool>(force_memory_sort)
};
.force_mermory_sort = static_cast<bool>(force_memory_sort)};
auto name = jstring2string(env, short_name);
local_engine::SplitterHolder * splitter;
if (prefer_spill)
@@ -696,8 +694,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
.throw_if_memory_exceed = static_cast<bool>(throw_if_memory_exceed),
.flush_block_buffer_before_evict = static_cast<bool>(flush_block_buffer_before_evict),
.force_external_sort = static_cast<bool>(force_external_sort),
.force_mermory_sort = static_cast<bool>(force_memory_sort)
};
.force_mermory_sort = static_cast<bool>(force_memory_sort)};
auto name = jstring2string(env, short_name);
local_engine::SplitterHolder * splitter;
splitter = new local_engine::SplitterHolder{.splitter = std::make_unique<local_engine::CachedShuffleWriter>(name, options, pusher)};
@@ -768,8 +765,8 @@ JNIEXPORT void Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_clo
}

// CHBlockConverterJniWrapper
JNIEXPORT jobject
Java_org_apache_gluten_vectorized_CHBlockConverterJniWrapper_convertColumnarToRow(JNIEnv * env, jclass, jlong block_address, jintArray masks)
JNIEXPORT jobject Java_org_apache_gluten_vectorized_CHBlockConverterJniWrapper_convertColumnarToRow(
JNIEnv * env, jclass, jlong block_address, jintArray masks)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::CHColumnToSparkRow converter;
@@ -958,21 +955,18 @@ JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW
/// Parsing may fail when the number of recursive layers is large.
/// Here, set a limit large enough to avoid this problem.
/// Once this problem occurs, it is difficult to troubleshoot, because the pb of c++ will not provide any valid information
google::protobuf::io::CodedInputStream coded_in(
reinterpret_cast<const uint8_t *>(plan_str.data()), static_cast<int>(plan_str.size()));
google::protobuf::io::CodedInputStream coded_in(reinterpret_cast<const uint8_t *>(plan_str.data()), static_cast<int>(plan_str.size()));
coded_in.SetRecursionLimit(100000);

auto ok = plan_ptr->ParseFromCodedStream(&coded_in);
if (!ok)
throw DB::Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse substrait::Plan from string failed");

substrait::ReadRel::ExtensionTable extension_table =
local_engine::SerializedPlanParser::parseExtensionTable(split_info_str);
substrait::ReadRel::ExtensionTable extension_table = local_engine::SerializedPlanParser::parseExtensionTable(split_info_str);

auto merge_tree_table = local_engine::MergeTreeRelParser::parseMergeTreeTable(extension_table);
auto uuid = uuid_str + "_" + task_id;
auto * writer = new local_engine::SparkMergeTreeWriter(
merge_tree_table, query_context, uuid, partition_dir, bucket_dir);
auto * writer = new local_engine::SparkMergeTreeWriter(merge_tree_table, query_context, uuid, partition_dir, bucket_dir);

env->ReleaseByteArrayElements(plan_, plan_buf_addr, JNI_ABORT);
env->ReleaseByteArrayElements(split_info_, split_info_addr, JNI_ABORT);
@@ -1044,8 +1038,8 @@ JNIEXPORT void Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWr
LOCAL_ENGINE_JNI_METHOD_END(env, )
}

JNIEXPORT void
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_writeToMergeTree(JNIEnv * env, jobject, jlong instanceId, jlong block_address)
JNIEXPORT void Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_writeToMergeTree(
JNIEnv * env, jobject, jlong instanceId, jlong block_address)
{
LOCAL_ENGINE_JNI_METHOD_START
auto * writer = reinterpret_cast<local_engine::SparkMergeTreeWriter *>(instanceId);
@@ -1054,7 +1048,8 @@ Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_writeToMe
LOCAL_ENGINE_JNI_METHOD_END(env, )
}

JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_closeMergeTreeWriter(JNIEnv * env, jobject, jlong instanceId)
JNIEXPORT jstring
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_closeMergeTreeWriter(JNIEnv * env, jobject, jlong instanceId)
{
LOCAL_ENGINE_JNI_METHOD_START
auto * writer = reinterpret_cast<local_engine::SparkMergeTreeWriter *>(instanceId);
@@ -1067,7 +1062,14 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
}

JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_nativeMergeMTParts(
JNIEnv * env, jobject, jbyteArray plan_, jbyteArray split_info_, jstring uuid_, jstring task_id_, jstring partition_dir_, jstring bucket_dir_)
JNIEnv * env,
jobject,
jbyteArray plan_,
jbyteArray split_info_,
jstring uuid_,
jstring task_id_,
jstring partition_dir_,
jstring bucket_dir_)
{
LOCAL_ENGINE_JNI_METHOD_START

@@ -1095,16 +1097,14 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
/// Parsing may fail when the number of recursive layers is large.
/// Here, set a limit large enough to avoid this problem.
/// Once this problem occurs, it is difficult to troubleshoot, because the pb of c++ will not provide any valid information
google::protobuf::io::CodedInputStream coded_in(
reinterpret_cast<const uint8_t *>(plan_str.data()), static_cast<int>(plan_str.size()));
google::protobuf::io::CodedInputStream coded_in(reinterpret_cast<const uint8_t *>(plan_str.data()), static_cast<int>(plan_str.size()));
coded_in.SetRecursionLimit(100000);

auto ok = plan_ptr->ParseFromCodedStream(&coded_in);
if (!ok)
throw DB::Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse substrait::Plan from string failed");

substrait::ReadRel::ExtensionTable extension_table =
local_engine::SerializedPlanParser::parseExtensionTable(split_info_str);
substrait::ReadRel::ExtensionTable extension_table = local_engine::SerializedPlanParser::parseExtensionTable(split_info_str);
google::protobuf::StringValue table;
table.ParseFromString(extension_table.detail().value());
auto merge_tree_table = local_engine::parseMergeTreeTableString(table.value());
@@ -1114,12 +1114,12 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
= local_engine::MergeTreeRelParser::copyToVirtualStorage(merge_tree_table, local_engine::SerializedPlanParser::global_context);

local_engine::TempStorageFreer freer{temp_storage->getStorageID()}; // to release temp CustomStorageMergeTree with RAII
std::vector<DB::DataPartPtr> selected_parts
= local_engine::StorageMergeTreeFactory::instance().getDataPartsByNames(temp_storage->getStorageID(), "", merge_tree_table.getPartNames());
std::vector<DB::DataPartPtr> selected_parts = local_engine::StorageMergeTreeFactory::instance().getDataPartsByNames(
temp_storage->getStorageID(), "", merge_tree_table.getPartNames());

std::unordered_map<String, String> partition_values;
std::vector<MergeTreeDataPartPtr> loaded =
local_engine::mergeParts(selected_parts, partition_values, uuid_str, temp_storage, partition_dir, bucket_dir);
std::vector<MergeTreeDataPartPtr> loaded
= local_engine::mergeParts(selected_parts, partition_values, uuid_str, temp_storage, partition_dir, bucket_dir);

std::vector<local_engine::PartInfo> res;
for (auto & partPtr : loaded)
@@ -1156,7 +1156,8 @@ JNIEXPORT jobject Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
partition_col_indice_vec.push_back(pIndice[i]);

env->ReleaseIntArrayElements(partitionColIndice, pIndice, JNI_ABORT);
local_engine::BlockStripes bs = local_engine::BlockStripeSplitter::split(*block, partition_col_indice_vec, hasBucket, reserve_partition_columns);
local_engine::BlockStripes bs
= local_engine::BlockStripeSplitter::split(*block, partition_col_indice_vec, hasBucket, reserve_partition_columns);


auto * addresses = env->NewLongArray(bs.block_addresses.size());
@@ -1366,7 +1367,8 @@ JNIEXPORT jlong Java_org_apache_gluten_memory_alloc_CHNativeMemoryAllocator_getD
return -1;
}

JNIEXPORT jlong Java_org_apache_gluten_memory_alloc_CHNativeMemoryAllocator_createListenableAllocator(JNIEnv * env, jclass, jobject listener)
JNIEXPORT jlong
Java_org_apache_gluten_memory_alloc_CHNativeMemoryAllocator_createListenableAllocator(JNIEnv * env, jclass, jobject listener)
{
LOCAL_ENGINE_JNI_METHOD_START
auto listener_wrapper = std::make_shared<local_engine::ReservationListenerWrapper>(env->NewGlobalRef(listener));
4 changes: 2 additions & 2 deletions cpp-ch/local-engine/tests/gluten_test_util.cpp
@@ -62,14 +62,14 @@ ActionsDAGPtr parseFilter(const std::string & filter, const AnotherRowType & nam
size_limits_for_set,
static_cast<size_t>(0),
name_and_types,
std::make_shared<ActionsDAG>(name_and_types),
ActionsDAG(name_and_types),
prepared_sets /* prepared_sets */,
false /* no_subqueries */,
false /* no_makeset */,
false /* only_consts */,
info);
ActionsVisitor(visitor_data).visit(ast_exp);
return ActionsDAG::buildFilterActionsDAG({visitor_data.getActions()->getOutputs().back()}, node_name_to_input_column);
return ActionsDAG::buildFilterActionsDAG({visitor_data.getActions().getOutputs().back()}, node_name_to_input_column);
}

const char * get_data_dir()