diff --git a/src/dsl/word/MainThread.hpp b/src/dsl/word/MainThread.hpp
index 5a395022..197be337 100644
--- a/src/dsl/word/MainThread.hpp
+++ b/src/dsl/word/MainThread.hpp
@@ -41,7 +41,7 @@ namespace dsl {
         struct MainThread {

             /// the description of the thread pool to be used for this PoolType
-            static inline util::ThreadPoolDescriptor main_descriptor() {
+            static inline util::ThreadPoolDescriptor descriptor() {
                 return util::ThreadPoolDescriptor{"Main",
                                                   NUClear::id_t(util::ThreadPoolDescriptor::MAIN_THREAD_POOL_ID),
                                                   1,
@@ -50,7 +50,7 @@ namespace dsl {

             template <typename DSL>
             static inline util::ThreadPoolDescriptor pool(const threading::ReactionTask& /*task*/) {
-                return main_descriptor();
+                return descriptor();
             }
         };
diff --git a/src/threading/scheduler/Scheduler.cpp b/src/threading/scheduler/Scheduler.cpp
index 8a87ed4a..a2b79f23 100644
--- a/src/threading/scheduler/Scheduler.cpp
+++ b/src/threading/scheduler/Scheduler.cpp
@@ -29,10 +29,6 @@ namespace threading {
     namespace scheduler {

         Scheduler::Scheduler(const size_t& thread_count) {
-            // Make the queue for the main thread
-            auto main_descriptor = dsl::word::MainThread::main_descriptor();
-            pools[main_descriptor.pool_id] = std::make_shared<Pool>(*this, main_descriptor);
-
             // Make the default pool with the correct number of threads
             auto default_descriptor = util::ThreadPoolDescriptor{};
             default_descriptor.thread_count = thread_count;
@@ -40,19 +36,21 @@ namespace threading {
         }

         void Scheduler::start() {
-            std::lock_guard<std::mutex> lock(mutex);
-
-            started.store(true, std::memory_order_relaxed);
-
-            // Start all of the pools except the main thread pool
-            for (auto& pool : pools) {
-                if (pool.first != NUClear::id_t(util::ThreadPoolDescriptor::MAIN_THREAD_POOL_ID)) {
-                    pool.second->start();
+            // We have to scope this mutex, otherwise the main thread will hold the mutex while it is running
+            /*mutex scope*/ {
+                std::lock_guard<std::mutex> lock(pools_mutex);
+
+                started = true;
+                // Start all of the pools except the main thread pool
+                for (auto& pool : pools) {
+                    if (pool.first != NUClear::id_t(util::ThreadPoolDescriptor::MAIN_THREAD_POOL_ID)) {
+                        pool.second->start();
+                    }
                 }
             }

             // Start the main thread pool which will block the main thread
-            pools.at(NUClear::id_t(util::ThreadPoolDescriptor::MAIN_THREAD_POOL_ID))->start();
+            get_pool(dsl::word::MainThread::descriptor())->start();

             // The main thread will reach this point when the PowerPlant is shutting down
             // Calling stop on each pool will wait for each pool to finish processing all tasks before returning
@@ -62,7 +60,8 @@ namespace threading {
         }

         void Scheduler::stop() {
-            running.store(false, std::memory_order_relaxed);
+            running.store(false, std::memory_order_release);
+            std::lock_guard<std::mutex> lock(pools_mutex);
             for (auto& pool : pools) {
                 pool.second->stop();
             }
@@ -72,9 +71,12 @@ namespace threading {
                                       const std::shared_ptr<Reaction>& reaction) {
             // If this is the "ALL" pool then add it to the schedulers list of tasks
             if (desc.pool_id == util::ThreadPoolDescriptor::AllPools().pool_id) {
-                idle_tasks.push_back(reaction);
+                /*mutex scope*/ {
+                    std::lock_guard<std::mutex> lock(idle_mutex);
+                    idle_tasks.push_back(reaction);
+                }
                 // Notify the main thread pool just in case there were no global idle tasks and now there are
-                pools.at(NUClear::id_t(util::ThreadPoolDescriptor::MAIN_THREAD_POOL_ID))->notify();
+                get_pool(dsl::word::MainThread::descriptor())->notify();
             }
             else {
                 get_pool(desc)->add_idle_task(reaction);
@@ -84,6 +86,7 @@ namespace threading {
         void Scheduler::remove_idle_task(const util::ThreadPoolDescriptor& desc, const NUClear::id_t& id) {
             // If this is the "ALL" pool then remove it from the schedulers list of tasks
             if (desc.pool_id == util::ThreadPoolDescriptor::AllPools().pool_id) {
+                std::lock_guard<std::mutex> lock(idle_mutex);
                 idle_tasks.erase(
                     std::remove_if(idle_tasks.begin(), idle_tasks.end(), [&](const auto& r) { return r->id == id; }),
                     idle_tasks.end());
@@ -94,6 +97,7 @@ namespace threading {
         }

         std::shared_ptr<Pool> Scheduler::get_pool(const util::ThreadPoolDescriptor& desc) {
+            std::lock_guard<std::mutex> lock(pools_mutex);
             // If the pool does not exist, create it
             if (pools.count(desc.pool_id) == 0) {
                 // Create the pool
@@ -101,7 +105,7 @@ namespace threading {
                 pools[desc.pool_id] = pool;

                 // If the scheduler has not yet started then don't start the threads for this pool yet
-                if (started.load(std::memory_order_relaxed)) {
+                if (started) {
                     pool->start();
                 }
             }
@@ -110,6 +114,7 @@ namespace threading {
         }

         std::shared_ptr<Group> Scheduler::get_group(const util::GroupDescriptor& desc) {
+            std::lock_guard<std::mutex> lock(groups_mutex);
             // If the group does not exist, create it
             if (groups.count(desc.group_id) == 0) {
                 groups[desc.group_id] = std::make_shared<Group>(desc);
@@ -150,7 +155,7 @@ namespace threading {
             }

             // Submit the task to the appropriate pool
-            if (running.load(std::memory_order_relaxed)) {
+            if (running.load(std::memory_order_acquire)) {
                 pool->submit({std::move(task), std::move(group_lock)});
             }
         }
diff --git a/src/threading/scheduler/Scheduler.hpp b/src/threading/scheduler/Scheduler.hpp
index 0312d932..d0a831cc 100644
--- a/src/threading/scheduler/Scheduler.hpp
+++ b/src/threading/scheduler/Scheduler.hpp
@@ -125,9 +125,6 @@ namespace threading {
             /// If running is false this means the scheduler is shutting down and no more tasks will be accepted
             std::atomic<bool> running{true};
-            /// If started is false pools will not be started until start is called
-            /// once start is called future pools will be started immediately
-            std::atomic<bool> started{false};

             /// A mutex for when we are modifying groups
             std::mutex groups_mutex;
@@ -138,6 +135,9 @@ namespace threading {
             std::mutex pools_mutex;
             /// A map of pool descriptor ids to pool descriptors
             std::map<NUClear::id_t, std::shared_ptr<Pool>> pools{};
+            /// If started is false pools will not be started until start is called
+            /// once start is called future pools will be started immediately
+            bool started = false;

             /// A mutex to protect the idle tasks list
             std::mutex idle_mutex;