Skip to content

Commit

Permalink
add context clean when unload lib (#40)
Browse files Browse the repository at this point in the history
* add context clean and refactor jni code

* change init

* add broadcast clean
  • Loading branch information
liuneng1994 authored Jul 14, 2022
1 parent c695853 commit 7abd83d
Show file tree
Hide file tree
Showing 21 changed files with 204 additions and 145 deletions.
4 changes: 0 additions & 4 deletions src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3197,9 +3197,5 @@ ReadSettings Context::getReadSettings() const

return res;
}
void Context::setBackgroundExecutorsInitialized(bool initialized)
{
shared->is_background_executors_initialized = initialized;
}

}
3 changes: 0 additions & 3 deletions src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -892,9 +892,6 @@ class Context: public std::enable_shared_from_this<Context>
/** Get settings for reading from filesystem. */
ReadSettings getReadSettings() const;

/** Used to disable global context initialized. */
void setBackgroundExecutorsInitialized(bool initialized);

private:
std::unique_lock<std::recursive_mutex> getLock() const;

Expand Down
9 changes: 9 additions & 0 deletions utils/local-engine/Builder/BroadCastJoinBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,5 +127,14 @@ void BroadCastJoinBuilder::buildJoinIfNotExist(
ColumnsDescription columns_description(header.getNamesAndTypesList());
buildJoinIfNotExist(key, std::move(read_buffer), key_names, kind, strictness, columns_description);
}
void BroadCastJoinBuilder::clean()
{
storage_join_lock.clear();
storage_join_map.clear();
while (!storage_join_queue.empty())
{
storage_join_queue.pop();
}
}

}
2 changes: 2 additions & 0 deletions utils/local-engine/Builder/BroadCastJoinBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class BroadCastJoinBuilder

static std::shared_ptr<StorageJoinFromReadBuffer> getJoin(const std::string & key);

static void clean();

private:
static std::queue<std::string> storage_join_queue;
static std::unordered_map<std::string, std::shared_ptr<StorageJoinFromReadBuffer>> storage_join_map;
Expand Down
31 changes: 31 additions & 0 deletions utils/local-engine/Common/JNIUtils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#include "JNIUtils.h"

namespace local_engine
{

JavaVM * JNIUtils::vm = nullptr;

JNIEnv * JNIUtils::getENV(int * attach)
{
if (vm == nullptr) return nullptr;

*attach = 0;
JNIEnv *jni_env = nullptr;

int status = vm->GetEnv(reinterpret_cast<void **>(&jni_env), JNI_VERSION_1_8);

if (status == JNI_EDETACHED || jni_env == nullptr) {
status = vm->AttachCurrentThread(reinterpret_cast<void **>(&jni_env), nullptr);
if (status < 0) {
jni_env = nullptr;
} else {
*attach = 1;
}
}
return jni_env;
}
void JNIUtils::detachCurrentThread()
{
vm->DetachCurrentThread();
}
}
17 changes: 17 additions & 0 deletions utils/local-engine/Common/JNIUtils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#pragma once
#include <jni.h>

namespace local_engine
{
class JNIUtils
{
public:
static JavaVM * vm;

static JNIEnv * getENV(int *attach);

static void detachCurrentThread();
};
}


42 changes: 27 additions & 15 deletions utils/local-engine/Common/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,35 @@ void registerAllFunctions()

void init()
{
registerAllFunctions();
local_engine::SerializedPlanParser::shared_context = SharedContextHolder(Context::createShared());
local_engine::SerializedPlanParser::global_context = Context::createGlobal(local_engine::SerializedPlanParser::shared_context.get());
// disable global context initialized
local_engine::SerializedPlanParser::global_context->setBackgroundExecutorsInitialized(true);
local_engine::SerializedPlanParser::global_context->makeGlobalContext();
local_engine::SerializedPlanParser::global_context->setSetting("join_use_nulls", true);
local_engine::SerializedPlanParser::global_context->setConfig(local_engine::SerializedPlanParser::config);
local_engine::SerializedPlanParser::global_context->setPath("/");
local_engine::Logger::initConsoleLogger();

static std::once_flag init_flag;
std::call_once(init_flag, [](){
registerAllFunctions();
local_engine::Logger::initConsoleLogger();
#if USE_EMBEDDED_COMPILER
/// 128 MB
constexpr size_t compiled_expression_cache_size_default = 1024 * 1024 * 128;
constexpr size_t compiled_expression_cache_elements_size_default = 10000;
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_size_default, compiled_expression_cache_size_default);
/// 128 MB
constexpr size_t compiled_expression_cache_size_default = 1024 * 1024 * 128;
constexpr size_t compiled_expression_cache_elements_size_default = 10000;
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_size_default, compiled_expression_cache_size_default);
#endif
});

