From 3513c4187156f2427addfd89bb1ad6703624f96a Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Wed, 17 Jul 2024 15:44:02 +0800 Subject: [PATCH] refactor the code of parsing join parameters --- cpp-ch/local-engine/Parser/CrossRelParser.cpp | 17 +-- cpp-ch/local-engine/Parser/JoinRelParser.cpp | 57 +-------- .../Parser/RelAdvanceParseUtil.cpp | 114 ++++++++++++++++++ .../local-engine/Parser/RelAdvanceParseUtil.h | 45 +++++++ 4 files changed, 168 insertions(+), 65 deletions(-) create mode 100644 cpp-ch/local-engine/Parser/RelAdvanceParseUtil.cpp create mode 100644 cpp-ch/local-engine/Parser/RelAdvanceParseUtil.h diff --git a/cpp-ch/local-engine/Parser/CrossRelParser.cpp b/cpp-ch/local-engine/Parser/CrossRelParser.cpp index ea898640146b4..7537e58dca6ee 100644 --- a/cpp-ch/local-engine/Parser/CrossRelParser.cpp +++ b/cpp-ch/local-engine/Parser/CrossRelParser.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -51,17 +52,6 @@ using namespace DB; namespace local_engine { -String parseCrossJoinOptimizationInfos(const substrait::CrossRel & join) -{ - google::protobuf::StringValue optimization; - optimization.ParseFromString(join.advanced_extension().optimization().value()); - String storage_join_key; - ReadBufferFromString in(optimization.value()); - assertString("JoinParameters:", in); - assertString("buildHashTableId=", in); - readString(storage_join_key, in); - return storage_join_key; -} std::shared_ptr createCrossTableJoin(substrait::CrossRel_JoinType join_type) { @@ -154,7 +144,10 @@ void CrossRelParser::renamePlanColumns(DB::QueryPlan & left, DB::QueryPlan & rig DB::QueryPlanPtr CrossRelParser::parseJoin(const substrait::CrossRel & join, DB::QueryPlanPtr left, DB::QueryPlanPtr right) { - auto storage_join_key = parseCrossJoinOptimizationInfos(join); + google::protobuf::StringValue optimization_info; + optimization_info.ParseFromString(join.advanced_extension().optimization().value()); + auto join_opt_info = JoinOptimizationInfo::parse(optimization_info.value()); + const auto & storage_join_key = join_opt_info.storage_join_key; auto storage_join = BroadCastJoinBuilder::getJoin(storage_join_key) ; renamePlanColumns(*left, *right, *storage_join); auto table_join = createCrossTableJoin(join.type()); diff --git a/cpp-ch/local-engine/Parser/JoinRelParser.cpp b/cpp-ch/local-engine/Parser/JoinRelParser.cpp index 03734a2a9f0de..4dceb5a370fdd 100644 --- a/cpp-ch/local-engine/Parser/JoinRelParser.cpp +++ b/cpp-ch/local-engine/Parser/JoinRelParser.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -46,60 +47,8 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } } - -struct JoinOptimizationInfo -{ - bool is_broadcast = false; - bool is_smj = false; - bool is_null_aware_anti_join = false; - bool is_existence_join = false; - std::string storage_join_key; -}; - using namespace DB; -JoinOptimizationInfo parseJoinOptimizationInfo(const substrait::JoinRel & join) -{ - google::protobuf::StringValue optimization; - optimization.ParseFromString(join.advanced_extension().optimization().value()); - JoinOptimizationInfo info; - if (optimization.value().contains("isBHJ=")) - { - ReadBufferFromString in(optimization.value()); - assertString("JoinParameters:", in); - assertString("isBHJ=", in); - readBoolText(info.is_broadcast, in); - assertChar('\n', in); - if (info.is_broadcast) - { - assertString("isNullAwareAntiJoin=", in); - readBoolText(info.is_null_aware_anti_join, in); - assertChar('\n', in); - assertString("buildHashTableId=", in); - readString(info.storage_join_key, in); - assertChar('\n', in); - } - } - else - { - ReadBufferFromString in(optimization.value()); - assertString("JoinParameters:", in); - assertString("isSMJ=", in); - readBoolText(info.is_smj, in); - assertChar('\n', in); - if (info.is_smj) - { - assertString("isNullAwareAntiJoin=", in); - readBoolText(info.is_null_aware_anti_join, in); - assertChar('\n', in); - assertString("isExistenceJoin=", in); - readBoolText(info.is_existence_join, in); - assertChar('\n', in); - } - } - return info; -} - namespace local_engine { std::shared_ptr createDefaultTableJoin(substrait::JoinRel_JoinType join_type) @@ -261,7 +210,9 @@ void JoinRelParser::renamePlanColumns(DB::QueryPlan & left, DB::QueryPlan & righ DB::QueryPlanPtr JoinRelParser::parseJoin(const substrait::JoinRel & join, DB::QueryPlanPtr left, DB::QueryPlanPtr right) { - auto join_opt_info = parseJoinOptimizationInfo(join); + google::protobuf::StringValue optimization_info; + optimization_info.ParseFromString(join.advanced_extension().optimization().value()); + auto join_opt_info = JoinOptimizationInfo::parse(optimization_info.value()); auto storage_join = join_opt_info.is_broadcast ? BroadCastJoinBuilder::getJoin(join_opt_info.storage_join_key) : nullptr; if (storage_join) { diff --git a/cpp-ch/local-engine/Parser/RelAdvanceParseUtil.cpp b/cpp-ch/local-engine/Parser/RelAdvanceParseUtil.cpp new file mode 100644 index 0000000000000..c5bbc5fb732fe --- /dev/null +++ b/cpp-ch/local-engine/Parser/RelAdvanceParseUtil.cpp @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +namespace local_engine +{ + +template +void tryAssign(const std::unordered_map & kvs, const String & key, T & v); + +template<> +void tryAssign(const std::unordered_map & kvs, const String & key, String & v) +{ + auto it = kvs.find(key); + if (it != kvs.end()) + v = it->second; +} + +template<> +void tryAssign(const std::unordered_map & kvs, const String & key, bool & v) +{ + auto it = kvs.find(key); + if (it != kvs.end()) + { + if (it->second == "0" || it->second == "false" || it->second == "FALSE") + v = false; + else + v = true; + } +} + +template +void readStringUntilCharsInto(String & s, DB::ReadBuffer & buf) +{ + while (!buf.eof()) + { + char * next_pos = find_first_symbols(buf.position(), buf.buffer().end()); + + s.append(buf.position(), next_pos - buf.position()); + buf.position() = next_pos; + + if (buf.hasPendingData()) + return; + } +} + +/// In the format: Seg1:k1=v1\nk2=v2\n..\nSeg2:k1=v1\n... +std::unordered_map> convertToKVs(const String & advance) +{ + std::unordered_map> res; + std::unordered_map *kvs; + DB::ReadBufferFromString in(advance); + while(!in.eof()) + { + String key; + readStringUntilCharsInto<'=', '\n', ':'>(key, in); + if (key.empty()) + { + if (!in.eof()) + { + char c; + DB::readChar(c, in); + } + continue; + } + + char c; + DB::readChar(c, in); + if (c == ':') + { + res[key] = {}; + kvs = &res[key]; + continue; + } + + if (c != '=') + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid format, = is expected: {}", advance); + + String value; + readStringUntilCharsInto<'\n'>(value, in); + (*kvs)[key] = value; + } + return res; +} + +JoinOptimizationInfo JoinOptimizationInfo::parse(const String & advance) +{ + JoinOptimizationInfo info; + auto kkvs = convertToKVs(advance); + auto & kvs = kkvs["JoinParameters"]; + + tryAssign(kvs, "isisBHJ", info.is_broadcast); + tryAssign(kvs, "isSMJ", info.is_smj); + tryAssign(kvs, "buildHashTableId", info.storage_join_key); + tryAssign(kvs, "isNullAwareAntiJoin", info.is_null_aware_anti_join); + tryAssign(kvs, "isExistenceJoin", info.is_existence_join); + return info; +} +} + diff --git a/cpp-ch/local-engine/Parser/RelAdvanceParseUtil.h b/cpp-ch/local-engine/Parser/RelAdvanceParseUtil.h new file mode 100644 index 0000000000000..f00bb7381e462 --- /dev/null +++ b/cpp-ch/local-engine/Parser/RelAdvanceParseUtil.h @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include +#include +#include +#include + +namespace DB::ErrorCodes +{ + extern const int BAD_ARGUMENTS; +} + +namespace local_engine +{ + +std::unordered_map> convertToKVs(const String & advance); + + +struct JoinOptimizationInfo +{ + bool is_broadcast = false; + bool is_smj = false; + bool is_null_aware_anti_join = false; + bool is_existence_join = false; + String storage_join_key; + + static JoinOptimizationInfo parse(const String & advance); +}; +} +