Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
lgbo-ustc committed Sep 11, 2024
1 parent 8fb194e commit 8da6707
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 164 deletions.
18 changes: 5 additions & 13 deletions cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,12 @@ std::optional<const substrait::Rel *> CrossRelParser::getSingleInput(const subst
throw Exception(ErrorCodes::LOGICAL_ERROR, "join node has 2 inputs, can't call getSingleInput().");
}

DB::QueryPlanPtr CrossRelParser::parseOp(const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack)
DB::QueryPlanPtr CrossRelParser::parse(
std::vector<DB::QueryPlanPtr> & input_plans_, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_)
{
const auto & join = rel.cross();
if (!join.has_left() || !join.has_right())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "left table or right table is missing.");
}

rel_stack.push_back(&rel);
auto left_plan = getPlanParser()->parseOp(join.left(), rel_stack);
auto right_plan = getPlanParser()->parseOp(join.right(), rel_stack);
rel_stack.pop_back();

return parseJoin(join, std::move(left_plan), std::move(right_plan));
assert(input_plans_.size() == 2);
const auto & join = rel.join();
return parseJoin(join, std::move(input_plans_[0]), std::move(input_plans_[1]));
}

void CrossRelParser::renamePlanColumns(DB::QueryPlan & left, DB::QueryPlan & right, const StorageJoinFromReadBuffer & storage_join)
Expand Down
1 change: 0 additions & 1 deletion cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class CrossRelParser : public RelParser
DB::QueryPlanPtr
parse(DB::QueryPlanPtr query_plan, const substrait::Rel & sort_rel, std::list<const substrait::Rel *> & rel_stack_) override;

DB::QueryPlanPtr parseOp(const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack) override;

std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel & rel) override;

Expand Down
24 changes: 13 additions & 11 deletions cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,25 +80,27 @@ JoinRelParser::parse(DB::QueryPlanPtr /*query_plan*/, const substrait::Rel & /*r
throw Exception(ErrorCodes::LOGICAL_ERROR, "join node has 2 inputs, can't call parse().");
}

std::optional<const substrait::Rel *> JoinRelParser::getSingleInput(const substrait::Rel & /*rel*/)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "join node has 2 inputs, can't call getSingleInput().");
}

DB::QueryPlanPtr JoinRelParser::parseOp(const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack)
std::vector<const substrait::Rel *> JoinRelParser::getInputs(const substrait::Rel & rel)
{
const auto & join = rel.join();
if (!join.has_left() || !join.has_right())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "left table or right table is missing.");
}

rel_stack.push_back(&rel);
auto left_plan = getPlanParser()->parseOp(join.left(), rel_stack);
auto right_plan = getPlanParser()->parseOp(join.right(), rel_stack);
rel_stack.pop_back();
return {&join.left(), &join.right()};
}
std::optional<const substrait::Rel *> JoinRelParser::getSingleInput(const substrait::Rel & /*rel*/)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "join node has 2 inputs, can't call getSingleInput().");
}

return parseJoin(join, std::move(left_plan), std::move(right_plan));
DB::QueryPlanPtr JoinRelParser::parse(
std::vector<DB::QueryPlanPtr> & input_plans_, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_)
{
assert(input_plans_.size() == 2);
const auto & join = rel.join();
return parseJoin(join, std::move(input_plans_[0]), std::move(input_plans_[1]));
}

std::unordered_set<DB::JoinTableSide> JoinRelParser::extractTableSidesFromExpression(
Expand Down
4 changes: 3 additions & 1 deletion cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ class JoinRelParser : public RelParser
DB::QueryPlanPtr
parse(DB::QueryPlanPtr query_plan, const substrait::Rel & sort_rel, std::list<const substrait::Rel *> & rel_stack_) override;

DB::QueryPlanPtr parseOp(const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack) override;
DB::QueryPlanPtr parse(
std::vector<DB::QueryPlanPtr> & input_plans_, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_) override;

std::vector<const substrait::Rel *> getInputs(const substrait::Rel & rel) override;
std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel & rel) override;

private:
Expand Down
204 changes: 85 additions & 119 deletions cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,8 @@
* limitations under the License.
*/

