Skip to content

Commit

Permalink
Tighten the mutexes and atomics in the scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
TrentHouliston committed Aug 8, 2024
1 parent 3f0a42e commit 405e9ac
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 23 deletions.
4 changes: 2 additions & 2 deletions src/dsl/word/MainThread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ namespace dsl {
struct MainThread {

/// the description of the thread pool to be used for this PoolType
static inline util::ThreadPoolDescriptor main_descriptor() {
static inline util::ThreadPoolDescriptor descriptor() {
return util::ThreadPoolDescriptor{"Main",
NUClear::id_t(util::ThreadPoolDescriptor::MAIN_THREAD_POOL_ID),
1,
Expand All @@ -50,7 +50,7 @@ namespace dsl {

template <typename DSL>
static inline util::ThreadPoolDescriptor pool(const threading::ReactionTask& /*task*/) {
return main_descriptor();
return descriptor();
}
};

Expand Down
41 changes: 23 additions & 18 deletions src/threading/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,28 @@ namespace threading {
namespace scheduler {

Scheduler::Scheduler(const size_t& thread_count) {
// Make the queue for the main thread
auto main_descriptor = dsl::word::MainThread::main_descriptor();
pools[main_descriptor.pool_id] = std::make_shared<Pool>(*this, main_descriptor);

// Make the default pool with the correct number of threads
auto default_descriptor = util::ThreadPoolDescriptor{};
default_descriptor.thread_count = thread_count;
get_pool(default_descriptor);
}

void Scheduler::start() {
std::lock_guard<std::mutex> lock(mutex);

started.store(true, std::memory_order_relaxed);

// Start all of the pools except the main thread pool
for (auto& pool : pools) {
if (pool.first != NUClear::id_t(util::ThreadPoolDescriptor::MAIN_THREAD_POOL_ID)) {
pool.second->start();
// We have to scope this mutex, otherwise the main thread will hold the mutex while it is running
/*mutex scope*/ {
std::lock_guard<std::mutex> lock(pools_mutex);

started = true;
// Start all of the pools except the main thread pool
for (auto& pool : pools) {
if (pool.first != NUClear::id_t(util::ThreadPoolDescriptor::MAIN_THREAD_POOL_ID)) {
pool.second->start();
}
}
}

// Start the main thread pool which will block the main thread
pools.at(NUClear::id_t(util::ThreadPoolDescriptor::MAIN_THREAD_POOL_ID))->start();
get_pool(dsl::word::MainThread::descriptor())->start();

// The main thread will reach this point when the PowerPlant is shutting down
// Calling stop on each pool will wait for each pool to finish processing all tasks before returning
Expand All @@ -62,7 +60,8 @@ namespace threading {
}

void Scheduler::stop() {
running.store(false, std::memory_order_relaxed);
running.store(false, std::memory_order_release);
std::lock_guard<std::mutex> lock(pools_mutex);
for (auto& pool : pools) {
pool.second->stop();
}
Expand All @@ -72,9 +71,12 @@ namespace threading {
const std::shared_ptr<Reaction>& reaction) {
// If this is the "ALL" pool then add it to the schedulers list of tasks
if (desc.pool_id == util::ThreadPoolDescriptor::AllPools().pool_id) {
idle_tasks.push_back(reaction);
/*mutex scope*/ {
std::lock_guard<std::mutex> lock(idle_mutex);
idle_tasks.push_back(reaction);
}
// Notify the main thread pool just in case there were no global idle tasks and now there are
pools.at(NUClear::id_t(util::ThreadPoolDescriptor::MAIN_THREAD_POOL_ID))->notify();
get_pool(dsl::word::MainThread::descriptor())->notify();
}
else {
get_pool(desc)->add_idle_task(reaction);
Expand All @@ -84,6 +86,7 @@ namespace threading {
void Scheduler::remove_idle_task(const util::ThreadPoolDescriptor& desc, const NUClear::id_t& id) {
// If this is the "ALL" pool then remove it from the schedulers list of tasks
if (desc.pool_id == util::ThreadPoolDescriptor::AllPools().pool_id) {
std::lock_guard<std::mutex> lock(idle_mutex);
idle_tasks.erase(
std::remove_if(idle_tasks.begin(), idle_tasks.end(), [&](const auto& r) { return r->id == id; }),
idle_tasks.end());
Expand All @@ -94,14 +97,15 @@ namespace threading {
}

std::shared_ptr<Pool> Scheduler::get_pool(const util::ThreadPoolDescriptor& desc) {
std::lock_guard<std::mutex> lock(pools_mutex);
// If the pool does not exist, create it
if (pools.count(desc.pool_id) == 0) {
// Create the pool
auto pool = std::make_shared<Pool>(*this, desc);
pools[desc.pool_id] = pool;

// If the scheduler has not yet started then don't start the threads for this pool yet
if (started.load(std::memory_order_relaxed)) {
if (started) {
pool->start();
}
}
Expand All @@ -110,6 +114,7 @@ namespace threading {
}

std::shared_ptr<Group> Scheduler::get_group(const util::GroupDescriptor& desc) {
std::lock_guard<std::mutex> lock(groups_mutex);
// If the group does not exist, create it
if (groups.count(desc.group_id) == 0) {
groups[desc.group_id] = std::make_shared<Group>(desc);
Expand Down Expand Up @@ -150,7 +155,7 @@ namespace threading {
}

// Submit the task to the appropriate pool
if (running.load(std::memory_order_relaxed)) {
if (running.load(std::memory_order_acquire)) {
pool->submit({std::move(task), std::move(group_lock)});
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/threading/scheduler/Scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,6 @@ namespace threading {

/// If running is false this means the scheduler is shutting down and no more tasks will be accepted
std::atomic<bool> running{true};
/// If started is false pools will not be started until start is called
/// once start is called future pools will be started immediately
std::atomic<bool> started{false};

/// A mutex for when we are modifying groups
std::mutex groups_mutex;
Expand All @@ -138,6 +135,9 @@ namespace threading {
std::mutex pools_mutex;
/// A map of pool descriptor ids to pool descriptors
std::map<NUClear::id_t, std::shared_ptr<Pool>> pools{};
/// If started is false pools will not be started until start is called
/// once start is called future pools will be started immediately
bool started = false;

/// A mutex to protect the idle tasks list
std::mutex idle_mutex;
Expand Down

0 comments on commit 405e9ac

Please sign in to comment.