Skip to content

Commit

Permalink
fix crash when driver send shutdown command to executor
Browse files Browse the repository at this point in the history
  • Loading branch information
taiyang-li committed Jun 18, 2024
1 parent e4388e6 commit 4f4c946
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 2 deletions.
7 changes: 5 additions & 2 deletions cpp-ch/local-engine/Common/CHUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
size_t index_uncompressed_cache_size = config->getUInt64("index_uncompressed_cache_size", DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE);
double index_uncompressed_cache_size_ratio = config->getDouble("index_uncompressed_cache_size_ratio", DEFAULT_INDEX_UNCOMPRESSED_CACHE_SIZE_RATIO);
global_context->setIndexUncompressedCache(index_uncompressed_cache_policy, index_uncompressed_cache_size, index_uncompressed_cache_size_ratio);

String index_mark_cache_policy = config->getString("index_mark_cache_policy", DEFAULT_INDEX_MARK_CACHE_POLICY);
size_t index_mark_cache_size = config->getUInt64("index_mark_cache_size", DEFAULT_INDEX_MARK_CACHE_MAX_SIZE);
double index_mark_cache_size_ratio = config->getDouble("index_mark_cache_size_ratio", DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO);
Expand Down Expand Up @@ -917,7 +917,10 @@ void BackendInitializerUtil::updateConfig(const DB::ContextMutablePtr & context,

void BackendFinalizerUtil::finalizeGlobally()
{
// Make sure client caches release before ClientCacheRegistry
/// Make sure that all active LocalExecutors are stopped before the Spark executor shuts down; otherwise a crash may happen.
LocalExecutor::cancelAll();

/// Make sure client caches release before ClientCacheRegistry
ReadBufferBuilderFactory::instance().clean();
StorageMergeTreeFactory::clear();
auto & global_context = SerializedPlanParser::global_context;
Expand Down
3 changes: 3 additions & 0 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2039,6 +2039,9 @@ void SerializedPlanParser::wrapNullable(

SharedContextHolder SerializedPlanParser::shared_context;

/// Out-of-class definitions of LocalExecutor's static registry members:
/// a map from an Int64 handle to a LocalExecutor pointer, and a mutex declared alongside it.
std::unordered_map<Int64, LocalExecutor *> LocalExecutor::executors;
std::mutex LocalExecutor::executors_mutex;

LocalExecutor::~LocalExecutor()
{
if (context->getConfigRef().getBool("dump_pipeline", false))
Expand Down
47 changes: 47 additions & 0 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,50 @@ class LocalExecutor : public BlockIterator
/// Returns the metric object currently held by this executor.
RelMetricPtr getMetric() const
{
    return this->metric;
}
/// Replaces the metric object held by this executor with a copy of `metric_`.
void setMetric(RelMetricPtr metric_)
{
    this->metric = metric_;
}
void setExtraPlanHolder(std::vector<QueryPlanPtr> & extra_plan_holder_) { extra_plan_holder = std::move(extra_plan_holder_); }

static void cancelAll()
{
std::lock_guard lock{executors_mutex};
for (auto & [handle, executor] : executors)
{
LOG_INFO(&Poco::Logger::get("LocalExecutor"), "Cancel remaining LocalExecutor {}", handle);
executor->cancel();
}

/// Wait for all remaining LocalExecutor to be cancelled
for (auto & [handle, executor] : executors)
{
if (executor->executor)
{
Stopwatch watch;

Chunk chunk;
while (executor->executor->pull(chunk))
;

LOG_INFO(
&Poco::Logger::get("LocalExecutor"),
"Finish cancel remaining LocalExecutor {}, takes {} ms",
handle,
watch.elapsedMilliseconds());
}
}

}

/// Registers `executor` in the static registry so that cancelAll() can reach it.
/// The key is the executor's address reinterpreted as Int64.
/// emplace() leaves an existing entry with the same key untouched.
static void addExecutor(LocalExecutor * executor)
{
    std::lock_guard registry_guard{executors_mutex};
    executors.emplace(reinterpret_cast<Int64>(executor), executor);
}

/// Drops the registry entry stored under `handle`, if there is one.
/// Only the map entry is erased; the LocalExecutor object itself is not destroyed here.
static void removeExecutor(Int64 handle)
{
    std::lock_guard registry_guard{executors_mutex};
    executors.erase(handle);
}

private:
std::unique_ptr<SparkRowInfo> writeBlockToSparkRow(DB::Block & block);

Expand All @@ -432,6 +476,9 @@ class LocalExecutor : public BlockIterator
RelMetricPtr metric;
std::vector<QueryPlanPtr> extra_plan_holder;

/// Registry of the LocalExecutor instances that have been added via addExecutor() and not yet
/// removed via removeExecutor(), so that cancelAll() can cancel them when the executor receives
/// a shutdown command from the driver.
/// The key is the executor's address reinterpreted as Int64; the value is a raw pointer to it.
static std::unordered_map<Int64, LocalExecutor *> executors;
/// Locked by addExecutor(), removeExecutor() and cancelAll() before they touch `executors`.
static std::mutex executors_mutex;
};


Expand Down
5 changes: 5 additions & 0 deletions cpp-ch/local-engine/local_engine_jni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_
plan_string.assign(reinterpret_cast<const char *>(plan_address), plan_size);
auto query_plan = parser.parse(plan_string);
local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(query_context);
local_engine::LocalExecutor::addExecutor(executor);
LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}", reinterpret_cast<uintptr_t>(executor));
executor->setMetric(parser.getMetric());
executor->setExtraPlanHolder(parser.extra_plan_holder);
Expand Down Expand Up @@ -315,6 +316,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_BatchIterator_nativeCHNext(JNI
JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeCancel(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::LocalExecutor::removeExecutor(executor_address);
local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
executor->cancel();
LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}", reinterpret_cast<uintptr_t>(executor));
Expand All @@ -324,6 +326,7 @@ JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeCancel(JNIE
JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeClose(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::LocalExecutor::removeExecutor(executor_address);
local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
LOG_INFO(&Poco::Logger::get("jni"), "Finalize LocalExecutor {}", reinterpret_cast<uintptr_t>(executor));
delete executor;
Expand Down Expand Up @@ -1331,6 +1334,7 @@ Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE
plan_string.assign(reinterpret_cast<const char *>(plan_address), plan_size);
auto query_plan = parser.parse(plan_string);
local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(context);
local_engine::LocalExecutor::addExecutor(executor);
executor->execute(std::move(query_plan));
env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT);
return reinterpret_cast<jlong>(executor);
Expand All @@ -1340,6 +1344,7 @@ Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE
JNIEXPORT void Java_org_apache_gluten_vectorized_SimpleExpressionEval_nativeClose(JNIEnv * env, jclass, jlong instance)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::LocalExecutor::removeExecutor(instance);
local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(instance);
delete executor;
LOCAL_ENGINE_JNI_METHOD_END(env, )
Expand Down

0 comments on commit 4f4c946

Please sign in to comment.