Skip to content

Commit

Permalink
Add SubstraitParserUtils.h
Browse files Browse the repository at this point in the history
  • Loading branch information
baibaichen committed Jul 5, 2024
1 parent aea4db0 commit 8621df8
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 120 deletions.
83 changes: 23 additions & 60 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
#include <Parser/FunctionParser.h>
#include <Parser/MergeTreeRelParser.h>
#include <Parser/RelParser.h>
#include <Parser/SubstraitParserUtils.h>
#include <Parser/TypeParser.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ExpressionListParsers.h>
Expand Down Expand Up @@ -292,7 +293,7 @@ QueryPlanStepPtr SerializedPlanParser::parseReadRealWithLocalFile(const substrai
if (rel.has_local_files())
local_files = rel.local_files();
else
local_files = parseLocalFiles(split_infos.at(nextSplitInfoIndex()));
local_files = BinaryToMessage<substrait::ReadRel::LocalFiles>(split_infos.at(nextSplitInfoIndex()));
auto source = std::make_shared<SubstraitFileSource>(context, header, local_files);
auto source_pipe = Pipe(source);
auto source_step = std::make_unique<SubstraitFileSourceStep>(context, std::move(source_pipe), "substrait local files");
Expand Down Expand Up @@ -469,6 +470,18 @@ QueryPlanPtr SerializedPlanParser::parse(const substrait::Plan & plan)
std::list<const substrait::Rel *> rel_stack;
auto query_plan = parseOp(root_rel.root().input(), rel_stack);
adjustOutput(query_plan, root_rel);

#ifndef NDEBUG
PlanUtil::checkOuputType(*query_plan);
#endif

auto * logger = &Poco::Logger::get("SerializedPlanParser");
if (logger->debug())
{
auto out = PlanUtil::explainPlan(*query_plan);
LOG_DEBUG(logger, "clickhouse plan:\n{}", out);
}

return query_plan;
}

Expand Down Expand Up @@ -522,7 +535,7 @@ QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list
if (read.has_extension_table())
extension_table = read.extension_table();
else
extension_table = parseExtensionTable(split_infos.at(nextSplitInfoIndex()));
extension_table = BinaryToMessage<substrait::ReadRel::ExtensionTable>(split_infos.at(nextSplitInfoIndex()));

MergeTreeRelParser mergeTreeParser(this, context);
query_plan = mergeTreeParser.parseReadRel(std::make_unique<QueryPlan>(), read, extension_table);
Expand Down Expand Up @@ -1683,34 +1696,6 @@ const ActionsDAG::Node * SerializedPlanParser::parseExpression(ActionsDAGPtr act
}
}

substrait::ReadRel::ExtensionTable SerializedPlanParser::parseExtensionTable(const std::string & split_info)
{
substrait::ReadRel::ExtensionTable extension_table;
google::protobuf::io::CodedInputStream coded_in(
reinterpret_cast<const uint8_t *>(split_info.data()), static_cast<int>(split_info.size()));
coded_in.SetRecursionLimit(100000);

auto ok = extension_table.ParseFromCodedStream(&coded_in);
if (!ok)
throw Exception(ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse substrait::ReadRel::ExtensionTable from string failed");
logDebugMessage(extension_table, "extension_table");
return extension_table;
}

substrait::ReadRel::LocalFiles SerializedPlanParser::parseLocalFiles(const std::string & split_info)
{
substrait::ReadRel::LocalFiles local_files;
google::protobuf::io::CodedInputStream coded_in(
reinterpret_cast<const uint8_t *>(split_info.data()), static_cast<int>(split_info.size()));
coded_in.SetRecursionLimit(100000);

auto ok = local_files.ParseFromCodedStream(&coded_in);
if (!ok)
throw Exception(ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse substrait::ReadRel::LocalFiles from string failed");
logDebugMessage(local_files, "local_files");
return local_files;
}

DB::QueryPipelineBuilderPtr SerializedPlanParser::buildQueryPipeline(DB::QueryPlan & query_plan)
{
const Settings & settings = context->getSettingsRef();
Expand All @@ -1719,7 +1704,7 @@ DB::QueryPipelineBuilderPtr SerializedPlanParser::buildQueryPipeline(DB::QueryPl
context,
"",
context->getClientInfo(),
priorities.insert(static_cast<int>(settings.priority)),
priorities.insert(settings.priority),
CurrentThread::getGroup(),
IAST::QueryKind::Select,
settings,
Expand All @@ -1733,7 +1718,13 @@ DB::QueryPipelineBuilderPtr SerializedPlanParser::buildQueryPipeline(DB::QueryPl
.process_list_element = query_status});
}

std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPlanPtr query_plan)
std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(const std::string_view plan)
{
const auto s_plan = BinaryToMessage<substrait::Plan>(plan);
return createExecutor(parse(s_plan), s_plan);
}

std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPlanPtr query_plan, const substrait::Plan & s_plan)
{
Stopwatch stopwatch;

Expand All @@ -1752,34 +1743,6 @@ std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPla
return std::make_unique<LocalExecutor>(std::move(query_plan), std::move(pipeline), dump_pipeline);
}

