Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update condition for running global idle task to use total runnable task count #101

Merged
merged 1 commit into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions src/threading/TaskScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ namespace threading {
const size_t count = pool->pool_descriptor.counts_for_idle ? 1 : 0;

// Sum up the number of threads that are idleable
total_idleable_threads += count;
pool->n_threads += count;

// When task is nullptr there are no more tasks to get and the scheduler is shutting down
Expand All @@ -84,10 +83,12 @@ namespace threading {
}
catch (...) {
}
if (pool->pool_descriptor.counts_for_idle) {
--total_runnable_tasks;
}
}

pool->n_threads -= count;
total_idleable_threads -= count;

// Clear the current queue
current_queue = nullptr;
Expand Down Expand Up @@ -213,8 +214,12 @@ namespace threading {

// We do not accept new tasks once we are shutdown
if (running.load()) {
const std::shared_ptr<PoolQueue> pool = get_pool_queue(task.thread_pool_descriptor);
if (pool->pool_descriptor.counts_for_idle) {
++total_runnable_tasks;
}
// Get the appropiate pool for this task and submit
get_pool_queue(task.thread_pool_descriptor)->submit(std::move(task));
pool->submit(std::move(task));
}
}

Expand Down Expand Up @@ -284,22 +289,29 @@ namespace threading {

// Check if we can run the task
if (is_runnable(it->group_descriptor)) {

// Move the task out of the queue
Task task = std::move(*it);

// Erase the old position in the queue
queue.erase(it);

if (pool->pool_descriptor.counts_for_idle && task.checked_runnable) {
++total_runnable_tasks;
}

// If we were idle, we are about to not be
if (pool->pool_descriptor.counts_for_idle && idle) {
--idle_threads;
--pool->idle_threads;
}

// Return the task
return task;
}
else if (!it->checked_runnable) {
if (pool->pool_descriptor.counts_for_idle) {
--total_runnable_tasks;
}
it->checked_runnable = true;
}
}

// If pool concurrency is greater than group concurrency some threads can be left with nothing to do.
Expand All @@ -323,9 +335,14 @@ namespace threading {

// Global idle tasks
if (pool->pool_descriptor.counts_for_idle) {
if (++idle_threads == total_idleable_threads) {
for (auto& t : idle_tasks) {
t.second();
/* mutex scope */ {
if (total_runnable_tasks == 0) {
const std::lock_guard<std::mutex> idle_lock(idle_mutex);
if (total_runnable_tasks == 0) {
for (auto& t : idle_tasks) {
t.second();
}
}
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion src/threading/TaskScheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ namespace threading {
util::ThreadPoolDescriptor thread_pool_descriptor;
/// @brief The callback to be executed
std::function<void()> run;
/// @brief If task has been checked for runnable
bool checked_runnable{false};

/**
* @brief Compare tasks based on their priority
Expand Down Expand Up @@ -286,10 +288,12 @@ namespace threading {
/// @brief mutex for the group map
std::mutex group_mutex;

/// @brief mutex for the idle tasks
std::mutex idle_mutex;
/// @brief global idle tasks to be executed when no other tasks are running
std::map<NUClear::id_t, std::function<void()>> idle_tasks{};
/// @brief the total number of threads that can be counted as idle
std::atomic<size_t> total_idleable_threads{0};
std::atomic<size_t> total_runnable_tasks{0};
/// @brief the number of global idle threads
std::atomic<size_t> idle_threads{0};

Expand Down
Loading