diff --git a/src/threading/TaskScheduler.cpp b/src/threading/TaskScheduler.cpp index 7fc010972..61a3b6d5a 100644 --- a/src/threading/TaskScheduler.cpp +++ b/src/threading/TaskScheduler.cpp @@ -73,7 +73,6 @@ namespace threading { const size_t count = pool->pool_descriptor.counts_for_idle ? 1 : 0; // Sum up the number of threads that are idleable - total_idleable_threads += count; pool->n_threads += count; // When task is nullptr there are no more tasks to get and the scheduler is shutting down @@ -84,10 +83,12 @@ namespace threading { } catch (...) { } + if (pool->pool_descriptor.counts_for_idle) { + --total_runnable_tasks; + } } pool->n_threads -= count; - total_idleable_threads -= count; // Clear the current queue current_queue = nullptr; @@ -213,8 +214,12 @@ namespace threading { // We do not accept new tasks once we are shutdown if (running.load()) { + const std::shared_ptr pool = get_pool_queue(task.thread_pool_descriptor); + if (pool->pool_descriptor.counts_for_idle) { + ++total_runnable_tasks; + } // Get the appropiate pool for this task and submit - get_pool_queue(task.thread_pool_descriptor)->submit(std::move(task)); + pool->submit(std::move(task)); } } @@ -284,22 +289,29 @@ namespace threading { // Check if we can run the task if (is_runnable(it->group_descriptor)) { - // Move the task out of the queue Task task = std::move(*it); - // Erase the old position in the queue queue.erase(it); + if (pool->pool_descriptor.counts_for_idle && task.checked_runnable) { + ++total_runnable_tasks; + } + // If we were idle, we are about to not be if (pool->pool_descriptor.counts_for_idle && idle) { - --idle_threads; --pool->idle_threads; } // Return the task return task; } + else if (!it->checked_runnable) { + if (pool->pool_descriptor.counts_for_idle) { + --total_runnable_tasks; + } + it->checked_runnable = true; + } } // If pool concurrency is greater than group concurrency some threads can be left with nothing to do. @@ -323,9 +335,14 @@ namespace threading { // Global idle tasks if (pool->pool_descriptor.counts_for_idle) { - if (++idle_threads == total_idleable_threads) { - for (auto& t : idle_tasks) { - t.second(); + /* mutex scope */ { + if (total_runnable_tasks == 0) { + const std::lock_guard idle_lock(idle_mutex); + if (total_runnable_tasks == 0) { + for (auto& t : idle_tasks) { + t.second(); + } + } } } } diff --git a/src/threading/TaskScheduler.hpp b/src/threading/TaskScheduler.hpp index 758d36cd6..b17348fbd 100644 --- a/src/threading/TaskScheduler.hpp +++ b/src/threading/TaskScheduler.hpp @@ -103,6 +103,8 @@ namespace threading { util::ThreadPoolDescriptor thread_pool_descriptor; /// @brief The callback to be executed std::function run; + /// @brief If task has been checked for runnable + bool checked_runnable{false}; /** * @brief Compare tasks based on their priority @@ -286,10 +288,12 @@ namespace threading { /// @brief mutex for the group map std::mutex group_mutex; + /// @brief mutex for the idle tasks + std::mutex idle_mutex; /// @brief global idle tasks to be executed when no other tasks are running std::map> idle_tasks{}; /// @brief the total number of threads that can be counted as idle - std::atomic total_idleable_threads{0}; + std::atomic total_runnable_tasks{0}; /// @brief the number of global idle threads std::atomic idle_threads{0};