Skip to content

Commit

Permalink
Revert "[GLUTEN-6122] Fix crash when driver send shutdown command to …
Browse files Browse the repository at this point in the history
…executor apache#6130"

This reverts commit eee234e.
  • Loading branch information
baibaichen committed Jul 3, 2024
1 parent 80bb848 commit fb60791
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 79 deletions.
7 changes: 2 additions & 5 deletions cpp-ch/local-engine/Common/CHUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,7 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
size_t index_uncompressed_cache_size = config->getUInt64("index_uncompressed_cache_size", DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE);
double index_uncompressed_cache_size_ratio = config->getDouble("index_uncompressed_cache_size_ratio", DEFAULT_INDEX_UNCOMPRESSED_CACHE_SIZE_RATIO);
global_context->setIndexUncompressedCache(index_uncompressed_cache_policy, index_uncompressed_cache_size, index_uncompressed_cache_size_ratio);

String index_mark_cache_policy = config->getString("index_mark_cache_policy", DEFAULT_INDEX_MARK_CACHE_POLICY);
size_t index_mark_cache_size = config->getUInt64("index_mark_cache_size", DEFAULT_INDEX_MARK_CACHE_MAX_SIZE);
double index_mark_cache_size_ratio = config->getDouble("index_mark_cache_size_ratio", DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO);
Expand Down Expand Up @@ -986,10 +986,7 @@ void BackendInitializerUtil::updateConfig(const DB::ContextMutablePtr & context,

void BackendFinalizerUtil::finalizeGlobally()
{
/// Make sure that all active LocalExecutor stop before spark executor shutdown, otherwise crash map happen.
LocalExecutor::cancelAll();

/// Make sure client caches release before ClientCacheRegistry
// Make sure client caches release before ClientCacheRegistry
ReadBufferBuilderFactory::instance().clean();
StorageMergeTreeFactory::clear();
auto & global_context = SerializedPlanParser::global_context;
Expand Down
56 changes: 1 addition & 55 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2033,33 +2033,6 @@ void SerializedPlanParser::wrapNullable(

SharedContextHolder SerializedPlanParser::shared_context;

std::unordered_map<Int64, LocalExecutor *> LocalExecutor::executors;
std::mutex LocalExecutor::executors_mutex;

void LocalExecutor::cancelAll()
{
std::lock_guard lock{executors_mutex};

for (auto & [handle, executor] : executors)
executor->asyncCancel();

for (auto & [handle, executor] : executors)
executor->waitCancelFinished();
}

void LocalExecutor::addExecutor(LocalExecutor * executor)
{
std::lock_guard lock{executors_mutex};
Int64 handle = reinterpret_cast<Int64>(executor);
executors.emplace(handle, executor);
}

void LocalExecutor::removeExecutor(Int64 handle)
{
std::lock_guard lock{executors_mutex};
executors.erase(handle);
}

LocalExecutor::~LocalExecutor()
{
if (context->getConfigRef().getBool("dump_pipeline", false))
Expand Down Expand Up @@ -2127,35 +2100,8 @@ Block * LocalExecutor::nextColumnar()

void LocalExecutor::cancel()
{
asyncCancel();
waitCancelFinished();
}

void LocalExecutor::asyncCancel()
{
if (executor && !is_cancelled)
{
LOG_INFO(&Poco::Logger::get("LocalExecutor"), "Cancel LocalExecutor {}", reinterpret_cast<intptr_t>(this));
if (executor)
executor->cancel();
}
}

void LocalExecutor::waitCancelFinished()
{
if (executor && !is_cancelled)
{
Stopwatch watch;
Chunk chunk;
while (executor->pull(chunk))
;
is_cancelled = true;

LOG_INFO(
&Poco::Logger::get("LocalExecutor"),
"Finish cancel LocalExecutor {}, takes {} ms",
reinterpret_cast<intptr_t>(this),
watch.elapsedMilliseconds());
}
}

Block & LocalExecutor::getHeader()
Expand Down
12 changes: 0 additions & 12 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -439,16 +439,9 @@ class LocalExecutor : public BlockIterator
void setMetric(RelMetricPtr metric_) { metric = metric_; }
void setExtraPlanHolder(std::vector<QueryPlanPtr> & extra_plan_holder_) { extra_plan_holder = std::move(extra_plan_holder_); }

static void cancelAll();
static void addExecutor(LocalExecutor * executor);
static void removeExecutor(Int64 handle);

private:
std::unique_ptr<SparkRowInfo> writeBlockToSparkRow(const DB::Block & block) const;

void asyncCancel();
void waitCancelFinished();

/// Dump processor runtime information to log
std::string dumpPipeline() const;

Expand All @@ -461,11 +454,6 @@ class LocalExecutor : public BlockIterator
QueryPlanPtr current_query_plan;
RelMetricPtr metric;
std::vector<QueryPlanPtr> extra_plan_holder;
std::atomic<bool> is_cancelled{false};

/// Record all active LocalExecutor in current executor to cancel them when executor receives shutdown command from driver.
static std::unordered_map<Int64, LocalExecutor *> executors;
static std::mutex executors_mutex;
};


Expand Down
9 changes: 2 additions & 7 deletions cpp-ch/local-engine/local_engine_jni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,6 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_
const std::string::size_type plan_size = plan_a.length();
local_engine::LocalExecutor * executor
= parser.createExecutor<false>({reinterpret_cast<const char *>(plan_a.elems()), plan_size}).release();
local_engine::LocalExecutor::addExecutor(executor);
LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}", reinterpret_cast<uintptr_t>(executor));
executor->setMetric(parser.getMetric());
executor->setExtraPlanHolder(parser.extra_plan_holder);
Expand Down Expand Up @@ -289,17 +288,15 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_BatchIterator_nativeCHNext(JNI
JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeCancel(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::LocalExecutor::removeExecutor(executor_address);
auto *executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
executor->cancel();
LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}", reinterpret_cast<intptr_t>(executor));
LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}", reinterpret_cast<uintptr_t>(executor));
LOCAL_ENGINE_JNI_METHOD_END(env, )
}

JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeClose(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::LocalExecutor::removeExecutor(executor_address);
auto *executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
LOG_INFO(&Poco::Logger::get("jni"), "Finalize LocalExecutor {}", reinterpret_cast<intptr_t>(executor));
delete executor;
Expand Down Expand Up @@ -1262,16 +1259,14 @@ Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE
const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan);
const std::string::size_type plan_size = plan_a.length();
local_engine::LocalExecutor * executor
= parser.createExecutor<false>({reinterpret_cast<const char *>(plan_a.elems()), plan_size}).release();
local_engine::LocalExecutor::addExecutor(executor);
= parser.createExecutor<false>({reinterpret_cast<const char *>(plan_a.elems()), plan_size}).release();
return reinterpret_cast<jlong>(executor);
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}

JNIEXPORT void Java_org_apache_gluten_vectorized_SimpleExpressionEval_nativeClose(JNIEnv * env, jclass, jlong instance)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::LocalExecutor::removeExecutor(instance);
local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(instance);
delete executor;
LOCAL_ENGINE_JNI_METHOD_END(env, )
Expand Down

0 comments on commit fb60791

Please sign in to comment.