diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 7ca88cd3fae2..ac1bfc620b02 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -3197,9 +3197,5 @@ ReadSettings Context::getReadSettings() const return res; } -void Context::setBackgroundExecutorsInitialized(bool initialized) -{ - shared->is_background_executors_initialized = initialized; -} } diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index e6d3c14eb1b3..c3615db90682 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -892,9 +892,6 @@ class Context: public std::enable_shared_from_this /** Get settings for reading from filesystem. */ ReadSettings getReadSettings() const; - /** Used to disable global context initialized. */ - void setBackgroundExecutorsInitialized(bool initialized); - private: std::unique_lock getLock() const; diff --git a/utils/local-engine/Builder/BroadCastJoinBuilder.cpp b/utils/local-engine/Builder/BroadCastJoinBuilder.cpp index ca55b8625faa..82df86c21a76 100644 --- a/utils/local-engine/Builder/BroadCastJoinBuilder.cpp +++ b/utils/local-engine/Builder/BroadCastJoinBuilder.cpp @@ -127,5 +127,14 @@ void BroadCastJoinBuilder::buildJoinIfNotExist( ColumnsDescription columns_description(header.getNamesAndTypesList()); buildJoinIfNotExist(key, std::move(read_buffer), key_names, kind, strictness, columns_description); } +void BroadCastJoinBuilder::clean() +{ + storage_join_lock.clear(); + storage_join_map.clear(); + while (!storage_join_queue.empty()) + { + storage_join_queue.pop(); + } +} } diff --git a/utils/local-engine/Builder/BroadCastJoinBuilder.h b/utils/local-engine/Builder/BroadCastJoinBuilder.h index 6eb3a14d038d..4a7982d8b41a 100644 --- a/utils/local-engine/Builder/BroadCastJoinBuilder.h +++ b/utils/local-engine/Builder/BroadCastJoinBuilder.h @@ -24,6 +24,8 @@ class BroadCastJoinBuilder static std::shared_ptr getJoin(const std::string & key); + static void clean(); + private: static std::queue storage_join_queue; static std::unordered_map> storage_join_map; diff --git a/utils/local-engine/Common/JNIUtils.cpp b/utils/local-engine/Common/JNIUtils.cpp new file mode 100644 index 000000000000..d1538a5c4a02 --- /dev/null +++ b/utils/local-engine/Common/JNIUtils.cpp @@ -0,0 +1,31 @@ +#include "JNIUtils.h" + +namespace local_engine +{ + +JavaVM * JNIUtils::vm = nullptr; + +JNIEnv * JNIUtils::getENV(int * attach) +{ + if (vm == nullptr) return nullptr; + + *attach = 0; + JNIEnv *jni_env = nullptr; + + int status = vm->GetEnv(reinterpret_cast(&jni_env), JNI_VERSION_1_8); + + if (status == JNI_EDETACHED || jni_env == nullptr) { + status = vm->AttachCurrentThread(reinterpret_cast(&jni_env), nullptr); + if (status < 0) { + jni_env = nullptr; + } else { + *attach = 1; + } + } + return jni_env; +} +void JNIUtils::detachCurrentThread() +{ + vm->DetachCurrentThread(); +} +} diff --git a/utils/local-engine/Common/JNIUtils.h b/utils/local-engine/Common/JNIUtils.h new file mode 100644 index 000000000000..76237c86c8ac --- /dev/null +++ b/utils/local-engine/Common/JNIUtils.h @@ -0,0 +1,17 @@ +#pragma once +#include + +namespace local_engine +{ +class JNIUtils +{ +public: + static JavaVM * vm; + + static JNIEnv * getENV(int *attach); + + static void detachCurrentThread(); +}; +} + + diff --git a/utils/local-engine/Common/common.cpp b/utils/local-engine/Common/common.cpp index 910fb69f6518..c285bd30fa24 100644 --- a/utils/local-engine/Common/common.cpp +++ b/utils/local-engine/Common/common.cpp @@ -22,23 +22,35 @@ void registerAllFunctions() void init() { - registerAllFunctions(); - local_engine::SerializedPlanParser::shared_context = SharedContextHolder(Context::createShared()); - local_engine::SerializedPlanParser::global_context = Context::createGlobal(local_engine::SerializedPlanParser::shared_context.get()); - // disable global context initialized - local_engine::SerializedPlanParser::global_context->setBackgroundExecutorsInitialized(true); - local_engine::SerializedPlanParser::global_context->makeGlobalContext(); - local_engine::SerializedPlanParser::global_context->setSetting("join_use_nulls", true); - local_engine::SerializedPlanParser::global_context->setConfig(local_engine::SerializedPlanParser::config); - local_engine::SerializedPlanParser::global_context->setPath("/"); - local_engine::Logger::initConsoleLogger(); - + static std::once_flag init_flag; + std::call_once(init_flag, [](){ + registerAllFunctions(); + local_engine::Logger::initConsoleLogger(); #if USE_EMBEDDED_COMPILER - /// 128 MB - constexpr size_t compiled_expression_cache_size_default = 1024 * 1024 * 128; - constexpr size_t compiled_expression_cache_elements_size_default = 10000; - CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_size_default, compiled_expression_cache_size_default); + /// 128 MB + constexpr size_t compiled_expression_cache_size_default = 1024 * 1024 * 128; + constexpr size_t compiled_expression_cache_elements_size_default = 10000; + CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_size_default, compiled_expression_cache_size_default); #endif + }); + + static std::mutex context_lock; + + { + std::lock_guard lock(context_lock); + if (!local_engine::SerializedPlanParser::global_context) + { + local_engine::SerializedPlanParser::shared_context = SharedContextHolder(Context::createShared()); + local_engine::SerializedPlanParser::global_context + = Context::createGlobal(local_engine::SerializedPlanParser::shared_context.get()); + // disable global context initialized + // local_engine::SerializedPlanParser::global_context->setBackgroundExecutorsInitialized(true); + local_engine::SerializedPlanParser::global_context->makeGlobalContext(); + local_engine::SerializedPlanParser::global_context->setSetting("join_use_nulls", true); + local_engine::SerializedPlanParser::global_context->setConfig(local_engine::SerializedPlanParser::config); + local_engine::SerializedPlanParser::global_context->setPath("/"); + } + } } char * createExecutor(std::string plan_string) diff --git a/utils/local-engine/Parser/SerializedPlanParser.cpp b/utils/local-engine/Parser/SerializedPlanParser.cpp index 5709d86a57df..89e158f36609 100644 --- a/utils/local-engine/Parser/SerializedPlanParser.cpp +++ b/utils/local-engine/Parser/SerializedPlanParser.cpp @@ -233,7 +233,7 @@ QueryPlanPtr SerializedPlanParser::parseReadRealWithJavaIter(const substrait::Re auto pos = iter.find(':'); auto iter_index = std::stoi(iter.substr(pos + 1, iter.size())); auto plan = std::make_unique(); - auto source = std::make_shared(parseNameStruct(rel.base_schema()), input_iters[iter_index], vm); + auto source = std::make_shared(parseNameStruct(rel.base_schema()), input_iters[iter_index]); QueryPlanStepPtr source_step = std::make_unique(Pipe(source), context); plan->addStep(std::move(source_step)); return plan; diff --git a/utils/local-engine/Parser/SerializedPlanParser.h b/utils/local-engine/Parser/SerializedPlanParser.h index 4c76f570257c..1c1a2d6510b8 100644 --- a/utils/local-engine/Parser/SerializedPlanParser.h +++ b/utils/local-engine/Parser/SerializedPlanParser.h @@ -89,11 +89,6 @@ class SerializedPlanParser input_iters.emplace_back(iter); } - void setJavaVM(JavaVM * vm_) - { - vm = vm_; - } - static ContextMutablePtr global_context; static Context::ConfigurationPtr config; static SharedContextHolder shared_context; @@ -146,8 +141,6 @@ class SerializedPlanParser std::unordered_map function_mapping; std::vector input_iters; ContextPtr context; - JavaVM* vm; - // DB::QueryPlanPtr query_plan; diff --git a/utils/local-engine/Shuffle/NativeSplitter.cpp b/utils/local-engine/Shuffle/NativeSplitter.cpp index 8c815daca7cd..2b7313ba2b9c 100644 --- a/utils/local-engine/Shuffle/NativeSplitter.cpp +++ b/utils/local-engine/Shuffle/NativeSplitter.cpp @@ -1,6 +1,7 @@ #include "NativeSplitter.h" #include #include +#include using namespace DB; @@ -48,13 +49,10 @@ void NativeSplitter::split(DB::Block & block) } } -NativeSplitter::NativeSplitter(Options options_, jobject input_, JavaVM * vm_) : options(options_), vm(vm_) +NativeSplitter::NativeSplitter(Options options_, jobject input_) : options(options_) { - JNIEnv * env; - if (vm->GetEnv(reinterpret_cast(&env), JNI_VERSION_1_8) != JNI_OK) - { - throwError("get env error"); - } + int attached; + JNIEnv * env = JNIUtils::getENV(&attached); input = env->NewGlobalRef(input_); partition_ids.reserve(options.buffer_size); partition_buffer.reserve(options.partition_nums); @@ -62,15 +60,20 @@ NativeSplitter::NativeSplitter(Options options_, jobject input_, JavaVM * vm_) : { partition_buffer.emplace_back(std::make_shared()); } + if (attached) + { + JNIUtils::detachCurrentThread(); + } } NativeSplitter::~NativeSplitter() { - JNIEnv * env; - if (vm->GetEnv(reinterpret_cast(&env), JNI_VERSION_1_8) != JNI_OK) + int attached; + JNIEnv * env = JNIUtils::getENV(&attached); + env->DeleteGlobalRef(input); + if (attached) { - throwError("get env error"); + JNIUtils::detachCurrentThread(); } - env->DeleteGlobalRef(input); } bool NativeSplitter::hasNext() { @@ -114,38 +117,41 @@ int32_t NativeSplitter::nextPartitionId() bool NativeSplitter::inputHasNext() { - JNIEnv * env; - if (vm->GetEnv(reinterpret_cast(&env), JNI_VERSION_1_8) != JNI_OK) + int attached; + JNIEnv * env = JNIUtils::getENV(&attached); + bool next = env->CallBooleanMethod(input, iterator_has_next); + if (attached) { - throwError("get env error"); + JNIUtils::detachCurrentThread(); } - bool next = env->CallBooleanMethod(input, iterator_has_next); return next; } int64_t NativeSplitter::inputNext() { - JNIEnv * env; - if (vm->GetEnv(reinterpret_cast(&env), JNI_VERSION_1_8) != JNI_OK) + int attached; + JNIEnv * env = JNIUtils::getENV(&attached); + int64_t result = env->CallLongMethod(input, iterator_next); + if (attached) { - throwError("get env error"); + JNIUtils::detachCurrentThread(); } - return env->CallLongMethod(input, iterator_next); + return result; } -std::unique_ptr NativeSplitter::create(std::string short_name, Options options_, jobject input, JavaVM * vm) +std::unique_ptr NativeSplitter::create(std::string short_name, Options options_, jobject input) { if (short_name == "rr") { - return std::make_unique(options_, input, vm); + return std::make_unique(options_, input); } else if (short_name == "hash") { - return std::make_unique(options_, input, vm); + return std::make_unique(options_, input); } else if (short_name == "single") { options_.partition_nums = 1; - return std::make_unique(options_, input, vm); + return std::make_unique(options_, input); } else { diff --git a/utils/local-engine/Shuffle/NativeSplitter.h b/utils/local-engine/Shuffle/NativeSplitter.h index b636e925d7ed..fc311d127312 100644 --- a/utils/local-engine/Shuffle/NativeSplitter.h +++ b/utils/local-engine/Shuffle/NativeSplitter.h @@ -23,9 +23,9 @@ class NativeSplitter static jclass iterator_class; static jmethodID iterator_has_next; static jmethodID iterator_next; - static std::unique_ptr create(std::string short_name, Options options, jobject input, JavaVM * vm); + static std::unique_ptr create(std::string short_name, Options options, jobject input); - NativeSplitter(Options options, jobject input, JavaVM * vm); + NativeSplitter(Options options, jobject input); bool hasNext(); DB::Block * next(); int32_t nextPartitionId(); @@ -50,7 +50,6 @@ class NativeSplitter int32_t next_partition_id = -1; DB::Block * next_block = nullptr; jobject input; - JavaVM * vm; }; class HashNativeSplitter : public NativeSplitter @@ -58,7 +57,7 @@ class HashNativeSplitter : public NativeSplitter void computePartitionId(DB::Block & block) override; public: - HashNativeSplitter(NativeSplitter::Options options_, jobject input, JavaVM * vm) : NativeSplitter(options_, input, vm) { } + HashNativeSplitter(NativeSplitter::Options options_, jobject input) : NativeSplitter(options_, input) { } private: DB::FunctionBasePtr hash_function; @@ -69,7 +68,7 @@ class RoundRobinNativeSplitter : public NativeSplitter void computePartitionId(DB::Block & block) override; public: - RoundRobinNativeSplitter(NativeSplitter::Options options_, jobject input, JavaVM * vm) : NativeSplitter(options_, input, vm) { } + RoundRobinNativeSplitter(NativeSplitter::Options options_, jobject input) : NativeSplitter(options_, input) { } private: int32_t pid_selection = 0; diff --git a/utils/local-engine/Shuffle/ShuffleReader.cpp b/utils/local-engine/Shuffle/ShuffleReader.cpp index 627237e58f1f..b408df7a33fa 100644 --- a/utils/local-engine/Shuffle/ShuffleReader.cpp +++ b/utils/local-engine/Shuffle/ShuffleReader.cpp @@ -1,8 +1,7 @@ #include "ShuffleReader.h" -#include #include -#include #include +#include using namespace DB; @@ -40,7 +39,6 @@ ShuffleReader::~ShuffleReader() input_stream.reset(); } -thread_local JNIEnv * ShuffleReader::env = nullptr; jclass ShuffleReader::input_stream_class = nullptr; jmethodID ShuffleReader::input_stream_read = nullptr; @@ -55,17 +53,22 @@ bool ReadBufferFromJavaInputStream::nextImpl() } int ReadBufferFromJavaInputStream::readFromJava() { - assert(ShuffleReader::env != nullptr); + int attached; + JNIEnv * env = JNIUtils::getENV(&attached); if (buf == nullptr) { - jbyteArray local_buf = ShuffleReader::env->NewByteArray(4096); - buf = static_cast(ShuffleReader::env->NewGlobalRef(local_buf)); - ShuffleReader::env->DeleteLocalRef(local_buf); + jbyteArray local_buf = env->NewByteArray(4096); + buf = static_cast(env->NewGlobalRef(local_buf)); + env->DeleteLocalRef(local_buf); } - jint count = ShuffleReader::env->CallIntMethod(java_in, ShuffleReader::input_stream_read, buf); + jint count = env->CallIntMethod(java_in, ShuffleReader::input_stream_read, buf); if (count > 0) { - ShuffleReader::env->GetByteArrayRegion(buf, 0, count, reinterpret_cast(internal_buffer.begin())); + env->GetByteArrayRegion(buf, 0, count, reinterpret_cast(internal_buffer.begin())); + } + if (attached) + { + JNIUtils::detachCurrentThread(); } return count; } @@ -74,11 +77,12 @@ ReadBufferFromJavaInputStream::ReadBufferFromJavaInputStream(jobject input_strea } ReadBufferFromJavaInputStream::~ReadBufferFromJavaInputStream() { - assert(ShuffleReader::env != nullptr); - ShuffleReader::env->DeleteGlobalRef(java_in); + int attached; + JNIEnv * env = JNIUtils::getENV(&attached); + env->DeleteGlobalRef(java_in); if (buf != nullptr) { - ShuffleReader::env->DeleteGlobalRef(buf); + env->DeleteGlobalRef(buf); } } diff --git a/utils/local-engine/Shuffle/ShuffleReader.h b/utils/local-engine/Shuffle/ShuffleReader.h index 5cd25fd19f53..1cd5beb5b7eb 100644 --- a/utils/local-engine/Shuffle/ShuffleReader.h +++ b/utils/local-engine/Shuffle/ShuffleReader.h @@ -14,7 +14,6 @@ class ShuffleReader explicit ShuffleReader(std::unique_ptr in_, bool compressed); DB::Block* read(); ~ShuffleReader(); - static thread_local JNIEnv * env; static jclass input_stream_class; static jmethodID input_stream_read; std::unique_ptr in; diff --git a/utils/local-engine/Shuffle/ShuffleWriter.cpp b/utils/local-engine/Shuffle/ShuffleWriter.cpp index cbaea30f6deb..42527b293a2e 100644 --- a/utils/local-engine/Shuffle/ShuffleWriter.cpp +++ b/utils/local-engine/Shuffle/ShuffleWriter.cpp @@ -5,9 +5,9 @@ using namespace DB; namespace local_engine { -ShuffleWriter::ShuffleWriter(JavaVM * vm, jobject output_stream, jbyteArray buffer) +ShuffleWriter::ShuffleWriter(jobject output_stream, jbyteArray buffer) { - write_buffer = std::make_unique(vm, output_stream, buffer); + write_buffer = std::make_unique(output_stream, buffer); } void ShuffleWriter::write(const Block & block) { diff --git a/utils/local-engine/Shuffle/ShuffleWriter.h b/utils/local-engine/Shuffle/ShuffleWriter.h index a1cf9fec4dde..91cc1d68ece8 100644 --- a/utils/local-engine/Shuffle/ShuffleWriter.h +++ b/utils/local-engine/Shuffle/ShuffleWriter.h @@ -7,7 +7,7 @@ namespace local_engine class ShuffleWriter { public: - ShuffleWriter(JavaVM * vm, jobject output_stream, jbyteArray buffer); + ShuffleWriter(jobject output_stream, jbyteArray buffer); virtual ~ShuffleWriter(); void write(const DB::Block & block); void flush(); diff --git a/utils/local-engine/Shuffle/WriteBufferFromJavaOutputStream.cpp b/utils/local-engine/Shuffle/WriteBufferFromJavaOutputStream.cpp index 5592db112d63..e7ba216b3fc4 100644 --- a/utils/local-engine/Shuffle/WriteBufferFromJavaOutputStream.cpp +++ b/utils/local-engine/Shuffle/WriteBufferFromJavaOutputStream.cpp @@ -1,4 +1,5 @@ #include "WriteBufferFromJavaOutputStream.h" +#include namespace local_engine { @@ -8,11 +9,8 @@ jmethodID WriteBufferFromJavaOutputStream::output_stream_flush = nullptr; void WriteBufferFromJavaOutputStream::nextImpl() { - JNIEnv * env; - if (vm->GetEnv(reinterpret_cast(&env), JNI_VERSION_1_8) != JNI_OK) - { - throw std::runtime_error("get env error"); - } + int attached; + JNIEnv * env = JNIUtils::getENV(&attached); size_t bytes_write = 0; while (offset() - bytes_write > 0) @@ -22,34 +20,43 @@ void WriteBufferFromJavaOutputStream::nextImpl() env->CallVoidMethod(output_stream, output_stream_write, buffer, 0, copy_num); bytes_write += copy_num; } -} -WriteBufferFromJavaOutputStream::WriteBufferFromJavaOutputStream(JavaVM * vm_, jobject output_stream_, jbyteArray buffer_) - : vm(vm_) -{ - JNIEnv * env; - if (vm->GetEnv(reinterpret_cast(&env), JNI_VERSION_1_8) != JNI_OK) + if (attached) { - throw std::runtime_error("get env error"); + JNIUtils::detachCurrentThread(); } +} +WriteBufferFromJavaOutputStream::WriteBufferFromJavaOutputStream(jobject output_stream_, jbyteArray buffer_) +{ + int attached; + JNIEnv * env = JNIUtils::getENV(&attached); buffer = static_cast(env->NewWeakGlobalRef(buffer_)); output_stream = env->NewWeakGlobalRef(output_stream_); buffer_size = env->GetArrayLength(buffer); + if (attached) + { + JNIUtils::detachCurrentThread(); + } } void WriteBufferFromJavaOutputStream::finalizeImpl() { next(); - JNIEnv * env; - if (vm->GetEnv(reinterpret_cast(&env), JNI_VERSION_1_8) != JNI_OK) + int attached; + JNIEnv * env = JNIUtils::getENV(&attached); + env->CallVoidMethod(output_stream, output_stream_flush); + if (attached) { - throw std::runtime_error("get env error"); + JNIUtils::detachCurrentThread(); } - env->CallVoidMethod(output_stream, output_stream_flush); } WriteBufferFromJavaOutputStream::~WriteBufferFromJavaOutputStream() { - JNIEnv * env; - vm->GetEnv(reinterpret_cast(&env), JNI_VERSION_1_8); + int attached; + JNIEnv * env = JNIUtils::getENV(&attached); env->DeleteWeakGlobalRef(output_stream); env->DeleteWeakGlobalRef(buffer); + if (attached) + { + JNIUtils::detachCurrentThread(); + } } } diff --git a/utils/local-engine/Shuffle/WriteBufferFromJavaOutputStream.h b/utils/local-engine/Shuffle/WriteBufferFromJavaOutputStream.h index ae8c7dc5f800..ab1f3658d58e 100644 --- a/utils/local-engine/Shuffle/WriteBufferFromJavaOutputStream.h +++ b/utils/local-engine/Shuffle/WriteBufferFromJavaOutputStream.h @@ -12,7 +12,7 @@ class WriteBufferFromJavaOutputStream : public DB::BufferWithOwnMemory -#include +#include +#include +#include namespace local_engine { @@ -10,9 +11,10 @@ jmethodID SourceFromJavaIter::serialized_record_batch_iterator_next = nullptr; DB::Chunk SourceFromJavaIter::generate() { - int attach; - JNIEnv * env = getENV(vm, &attach); + int attached; + JNIEnv * env = JNIUtils::getENV(&attached); jboolean has_next = env->CallBooleanMethod(java_iter,serialized_record_batch_iterator_hasNext); + DB::Chunk result = {}; if (has_next) { jbyteArray block = static_cast(env->CallObjectMethod(java_iter, serialized_record_batch_iterator_next)); @@ -23,18 +25,23 @@ DB::Chunk SourceFromJavaIter::generate() info->is_overflows = data->info.is_overflows; info->bucket_num = data->info.bucket_num; chunk.setChunkInfo(info); - return chunk; + result = std::move(chunk); } - else + if (attached) { - return {}; + JNIUtils::detachCurrentThread(); } + return result; } SourceFromJavaIter::~SourceFromJavaIter() { - int attach; - JNIEnv * env = getENV(vm, &attach); + int attached; + JNIEnv * env = JNIUtils::getENV(&attached); env->DeleteGlobalRef(java_iter); + if (attached) + { + JNIUtils::detachCurrentThread(); + } } Int64 SourceFromJavaIter::byteArrayToLong(JNIEnv* env, jbyteArray arr) { diff --git a/utils/local-engine/Storages/SourceFromJavaIter.h b/utils/local-engine/Storages/SourceFromJavaIter.h index 127864cafa70..f4a163c78f28 100644 --- a/utils/local-engine/Storages/SourceFromJavaIter.h +++ b/utils/local-engine/Storages/SourceFromJavaIter.h @@ -13,38 +13,20 @@ class SourceFromJavaIter : public DB::ISource static Int64 byteArrayToLong(JNIEnv* env, jbyteArray arr); - static JNIEnv * getENV(JavaVM * vm, int *attach) { - if (vm == nullptr) return nullptr; - - *attach = 0; - JNIEnv *jni_env = nullptr; - - int status = vm->GetEnv(reinterpret_cast(&jni_env), JNI_VERSION_1_8); - - if (status == JNI_EDETACHED || jni_env == nullptr) { - status = vm->AttachCurrentThread(reinterpret_cast(&jni_env), nullptr); - if (status < 0) { - jni_env = nullptr; - } else { - *attach = 1; - } - } - return jni_env; - } - SourceFromJavaIter(DB::Block header, jobject java_iter_, JavaVM * vm_): DB::ISource(header), - java_iter(java_iter_), vm(vm_) + + SourceFromJavaIter(DB::Block header, jobject java_iter_): DB::ISource(header), + java_iter(java_iter_) { } String getName() const override { return "SourceFromJavaIter"; } - ~SourceFromJavaIter(); + ~SourceFromJavaIter() override; private: DB::Chunk generate() override; jobject java_iter; - JavaVM * vm; }; } diff --git a/utils/local-engine/build/build.sh b/utils/local-engine/build/build.sh index 4c53a4f02864..107c33aceeab 100755 --- a/utils/local-engine/build/build.sh +++ b/utils/local-engine/build/build.sh @@ -1 +1 @@ -sudo docker run --rm --volume="$2":/output --volume="$1":/clickhouse --volume=/tmp/.cache:/ccache -e ENABLE_EMBEDDED_COMPILER=OFF libchbuilder:1.0 \ No newline at end of file +sudo docker run --rm --volume="$2":/output --volume="$1":/clickhouse --volume=/tmp/.cache:/ccache -e ENABLE_EMBEDDED_COMPILER=ON libchbuilder:1.0 \ No newline at end of file diff --git a/utils/local-engine/local_engine_jni.cpp b/utils/local-engine/local_engine_jni.cpp index d2b9e17f4743..593b6f19f6f7 100644 --- a/utils/local-engine/local_engine_jni.cpp +++ b/utils/local-engine/local_engine_jni.cpp @@ -14,6 +14,8 @@ #include #include #include +#include +#include #include "jni_common.h" bool inside_main = true; @@ -32,11 +34,9 @@ namespace dbms static jclass spark_row_info_class; static jmethodID spark_row_info_constructor; -static jclass ch_column_batch_class; static jclass split_result_class; static jmethodID split_result_constructor; -static JavaVM * global_vm = nullptr; jint JNI_OnLoad(JavaVM * vm, void * reserved) { @@ -57,27 +57,28 @@ jint JNI_OnLoad(JavaVM * vm, void * reserved) split_result_class = CreateGlobalClassReference(env, "Lio/glutenproject/vectorized/SplitResult;"); split_result_constructor = GetMethodID(env, split_result_class, "", "(JJJJJJ[J[J)V"); - ch_column_batch_class = CreateGlobalClassReference(env, "Lio/glutenproject/vectorized/CHColumnVector;"); local_engine::ShuffleReader::input_stream_class = CreateGlobalClassReference(env, "Ljava/io/InputStream;"); + local_engine::NativeSplitter::iterator_class = CreateGlobalClassReference(env, "Lio/glutenproject/vectorized/IteratorWrapper;"); + local_engine::WriteBufferFromJavaOutputStream::output_stream_class = CreateGlobalClassReference(env, "Ljava/io/OutputStream;"); + local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class + = CreateGlobalClassReference(env, "Lio/glutenproject/execution/ColumnarNativeIterator;"); + local_engine::ShuffleReader::input_stream_read = env->GetMethodID(local_engine::ShuffleReader::input_stream_class, "read", "([B)I"); - local_engine::NativeSplitter::iterator_class = CreateGlobalClassReference(env, "Lio/glutenproject/vectorized/IteratorWrapper;"); local_engine::NativeSplitter::iterator_has_next = GetMethodID(env, local_engine::NativeSplitter::iterator_class, "hasNext", "()Z"); local_engine::NativeSplitter::iterator_next = GetMethodID(env, local_engine::NativeSplitter::iterator_class, "next", "()J"); - local_engine::WriteBufferFromJavaOutputStream::output_stream_class = CreateGlobalClassReference(env, "Ljava/io/OutputStream;"); local_engine::WriteBufferFromJavaOutputStream::output_stream_write = GetMethodID(env, local_engine::WriteBufferFromJavaOutputStream::output_stream_class, "write", "([BII)V"); local_engine::WriteBufferFromJavaOutputStream::output_stream_flush = GetMethodID(env, local_engine::WriteBufferFromJavaOutputStream::output_stream_class, "flush", "()V"); - local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class - = CreateGlobalClassReference(env, "Lio/glutenproject/execution/ColumnarNativeIterator;"); + local_engine::SourceFromJavaIter::serialized_record_batch_iterator_hasNext = GetMethodID(env, local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class, "hasNext", "()Z"); local_engine::SourceFromJavaIter::serialized_record_batch_iterator_next = GetMethodID(env, local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class, "next", "()[B"); - global_vm = vm; + local_engine::JNIUtils::vm = vm; return JNI_VERSION_1_8; } @@ -85,7 +86,6 @@ void JNI_OnUnload(JavaVM * vm, void * reserved) { JNIEnv * env; vm->GetEnv(reinterpret_cast(&env), JNI_VERSION_1_8); - env->DeleteGlobalRef(io_exception_class); env->DeleteGlobalRef(runtime_exception_class); env->DeleteGlobalRef(unsupportedoperation_exception_class); @@ -96,6 +96,13 @@ void JNI_OnUnload(JavaVM * vm, void * reserved) env->DeleteGlobalRef(local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class); env->DeleteGlobalRef(local_engine::NativeSplitter::iterator_class); env->DeleteGlobalRef(local_engine::WriteBufferFromJavaOutputStream::output_stream_class); + if (local_engine::SerializedPlanParser::global_context) + { + local_engine::SerializedPlanParser::global_context->shutdown(); + local_engine::SerializedPlanParser::global_context.reset(); + local_engine::SerializedPlanParser::shared_context.reset(); + } + local_engine::BroadCastJoinBuilder::clean(); } //static SharedContextHolder shared_context; @@ -103,8 +110,7 @@ void Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeInitNa { try { - static std::once_flag init_flag; - std::call_once(init_flag, [](){init();}); + init(); } catch (const DB::Exception & e) { @@ -138,7 +144,6 @@ jlong Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeCreat { auto context = Context::createCopy(local_engine::SerializedPlanParser::global_context); local_engine::SerializedPlanParser parser(context); - parser.setJavaVM(global_vm); jsize iter_num = env->GetArrayLength(iter_arr); for (jsize i = 0; i < iter_num; i++) { @@ -496,10 +501,8 @@ jlong Java_io_glutenproject_vectorized_CHStreamReader_nativeNext(JNIEnv * env, j { // try // { - local_engine::ShuffleReader::env = env; local_engine::ShuffleReader * reader = reinterpret_cast(shuffle_reader); Block * block = reader->read(); - local_engine::ShuffleReader::env = nullptr; return reinterpret_cast(block); // } // catch (DB::Exception & e) @@ -511,10 +514,8 @@ jlong Java_io_glutenproject_vectorized_CHStreamReader_nativeNext(JNIEnv * env, j void Java_io_glutenproject_vectorized_CHStreamReader_nativeClose(JNIEnv * env, jobject obj, jlong shuffle_reader) { - local_engine::ShuffleReader::env = env; local_engine::ShuffleReader * reader = reinterpret_cast(shuffle_reader); delete reader; - local_engine::ShuffleReader::env = nullptr; } // CHCoalesceOperator @@ -798,7 +799,6 @@ void Java_io_glutenproject_vectorized_BlockNativeWriter_nativeClose(JNIEnv *, jo void Java_io_glutenproject_vectorized_StorageJoinBuilder_nativeBuild( JNIEnv * env, jobject, jstring hash_table_id_, jobject in, jstring join_key_, jstring join_type_, jbyteArray named_struct) { - local_engine::ShuffleReader::env = env; auto * input = env->NewGlobalRef(in); auto read_buffer = std::make_unique(input); auto hash_table_id = jstring2string(env, hash_table_id_); @@ -810,7 +810,6 @@ void Java_io_glutenproject_vectorized_StorageJoinBuilder_nativeBuild( struct_string.assign(reinterpret_cast(struct_address), struct_size); local_engine::BroadCastJoinBuilder::buildJoinIfNotExist(hash_table_id, std::move(read_buffer), join_key, join_type, struct_string); env->ReleaseByteArrayElements(named_struct, struct_address, JNI_ABORT); - local_engine::ShuffleReader::env = nullptr; } // BlockSplitIterator @@ -824,7 +823,7 @@ jlong Java_io_glutenproject_vectorized_BlockSplitIterator_nativeCreate( Poco::StringTokenizer exprs(expr_str, ","); options.exprs.insert(options.exprs.end(), exprs.begin(), exprs.end()); local_engine::NativeSplitter::Holder * splitter = new local_engine::NativeSplitter::Holder{ - .splitter = local_engine::NativeSplitter::create(jstring2string(env, name), options, in, global_vm)}; + .splitter = local_engine::NativeSplitter::create(jstring2string(env, name), options, in)}; return reinterpret_cast(splitter); } @@ -856,7 +855,7 @@ jint Java_io_glutenproject_vectorized_BlockSplitIterator_nativeNextPartitionId(J jlong Java_io_glutenproject_vectorized_BlockOutputStream_nativeCreate(JNIEnv * /*env*/, jobject, jobject output_stream, jbyteArray buffer) { - local_engine::ShuffleWriter * writer = new local_engine::ShuffleWriter(global_vm, output_stream, buffer); + local_engine::ShuffleWriter * writer = new local_engine::ShuffleWriter(output_stream, buffer); return reinterpret_cast(writer); }