Skip to content

Commit

Permalink
[GLUTEN-6122] Fix crash when driver send shutdown command to executor a…
Browse files Browse the repository at this point in the history
…pache#6130

What changes were proposed in this pull request?
Fix a crash that occurs when the driver sends a shutdown command to the executor
(Fixes: apache#6122)
  • Loading branch information
taiyang-li authored and Deepa8 committed Jun 26, 2024
1 parent 0ef0bf1 commit 19324f3
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 7 deletions.
7 changes: 5 additions & 2 deletions cpp-ch/local-engine/Common/CHUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
size_t index_uncompressed_cache_size = config->getUInt64("index_uncompressed_cache_size", DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE);
double index_uncompressed_cache_size_ratio = config->getDouble("index_uncompressed_cache_size_ratio", DEFAULT_INDEX_UNCOMPRESSED_CACHE_SIZE_RATIO);
global_context->setIndexUncompressedCache(index_uncompressed_cache_policy, index_uncompressed_cache_size, index_uncompressed_cache_size_ratio);

String index_mark_cache_policy = config->getString("index_mark_cache_policy", DEFAULT_INDEX_MARK_CACHE_POLICY);
size_t index_mark_cache_size = config->getUInt64("index_mark_cache_size", DEFAULT_INDEX_MARK_CACHE_MAX_SIZE);
double index_mark_cache_size_ratio = config->getDouble("index_mark_cache_size_ratio", DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO);
Expand Down Expand Up @@ -919,7 +919,10 @@ void BackendInitializerUtil::updateConfig(const DB::ContextMutablePtr & context,

void BackendFinalizerUtil::finalizeGlobally()
{
// Make sure client caches release before ClientCacheRegistry
/// Make sure that all active LocalExecutors stop before the Spark executor shuts down; otherwise a crash may happen.
LocalExecutor::cancelAll();

/// Make sure client caches release before ClientCacheRegistry
ReadBufferBuilderFactory::instance().clean();
StorageMergeTreeFactory::clear();
auto & global_context = SerializedPlanParser::global_context;
Expand Down
56 changes: 55 additions & 1 deletion cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2053,6 +2053,33 @@ void SerializedPlanParser::wrapNullable(

SharedContextHolder SerializedPlanParser::shared_context;

/// Registry of LocalExecutor instances, keyed by the executor's address converted to Int64
/// (see addExecutor). cancelAll() walks it to cancel every registered executor.
std::unordered_map<Int64, LocalExecutor *> LocalExecutor::executors;
/// Locked by cancelAll(), addExecutor() and removeExecutor() before they touch `executors`.
std::mutex LocalExecutor::executors_mutex;

void LocalExecutor::cancelAll()
{
std::lock_guard lock{executors_mutex};

for (auto & [handle, executor] : executors)
executor->asyncCancel();

for (auto & [handle, executor] : executors)
executor->waitCancelFinished();
}

/// Registers `executor` in the static registry so that cancelAll() can reach it.
///
/// The registry key is the executor's address converted to Int64, which is the same value
/// removeExecutor() expects as its handle. If the key is already present, the existing entry
/// is left untouched (std::unordered_map::emplace does not overwrite).
void LocalExecutor::addExecutor(LocalExecutor * executor)
{
    const Int64 key = reinterpret_cast<Int64>(executor);

    std::lock_guard<std::mutex> guard(executors_mutex);
    executors.emplace(key, executor);
}

/// Removes the executor registered under `handle` from the static registry.
///
/// `handle` is the executor's address as an Int64 (the key produced by addExecutor()).
/// Unknown handles are ignored. The executor object itself is neither cancelled nor destroyed here.
void LocalExecutor::removeExecutor(Int64 handle)
{
    std::lock_guard<std::mutex> guard(executors_mutex);

    const auto position = executors.find(handle);
    if (position != executors.end())
        executors.erase(position);
}

LocalExecutor::~LocalExecutor()
{
if (context->getConfigRef().getBool("dump_pipeline", false))
Expand Down Expand Up @@ -2183,8 +2210,35 @@ Block * LocalExecutor::nextColumnar()

/// Stops execution and waits for the pipeline to exit.
///
/// First calls asyncCancel(), then calls waitCancelFinished() to drain the pipeline.
void LocalExecutor::cancel()
{
    // No guard is needed here: asyncCancel() and waitCancelFinished() each return immediately
    // when there is no `executor` or when cancellation has already completed.
    asyncCancel();
    waitCancelFinished();
}

/// First half of cancellation: logs the request and calls cancel() on the underlying pipeline executor.
///
/// Does nothing when there is no `executor` or when `is_cancelled` is already set.
/// Draining the pipeline and setting `is_cancelled` is left to waitCancelFinished().
void LocalExecutor::asyncCancel()
{
    if (!executor || is_cancelled)
        return;

    LOG_INFO(&Poco::Logger::get("LocalExecutor"), "Cancel LocalExecutor {}", reinterpret_cast<intptr_t>(this));
    executor->cancel();
}

void LocalExecutor::waitCancelFinished()
{
if (executor && !is_cancelled)
{
Stopwatch watch;
Chunk chunk;
while (executor->pull(chunk))
;
is_cancelled = true;

LOG_INFO(
&Poco::Logger::get("LocalExecutor"),
"Finish cancel LocalExecutor {}, takes {} ms",
reinterpret_cast<intptr_t>(this),
watch.elapsedMilliseconds());
}
}

Block & LocalExecutor::getHeader()
Expand Down
14 changes: 13 additions & 1 deletion cpp-ch/local-engine/Parser/SerializedPlanParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -412,17 +412,24 @@ class LocalExecutor : public BlockIterator
Block * nextColumnar();
bool hasNext();

/// Stop execution, used when task receives shutdown command or executor receives SIGTERM signal
/// Stop execution and wait for pipeline exit, used when task receives shutdown command or executor receives SIGTERM signal
void cancel();

Block & getHeader();
RelMetricPtr getMetric() const { return metric; }
void setMetric(RelMetricPtr metric_) { metric = metric_; }
void setExtraPlanHolder(std::vector<QueryPlanPtr> & extra_plan_holder_) { extra_plan_holder = std::move(extra_plan_holder_); }

static void cancelAll();
static void addExecutor(LocalExecutor * executor);
static void removeExecutor(Int64 handle);

private:
std::unique_ptr<SparkRowInfo> writeBlockToSparkRow(DB::Block & block);

void asyncCancel();
void waitCancelFinished();

/// Dump processor runtime information to log
std::string dumpPipeline();

Expand All @@ -435,6 +442,11 @@ class LocalExecutor : public BlockIterator
DB::QueryPlanPtr current_query_plan;
RelMetricPtr metric;
std::vector<QueryPlanPtr> extra_plan_holder;
std::atomic<bool> is_cancelled{false};

/// Record all active LocalExecutor in current executor to cancel them when executor receives shutdown command from driver.
static std::unordered_map<Int64, LocalExecutor *> executors;
static std::mutex executors_mutex;
};


Expand Down
11 changes: 8 additions & 3 deletions cpp-ch/local-engine/local_engine_jni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_
plan_string.assign(reinterpret_cast<const char *>(plan_address), plan_size);
auto query_plan = parser.parse(plan_string);
local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(query_context);
LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}", reinterpret_cast<uintptr_t>(executor));
local_engine::LocalExecutor::addExecutor(executor);
LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}", reinterpret_cast<intptr_t>(executor));
executor->setMetric(parser.getMetric());
executor->setExtraPlanHolder(parser.extra_plan_holder);
executor->execute(std::move(query_plan));
Expand Down Expand Up @@ -314,17 +315,19 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_BatchIterator_nativeCHNext(JNI
/// JNI entry point for BatchIterator.nativeCancel: cancels the LocalExecutor whose address is
/// passed in `executor_address` and waits for it to stop.
JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeCancel(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    // Unregister first. removeExecutor() takes the same mutex that cancelAll() holds while it runs,
    // so after this call cancelAll() no longer reaches this executor through the registry.
    local_engine::LocalExecutor::removeExecutor(executor_address);
    local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
    executor->cancel();
    // Log once, using the same intptr_t formatting as the other LocalExecutor log lines.
    LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}", reinterpret_cast<intptr_t>(executor));
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}

/// JNI entry point for BatchIterator.nativeClose: destroys the LocalExecutor whose address is
/// passed in `executor_address`.
JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeClose(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    // Unregister before deleting so the registry never holds a pointer to a destroyed executor.
    // removeExecutor() takes the same mutex that cancelAll() holds while it dereferences registry entries.
    local_engine::LocalExecutor::removeExecutor(executor_address);
    local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
    // Log once, before the delete, using the same intptr_t formatting as the other LocalExecutor log lines.
    LOG_INFO(&Poco::Logger::get("jni"), "Finalize LocalExecutor {}", reinterpret_cast<intptr_t>(executor));
    delete executor;
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}
Expand Down Expand Up @@ -1332,6 +1335,7 @@ Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE
plan_string.assign(reinterpret_cast<const char *>(plan_address), plan_size);
auto query_plan = parser.parse(plan_string);
local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(context);
local_engine::LocalExecutor::addExecutor(executor);
executor->execute(std::move(query_plan));
env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT);
return reinterpret_cast<jlong>(executor);
Expand All @@ -1341,6 +1345,7 @@ Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE
JNIEXPORT void Java_org_apache_gluten_vectorized_SimpleExpressionEval_nativeClose(JNIEnv * env, jclass, jlong instance)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::LocalExecutor::removeExecutor(instance);
local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(instance);
delete executor;
LOCAL_ENGINE_JNI_METHOD_END(env, )
Expand Down

0 comments on commit 19324f3

Please sign in to comment.