static std::mutex context_lock;

{
std::lock_guard lock(context_lock);
if (!local_engine::SerializedPlanParser::global_context)
{
local_engine::SerializedPlanParser::shared_context = SharedContextHolder(Context::createShared());
local_engine::SerializedPlanParser::global_context
= Context::createGlobal(local_engine::SerializedPlanParser::shared_context.get());
// disable global context initialized
// local_engine::SerializedPlanParser::global_context->setBackgroundExecutorsInitialized(true);
local_engine::SerializedPlanParser::global_context->makeGlobalContext();
local_engine::SerializedPlanParser::global_context->setSetting("join_use_nulls", true);
local_engine::SerializedPlanParser::global_context->setConfig(local_engine::SerializedPlanParser::config);
local_engine::SerializedPlanParser::global_context->setPath("/");
}
}
}

char * createExecutor(std::string plan_string)
Expand Down
2 changes: 1 addition & 1 deletion utils/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ QueryPlanPtr SerializedPlanParser::parseReadRealWithJavaIter(const substrait::Re
auto pos = iter.find(':');
auto iter_index = std::stoi(iter.substr(pos + 1, iter.size()));
auto plan = std::make_unique<QueryPlan>();
auto source = std::make_shared<SourceFromJavaIter>(parseNameStruct(rel.base_schema()), input_iters[iter_index], vm);
auto source = std::make_shared<SourceFromJavaIter>(parseNameStruct(rel.base_schema()), input_iters[iter_index]);
QueryPlanStepPtr source_step = std::make_unique<ReadFromPreparedSource>(Pipe(source), context);
plan->addStep(std::move(source_step));
return plan;
Expand Down
7 changes: 0 additions & 7 deletions utils/local-engine/Parser/SerializedPlanParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,6 @@ class SerializedPlanParser
input_iters.emplace_back(iter);
}

void setJavaVM(JavaVM * vm_)
{
vm = vm_;
}

static ContextMutablePtr global_context;
static Context::ConfigurationPtr config;
static SharedContextHolder shared_context;
Expand Down Expand Up @@ -146,8 +141,6 @@ class SerializedPlanParser
std::unordered_map<std::string, std::string> function_mapping;
std::vector<jobject> input_iters;
ContextPtr context;
JavaVM* vm;


// DB::QueryPlanPtr query_plan;

Expand Down
50 changes: 28 additions & 22 deletions utils/local-engine/Shuffle/NativeSplitter.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "NativeSplitter.h"
#include <Functions/FunctionFactory.h>
#include <Parser/SerializedPlanParser.h>
#include <Common/JNIUtils.h>


using namespace DB;
Expand Down Expand Up @@ -48,29 +49,31 @@ void NativeSplitter::split(DB::Block & block)
}
}

