From e3b48c166d2d4e7da3de1360c0e15901f52eefa2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E6=89=AC?= <654010905@qq.com> Date: Wed, 12 Oct 2022 04:08:44 -0500 Subject: [PATCH] Support loading setting from config file and improve logging. (#118) * support config load * fix building error * fix bugs * commit again * improve function params, std::string -> const std::string& * refinement configure loading * fixed code style Co-authored-by: lgbo-ustc --- .../Builder/SerializedPlanBuilder.cpp | 17 +- .../Builder/SerializedPlanBuilder.h | 32 +-- utils/local-engine/CMakeLists.txt | 3 + utils/local-engine/Common/JoinHelper.cpp | 2 +- utils/local-engine/Common/JoinHelper.h | 2 +- utils/local-engine/Common/Logger.cpp | 19 +- utils/local-engine/Common/Logger.h | 4 +- utils/local-engine/Common/MergeTreeTool.cpp | 2 +- utils/local-engine/Common/MergeTreeTool.h | 3 +- utils/local-engine/Common/StringUtils.cpp | 4 +- utils/local-engine/Common/StringUtils.h | 4 +- utils/local-engine/Common/common.cpp | 190 +++++++++++++++--- .../Parser/SerializedPlanParser.cpp | 7 +- .../Parser/SerializedPlanParser.h | 7 +- utils/local-engine/Shuffle/NativeSplitter.cpp | 2 +- utils/local-engine/Shuffle/NativeSplitter.h | 2 +- .../local-engine/Shuffle/ShuffleSplitter.cpp | 2 +- utils/local-engine/Shuffle/ShuffleSplitter.h | 2 +- .../SubstraitSource/ReadBufferBuilder.cpp | 4 + utils/local-engine/local_engine_jni.cpp | 12 +- 20 files changed, 244 insertions(+), 76 deletions(-) diff --git a/utils/local-engine/Builder/SerializedPlanBuilder.cpp b/utils/local-engine/Builder/SerializedPlanBuilder.cpp index cf0605e7f0c0..7b2f8c721f55 100644 --- a/utils/local-engine/Builder/SerializedPlanBuilder.cpp +++ b/utils/local-engine/Builder/SerializedPlanBuilder.cpp @@ -85,8 +85,7 @@ SchemaPtr SerializedSchemaBuilder::build() } return std::move(this->schema); } - -SerializedSchemaBuilder & SerializedSchemaBuilder::column(std::string name, std::string type, bool nullable) +SerializedSchemaBuilder & SerializedSchemaBuilder::column(const std::string & name, const std::string & type, bool nullable) { this->type_map.emplace(name, type); this->nullability_map.emplace(name, nullable); @@ -95,7 +94,7 @@ SerializedSchemaBuilder & SerializedSchemaBuilder::column(std::string name, std: SerializedSchemaBuilder::SerializedSchemaBuilder() : schema(new substrait::NamedStruct()) { } -SerializedPlanBuilder & SerializedPlanBuilder::registerFunction(int id, std::string name) +SerializedPlanBuilder & SerializedPlanBuilder::registerFunction(int id, const std::string & name) { auto * extension = this->plan->mutable_extensions()->Add(); auto * function_mapping = extension->mutable_extension_function(); @@ -139,7 +138,7 @@ SerializedPlanBuilder & SerializedPlanBuilder::filter(substrait::Expression * co return *this; } -SerializedPlanBuilder & SerializedPlanBuilder::read(std::string path, SchemaPtr schema) +SerializedPlanBuilder & SerializedPlanBuilder::read(const std::string & path, SchemaPtr schema) { substrait::Rel * rel = new substrait::Rel(); auto * read = rel->mutable_read(); @@ -150,7 +149,13 @@ SerializedPlanBuilder & SerializedPlanBuilder::read(std::string path, SchemaPtr return *this; } -SerializedPlanBuilder& SerializedPlanBuilder::readMergeTree(std::string database, std::string table, std::string relative_path,int min_block, int max_block, SchemaPtr schema) +SerializedPlanBuilder & SerializedPlanBuilder::readMergeTree( + const std::string & database, + const std::string & table, + const std::string & relative_path, + int min_block, + int max_block, + SchemaPtr schema) { substrait::Rel * rel = new substrait::Rel(); auto * read = rel->mutable_read(); @@ -234,7 +239,7 @@ substrait::Expression * literal(int32_t value) return rel; } -substrait::Expression * literal(std::string value) +substrait::Expression * literal(const std::string & value) { substrait::Expression * rel = new substrait::Expression(); auto * literal = rel->mutable_literal(); diff --git a/utils/local-engine/Builder/SerializedPlanBuilder.h b/utils/local-engine/Builder/SerializedPlanBuilder.h index 75501ed5a43a..66345c55260a 100644 --- a/utils/local-engine/Builder/SerializedPlanBuilder.h +++ b/utils/local-engine/Builder/SerializedPlanBuilder.h @@ -39,14 +39,18 @@ class SerializedPlanBuilder .registerFunction(EQUAL_TO, "equal"); return *this; } - - SerializedPlanBuilder & registerFunction(int id, std::string name); - SerializedPlanBuilder & filter(substrait::Expression * condition); - SerializedPlanBuilder & project(std::vector projections); - SerializedPlanBuilder & aggregate(std::vector keys, std::vector aggregates); - SerializedPlanBuilder & read(std::string path, SchemaPtr schema); - SerializedPlanBuilder & - readMergeTree(std::string database, std::string table, std::string relative_path, int min_block, int max_block, SchemaPtr schema); + SerializedPlanBuilder& registerFunction(int id, const std::string & name); + SerializedPlanBuilder& filter(substrait::Expression* condition); + SerializedPlanBuilder& project(std::vector projections); + SerializedPlanBuilder& aggregate(std::vector keys, std::vector aggregates); + SerializedPlanBuilder& read(const std::string & path, SchemaPtr schema); + SerializedPlanBuilder & readMergeTree( + const std::string & database, + const std::string & table, + const std::string & relative_path, + int min_block, + int max_block, + SchemaPtr schema); std::unique_ptr build(); private: @@ -68,8 +72,7 @@ class SerializedSchemaBuilder public: SerializedSchemaBuilder(); SchemaPtr build(); - SerializedSchemaBuilder & column(std::string name, std::string type, bool nullable = false); - + SerializedSchemaBuilder& column(const std::string & name, const std::string & type, bool nullable = false); private: std::map type_map; std::map nullability_map; @@ -83,11 +86,10 @@ using MeasureList = std::vector; substrait::Expression * scalarFunction(int32_t id, ExpressionList args); substrait::AggregateRel_Measure * measureFunction(int32_t id, ExpressionList args); -substrait::Expression * literal(double_t value); -substrait::Expression * literal(int32_t value); -substrait::Expression * literal(std::string value); -substrait::Expression * literalDate(int32_t value); -substrait::Expression * literalTimestamp(int64_t value); +substrait::Expression* literal(double_t value); +substrait::Expression* literal(int32_t value); +substrait::Expression* literal(const std::string & value); +substrait::Expression* literalDate(int32_t value); substrait::Expression * selection(int32_t field_id); diff --git a/utils/local-engine/CMakeLists.txt b/utils/local-engine/CMakeLists.txt index f4bbe99c835f..766610a49261 100644 --- a/utils/local-engine/CMakeLists.txt +++ b/utils/local-engine/CMakeLists.txt @@ -122,6 +122,9 @@ target_compile_options(clickhouse_functions PRIVATE -fPIC) target_compile_options(clickhouse_common_access PRIVATE -fPIC) target_compile_options(clickhouse_storages_system PRIVATE -fPIC) target_compile_options(clickhouse_table_functions PRIVATE -fPIC) +target_compile_options(substrait PRIVATE -fPIC) +target_compile_options(loggers PRIVATE -fPIC) + if (ENABLE_EMBEDDED_COMPILER) target_compile_options(LLVMDemangle PRIVATE -fPIC) target_compile_options(LLVMSupport PRIVATE -fPIC) diff --git a/utils/local-engine/Common/JoinHelper.cpp b/utils/local-engine/Common/JoinHelper.cpp index e886b9bdc1ef..cecdcf4a3128 100644 --- a/utils/local-engine/Common/JoinHelper.cpp +++ b/utils/local-engine/Common/JoinHelper.cpp @@ -7,7 +7,7 @@ using namespace DB; namespace local_engine { -JoinOptimizationInfo parseJoinOptimizationInfo(std::string optimization) +JoinOptimizationInfo parseJoinOptimizationInfo(const std::string & optimization) { JoinOptimizationInfo info; ReadBufferFromString in(optimization); diff --git a/utils/local-engine/Common/JoinHelper.h b/utils/local-engine/Common/JoinHelper.h index 53520e632247..ec6a4f778a2c 100644 --- a/utils/local-engine/Common/JoinHelper.h +++ b/utils/local-engine/Common/JoinHelper.h @@ -11,7 +11,7 @@ struct JoinOptimizationInfo }; -JoinOptimizationInfo parseJoinOptimizationInfo(std::string optimization); +JoinOptimizationInfo parseJoinOptimizationInfo(const std::string & optimization); } diff --git a/utils/local-engine/Common/Logger.cpp b/utils/local-engine/Common/Logger.cpp index cd00aed91f7c..7b3d7aaa40bc 100644 --- a/utils/local-engine/Common/Logger.cpp +++ b/utils/local-engine/Common/Logger.cpp @@ -1,19 +1,26 @@ #include "Logger.h" + +#include #include #include #include +#include using Poco::ConsoleChannel; using Poco::AutoPtr; using Poco::AsyncChannel; -void local_engine::Logger::initConsoleLogger() +void local_engine::Logger::initConsoleLogger(const std::string & level) { - AutoPtr pCons(new ConsoleChannel); - AutoPtr pAsync(new AsyncChannel(pCons)); - Poco::Logger::root().setChannel(pAsync); - Poco::Logger::root().setLevel("error"); - Poco::Logger::root().debug("init logger success"); + AutoPtr chan(new ConsoleChannel); + AutoPtr async_chann(new AsyncChannel(chan)); + Poco::Logger::root().setChannel(async_chann); + Poco::Logger::root().setLevel(level); } +void local_engine::Logger::initFileLogger(Poco::Util::AbstractConfiguration & config, const std::string & cmd_name) +{ + static Loggers loggers; + loggers.buildLoggers(config, Poco::Logger::root(), cmd_name); +} diff --git a/utils/local-engine/Common/Logger.h b/utils/local-engine/Common/Logger.h index 941b3890e1a6..051dc45efb22 100644 --- a/utils/local-engine/Common/Logger.h +++ b/utils/local-engine/Common/Logger.h @@ -1,13 +1,15 @@ #pragma once #include +#include namespace local_engine { class Logger { public: - static void initConsoleLogger(); + static void initConsoleLogger(const std::string & level); + static void initFileLogger(Poco::Util::AbstractConfiguration & config, const std::string & cmd_name); }; } diff --git a/utils/local-engine/Common/MergeTreeTool.cpp b/utils/local-engine/Common/MergeTreeTool.cpp index 60e77ecae9a2..0ff8e030e221 100644 --- a/utils/local-engine/Common/MergeTreeTool.cpp +++ b/utils/local-engine/Common/MergeTreeTool.cpp @@ -42,7 +42,7 @@ std::unique_ptr buildQueryInfo(NamesAndTypesList& names_and_typ } -MergeTreeTable parseMergeTreeTableString(std::string & info) +MergeTreeTable parseMergeTreeTableString(const std::string & info) { ReadBufferFromString in(info); assertString("MergeTree;", in); diff --git a/utils/local-engine/Common/MergeTreeTool.h b/utils/local-engine/Common/MergeTreeTool.h index fcd38e35f6a0..df82133c887c 100644 --- a/utils/local-engine/Common/MergeTreeTool.h +++ b/utils/local-engine/Common/MergeTreeTool.h @@ -34,5 +34,6 @@ struct MergeTreeTable std::string toString() const; }; -MergeTreeTable parseMergeTreeTableString(std::string & info); +MergeTreeTable parseMergeTreeTableString(const std::string & info); + } diff --git a/utils/local-engine/Common/StringUtils.cpp b/utils/local-engine/Common/StringUtils.cpp index d05cd8dea509..720bb32cbf9f 100644 --- a/utils/local-engine/Common/StringUtils.cpp +++ b/utils/local-engine/Common/StringUtils.cpp @@ -4,7 +4,7 @@ namespace local_engine { -PartitionValues StringUtils::parsePartitionTablePath(std::string file) +PartitionValues StringUtils::parsePartitionTablePath(const std::string & file) { PartitionValues result; Poco::StringTokenizer path(file, "/"); @@ -18,7 +18,7 @@ PartitionValues StringUtils::parsePartitionTablePath(std::string file) } return result; } -bool StringUtils::isNullPartitionValue(std::string value) +bool StringUtils::isNullPartitionValue(const std::string & value) { return value == "__HIVE_DEFAULT_PARTITION__"; } diff --git a/utils/local-engine/Common/StringUtils.h b/utils/local-engine/Common/StringUtils.h index 41d1da3765e1..40f33500513c 100644 --- a/utils/local-engine/Common/StringUtils.h +++ b/utils/local-engine/Common/StringUtils.h @@ -10,7 +10,7 @@ using PartitionValues = std::vector; class StringUtils { public: - static PartitionValues parsePartitionTablePath(std::string file); - static bool isNullPartitionValue(std::string value); + static PartitionValues parsePartitionTablePath(const std::string & file); + static bool isNullPartitionValue(const std::string & value); }; } diff --git a/utils/local-engine/Common/common.cpp b/utils/local-engine/Common/common.cpp index f088ba2f7f43..267ae8815bfe 100644 --- a/utils/local-engine/Common/common.cpp +++ b/utils/local-engine/Common/common.cpp @@ -1,15 +1,27 @@ #include -#include #include - +#include #include +#include #include #include -#include +#include #include +#include +#include #include +#include +namespace DB +{ +namespace ErrorCodes +{ +extern const int BAD_ARGUMENTS; +} +} using namespace DB; +namespace fs = std::filesystem; + #ifdef __cplusplus extern "C" { #endif @@ -19,49 +31,177 @@ void registerAllFunctions() registerFunctions(); registerAggregateFunctions(); } +constexpr auto CH_BACKEND_CONF_PREFIX = "spark.gluten.sql.columnar.backend.ch"; +constexpr auto CH_RUNTIME_CONF = "runtime_conf"; + +/// For using gluten, we recommend to pass clickhouse runtime configure by using --files in spark-submit. +/// And set the parameter CH_BACKEND_CONF_PREFIX.CH_RUNTIME_CONF.conf_file +/// You can also set a specified configuration with prefix CH_BACKEND_CONF_PREFIX.CH_RUNTIME_CONF, and this +/// will overwrite the configuration from CH_BACKEND_CONF_PREFIX.CH_RUNTIME_CONF.conf_file . +static std::map getBackendConf(const std::string & plan) +{ + std::map ch_backend_conf; + + /// parse backend configs from plan extensions + do + { + auto plan_ptr = std::make_unique(); + auto success = plan_ptr->ParseFromString(plan); + if (!success) + break; + + if (!plan_ptr->has_advanced_extensions() || !plan_ptr->advanced_extensions().has_enhancement()) + break; + const auto & enhancement = plan_ptr->advanced_extensions().enhancement(); + + if (!enhancement.Is()) + break; + + substrait::Expression expression; + if (!enhancement.UnpackTo(&expression) || !expression.has_literal() || !expression.literal().has_map()) + break; + + const auto & key_values = expression.literal().map().key_values(); + for (const auto & key_value : key_values) + { + if (!key_value.has_key() || !key_value.has_value()) + continue; + + const auto & key = key_value.key(); + const auto & value = key_value.value(); + if (!key.has_string() || !value.has_string()) + continue; + + if (!key.string().starts_with(CH_BACKEND_CONF_PREFIX)) + continue; + + ch_backend_conf[key.string()] = value.string(); + } + } while (false); + + std::string ch_runtime_conf_file = std::string(CH_BACKEND_CONF_PREFIX) + "." + std::string(CH_RUNTIME_CONF) + ".conf_file"; + if (!ch_backend_conf.count(ch_runtime_conf_file)) + { + /// Try to get config path from environment variable + const char * config_path = std::getenv("CLICKHOUSE_BACKEND_CONFIG"); + if (config_path) + { + ch_backend_conf[ch_runtime_conf_file] = config_path; + } + } + return ch_backend_conf; +} + +void initCHRuntimeConfig(const std::map & conf) +{} -void init() +void init(const std::string & plan) { static std::once_flag init_flag; std::call_once( init_flag, - []() + [&plan]() { + /// Load Config + std::map ch_backend_conf; + std::string ch_runtime_conf_prefix = std::string(CH_BACKEND_CONF_PREFIX) + "." + std::string(CH_RUNTIME_CONF); + std::string ch_runtime_conf_file = ch_runtime_conf_prefix + ".conf_file"; + if (!local_engine::SerializedPlanParser::config) + { + ch_backend_conf = getBackendConf(plan); + + /// If we have a configuration file, use it at first + if (ch_backend_conf.count(ch_runtime_conf_file)) + { + if (fs::exists(ch_runtime_conf_file) && fs::is_regular_file(ch_runtime_conf_file)) + { + DB::ConfigProcessor config_processor(ch_runtime_conf_file, false, true); + config_processor.setConfigPath(fs::path(ch_runtime_conf_file).parent_path()); + auto loaded_config = config_processor.loadConfig(false); + local_engine::SerializedPlanParser::config = loaded_config.configuration; + } + else + { + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "{} is not a valid configure file.", ch_runtime_conf_file); + } + } + else + { + local_engine::SerializedPlanParser::config = Poco::AutoPtr(new Poco::Util::MapConfiguration()); + } + + /// Update specified settings + for (const auto & kv : ch_backend_conf) + { + if (kv.first.starts_with(ch_runtime_conf_prefix) && kv.first != ch_runtime_conf_file) + { + /// Notice, you can set a conf by setString(), but get it by getInt() + local_engine::SerializedPlanParser::config->setString( + kv.first.substr(ch_runtime_conf_prefix.size() + 1), kv.second); + } + } + } + + /// Initialize Loggers + auto & config = local_engine::SerializedPlanParser::config; + auto level = config->getString("logger.level", "trace"); + if (config->has("logger.log")) + { + local_engine::Logger::initFileLogger(*config, "ClickHouseBackend"); + } + else + { + local_engine::Logger::initConsoleLogger(level); + } + LOG_INFO(&Poco::Logger::get("ClickHouseBackend"), "Init logger."); + + /// Initialize settings + const std::string prefix = "local_engine."; + auto settings = Settings(); + if (config->has(prefix + "settings")) + { + settings.loadSettingsFromConfig(prefix + "settings", *config); + } + settings.set("join_use_nulls", true); + LOG_INFO(&Poco::Logger::get("ClickHouseBackend"), "Init settings."); + + /// Initialize global context + if (!local_engine::SerializedPlanParser::global_context) + { + local_engine::SerializedPlanParser::shared_context = SharedContextHolder(Context::createShared()); + local_engine::SerializedPlanParser::global_context + = Context::createGlobal(local_engine::SerializedPlanParser::shared_context.get()); + local_engine::SerializedPlanParser::global_context->makeGlobalContext(); + local_engine::SerializedPlanParser::global_context->setConfig(config); + local_engine::SerializedPlanParser::global_context->setSettings(settings); + + auto path = config->getString("path", "/"); + local_engine::SerializedPlanParser::global_context->setPath(path); + LOG_INFO(&Poco::Logger::get("ClickHouseBackend"), "Init global context."); + } + registerAllFunctions(); - local_engine::Logger::initConsoleLogger(); + LOG_INFO(&Poco::Logger::get("ClickHouseBackend"), "Register all functions."); + #if USE_EMBEDDED_COMPILER /// 128 MB constexpr size_t compiled_expression_cache_size_default = 1024 * 1024 * 128; - constexpr size_t compiled_expression_cache_elements_size_default = 10000; CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_size_default, compiled_expression_cache_size_default); + LOG_INFO(&Poco::Logger::get("ClickHouseBackend"), "Init compiled expressions cache factory."); #endif - }); - - static std::mutex context_lock; - - { - std::lock_guard lock(context_lock); - if (!local_engine::SerializedPlanParser::global_context) - { - local_engine::SerializedPlanParser::shared_context = SharedContextHolder(Context::createShared()); - local_engine::SerializedPlanParser::global_context - = Context::createGlobal(local_engine::SerializedPlanParser::shared_context.get()); - local_engine::SerializedPlanParser::global_context->makeGlobalContext(); - local_engine::SerializedPlanParser::global_context->setSetting("join_use_nulls", true); - local_engine::SerializedPlanParser::global_context->setConfig(local_engine::SerializedPlanParser::config); - local_engine::SerializedPlanParser::global_context->setPath("/"); } - } + + ); } -char * createExecutor(std::string plan_string) +char * createExecutor(const std::string & plan_string) { auto context = Context::createCopy(local_engine::SerializedPlanParser::global_context); local_engine::SerializedPlanParser parser(context); auto query_plan = parser.parse(plan_string); local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(parser.query_context); executor->execute(std::move(query_plan)); - return reinterpret_cast(executor); + return reinterpret_cast(executor); } bool executorHasNext(char * executor_address) diff --git a/utils/local-engine/Parser/SerializedPlanParser.cpp b/utils/local-engine/Parser/SerializedPlanParser.cpp index 7d32c01d2a89..075a89ed9b1d 100644 --- a/utils/local-engine/Parser/SerializedPlanParser.cpp +++ b/utils/local-engine/Parser/SerializedPlanParser.cpp @@ -806,8 +806,7 @@ NamesAndTypesList SerializedPlanParser::blockToNameAndTypeList(const Block & hea return types; } -/// TODO spark中cast会被转化为Expression_Cast,而不是Expression_ScalarFunction -std::string SerializedPlanParser::getFunctionName(std::string function_signature, const substrait::Expression_ScalarFunction & function) +std::string SerializedPlanParser::getFunctionName(const std::string & function_signature, const substrait::Expression_ScalarFunction & function) { const auto & output_type = function.output_type(); auto args = function.arguments(); @@ -1365,7 +1364,7 @@ const ActionsDAG::Node * SerializedPlanParser::parseArgument(ActionsDAGPtr actio } } -QueryPlanPtr SerializedPlanParser::parse(std::string & plan) +QueryPlanPtr SerializedPlanParser::parse(const std::string & plan) { auto plan_ptr = std::make_unique(); plan_ptr->ParseFromString(plan); @@ -1382,7 +1381,7 @@ SerializedPlanParser::SerializedPlanParser(const ContextPtr & context_) : contex } ContextMutablePtr SerializedPlanParser::global_context = nullptr; -Context::ConfigurationPtr SerializedPlanParser::config = Poco::AutoPtr(new Poco::Util::MapConfiguration()); +Context::ConfigurationPtr SerializedPlanParser::config = nullptr; void SerializedPlanParser::collectJoinKeys( const substrait::Expression & condition, std::vector> & join_keys, int32_t right_key_start) diff --git a/utils/local-engine/Parser/SerializedPlanParser.h b/utils/local-engine/Parser/SerializedPlanParser.h index b7ddb02628ab..22a2f5428f39 100644 --- a/utils/local-engine/Parser/SerializedPlanParser.h +++ b/utils/local-engine/Parser/SerializedPlanParser.h @@ -83,7 +83,7 @@ class SerializedPlanParser public: explicit SerializedPlanParser(const ContextPtr & context); static void initFunctionEnv(); - DB::QueryPlanPtr parse(std::string & plan); + DB::QueryPlanPtr parse(const std::string& plan); DB::QueryPlanPtr parse(std::unique_ptr plan); DB::QueryPlanPtr parseReadRealWithLocalFile(const substrait::ReadRel & rel); @@ -107,8 +107,9 @@ class SerializedPlanParser void collectJoinKeys(const substrait::Expression & condition, std::vector> & join_keys, int32_t right_key_start); DB::QueryPlanPtr parseJoin(substrait::JoinRel join, DB::QueryPlanPtr left, DB::QueryPlanPtr right); + static void reorderJoinOutput(DB::QueryPlan & plan, DB::Names cols); - static std::string getFunctionName(std::string function_sig, const substrait::Expression_ScalarFunction & function); + static std::string getFunctionName(const std::string & function_sig, const substrait::Expression_ScalarFunction & function); DB::ActionsDAGPtr parseFunction( const DataStream & input, const substrait::Expression & rel, @@ -129,7 +130,7 @@ class SerializedPlanParser // remove nullable after isNotNull void removeNullable(std::vector require_columns, ActionsDAGPtr actionsDag); void wrapNullable(std::vector columns, ActionsDAGPtr actionsDag); - std::string getUniqueName(std::string name) { return name + "_" + std::to_string(name_no++); } + std::string getUniqueName(const std::string & name) { return name + "_" + std::to_string(name_no++); } static Aggregator::Params getAggregateParam(const Block & header, const ColumnNumbers & keys, const AggregateDescriptions & aggregates) { diff --git a/utils/local-engine/Shuffle/NativeSplitter.cpp b/utils/local-engine/Shuffle/NativeSplitter.cpp index cb912af8093f..ea3e94a82582 100644 --- a/utils/local-engine/Shuffle/NativeSplitter.cpp +++ b/utils/local-engine/Shuffle/NativeSplitter.cpp @@ -140,7 +140,7 @@ int64_t NativeSplitter::inputNext() } return result; } -std::unique_ptr NativeSplitter::create(std::string short_name, Options options_, jobject input) +std::unique_ptr NativeSplitter::create(const std::string & short_name, Options options_, jobject input) { if (short_name == "rr") { diff --git a/utils/local-engine/Shuffle/NativeSplitter.h b/utils/local-engine/Shuffle/NativeSplitter.h index 4f219ecb53f2..5cf8aac5fe2b 100644 --- a/utils/local-engine/Shuffle/NativeSplitter.h +++ b/utils/local-engine/Shuffle/NativeSplitter.h @@ -24,7 +24,7 @@ class NativeSplitter : BlockIterator static jclass iterator_class; static jmethodID iterator_has_next; static jmethodID iterator_next; - static std::unique_ptr create(std::string short_name, Options options, jobject input); + static std::unique_ptr create(const std::string & short_name, Options options, jobject input); NativeSplitter(Options options, jobject input); bool hasNext(); diff --git a/utils/local-engine/Shuffle/ShuffleSplitter.cpp b/utils/local-engine/Shuffle/ShuffleSplitter.cpp index 287a0f1a3a61..613c6a06fc3d 100644 --- a/utils/local-engine/Shuffle/ShuffleSplitter.cpp +++ b/utils/local-engine/Shuffle/ShuffleSplitter.cpp @@ -150,7 +150,7 @@ ShuffleSplitter::ShuffleSplitter(SplitOptions && options_) : options(options_) init(); } -ShuffleSplitter::Ptr ShuffleSplitter::create(std::string short_name, SplitOptions options_) +ShuffleSplitter::Ptr ShuffleSplitter::create(const std::string & short_name, SplitOptions options_) { if (short_name == "rr") { diff --git a/utils/local-engine/Shuffle/ShuffleSplitter.h b/utils/local-engine/Shuffle/ShuffleSplitter.h index d0b991e0d9fb..da82d7c4a245 100644 --- a/utils/local-engine/Shuffle/ShuffleSplitter.h +++ b/utils/local-engine/Shuffle/ShuffleSplitter.h @@ -55,7 +55,7 @@ class ShuffleSplitter public: static const std::vector compress_methods; using Ptr = std::unique_ptr; - static Ptr create(std::string short_name, SplitOptions options_); + static Ptr create(const std::string & short_name, SplitOptions options_); explicit ShuffleSplitter(SplitOptions && options); virtual ~ShuffleSplitter() { diff --git a/utils/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp b/utils/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp index 1a9d9468244b..c460369cdbf6 100644 --- a/utils/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp +++ b/utils/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp @@ -65,6 +65,10 @@ class HDFSFileReadBufferBuilder : public ReadBufferBuilder Poco::URI file_uri(file_info.uri_file()); std::unique_ptr read_buffer; /// Need to set "hdfs.libhdfs3_conf" in global settings + if (context->getConfigRef().getString("hdfs.libhdfs3_conf", "").empty()) + { + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Not found hdfs.libhdfs3_conf"); + } read_buffer = std::make_unique( "hdfs://" + file_uri.getHost(), file_uri.getPath(), context->getGlobalContext()->getConfigRef()); return read_buffer; diff --git a/utils/local-engine/local_engine_jni.cpp b/utils/local-engine/local_engine_jni.cpp index 461970fb0860..7ebd8dd1e256 100644 --- a/utils/local-engine/local_engine_jni.cpp +++ b/utils/local-engine/local_engine_jni.cpp @@ -80,8 +80,8 @@ extern "C" { #endif extern void registerAllFunctions(); -extern void init(); -extern char * createExecutor(std::string plan_string); +extern void init(const std::string &); +extern char * createExecutor(const std::string &); namespace dbms { @@ -164,10 +164,14 @@ void JNI_OnUnload(JavaVM * vm, void * /*reserved*/) local_engine::BroadCastJoinBuilder::clean(); } -void Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeInitNative(JNIEnv * env, jobject, jbyteArray) +void Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeInitNative(JNIEnv * env, jobject, jbyteArray plan) { LOCAL_ENGINE_JNI_METHOD_START - init(); + jsize plan_buf_size = env->GetArrayLength(plan); + jbyte * plan_buf_addr = env->GetByteArrayElements(plan, nullptr); + std::string plan_str; + plan_str.assign(reinterpret_cast(plan_buf_addr), plan_buf_size); + init(plan_str); LOCAL_ENGINE_JNI_METHOD_END(env, ) }