Skip to content

Commit

Permalink
Update global idle task check to use total runnable task count
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom0Brien committed Jan 16, 2024
1 parent 7a82aaf commit b95f972
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 10 deletions.
35 changes: 26 additions & 9 deletions src/threading/TaskScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ namespace threading {
const size_t count = pool->pool_descriptor.counts_for_idle ? 1 : 0;

// Sum up the number of threads that are idleable
total_idleable_threads += count;
pool->n_threads += count;

// When task is nullptr there are no more tasks to get and the scheduler is shutting down
Expand All @@ -84,10 +83,12 @@ namespace threading {
}
catch (...) {
}
if (pool->pool_descriptor.counts_for_idle) {
--total_runnable_tasks;
}
}

pool->n_threads -= count;
total_idleable_threads -= count;

// Clear the current queue
current_queue = nullptr;
Expand Down Expand Up @@ -213,8 +214,12 @@ namespace threading {

// We do not accept new tasks once we are shutdown
if (running.load()) {
const std::shared_ptr<PoolQueue> pool = get_pool_queue(task.thread_pool_descriptor);
if (pool->pool_descriptor.counts_for_idle) {
++total_runnable_tasks;
}
// Get the appropiate pool for this task and submit
get_pool_queue(task.thread_pool_descriptor)->submit(std::move(task));
pool->submit(std::move(task));
}
}

Expand Down Expand Up @@ -284,22 +289,29 @@ namespace threading {

// Check if we can run the task
if (is_runnable(it->group_descriptor)) {

// Move the task out of the queue
Task task = std::move(*it);

// Erase the old position in the queue
queue.erase(it);

if (pool->pool_descriptor.counts_for_idle && task.checked_runnable) {
++total_runnable_tasks;
}

// If we were idle, we are about to not be
if (pool->pool_descriptor.counts_for_idle && idle) {
--idle_threads;
--pool->idle_threads;
}

// Return the task
return task;
}
else if (!it->checked_runnable) {
if (pool->pool_descriptor.counts_for_idle) {
--total_runnable_tasks;
}
it->checked_runnable = true;
}
}

// If pool concurrency is greater than group concurrency some threads can be left with nothing to do.
Expand All @@ -323,9 +335,14 @@ namespace threading {

// Global idle tasks
if (pool->pool_descriptor.counts_for_idle) {
if (++idle_threads == total_idleable_threads) {
for (auto& t : idle_tasks) {
t.second();
/* mutex scope */ {
if (total_runnable_tasks == 0) {
const std::lock_guard<std::mutex> idle_lock(idle_mutex);
if (total_runnable_tasks == 0) {
for (auto& t : idle_tasks) {
t.second();
}
}
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion src/threading/TaskScheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ namespace threading {
util::ThreadPoolDescriptor thread_pool_descriptor;
/// @brief The callback to be executed
std::function<void()> run;
/// @brief If task has been checked for runnable
bool checked_runnable{false};

/**
* @brief Compare tasks based on their priority
Expand Down Expand Up @@ -286,10 +288,12 @@ namespace threading {
/// @brief mutex for the group map
std::mutex group_mutex;

/// @brief mutex for the idle tasks
std::mutex idle_mutex;
/// @brief global idle tasks to be executed when no other tasks are running
std::map<NUClear::id_t, std::function<void()>> idle_tasks{};
/// @brief the total number of threads that can be counted as idle
std::atomic<size_t> total_idleable_threads{0};
std::atomic<size_t> total_runnable_tasks{0};
/// @brief the number of global idle threads
std::atomic<size_t> idle_threads{0};

Expand Down

0 comments on commit b95f972

Please sign in to comment.