diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp index 937beae99a6b0..be66d8ecc509a 100644 --- a/cpp-ch/local-engine/Common/CHUtil.cpp +++ b/cpp-ch/local-engine/Common/CHUtil.cpp @@ -750,7 +750,7 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config) size_t index_uncompressed_cache_size = config->getUInt64("index_uncompressed_cache_size", DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE); double index_uncompressed_cache_size_ratio = config->getDouble("index_uncompressed_cache_size_ratio", DEFAULT_INDEX_UNCOMPRESSED_CACHE_SIZE_RATIO); global_context->setIndexUncompressedCache(index_uncompressed_cache_policy, index_uncompressed_cache_size, index_uncompressed_cache_size_ratio); - + String index_mark_cache_policy = config->getString("index_mark_cache_policy", DEFAULT_INDEX_MARK_CACHE_POLICY); size_t index_mark_cache_size = config->getUInt64("index_mark_cache_size", DEFAULT_INDEX_MARK_CACHE_MAX_SIZE); double index_mark_cache_size_ratio = config->getDouble("index_mark_cache_size_ratio", DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO); @@ -919,7 +919,10 @@ void BackendInitializerUtil::updateConfig(const DB::ContextMutablePtr & context, void BackendFinalizerUtil::finalizeGlobally() { - // Make sure client caches release before ClientCacheRegistry + /// Make sure that all active LocalExecutors stop before the Spark executor shuts down, otherwise a crash may happen. 
+ LocalExecutor::cancelAll(); + + /// Make sure client caches release before ClientCacheRegistry ReadBufferBuilderFactory::instance().clean(); StorageMergeTreeFactory::clear(); auto & global_context = SerializedPlanParser::global_context; diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp index f9ea783a2bbd5..70db692c80097 100644 --- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp +++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp @@ -2053,6 +2053,33 @@ void SerializedPlanParser::wrapNullable( SharedContextHolder SerializedPlanParser::shared_context; +std::unordered_map LocalExecutor::executors; +std::mutex LocalExecutor::executors_mutex; + +void LocalExecutor::cancelAll() +{ + std::lock_guard lock{executors_mutex}; + + for (auto & [handle, executor] : executors) + executor->asyncCancel(); + + for (auto & [handle, executor] : executors) + executor->waitCancelFinished(); +} + +void LocalExecutor::addExecutor(LocalExecutor * executor) +{ + std::lock_guard lock{executors_mutex}; + Int64 handle = reinterpret_cast(executor); + executors.emplace(handle, executor); +} + +void LocalExecutor::removeExecutor(Int64 handle) +{ + std::lock_guard lock{executors_mutex}; + executors.erase(handle); +} + LocalExecutor::~LocalExecutor() { if (context->getConfigRef().getBool("dump_pipeline", false)) @@ -2183,8 +2210,35 @@ Block * LocalExecutor::nextColumnar() void LocalExecutor::cancel() { - if (executor) + asyncCancel(); + waitCancelFinished(); +} + +void LocalExecutor::asyncCancel() +{ + if (executor && !is_cancelled) + { + LOG_INFO(&Poco::Logger::get("LocalExecutor"), "Cancel LocalExecutor {}", reinterpret_cast(this)); executor->cancel(); + } +} + +void LocalExecutor::waitCancelFinished() +{ + if (executor && !is_cancelled) + { + Stopwatch watch; + Chunk chunk; + while (executor->pull(chunk)) + ; + is_cancelled = true; + + LOG_INFO( + &Poco::Logger::get("LocalExecutor"), + "Finish cancel LocalExecutor 
{}, takes {} ms", + reinterpret_cast(this), + watch.elapsedMilliseconds()); + } } Block & LocalExecutor::getHeader() diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h b/cpp-ch/local-engine/Parser/SerializedPlanParser.h index 8964f42d9d02b..71cdca58a6ce1 100644 --- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h +++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h @@ -412,7 +412,7 @@ class LocalExecutor : public BlockIterator Block * nextColumnar(); bool hasNext(); - /// Stop execution, used when task receives shutdown command or executor receives SIGTERM signal + /// Stop execution and wait for pipeline exit, used when task receives shutdown command or executor receives SIGTERM signal void cancel(); Block & getHeader(); @@ -420,9 +420,16 @@ class LocalExecutor : public BlockIterator void setMetric(RelMetricPtr metric_) { metric = metric_; } void setExtraPlanHolder(std::vector & extra_plan_holder_) { extra_plan_holder = std::move(extra_plan_holder_); } + static void cancelAll(); + static void addExecutor(LocalExecutor * executor); + static void removeExecutor(Int64 handle); + private: std::unique_ptr writeBlockToSparkRow(DB::Block & block); + void asyncCancel(); + void waitCancelFinished(); + /// Dump processor runtime information to log std::string dumpPipeline(); @@ -435,6 +442,11 @@ class LocalExecutor : public BlockIterator DB::QueryPlanPtr current_query_plan; RelMetricPtr metric; std::vector extra_plan_holder; + std::atomic is_cancelled{false}; + + /// Record all active LocalExecutor in current executor to cancel them when executor receives shutdown command from driver. 
+ static std::unordered_map executors; + static std::mutex executors_mutex; }; diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index 256f373c28b55..bbc467879182b 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -283,7 +283,8 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_ plan_string.assign(reinterpret_cast(plan_address), plan_size); auto query_plan = parser.parse(plan_string); local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(query_context); - LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}", reinterpret_cast(executor)); + local_engine::LocalExecutor::addExecutor(executor); + LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}", reinterpret_cast(executor)); executor->setMetric(parser.getMetric()); executor->setExtraPlanHolder(parser.extra_plan_holder); executor->execute(std::move(query_plan)); @@ -314,17 +315,19 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_BatchIterator_nativeCHNext(JNI JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeCancel(JNIEnv * env, jobject /*obj*/, jlong executor_address) { LOCAL_ENGINE_JNI_METHOD_START + local_engine::LocalExecutor::removeExecutor(executor_address); local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); executor->cancel(); - LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}", reinterpret_cast(executor)); + LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}", reinterpret_cast(executor)); LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeClose(JNIEnv * env, jobject /*obj*/, jlong executor_address) { LOCAL_ENGINE_JNI_METHOD_START + local_engine::LocalExecutor::removeExecutor(executor_address); local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); - LOG_INFO(&Poco::Logger::get("jni"), 
"Finalize LocalExecutor {}", reinterpret_cast(executor)); + LOG_INFO(&Poco::Logger::get("jni"), "Finalize LocalExecutor {}", reinterpret_cast(executor)); delete executor; LOCAL_ENGINE_JNI_METHOD_END(env, ) } @@ -1332,6 +1335,7 @@ Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE plan_string.assign(reinterpret_cast(plan_address), plan_size); auto query_plan = parser.parse(plan_string); local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(context); + local_engine::LocalExecutor::addExecutor(executor); executor->execute(std::move(query_plan)); env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT); return reinterpret_cast(executor); @@ -1341,6 +1345,7 @@ Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE JNIEXPORT void Java_org_apache_gluten_vectorized_SimpleExpressionEval_nativeClose(JNIEnv * env, jclass, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START + local_engine::LocalExecutor::removeExecutor(instance); local_engine::LocalExecutor * executor = reinterpret_cast(instance); delete executor; LOCAL_ENGINE_JNI_METHOD_END(env, )