#include "ReadRelParser.h"
#include <memory>
#include <Core/Block.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h>
#include <Operator/BlocksBufferPoolTransform.h>
#include <Parser/RelParsers/MergeTreeRelParser.h>
#include <Parser/SerializedPlanParser.h>
#include <Parser/SubstraitParserUtils.h>
#include <Parser/TypeParser.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Storages/SourceFromJavaIter.h>
#include <Storages/SubstraitSource/SubstraitFileSource.h>
#include <Storages/SubstraitSource/SubstraitFileSourceStep.h>
#include <google/protobuf/wrappers.pb.h>
#include <Common/BlockTypeUtils.h>
#include <Common/JNIUtils.h>
#include "RelParser.h"

namespace DB::ErrorCodes
{
Expand All @@ -40,126 +25,107 @@ extern const int LOGICAL_ERROR;

namespace local_engine
{
class ReadRelParser : public RelParser
DB::QueryPlanPtr ReadRelParser::parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list<const substrait::Rel *> &)
{
public:
explicit ReadRelParser(SerializedPlanParser * plan_parser_) : RelParser(plan_parser_) { }
~ReadRelParser() override = default;

DB::QueryPlanPtr parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list<const substrait::Rel *> &) override
if (query_plan)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Source node's input plan should be null");
const auto & read = rel.read();
if (read.has_local_files() || (!read.has_extension_table() && !isReadFromMergeTree(read)))
{
if (query_plan)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Source node's input plan should be null");
const auto & read = rel.read();
if (read.has_local_files() || (!read.has_extension_table() && !isReadFromMergeTree(read)))
{
assert(read.has_base_schema());
DB::QueryPlanStepPtr read_step;
if (isReadRelFromJava(read))
read_step = parseReadRelWithJavaIter(read);
else
read_step = parseReadRelWithLocalFile(read);
query_plan = std::make_unique<DB::QueryPlan>();
steps.emplace_back(read_step.get());
query_plan->addStep(std::move(read_step));
assert(read.has_base_schema());
DB::QueryPlanStepPtr read_step;
if (isReadRelFromJava(read))
read_step = parseReadRelWithJavaIter(read);
else
read_step = parseReadRelWithLocalFile(read);
query_plan = std::make_unique<DB::QueryPlan>();
steps.emplace_back(read_step.get());
query_plan->addStep(std::move(read_step));

if (getContext()->getSettingsRef().max_threads > 1)
{
auto buffer_step = std::make_unique<BlocksBufferPoolStep>(query_plan->getCurrentDataStream());
steps.emplace_back(buffer_step.get());
query_plan->addStep(std::move(buffer_step));
}
if (getContext()->getSettingsRef().max_threads > 1)
{
auto buffer_step = std::make_unique<BlocksBufferPoolStep>(query_plan->getCurrentDataStream());
steps.emplace_back(buffer_step.get());
query_plan->addStep(std::move(buffer_step));
}
}
else
{
substrait::ReadRel::ExtensionTable extension_table;
if (read.has_extension_table())
extension_table = read.extension_table();
else
{
substrait::ReadRel::ExtensionTable extension_table;
if (read.has_extension_table())
extension_table = read.extension_table();
else
{
extension_table = BinaryToMessage<substrait::ReadRel::ExtensionTable>(getPlanParser()->nextSplitInfo());
logDebugMessage(extension_table, "extension_table");
}
MergeTreeRelParser mergeTreeParser(getPlanParser(), getContext());
query_plan = mergeTreeParser.parseReadRel(std::make_unique<DB::QueryPlan>(), read, extension_table);
steps = mergeTreeParser.getSteps();
extension_table = BinaryToMessage<substrait::ReadRel::ExtensionTable>(split_info);
logDebugMessage(extension_table, "extension_table");
}
return query_plan;
MergeTreeRelParser mergeTreeParser(getPlanParser(), getContext());
query_plan = mergeTreeParser.parseReadRel(std::make_unique<DB::QueryPlan>(), read, extension_table);
steps = mergeTreeParser.getSteps();
}
return query_plan;
}

// This is source node, there is no input
std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel & rel) override { return {}; }

