diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp index a928fe577490..4b34494b0adc 100644 --- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp +++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp @@ -55,6 +55,7 @@ #include #include #include +#include #include #include #include @@ -292,7 +293,7 @@ QueryPlanStepPtr SerializedPlanParser::parseReadRealWithLocalFile(const substrai if (rel.has_local_files()) local_files = rel.local_files(); else - local_files = parseLocalFiles(split_infos.at(nextSplitInfoIndex())); + local_files = BinaryToMessage(split_infos.at(nextSplitInfoIndex())); auto source = std::make_shared(context, header, local_files); auto source_pipe = Pipe(source); auto source_step = std::make_unique(context, std::move(source_pipe), "substrait local files"); @@ -469,6 +470,18 @@ QueryPlanPtr SerializedPlanParser::parse(const substrait::Plan & plan) std::list rel_stack; auto query_plan = parseOp(root_rel.root().input(), rel_stack); adjustOutput(query_plan, root_rel); + +#ifndef NDEBUG + PlanUtil::checkOuputType(*query_plan); +#endif + + auto * logger = &Poco::Logger::get("SerializedPlanParser"); + if (logger->debug()) + { + auto out = PlanUtil::explainPlan(*query_plan); + LOG_DEBUG(logger, "clickhouse plan:\n{}", out); + } + return query_plan; } @@ -522,7 +535,7 @@ QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list if (read.has_extension_table()) extension_table = read.extension_table(); else - extension_table = parseExtensionTable(split_infos.at(nextSplitInfoIndex())); + extension_table = BinaryToMessage(split_infos.at(nextSplitInfoIndex())); MergeTreeRelParser mergeTreeParser(this, context); query_plan = mergeTreeParser.parseReadRel(std::make_unique(), read, extension_table); @@ -1683,34 +1696,6 @@ const ActionsDAG::Node * SerializedPlanParser::parseExpression(ActionsDAGPtr act } } -substrait::ReadRel::ExtensionTable SerializedPlanParser::parseExtensionTable(const std::string & split_info) -{ - substrait::ReadRel::ExtensionTable extension_table; - google::protobuf::io::CodedInputStream coded_in( - reinterpret_cast(split_info.data()), static_cast(split_info.size())); - coded_in.SetRecursionLimit(100000); - - auto ok = extension_table.ParseFromCodedStream(&coded_in); - if (!ok) - throw Exception(ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse substrait::ReadRel::ExtensionTable from string failed"); - logDebugMessage(extension_table, "extension_table"); - return extension_table; -} - -substrait::ReadRel::LocalFiles SerializedPlanParser::parseLocalFiles(const std::string & split_info) -{ - substrait::ReadRel::LocalFiles local_files; - google::protobuf::io::CodedInputStream coded_in( - reinterpret_cast(split_info.data()), static_cast(split_info.size())); - coded_in.SetRecursionLimit(100000); - - auto ok = local_files.ParseFromCodedStream(&coded_in); - if (!ok) - throw Exception(ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse substrait::ReadRel::LocalFiles from string failed"); - logDebugMessage(local_files, "local_files"); - return local_files; -} - DB::QueryPipelineBuilderPtr SerializedPlanParser::buildQueryPipeline(DB::QueryPlan & query_plan) { const Settings & settings = context->getSettingsRef(); @@ -1719,7 +1704,7 @@ DB::QueryPipelineBuilderPtr SerializedPlanParser::buildQueryPipeline(DB::QueryPl context, "", context->getClientInfo(), - priorities.insert(static_cast(settings.priority)), + priorities.insert(settings.priority), CurrentThread::getGroup(), IAST::QueryKind::Select, settings, @@ -1733,7 +1718,13 @@ DB::QueryPipelineBuilderPtr SerializedPlanParser::buildQueryPipeline(DB::QueryPl .process_list_element = query_status}); } -std::unique_ptr SerializedPlanParser::createExecutor(DB::QueryPlanPtr query_plan) +std::unique_ptr SerializedPlanParser::createExecutor(const std::string_view plan) +{ + const auto s_plan = BinaryToMessage(plan); + return createExecutor(parse(s_plan), s_plan); +} + +std::unique_ptr SerializedPlanParser::createExecutor(DB::QueryPlanPtr query_plan, const substrait::Plan & s_plan) { Stopwatch stopwatch; @@ -1752,34 +1743,6 @@ std::unique_ptr SerializedPlanParser::createExecutor(DB::QueryPla return std::make_unique(std::move(query_plan), std::move(pipeline), dump_pipeline); } -QueryPlanPtr SerializedPlanParser::parse(std::string_view plan) -{ - substrait::Plan s_plan; - /// https://stackoverflow.com/questions/52028583/getting-error-parsing-protobuf-data - /// Parsing may fail when the number of recursive layers is large. - /// Here, set a limit large enough to avoid this problem. - /// Once this problem occurs, it is difficult to troubleshoot, because the pb of c++ will not provide any valid information - google::protobuf::io::CodedInputStream coded_in(reinterpret_cast(plan.data()), static_cast(plan.size())); - coded_in.SetRecursionLimit(100000); - - if (!s_plan.ParseFromCodedStream(&coded_in)) - throw Exception(ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse substrait::Plan from string failed"); - - auto res = parse(s_plan); - -#ifndef NDEBUG - PlanUtil::checkOuputType(*res); -#endif - - auto * logger = &Poco::Logger::get("SerializedPlanParser"); - if (logger->debug()) - { - auto out = PlanUtil::explainPlan(*res); - LOG_DEBUG(logger, "clickhouse plan:\n{}", out); - } - return res; -} - SerializedPlanParser::SerializedPlanParser(const ContextPtr & context_) : context(context_) { } diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h b/cpp-ch/local-engine/Parser/SerializedPlanParser.h index 17581366e6bd..6fd3223a3cc4 100644 --- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h +++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h @@ -257,19 +257,17 @@ class SerializedPlanParser friend class MergeTreeRelParser; friend class ProjectRelParser; - std::unique_ptr createExecutor(DB::QueryPlanPtr query_plan); - - DB::QueryPlanPtr parse(std::string_view plan); + std::unique_ptr createExecutor(DB::QueryPlanPtr query_plan, const substrait::Plan & s_plan); public: explicit SerializedPlanParser(const ContextPtr & context); /// visible for UT DB::QueryPlanPtr parse(const substrait::Plan & plan); - std::unique_ptr createExecutor(const substrait::Plan & plan) { return createExecutor(parse(plan)); } + std::unique_ptr createExecutor(const substrait::Plan & plan) { return createExecutor(parse(plan), plan); } DB::QueryPipelineBuilderPtr buildQueryPipeline(DB::QueryPlan & query_plan); /// - std::unique_ptr createExecutor(const std::string_view plan) { return createExecutor(parse(plan)); } + std::unique_ptr createExecutor(const std::string_view plan); DB::QueryPlanStepPtr parseReadRealWithLocalFile(const substrait::ReadRel & rel); DB::QueryPlanStepPtr parseReadRealWithJavaIter(const substrait::ReadRel & rel); @@ -277,9 +275,6 @@ class SerializedPlanParser static bool isReadRelFromJava(const substrait::ReadRel & rel); static bool isReadFromMergeTree(const substrait::ReadRel & rel); - static substrait::ReadRel::LocalFiles parseLocalFiles(const std::string & split_info); - static substrait::ReadRel::ExtensionTable parseExtensionTable(const std::string & split_info); - void addInputIter(jobject iter, bool materialize_input) { input_iters.emplace_back(iter); @@ -415,7 +410,7 @@ struct SparkBuffer class LocalExecutor : public BlockIterator { public: - LocalExecutor(QueryPlanPtr query_plan, QueryPipeline && pipeline, bool dump_pipeline_); + LocalExecutor(QueryPlanPtr query_plan, QueryPipeline && pipeline, bool dump_pipeline_ = false); ~LocalExecutor(); SparkRowInfoPtr next(); diff --git a/cpp-ch/local-engine/Parser/SubstraitParserUtils.h b/cpp-ch/local-engine/Parser/SubstraitParserUtils.h new file mode 100644 index 000000000000..f247a3bddc09 --- /dev/null +++ b/cpp-ch/local-engine/Parser/SubstraitParserUtils.h @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include + +namespace DB::ErrorCodes +{ +extern const int CANNOT_PARSE_PROTOBUF_SCHEMA; +} + +namespace local_engine +{ + +template +Message JsonStringToMessage(std::string_view json) +{ + Message message; + auto status = google::protobuf::util::JsonStringToMessage(json, &message); + if (!status.ok()) + { + std::string errmsg(status.message()); + throw DB::Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse failed due to {}", errmsg); + } + return message; +} + +template +std::string JsonStringToBinary(const std::string_view json) +{ + Message message = JsonStringToMessage(json); + std::string binary; + message.SerializeToString(&binary); + return binary; +} + +template +Message BinaryToMessage(const std::string_view binary) +{ + Message message; + /// https://stackoverflow.com/questions/52028583/getting-error-parsing-protobuf-data + /// Parsing may fail when the number of recursive layers is large. + /// Here, set a limit large enough to avoid this problem. + /// Once this problem occurs, it is difficult to troubleshoot, because the pb of c++ will not provide any valid information + google::protobuf::io::CodedInputStream coded_in(reinterpret_cast(binary.data()), static_cast(binary.size())); + coded_in.SetRecursionLimit(100000); + + if (!message.ParseFromCodedStream(&coded_in)) + throw DB::Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse failed"); + return message; +} + +} // namespace local_engine diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index 8b40c65a8b2e..abf95a4ae1fa 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -254,8 +255,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_ const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan); const std::string::size_type plan_size = plan_a.length(); - local_engine::LocalExecutor * executor - = parser.createExecutor({reinterpret_cast(plan_a.elems()), plan_size}).release(); + local_engine::LocalExecutor * executor = parser.createExecutor({reinterpret_cast(plan_a.elems()), plan_size}).release(); LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}", reinterpret_cast(executor)); executor->setMetric(parser.getMetric()); executor->setExtraPlanHolder(parser.extra_plan_holder); @@ -285,7 +285,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_BatchIterator_nativeCHNext(JNI JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeCancel(JNIEnv * env, jobject /*obj*/, jlong executor_address) { LOCAL_ENGINE_JNI_METHOD_START - auto *executor = reinterpret_cast(executor_address); + auto * executor = reinterpret_cast(executor_address); executor->cancel(); LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}", reinterpret_cast(executor)); LOCAL_ENGINE_JNI_METHOD_END(env, ) @@ -294,7 +294,7 @@ JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeCancel(JNIE JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeClose(JNIEnv * env, jobject /*obj*/, jlong executor_address) { LOCAL_ENGINE_JNI_METHOD_START - auto *executor = reinterpret_cast(executor_address); + auto * executor = reinterpret_cast(executor_address); LOG_INFO(&Poco::Logger::get("jni"), "Finalize LocalExecutor {}", reinterpret_cast(executor)); delete executor; LOCAL_ENGINE_JNI_METHOD_END(env, ) @@ -898,23 +898,12 @@ JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW const auto bucket_dir = jstring2string(env, bucket_dir_); const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan_); - - /// https://stackoverflow.com/questions/52028583/getting-error-parsing-protobuf-data - /// Parsing may fail when the number of recursive layers is large. - /// Here, set a limit large enough to avoid this problem. - /// Once this problem occurs, it is difficult to troubleshoot, because the pb of c++ will not provide any valid information - google::protobuf::io::CodedInputStream coded_in(plan_a.elems(), plan_a.length()); - coded_in.SetRecursionLimit(100000); - - substrait::Plan plan_ptr; - if (!plan_ptr.ParseFromCodedStream(&coded_in)) - throw DB::Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse substrait::Plan from string failed"); + auto plan_ptr = local_engine::BinaryToMessage( + {reinterpret_cast(plan_a.elems()), static_cast(plan_a.length())}); const auto split_info_a = local_engine::getByteArrayElementsSafe(env, split_info_); - const std::string::size_type split_info_size = split_info_a.length(); - std::string split_info_str{reinterpret_cast(split_info_a.elems()), split_info_size}; - - substrait::ReadRel::ExtensionTable extension_table = local_engine::SerializedPlanParser::parseExtensionTable(split_info_str); + auto extension_table = local_engine::BinaryToMessage( + {reinterpret_cast(split_info_a.elems()), static_cast(split_info_a.length())}); auto merge_tree_table = local_engine::MergeTreeRelParser::parseMergeTreeTable(extension_table); auto uuid = uuid_str + "_" + task_id; @@ -930,24 +919,12 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn { LOCAL_ENGINE_JNI_METHOD_START const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan_); - const std::string::size_type plan_size = plan_a.length(); - - substrait::Plan plan_ptr; - if (!plan_ptr.ParseFromString({reinterpret_cast(plan_a.elems()), plan_size})) - throw Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse substrait::Plan from string failed"); + auto plan_ptr = local_engine::BinaryToMessage( + {reinterpret_cast(plan_a.elems()), static_cast(plan_a.length())}); const auto read_a = local_engine::getByteArrayElementsSafe(env, read_); - /// https://stackoverflow.com/questions/52028583/getting-error-parsing-protobuf-data - /// Parsing may fail when the number of recursive layers is large. - /// Here, set a limit large enough to avoid this problem. - /// Once this problem occurs, it is difficult to troubleshoot, because the pb of c++ will not provide any valid information - google::protobuf::io::CodedInputStream coded_in(read_a.elems(), read_a.length()); - coded_in.SetRecursionLimit(100000); - - substrait::Rel read_ptr; - if (!read_ptr.ParseFromCodedStream(&coded_in)) - throw Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse substrait::Expression from string failed"); - + auto read_ptr = local_engine::BinaryToMessage( + {reinterpret_cast(read_a.elems()), static_cast(read_a.length())}); local_engine::SerializedPlanParser parser(local_engine::SerializedPlanParser::global_context); parser.parseExtensions(plan_ptr.extensions()); @@ -1023,23 +1000,13 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn const auto bucket_dir = jstring2string(env, bucket_dir_); const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan_); - - /// https://stackoverflow.com/questions/52028583/getting-error-parsing-protobuf-data - /// Parsing may fail when the number of recursive layers is large. - /// Here, set a limit large enough to avoid this problem. - /// Once this problem occurs, it is difficult to troubleshoot, because the pb of c++ will not provide any valid information - google::protobuf::io::CodedInputStream coded_in(plan_a.elems(), plan_a.length()); - coded_in.SetRecursionLimit(100000); - - substrait::Plan plan_ptr; - if (!plan_ptr.ParseFromCodedStream(&coded_in)) - throw DB::Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse substrait::Plan from string failed"); + auto plan_ptr = local_engine::BinaryToMessage( + {reinterpret_cast(plan_a.elems()), static_cast(plan_a.length())}); const auto split_info_a = local_engine::getByteArrayElementsSafe(env, split_info_); - const std::string::size_type split_info_size = split_info_a.length(); - std::string split_info_str{reinterpret_cast(split_info_a.elems()), split_info_size}; + auto extension_table = local_engine::BinaryToMessage( + {reinterpret_cast(split_info_a.elems()), static_cast(split_info_a.length())}); - substrait::ReadRel::ExtensionTable extension_table = local_engine::SerializedPlanParser::parseExtensionTable(split_info_str); google::protobuf::StringValue table; table.ParseFromString(extension_table.detail().value()); auto merge_tree_table = local_engine::parseMergeTreeTableString(table.value()); @@ -1255,8 +1222,7 @@ Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE parser.addInputIter(iter, false); const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan); const std::string::size_type plan_size = plan_a.length(); - local_engine::LocalExecutor * executor - = parser.createExecutor({reinterpret_cast(plan_a.elems()), plan_size}).release(); + local_engine::LocalExecutor * executor = parser.createExecutor({reinterpret_cast(plan_a.elems()), plan_size}).release(); return reinterpret_cast(executor); LOCAL_ENGINE_JNI_METHOD_END(env, -1) }