NativeSplitter::NativeSplitter(Options options_, jobject input_, JavaVM * vm_) : options(options_), vm(vm_)
NativeSplitter::NativeSplitter(Options options_, jobject input_) : options(options_)
{
JNIEnv * env;
if (vm->GetEnv(reinterpret_cast<void **>(&env), JNI_VERSION_1_8) != JNI_OK)
{
throwError("get env error");
}
int attached;
JNIEnv * env = JNIUtils::getENV(&attached);
input = env->NewGlobalRef(input_);
partition_ids.reserve(options.buffer_size);
partition_buffer.reserve(options.partition_nums);
for (size_t i = 0; i < options.partition_nums; ++i)
{
partition_buffer.emplace_back(std::make_shared<ColumnsBuffer>());
}
if (attached)
{
JNIUtils::detachCurrentThread();
}
}
NativeSplitter::~NativeSplitter()
{
JNIEnv * env;
if (vm->GetEnv(reinterpret_cast<void **>(&env), JNI_VERSION_1_8) != JNI_OK)
int attached;
JNIEnv * env = JNIUtils::getENV(&attached);
env->DeleteGlobalRef(input);
if (attached)
{
throwError("get env error");
JNIUtils::detachCurrentThread();
}
env->DeleteGlobalRef(input);
}
bool NativeSplitter::hasNext()
{
Expand Down Expand Up @@ -114,38 +117,41 @@ int32_t NativeSplitter::nextPartitionId()

bool NativeSplitter::inputHasNext()
{
JNIEnv * env;
if (vm->GetEnv(reinterpret_cast<void **>(&env), JNI_VERSION_1_8) != JNI_OK)
int attached;
JNIEnv * env = JNIUtils::getENV(&attached);
bool next = env->CallBooleanMethod(input, iterator_has_next);
if (attached)
{
throwError("get env error");
JNIUtils::detachCurrentThread();
}
bool next = env->CallBooleanMethod(input, iterator_has_next);
return next;
}

int64_t NativeSplitter::inputNext()
{
JNIEnv * env;
if (vm->GetEnv(reinterpret_cast<void **>(&env), JNI_VERSION_1_8) != JNI_OK)
int attached;
JNIEnv * env = JNIUtils::getENV(&attached);
int64_t result = env->CallLongMethod(input, iterator_next);
if (attached)
{
throwError("get env error");
JNIUtils::detachCurrentThread();
}
return env->CallLongMethod(input, iterator_next);
return result;
}
std::unique_ptr<NativeSplitter> NativeSplitter::create(std::string short_name, Options options_, jobject input, JavaVM * vm)
std::unique_ptr<NativeSplitter> NativeSplitter::create(std::string short_name, Options options_, jobject input)
{
if (short_name == "rr")
{
return std::make_unique<RoundRobinNativeSplitter>(options_, input, vm);
return std::make_unique<RoundRobinNativeSplitter>(options_, input);
}
else if (short_name == "hash")
{
return std::make_unique<HashNativeSplitter>(options_, input, vm);
return std::make_unique<HashNativeSplitter>(options_, input);
}
else if (short_name == "single")
{
options_.partition_nums = 1;
return std::make_unique<RoundRobinNativeSplitter>(options_, input, vm);
return std::make_unique<RoundRobinNativeSplitter>(options_, input);
}
else
{
Expand Down
9 changes: 4 additions & 5 deletions utils/local-engine/Shuffle/NativeSplitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ class NativeSplitter
static jclass iterator_class;
static jmethodID iterator_has_next;
static jmethodID iterator_next;
static std::unique_ptr<NativeSplitter> create(std::string short_name, Options options, jobject input, JavaVM * vm);
static std::unique_ptr<NativeSplitter> create(std::string short_name, Options options, jobject input);

NativeSplitter(Options options, jobject input, JavaVM * vm);
NativeSplitter(Options options, jobject input);
bool hasNext();
DB::Block * next();
int32_t nextPartitionId();
Expand All @@ -50,15 +50,14 @@ class NativeSplitter
int32_t next_partition_id = -1;
DB::Block * next_block = nullptr;
jobject input;
JavaVM * vm;
};

class HashNativeSplitter : public NativeSplitter
{
void computePartitionId(DB::Block & block) override;

public:
HashNativeSplitter(NativeSplitter::Options options_, jobject input, JavaVM * vm) : NativeSplitter(options_, input, vm) { }
HashNativeSplitter(NativeSplitter::Options options_, jobject input) : NativeSplitter(options_, input) { }

private:
DB::FunctionBasePtr hash_function;
Expand All @@ -69,7 +68,7 @@ class RoundRobinNativeSplitter : public NativeSplitter
void computePartitionId(DB::Block & block) override;

public:
RoundRobinNativeSplitter(NativeSplitter::Options options_, jobject input, JavaVM * vm) : NativeSplitter(options_, input, vm) { }
RoundRobinNativeSplitter(NativeSplitter::Options options_, jobject input) : NativeSplitter(options_, input) { }

private:
int32_t pid_selection = 0;
Expand Down
28 changes: 16 additions & 12 deletions utils/local-engine/Shuffle/ShuffleReader.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
#include "ShuffleReader.h"
#include <Shuffle/ShuffleSplitter.h>
#include <Common/DebugUtils.h>
#include <Common/Exception.h>
#include <Common/Stopwatch.h>
#include <Common/JNIUtils.h>

using namespace DB;

Expand Down Expand Up @@ -40,7 +39,6 @@ ShuffleReader::~ShuffleReader()
input_stream.reset();
}

thread_local JNIEnv * ShuffleReader::env = nullptr;
jclass ShuffleReader::input_stream_class = nullptr;
jmethodID ShuffleReader::input_stream_read = nullptr;

Expand All @@ -55,17 +53,22 @@ bool ReadBufferFromJavaInputStream::nextImpl()
}
int ReadBufferFromJavaInputStream::readFromJava()
{
assert(ShuffleReader::env != nullptr);
int attached;
JNIEnv * env = JNIUtils::getENV(&attached);
if (buf == nullptr)
{
jbyteArray local_buf = ShuffleReader::env->NewByteArray(4096);
buf = static_cast<jbyteArray>(ShuffleReader::env->NewGlobalRef(local_buf));
ShuffleReader::env->DeleteLocalRef(local_buf);
jbyteArray local_buf = env->NewByteArray(4096);
buf = static_cast<jbyteArray>(env->NewGlobalRef(local_buf));
env->DeleteLocalRef(local_buf);
}
jint count = ShuffleReader::env->CallIntMethod(java_in, ShuffleReader::input_stream_read, buf);
jint count = env->CallIntMethod(java_in, ShuffleReader::input_stream_read, buf);
if (count > 0)
{
ShuffleReader::env->GetByteArrayRegion(buf, 0, count, reinterpret_cast<jbyte *>(internal_buffer.begin()));
env->GetByteArrayRegion(buf, 0, count, reinterpret_cast<jbyte *>(internal_buffer.begin()));
}
if (attached)
{
JNIUtils::detachCurrentThread();
}
return count;
}
Expand All @@ -74,11 +77,12 @@ ReadBufferFromJavaInputStream::ReadBufferFromJavaInputStream(jobject input_strea
}
ReadBufferFromJavaInputStream::~ReadBufferFromJavaInputStream()
{
assert(ShuffleReader::env != nullptr);
ShuffleReader::env->DeleteGlobalRef(java_in);
int attached;
JNIEnv * env = JNIUtils::getENV(&attached);
env->DeleteGlobalRef(java_in);
if (buf != nullptr)
{
ShuffleReader::env->DeleteGlobalRef(buf);
env->DeleteGlobalRef(buf);
}
}

Expand Down
1 change: 0 additions & 1 deletion utils/local-engine/Shuffle/ShuffleReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ class ShuffleReader
explicit ShuffleReader(std::unique_ptr<DB::ReadBuffer> in_, bool compressed);
DB::Block* read();
~ShuffleReader();
static thread_local JNIEnv * env;
static jclass input_stream_class;
static jmethodID input_stream_read;
std::unique_ptr<DB::ReadBuffer> in;
Expand Down
4 changes: 2 additions & 2 deletions utils/local-engine/Shuffle/ShuffleWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ using namespace DB;
namespace local_engine
{

ShuffleWriter::ShuffleWriter(JavaVM * vm, jobject output_stream, jbyteArray buffer)
ShuffleWriter::ShuffleWriter(jobject output_stream, jbyteArray buffer)
{
write_buffer = std::make_unique<WriteBufferFromJavaOutputStream>(vm, output_stream, buffer);
write_buffer = std::make_unique<WriteBufferFromJavaOutputStream>(output_stream, buffer);
}
void ShuffleWriter::write(const Block & block)
{
Expand Down
Loading

0 comments on commit 7abd83d

Please sign in to comment.