private:
bool isReadRelFromJava(const substrait::ReadRel & rel)
{
return rel.has_local_files() && rel.local_files().items().size() == 1
&& rel.local_files().items().at(0).uri_file().starts_with("iterator");
}
bool ReadRelParser::isReadRelFromJava(const substrait::ReadRel & rel)
{
return rel.has_local_files() && rel.local_files().items().size() == 1
&& rel.local_files().items().at(0).uri_file().starts_with("iterator");
}

bool isReadFromMergeTree(const substrait::ReadRel & rel)
{
assert(rel.has_advanced_extension());
bool is_read_from_merge_tree;
google::protobuf::StringValue optimization;
optimization.ParseFromString(rel.advanced_extension().optimization().value());
ReadBufferFromString in(optimization.value());
assertString("isMergeTree=", in);
readBoolText(is_read_from_merge_tree, in);
assertChar('\n', in);
return is_read_from_merge_tree;
}
bool ReadRelParser::isReadFromMergeTree(const substrait::ReadRel & rel)
{
assert(rel.has_advanced_extension());
bool is_read_from_merge_tree;
google::protobuf::StringValue optimization;
optimization.ParseFromString(rel.advanced_extension().optimization().value());
ReadBufferFromString in(optimization.value());
assertString("isMergeTree=", in);
readBoolText(is_read_from_merge_tree, in);
assertChar('\n', in);
return is_read_from_merge_tree;
}

DB::QueryPlanStepPtr parseReadRelWithJavaIter(const substrait::ReadRel & rel)
{
assert(rel.has_local_files());
assert(rel.local_files().items().size() == 1);
auto iter = rel.local_files().items().at(0).uri_file();
auto pos = iter.find(':');
auto iter_index = std::stoi(iter.substr(pos + 1, iter.size()));
auto [input_iter, materialize_input] = getPlanParser()->getInputIter(static_cast<size_t>(iter_index));
DB::QueryPlanStepPtr ReadRelParser::parseReadRelWithJavaIter(const substrait::ReadRel & rel)
{
GET_JNIENV(env)
SCOPE_EXIT({CLEAN_JNIENV});
auto first_block = SourceFromJavaIter::peekBlock(env, input_iter);

GET_JNIENV(env)
SCOPE_EXIT({CLEAN_JNIENV});
auto first_block = SourceFromJavaIter::peekBlock(env, input_iter);
/// Try to decide header from the first block read from Java iterator. Thus AggregateFunction with parameters has more precise types.
auto header = first_block.has_value() ? first_block->cloneEmpty() : TypeParser::buildBlockFromNamedStruct(rel.base_schema());
auto source = std::make_shared<SourceFromJavaIter>(
getContext(), std::move(header), input_iter, is_input_iter_materialize, std::move(first_block));

/// Try to decide header from the first block read from Java iterator. Thus AggregateFunction with parameters has more precise types.
auto header = first_block.has_value() ? first_block->cloneEmpty() : TypeParser::buildBlockFromNamedStruct(rel.base_schema());
auto source
= std::make_shared<SourceFromJavaIter>(getContext(), std::move(header), input_iter, materialize_input, std::move(first_block));
QueryPlanStepPtr source_step = std::make_unique<ReadFromPreparedSource>(Pipe(source));
source_step->setStepDescription("Read From Java Iter");
return source_step;
}

QueryPlanStepPtr source_step = std::make_unique<ReadFromPreparedSource>(Pipe(source));
source_step->setStepDescription("Read From Java Iter");
return source_step;
QueryPlanStepPtr ReadRelParser::parseReadRelWithLocalFile(const substrait::ReadRel & rel)
{
auto header = TypeParser::buildBlockFromNamedStruct(rel.base_schema());
substrait::ReadRel::LocalFiles local_files;
if (rel.has_local_files())
local_files = rel.local_files();
else
{
local_files = BinaryToMessage<substrait::ReadRel::LocalFiles>(getPlanParser()->nextSplitInfo());
logDebugMessage(local_files, "local_files");
}

QueryPlanStepPtr parseReadRelWithLocalFile(const substrait::ReadRel & rel)
auto source = std::make_shared<SubstraitFileSource>(getContext(), header, local_files);
auto source_pipe = Pipe(source);
auto source_step = std::make_unique<SubstraitFileSourceStep>(getContext(), std::move(source_pipe), "substrait local files");
source_step->setStepDescription("read local files");
if (rel.has_filter())
{
auto header = TypeParser::buildBlockFromNamedStruct(rel.base_schema());
substrait::ReadRel::LocalFiles local_files;
if (rel.has_local_files())
local_files = rel.local_files();
else
{
local_files = BinaryToMessage<substrait::ReadRel::LocalFiles>(getPlanParser()->nextSplitInfo());
logDebugMessage(local_files, "local_files");
}
auto source = std::make_shared<SubstraitFileSource>(getContext(), header, local_files);
auto source_pipe = Pipe(source);
auto source_step = std::make_unique<SubstraitFileSourceStep>(getContext(), std::move(source_pipe), "substrait local files");
source_step->setStepDescription("read local files");
if (rel.has_filter())
{
DB::ActionsDAG actions_dag{blockToNameAndTypeList(header)};
const DB::ActionsDAG::Node * filter_node = parseExpression(actions_dag, rel.filter());
actions_dag.addOrReplaceInOutputs(*filter_node);
assert(filter_node == &(actions_dag.findInOutputs(filter_node->result_name)));
source_step->addFilter(std::move(actions_dag), filter_node->result_name);
}
return source_step;
DB::ActionsDAG actions_dag{blockToNameAndTypeList(header)};
const DB::ActionsDAG::Node * filter_node = parseExpression(actions_dag, rel.filter());
actions_dag.addOrReplaceInOutputs(*filter_node);
assert(filter_node == &(actions_dag.findInOutputs(filter_node->result_name)));
source_step->addFilter(std::move(actions_dag), filter_node->result_name);
}
};

return source_step;
}

