Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-6122] Fix crash when driver send shutdown command to executor #6130

Merged
merged 1 commit into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions cpp-ch/local-engine/Common/CHUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
size_t index_uncompressed_cache_size = config->getUInt64("index_uncompressed_cache_size", DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE);
double index_uncompressed_cache_size_ratio = config->getDouble("index_uncompressed_cache_size_ratio", DEFAULT_INDEX_UNCOMPRESSED_CACHE_SIZE_RATIO);
global_context->setIndexUncompressedCache(index_uncompressed_cache_policy, index_uncompressed_cache_size, index_uncompressed_cache_size_ratio);

String index_mark_cache_policy = config->getString("index_mark_cache_policy", DEFAULT_INDEX_MARK_CACHE_POLICY);
size_t index_mark_cache_size = config->getUInt64("index_mark_cache_size", DEFAULT_INDEX_MARK_CACHE_MAX_SIZE);
double index_mark_cache_size_ratio = config->getDouble("index_mark_cache_size_ratio", DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO);
Expand Down Expand Up @@ -919,7 +919,10 @@ void BackendInitializerUtil::updateConfig(const DB::ContextMutablePtr & context,

void BackendFinalizerUtil::finalizeGlobally()
{
// Make sure client caches release before ClientCacheRegistry
/// Make sure that all active LocalExecutor stop before spark executor shutdown, otherwise crash map happen.
LocalExecutor::cancelAll();

/// Make sure client caches release before ClientCacheRegistry
ReadBufferBuilderFactory::instance().clean();
StorageMergeTreeFactory::clear();
auto & global_context = SerializedPlanParser::global_context;
Expand Down
56 changes: 55 additions & 1 deletion cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2053,6 +2053,33 @@ void SerializedPlanParser::wrapNullable(

SharedContextHolder SerializedPlanParser::shared_context;

std::unordered_map<Int64, LocalExecutor *> LocalExecutor::executors;
std::mutex LocalExecutor::executors_mutex;

void LocalExecutor::cancelAll()
{
std::lock_guard lock{executors_mutex};

for (auto & [handle, executor] : executors)
executor->asyncCancel();

for (auto & [handle, executor] : executors)
executor->waitCancelFinished();
}

void LocalExecutor::addExecutor(LocalExecutor * executor)
{
std::lock_guard lock{executors_mutex};
Int64 handle = reinterpret_cast<Int64>(executor);
executors.emplace(handle, executor);
}

void LocalExecutor::removeExecutor(Int64 handle)
{
std::lock_guard lock{executors_mutex};
executors.erase(handle);
}

LocalExecutor::~LocalExecutor()
{
if (context->getConfigRef().getBool("dump_pipeline", false))
Expand Down Expand Up @@ -2183,8 +2210,35 @@ Block * LocalExecutor::nextColumnar()

void LocalExecutor::cancel()
{
if (executor)
asyncCancel();
waitCancelFinished();
}

void LocalExecutor::asyncCancel()
{
if (executor && !is_cancelled)
{
LOG_INFO(&Poco::Logger::get("LocalExecutor"), "Cancel LocalExecutor {}", reinterpret_cast<intptr_t>(this));
executor->cancel();
}
}

void LocalExecutor::waitCancelFinished()
{
if (executor && !is_cancelled)
{
Stopwatch watch;
Chunk chunk;
while (executor->pull(chunk))
;
is_cancelled = true;

taiyang-li marked this conversation as resolved.
Show resolved Hide resolved
LOG_INFO(
&Poco::Logger::get("LocalExecutor"),
"Finish cancel LocalExecutor {}, takes {} ms",
reinterpret_cast<intptr_t>(this),
watch.elapsedMilliseconds());
}
}

Block & LocalExecutor::getHeader()
Expand Down
14 changes: 13 additions & 1 deletion cpp-ch/local-engine/Parser/SerializedPlanParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -412,17 +412,24 @@ class LocalExecutor : public BlockIterator
Block * nextColumnar();
bool hasNext();

/// Stop execution, used when task receives shutdown command or executor receives SIGTERM signal
/// Stop execution and wait for pipeline exit, used when task receives shutdown command or executor receives SIGTERM signal
void cancel();

Block & getHeader();
RelMetricPtr getMetric() const { return metric; }
void setMetric(RelMetricPtr metric_) { metric = metric_; }
void setExtraPlanHolder(std::vector<QueryPlanPtr> & extra_plan_holder_) { extra_plan_holder = std::move(extra_plan_holder_); }

static void cancelAll();
static void addExecutor(LocalExecutor * executor);
static void removeExecutor(Int64 handle);

private:
std::unique_ptr<SparkRowInfo> writeBlockToSparkRow(DB::Block & block);

void asyncCancel();
void waitCancelFinished();

/// Dump processor runtime information to log
std::string dumpPipeline();

Expand All @@ -435,6 +442,11 @@ class LocalExecutor : public BlockIterator
DB::QueryPlanPtr current_query_plan;
RelMetricPtr metric;
std::vector<QueryPlanPtr> extra_plan_holder;
std::atomic<bool> is_cancelled{false};

/// Record all active LocalExecutor in current executor to cancel them when executor receives shutdown command from driver.
static std::unordered_map<Int64, LocalExecutor *> executors;
static std::mutex executors_mutex;
};


Expand Down
11 changes: 8 additions & 3 deletions cpp-ch/local-engine/local_engine_jni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_
plan_string.assign(reinterpret_cast<const char *>(plan_address), plan_size);
auto query_plan = parser.parse(plan_string);
local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(query_context);
LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}", reinterpret_cast<uintptr_t>(executor));
local_engine::LocalExecutor::addExecutor(executor);
LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}", reinterpret_cast<intptr_t>(executor));
executor->setMetric(parser.getMetric());
executor->setExtraPlanHolder(parser.extra_plan_holder);
executor->execute(std::move(query_plan));
Expand Down Expand Up @@ -314,17 +315,19 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_BatchIterator_nativeCHNext(JNI
JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeCancel(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::LocalExecutor::removeExecutor(executor_address);
local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
executor->cancel();
LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}", reinterpret_cast<uintptr_t>(executor));
LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}", reinterpret_cast<intptr_t>(executor));
LOCAL_ENGINE_JNI_METHOD_END(env, )
}

JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeClose(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::LocalExecutor::removeExecutor(executor_address);
local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
LOG_INFO(&Poco::Logger::get("jni"), "Finalize LocalExecutor {}", reinterpret_cast<uintptr_t>(executor));
LOG_INFO(&Poco::Logger::get("jni"), "Finalize LocalExecutor {}", reinterpret_cast<intptr_t>(executor));
delete executor;
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
Expand Down Expand Up @@ -1332,6 +1335,7 @@ Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE
plan_string.assign(reinterpret_cast<const char *>(plan_address), plan_size);
auto query_plan = parser.parse(plan_string);
local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(context);
local_engine::LocalExecutor::addExecutor(executor);
executor->execute(std::move(query_plan));
env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT);
return reinterpret_cast<jlong>(executor);
Expand All @@ -1341,6 +1345,7 @@ Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE
JNIEXPORT void Java_org_apache_gluten_vectorized_SimpleExpressionEval_nativeClose(JNIEnv * env, jclass, jlong instance)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::LocalExecutor::removeExecutor(instance);
local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(instance);
delete executor;
LOCAL_ENGINE_JNI_METHOD_END(env, )
Expand Down
Loading