Skip to content

Commit

Permalink
Fix build due to ClickHouse/ClickHouse#65414
Browse files Browse the repository at this point in the history
  • Loading branch information
baibaichen committed Jul 30, 2024
1 parent 8c44957 commit db79858
Show file tree
Hide file tree
Showing 111 changed files with 443 additions and 504 deletions.
12 changes: 6 additions & 6 deletions cpp-ch/local-engine/Common/CHUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ const DB::ColumnWithTypeAndName * NestedColumnExtractHelper::findColumn(const DB
}

const DB::ActionsDAG::Node * ActionsDAGUtil::convertNodeType(
DB::ActionsDAGPtr & actions_dag,
DB::ActionsDAG & actions_dag,
const DB::ActionsDAG::Node * node,
const std::string & type_name,
const std::string & result_name,
Expand All @@ -469,11 +469,11 @@ const DB::ActionsDAG::Node * ActionsDAGUtil::convertNodeType(
type_name_col.name = type_name;
type_name_col.column = DB::DataTypeString().createColumnConst(0, type_name_col.name);
type_name_col.type = std::make_shared<DB::DataTypeString>();
const auto * right_arg = &actions_dag->addColumn(std::move(type_name_col));
const auto * right_arg = &actions_dag.addColumn(std::move(type_name_col));
const auto * left_arg = node;
DB::CastDiagnostic diagnostic = {node->result_name, node->result_name};
DB::ActionsDAG::NodeRawConstPtrs children = {left_arg, right_arg};
return &actions_dag->addFunction(
return &actions_dag.addFunction(
DB::createInternalCastOverloadResolver(cast_type, std::move(diagnostic)), std::move(children), result_name);
}

Expand Down Expand Up @@ -1079,14 +1079,14 @@ UInt64 MemoryUtil::getMemoryRSS()

void JoinUtil::reorderJoinOutput(DB::QueryPlan & plan, DB::Names cols)
{
ActionsDAGPtr project = std::make_shared<ActionsDAG>(plan.getCurrentDataStream().header.getNamesAndTypesList());
ActionsDAG project{plan.getCurrentDataStream().header.getNamesAndTypesList()};
NamesWithAliases project_cols;
for (const auto & col : cols)
{
project_cols.emplace_back(NameWithAlias(col, col));
}
project->project(project_cols);
QueryPlanStepPtr project_step = std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), project);
project.project(project_cols);
QueryPlanStepPtr project_step = std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), std::move(project));
project_step->setStepDescription("Reorder Join Output");
plan.addStep(std::move(project_step));
}
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Common/CHUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class ActionsDAGUtil
{
public:
static const DB::ActionsDAG::Node * convertNodeType(
DB::ActionsDAGPtr & actions_dag,
DB::ActionsDAG & actions_dag,
const DB::ActionsDAG::Node * node,
const std::string & type_name,
const std::string & result_name = "",
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Common/MergeTreeTool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ std::shared_ptr<DB::StorageInMemoryMetadata> buildMetaData(
if (table.order_by_key != MergeTreeTable::TUPLE)
metadata->primary_key = KeyDescription::parse(table.order_by_key, metadata->getColumns(), context);
else
metadata->primary_key.expression = std::make_shared<ExpressionActions>(std::make_shared<ActionsDAG>());
metadata->primary_key.expression = std::make_shared<ExpressionActions>(ActionsDAG{});
}
else
{
Expand Down
10 changes: 5 additions & 5 deletions cpp-ch/local-engine/Parser/AggregateFunctionParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace local_engine

DB::ActionsDAG::NodeRawConstPtrs AggregateFunctionParser::parseFunctionArguments(
const CommonFunctionInfo & func_info,
DB::ActionsDAGPtr & actions_dag) const
DB::ActionsDAG & actions_dag) const
{
DB::ActionsDAG::NodeRawConstPtrs collected_args;
for (const auto & arg : func_info.arguments)
Expand All @@ -56,7 +56,7 @@ DB::ActionsDAG::NodeRawConstPtrs AggregateFunctionParser::parseFunctionArguments
DB::ActionsDAG::NodeRawConstPtrs args;
args.emplace_back(arg_node);
const auto * node = toFunctionNode(actions_dag, "toNullable", args);
actions_dag->addOrReplaceInOutputs(*node);
actions_dag.addOrReplaceInOutputs(*node);
arg_node = node;
}

Expand Down Expand Up @@ -147,7 +147,7 @@ std::pair<String, DB::DataTypes> AggregateFunctionParser::tryApplyCHCombinator(
const DB::ActionsDAG::Node * AggregateFunctionParser::convertNodeTypeIfNeeded(
const CommonFunctionInfo & func_info,
const DB::ActionsDAG::Node * func_node,
DB::ActionsDAGPtr & actions_dag,
DB::ActionsDAG & actions_dag,
bool with_nullability) const
{
const auto & output_type = func_info.output_type;
Expand All @@ -156,7 +156,7 @@ const DB::ActionsDAG::Node * AggregateFunctionParser::convertNodeTypeIfNeeded(
{
func_node = ActionsDAGUtil::convertNodeType(
actions_dag, func_node, TypeParser::parseType(output_type)->getName(), func_node->result_name);
actions_dag->addOrReplaceInOutputs(*func_node);
actions_dag.addOrReplaceInOutputs(*func_node);
}

if (output_type.has_decimal())
Expand All @@ -167,7 +167,7 @@ const DB::ActionsDAG::Node * AggregateFunctionParser::convertNodeTypeIfNeeded(
plan_parser->addColumn(actions_dag, std::make_shared<DataTypeInt32>(), output_type.decimal().precision()),
plan_parser->addColumn(actions_dag, std::make_shared<DataTypeInt32>(), output_type.decimal().scale())};
func_node = toFunctionNode(actions_dag, checkDecimalOverflowSparkOrNull, func_node->result_name, overflow_args);
actions_dag->addOrReplaceInOutputs(*func_node);
actions_dag.addOrReplaceInOutputs(*func_node);
}

return func_node;
Expand Down
16 changes: 8 additions & 8 deletions cpp-ch/local-engine/Parser/AggregateFunctionParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class AggregateFunctionParser

/// Do some preprojections for the function arguments, and return the necessary arguments for the CH function.
virtual DB::ActionsDAG::NodeRawConstPtrs
parseFunctionArguments(const CommonFunctionInfo & func_info, DB::ActionsDAGPtr & actions_dag) const;
parseFunctionArguments(const CommonFunctionInfo & func_info, DB::ActionsDAG & actions_dag) const;

// `PartialMerge` is applied on the merging stages.
// `If` is applied when the aggreate function has a filter. This should only happen on the 1st stage.
Expand All @@ -109,7 +109,7 @@ class AggregateFunctionParser
virtual const DB::ActionsDAG::Node * convertNodeTypeIfNeeded(
const CommonFunctionInfo & func_info,
const DB::ActionsDAG::Node * func_node,
DB::ActionsDAGPtr & actions_dag,
DB::ActionsDAG & actions_dag,
bool with_nullability) const;

/// Parameters are only used in aggregate functions at present. e.g. percentiles(0.5)(x).
Expand All @@ -129,28 +129,28 @@ class AggregateFunctionParser
String getUniqueName(const String & name) const { return plan_parser->getUniqueName(name); }

const DB::ActionsDAG::Node *
addColumnToActionsDAG(DB::ActionsDAGPtr & actions_dag, const DB::DataTypePtr & type, const DB::Field & field) const
addColumnToActionsDAG(DB::ActionsDAG & actions_dag, const DB::DataTypePtr & type, const DB::Field & field) const
{
return &actions_dag->addColumn(ColumnWithTypeAndName(type->createColumnConst(1, field), type, getUniqueName(toString(field))));
return &actions_dag.addColumn(ColumnWithTypeAndName(type->createColumnConst(1, field), type, getUniqueName(toString(field))));
}

const DB::ActionsDAG::Node *
toFunctionNode(DB::ActionsDAGPtr & action_dag, const String & func_name, const DB::ActionsDAG::NodeRawConstPtrs & args) const
toFunctionNode(DB::ActionsDAG & action_dag, const String & func_name, const DB::ActionsDAG::NodeRawConstPtrs & args) const
{
return plan_parser->toFunctionNode(action_dag, func_name, args);
}

const DB::ActionsDAG::Node * toFunctionNode(
DB::ActionsDAGPtr & action_dag,
DB::ActionsDAG & action_dag,
const String & func_name,
const String & result_name,
const DB::ActionsDAG::NodeRawConstPtrs & args) const
{
auto function_builder = DB::FunctionFactory::instance().get(func_name, getContext());
return &action_dag->addFunction(function_builder, args, result_name);
return &action_dag.addFunction(function_builder, args, result_name);
}

const DB::ActionsDAG::Node * parseExpression(DB::ActionsDAGPtr actions_dag, const substrait::Expression & rel) const
const DB::ActionsDAG::Node * parseExpression(DB::ActionsDAG& actions_dag, const substrait::Expression & rel) const
{
return plan_parser->parseExpression(actions_dag, rel);
}
Expand Down
24 changes: 12 additions & 12 deletions cpp-ch/local-engine/Parser/AggregateRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ void AggregateRelParser::setup(DB::QueryPlanPtr query_plan, const substrait::Rel
void AggregateRelParser::addPreProjection()
{
auto input_header = plan->getCurrentDataStream().header;
ActionsDAGPtr projection_action = std::make_shared<ActionsDAG>(input_header.getColumnsWithTypeAndName());
std::string dag_footprint = projection_action->dumpDAG();
ActionsDAG projection_action{input_header.getColumnsWithTypeAndName()};
std::string dag_footprint = projection_action.dumpDAG();
for (auto & agg_info : aggregates)
{
auto arg_nodes = agg_info.function_parser->parseFunctionArguments(agg_info.parser_func_info, projection_action);
Expand All @@ -179,14 +179,14 @@ void AggregateRelParser::addPreProjection()
{
agg_info.arg_column_names.emplace_back(arg_node->result_name);
agg_info.arg_column_types.emplace_back(arg_node->result_type);
projection_action->addOrReplaceInOutputs(*arg_node);
projection_action.addOrReplaceInOutputs(*arg_node);
}
}
if (projection_action->dumpDAG() != dag_footprint)
if (projection_action.dumpDAG() != dag_footprint)
{
/// Avoid unnecessary evaluation
projection_action->removeUnusedActions();
auto projection_step = std::make_unique<DB::ExpressionStep>(plan->getCurrentDataStream(), projection_action);
projection_action.removeUnusedActions();
auto projection_step = std::make_unique<DB::ExpressionStep>(plan->getCurrentDataStream(), std::move(projection_action));
projection_step->setStepDescription("Projection before aggregate");
steps.emplace_back(projection_step.get());
plan->addStep(std::move(projection_step));
Expand Down Expand Up @@ -482,14 +482,14 @@ void AggregateRelParser::addAggregatingStep()
void AggregateRelParser::addPostProjection()
{
auto input_header = plan->getCurrentDataStream().header;
ActionsDAGPtr project_actions_dag = std::make_shared<ActionsDAG>(input_header.getColumnsWithTypeAndName());
auto dag_footprint = project_actions_dag->dumpDAG();
ActionsDAG project_actions_dag{input_header.getColumnsWithTypeAndName()};
auto dag_footprint = project_actions_dag.dumpDAG();

if (has_final_stage)
{
for (const auto & agg_info : aggregates)
{
for (const auto * input_node : project_actions_dag->getInputs())
for (const auto * input_node : project_actions_dag.getInputs())
{
if (input_node->result_name == agg_info.measure_column_name)
{
Expand All @@ -503,7 +503,7 @@ void AggregateRelParser::addPostProjection()
// on the complete mode, it must consider the nullability when converting node type
for (const auto & agg_info : aggregates)
{
for (const auto * output_node : project_actions_dag->getOutputs())
for (const auto * output_node : project_actions_dag.getOutputs())
{
if (output_node->result_name == agg_info.measure_column_name)
{
Expand All @@ -512,9 +512,9 @@ void AggregateRelParser::addPostProjection()
}
}
}
if (project_actions_dag->dumpDAG() != dag_footprint)
if (project_actions_dag.dumpDAG() != dag_footprint)
{
QueryPlanStepPtr convert_step = std::make_unique<ExpressionStep>(plan->getCurrentDataStream(), project_actions_dag);
QueryPlanStepPtr convert_step = std::make_unique<ExpressionStep>(plan->getCurrentDataStream(), std::move(project_actions_dag));
convert_step->setStepDescription("Post-projection for aggregate");
steps.emplace_back(convert_step.get());
plan->addStep(std::move(convert_step));
Expand Down
49 changes: 21 additions & 28 deletions cpp-ch/local-engine/Parser/CrossRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,21 +101,17 @@ DB::QueryPlanPtr CrossRelParser::parseOp(const substrait::Rel & rel, std::list<c

void CrossRelParser::renamePlanColumns(DB::QueryPlan & left, DB::QueryPlan & right, const StorageJoinFromReadBuffer & storage_join)
{
ActionsDAGPtr project = nullptr;
/// To support mixed join conditions, we must make sure that the column names in the right be the same as
/// storage_join's right sample block.
auto right_ori_header = right.getCurrentDataStream().header.getColumnsWithTypeAndName();
if (right_ori_header.size() > 0 && right_ori_header[0].name != BlockUtil::VIRTUAL_ROW_COUNT_COLUMN)
{
project = ActionsDAG::makeConvertingActions(
ActionsDAG right_project = ActionsDAG::makeConvertingActions(
right_ori_header, storage_join.getRightSampleBlock().getColumnsWithTypeAndName(), ActionsDAG::MatchColumnsMode::Position);
if (project)
{
QueryPlanStepPtr project_step = std::make_unique<ExpressionStep>(right.getCurrentDataStream(), project);
project_step->setStepDescription("Rename Broadcast Table Name");
steps.emplace_back(project_step.get());
right.addStep(std::move(project_step));
}
QueryPlanStepPtr project_step = std::make_unique<ExpressionStep>(right.getCurrentDataStream(), std::move(right_project));
project_step->setStepDescription("Rename Broadcast Table Name");
steps.emplace_back(project_step.get());
right.addStep(std::move(project_step));
}

/// If the columns name in right table is duplicated with left table, we need to rename the left table's columns,
Expand All @@ -130,15 +126,12 @@ void CrossRelParser::renamePlanColumns(DB::QueryPlan & left, DB::QueryPlan & rig
else
new_left_cols.emplace_back(col.column, col.type, col.name);
auto left_header = left.getCurrentDataStream().header.getColumnsWithTypeAndName();
project = ActionsDAG::makeConvertingActions(left_header, new_left_cols, ActionsDAG::MatchColumnsMode::Position);
ActionsDAG left_project = ActionsDAG::makeConvertingActions(left_header, new_left_cols, ActionsDAG::MatchColumnsMode::Position);

if (project)
{
QueryPlanStepPtr project_step = std::make_unique<ExpressionStep>(left.getCurrentDataStream(), project);
project_step->setStepDescription("Rename Left Table Name for broadcast join");
steps.emplace_back(project_step.get());
left.addStep(std::move(project_step));
}
QueryPlanStepPtr project_step = std::make_unique<ExpressionStep>(left.getCurrentDataStream(), std::move(left_project));
project_step->setStepDescription("Rename Left Table Name for broadcast join");
steps.emplace_back(project_step.get());
left.addStep(std::move(project_step));
}

DB::QueryPlanPtr CrossRelParser::parseJoin(const substrait::CrossRel & join, DB::QueryPlanPtr left, DB::QueryPlanPtr right)
Expand Down Expand Up @@ -229,7 +222,7 @@ void CrossRelParser::addPostFilter(DB::QueryPlan & query_plan, const substrait::

auto expression = join_rel.expression();
std::string filter_name;
auto actions_dag = std::make_shared<ActionsDAG>(query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName());
ActionsDAG actions_dag(query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName());
if (!expression.has_scalar_function())
{
// It may be singular_or_list
Expand All @@ -238,9 +231,9 @@ void CrossRelParser::addPostFilter(DB::QueryPlan & query_plan, const substrait::
}
else
{
getPlanParser()->parseFunction(query_plan.getCurrentDataStream().header, expression, filter_name, actions_dag, true);
getPlanParser()->parseFunctionWithDAG(expression, filter_name, actions_dag, true);
}
auto filter_step = std::make_unique<FilterStep>(query_plan.getCurrentDataStream(), actions_dag, filter_name, true);
auto filter_step = std::make_unique<FilterStep>(query_plan.getCurrentDataStream(), std::move(actions_dag), filter_name, true);
filter_step->setStepDescription("Post Join Filter");
steps.emplace_back(filter_step.get());
query_plan.addStep(std::move(filter_step));
Expand Down Expand Up @@ -268,19 +261,19 @@ void CrossRelParser::addConvertStep(TableJoin & table_join, DB::QueryPlan & left
}
if (!right_table_alias.empty())
{
ActionsDAGPtr rename_dag = std::make_shared<ActionsDAG>(right.getCurrentDataStream().header.getNamesAndTypesList());
ActionsDAG rename_dag(right.getCurrentDataStream().header.getNamesAndTypesList());
auto original_right_columns = right.getCurrentDataStream().header;
for (const auto & column_alias : right_table_alias)
{
if (original_right_columns.has(column_alias.first))
{
auto pos = original_right_columns.getPositionByName(column_alias.first);
const auto & alias = rename_dag->addAlias(*rename_dag->getInputs()[pos], column_alias.second);
rename_dag->getOutputs()[pos] = &alias;
const auto & alias = rename_dag.addAlias(*rename_dag.getInputs()[pos], column_alias.second);
rename_dag.getOutputs()[pos] = &alias;
}
}

QueryPlanStepPtr project_step = std::make_unique<ExpressionStep>(right.getCurrentDataStream(), rename_dag);
QueryPlanStepPtr project_step = std::make_unique<ExpressionStep>(right.getCurrentDataStream(), std::move(rename_dag));
project_step->setStepDescription("Right Table Rename");
steps.emplace_back(project_step.get());
right.addStep(std::move(project_step));
Expand All @@ -290,22 +283,22 @@ void CrossRelParser::addConvertStep(TableJoin & table_join, DB::QueryPlan & left
{
table_join.addJoinedColumn(column);
}
ActionsDAGPtr left_convert_actions = nullptr;
ActionsDAGPtr right_convert_actions = nullptr;
std::optional<ActionsDAG> left_convert_actions;
std::optional<ActionsDAG> right_convert_actions;
std::tie(left_convert_actions, right_convert_actions) = table_join.createConvertingActions(
left.getCurrentDataStream().header.getColumnsWithTypeAndName(), right.getCurrentDataStream().header.getColumnsWithTypeAndName());

if (right_convert_actions)
{
auto converting_step = std::make_unique<ExpressionStep>(right.getCurrentDataStream(), right_convert_actions);
auto converting_step = std::make_unique<ExpressionStep>(right.getCurrentDataStream(), std::move(*right_convert_actions));
converting_step->setStepDescription("Convert joined columns");
steps.emplace_back(converting_step.get());
right.addStep(std::move(converting_step));
}

if (left_convert_actions)
{
auto converting_step = std::make_unique<ExpressionStep>(left.getCurrentDataStream(), left_convert_actions);
auto converting_step = std::make_unique<ExpressionStep>(left.getCurrentDataStream(), std::move(*left_convert_actions));
converting_step->setStepDescription("Convert joined columns");
steps.emplace_back(converting_step.get());
left.addStep(std::move(converting_step));
Expand Down
Loading

0 comments on commit db79858

Please sign in to comment.