void registerReadRelParser(RelParserFactory & factory)
{
Expand Down
27 changes: 15 additions & 12 deletions cpp-ch/local-engine/Parser/RelParsers/RelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/IDataType.h>
#include <Poco/Logger.h>
#include <Poco/StringTokenizer.h>
#include <Common/Exception.h>

#include <Common/logger_useful.h>

namespace DB
{
Expand All @@ -37,6 +38,14 @@ extern const int LOGICAL_ERROR;

namespace local_engine
{

std::vector<const substrait::Rel *> RelParser::getInputs(const substrait::Rel & rel)
{
auto input = getSingleInput(rel);
if (!input)
return {};
return {*input};
}
AggregateFunctionPtr RelParser::getAggregateFunction(
const String & name, DB::DataTypes arg_types, DB::AggregateFunctionProperties & properties, const DB::Array & parameters)
{
Expand Down Expand Up @@ -80,18 +89,12 @@ std::optional<String> RelParser::parseFunctionName(UInt32 function_ref, const su
}
return plan_parser->getFunctionName(*sigature_name, function);
}
DB::QueryPlanPtr RelParser::parseOp(const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack)

DB::QueryPlanPtr
RelParser::parse(std::vector<DB::QueryPlanPtr> & input_plans_, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_)
{
SerializedPlanParser & planParser = *getPlanParser();
rel_stack.push_back(&rel);
DB::QueryPlanPtr query_plan;
auto input = getSingleInput(rel);
if (input)
{
query_plan = planParser.parseOp(**input, rel_stack);
}
rel_stack.pop_back();
return parse(std::move(query_plan), rel, rel_stack);
assert(input_plans_.size() == 1);
return parse(std::move(input_plans_[0]), rel, rel_stack_);
}

std::map<std::string, std::string>
Expand Down
4 changes: 3 additions & 1 deletion cpp-ch/local-engine/Parser/RelParsers/RelParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ class RelParser
virtual DB::QueryPlanPtr
parse(DB::QueryPlanPtr current_plan_, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_)
= 0;
virtual DB::QueryPlanPtr parseOp(const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack);
virtual DB::QueryPlanPtr
parse(std::vector<DB::QueryPlanPtr> & input_plans_, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_);
virtual std::vector<const substrait::Rel *> getInputs(const substrait::Rel & rel);
virtual std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel & rel) = 0;
const std::vector<IQueryPlanStep *> & getSteps() const { return steps; }

Expand Down
Loading

0 comments on commit 8da6707

Please sign in to comment.