diff --git a/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp index a6091cbe1075..99ade18028a1 100644 --- a/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp +++ b/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp @@ -74,12 +74,7 @@ CrossRelParser::parse(DB::QueryPlanPtr /*query_plan*/, const substrait::Rel & /* throw Exception(ErrorCodes::LOGICAL_ERROR, "join node has 2 inputs, can't call parse()."); } -std::optional CrossRelParser::getSingleInput(const substrait::Rel & /*rel*/) -{ - throw Exception(ErrorCodes::LOGICAL_ERROR, "join node has 2 inputs, can't call getSingleInput()."); -} - -DB::QueryPlanPtr CrossRelParser::parseOp(const substrait::Rel & rel, std::list & rel_stack) +std::vector CrossRelParser::getInputs(const substrait::Rel & rel) { const auto & join = rel.cross(); if (!join.has_left() || !join.has_right()) @@ -87,12 +82,19 @@ DB::QueryPlanPtr CrossRelParser::parseOp(const substrait::Rel & rel, std::listparseOp(join.left(), rel_stack); - auto right_plan = getPlanParser()->parseOp(join.right(), rel_stack); - rel_stack.pop_back(); + return {&join.left(), &join.right()}; +} +std::optional CrossRelParser::getSingleInput(const substrait::Rel & /*rel*/) +{ + throw Exception(ErrorCodes::LOGICAL_ERROR, "join node has 2 inputs, can't call getSingleInput()."); +} - return parseJoin(join, std::move(left_plan), std::move(right_plan)); +DB::QueryPlanPtr +CrossRelParser::parse(std::vector & input_plans_, const substrait::Rel & rel, std::list &) +{ + assert(input_plans_.size() == 2); + const auto & join = rel.cross(); + return parseJoin(join, std::move(input_plans_[0]), std::move(input_plans_[1])); } void CrossRelParser::renamePlanColumns(DB::QueryPlan & left, DB::QueryPlan & right, const StorageJoinFromReadBuffer & storage_join) diff --git a/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.h b/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.h index 6baedebe1fb6..635a2fb629fb 100644 --- a/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.h +++ b/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.h @@ -38,11 +38,12 @@ class CrossRelParser : public RelParser explicit CrossRelParser(SerializedPlanParser * plan_paser_); ~CrossRelParser() override = default; + DB::QueryPlanPtr parse( + std::vector & input_plans_, const substrait::Rel & rel, std::list & rel_stack_) override; DB::QueryPlanPtr - parse(DB::QueryPlanPtr query_plan, const substrait::Rel & sort_rel, std::list & rel_stack_) override; - - DB::QueryPlanPtr parseOp(const substrait::Rel & rel, std::list & rel_stack) override; + parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list & rel_stack_) override; + std::vector getInputs(const substrait::Rel & rel) override; std::optional getSingleInput(const substrait::Rel & rel) override; private: diff --git a/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp index bdb91004cfa9..39212cf8dcbf 100644 --- a/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp +++ b/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp @@ -80,12 +80,7 @@ JoinRelParser::parse(DB::QueryPlanPtr /*query_plan*/, const substrait::Rel & /*r throw Exception(ErrorCodes::LOGICAL_ERROR, "join node has 2 inputs, can't call parse()."); } -std::optional JoinRelParser::getSingleInput(const substrait::Rel & /*rel*/) -{ - throw Exception(ErrorCodes::LOGICAL_ERROR, "join node has 2 inputs, can't call getSingleInput()."); -} - -DB::QueryPlanPtr JoinRelParser::parseOp(const substrait::Rel & rel, std::list & rel_stack) +std::vector JoinRelParser::getInputs(const substrait::Rel & rel) { const auto & join = rel.join(); if (!join.has_left() || !join.has_right()) @@ -93,12 +88,19 @@ DB::QueryPlanPtr JoinRelParser::parseOp(const substrait::Rel & rel, std::listparseOp(join.left(), rel_stack); - auto right_plan = getPlanParser()->parseOp(join.right(), rel_stack); - rel_stack.pop_back(); + return {&join.left(), &join.right()}; +} +std::optional JoinRelParser::getSingleInput(const substrait::Rel & /*rel*/) +{ + throw Exception(ErrorCodes::LOGICAL_ERROR, "join node has 2 inputs, can't call getSingleInput()."); +} - return parseJoin(join, std::move(left_plan), std::move(right_plan)); +DB::QueryPlanPtr JoinRelParser::parse( + std::vector & input_plans_, const substrait::Rel & rel, std::list & rel_stack_) +{ + assert(input_plans_.size() == 2); + const auto & join = rel.join(); + return parseJoin(join, std::move(input_plans_[0]), std::move(input_plans_[1])); } std::unordered_set JoinRelParser::extractTableSidesFromExpression( diff --git a/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.h b/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.h index f5165bb0d42a..42ebc5c0a98e 100644 --- a/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.h +++ b/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.h @@ -42,8 +42,10 @@ class JoinRelParser : public RelParser DB::QueryPlanPtr parse(DB::QueryPlanPtr query_plan, const substrait::Rel & sort_rel, std::list & rel_stack_) override; - DB::QueryPlanPtr parseOp(const substrait::Rel & rel, std::list & rel_stack) override; + DB::QueryPlanPtr parse( + std::vector & input_plans_, const substrait::Rel & rel, std::list & rel_stack_) override; + std::vector getInputs(const substrait::Rel & rel) override; std::optional getSingleInput(const substrait::Rel & rel) override; private: diff --git a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp index 92528e2f9a4c..5532baca5342 100644 --- a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp +++ b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp @@ -15,23 +15,8 @@ * limitations under the License. */ +#include "ReadRelParser.h" #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "RelParser.h" namespace DB::ErrorCodes { @@ -40,126 +25,108 @@ extern const int LOGICAL_ERROR; namespace local_engine { -class ReadRelParser : public RelParser +DB::QueryPlanPtr ReadRelParser::parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list &) { -public: - explicit ReadRelParser(SerializedPlanParser * plan_parser_) : RelParser(plan_parser_) { } - ~ReadRelParser() override = default; - - DB::QueryPlanPtr parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list &) override + if (query_plan) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Source node's input plan should be null"); + const auto & read = rel.read(); + if (read.has_local_files() || (!read.has_extension_table() && !isReadFromMergeTree(read))) { - if (query_plan) - throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Source node's input plan should be null"); - const auto & read = rel.read(); - if (read.has_local_files() || (!read.has_extension_table() && !isReadFromMergeTree(read))) - { - assert(read.has_base_schema()); - DB::QueryPlanStepPtr read_step; - if (isReadRelFromJava(read)) - read_step = parseReadRelWithJavaIter(read); - else - read_step = parseReadRelWithLocalFile(read); - query_plan = std::make_unique(); - steps.emplace_back(read_step.get()); - query_plan->addStep(std::move(read_step)); + assert(read.has_base_schema()); + DB::QueryPlanStepPtr read_step; + if (isReadRelFromJava(read)) + read_step = parseReadRelWithJavaIter(read); + else + read_step = parseReadRelWithLocalFile(read); + query_plan = std::make_unique(); + steps.emplace_back(read_step.get()); + query_plan->addStep(std::move(read_step)); - if (getContext()->getSettingsRef().max_threads > 1) - { - auto buffer_step = std::make_unique(query_plan->getCurrentDataStream()); - steps.emplace_back(buffer_step.get()); - query_plan->addStep(std::move(buffer_step)); - } + if (getContext()->getSettingsRef().max_threads > 1) + { + auto buffer_step = std::make_unique(query_plan->getCurrentDataStream()); + steps.emplace_back(buffer_step.get()); + query_plan->addStep(std::move(buffer_step)); } + } + else + { + substrait::ReadRel::ExtensionTable extension_table; + if (read.has_extension_table()) + extension_table = read.extension_table(); else { - substrait::ReadRel::ExtensionTable extension_table; - if (read.has_extension_table()) - extension_table = read.extension_table(); - else - { - extension_table = BinaryToMessage(getPlanParser()->nextSplitInfo()); - logDebugMessage(extension_table, "extension_table"); - } - MergeTreeRelParser mergeTreeParser(getPlanParser(), getContext()); - query_plan = mergeTreeParser.parseReadRel(std::make_unique(), read, extension_table); - steps = mergeTreeParser.getSteps(); + extension_table = BinaryToMessage(split_info); + logDebugMessage(extension_table, "extension_table"); } - return query_plan; + MergeTreeRelParser mergeTreeParser(getPlanParser(), getContext()); + query_plan = mergeTreeParser.parseReadRel(std::make_unique(), read, extension_table); + steps = mergeTreeParser.getSteps(); } + return query_plan; +} - // This is source node, there is no input - std::optional getSingleInput(const substrait::Rel & rel) override { return {}; } - -private: - bool isReadRelFromJava(const substrait::ReadRel & rel) - { - return rel.has_local_files() && rel.local_files().items().size() == 1 - && rel.local_files().items().at(0).uri_file().starts_with("iterator"); - } +bool ReadRelParser::isReadRelFromJava(const substrait::ReadRel & rel) +{ + return rel.has_local_files() && rel.local_files().items().size() == 1 + && rel.local_files().items().at(0).uri_file().starts_with("iterator"); +} - bool isReadFromMergeTree(const substrait::ReadRel & rel) - { - assert(rel.has_advanced_extension()); - bool is_read_from_merge_tree; - google::protobuf::StringValue optimization; - optimization.ParseFromString(rel.advanced_extension().optimization().value()); - ReadBufferFromString in(optimization.value()); - assertString("isMergeTree=", in); - readBoolText(is_read_from_merge_tree, in); - assertChar('\n', in); - return is_read_from_merge_tree; - } +bool ReadRelParser::isReadFromMergeTree(const substrait::ReadRel & rel) +{ + assert(rel.has_advanced_extension()); + bool is_read_from_merge_tree; + google::protobuf::StringValue optimization; + optimization.ParseFromString(rel.advanced_extension().optimization().value()); + ReadBufferFromString in(optimization.value()); + if (!checkString("isMergeTree=", in)) + return false; + readBoolText(is_read_from_merge_tree, in); + assertChar('\n', in); + return is_read_from_merge_tree; +} - DB::QueryPlanStepPtr parseReadRelWithJavaIter(const substrait::ReadRel & rel) - { - assert(rel.has_local_files()); - assert(rel.local_files().items().size() == 1); - auto iter = rel.local_files().items().at(0).uri_file(); - auto pos = iter.find(':'); - auto iter_index = std::stoi(iter.substr(pos + 1, iter.size())); - auto [input_iter, materialize_input] = getPlanParser()->getInputIter(static_cast(iter_index)); +DB::QueryPlanStepPtr ReadRelParser::parseReadRelWithJavaIter(const substrait::ReadRel & rel) +{ + GET_JNIENV(env) + SCOPE_EXIT({CLEAN_JNIENV}); + auto first_block = SourceFromJavaIter::peekBlock(env, input_iter); - GET_JNIENV(env) - SCOPE_EXIT({CLEAN_JNIENV}); - auto first_block = SourceFromJavaIter::peekBlock(env, input_iter); + /// Try to decide header from the first block read from Java iterator. Thus AggregateFunction with parameters has more precise types. + auto header = first_block.has_value() ? first_block->cloneEmpty() : TypeParser::buildBlockFromNamedStruct(rel.base_schema()); + auto source = std::make_shared( + getContext(), std::move(header), input_iter, is_input_iter_materialize, std::move(first_block)); - /// Try to decide header from the first block read from Java iterator. Thus AggregateFunction with parameters has more precise types. - auto header = first_block.has_value() ? first_block->cloneEmpty() : TypeParser::buildBlockFromNamedStruct(rel.base_schema()); - auto source - = std::make_shared(getContext(), std::move(header), input_iter, materialize_input, std::move(first_block)); + QueryPlanStepPtr source_step = std::make_unique(Pipe(source)); + source_step->setStepDescription("Read From Java Iter"); + return source_step; +} - QueryPlanStepPtr source_step = std::make_unique(Pipe(source)); - source_step->setStepDescription("Read From Java Iter"); - return source_step; +QueryPlanStepPtr ReadRelParser::parseReadRelWithLocalFile(const substrait::ReadRel & rel) +{ + auto header = TypeParser::buildBlockFromNamedStruct(rel.base_schema()); + substrait::ReadRel::LocalFiles local_files; + if (rel.has_local_files()) + local_files = rel.local_files(); + else + { + local_files = BinaryToMessage(getPlanParser()->nextSplitInfo()); + logDebugMessage(local_files, "local_files"); } - - QueryPlanStepPtr parseReadRelWithLocalFile(const substrait::ReadRel & rel) + auto source = std::make_shared(getContext(), header, local_files); + auto source_pipe = Pipe(source); + auto source_step = std::make_unique(getContext(), std::move(source_pipe), "substrait local files"); + source_step->setStepDescription("read local files"); + if (rel.has_filter()) { - auto header = TypeParser::buildBlockFromNamedStruct(rel.base_schema()); - substrait::ReadRel::LocalFiles local_files; - if (rel.has_local_files()) - local_files = rel.local_files(); - else - { - local_files = BinaryToMessage(getPlanParser()->nextSplitInfo()); - logDebugMessage(local_files, "local_files"); - } - auto source = std::make_shared(getContext(), header, local_files); - auto source_pipe = Pipe(source); - auto source_step = std::make_unique(getContext(), std::move(source_pipe), "substrait local files"); - source_step->setStepDescription("read local files"); - if (rel.has_filter()) - { - DB::ActionsDAG actions_dag{blockToNameAndTypeList(header)}; - const DB::ActionsDAG::Node * filter_node = parseExpression(actions_dag, rel.filter()); - actions_dag.addOrReplaceInOutputs(*filter_node); - assert(filter_node == &(actions_dag.findInOutputs(filter_node->result_name))); - source_step->addFilter(std::move(actions_dag), filter_node->result_name); - } - return source_step; + DB::ActionsDAG actions_dag{blockToNameAndTypeList(header)}; + const DB::ActionsDAG::Node * filter_node = parseExpression(actions_dag, rel.filter()); + actions_dag.addOrReplaceInOutputs(*filter_node); + assert(filter_node == &(actions_dag.findInOutputs(filter_node->result_name))); + source_step->addFilter(std::move(actions_dag), filter_node->result_name); } -}; - + return source_step; +} void registerReadRelParser(RelParserFactory & factory) { diff --git a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h new file mode 100644 index 000000000000..8f9c578fba5e --- /dev/null +++ b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace local_engine +{ +class ReadRelParser : public RelParser +{ +public: + explicit ReadRelParser(SerializedPlanParser * plan_parser_) : RelParser(plan_parser_) { } + ~ReadRelParser() override = default; + + DB::QueryPlanPtr + parse(std::vector &, const substrait::Rel & rel, std::list & rel_stack_) override + { + DB::QueryPlanPtr query_plan; + return parse(std::move(query_plan), rel, rel_stack_); + } + + DB::QueryPlanPtr parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list &) override; + // This is source node, there is no input + std::optional getSingleInput(const substrait::Rel & rel) override { return {}; } + + bool isReadRelFromJava(const substrait::ReadRel & rel); + bool isReadFromMergeTree(const substrait::ReadRel & rel); + + void setInputIter(jobject input_iter_, bool is_materialze) + { + input_iter = input_iter_; + is_input_iter_materialize = is_materialze; + } + + void setSplitInfo(String split_info_) { split_info = split_info_; } + +private: + jobject input_iter; + bool is_input_iter_materialize; + String split_info; + DB::QueryPlanStepPtr parseReadRelWithJavaIter(const substrait::ReadRel & rel); + QueryPlanStepPtr parseReadRelWithLocalFile(const substrait::ReadRel & rel); +}; +} diff --git a/cpp-ch/local-engine/Parser/RelParsers/RelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/RelParser.cpp index d86ac5740d39..ed48a2242c0e 100644 --- a/cpp-ch/local-engine/Parser/RelParsers/RelParser.cpp +++ b/cpp-ch/local-engine/Parser/RelParsers/RelParser.cpp @@ -22,9 +22,10 @@ #include #include #include +#include #include #include - +#include namespace DB { @@ -37,6 +38,14 @@ extern const int LOGICAL_ERROR; namespace local_engine { + +std::vector RelParser::getInputs(const substrait::Rel & rel) +{ + auto input = getSingleInput(rel); + if (!input) + return {}; + return {*input}; +} AggregateFunctionPtr RelParser::getAggregateFunction( const String & name, DB::DataTypes arg_types, DB::AggregateFunctionProperties & properties, const DB::Array & parameters) { @@ -80,18 +89,12 @@ std::optional RelParser::parseFunctionName(UInt32 function_ref, const su } return plan_parser->getFunctionName(*sigature_name, function); } -DB::QueryPlanPtr RelParser::parseOp(const substrait::Rel & rel, std::list & rel_stack) + +DB::QueryPlanPtr +RelParser::parse(std::vector & input_plans_, const substrait::Rel & rel, std::list & rel_stack_) { - SerializedPlanParser & planParser = *getPlanParser(); - rel_stack.push_back(&rel); - DB::QueryPlanPtr query_plan; - auto input = getSingleInput(rel); - if (input) - { - query_plan = planParser.parseOp(**input, rel_stack); - } - rel_stack.pop_back(); - return parse(std::move(query_plan), rel, rel_stack); + assert(input_plans_.size() == 1); + return parse(std::move(input_plans_[0]), rel, rel_stack_); } std::map diff --git a/cpp-ch/local-engine/Parser/RelParsers/RelParser.h b/cpp-ch/local-engine/Parser/RelParsers/RelParser.h index f60f68b7248e..2e38c30c69d1 100644 --- a/cpp-ch/local-engine/Parser/RelParsers/RelParser.h +++ b/cpp-ch/local-engine/Parser/RelParsers/RelParser.h @@ -39,7 +39,9 @@ class RelParser virtual DB::QueryPlanPtr parse(DB::QueryPlanPtr current_plan_, const substrait::Rel & rel, std::list & rel_stack_) = 0; - virtual DB::QueryPlanPtr parseOp(const substrait::Rel & rel, std::list & rel_stack); + virtual DB::QueryPlanPtr + parse(std::vector & input_plans_, const substrait::Rel & rel, std::list & rel_stack_); + virtual std::vector getInputs(const substrait::Rel & rel); virtual std::optional getSingleInput(const substrait::Rel & rel) = 0; const std::vector & getSteps() const { return steps; } diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp index 8b07aa85788d..aa4bf5aac404 100644 --- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp +++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp @@ -54,6 +54,7 @@ #include #include #include +#include #include #include #include @@ -84,6 +85,7 @@ #include #include #include +#include "RelParsers/RelParser.h" namespace DB { @@ -374,13 +376,48 @@ std::unique_ptr SerializedPlanParser::createExecutor(const substr QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list & rel_stack) { - QueryPlanPtr query_plan; - std::vector steps; + DB::QueryPlanPtr query_plan; + auto rel_parser = RelParserFactory::instance().getBuilder(rel.rel_type_case())(this); - auto op_parser = RelParserFactory::instance().getBuilder(rel.rel_type_case())(this); - query_plan = op_parser->parseOp(rel, rel_stack); - auto parser_steps = op_parser->getSteps(); - steps.insert(steps.end(), parser_steps.begin(), parser_steps.end()); + auto all_input_rels = rel_parser->getInputs(rel); + std::vector input_query_plans; + rel_stack.push_back(&rel); + for (const auto * input_rel : all_input_rels) + { + auto input_query_plan = parseOp(*input_rel, rel_stack); + input_query_plans.push_back(std::move(input_query_plan)); + } + rel_stack.pop_back(); + + // source node is special + if (rel.rel_type_case() == substrait::Rel::RelTypeCase::kRead) + { + assert(all_input_rels.empty()); + auto read_rel_parser = std::dynamic_pointer_cast(rel_parser); + const auto & read = rel.read(); + if (read.has_local_files()) + { + if (read_rel_parser->isReadRelFromJava(read)) + { + auto iter = read.local_files().items().at(0).uri_file(); + auto pos = iter.find(':'); + auto iter_index = std::stoi(iter.substr(pos + 1, iter.size())); + auto [input_iter, materalize_input] = getInputIter(static_cast(iter_index)); + read_rel_parser->setInputIter(input_iter, materalize_input); + } + } + else if (read_rel_parser->isReadFromMergeTree(read)) + { + if (!read.has_extension_table()) + { + read_rel_parser->setSplitInfo(nextSplitInfo()); + } + } + } + + query_plan = rel_parser->parse(input_query_plans, rel, rel_stack); + + std::vector steps = rel_parser->getSteps(); if (!context->getSettingsRef().query_plan_enable_optimizations) {