Skip to content

Commit

Permalink
wait for cancel finish
Browse files Browse the repository at this point in the history
  • Loading branch information
taiyang-li committed Jun 19, 2024
1 parent 4f4c946 commit 2f94c99
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 46 deletions.
58 changes: 57 additions & 1 deletion cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2042,6 +2042,48 @@ SharedContextHolder SerializedPlanParser::shared_context;
std::unordered_map<Int64, LocalExecutor *> LocalExecutor::executors;
std::mutex LocalExecutor::executors_mutex;

void LocalExecutor::cancelAll()
{
std::lock_guard lock{executors_mutex};
for (auto & [handle, executor] : executors)
{
LOG_INFO(&Poco::Logger::get("LocalExecutor"), "Cancel remaining LocalExecutor {}", handle);
executor->cancel();
}

/// Wait for all remaining LocalExecutor to be cancelled
for (auto & [handle, executor] : executors)
{
if (executor->executor && !executor->is_cancelled)
{
Stopwatch watch;
Chunk chunk;
while (executor->executor->pull(chunk))
;

executor->is_cancelled = true;
LOG_INFO(
&Poco::Logger::get("LocalExecutor"),
"Finish cancel LocalExecutor {}, takes {} ms",
handle,
watch.elapsedMilliseconds());
}
}
}

void LocalExecutor::addExecutor(LocalExecutor * executor)
{
std::lock_guard lock{executors_mutex};
Int64 handle = reinterpret_cast<Int64>(executor);
executors.emplace(handle, executor);
}

void LocalExecutor::removeExecutor(Int64 handle)
{
std::lock_guard lock{executors_mutex};
executors.erase(handle);
}

LocalExecutor::~LocalExecutor()
{
if (context->getConfigRef().getBool("dump_pipeline", false))
Expand Down Expand Up @@ -2172,8 +2214,22 @@ Block * LocalExecutor::nextColumnar()

void LocalExecutor::cancel()
{
if (executor)
if (executor && !is_cancelled)
{
executor->cancel();

Stopwatch watch;
Chunk chunk;
while (executor->pull(chunk))
;
is_cancelled = true;

LOG_INFO(
&Poco::Logger::get("LocalExecutor"),
"Finish cancel LocalExecutor {}, takes {} ms",
reinterpret_cast<intptr_t>(this),
watch.elapsedMilliseconds());
}
}

Block & LocalExecutor::getHeader()
Expand Down
46 changes: 4 additions & 42 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -417,48 +417,9 @@ class LocalExecutor : public BlockIterator
void setMetric(RelMetricPtr metric_) { metric = metric_; }
void setExtraPlanHolder(std::vector<QueryPlanPtr> & extra_plan_holder_) { extra_plan_holder = std::move(extra_plan_holder_); }

static void cancelAll()
{
std::lock_guard lock{executors_mutex};
for (auto & [handle, executor] : executors)
{
LOG_INFO(&Poco::Logger::get("LocalExecutor"), "Cancel remaining LocalExecutor {}", handle);
executor->cancel();
}

/// Wait for all remaining LocalExecutor to be cancelled
for (auto & [handle, executor] : executors)
{
if (executor->executor)
{
Stopwatch watch;

Chunk chunk;
while (executor->executor->pull(chunk))
;

LOG_INFO(
&Poco::Logger::get("LocalExecutor"),
"Finish cancel remaining LocalExecutor {}, takes {} ms",
handle,
watch.elapsedMilliseconds());
}
}

}

static void addExecutor(LocalExecutor * executor)
{
std::lock_guard lock{executors_mutex};
Int64 handle = reinterpret_cast<Int64>(executor);
executors.emplace(handle, executor);
}

static void removeExecutor(Int64 handle)
{
std::lock_guard lock{executors_mutex};
executors.erase(handle);
}
static void cancelAll();
static void addExecutor(LocalExecutor * executor);
static void removeExecutor(Int64 handle);

private:
std::unique_ptr<SparkRowInfo> writeBlockToSparkRow(DB::Block & block);
Expand All @@ -475,6 +436,7 @@ class LocalExecutor : public BlockIterator
DB::QueryPlanPtr current_query_plan;
RelMetricPtr metric;
std::vector<QueryPlanPtr> extra_plan_holder;
std::atomic<bool> is_cancelled{false};

/// Record all active LocalExecutor in current executor to cancel them when executor receives shutdown command from driver.
static std::unordered_map<Int64, LocalExecutor *> executors;
Expand Down
6 changes: 3 additions & 3 deletions cpp-ch/local-engine/local_engine_jni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_
auto query_plan = parser.parse(plan_string);
local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(query_context);
local_engine::LocalExecutor::addExecutor(executor);
LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}", reinterpret_cast<uintptr_t>(executor));
LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}", reinterpret_cast<intptr_t>(executor));
executor->setMetric(parser.getMetric());
executor->setExtraPlanHolder(parser.extra_plan_holder);
executor->execute(std::move(query_plan));
Expand Down Expand Up @@ -319,7 +319,7 @@ JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeCancel(JNIE
local_engine::LocalExecutor::removeExecutor(executor_address);
local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
executor->cancel();
LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}", reinterpret_cast<uintptr_t>(executor));
LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}", reinterpret_cast<intptr_t>(executor));
LOCAL_ENGINE_JNI_METHOD_END(env, )
}

Expand All @@ -328,7 +328,7 @@ JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeClose(JNIEn
LOCAL_ENGINE_JNI_METHOD_START
local_engine::LocalExecutor::removeExecutor(executor_address);
local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
LOG_INFO(&Poco::Logger::get("jni"), "Finalize LocalExecutor {}", reinterpret_cast<uintptr_t>(executor));
LOG_INFO(&Poco::Logger::get("jni"), "Finalize LocalExecutor {}", reinterpret_cast<intptr_t>(executor));
delete executor;
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
Expand Down

0 comments on commit 2f94c99

Please sign in to comment.