Skip to content

Commit

Permalink
[GLUTEN-1632][CH]Daily Update Clickhouse Version (20240730) (#6640)
Browse files Browse the repository at this point in the history
* [GLUTEN-1632][CH]Daily Update Clickhouse Version (20240730)

* Fix build due to ClickHouse/ClickHouse#65414
* FIXME: very slow after #6558

---------

Co-authored-by: kyligence-git <[email protected]>
Co-authored-by: Chang Chen <[email protected]>
  • Loading branch information
3 people authored Jul 30, 2024
1 parent 97f4eb2 commit 3a5e5b1
Show file tree
Hide file tree
Showing 114 changed files with 451 additions and 510 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,8 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
}
}

test("test mergetree path based write with bucket table") {
// FIXME: very slow after https://github.com/apache/incubator-gluten/pull/6558
ignore("test mergetree path based write with bucket table") {
val dataPath = s"$basePath/lineitem_mergetree_bucket"
clearDataPath(dataPath)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,8 @@ class GlutenClickHouseMergeTreeWriteSuite
}
}

test("test mergetree write with bucket table") {
// FIXME: very slow after https://github.com/apache/incubator-gluten/pull/6558
ignore("test mergetree write with bucket table") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_bucket;
|""".stripMargin)
Expand Down
4 changes: 2 additions & 2 deletions cpp-ch/clickhouse.version
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20240727
CH_COMMIT=d09605082e3
CH_BRANCH=rebase_ch/20240730
CH_COMMIT=f69def8b6a8

14 changes: 7 additions & 7 deletions cpp-ch/local-engine/Common/CHUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ const DB::ColumnWithTypeAndName * NestedColumnExtractHelper::findColumn(const DB
}

const DB::ActionsDAG::Node * ActionsDAGUtil::convertNodeType(
DB::ActionsDAGPtr & actions_dag,
DB::ActionsDAG & actions_dag,
const DB::ActionsDAG::Node * node,
const std::string & type_name,
const std::string & result_name,
Expand All @@ -469,16 +469,16 @@ const DB::ActionsDAG::Node * ActionsDAGUtil::convertNodeType(
type_name_col.name = type_name;
type_name_col.column = DB::DataTypeString().createColumnConst(0, type_name_col.name);
type_name_col.type = std::make_shared<DB::DataTypeString>();
const auto * right_arg = &actions_dag->addColumn(std::move(type_name_col));
const auto * right_arg = &actions_dag.addColumn(std::move(type_name_col));
const auto * left_arg = node;
DB::CastDiagnostic diagnostic = {node->result_name, node->result_name};
DB::ActionsDAG::NodeRawConstPtrs children = {left_arg, right_arg};
return &actions_dag->addFunction(
return &actions_dag.addFunction(
DB::createInternalCastOverloadResolver(cast_type, std::move(diagnostic)), std::move(children), result_name);
}

const DB::ActionsDAG::Node * ActionsDAGUtil::convertNodeTypeIfNeeded(
DB::ActionsDAGPtr & actions_dag,
DB::ActionsDAG & actions_dag,
const DB::ActionsDAG::Node * node,
const DB::DataTypePtr & dst_type,
const std::string & result_name,
Expand Down Expand Up @@ -1079,14 +1079,14 @@ UInt64 MemoryUtil::getMemoryRSS()

void JoinUtil::reorderJoinOutput(DB::QueryPlan & plan, DB::Names cols)
{
ActionsDAGPtr project = std::make_shared<ActionsDAG>(plan.getCurrentDataStream().header.getNamesAndTypesList());
ActionsDAG project{plan.getCurrentDataStream().header.getNamesAndTypesList()};
NamesWithAliases project_cols;
for (const auto & col : cols)
{
project_cols.emplace_back(NameWithAlias(col, col));
}
project->project(project_cols);
QueryPlanStepPtr project_step = std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), project);
project.project(project_cols);
QueryPlanStepPtr project_step = std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), std::move(project));
project_step->setStepDescription("Reorder Join Output");
plan.addStep(std::move(project_step));
}
Expand Down
4 changes: 2 additions & 2 deletions cpp-ch/local-engine/Common/CHUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,14 @@ class ActionsDAGUtil
{
public:
static const DB::ActionsDAG::Node * convertNodeType(
DB::ActionsDAGPtr & actions_dag,
DB::ActionsDAG & actions_dag,
const DB::ActionsDAG::Node * node,
const std::string & type_name,
const std::string & result_name = "",
DB::CastType cast_type = DB::CastType::nonAccurate);

static const DB::ActionsDAG::Node * convertNodeTypeIfNeeded(
DB::ActionsDAGPtr & actions_dag,
DB::ActionsDAG & actions_dag,
const DB::ActionsDAG::Node * node,
const DB::DataTypePtr & dst_type,
const std::string & result_name = "",
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Common/MergeTreeTool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ std::shared_ptr<DB::StorageInMemoryMetadata> buildMetaData(
if (table.order_by_key != MergeTreeTable::TUPLE)
metadata->primary_key = KeyDescription::parse(table.order_by_key, metadata->getColumns(), context);
else
metadata->primary_key.expression = std::make_shared<ExpressionActions>(std::make_shared<ActionsDAG>());
metadata->primary_key.expression = std::make_shared<ExpressionActions>(ActionsDAG{});
}
else
{
Expand Down
10 changes: 5 additions & 5 deletions cpp-ch/local-engine/Parser/AggregateFunctionParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace local_engine

DB::ActionsDAG::NodeRawConstPtrs AggregateFunctionParser::parseFunctionArguments(
const CommonFunctionInfo & func_info,
DB::ActionsDAGPtr & actions_dag) const
DB::ActionsDAG & actions_dag) const
{
DB::ActionsDAG::NodeRawConstPtrs collected_args;
for (const auto & arg : func_info.arguments)
Expand All @@ -56,7 +56,7 @@ DB::ActionsDAG::NodeRawConstPtrs AggregateFunctionParser::parseFunctionArguments
DB::ActionsDAG::NodeRawConstPtrs args;
args.emplace_back(arg_node);
const auto * node = toFunctionNode(actions_dag, "toNullable", args);
actions_dag->addOrReplaceInOutputs(*node);
actions_dag.addOrReplaceInOutputs(*node);
arg_node = node;
}

Expand Down Expand Up @@ -147,7 +147,7 @@ std::pair<String, DB::DataTypes> AggregateFunctionParser::tryApplyCHCombinator(
const DB::ActionsDAG::Node * AggregateFunctionParser::convertNodeTypeIfNeeded(
const CommonFunctionInfo & func_info,
const DB::ActionsDAG::Node * func_node,
DB::ActionsDAGPtr & actions_dag,
DB::ActionsDAG & actions_dag,
bool with_nullability) const
{
const auto & output_type = func_info.output_type;
Expand All @@ -156,7 +156,7 @@ const DB::ActionsDAG::Node * AggregateFunctionParser::convertNodeTypeIfNeeded(
{
func_node = ActionsDAGUtil::convertNodeType(
actions_dag, func_node, TypeParser::parseType(output_type)->getName(), func_node->result_name);
actions_dag->addOrReplaceInOutputs(*func_node);
actions_dag.addOrReplaceInOutputs(*func_node);
}

if (output_type.has_decimal())
Expand All @@ -167,7 +167,7 @@ const DB::ActionsDAG::Node * AggregateFunctionParser::convertNodeTypeIfNeeded(
plan_parser->addColumn(actions_dag, std::make_shared<DataTypeInt32>(), output_type.decimal().precision()),
plan_parser->addColumn(actions_dag, std::make_shared<DataTypeInt32>(), output_type.decimal().scale())};
func_node = toFunctionNode(actions_dag, checkDecimalOverflowSparkOrNull, func_node->result_name, overflow_args);
actions_dag->addOrReplaceInOutputs(*func_node);
actions_dag.addOrReplaceInOutputs(*func_node);
}

return func_node;
Expand Down
16 changes: 8 additions & 8 deletions cpp-ch/local-engine/Parser/AggregateFunctionParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class AggregateFunctionParser

/// Do some preprojections for the function arguments, and return the necessary arguments for the CH function.
virtual DB::ActionsDAG::NodeRawConstPtrs
parseFunctionArguments(const CommonFunctionInfo & func_info, DB::ActionsDAGPtr & actions_dag) const;
parseFunctionArguments(const CommonFunctionInfo & func_info, DB::ActionsDAG & actions_dag) const;

// `PartialMerge` is applied on the merging stages.
// `If` is applied when the aggregate function has a filter. This should only happen on the 1st stage.
Expand All @@ -109,7 +109,7 @@ class AggregateFunctionParser
virtual const DB::ActionsDAG::Node * convertNodeTypeIfNeeded(
const CommonFunctionInfo & func_info,
const DB::ActionsDAG::Node * func_node,
DB::ActionsDAGPtr & actions_dag,
DB::ActionsDAG & actions_dag,
bool with_nullability) const;

/// Parameters are only used in aggregate functions at present. e.g. percentiles(0.5)(x).
Expand All @@ -129,28 +129,28 @@ class AggregateFunctionParser
String getUniqueName(const String & name) const { return plan_parser->getUniqueName(name); }

const DB::ActionsDAG::Node *
addColumnToActionsDAG(DB::ActionsDAGPtr & actions_dag, const DB::DataTypePtr & type, const DB::Field & field) const
addColumnToActionsDAG(DB::ActionsDAG & actions_dag, const DB::DataTypePtr & type, const DB::Field & field) const
{
return &actions_dag->addColumn(ColumnWithTypeAndName(type->createColumnConst(1, field), type, getUniqueName(toString(field))));
return &actions_dag.addColumn(ColumnWithTypeAndName(type->createColumnConst(1, field), type, getUniqueName(toString(field))));
}

const DB::ActionsDAG::Node *
toFunctionNode(DB::ActionsDAGPtr & action_dag, const String & func_name, const DB::ActionsDAG::NodeRawConstPtrs & args) const
toFunctionNode(DB::ActionsDAG & action_dag, const String & func_name, const DB::ActionsDAG::NodeRawConstPtrs & args) const
{
return plan_parser->toFunctionNode(action_dag, func_name, args);
}

const DB::ActionsDAG::Node * toFunctionNode(
DB::ActionsDAGPtr & action_dag,
DB::ActionsDAG & action_dag,
const String & func_name,
const String & result_name,
const DB::ActionsDAG::NodeRawConstPtrs & args) const
{
auto function_builder = DB::FunctionFactory::instance().get(func_name, getContext());
return &action_dag->addFunction(function_builder, args, result_name);
return &action_dag.addFunction(function_builder, args, result_name);
}

const DB::ActionsDAG::Node * parseExpression(DB::ActionsDAGPtr actions_dag, const substrait::Expression & rel) const
const DB::ActionsDAG::Node * parseExpression(DB::ActionsDAG& actions_dag, const substrait::Expression & rel) const
{
return plan_parser->parseExpression(actions_dag, rel);
}
Expand Down
24 changes: 12 additions & 12 deletions cpp-ch/local-engine/Parser/AggregateRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ void AggregateRelParser::setup(DB::QueryPlanPtr query_plan, const substrait::Rel
void AggregateRelParser::addPreProjection()
{
auto input_header = plan->getCurrentDataStream().header;
ActionsDAGPtr projection_action = std::make_shared<ActionsDAG>(input_header.getColumnsWithTypeAndName());
std::string dag_footprint = projection_action->dumpDAG();
ActionsDAG projection_action{input_header.getColumnsWithTypeAndName()};
std::string dag_footprint = projection_action.dumpDAG();
for (auto & agg_info : aggregates)
{
auto arg_nodes = agg_info.function_parser->parseFunctionArguments(agg_info.parser_func_info, projection_action);
Expand All @@ -179,14 +179,14 @@ void AggregateRelParser::addPreProjection()
{
agg_info.arg_column_names.emplace_back(arg_node->result_name);
agg_info.arg_column_types.emplace_back(arg_node->result_type);
projection_action->addOrReplaceInOutputs(*arg_node);
projection_action.addOrReplaceInOutputs(*arg_node);
}
}
if (projection_action->dumpDAG() != dag_footprint)
if (projection_action.dumpDAG() != dag_footprint)
{
/// Avoid unnecessary evaluation
projection_action->removeUnusedActions();
auto projection_step = std::make_unique<DB::ExpressionStep>(plan->getCurrentDataStream(), projection_action);
projection_action.removeUnusedActions();
auto projection_step = std::make_unique<DB::ExpressionStep>(plan->getCurrentDataStream(), std::move(projection_action));
projection_step->setStepDescription("Projection before aggregate");
steps.emplace_back(projection_step.get());
plan->addStep(std::move(projection_step));
Expand Down Expand Up @@ -482,14 +482,14 @@ void AggregateRelParser::addAggregatingStep()
void AggregateRelParser::addPostProjection()
{
auto input_header = plan->getCurrentDataStream().header;
ActionsDAGPtr project_actions_dag = std::make_shared<ActionsDAG>(input_header.getColumnsWithTypeAndName());
auto dag_footprint = project_actions_dag->dumpDAG();
ActionsDAG project_actions_dag{input_header.getColumnsWithTypeAndName()};
auto dag_footprint = project_actions_dag.dumpDAG();

if (has_final_stage)
{
for (const auto & agg_info : aggregates)
{
for (const auto * input_node : project_actions_dag->getInputs())
for (const auto * input_node : project_actions_dag.getInputs())
{
if (input_node->result_name == agg_info.measure_column_name)
{
Expand All @@ -503,7 +503,7 @@ void AggregateRelParser::addPostProjection()
// In complete mode, nullability must be taken into account when converting the node type.
for (const auto & agg_info : aggregates)
{
for (const auto * output_node : project_actions_dag->getOutputs())
for (const auto * output_node : project_actions_dag.getOutputs())
{
if (output_node->result_name == agg_info.measure_column_name)
{
Expand All @@ -512,9 +512,9 @@ void AggregateRelParser::addPostProjection()
}
}
}
if (project_actions_dag->dumpDAG() != dag_footprint)
if (project_actions_dag.dumpDAG() != dag_footprint)
{
QueryPlanStepPtr convert_step = std::make_unique<ExpressionStep>(plan->getCurrentDataStream(), project_actions_dag);
QueryPlanStepPtr convert_step = std::make_unique<ExpressionStep>(plan->getCurrentDataStream(), std::move(project_actions_dag));
convert_step->setStepDescription("Post-projection for aggregate");
steps.emplace_back(convert_step.get());
plan->addStep(std::move(convert_step));
Expand Down
Loading

0 comments on commit 3a5e5b1

Please sign in to comment.