QueryPlanPtr SerializedPlanParser::parse(std::string_view plan)
{
substrait::Plan s_plan;
/// https://stackoverflow.com/questions/52028583/getting-error-parsing-protobuf-data
/// Parsing may fail when the number of recursive layers is large.
/// Here, set a limit large enough to avoid this problem.
/// Once this problem occurs, it is difficult to troubleshoot, because the pb of c++ will not provide any valid information
google::protobuf::io::CodedInputStream coded_in(reinterpret_cast<const uint8_t *>(plan.data()), static_cast<int>(plan.size()));
coded_in.SetRecursionLimit(100000);

if (!s_plan.ParseFromCodedStream(&coded_in))
throw Exception(ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse substrait::Plan from string failed");

auto res = parse(s_plan);

#ifndef NDEBUG
PlanUtil::checkOuputType(*res);
#endif

auto * logger = &Poco::Logger::get("SerializedPlanParser");
if (logger->debug())
{
auto out = PlanUtil::explainPlan(*res);
LOG_DEBUG(logger, "clickhouse plan:\n{}", out);
}
return res;
}

SerializedPlanParser::SerializedPlanParser(const ContextPtr & context_) : context(context_)
{
}
Expand Down
13 changes: 4 additions & 9 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,29 +257,24 @@ class SerializedPlanParser
friend class MergeTreeRelParser;
friend class ProjectRelParser;

std::unique_ptr<LocalExecutor> createExecutor(DB::QueryPlanPtr query_plan);

DB::QueryPlanPtr parse(std::string_view plan);
std::unique_ptr<LocalExecutor> createExecutor(DB::QueryPlanPtr query_plan, const substrait::Plan & s_plan);

public:
explicit SerializedPlanParser(const ContextPtr & context);

/// visible for UT
DB::QueryPlanPtr parse(const substrait::Plan & plan);
std::unique_ptr<LocalExecutor> createExecutor(const substrait::Plan & plan) { return createExecutor(parse(plan)); }
std::unique_ptr<LocalExecutor> createExecutor(const substrait::Plan & plan) { return createExecutor(parse(plan), plan); }
DB::QueryPipelineBuilderPtr buildQueryPipeline(DB::QueryPlan & query_plan);
///
std::unique_ptr<LocalExecutor> createExecutor(const std::string_view plan) { return createExecutor(parse(plan)); }
std::unique_ptr<LocalExecutor> createExecutor(const std::string_view plan);

DB::QueryPlanStepPtr parseReadRealWithLocalFile(const substrait::ReadRel & rel);
DB::QueryPlanStepPtr parseReadRealWithJavaIter(const substrait::ReadRel & rel);

static bool isReadRelFromJava(const substrait::ReadRel & rel);
static bool isReadFromMergeTree(const substrait::ReadRel & rel);

static substrait::ReadRel::LocalFiles parseLocalFiles(const std::string & split_info);
static substrait::ReadRel::ExtensionTable parseExtensionTable(const std::string & split_info);

void addInputIter(jobject iter, bool materialize_input)
{
input_iters.emplace_back(iter);
Expand Down Expand Up @@ -415,7 +410,7 @@ struct SparkBuffer
class LocalExecutor : public BlockIterator
{
public:
LocalExecutor(QueryPlanPtr query_plan, QueryPipeline && pipeline, bool dump_pipeline_);
LocalExecutor(QueryPlanPtr query_plan, QueryPipeline && pipeline, bool dump_pipeline_ = false);
~LocalExecutor();

SparkRowInfoPtr next();
Expand Down
69 changes: 69 additions & 0 deletions cpp-ch/local-engine/Parser/SubstraitParserUtils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <string>
#include <google/protobuf/util/json_util.h>
#include <Common/Exception.h>

namespace DB::ErrorCodes
{
extern const int CANNOT_PARSE_PROTOBUF_SCHEMA;
}

namespace local_engine
{

template <typename Message>
Message JsonStringToMessage(std::string_view json)
{
Message message;
auto status = google::protobuf::util::JsonStringToMessage(json, &message);
if (!status.ok())
{
std::string errmsg(status.message());
throw DB::Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse failed due to {}", errmsg);
}
return message;
}

template <typename Message>
std::string JsonStringToBinary(const std::string_view json)
{
Message message = JsonStringToMessage<Message>(json);
std::string binary;
message.SerializeToString(&binary);
return binary;
}

template <typename Message>
Message BinaryToMessage(const std::string_view binary)
{
Message message;
/// https://stackoverflow.com/questions/52028583/getting-error-parsing-protobuf-data
/// Parsing may fail when the number of recursive layers is large.
/// Here, set a limit large enough to avoid this problem.
/// Once this problem occurs, it is difficult to troubleshoot, because the pb of c++ will not provide any valid information
google::protobuf::io::CodedInputStream coded_in(reinterpret_cast<const uint8_t *>(binary.data()), static_cast<int>(binary.size()));
coded_in.SetRecursionLimit(100000);

if (!message.ParseFromCodedStream(&coded_in))
throw DB::Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse failed");
return message;
}

} // namespace local_engine
68 changes: 17 additions & 51 deletions cpp-ch/local-engine/local_engine_jni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <Parser/RelParser.h>
#include <Parser/SerializedPlanParser.h>
#include <Parser/SparkRowToCHColumn.h>
#include <Parser/SubstraitParserUtils.h>
#include <Shuffle/CachedShuffleWriter.h>
#include <Shuffle/NativeSplitter.h>
#include <Shuffle/NativeWriterInMemory.h>
Expand Down Expand Up @@ -254,8 +255,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_

const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan);
const std::string::size_type plan_size = plan_a.length();
local_engine::LocalExecutor * executor
= parser.createExecutor({reinterpret_cast<const char *>(plan_a.elems()), plan_size}).release();
local_engine::LocalExecutor * executor = parser.createExecutor({reinterpret_cast<const char *>(plan_a.elems()), plan_size}).release();
LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}", reinterpret_cast<uintptr_t>(executor));
executor->setMetric(parser.getMetric());
executor->setExtraPlanHolder(parser.extra_plan_holder);
Expand Down Expand Up @@ -285,7 +285,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_BatchIterator_nativeCHNext(JNI
JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeCancel(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
LOCAL_ENGINE_JNI_METHOD_START
auto *executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
auto * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
executor->cancel();
LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}", reinterpret_cast<uintptr_t>(executor));
LOCAL_ENGINE_JNI_METHOD_END(env, )
Expand All @@ -294,7 +294,7 @@ JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeCancel(JNIE
JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeClose(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
LOCAL_ENGINE_JNI_METHOD_START
auto *executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
auto * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
LOG_INFO(&Poco::Logger::get("jni"), "Finalize LocalExecutor {}", reinterpret_cast<intptr_t>(executor));
delete executor;
LOCAL_ENGINE_JNI_METHOD_END(env, )
Expand Down Expand Up @@ -898,23 +898,12 @@ JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW
const auto bucket_dir = jstring2string(env, bucket_dir_);

const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan_);

/// https://stackoverflow.com/questions/52028583/getting-error-parsing-protobuf-data
/// Parsing may fail when the number of recursive layers is large.
/// Here, set a limit large enough to avoid this problem.
/// Once this problem occurs, it is difficult to troubleshoot, because the pb of c++ will not provide any valid information
google::protobuf::io::CodedInputStream coded_in(plan_a.elems(), plan_a.length());
coded_in.SetRecursionLimit(100000);

substrait::Plan plan_ptr;
if (!plan_ptr.ParseFromCodedStream(&coded_in))
throw DB::Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse substrait::Plan from string failed");
auto plan_ptr = local_engine::BinaryToMessage<substrait::Plan>(
{reinterpret_cast<const char *>(plan_a.elems()), static_cast<size_t>(plan_a.length())});

const auto split_info_a = local_engine::getByteArrayElementsSafe(env, split_info_);
const std::string::size_type split_info_size = split_info_a.length();
std::string split_info_str{reinterpret_cast<const char *>(split_info_a.elems()), split_info_size};

substrait::ReadRel::ExtensionTable extension_table = local_engine::SerializedPlanParser::parseExtensionTable(split_info_str);
auto extension_table = local_engine::BinaryToMessage<substrait::ReadRel::ExtensionTable>(
{reinterpret_cast<const char *>(split_info_a.elems()), static_cast<size_t>(split_info_a.length())});

auto merge_tree_table = local_engine::MergeTreeRelParser::parseMergeTreeTable(extension_table);
auto uuid = uuid_str + "_" + task_id;
Expand All @@ -930,24 +919,12 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
{
LOCAL_ENGINE_JNI_METHOD_START
const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan_);
const std::string::size_type plan_size = plan_a.length();

substrait::Plan plan_ptr;
if (!plan_ptr.ParseFromString({reinterpret_cast<const char *>(plan_a.elems()), plan_size}))
throw Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse substrait::Plan from string failed");
auto plan_ptr = local_engine::BinaryToMessage<substrait::Plan>(
{reinterpret_cast<const char *>(plan_a.elems()), static_cast<size_t>(plan_a.length())});

const auto read_a = local_engine::getByteArrayElementsSafe(env, read_);
/// https://stackoverflow.com/questions/52028583/getting-error-parsing-protobuf-data
/// Parsing may fail when the number of recursive layers is large.
/// Here, set a limit large enough to avoid this problem.
/// Once this problem occurs, it is difficult to troubleshoot, because the pb of c++ will not provide any valid information
google::protobuf::io::CodedInputStream coded_in(read_a.elems(), read_a.length());
coded_in.SetRecursionLimit(100000);

substrait::Rel read_ptr;
if (!read_ptr.ParseFromCodedStream(&coded_in))
throw Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse substrait::Expression from string failed");

auto read_ptr = local_engine::BinaryToMessage<substrait::Rel>(
{reinterpret_cast<const char *>(read_a.elems()), static_cast<size_t>(read_a.length())});

local_engine::SerializedPlanParser parser(local_engine::SerializedPlanParser::global_context);
parser.parseExtensions(plan_ptr.extensions());
Expand Down Expand Up @@ -1023,23 +1000,13 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
const auto bucket_dir = jstring2string(env, bucket_dir_);

const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan_);

/// https://stackoverflow.com/questions/52028583/getting-error-parsing-protobuf-data
/// Parsing may fail when the number of recursive layers is large.
/// Here, set a limit large enough to avoid this problem.
/// Once this problem occurs, it is difficult to troubleshoot, because the pb of c++ will not provide any valid information
google::protobuf::io::CodedInputStream coded_in(plan_a.elems(), plan_a.length());
coded_in.SetRecursionLimit(100000);

substrait::Plan plan_ptr;
if (!plan_ptr.ParseFromCodedStream(&coded_in))
throw DB::Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse substrait::Plan from string failed");
auto plan_ptr = local_engine::BinaryToMessage<substrait::Plan>(
{reinterpret_cast<const char *>(plan_a.elems()), static_cast<size_t>(plan_a.length())});

const auto split_info_a = local_engine::getByteArrayElementsSafe(env, split_info_);
const std::string::size_type split_info_size = split_info_a.length();
std::string split_info_str{reinterpret_cast<const char *>(split_info_a.elems()), split_info_size};
auto extension_table = local_engine::BinaryToMessage<substrait::ReadRel::ExtensionTable>(
{reinterpret_cast<const char *>(split_info_a.elems()), static_cast<size_t>(split_info_a.length())});

substrait::ReadRel::ExtensionTable extension_table = local_engine::SerializedPlanParser::parseExtensionTable(split_info_str);
google::protobuf::StringValue table;
table.ParseFromString(extension_table.detail().value());
auto merge_tree_table = local_engine::parseMergeTreeTableString(table.value());
Expand Down Expand Up @@ -1255,8 +1222,7 @@ Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE
parser.addInputIter(iter, false);
const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan);
const std::string::size_type plan_size = plan_a.length();
local_engine::LocalExecutor * executor
= parser.createExecutor({reinterpret_cast<const char *>(plan_a.elems()), plan_size}).release();
local_engine::LocalExecutor * executor = parser.createExecutor({reinterpret_cast<const char *>(plan_a.elems()), plan_size}).release();
return reinterpret_cast<jlong>(executor);
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
Expand Down

0 comments on commit 8621df8

Please sign in to comment.