Skip to content

Commit

Permalink
[GLUTEN-1632][CH]Daily Update Clickhouse Version (20241130) (#8112)
Browse files Browse the repository at this point in the history
* [GLUTEN-1632][CH]Daily Update Clickhouse Version (20241130)

* Fix Build due to ClickHouse/ClickHouse#71406

* Fix build due to ClickHouse/ClickHouse#72460

---------

Co-authored-by: kyligence-git <[email protected]>
Co-authored-by: Chang Chen <[email protected]>
  • Loading branch information
3 people authored Nov 30, 2024
1 parent 920bd8c commit 31e1b74
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 28 deletions.
4 changes: 2 additions & 2 deletions cpp-ch/clickhouse.version
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20241129
CH_COMMIT=101ba3f944d1
CH_BRANCH=rebase_ch/20241130
CH_COMMIT=d5d38588bd3
39 changes: 21 additions & 18 deletions cpp-ch/local-engine/Common/AggregateUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ extern const SettingsUInt64 aggregation_in_order_max_block_bytes;
extern const SettingsUInt64 group_by_two_level_threshold;
extern const SettingsFloat min_hit_rate_to_use_consecutive_keys_optimization;
extern const SettingsUInt64 max_block_size;
extern const SettingsBool compile_aggregate_expressions;
extern const SettingsUInt64 min_count_to_compile_aggregate_expression;
extern const SettingsBool enable_software_prefetch_in_aggregation;
}

template <typename Method>
Expand Down Expand Up @@ -186,15 +189,15 @@ DB::Block AggregateDataBlockConverter::next()
}

