diff --git a/cpp-ch/local-engine/CMakeLists.txt b/cpp-ch/local-engine/CMakeLists.txt index 85045941cdee..ca25692bf013 100644 --- a/cpp-ch/local-engine/CMakeLists.txt +++ b/cpp-ch/local-engine/CMakeLists.txt @@ -47,6 +47,7 @@ add_subdirectory(proto) add_headers_and_sources(builder Builder) add_headers_and_sources(join Join) add_headers_and_sources(parser Parser) +add_headers_and_sources(parser Parser/RelParsers) add_headers_and_sources(rewriter Rewriter) add_headers_and_sources(storages Storages) add_headers_and_sources(storages Storages/Output) diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp index 94a214e5e571..b51683197c7e 100644 --- a/cpp-ch/local-engine/Common/CHUtil.cpp +++ b/cpp-ch/local-engine/Common/CHUtil.cpp @@ -48,7 +48,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp index 5f20c17bfb1f..bccdbded5149 100644 --- a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp +++ b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp @@ -19,7 +19,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/cpp-ch/local-engine/Parser/AggregateFunctionParser.cpp b/cpp-ch/local-engine/Parser/AggregateFunctionParser.cpp index b843d1565fce..d7f8ff6c03e5 100644 --- a/cpp-ch/local-engine/Parser/AggregateFunctionParser.cpp +++ b/cpp-ch/local-engine/Parser/AggregateFunctionParser.cpp @@ -20,7 +20,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/cpp-ch/local-engine/Parser/AggregateRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.cpp similarity index 100% rename from cpp-ch/local-engine/Parser/AggregateRelParser.cpp rename to cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.cpp diff --git a/cpp-ch/local-engine/Parser/AggregateRelParser.h b/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.h similarity index 93% rename from cpp-ch/local-engine/Parser/AggregateRelParser.h rename to cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.h index 8f68f858fc51..8b4ec9c4af9c 100644 --- a/cpp-ch/local-engine/Parser/AggregateRelParser.h +++ b/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.h @@ -16,7 +16,7 @@ */ #pragma once #include -#include +#include #include #include @@ -30,7 +30,7 @@ class AggregateRelParser : public RelParser ~AggregateRelParser() override = default; DB::QueryPlanPtr parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list & rel_stack_) override; - const substrait::Rel & getSingleInput(const substrait::Rel & rel) override { return rel.aggregate().input(); } + std::optional getSingleInput(const substrait::Rel & rel) override { return &rel.aggregate().input(); } private: struct AggregateInfo diff --git a/cpp-ch/local-engine/Parser/CrossRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp similarity index 94% rename from cpp-ch/local-engine/Parser/CrossRelParser.cpp rename to cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp index 3cb6ff7ede61..99ade18028a1 100644 --- a/cpp-ch/local-engine/Parser/CrossRelParser.cpp +++ b/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp @@ -15,6 +15,7 @@ * limitations under the License. */ #include "CrossRelParser.h" +#include #include #include @@ -73,12 +74,7 @@ CrossRelParser::parse(DB::QueryPlanPtr /*query_plan*/, const substrait::Rel & /* throw Exception(ErrorCodes::LOGICAL_ERROR, "join node has 2 inputs, can't call parse()."); } -const substrait::Rel & CrossRelParser::getSingleInput(const substrait::Rel & /*rel*/) -{ - throw Exception(ErrorCodes::LOGICAL_ERROR, "join node has 2 inputs, can't call getSingleInput()."); -} - -DB::QueryPlanPtr CrossRelParser::parseOp(const substrait::Rel & rel, std::list & rel_stack) +std::vector CrossRelParser::getInputs(const substrait::Rel & rel) { const auto & join = rel.cross(); if (!join.has_left() || !join.has_right()) @@ -86,12 +82,19 @@ DB::QueryPlanPtr CrossRelParser::parseOp(const substrait::Rel & rel, std::listparseOp(join.left(), rel_stack); - auto right_plan = getPlanParser()->parseOp(join.right(), rel_stack); - rel_stack.pop_back(); + return {&join.left(), &join.right()}; +} +std::optional CrossRelParser::getSingleInput(const substrait::Rel & /*rel*/) +{ + throw Exception(ErrorCodes::LOGICAL_ERROR, "join node has 2 inputs, can't call getSingleInput()."); +} - return parseJoin(join, std::move(left_plan), std::move(right_plan)); +DB::QueryPlanPtr +CrossRelParser::parse(std::vector & input_plans_, const substrait::Rel & rel, std::list &) +{ + assert(input_plans_.size() == 2); + const auto & join = rel.cross(); + return parseJoin(join, std::move(input_plans_[0]), std::move(input_plans_[1])); } void CrossRelParser::renamePlanColumns(DB::QueryPlan & left, DB::QueryPlan & right, const StorageJoinFromReadBuffer & storage_join) @@ -194,7 +197,8 @@ DB::QueryPlanPtr CrossRelParser::parseJoin(const substrait::CrossRel & join, DB: else { JoinPtr hash_join = std::make_shared(table_join, right->getCurrentDataStream().header.cloneEmpty()); - QueryPlanStepPtr join_step = std::make_unique(left->getCurrentDataStream(), right->getCurrentDataStream(), hash_join, 8192, 1, false); + QueryPlanStepPtr join_step + = std::make_unique(left->getCurrentDataStream(), right->getCurrentDataStream(), hash_join, 8192, 1, false); join_step->setStepDescription("CROSS_JOIN"); steps.emplace_back(join_step.get()); std::vector plans; diff --git a/cpp-ch/local-engine/Parser/CrossRelParser.h b/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.h similarity index 70% rename from cpp-ch/local-engine/Parser/CrossRelParser.h rename to cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.h index f1cd60385e26..635a2fb629fb 100644 --- a/cpp-ch/local-engine/Parser/CrossRelParser.h +++ b/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.h @@ -17,7 +17,8 @@ #pragma once #include -#include +#include +#include #include namespace DB @@ -37,12 +38,13 @@ class CrossRelParser : public RelParser explicit CrossRelParser(SerializedPlanParser * plan_paser_); ~CrossRelParser() override = default; + DB::QueryPlanPtr parse( + std::vector & input_plans_, const substrait::Rel & rel, std::list & rel_stack_) override; DB::QueryPlanPtr - parse(DB::QueryPlanPtr query_plan, const substrait::Rel & sort_rel, std::list & rel_stack_) override; + parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list & rel_stack_) override; - DB::QueryPlanPtr parseOp(const substrait::Rel & rel, std::list & rel_stack) override; - - const substrait::Rel & getSingleInput(const substrait::Rel & rel) override; + std::vector getInputs(const substrait::Rel & rel) override; + std::optional getSingleInput(const substrait::Rel & rel) override; private: std::unordered_map & function_mapping; @@ -55,7 +57,11 @@ class CrossRelParser : public RelParser void addConvertStep(TableJoin & table_join, DB::QueryPlan & left, DB::QueryPlan & right); void addPostFilter(DB::QueryPlan & query_plan, const substrait::CrossRel & join); bool applyJoinFilter( - DB::TableJoin & table_join, const substrait::CrossRel & join_rel, DB::QueryPlan & left, DB::QueryPlan & right, bool allow_mixed_condition); + DB::TableJoin & table_join, + const substrait::CrossRel & join_rel, + DB::QueryPlan & left, + DB::QueryPlan & right, + bool allow_mixed_condition); }; } diff --git a/cpp-ch/local-engine/Parser/ExpandRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.cpp similarity index 99% rename from cpp-ch/local-engine/Parser/ExpandRelParser.cpp rename to cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.cpp index c621332db662..aaf98baf2205 100644 --- a/cpp-ch/local-engine/Parser/ExpandRelParser.cpp +++ b/cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.cpp @@ -20,7 +20,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/cpp-ch/local-engine/Parser/ExpandRelParser.h b/cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.h similarity index 86% rename from cpp-ch/local-engine/Parser/ExpandRelParser.h rename to cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.h index 1ca7cc8149af..4cfa358b70b0 100644 --- a/cpp-ch/local-engine/Parser/ExpandRelParser.h +++ b/cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.h @@ -15,7 +15,8 @@ * limitations under the License. */ #pragma once -#include +#include +#include namespace local_engine @@ -29,6 +30,6 @@ class ExpandRelParser : public RelParser DB::QueryPlanPtr parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list & rel_stack_) override; - const substrait::Rel & getSingleInput(const substrait::Rel & rel) override { return rel.expand().input(); } + std::optional getSingleInput(const substrait::Rel & rel) override { return &rel.expand().input(); } }; } diff --git a/cpp-ch/local-engine/Parser/RelParsers/FetchRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/FetchRelParser.cpp new file mode 100644 index 000000000000..9594741a5e34 --- /dev/null +++ b/cpp-ch/local-engine/Parser/RelParsers/FetchRelParser.cpp @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include "RelParser.h" +namespace local_engine +{ +class FetchRelParser : public RelParser +{ +public: + explicit FetchRelParser(SerializedPlanParser * plan_parser_) : RelParser(plan_parser_) { } + ~FetchRelParser() override = default; + + DB::QueryPlanPtr parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list &) + { + const auto & limit = rel.fetch(); + auto limit_step = std::make_unique(query_plan->getCurrentDataStream(), limit.count(), limit.offset()); + limit_step->setStepDescription("LIMIT"); + steps.push_back(limit_step.get()); + query_plan->addStep(std::move(limit_step)); + return query_plan; + } + std::optional getSingleInput(const substrait::Rel & rel) override { return &rel.fetch().input(); } +}; + +void registerFetchRelParser(RelParserFactory & factory) +{ + auto builder = [](SerializedPlanParser * plan_parser_) { return std::make_unique(plan_parser_); }; + factory.registerBuilder(substrait::Rel::RelTypeCase::kFetch, builder); +} + +} diff --git a/cpp-ch/local-engine/Parser/FilterRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/FilterRelParser.cpp similarity index 100% rename from cpp-ch/local-engine/Parser/FilterRelParser.cpp rename to cpp-ch/local-engine/Parser/RelParsers/FilterRelParser.cpp diff --git a/cpp-ch/local-engine/Parser/FilterRelParser.h b/cpp-ch/local-engine/Parser/RelParsers/FilterRelParser.h similarity index 86% rename from cpp-ch/local-engine/Parser/FilterRelParser.h rename to cpp-ch/local-engine/Parser/RelParsers/FilterRelParser.h index a7151595243f..cf0e633d56b4 100644 --- a/cpp-ch/local-engine/Parser/FilterRelParser.h +++ b/cpp-ch/local-engine/Parser/RelParsers/FilterRelParser.h @@ -17,7 +17,8 @@ #pragma once -#include +#include +#include #include #include @@ -29,11 +30,12 @@ class FilterRelParser : public RelParser explicit FilterRelParser(SerializedPlanParser * plan_paser_); ~FilterRelParser() override = default; - const substrait::Rel & getSingleInput(const substrait::Rel & rel) override { return rel.filter().input(); } + std::optional getSingleInput(const substrait::Rel & rel) override { return &rel.filter().input(); } DB::QueryPlanPtr parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list & rel_stack_) override; + private: // Poco::Logger * logger = &Poco::Logger::get("ProjectRelParser"); -}; +}; } diff --git a/cpp-ch/local-engine/Parser/JoinRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp similarity index 95% rename from cpp-ch/local-engine/Parser/JoinRelParser.cpp rename to cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp index 46b68a4d3b8f..39212cf8dcbf 100644 --- a/cpp-ch/local-engine/Parser/JoinRelParser.cpp +++ b/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp @@ -15,6 +15,7 @@ * limitations under the License. */ #include "JoinRelParser.h" +#include #include #include @@ -27,10 +28,10 @@ #include #include #include +#include #include #include #include -#include #include #include #include @@ -56,8 +57,8 @@ namespace local_engine { std::shared_ptr createDefaultTableJoin(substrait::JoinRel_JoinType join_type, bool is_existence_join, ContextPtr & context) { - auto table_join = std::make_shared( - context->getSettingsRef(), context->getGlobalTemporaryVolume(), context->getTempDataOnDisk()); + auto table_join + = std::make_shared(context->getSettingsRef(), context->getGlobalTemporaryVolume(), context->getTempDataOnDisk()); std::pair kind_and_strictness = JoinUtil::getJoinKindAndStrictness(join_type, is_existence_join); table_join->setKind(kind_and_strictness.first); @@ -79,12 +80,7 @@ JoinRelParser::parse(DB::QueryPlanPtr /*query_plan*/, const substrait::Rel & /*r throw Exception(ErrorCodes::LOGICAL_ERROR, "join node has 2 inputs, can't call parse()."); } -const substrait::Rel & JoinRelParser::getSingleInput(const substrait::Rel & /*rel*/) -{ - throw Exception(ErrorCodes::LOGICAL_ERROR, "join node has 2 inputs, can't call getSingleInput()."); -} - -DB::QueryPlanPtr JoinRelParser::parseOp(const substrait::Rel & rel, std::list & rel_stack) +std::vector JoinRelParser::getInputs(const substrait::Rel & rel) { const auto & join = rel.join(); if (!join.has_left() || !join.has_right()) @@ -92,12 +88,19 @@ DB::QueryPlanPtr JoinRelParser::parseOp(const substrait::Rel & rel, std::listparseOp(join.left(), rel_stack); - auto right_plan = getPlanParser()->parseOp(join.right(), rel_stack); - rel_stack.pop_back(); + return {&join.left(), &join.right()}; +} +std::optional JoinRelParser::getSingleInput(const substrait::Rel & /*rel*/) +{ + throw Exception(ErrorCodes::LOGICAL_ERROR, "join node has 2 inputs, can't call getSingleInput()."); +} - return parseJoin(join, std::move(left_plan), std::move(right_plan)); +DB::QueryPlanPtr JoinRelParser::parse( + std::vector & input_plans_, const substrait::Rel & rel, std::list & rel_stack_) +{ + assert(input_plans_.size() == 2); + const auto & join = rel.join(); + return parseJoin(join, std::move(input_plans_[0]), std::move(input_plans_[1])); } std::unordered_set JoinRelParser::extractTableSidesFromExpression( @@ -282,13 +285,22 @@ DB::QueryPlanPtr JoinRelParser::parseJoin(const substrait::JoinRel & join, DB::Q auto input_header = left->getCurrentDataStream().header; DB::ActionsDAG filter_is_not_null_dag{input_header.getColumnsWithTypeAndName()}; // when is_null_aware_anti_join is true, there is only one join key - const auto * key_field = filter_is_not_null_dag.getInputs()[join.expression().scalar_function().arguments().at(0).value().selection().direct_reference().struct_field().field()]; + const auto * key_field = filter_is_not_null_dag.getInputs()[join.expression() + .scalar_function() + .arguments() + .at(0) + .value() + .selection() + .direct_reference() + .struct_field() + .field()]; auto result_node = filter_is_not_null_dag.tryFindInOutputs(key_field->result_name); // add a function isNotNull to filter the null key on the left side const auto * cond_node = plan_parser->toFunctionNode(filter_is_not_null_dag, "isNotNull", {result_node}); filter_is_not_null_dag.addOrReplaceInOutputs(*cond_node); - auto filter_step = std::make_unique(left->getCurrentDataStream(), std::move(filter_is_not_null_dag), cond_node->result_name, true); + auto filter_step = std::make_unique( + left->getCurrentDataStream(), std::move(filter_is_not_null_dag), cond_node->result_name, true); left->addStep(std::move(filter_step)); } // other case: is_empty_hash_table, don't need to handle @@ -342,8 +354,7 @@ DB::QueryPlanPtr JoinRelParser::parseJoin(const substrait::JoinRel & join, DB::Q = couldRewriteToMultiJoinOnClauses(table_join->getOnlyClause(), join_on_clauses, join, left_header, right_header); if (is_multi_join_on_clauses && join_config.prefer_multi_join_on_clauses && join_opt_info.right_table_rows > 0 && join_opt_info.partitions_num > 0 - && join_opt_info.right_table_rows / join_opt_info.partitions_num - < join_config.multi_join_on_clauses_build_side_rows_limit) + && join_opt_info.right_table_rows / join_opt_info.partitions_num < join_config.multi_join_on_clauses_build_side_rows_limit) { query_plan = buildMultiOnClauseHashJoin(table_join, std::move(left), std::move(right), join_on_clauses); } diff --git a/cpp-ch/local-engine/Parser/JoinRelParser.h b/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.h similarity index 84% rename from cpp-ch/local-engine/Parser/JoinRelParser.h rename to cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.h index 7e43187be308..42ebc5c0a98e 100644 --- a/cpp-ch/local-engine/Parser/JoinRelParser.h +++ b/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.h @@ -20,7 +20,7 @@ #include #include #include -#include +#include #include namespace DB @@ -42,9 +42,11 @@ class JoinRelParser : public RelParser DB::QueryPlanPtr parse(DB::QueryPlanPtr query_plan, const substrait::Rel & sort_rel, std::list & rel_stack_) override; - DB::QueryPlanPtr parseOp(const substrait::Rel & rel, std::list & rel_stack) override; + DB::QueryPlanPtr parse( + std::vector & input_plans_, const substrait::Rel & rel, std::list & rel_stack_) override; - const substrait::Rel & getSingleInput(const substrait::Rel & rel) override; + std::vector getInputs(const substrait::Rel & rel) override; + std::optional getSingleInput(const substrait::Rel & rel) override; private: std::unordered_map & function_mapping; @@ -69,8 +71,8 @@ class JoinRelParser : public RelParser void existenceJoinPostProject(DB::QueryPlan & plan, const DB::Names & left_input_cols); - static std::unordered_set extractTableSidesFromExpression( - const substrait::Expression & expr, const DB::Block & left_header, const DB::Block & right_header); + static std::unordered_set + extractTableSidesFromExpression(const substrait::Expression & expr, const DB::Block & left_header, const DB::Block & right_header); bool couldRewriteToMultiJoinOnClauses( const DB::TableJoin::JoinOnClause & prefix_clause, diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp similarity index 100% rename from cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp rename to cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.h b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.h similarity index 95% rename from cpp-ch/local-engine/Parser/MergeTreeRelParser.h rename to cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.h index 94b4809d3969..5e7be5fa43cd 100644 --- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.h +++ b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.h @@ -17,9 +17,10 @@ #pragma once #include +#include #include -#include +#include namespace DB { @@ -50,7 +51,7 @@ class MergeTreeRelParser : public RelParser DB::QueryPlanPtr parseReadRel( DB::QueryPlanPtr query_plan, const substrait::ReadRel & read_rel, const substrait::ReadRel::ExtensionTable & extension_table); - const substrait::Rel & getSingleInput(const substrait::Rel &) override + std::optional getSingleInput(const substrait::Rel &) override { throw Exception(ErrorCodes::LOGICAL_ERROR, "MergeTreeRelParser can't call getSingleInput()."); } diff --git a/cpp-ch/local-engine/Parser/ProjectRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/ProjectRelParser.cpp similarity index 100% rename from cpp-ch/local-engine/Parser/ProjectRelParser.cpp rename to cpp-ch/local-engine/Parser/RelParsers/ProjectRelParser.cpp diff --git a/cpp-ch/local-engine/Parser/ProjectRelParser.h b/cpp-ch/local-engine/Parser/RelParsers/ProjectRelParser.h similarity index 86% rename from cpp-ch/local-engine/Parser/ProjectRelParser.h rename to cpp-ch/local-engine/Parser/RelParsers/ProjectRelParser.h index 94accff2dc51..87f5e20edee5 100644 --- a/cpp-ch/local-engine/Parser/ProjectRelParser.h +++ b/cpp-ch/local-engine/Parser/RelParsers/ProjectRelParser.h @@ -15,9 +15,10 @@ * limitations under the License. */ #pragma once +#include #include #include -#include +#include #include namespace local_engine @@ -29,7 +30,7 @@ class ProjectRelParser : public RelParser { ActionsDAG before_array_join; /// Optional ActionsDAG array_join; - ActionsDAG after_array_join; /// Optional + ActionsDAG after_array_join; /// Optional }; explicit ProjectRelParser(SerializedPlanParser * plan_paser_); @@ -44,21 +45,21 @@ class ProjectRelParser : public RelParser DB::QueryPlanPtr parseProject(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list & rel_stack_); DB::QueryPlanPtr parseGenerate(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list & rel_stack_); - static const DB::ActionsDAG::Node * findArrayJoinNode(const ActionsDAG& actions_dag); + static const DB::ActionsDAG::Node * findArrayJoinNode(const ActionsDAG & actions_dag); /// Split actions_dag of generate rel into 3 parts: before array join + during array join + after array join - static SplittedActionsDAGs splitActionsDAGInGenerate(const ActionsDAG& actions_dag); + static SplittedActionsDAGs splitActionsDAGInGenerate(const ActionsDAG & actions_dag); bool isReplicateRows(substrait::GenerateRel rel); DB::QueryPlanPtr parseReplicateRows(QueryPlanPtr query_plan, substrait::GenerateRel generate_rel); - const substrait::Rel & getSingleInput(const substrait::Rel & rel) override + std::optional getSingleInput(const substrait::Rel & rel) override { if (rel.has_generate()) - return rel.generate().input(); + return &rel.generate().input(); - return rel.project().input(); + return &rel.project().input(); } }; } diff --git a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp new file mode 100644 index 000000000000..5532baca5342 --- /dev/null +++ b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ReadRelParser.h" +#include + +namespace DB::ErrorCodes +{ +extern const int LOGICAL_ERROR; +} + +namespace local_engine +{ +DB::QueryPlanPtr ReadRelParser::parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list &) +{ + if (query_plan) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Source node's input plan should be null"); + const auto & read = rel.read(); + if (read.has_local_files() || (!read.has_extension_table() && !isReadFromMergeTree(read))) + { + assert(read.has_base_schema()); + DB::QueryPlanStepPtr read_step; + if (isReadRelFromJava(read)) + read_step = parseReadRelWithJavaIter(read); + else + read_step = parseReadRelWithLocalFile(read); + query_plan = std::make_unique(); + steps.emplace_back(read_step.get()); + query_plan->addStep(std::move(read_step)); + + if (getContext()->getSettingsRef().max_threads > 1) + { + auto buffer_step = std::make_unique(query_plan->getCurrentDataStream()); + steps.emplace_back(buffer_step.get()); + query_plan->addStep(std::move(buffer_step)); + } + } + else + { + substrait::ReadRel::ExtensionTable extension_table; + if (read.has_extension_table()) + extension_table = read.extension_table(); + else + { + extension_table = BinaryToMessage(split_info); + logDebugMessage(extension_table, "extension_table"); + } + MergeTreeRelParser mergeTreeParser(getPlanParser(), getContext()); + query_plan = mergeTreeParser.parseReadRel(std::make_unique(), read, extension_table); + steps = mergeTreeParser.getSteps(); + } + return query_plan; +} + +bool ReadRelParser::isReadRelFromJava(const substrait::ReadRel & rel) +{ + return rel.has_local_files() && rel.local_files().items().size() == 1 + && rel.local_files().items().at(0).uri_file().starts_with("iterator"); +} + +bool ReadRelParser::isReadFromMergeTree(const substrait::ReadRel & rel) +{ + assert(rel.has_advanced_extension()); + bool is_read_from_merge_tree; + google::protobuf::StringValue optimization; + optimization.ParseFromString(rel.advanced_extension().optimization().value()); + ReadBufferFromString in(optimization.value()); + if (!checkString("isMergeTree=", in)) + return false; + readBoolText(is_read_from_merge_tree, in); + assertChar('\n', in); + return is_read_from_merge_tree; +} + +DB::QueryPlanStepPtr ReadRelParser::parseReadRelWithJavaIter(const substrait::ReadRel & rel) +{ + GET_JNIENV(env) + SCOPE_EXIT({CLEAN_JNIENV}); + auto first_block = SourceFromJavaIter::peekBlock(env, input_iter); + + /// Try to decide header from the first block read from Java iterator. Thus AggregateFunction with parameters has more precise types. + auto header = first_block.has_value() ? first_block->cloneEmpty() : TypeParser::buildBlockFromNamedStruct(rel.base_schema()); + auto source = std::make_shared( + getContext(), std::move(header), input_iter, is_input_iter_materialize, std::move(first_block)); + + QueryPlanStepPtr source_step = std::make_unique(Pipe(source)); + source_step->setStepDescription("Read From Java Iter"); + return source_step; +} + +QueryPlanStepPtr ReadRelParser::parseReadRelWithLocalFile(const substrait::ReadRel & rel) +{ + auto header = TypeParser::buildBlockFromNamedStruct(rel.base_schema()); + substrait::ReadRel::LocalFiles local_files; + if (rel.has_local_files()) + local_files = rel.local_files(); + else + { + local_files = BinaryToMessage(getPlanParser()->nextSplitInfo()); + logDebugMessage(local_files, "local_files"); + } + auto source = std::make_shared(getContext(), header, local_files); + auto source_pipe = Pipe(source); + auto source_step = std::make_unique(getContext(), std::move(source_pipe), "substrait local files"); + source_step->setStepDescription("read local files"); + if (rel.has_filter()) + { + DB::ActionsDAG actions_dag{blockToNameAndTypeList(header)}; + const DB::ActionsDAG::Node * filter_node = parseExpression(actions_dag, rel.filter()); + actions_dag.addOrReplaceInOutputs(*filter_node); + assert(filter_node == &(actions_dag.findInOutputs(filter_node->result_name))); + source_step->addFilter(std::move(actions_dag), filter_node->result_name); + } + return source_step; +} + +void registerReadRelParser(RelParserFactory & factory) +{ + auto builder = [](SerializedPlanParser * plan_parser_) { return std::make_unique(plan_parser_); }; + factory.registerBuilder(substrait::Rel::RelTypeCase::kRead, builder); +} +} diff --git a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h new file mode 100644 index 000000000000..8f9c578fba5e --- /dev/null +++ b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace local_engine +{ +class ReadRelParser : public RelParser +{ +public: + explicit ReadRelParser(SerializedPlanParser * plan_parser_) : RelParser(plan_parser_) { } + ~ReadRelParser() override = default; + + DB::QueryPlanPtr + parse(std::vector &, const substrait::Rel & rel, std::list & rel_stack_) override + { + DB::QueryPlanPtr query_plan; + return parse(std::move(query_plan), rel, rel_stack_); + } + + DB::QueryPlanPtr parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list &) override; + // This is source node, there is no input + std::optional getSingleInput(const substrait::Rel & rel) override { return {}; } + + bool isReadRelFromJava(const substrait::ReadRel & rel); + bool isReadFromMergeTree(const substrait::ReadRel & rel); + + void setInputIter(jobject input_iter_, bool is_materialze) + { + input_iter = input_iter_; + is_input_iter_materialize = is_materialze; + } + + void setSplitInfo(String split_info_) { split_info = split_info_; } + +private: + jobject input_iter; + bool is_input_iter_materialize; + String split_info; + DB::QueryPlanStepPtr parseReadRelWithJavaIter(const substrait::ReadRel & rel); + QueryPlanStepPtr parseReadRelWithLocalFile(const substrait::ReadRel & rel); +}; +} diff --git a/cpp-ch/local-engine/Parser/RelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/RelParser.cpp similarity index 89% rename from cpp-ch/local-engine/Parser/RelParser.cpp rename to cpp-ch/local-engine/Parser/RelParsers/RelParser.cpp index a7f6d0586455..c5e7d4b4788b 100644 --- a/cpp-ch/local-engine/Parser/RelParser.cpp +++ b/cpp-ch/local-engine/Parser/RelParsers/RelParser.cpp @@ -22,9 +22,10 @@ #include #include #include +#include #include #include - +#include namespace DB { @@ -37,6 +38,14 @@ extern const int LOGICAL_ERROR; namespace local_engine { + +std::vector RelParser::getInputs(const substrait::Rel & rel) +{ + auto input = getSingleInput(rel); + if (!input) + return {}; + return {*input}; +} AggregateFunctionPtr RelParser::getAggregateFunction( const String & name, DB::DataTypes arg_types, DB::AggregateFunctionProperties & properties, const DB::Array & parameters) { @@ -80,13 +89,12 @@ std::optional RelParser::parseFunctionName(UInt32 function_ref, const su } return plan_parser->getFunctionName(*sigature_name, function); } -DB::QueryPlanPtr RelParser::parseOp(const substrait::Rel & rel, std::list & rel_stack) + +DB::QueryPlanPtr +RelParser::parse(std::vector & input_plans_, const substrait::Rel & rel, std::list & rel_stack_) { - SerializedPlanParser & planParser = *getPlanParser(); - rel_stack.push_back(&rel); - auto query_plan = planParser.parseOp(getSingleInput(rel), rel_stack); - rel_stack.pop_back(); - return parse(std::move(query_plan), rel, rel_stack); + assert(input_plans_.size() == 1); + return parse(std::move(input_plans_[0]), rel, rel_stack_); } std::map @@ -160,6 +168,8 @@ void registerProjectRelParser(RelParserFactory & factory); void registerJoinRelParser(RelParserFactory & factory); void registerFilterRelParser(RelParserFactory & factory); void registerCrossRelParser(RelParserFactory & factory); +void registerFetchRelParser(RelParserFactory & factory); +void registerReadRelParser(RelParserFactory & factory); void registerRelParsers() { @@ -173,5 +183,7 @@ void registerRelParsers() registerJoinRelParser(factory); registerCrossRelParser(factory); registerFilterRelParser(factory); + registerFetchRelParser(factory); + registerReadRelParser(factory); } } diff --git a/cpp-ch/local-engine/Parser/RelParser.h b/cpp-ch/local-engine/Parser/RelParsers/RelParser.h similarity index 78% rename from cpp-ch/local-engine/Parser/RelParser.h rename to cpp-ch/local-engine/Parser/RelParsers/RelParser.h index 885622281eaa..2e38c30c69d1 100644 --- a/cpp-ch/local-engine/Parser/RelParser.h +++ b/cpp-ch/local-engine/Parser/RelParsers/RelParser.h @@ -39,8 +39,10 @@ class RelParser virtual DB::QueryPlanPtr parse(DB::QueryPlanPtr current_plan_, const substrait::Rel & rel, std::list & rel_stack_) = 0; - virtual DB::QueryPlanPtr parseOp(const substrait::Rel & rel, std::list & rel_stack); - virtual const substrait::Rel & getSingleInput(const substrait::Rel & rel) = 0; + virtual DB::QueryPlanPtr + parse(std::vector & input_plans_, const substrait::Rel & rel, std::list & rel_stack_); + virtual std::vector getInputs(const substrait::Rel & rel); + virtual std::optional getSingleInput(const substrait::Rel & rel) = 0; const std::vector & getSteps() const { return steps; } static AggregateFunctionPtr getAggregateFunction( @@ -59,12 +61,12 @@ class RelParser // Get coresponding function name in ClickHouse. std::optional parseFunctionName(UInt32 function_ref, const substrait::Expression_ScalarFunction & function); - const DB::ActionsDAG::Node * parseArgument(ActionsDAG& action_dag, const substrait::Expression & rel) + const DB::ActionsDAG::Node * parseArgument(ActionsDAG & action_dag, const substrait::Expression & rel) { return plan_parser->parseExpression(action_dag, rel); } - const DB::ActionsDAG::Node * parseExpression(ActionsDAG& action_dag, const substrait::Expression & rel) + const DB::ActionsDAG::Node * parseExpression(ActionsDAG & action_dag, const substrait::Expression & rel) { return plan_parser->parseExpression(action_dag, rel); } @@ -77,13 +79,15 @@ class RelParser std::vector steps; const ActionsDAG::Node * - buildFunctionNode(ActionsDAG& action_dag, const String & function, const DB::ActionsDAG::NodeRawConstPtrs & args) + buildFunctionNode(ActionsDAG & action_dag, const String & function, const DB::ActionsDAG::NodeRawConstPtrs & args) { return plan_parser->toFunctionNode(action_dag, function, args); } - static std::map parseFormattedRelAdvancedOptimization(const substrait::extensions::AdvancedExtension &advanced_extension); - static std::string getStringConfig(const std::map & configs, const std::string & key, const std::string & default_value = ""); + static std::map + parseFormattedRelAdvancedOptimization(const substrait::extensions::AdvancedExtension & advanced_extension); + static std::string + getStringConfig(const std::map & configs, const std::string & key, const std::string & default_value = ""); SerializedPlanParser * plan_parser; }; diff --git a/cpp-ch/local-engine/Parser/SortRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/SortRelParser.cpp similarity index 99% rename from cpp-ch/local-engine/Parser/SortRelParser.cpp rename to cpp-ch/local-engine/Parser/RelParsers/SortRelParser.cpp index 8fb97d6da5dd..48e29234e1a2 100644 --- a/cpp-ch/local-engine/Parser/SortRelParser.cpp +++ b/cpp-ch/local-engine/Parser/RelParsers/SortRelParser.cpp @@ -17,7 +17,7 @@ #include "SortRelParser.h" #include -#include +#include #include #include #include diff --git a/cpp-ch/local-engine/Parser/SortRelParser.h b/cpp-ch/local-engine/Parser/RelParsers/SortRelParser.h similarity index 88% rename from cpp-ch/local-engine/Parser/SortRelParser.h rename to cpp-ch/local-engine/Parser/RelParsers/SortRelParser.h index 5426b267bae1..2b9d0915f65c 100644 --- a/cpp-ch/local-engine/Parser/SortRelParser.h +++ b/cpp-ch/local-engine/Parser/RelParsers/SortRelParser.h @@ -15,9 +15,10 @@ * limitations under the License. */ #pragma once +#include #include #include -#include +#include #include namespace local_engine { @@ -32,7 +33,7 @@ class SortRelParser : public RelParser static DB::SortDescription parseSortDescription(const google::protobuf::RepeatedPtrField & sort_fields, const DB::Block & header); - const substrait::Rel & getSingleInput(const substrait::Rel & rel) override { return rel.sort().input(); } + std::optional getSingleInput(const substrait::Rel & rel) override { return &rel.sort().input(); } private: size_t parseLimit(std::list & rel_stack_); diff --git a/cpp-ch/local-engine/Parser/WindowGroupLimitRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/WindowGroupLimitRelParser.cpp similarity index 96% rename from cpp-ch/local-engine/Parser/WindowGroupLimitRelParser.cpp rename to cpp-ch/local-engine/Parser/RelParsers/WindowGroupLimitRelParser.cpp index f6c10386f405..75488937e0a4 100644 --- a/cpp-ch/local-engine/Parser/WindowGroupLimitRelParser.cpp +++ b/cpp-ch/local-engine/Parser/RelParsers/WindowGroupLimitRelParser.cpp @@ -16,15 +16,11 @@ */ #include "WindowGroupLimitRelParser.h" -#include #include #include -#include -#include #include #include #include -#include "AdvancedParametersParseUtil.h" namespace DB::ErrorCodes { diff --git a/cpp-ch/local-engine/Parser/WindowGroupLimitRelParser.h b/cpp-ch/local-engine/Parser/RelParsers/WindowGroupLimitRelParser.h similarity index 84% rename from cpp-ch/local-engine/Parser/WindowGroupLimitRelParser.h rename to cpp-ch/local-engine/Parser/RelParsers/WindowGroupLimitRelParser.h index c9c503ed4745..7939d232ce9e 100644 --- a/cpp-ch/local-engine/Parser/WindowGroupLimitRelParser.h +++ b/cpp-ch/local-engine/Parser/RelParsers/WindowGroupLimitRelParser.h @@ -15,13 +15,8 @@ * limitations under the License. */ #pragma once -#include -#include -#include -#include -#include -#include -#include +#include +#include #include #include #include @@ -40,7 +35,7 @@ class WindowGroupLimitRelParser : public RelParser ~WindowGroupLimitRelParser() override = default; DB::QueryPlanPtr parse(DB::QueryPlanPtr current_plan_, const substrait::Rel & rel, std::list & rel_stack_) override; - const substrait::Rel & getSingleInput(const substrait::Rel & rel) override { return rel.windowgrouplimit().input(); } + std::optional getSingleInput(const substrait::Rel & rel) override { return &rel.windowgrouplimit().input(); } private: DB::QueryPlanPtr current_plan; diff --git a/cpp-ch/local-engine/Parser/WindowRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/WindowRelParser.cpp similarity index 99% rename from cpp-ch/local-engine/Parser/WindowRelParser.cpp rename to cpp-ch/local-engine/Parser/RelParsers/WindowRelParser.cpp index 0676924c1f57..7506daabf3dd 100644 --- a/cpp-ch/local-engine/Parser/WindowRelParser.cpp +++ b/cpp-ch/local-engine/Parser/RelParsers/WindowRelParser.cpp @@ -29,8 +29,8 @@ #include #include #include -#include -#include +#include +#include #include #include #include diff --git a/cpp-ch/local-engine/Parser/WindowRelParser.h b/cpp-ch/local-engine/Parser/RelParsers/WindowRelParser.h similarity index 95% rename from cpp-ch/local-engine/Parser/WindowRelParser.h rename to cpp-ch/local-engine/Parser/RelParsers/WindowRelParser.h index 6610915fb586..6168bebe4c82 100644 --- a/cpp-ch/local-engine/Parser/WindowRelParser.h +++ b/cpp-ch/local-engine/Parser/RelParsers/WindowRelParser.h @@ -21,7 +21,7 @@ #include #include #include -#include +#include #include #include #include @@ -35,7 +35,7 @@ class WindowRelParser : public RelParser ~WindowRelParser() override = default; DB::QueryPlanPtr parse(DB::QueryPlanPtr current_plan_, const substrait::Rel & rel, std::list & rel_stack_) override; - const substrait::Rel & getSingleInput(const substrait::Rel & rel) override { return rel.window().input(); } + std::optional getSingleInput(const substrait::Rel & rel) override { return &rel.window().input(); } private: struct WindowInfo diff --git a/cpp-ch/local-engine/Parser/WriteRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp similarity index 100% rename from cpp-ch/local-engine/Parser/WriteRelParser.cpp rename to cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp diff --git a/cpp-ch/local-engine/Parser/WriteRelParser.h b/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.h similarity index 100% rename from cpp-ch/local-engine/Parser/WriteRelParser.h rename to cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.h diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp index e893dd35b2ce..aa4bf5aac404 100644 --- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp +++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp @@ -52,14 +52,16 @@ #include #include #include -#include -#include +#include +#include +#include +#include +#include #include #include -#include #include #include -#include +#include #include #include #include @@ -83,8 +85,7 @@ #include #include #include -#include -#include +#include "RelParsers/RelParser.h" namespace DB { @@ -117,7 +118,7 @@ std::string join(const ActionsDAG::NodeRawConstPtrs & v, char c) return res; } -const ActionsDAG::Node * SerializedPlanParser::addColumn(ActionsDAG& actions_dag, const DataTypePtr & type, const Field & field) +const ActionsDAG::Node * SerializedPlanParser::addColumn(ActionsDAG & actions_dag, const DataTypePtr & type, const Field & field) { return &actions_dag.addColumn( ColumnWithTypeAndName(type->createColumnConst(1, field), type, getUniqueName(toString(field).substr(0, 10)))); @@ -242,74 +243,6 @@ std::string getDecimalFunction(const substrait::Type_Decimal & decimal, bool nul return ch_function_name; } -bool SerializedPlanParser::isReadRelFromJava(const substrait::ReadRel & rel) -{ - return rel.has_local_files() && rel.local_files().items().size() == 1 - && rel.local_files().items().at(0).uri_file().starts_with("iterator"); -} - -bool SerializedPlanParser::isReadFromMergeTree(const substrait::ReadRel & rel) -{ - assert(rel.has_advanced_extension()); - bool is_read_from_merge_tree; - google::protobuf::StringValue optimization; - optimization.ParseFromString(rel.advanced_extension().optimization().value()); - ReadBufferFromString in(optimization.value()); - assertString("isMergeTree=", in); - readBoolText(is_read_from_merge_tree, in); - assertChar('\n', in); - return is_read_from_merge_tree; -} - -QueryPlanStepPtr SerializedPlanParser::parseReadRealWithLocalFile(const substrait::ReadRel & rel) -{ - auto header = TypeParser::buildBlockFromNamedStruct(rel.base_schema()); - substrait::ReadRel::LocalFiles local_files; - if (rel.has_local_files()) - local_files = rel.local_files(); - else - { - local_files = BinaryToMessage(split_infos.at(nextSplitInfoIndex())); - logDebugMessage(local_files, "local_files"); - } - auto source = std::make_shared(context, header, local_files); - auto source_pipe = Pipe(source); - auto source_step = std::make_unique(context, std::move(source_pipe), "substrait local files"); - source_step->setStepDescription("read local files"); - if (rel.has_filter()) - { - ActionsDAG actions_dag{blockToNameAndTypeList(header)}; - const ActionsDAG::Node * filter_node = parseExpression(actions_dag, rel.filter()); - actions_dag.addOrReplaceInOutputs(*filter_node); - assert(filter_node == &(actions_dag.findInOutputs(filter_node->result_name))); - source_step->addFilter(std::move(actions_dag), filter_node->result_name); - } - return source_step; -} - -QueryPlanStepPtr SerializedPlanParser::parseReadRealWithJavaIter(const substrait::ReadRel & rel) -{ - assert(rel.has_local_files()); - assert(rel.local_files().items().size() == 1); - auto iter = rel.local_files().items().at(0).uri_file(); - auto pos = iter.find(':'); - auto iter_index = std::stoi(iter.substr(pos + 1, iter.size())); - jobject input_iter = input_iters[iter_index]; - bool materialize_input = materialize_inputs[iter_index]; - - GET_JNIENV(env) - SCOPE_EXIT({CLEAN_JNIENV}); - auto first_block = SourceFromJavaIter::peekBlock(env, input_iter); - - /// Try to decide header from the first block read from Java iterator. Thus AggregateFunction with parameters has more precise types. - auto header = first_block.has_value() ? first_block->cloneEmpty() : TypeParser::buildBlockFromNamedStruct(rel.base_schema()); - auto source = std::make_shared(context, std::move(header), input_iter, materialize_input, std::move(first_block)); - - QueryPlanStepPtr source_step = std::make_unique(Pipe(source)); - source_step->setStepDescription("Read From Java Iter"); - return source_step; -} - IQueryPlanStep * SerializedPlanParser::addRemoveNullableStep(QueryPlan & plan, const std::set & columns) { if (columns.empty()) @@ -345,7 +278,11 @@ void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel NamesWithAliases aliases; auto cols = query_plan->getCurrentDataStream().header.getNamesAndTypesList(); if (cols.getNames().size() != static_cast(root_rel.root().names_size())) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Missmatch result columns size. plan column size {}, subtrait plan size {}.", cols.getNames().size(), root_rel.root().names_size()); + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Missmatch result columns size. plan column size {}, subtrait plan size {}.", + cols.getNames().size(), + root_rel.root().names_size()); for (int i = 0; i < static_cast(cols.getNames().size()); i++) aliases.emplace_back(NameWithAlias(cols.getNames()[i], root_rel.root().names(i))); actions_dag.project(aliases); @@ -433,89 +370,55 @@ QueryPlanPtr SerializedPlanParser::parse(const substrait::Plan & plan) } std::unique_ptr SerializedPlanParser::createExecutor(const substrait::Plan & plan) -{ return createExecutor(parse(plan), plan); } +{ + return createExecutor(parse(plan), plan); +} QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list & rel_stack) { - QueryPlanPtr query_plan; - std::vector steps; - switch (rel.rel_type_case()) - { - case substrait::Rel::RelTypeCase::kFetch: { - rel_stack.push_back(&rel); - const auto & limit = rel.fetch(); - query_plan = parseOp(limit.input(), rel_stack); - rel_stack.pop_back(); - auto limit_step = std::make_unique(query_plan->getCurrentDataStream(), limit.count(), limit.offset()); - limit_step->setStepDescription("LIMIT"); - steps.emplace_back(limit_step.get()); - query_plan->addStep(std::move(limit_step)); - break; - } - case substrait::Rel::RelTypeCase::kRead: { - const auto & read = rel.read(); - // TODO: We still maintain the old logic of parsing LocalFiles or ExtensionTable in RealRel - // to be compatiable with some suites about metrics. - // Remove this compatiability in later and then only java iter has local files in ReadRel. - if (read.has_local_files() || (!read.has_extension_table() && !isReadFromMergeTree(read))) - { - assert(read.has_base_schema()); - QueryPlanStepPtr step; - if (isReadRelFromJava(read)) - step = parseReadRealWithJavaIter(read); - else - step = parseReadRealWithLocalFile(read); + DB::QueryPlanPtr query_plan; + auto rel_parser = RelParserFactory::instance().getBuilder(rel.rel_type_case())(this); - query_plan = std::make_unique(); - steps.emplace_back(step.get()); - query_plan->addStep(std::move(step)); + auto all_input_rels = rel_parser->getInputs(rel); + std::vector input_query_plans; + rel_stack.push_back(&rel); + for (const auto * input_rel : all_input_rels) + { + auto input_query_plan = parseOp(*input_rel, rel_stack); + input_query_plans.push_back(std::move(input_query_plan)); + } + rel_stack.pop_back(); - // Add a buffer after source, it try to preload data from source and reduce the - // waiting time of downstream nodes. - if (context->getSettingsRef().max_threads > 1) - { - auto buffer_step = std::make_unique(query_plan->getCurrentDataStream()); - steps.emplace_back(buffer_step.get()); - query_plan->addStep(std::move(buffer_step)); - } - } - else + // source node is special + if (rel.rel_type_case() == substrait::Rel::RelTypeCase::kRead) + { + assert(all_input_rels.empty()); + auto read_rel_parser = std::dynamic_pointer_cast(rel_parser); + const auto & read = rel.read(); + if (read.has_local_files()) + { + if (read_rel_parser->isReadRelFromJava(read)) { - substrait::ReadRel::ExtensionTable extension_table; - if (read.has_extension_table()) - extension_table = read.extension_table(); - else - { - extension_table = BinaryToMessage(split_infos.at(nextSplitInfoIndex())); - logDebugMessage(extension_table, "extension_table"); - } - - MergeTreeRelParser mergeTreeParser(this, context); - query_plan = mergeTreeParser.parseReadRel(std::make_unique(), read, extension_table); - steps = mergeTreeParser.getSteps(); + auto iter = read.local_files().items().at(0).uri_file(); + auto pos = iter.find(':'); + auto iter_index = std::stoi(iter.substr(pos + 1, iter.size())); + auto [input_iter, materalize_input] = getInputIter(static_cast(iter_index)); + read_rel_parser->setInputIter(input_iter, materalize_input); } - break; } - case substrait::Rel::RelTypeCase::kFilter: - case substrait::Rel::RelTypeCase::kGenerate: - case substrait::Rel::RelTypeCase::kProject: - case substrait::Rel::RelTypeCase::kAggregate: - case substrait::Rel::RelTypeCase::kSort: - case substrait::Rel::RelTypeCase::kWindow: - case substrait::Rel::RelTypeCase::kJoin: - case substrait::Rel::RelTypeCase::kCross: - case substrait::Rel::RelTypeCase::kWindowGroupLimit: - case substrait::Rel::RelTypeCase::kExpand: { - auto op_parser = RelParserFactory::instance().getBuilder(rel.rel_type_case())(this); - query_plan = op_parser->parseOp(rel, rel_stack); - auto parser_steps = op_parser->getSteps(); - steps.insert(steps.end(), parser_steps.begin(), parser_steps.end()); - break; + else if (read_rel_parser->isReadFromMergeTree(read)) + { + if (!read.has_extension_table()) + { + read_rel_parser->setSplitInfo(nextSplitInfo()); + } } - default: - throw Exception(ErrorCodes::UNKNOWN_TYPE, "doesn't support relation type: {}.\n{}", rel.rel_type_case(), rel.DebugString()); } + query_plan = rel_parser->parse(input_query_plans, rel, rel_stack); + + std::vector steps = rel_parser->getSteps(); + if (!context->getSettingsRef().query_plan_enable_optimizations) { if (rel.rel_type_case() == substrait::Rel::RelTypeCase::kRead) @@ -1321,215 +1224,6 @@ SerializedPlanParser::SerializedPlanParser(const ContextPtr & context_) : contex { } -void SerializedPlanParser::collectJoinKeys( - const substrait::Expression & condition, std::vector> & join_keys, int32_t right_key_start) -{ - auto condition_name = getFunctionName( - function_mapping.at(std::to_string(condition.scalar_function().function_reference())), condition.scalar_function()); - if (condition_name == "and") - { - collectJoinKeys(condition.scalar_function().arguments(0).value(), join_keys, right_key_start); - collectJoinKeys(condition.scalar_function().arguments(1).value(), join_keys, right_key_start); - } - else if (condition_name == "equals") - { - const auto & function = condition.scalar_function(); - auto left_key_idx = function.arguments(0).value().selection().direct_reference().struct_field().field(); - auto right_key_idx = function.arguments(1).value().selection().direct_reference().struct_field().field() - right_key_start; - join_keys.emplace_back(std::pair(left_key_idx, right_key_idx)); - } - else - { - throw Exception(ErrorCodes::BAD_ARGUMENTS, "doesn't support condition {}", condition_name); - } -} - -ActionsDAG ASTParser::convertToActions(const NamesAndTypesList & name_and_types, const ASTPtr & ast) const -{ - NamesAndTypesList aggregation_keys; - ColumnNumbersList aggregation_keys_indexes_list; - AggregationKeysInfo info(aggregation_keys, aggregation_keys_indexes_list, GroupByKind::NONE); - SizeLimits size_limits_for_set; - ActionsMatcher::Data visitor_data( - context, - size_limits_for_set, - static_cast(0), - name_and_types, - ActionsDAG(name_and_types), - std::make_shared(), - false /* no_subqueries */, - false /* no_makeset */, - false /* only_consts */, - info); - ActionsVisitor(visitor_data).visit(ast); - return visitor_data.getActions(); -} - -ASTPtr ASTParser::parseToAST(const Names & names, const substrait::Expression & rel) -{ - LOG_DEBUG(&Poco::Logger::get("ASTParser"), "substrait plan:\n{}", rel.DebugString()); - if (rel.has_scalar_function()) - { - const auto & scalar_function = rel.scalar_function(); - auto function_signature = function_mapping.at(std::to_string(rel.scalar_function().function_reference())); - - auto substrait_name = function_signature.substr(0, function_signature.find(':')); - auto func_parser = FunctionParserFactory::instance().tryGet(substrait_name, plan_parser); - String function_name = func_parser->getName(); - - ASTs ast_args; - parseFunctionArgumentsToAST(names, scalar_function, ast_args); - - return makeASTFunction(function_name, ast_args); - } - else - return parseArgumentToAST(names, rel); -} - -void ASTParser::parseFunctionArgumentsToAST( - const Names & names, const substrait::Expression_ScalarFunction & scalar_function, ASTs & ast_args) -{ - const auto & args = scalar_function.arguments(); - - for (const auto & arg : args) - { - if (arg.value().has_scalar_function()) - { - ast_args.emplace_back(parseToAST(names, arg.value())); - } - else - { - ast_args.emplace_back(parseArgumentToAST(names, arg.value())); - } - } -} - -ASTPtr ASTParser::parseArgumentToAST(const Names & names, const substrait::Expression & rel) -{ - switch (rel.rex_type_case()) - { - case substrait::Expression::RexTypeCase::kLiteral: { - DataTypePtr type; - Field field; - std::tie(type, field) = SerializedPlanParser::parseLiteral(rel.literal()); - return std::make_shared(field, type); - } - case substrait::Expression::RexTypeCase::kSelection: { - if (!rel.selection().has_direct_reference() || !rel.selection().direct_reference().has_struct_field()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Can only have direct struct references in selections"); - - const auto field = rel.selection().direct_reference().struct_field().field(); - return std::make_shared(names[field]); - } - case substrait::Expression::RexTypeCase::kCast: { - if (!rel.cast().has_type() || !rel.cast().has_input()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Doesn't have type or input in cast node."); - - /// Append input to asts - ASTs args; - args.emplace_back(parseArgumentToAST(names, rel.cast().input())); - - /// Append destination type to asts - const auto & substrait_type = rel.cast().type(); - /// Spark cast(x as BINARY) -> CH reinterpretAsStringSpark(x) - if (substrait_type.has_binary()) - return makeASTFunction("reinterpretAsStringSpark", args); - else - { - DataTypePtr ch_type = TypeParser::parseType(substrait_type); - args.emplace_back(std::make_shared(ch_type->getName())); - - return makeASTFunction("CAST", args); - } - } - case substrait::Expression::RexTypeCase::kIfThen: { - const auto & if_then = rel.if_then(); - auto condition_nums = if_then.ifs_size(); - std::string ch_function_name = condition_nums == 1 ? "if" : "multiIf"; - auto function_multi_if = FunctionFactory::instance().get(ch_function_name, context); - ASTs args; - - for (int i = 0; i < condition_nums; ++i) - { - const auto & ifs = if_then.ifs(i); - auto if_node = parseArgumentToAST(names, ifs.if_()); - args.emplace_back(if_node); - - auto then_node = parseArgumentToAST(names, ifs.then()); - args.emplace_back(then_node); - } - - auto else_node = parseArgumentToAST(names, if_then.else_()); - args.emplace_back(std::move(else_node)); - return makeASTFunction(ch_function_name, args); - } - case substrait::Expression::RexTypeCase::kScalarFunction: { - return parseToAST(names, rel); - } - case substrait::Expression::RexTypeCase::kSingularOrList: { - const auto & options = rel.singular_or_list().options(); - /// options is empty always return false - if (options.empty()) - return std::make_shared(0); - /// options should be literals - if (!options[0].has_literal()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Options of SingularOrList must have literal type"); - - ASTs args; - args.emplace_back(parseArgumentToAST(names, rel.singular_or_list().value())); - - bool nullable = false; - size_t options_len = options.size(); - ASTs in_args; - in_args.reserve(options_len); - - for (int i = 0; i < static_cast(options_len); ++i) - { - if (!options[i].has_literal()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "in expression values must be the literal!"); - if (!nullable) - nullable = options[i].literal().has_null(); - } - - auto elem_type_and_field = SerializedPlanParser::parseLiteral(options[0].literal()); - DataTypePtr elem_type = wrapNullableType(nullable, elem_type_and_field.first); - for (int i = 0; i < static_cast(options_len); ++i) - { - auto type_and_field = SerializedPlanParser::parseLiteral(options[i].literal()); - auto option_type = wrapNullableType(nullable, type_and_field.first); - if (!elem_type->equals(*option_type)) - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "SingularOrList options type mismatch:{} and {}", - elem_type->getName(), - option_type->getName()); - - in_args.emplace_back(std::make_shared(type_and_field.second)); - } - auto array_ast = makeASTFunction("array", in_args); - args.emplace_back(array_ast); - - auto ast = makeASTFunction("in", args); - if (nullable) - { - /// if sets has `null` and value not in sets - /// In Spark: return `null`, is the standard behaviour from ANSI.(SPARK-37920) - /// In CH: return `false` - /// So we used if(a, b, c) cast `false` to `null` if sets has `null` - ast = makeASTFunction("if", ast, std::make_shared(true), std::make_shared(Field())); - } - - return ast; - } - default: - throw Exception( - ErrorCodes::UNKNOWN_TYPE, - "Join on condition error. Unsupported spark expression type {} : {}", - magic_enum::enum_name(rel.rex_type_case()), - rel.DebugString()); - } -} - void SerializedPlanParser::removeNullableForRequiredColumns(const std::set & require_columns, ActionsDAG & actions_dag) const { for (const auto & item : require_columns) diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h b/cpp-ch/local-engine/Parser/SerializedPlanParser.h index 88ebb00872a3..c9a48106a9b3 100644 --- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h +++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h @@ -90,18 +90,19 @@ class SerializedPlanParser /// std::unique_ptr createExecutor(const std::string_view plan); - DB::QueryPlanStepPtr parseReadRealWithLocalFile(const substrait::ReadRel & rel); - DB::QueryPlanStepPtr parseReadRealWithJavaIter(const substrait::ReadRel & rel); - - static bool isReadRelFromJava(const substrait::ReadRel & rel); - static bool isReadFromMergeTree(const substrait::ReadRel & rel); - void addInputIter(jobject iter, bool materialize_input) { input_iters.emplace_back(iter); materialize_inputs.emplace_back(materialize_input); } + std::pair getInputIter(size_t index) + { + if (index > input_iters.size()) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Index({}) is overflow input_iters's size({})", index, input_iters.size()); + return {input_iters[index], materialize_inputs[index]}; + } + void addSplitInfo(std::string && split_info) { split_infos.emplace_back(std::move(split_info)); } int nextSplitInfoIndex() @@ -115,6 +116,12 @@ class SerializedPlanParser return split_info_index++; } + const String & nextSplitInfo() + { + auto next_index = nextSplitInfoIndex(); + return split_infos.at(next_index); + } + void parseExtensions(const ::google::protobuf::RepeatedPtrField & extensions); DB::ActionsDAG expressionsToActionsDAG( const std::vector & expressions, const DB::Block & header, const DB::Block & read_schema); @@ -133,8 +140,6 @@ class SerializedPlanParser private: DB::QueryPlanPtr parseOp(const substrait::Rel & rel, std::list & rel_stack); - void - collectJoinKeys(const substrait::Expression & condition, std::vector> & join_keys, int32_t right_key_start); void parseFunctionOrExpression( const substrait::Expression & rel, std::string & result_name, DB::ActionsDAG & actions_dag, bool keep_result = false); @@ -185,34 +190,10 @@ class SerializedPlanParser int split_info_index = 0; std::vector materialize_inputs; ContextPtr context; - // for parse rel node, collect steps from a rel node - std::vector temp_step_collection; std::vector metrics; public: const ActionsDAG::Node * addColumn(DB::ActionsDAG & actions_dag, const DataTypePtr & type, const Field & field); }; -class ASTParser -{ -public: - explicit ASTParser( - const ContextPtr & context_, std::unordered_map & function_mapping_, SerializedPlanParser * plan_parser_) - : context(context_), function_mapping(function_mapping_), plan_parser(plan_parser_) - { - } - - ~ASTParser() = default; - - ASTPtr parseToAST(const Names & names, const substrait::Expression & rel); - ActionsDAG convertToActions(const NamesAndTypesList & name_and_types, const ASTPtr & ast) const; - -private: - ContextPtr context; - std::unordered_map function_mapping; - SerializedPlanParser * plan_parser; - - void parseFunctionArgumentsToAST(const Names & names, const substrait::Expression_ScalarFunction & scalar_function, ASTs & ast_args); - ASTPtr parseArgumentToAST(const Names & names, const substrait::Expression & rel); -}; } diff --git a/cpp-ch/local-engine/Parser/TypeParser.cpp b/cpp-ch/local-engine/Parser/TypeParser.cpp index 84c9362269de..7796953e250c 100644 --- a/cpp-ch/local-engine/Parser/TypeParser.cpp +++ b/cpp-ch/local-engine/Parser/TypeParser.cpp @@ -31,7 +31,7 @@ #include #include #include -#include +#include #include #include #include @@ -334,4 +334,4 @@ DB::DataTypePtr TypeParser::tryWrapNullable(substrait::Type_Nullability nullable return std::make_shared(nested_type); return nested_type; } -} \ No newline at end of file +} diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index c1923ae592e9..68c445863133 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -23,13 +23,14 @@ #include #include #include -#include -#include -#include #include +#include +#include +#include +#include #include #include -#include +#include #include #include #include @@ -53,12 +54,10 @@ #include #include #include +#include #include #include #include -#include -#include -#include #ifdef __cplusplus namespace DB @@ -515,18 +514,15 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHNativeBlock_nativeTotalBytes LOCAL_ENGINE_JNI_METHOD_END(env, -1) } -JNIEXPORT jobject Java_org_apache_gluten_vectorized_CHNativeBlock_nativeBlockStats(JNIEnv * env, jobject obj, jlong block_address, jint column_position) +JNIEXPORT jobject +Java_org_apache_gluten_vectorized_CHNativeBlock_nativeBlockStats(JNIEnv * env, jobject obj, jlong block_address, jint column_position) { LOCAL_ENGINE_JNI_METHOD_START DB::Block * block = reinterpret_cast(block_address); auto col = getColumnFromColumnVector(env, obj, block_address, column_position); if (!col.column->isNullable()) { - jobject block_stats = env->NewObject( - block_stats_class, - block_stats_constructor, - block->rows(), - false); + jobject block_stats = env->NewObject(block_stats_class, block_stats_constructor, block->rows(), false); return block_stats; } else @@ -535,10 +531,7 @@ JNIEXPORT jobject Java_org_apache_gluten_vectorized_CHNativeBlock_nativeBlockSta const auto & null_map_data = nullable->getNullMapData(); jobject block_stats = env->NewObject( - block_stats_class, - block_stats_constructor, - block->rows(), - !DB::memoryIsZero(null_map_data.data(), 0, null_map_data.size())); + block_stats_class, block_stats_constructor, block->rows(), !DB::memoryIsZero(null_map_data.data(), 0, null_map_data.size())); return block_stats; } LOCAL_ENGINE_JNI_METHOD_END(env, nullptr) @@ -573,12 +566,8 @@ JNIEXPORT void Java_org_apache_gluten_vectorized_CHStreamReader_nativeClose(JNIE LOCAL_ENGINE_JNI_METHOD_END(env, ) } -local_engine::SplitterHolder * buildAndExecuteShuffle(JNIEnv * env, - jobject iter, - const String & name, - const local_engine::SplitOptions& options, - jobject rss_pusher = nullptr - ) +local_engine::SplitterHolder * buildAndExecuteShuffle( + JNIEnv * env, jobject iter, const String & name, const local_engine::SplitOptions & options, jobject rss_pusher = nullptr) { auto current_executor = local_engine::LocalExecutor::getCurrentExecutor(); local_engine::SplitterHolder * splitter = nullptr; @@ -592,7 +581,8 @@ local_engine::SplitterHolder * buildAndExecuteShuffle(JNIEnv * env, { /// Try to decide header from the first block read from Java iterator. auto header = first_block.value().cloneEmpty(); - splitter = new local_engine::SplitterHolder{.exchange_manager = std::make_unique(header, name, options, rss_pusher)}; + splitter = new local_engine::SplitterHolder{ + .exchange_manager = std::make_unique(header, name, options, rss_pusher)}; splitter->exchange_manager->initSinks(1); splitter->exchange_manager->pushBlock(first_block.value()); first_block = std::nullopt; @@ -604,14 +594,18 @@ local_engine::SplitterHolder * buildAndExecuteShuffle(JNIEnv * env, } else // empty iterator - splitter = new local_engine::SplitterHolder{.exchange_manager = std::make_unique(DB::Block(), name, options, rss_pusher)}; + splitter = new local_engine::SplitterHolder{ + .exchange_manager = std::make_unique(DB::Block(), name, options, rss_pusher)}; } else { - splitter = new local_engine::SplitterHolder{.exchange_manager = std::make_unique(current_executor.value()->getHeader().cloneEmpty(), name, options, rss_pusher)}; + splitter = new local_engine::SplitterHolder{ + .exchange_manager = std::make_unique( + current_executor.value()->getHeader().cloneEmpty(), name, options, rss_pusher)}; // TODO support multiple sinks splitter->exchange_manager->initSinks(1); - current_executor.value()->setSinks([&](auto & pipeline_builder) { splitter->exchange_manager->setSinksToPipeline(pipeline_builder);}); + current_executor.value()->setSinks([&](auto & pipeline_builder) + { splitter->exchange_manager->setSinksToPipeline(pipeline_builder); }); // execute pipeline current_executor.value()->execute(); } @@ -775,8 +769,7 @@ JNIEXPORT jobject Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_ result.total_serialize_time, result.total_rows, result.total_blocks, - result.wall_time - ); + result.wall_time); return split_result; LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)