From 4e44662c3600ff0f2d3d6bf0f61083e8678794a5 Mon Sep 17 00:00:00 2001 From: Chang Chen Date: Tue, 2 Jul 2024 20:31:48 +0800 Subject: [PATCH] Revert "[GLUTEN-6122] Fix crash when driver send shutdown command to executor #6130" This reverts commit eee234e398c9418b6f5f93dcfb142e0e0948711f. --- cpp-ch/local-engine/Common/CHUtil.cpp | 7 +-- .../Parser/SerializedPlanParser.cpp | 56 +------------------ .../Parser/SerializedPlanParser.h | 12 ---- cpp-ch/local-engine/local_engine_jni.cpp | 9 +-- 4 files changed, 5 insertions(+), 79 deletions(-) diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp index 4a21dbe39834..770fbbc59c80 100644 --- a/cpp-ch/local-engine/Common/CHUtil.cpp +++ b/cpp-ch/local-engine/Common/CHUtil.cpp @@ -822,7 +822,7 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config) size_t index_uncompressed_cache_size = config->getUInt64("index_uncompressed_cache_size", DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE); double index_uncompressed_cache_size_ratio = config->getDouble("index_uncompressed_cache_size_ratio", DEFAULT_INDEX_UNCOMPRESSED_CACHE_SIZE_RATIO); global_context->setIndexUncompressedCache(index_uncompressed_cache_policy, index_uncompressed_cache_size, index_uncompressed_cache_size_ratio); - + String index_mark_cache_policy = config->getString("index_mark_cache_policy", DEFAULT_INDEX_MARK_CACHE_POLICY); size_t index_mark_cache_size = config->getUInt64("index_mark_cache_size", DEFAULT_INDEX_MARK_CACHE_MAX_SIZE); double index_mark_cache_size_ratio = config->getDouble("index_mark_cache_size_ratio", DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO); @@ -986,10 +986,7 @@ void BackendInitializerUtil::updateConfig(const DB::ContextMutablePtr & context, void BackendFinalizerUtil::finalizeGlobally() { - /// Make sure that all active LocalExecutor stop before spark executor shutdown, otherwise crash map happen. - LocalExecutor::cancelAll(); - - /// Make sure client caches release before ClientCacheRegistry + // Make sure client caches release before ClientCacheRegistry ReadBufferBuilderFactory::instance().clean(); StorageMergeTreeFactory::clear(); auto & global_context = SerializedPlanParser::global_context; diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp index 1ee485346d07..46ddb4c9909e 100644 --- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp +++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp @@ -2027,33 +2027,6 @@ void SerializedPlanParser::wrapNullable( SharedContextHolder SerializedPlanParser::shared_context; -std::unordered_map LocalExecutor::executors; -std::mutex LocalExecutor::executors_mutex; - -void LocalExecutor::cancelAll() -{ - std::lock_guard lock{executors_mutex}; - - for (auto & [handle, executor] : executors) - executor->asyncCancel(); - - for (auto & [handle, executor] : executors) - executor->waitCancelFinished(); -} - -void LocalExecutor::addExecutor(LocalExecutor * executor) -{ - std::lock_guard lock{executors_mutex}; - Int64 handle = reinterpret_cast(executor); - executors.emplace(handle, executor); -} - -void LocalExecutor::removeExecutor(Int64 handle) -{ - std::lock_guard lock{executors_mutex}; - executors.erase(handle); -} - LocalExecutor::~LocalExecutor() { if (context->getConfigRef().getBool("dump_pipeline", false)) @@ -2121,35 +2094,8 @@ Block * LocalExecutor::nextColumnar() void LocalExecutor::cancel() { - asyncCancel(); - waitCancelFinished(); -} - -void LocalExecutor::asyncCancel() -{ - if (executor && !is_cancelled) - { - LOG_INFO(&Poco::Logger::get("LocalExecutor"), "Cancel LocalExecutor {}", reinterpret_cast(this)); + if (executor) executor->cancel(); - } -} - -void LocalExecutor::waitCancelFinished() -{ - if (executor && !is_cancelled) - { - Stopwatch watch; - Chunk chunk; - while (executor->pull(chunk)) - ; - is_cancelled = true; - - LOG_INFO( - &Poco::Logger::get("LocalExecutor"), - "Finish cancel LocalExecutor {}, takes {} ms", - reinterpret_cast(this), - watch.elapsedMilliseconds()); - } } Block & LocalExecutor::getHeader() diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h b/cpp-ch/local-engine/Parser/SerializedPlanParser.h index c62dc73c9394..28263f427102 100644 --- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h +++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h @@ -436,16 +436,9 @@ class LocalExecutor : public BlockIterator void setMetric(RelMetricPtr metric_) { metric = metric_; } void setExtraPlanHolder(std::vector & extra_plan_holder_) { extra_plan_holder = std::move(extra_plan_holder_); } - static void cancelAll(); - static void addExecutor(LocalExecutor * executor); - static void removeExecutor(Int64 handle); - private: std::unique_ptr writeBlockToSparkRow(const DB::Block & block) const; - void asyncCancel(); - void waitCancelFinished(); - /// Dump processor runtime information to log std::string dumpPipeline() const; @@ -458,11 +451,6 @@ class LocalExecutor : public BlockIterator QueryPlanPtr current_query_plan; RelMetricPtr metric; std::vector extra_plan_holder; - std::atomic is_cancelled{false}; - - /// Record all active LocalExecutor in current executor to cancel them when executor receives shutdown command from driver. - static std::unordered_map executors; - static std::mutex executors_mutex; }; diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index 2338bfe8b1e6..695fc8585538 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -259,7 +259,6 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_ const std::string::size_type plan_size = plan_a.length(); local_engine::LocalExecutor * executor = parser.createExecutor({reinterpret_cast(plan_a.elems()), plan_size}).release(); - local_engine::LocalExecutor::addExecutor(executor); LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}", reinterpret_cast(executor)); executor->setMetric(parser.getMetric()); executor->setExtraPlanHolder(parser.extra_plan_holder); @@ -289,17 +288,15 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_BatchIterator_nativeCHNext(JNI JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeCancel(JNIEnv * env, jobject /*obj*/, jlong executor_address) { LOCAL_ENGINE_JNI_METHOD_START - local_engine::LocalExecutor::removeExecutor(executor_address); auto *executor = reinterpret_cast(executor_address); executor->cancel(); - LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}", reinterpret_cast(executor)); + LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}", reinterpret_cast(executor)); LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeClose(JNIEnv * env, jobject /*obj*/, jlong executor_address) { LOCAL_ENGINE_JNI_METHOD_START - local_engine::LocalExecutor::removeExecutor(executor_address); auto *executor = reinterpret_cast(executor_address); LOG_INFO(&Poco::Logger::get("jni"), "Finalize LocalExecutor {}", reinterpret_cast(executor)); delete executor; @@ -1262,8 +1259,7 @@ Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan); const std::string::size_type plan_size = plan_a.length(); local_engine::LocalExecutor * executor - = parser.createExecutor({reinterpret_cast(plan_a.elems()), plan_size}).release(); - local_engine::LocalExecutor::addExecutor(executor); + = parser.createExecutor({reinterpret_cast(plan_a.elems()), plan_size}).release(); return reinterpret_cast(executor); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } @@ -1271,7 +1267,6 @@ Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE JNIEXPORT void Java_org_apache_gluten_vectorized_SimpleExpressionEval_nativeClose(JNIEnv * env, jclass, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START - local_engine::LocalExecutor::removeExecutor(instance); local_engine::LocalExecutor * executor = reinterpret_cast(instance); delete executor; LOCAL_ENGINE_JNI_METHOD_END(env, )