DB::Aggregator::Params AggregatorParamsHelper::buildParams(
DB::ContextPtr context,
const DB::ContextPtr & context,
const DB::Names & grouping_keys,
const DB::AggregateDescriptions & agg_descriptions,
Mode mode,
Algorithm algorithm)
{
const auto & settings = context->getSettingsRef();
size_t max_rows_to_group_by = mode == Mode::PARTIAL_TO_FINISHED ? 0 : static_cast<size_t>(settings[DB::Setting::max_rows_to_group_by]);
DB::OverflowMode group_by_overflow_mode = settings[DB::Setting::group_by_overflow_mode];

size_t group_by_two_level_threshold
= algorithm == Algorithm::GlutenGraceAggregate ? static_cast<size_t>(settings[DB::Setting::group_by_two_level_threshold]) : 0;
size_t group_by_two_level_threshold_bytes = algorithm == Algorithm::GlutenGraceAggregate
Expand All @@ -207,39 +210,39 @@ DB::Aggregator::Params AggregatorParamsHelper::buildParams(
? false
: (mode == Mode::PARTIAL_TO_FINISHED ? false : static_cast<bool>(settings[DB::Setting::empty_result_for_aggregation_by_empty_set]));
DB::TemporaryDataOnDiskScopePtr tmp_data_scope = algorithm == Algorithm::GlutenGraceAggregate ? nullptr : context->getTempDataOnDisk();
size_t max_threads = settings[DB::Setting::max_threads];

size_t min_free_disk_space = algorithm == Algorithm::GlutenGraceAggregate
? 0
: static_cast<size_t>(settings[DB::Setting::min_free_disk_space_for_temporary_data]);
bool compile_aggregate_expressions = mode == Mode::PARTIAL_TO_FINISHED ? false : true;
size_t min_count_to_compile_aggregate_expression = mode == Mode::PARTIAL_TO_FINISHED ? 0 : 3;
bool compile_aggregate_expressions = mode == Mode::PARTIAL_TO_FINISHED ? false : settings[DB::Setting::compile_aggregate_expressions];
size_t min_count_to_compile_aggregate_expression = mode == Mode::PARTIAL_TO_FINISHED ? 0 : settings[DB::Setting::min_count_to_compile_aggregate_expression];
size_t max_block_size = PODArrayUtil::adjustMemoryEfficientSize(settings[DB::Setting::max_block_size]);
bool enable_prefetch = mode == Mode::PARTIAL_TO_FINISHED ? false : true;
bool enable_prefetch = mode != Mode::PARTIAL_TO_FINISHED;
bool only_merge = mode == Mode::PARTIAL_TO_FINISHED;
bool optimize_group_by_constant_keys
= mode == Mode::PARTIAL_TO_FINISHED ? false : settings[DB::Setting::optimize_group_by_constant_keys];
double min_hit_rate_to_use_consecutive_keys_optimization = settings[DB::Setting::min_hit_rate_to_use_consecutive_keys_optimization];

DB::Settings aggregate_settings{settings};
aggregate_settings[DB::Setting::max_rows_to_group_by] = max_rows_to_group_by;
aggregate_settings[DB::Setting::max_bytes_before_external_group_by] = max_bytes_before_external_group_by;
aggregate_settings[DB::Setting::min_free_disk_space_for_temporary_data] = min_free_disk_space;
aggregate_settings[DB::Setting::compile_aggregate_expressions] = compile_aggregate_expressions;
aggregate_settings[DB::Setting::min_count_to_compile_aggregate_expression] = min_count_to_compile_aggregate_expression;
aggregate_settings[DB::Setting::max_block_size] = max_block_size;
aggregate_settings[DB::Setting::enable_software_prefetch_in_aggregation] = enable_prefetch;
aggregate_settings[DB::Setting::optimize_group_by_constant_keys] = optimize_group_by_constant_keys;
DB::Aggregator::Params params(
aggregate_settings,
grouping_keys,
agg_descriptions,
false,
max_rows_to_group_by,
group_by_overflow_mode,
group_by_two_level_threshold,
group_by_two_level_threshold_bytes,
max_bytes_before_external_group_by,
empty_result_for_aggregation_by_empty_set,
tmp_data_scope,
max_threads,
min_free_disk_space,
compile_aggregate_expressions,
min_count_to_compile_aggregate_expression,
max_block_size,
enable_prefetch,
only_merge,
optimize_group_by_constant_keys,
min_hit_rate_to_use_consecutive_keys_optimization,
{});

return params;
}

Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Common/AggregateUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class AggregatorParamsHelper

// When using grace aggregation, never enable CH spill; otherwise data will be lost.
static DB::Aggregator::Params buildParams(
DB::ContextPtr context,
const DB::ContextPtr & context,
const DB::Names & grouping_keys,
const DB::AggregateDescriptions & agg_descriptions,
Mode mode,
Expand Down
11 changes: 9 additions & 2 deletions cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ DB::QueryPlanPtr CrossRelParser::parseJoin(const substrait::CrossRel & join, DB:
context->getSettingsRef()[Setting::max_block_size],
context->getSettingsRef()[Setting::min_joined_block_size_bytes],
1,
false);
/* required_output_ = */ NameSet{},
false,
/* use_new_analyzer_ = */ false);
join_step->setStepDescription("CROSS_JOIN");
steps.emplace_back(join_step.get());
std::vector<QueryPlanPtr> plans;
Expand Down Expand Up @@ -254,7 +256,12 @@ void CrossRelParser::addConvertStep(TableJoin & table_join, DB::QueryPlan & left
NameSet left_columns_set;
for (const auto & col : left.getCurrentHeader().getNames())
left_columns_set.emplace(col);
table_join.setColumnsFromJoinedTable(right.getCurrentHeader().getNamesAndTypesList(), left_columns_set, getUniqueName("right") + ".");

table_join.setColumnsFromJoinedTable(
right.getCurrentHeader().getNamesAndTypesList(),
left_columns_set,
getUniqueName("right") + ".",
left.getCurrentHeader().getNamesAndTypesList());

// Fix duplicated right table keys
NamesWithAliases right_table_alias;
Expand Down
18 changes: 14 additions & 4 deletions cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,9 @@ DB::QueryPlanPtr JoinRelParser::parseJoin(const substrait::JoinRel & join, DB::Q
context->getSettingsRef()[Setting::max_block_size],
context->getSettingsRef()[Setting::min_joined_block_size_bytes],
1,
false);
/* required_output_ = */ NameSet{},
false,
/* use_new_analyzer_ = */ false);

join_step->setStepDescription("SORT_MERGE_JOIN");
steps.emplace_back(join_step.get());
Expand Down Expand Up @@ -390,7 +392,11 @@ void JoinRelParser::addConvertStep(TableJoin & table_join, DB::QueryPlan & left,
NameSet left_columns_set;
for (const auto & col : left.getCurrentHeader().getNames())
left_columns_set.emplace(col);
table_join.setColumnsFromJoinedTable(right.getCurrentHeader().getNamesAndTypesList(), left_columns_set, getUniqueName("right") + ".");
table_join.setColumnsFromJoinedTable(
right.getCurrentHeader().getNamesAndTypesList(),
left_columns_set,
getUniqueName("right") + ".",
left.getCurrentHeader().getNamesAndTypesList());

// Fix duplicated right table keys
NamesWithAliases right_table_alias;
Expand Down Expand Up @@ -787,7 +793,9 @@ DB::QueryPlanPtr JoinRelParser::buildMultiOnClauseHashJoin(
context->getSettingsRef()[Setting::max_block_size],
context->getSettingsRef()[Setting::min_joined_block_size_bytes],
1,
false);
/* required_output_ = */ NameSet{},
false,
/* use_new_analyzer_ = */ false);
join_step->setStepDescription("Multi join on clause hash join");
steps.emplace_back(join_step.get());
std::vector<QueryPlanPtr> plans;
Expand Down Expand Up @@ -827,7 +835,9 @@ DB::QueryPlanPtr JoinRelParser::buildSingleOnClauseHashJoin(
context->getSettingsRef()[Setting::max_block_size],
context->getSettingsRef()[Setting::min_joined_block_size_bytes],
1,
false);
/* required_output_ = */ NameSet{},
false,
/* use_new_analyzer_ = */ false);

join_step->setStepDescription("HASH_JOIN");
steps.emplace_back(join_step.get());
Expand Down
6 changes: 5 additions & 1 deletion cpp-ch/local-engine/tests/gtest_ch_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ TEST(TestJoin, simple)
for (const auto & column : join->columnsFromJoinedTable())
join->addJoinedColumn(column);

auto columns_from_left_table = left_plan.getCurrentHeader().getNamesAndTypesList();
for (auto & column_from_joined_table : columns_from_left_table)
join->setUsedColumn(column_from_joined_table, JoinTableSide::Left);

auto left_keys = left.getNamesAndTypesList();
join->addJoinedColumnsAndCorrectTypes(left_keys, true);
std::cerr << "after join:\n";
Expand All @@ -123,7 +127,7 @@ TEST(TestJoin, simple)
auto hash_join = std::make_shared<HashJoin>(join, right_plan.getCurrentHeader());

QueryPlanStepPtr join_step
= std::make_unique<JoinStep>(left_plan.getCurrentHeader(), right_plan.getCurrentHeader(), hash_join, 8192, 8192, 1, false);
= std::make_unique<JoinStep>(left_plan.getCurrentHeader(), right_plan.getCurrentHeader(), hash_join, 8192, 8192, 1, NameSet{}, false, false);

std::cerr << "join step:" << join_step->getOutputHeader().dumpStructure() << std::endl;

Expand Down

0 comments on commit 31e1b74

Please sign in to comment.