From a5d12d3eac6e5485b746b094560c21ad60bd4116 Mon Sep 17 00:00:00 2001 From: Trent Houliston Date: Tue, 6 Aug 2024 17:13:22 +1000 Subject: [PATCH] Implement the new task scheduler and remove the old one --- src/PowerPlant.cpp | 13 +- src/PowerPlant.hpp | 20 +- src/dsl/word/Idle.hpp | 6 +- src/threading/TaskScheduler.cpp | 360 -------------------------- src/threading/TaskScheduler.hpp | 275 -------------------- src/threading/scheduler/Pool.cpp | 195 ++++++++++++++ src/threading/scheduler/Pool.hpp | 190 ++++++++++++++ src/threading/scheduler/Scheduler.cpp | 156 +++++++++++ src/threading/scheduler/Scheduler.hpp | 138 ++++++++++ tests/tests/threading/Scheduler.cpp | 9 + 10 files changed, 705 insertions(+), 657 deletions(-) delete mode 100644 src/threading/TaskScheduler.cpp delete mode 100644 src/threading/TaskScheduler.hpp create mode 100644 src/threading/scheduler/Pool.cpp create mode 100644 src/threading/scheduler/Pool.hpp create mode 100644 src/threading/scheduler/Scheduler.cpp create mode 100644 src/threading/scheduler/Scheduler.hpp create mode 100644 tests/tests/threading/Scheduler.cpp diff --git a/src/PowerPlant.cpp b/src/PowerPlant.cpp index 32d3b237..69d27ec5 100644 --- a/src/PowerPlant.cpp +++ b/src/PowerPlant.cpp @@ -108,14 +108,13 @@ void PowerPlant::log(const LogLevel& level, std::stringstream& message) { log(level, message.str()); } -void PowerPlant::add_idle_task(const NUClear::id_t& id, - const util::ThreadPoolDescriptor& pool_descriptor, - std::function&& task) { - scheduler.add_idle_task(id, pool_descriptor, std::move(task)); +void PowerPlant::add_idle_task(const util::ThreadPoolDescriptor& pool_descriptor, + const std::shared_ptr& task) { + scheduler.add_idle_task(pool_descriptor, task); } -void PowerPlant::remove_idle_task(const NUClear::id_t& id, const util::ThreadPoolDescriptor& pool_descriptor) { - scheduler.remove_idle_task(id, pool_descriptor); +void PowerPlant::remove_idle_task(const util::ThreadPoolDescriptor& pool_descriptor, const NUClear::id_t& id) { + scheduler.remove_idle_task(pool_descriptor, id); } void PowerPlant::shutdown() { @@ -128,7 +127,7 @@ void PowerPlant::shutdown() { emit(std::make_unique()); // Shutdown the scheduler - scheduler.shutdown(); + scheduler.stop(); } bool PowerPlant::running() const { diff --git a/src/PowerPlant.hpp b/src/PowerPlant.hpp index 665f258e..3368d479 100644 --- a/src/PowerPlant.hpp +++ b/src/PowerPlant.hpp @@ -36,7 +36,7 @@ #include "LogLevel.hpp" #include "id.hpp" #include "threading/ReactionTask.hpp" -#include "threading/TaskScheduler.hpp" +#include "threading/scheduler/Scheduler.hpp" #include "util/FunctionFusion.hpp" #include "util/demangle.hpp" @@ -158,16 +158,14 @@ class PowerPlant { * Adds an idle task to the task scheduler. * * This function adds an idle task to the task scheduler, which will be executed when the thread pool associated - * with the given `pool_id` has no other tasks to execute. The `task` parameter is a callable object that - * represents the idle task to be executed. + * with the given `pool_id` has no other tasks to execute. The `task` parameter is a Reaction from which a task will + * be submitted when the pool is idle. * - * @param id The ID of the task. * @param pool_descriptor The descriptor for the thread pool to test for idle - * @param task The idle task to be executed. + * @param reaction The reaction to be executed when idle */ - void add_idle_task(const NUClear::id_t& id, - const util::ThreadPoolDescriptor& pool_descriptor, - std::function&& task); + void add_idle_task(const util::ThreadPoolDescriptor& pool_descriptor, + const std::shared_ptr& reaction); /** * Removes an idle task from the task scheduler. @@ -175,10 +173,10 @@ class PowerPlant { * This function removes an idle task from the task scheduler. The `id` and `pool_id` parameters are used to * identify the idle task to be removed. * - * @param id The ID of the task. * @param pool_descriptor The descriptor for the thread pool to test for idle + * @param id The reaction id of the task to be removed */ - void remove_idle_task(const NUClear::id_t& id, const util::ThreadPoolDescriptor& pool_descriptor); + void remove_idle_task(const util::ThreadPoolDescriptor& pool_descriptor, const NUClear::id_t& id); /** * Submits a new task to the ThreadPool to be queued and then executed. @@ -320,7 +318,7 @@ class PowerPlant { /// A list of tasks that must be run when the powerplant starts up std::vector> tasks; /// Our TaskScheduler that handles distributing task to the pool threads - threading::TaskScheduler scheduler; + threading::scheduler::Scheduler scheduler; /// Our vector of Reactors, will get destructed when this vector is std::vector> reactors; /// True if the powerplant is running diff --git a/src/dsl/word/Idle.hpp b/src/dsl/word/Idle.hpp index 4b884e5c..6b156e7f 100644 --- a/src/dsl/word/Idle.hpp +++ b/src/dsl/word/Idle.hpp @@ -45,12 +45,10 @@ namespace dsl { // Our unbinder to remove this reaction reaction->unbinders.push_back([pool_descriptor](const threading::Reaction& r) { // - r.reactor.powerplant.remove_idle_task(r.id, pool_descriptor); + r.reactor.powerplant.remove_idle_task(pool_descriptor, r.id); }); - reaction->reactor.powerplant.add_idle_task(reaction->id, pool_descriptor, [reaction] { - reaction->reactor.powerplant.submit(reaction->get_task()); - }); + reaction->reactor.powerplant.add_idle_task(pool_descriptor, reaction); } /** diff --git a/src/threading/TaskScheduler.cpp b/src/threading/TaskScheduler.cpp deleted file mode 100644 index c3d90494..00000000 --- a/src/threading/TaskScheduler.cpp +++ /dev/null @@ -1,360 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2013 NUClear Contributors - * - * This file is part of the NUClear codebase. - * See https://github.com/Fastcode/NUClear for further info. - * - * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated - * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the - * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to - * permit persons to whom the Software is furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the - * Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE - * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR - * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR - * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - */ - -#include "TaskScheduler.hpp" - -#include -#include -#include -#include -#include -#include -#include - -#include "../dsl/word/MainThread.hpp" -#include "../id.hpp" -#include "../util/GroupDescriptor.hpp" -#include "../util/ThreadPoolDescriptor.hpp" -#include "../util/platform.hpp" -#include "../util/update_current_thread_priority.hpp" - -namespace NUClear { -namespace threading { - - bool TaskScheduler::is_runnable(const util::GroupDescriptor& group) { - - // Default group is always runnable - if (group.group_id == 0) { - return true; - } - - if (groups.count(group.group_id) == 0) { - groups[group.group_id] = 0; - } - - // Task can run if the group it belongs to has spare threads - if (groups.at(group.group_id) < group.thread_count) { - // This task is about to run in this group, increase the number of active tasks in the group - groups.at(group.group_id)++; - return true; - } - - return false; - } - - void TaskScheduler::run_task(std::unique_ptr&& task) { - task->run(); - - // We need to do group counting if this isn't the default group - if (task->group_descriptor.group_id != 0) { - const std::lock_guard group_lock(group_mutex); - --groups.at(task->group_descriptor.group_id); - } - } - - void TaskScheduler::pool_func(std::shared_ptr pool) { - - // Set the thread pool for this thread so it can be accessed elsewhere - current_queue = &pool; - - // When task is nullptr there are no more tasks to get and the scheduler is shutting down - while (running.load() || !pool->queue.empty()) { - try { - // Run the next task - run_task(get_task()); - } - catch (...) { - } - if (pool->pool_descriptor.counts_for_idle) { - --global_runnable_tasks; - --pool->runnable_tasks; - } - } - - // Clear the current queue - current_queue = nullptr; - } - - TaskScheduler::TaskScheduler(const size_t& thread_count) { - // Make the queue for the main thread - auto main_descriptor = dsl::word::MainThread::main_descriptor(); - pool_queues[main_descriptor.pool_id] = std::make_shared(main_descriptor); - - // Make the default pool with the correct number of threads - auto default_descriptor = util::ThreadPoolDescriptor{}; - default_descriptor.thread_count = thread_count; - get_pool_queue(default_descriptor); - } - - void TaskScheduler::start_threads(const std::shared_ptr& pool) { - // The main thread never needs to be started - if (pool->pool_descriptor.pool_id != NUClear::id_t(util::ThreadPoolDescriptor::MAIN_THREAD_POOL_ID)) { - const std::lock_guard lock(pool->mutex); - while (pool->threads.size() < pool->pool_descriptor.thread_count) { - pool->threads.emplace_back(std::make_unique(&TaskScheduler::pool_func, this, pool)); - } - } - } - - std::shared_ptr TaskScheduler::get_pool_queue(const util::ThreadPoolDescriptor& pool) { - // If the pool does not exist, create it - const std::lock_guard pool_lock(pool_mutex); - if (pool_queues.count(pool.pool_id) == 0) { - // Create the pool - auto queue = std::make_shared(pool); - pool_queues[pool.pool_id] = queue; - - // If the scheduler has not yet started then don't start the threads for this pool yet - if (started.load()) { - start_threads(queue); - } - } - - return pool_queues.at(pool.pool_id); - } - - void TaskScheduler::start() { - - // The scheduler is now started - started.store(true); - - // Start all our threads - for (const auto& pool : pool_queues) { - start_threads(pool.second); - } - - // Run main thread tasks - pool_func(pool_queues.at(NUClear::id_t(util::ThreadPoolDescriptor::MAIN_THREAD_POOL_ID))); - - /** - * Once the main thread reaches this point it is because the powerplant, and by extension the scheduler, have - * been shutdown and the main thread is now about to leave the scheduler. - */ - - // Poke all of the threads to make sure they are awake and then wait for them to finish - for (auto& pool : pool_queues) { - /* mutex scope */ { - const std::lock_guard queue_lock(pool.second->mutex); - pool.second->condition.notify_all(); - } - for (auto& thread : pool.second->threads) { - try { - if (thread->joinable()) { - thread->join(); - } - } - // This gets thrown some time if between checking if joinable and joining - // the thread is no longer joinable - catch (const std::system_error&) { - } - } - } - } - - void TaskScheduler::shutdown() { - started.store(false); - running.store(false); - for (auto& pool : pool_queues) { - const std::lock_guard lock(pool.second->mutex); - pool.second->condition.notify_all(); - } - } - - void TaskScheduler::PoolQueue::submit(std::unique_ptr&& task) { - const std::lock_guard lock(mutex); - - // Insert in sorted order - PoolQueue::Task item{std::move(task), false}; - queue.insert(std::lower_bound(queue.begin(), queue.end(), item), std::move(item)); - - // Notify a single thread that there is a new task - condition.notify_one(); - } - - void TaskScheduler::submit(std::unique_ptr&& task, const bool& immediate) noexcept { - - // Immediate tasks are executed directly on the current thread if they can be - // If something is blocking them from running right now they are added to the queue - if (immediate) { - bool runnable = false; - /* mutex scope */ { - const std::lock_guard group_lock(group_mutex); - runnable = is_runnable(task->group_descriptor); - } - if (runnable) { - run_task(std::move(task)); - return; - } - } - - // We only accept new tasks while the scheduler is running - if (running.load()) { - const std::shared_ptr pool = get_pool_queue(task->thread_pool_descriptor); - if (pool->pool_descriptor.counts_for_idle) { - ++global_runnable_tasks; - ++pool->runnable_tasks; - } - // Get the appropiate pool for this task and submit - pool->submit(std::move(task)); - } - } - - void TaskScheduler::add_idle_task(const NUClear::id_t& id, - const util::ThreadPoolDescriptor& pool_descriptor, - std::function&& task) { - - if (pool_descriptor.pool_id == NUClear::id_t(-1)) { - const std::lock_guard lock(pool_mutex); - idle_tasks.emplace(id, std::move(task)); - } - else { - // Get the appropiate pool for this task - const std::shared_ptr pool = get_pool_queue(pool_descriptor); - - // Find where to insert the new task to maintain task order - const std::lock_guard lock(pool->mutex); - pool->idle_tasks.emplace(id, std::move(task)); - } - } - - void TaskScheduler::remove_idle_task(const NUClear::id_t& id, const util::ThreadPoolDescriptor& pool_descriptor) { - - if (pool_descriptor.pool_id == NUClear::id_t(-1)) { - const std::lock_guard lock(pool_mutex); - if (idle_tasks.count(id) > 0) { - idle_tasks.erase(id); - } - } - else { - // Get the appropiate pool for this task - const std::shared_ptr pool = get_pool_queue(pool_descriptor); - - // Find the idle task with the given id and remove it if it exists - const std::lock_guard lock(pool->mutex); - if (pool->idle_tasks.count(id) > 0) { - pool->idle_tasks.erase(id); - } - } - } - - std::unique_ptr TaskScheduler::get_task() { - - // Wait at a high (but not realtime) priority to reduce latency for picking up a new task - update_current_thread_priority(1000); - - if (current_queue == nullptr) { - throw std::runtime_error("Only threads managed by the TaskScheduler can get tasks"); - } - - // Get the queue for this thread from its thread local storage - const std::shared_ptr pool = *current_queue; - auto& queue = pool->queue; - auto& condition = pool->condition; - - // Keep looking for tasks while the scheduler is still running, or while there are still tasks to process - std::unique_lock lock(pool->mutex); - bool idle = false; - while (running.load() || !queue.empty()) { - - // Only one thread can be checking group concurrency at a time otherwise the ordering might not be correct - /* mutex scope */ { - const std::lock_guard group_lock(group_mutex); - - // Iterate over all the tasks in the current thread pool queue, looking for one that we can run - for (auto it = queue.begin(); it != queue.end(); ++it) { - - // Check if we can run the task - if (is_runnable(it->task->group_descriptor)) { - // Move the task out of the queue - auto task = std::move(*it); - // Erase the old position in the queue - queue.erase(it); - - // If it was blocked and is now unblocked, there is an additional runnable task - if (pool->pool_descriptor.counts_for_idle && task.blocked) { - ++global_runnable_tasks; - ++pool->runnable_tasks; - } - - // Return the task - return std::move(task.task); - } - - // The task is blocked, mark it as such and lower the number of runnable tasks - if (!it->blocked) { - if (pool->pool_descriptor.counts_for_idle) { - --global_runnable_tasks; - --pool->runnable_tasks; - } - it->blocked = true; - } - } - - // If pool concurrency is greater than group concurrency some threads can be left with nothing to do. - // Since running is false there will likely never be anything new to do and we are shutting down anyway. - // So if we can't find a task to run we should just quit. - if (!running.load()) { - condition.notify_all(); - throw ShutdownThreadException(); - } - } - - if (pool->pool_descriptor.counts_for_idle && !idle) { - idle = true; - - /* mutex scope */ { - const std::lock_guard idle_lock(idle_mutex); - // Local idle tasks - if (pool->runnable_tasks == 0) { - for (auto& t : pool->idle_tasks) { - t.second(); - } - } - - // Global idle tasks - if (global_runnable_tasks == 0) { - for (auto& t : idle_tasks) { - t.second(); - } - } - } - } - else { - - // Wait for something to happen! - // We don't have a condition on this lock as the check would be this doing this loop again to see if - // there are any tasks we can execute (checking all the groups) so therefore we already did the entry - // predicate. Putting a condition on this would stop spurious wakeups of which the cost would be equal - // to the loop. - condition.wait(lock); // NOSONAR - } - } - - // If we get out here then we are finished running. - throw ShutdownThreadException(); - } - - // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) - ATTRIBUTE_TLS std::shared_ptr* TaskScheduler::current_queue = nullptr; - -} // namespace threading -} // namespace NUClear diff --git a/src/threading/TaskScheduler.hpp b/src/threading/TaskScheduler.hpp deleted file mode 100644 index 5162def2..00000000 --- a/src/threading/TaskScheduler.hpp +++ /dev/null @@ -1,275 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2013 NUClear Contributors - * - * This file is part of the NUClear codebase. - * See https://github.com/Fastcode/NUClear for further info. - * - * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated - * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the - * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to - * permit persons to whom the Software is furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the - * Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE - * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR - * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR - * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - */ - -#ifndef NUCLEAR_THREADING_TASK_SCHEDULER_HPP -#define NUCLEAR_THREADING_TASK_SCHEDULER_HPP - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "../id.hpp" -#include "../util/GroupDescriptor.hpp" -#include "../util/ThreadPoolDescriptor.hpp" -#include "../util/platform.hpp" -#include "ReactionTask.hpp" - -namespace NUClear { -namespace threading { - - /** - * This class is responsible for scheduling tasks and distributing them amongst threads. - * - * PRIORITY - * what priority this task should run with - * tasks are ordered by priority -> creation order - * POOL - * which thread pool this task should execute in - * 0 being execute on the main thread - * 1 being the default pool - * GROUP - * which grouping this task belongs to for concurrency (default to the 0 group) - * only run if there are less than N tasks running in this group - * IMMEDIATE - * if the submitter of this task should wait until this task is finished before returning (for DIRECT - * emits) - * - * @em Priority - * @code Priority

@endcode - * When a priority is encountered, the task will be scheduled to execute based on this. If one of the three - * normal options are specified (HIGH, DEFAULT and LOW), then within the specified Sync group, it will run - * before, normally or after other reactions. - * - * @em Sync - * @code Sync @endcode - * When a Sync type is encountered, the system uses this as a compile time mutex flag. It will not allow two - * callbacks with the same Sync type to execute at the same time. It will effectively ensure that all of the - * callbacks with this type run in sequence with each other, rather then in parallel. - * - * @em Group - * @code Group @endcode - * When a Group type is encountered, the system uses this as a compile time semaphore flag. It will not allow - * (GroupConcurrency + 1) callbacks with the same Group type to execute at the same time. It will effectively - * ensure that the first GroupConcurrency callbacks with this type run in parallel and all subsequent callbacks - * will be queued to run when one of the first GroupConcurrency callbacks have returned - * - * @em Single - * @code Single @endcode - * If single is encountered while processing the function, and a Task object for this Reaction is already - * running in a thread, or waiting in the Queue, then this task is ignored and dropped from the system. - */ - class TaskScheduler { - private: - /** - * Exception thrown when a thread in the pool should shut down. - */ - class ShutdownThreadException : public std::exception {}; - - /** - * A struct which contains all the information about an individual thread pool - */ - struct PoolQueue { - struct Task { - std::unique_ptr task; - bool blocked; - bool operator<(const Task& other) const { - return *task < *other.task; - } - }; - - explicit PoolQueue(const util::ThreadPoolDescriptor& pool_descriptor) : pool_descriptor(pool_descriptor) {} - /// The descriptor for this thread pool - const util::ThreadPoolDescriptor pool_descriptor; - /// The threads which are running in this thread pool - std::vector> threads; - /// The number of runnable tasks in this thread pool - size_t runnable_tasks{0}; - /// The queue of tasks for this specific thread pool and if they are group blocked - std::vector queue; - /// The mutex which protects the queue - std::recursive_mutex mutex; - /// The condition variable which threads wait on if they can't get a task - std::condition_variable_any condition; - /// The map of idle tasks for this thread pool - std::map> idle_tasks; - - /** - * Submit a new task to this thread pool - * - * @param task the reaction task task to submit - */ - void submit(std::unique_ptr&& task); - }; - - public: - /** - * Constructs a new TaskScheduler instance, and builds the nullptr sync queue. - */ - explicit TaskScheduler(const size_t& default_thread_count); - - /** - * Starts the scheduler, and begins executing tasks. - * - * The main thread will stay in this function executing tasks until the scheduler is shutdown. - */ - void start(); - - /** - * Shuts down the scheduler, all waiting threads are woken, and attempting to get a task results in an exception - */ - void shutdown(); - - /** - * Submit a new task to be executed to the Scheduler. - * - * This method submits a new task to the scheduler. This task will then be sorted into the appropriate - * queue based on it's sync type and priority. It will then wait there until it is removed by a thread to - * be processed. - * - * @param task the reaction task task to submit - * @param immediate if this task should run immediately in the current thread. If immediate execution of this - * task is not possible (e.g. due to group concurrency restrictions) this task will be queued - * as normal - */ - void submit(std::unique_ptr&& task, const bool& immediate) noexcept; - - /** - * Adds an idle task to the task scheduler. - * - * This function adds an idle task to the task scheduler, which will be executed when the thread pool associated - * with the given `pool_id` has no other tasks to execute. The `task` parameter is a callable object that - * represents the idle task to be executed. - * - * @param id The ID of the task. - * @param pool_descriptor The descriptor for the thread pool to test for idle - * @param task The idle task to be executed. - */ - void add_idle_task(const NUClear::id_t& id, - const util::ThreadPoolDescriptor& pool_descriptor, - std::function&& task); - - /** - * Removes an idle task from the task scheduler. - * - * This function removes an idle task from the task scheduler. The `id` and `pool_id` parameters are used to - * identify the idle task to be removed. - * - * @param id The ID of the task - * @param pool_descriptor The descriptor for the thread pool to test for idle - */ - void remove_idle_task(const NUClear::id_t& id, const util::ThreadPoolDescriptor& pool_descriptor); - - private: - /** - * Get a task object to be executed by a thread. - * - * This method will get a task object to be executed from the queue. It will block until such a time as a - * task is available to be executed. For example, if a task with a particular sync type was out, then this - * thread would block until that sync type was no longer out, and then it would take a task. - * - * @return the task which has been given to be executed - */ - std::unique_ptr get_task(); - - /** - * Gets a pool queue for the given thread pool descriptor or creates one if it does not exist - * - * @param pool the descriptor for the thread pool to get or create - * - * @return a shared pointer to the pool queue for the given thread pool descriptor - */ - std::shared_ptr get_pool_queue(const util::ThreadPoolDescriptor& pool); - - /** - * The function that each thread runs - * - * This function will repeatedly query the task queue for new a task to run and then execute that task - * - * @param pool the thread pool to run from and the task queue to get tasks from - */ - void pool_func(std::shared_ptr pool); - - /** - * Start all threads for the given thread pool - */ - void start_threads(const std::shared_ptr& pool); - - /** - * Execute the given task - * - * After execution of the task has completed the number of active tasks in the tasks' group is decremented - * - * @param task the task to execute - */ - void run_task(std::unique_ptr&& task); - - /** - * Determines if the given task is able to be executed - * - * If the current thread is able to be executed the number of active tasks in the tasks' groups is incremented - * - * @param group the group descriptor for the task - * - * @return true if the task is currently runnable - * @return false if the task is not currently runnable - */ - bool is_runnable(const util::GroupDescriptor& group); - - /// if the scheduler is running, and accepting new tasks. If this is false and a new, non-immediate, task - /// is submitted it will be ignored - std::atomic running{true}; - /// if the scheduler has been started. This is set to true after a call to start is made. Once this is - /// set to true all threads will begin executing tasks from the tasks queue - std::atomic started{false}; - - /// A map of group ids to the number of active tasks currently running in that group - std::map groups{}; - /// mutex for the group map - std::mutex group_mutex; - - /// mutex for the idle tasks - std::mutex idle_mutex; - /// global idle tasks to be executed when no other tasks are running - std::map> idle_tasks{}; - /// the total number of threads that have runnable tasks - std::atomic global_runnable_tasks{0}; - - /// A map of pool descriptor ids to pool descriptors - std::map> pool_queues{}; - /// a mutex for when we are modifying the pool_queues map - std::mutex pool_mutex; - /// a pointer to the pool_queue for the current thread so it does not have to access via the map - // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) - static ATTRIBUTE_TLS std::shared_ptr* current_queue; - }; - -} // namespace threading -} // namespace NUClear - -#endif // NUCLEAR_THREADING_TASK_SCHEDULER_HPP diff --git a/src/threading/scheduler/Pool.cpp b/src/threading/scheduler/Pool.cpp new file mode 100644 index 00000000..ee19c71d --- /dev/null +++ b/src/threading/scheduler/Pool.cpp @@ -0,0 +1,195 @@ +/* + * MIT License + * + * Copyright (c) 2024 NUClear Contributors + * + * This file is part of the NUClear codebase. + * See https://github.com/Fastcode/NUClear for further info. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#include "Pool.hpp" + +#include "../../message/ReactionStatistics.hpp" +#include "../ReactionTask.hpp" +#include "IdleLock.hpp" +#include "Scheduler.hpp" + +namespace NUClear { +namespace threading { + namespace scheduler { + + Pool::Pool(Scheduler& scheduler, const util::ThreadPoolDescriptor& descriptor) + : scheduler(scheduler), descriptor(descriptor) {} + + void Pool::start() { + // Increase the number of active threads if this pool counts for idle + active = descriptor.counts_for_idle ? descriptor.thread_count : 0; + scheduler.active.fetch_add(active, std::memory_order_relaxed); + + // The main thread never needs to be started + if (descriptor.pool_id != NUClear::id_t(util::ThreadPoolDescriptor::MAIN_THREAD_POOL_ID)) { + const std::lock_guard lock(mutex); + while (threads.size() < descriptor.thread_count) { + threads.emplace_back(std::make_unique(&Pool::run, this)); + } + } + else { + // The main thread is the current thread so we can just run it + run(); + } + } + + void Pool::stop() { + // Stop the pool threads + std::lock_guard lock(mutex); + running.store(false); + condition.notify_all(); + } + + void Pool::notify() { + std::lock_guard lock(mutex); + checked = false; + condition.notify_one(); + } + + void Pool::join() { + // Join all the threads + for (auto& thread : threads) { + if (thread->joinable()) { + thread->join(); + } + } + } + + void Pool::submit(Task&& task) { + const std::lock_guard lock(mutex); + + // Insert in sorted order + queue.insert(std::lower_bound(queue.begin(), queue.end(), task), std::move(task)); + checked = false; + + // Notify a single thread that there is a new task + condition.notify_one(); + } + + void Pool::add_idle_task(const std::shared_ptr& reaction) { + const std::lock_guard lock(mutex); + idle_tasks.push_back(reaction); + + // If we previously had no idle tasks, it's possible every thread is sleeping (idle) + // Therefore we need to notify all since we don't know which thread is the one that holds the idle lock + if (idle_tasks.size() == 1) { + checked = false; + condition.notify_all(); + } + } + + void Pool::remove_idle_task(const NUClear::id_t& id) { + const std::lock_guard lock(mutex); + idle_tasks.erase( + std::remove_if(idle_tasks.begin(), idle_tasks.end(), [&](const auto& r) { return r->id == id; }), + idle_tasks.end()); + } + + void Pool::run() { + + // When task is nullptr there are no more tasks to get and the scheduler is shutting down + while (running.load() || !queue.empty()) { + try { + // Run the next task + Task task = get_task(); + task.task->run(); + } + catch (...) { + } + } + } + + Pool::Task Pool::get_task() { + + std::unique_lock lock(mutex); + while (running.load(std::memory_order_relaxed) || !queue.empty()) { + if (!checked) { + // Get the first task that can be run + for (auto it = queue.begin(); it != queue.end(); ++it) { + // If the task is not a group member, or we can get a token for the group then we can run it + if (it->lock == nullptr || it->lock->lock()) { + // If the task is not group blocked or we can lock the group then we can run it + Task task = std::move(*it); + queue.erase(it); + return task; + } + } + checked = true; + } + + // If we reach here before we sleep check if we can run the idle task + auto idle_task = get_idle_task(); + if (idle_task.task != nullptr && idle_task.lock->lock()) { + return idle_task; + } + + // Wait for something to happen! + condition.wait(lock, [this] { return !checked || (!running.load() && queue.empty()); }); + } + + condition.notify_all(); + throw ShutdownThreadException(); + } + + Pool::Task Pool::get_idle_task() { + // If this pool does not count for idle, it can't participate in idle tasks + if (!descriptor.counts_for_idle) { + return Task{nullptr, nullptr}; + } + + // Make the idle lock to which will make this thread count as idle + auto idle_lock = std::make_unique(active, scheduler.active); + + // If we weren't the last, just return no task along with the lock to hold the idle state + if (!idle_lock->lock()) { + return Task{nullptr, std::move(idle_lock)}; + } + + std::vector> tasks; + if (idle_lock->local_lock()) { + tasks.insert(tasks.end(), idle_tasks.begin(), idle_tasks.end()); + } + if (idle_lock->global_lock()) { + // TODO unprotected access to scheduler idle tasks here + tasks.insert(tasks.end(), scheduler.idle_tasks.begin(), scheduler.idle_tasks.end()); + } + + // If there are no idle tasks, + + auto task = std::make_unique( + nullptr, + [](const ReactionTask&) { return 0; }, + [](const ReactionTask&) { return util::ThreadPoolDescriptor{}; }, + [](const ReactionTask&) { return util::GroupDescriptor{}; }); + + task->callback = [this, tasks = std::move(tasks)](const ReactionTask& /*task*/) { + for (auto& idle_task : tasks) { + // Submit all the idle tasks to the scheduler + scheduler.submit(idle_task->get_task(), false); + } + }; + + return Task{std::move(task), std::move(idle_lock)}; + } + + } // namespace scheduler +} // namespace threading +} // namespace NUClear diff --git a/src/threading/scheduler/Pool.hpp b/src/threading/scheduler/Pool.hpp new file mode 100644 index 00000000..5b6064bc --- /dev/null +++ b/src/threading/scheduler/Pool.hpp @@ -0,0 +1,190 @@ +/* + * MIT License + * + * Copyright (c) 2024 NUClear Contributors + * + * This file is part of the NUClear codebase. + * See https://github.com/Fastcode/NUClear for further info. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#ifndef NUCLEAR_THREADING_SCHEDULER_POOL_HPP +#define NUCLEAR_THREADING_SCHEDULER_POOL_HPP + +#include +#include +#include +#include +#include + +#include "../../util/ThreadPoolDescriptor.hpp" +#include "../ReactionTask.hpp" +#include "Group.hpp" +#include "IdleLock.hpp" + +namespace NUClear { +namespace threading { + namespace scheduler { + + // Forward declare the scheduler + class Scheduler; + + class Pool { + public: + struct Task { + /// Holds the task to execute + std::unique_ptr task; + /// A lock that is held while the task is being executed. + /// This lock should release via RAII when the task is done. + std::unique_ptr lock; + + /** + * Sorts the tasks by the sort order of the reaction tasks + * + * @param other the other task to compare to + * + * @return true if this task should be executed before the other task + */ + inline bool operator<(const Task& other) const { + return *task < *other.task; + } + }; + + /** + * Exception thrown when a thread in the pool should shut down. + */ + class ShutdownThreadException : public std::exception {}; + + /** + * Construct a new thread pool with the given descriptor + * + * @param scheduler the scheduler parent of this pool + * @param descriptor the descriptor for this thread pool + */ + explicit Pool(Scheduler& scheduler, const util::ThreadPoolDescriptor& descriptor); + + /** + * Starts the thread pool and begins executing tasks. + * + * If the main thread pool is started then the main thread will stay in this function executing tasks until + * the scheduler is shutdown. + */ + void start(); + + /** + * Stops the thread pool, all threads are woken and once the task queue is empty the threads will exit. + * This function returns immediately, use join to wait for the threads to exit. + */ + void stop(); + + /** + * Notify a thread in this pool that there is work to do. + * + * It will wake up a thread if one is waiting for work, otherwise it will be picked up by the next thread. + */ + void notify(); + + /** + * Wait for all threads in this pool to exit. + */ + void join(); + + /** + * Submit a new task to this thread pool + * + * @param task the reaction task task to submit + */ + void submit(Task&& task); + + /** + * Add an idle task to this pool. + * + * This will add a task to the idle task list for this pool. + * + * @param reaction the reaction to add to the idle task list + */ + void add_idle_task(const std::shared_ptr& reaction); + + /** + * Remove an idle task from this pool. + * + * @param id the id of the reaction to remove from the idle task list + */ + void remove_idle_task(const NUClear::id_t& id); + + private: + /** + * The main function executed by each thread in the pool. + * + * The thread will wait for a task to be available and then execute it. + * This will continue until the pool is stopped. + */ + void run(); + + /** + * Get the next task to execute. + * + * This will return the next task to execute or block until a task is available. + * + * @return the next task to execute + */ + Task get_task(); + + /** + * Get an idle task to execute or hold. + * + * This will return an idle task instance. + * If the lock on the idle task returns true, it will then execute the idle task which enqueues the tasks + * that have been declared. + * + * If this was not the last thread to become idle, it will return an object which will not lock and the + * thread should then sleep until it is woken. + * + * @return the idle task to execute if it is lockable or hold if it is not + */ + Task get_idle_task(); + + // The scheduler parent of this pool + Scheduler& scheduler; + + /// The descriptor for this thread pool + const util::ThreadPoolDescriptor descriptor; + + /// If running is false this means the pool is shutting down and no more tasks will be accepted + std::atomic running{true}; + + /// The threads which are running in this thread pool + std::vector> threads; + + /// The mutex which protects the queue and idle tasks + std::mutex mutex; + /// The queue of tasks for this specific thread pool + std::vector queue; + /// The condition variable which threads wait on if they can't get a task + std::condition_variable condition; + + /// The number of active threads in this pool + std::atomic active{0}; + /// The idle tasks for this pool + std::vector> idle_tasks; + + /// Holds true if the queue has been checked and nothing could run + bool checked = false; + }; + + } // namespace scheduler +} // namespace threading +} // namespace NUClear + +#endif // NUCLEAR_THREADING_SCHEDULER_POOL_HPP diff --git a/src/threading/scheduler/Scheduler.cpp b/src/threading/scheduler/Scheduler.cpp new file mode 100644 index 00000000..d601eba1 --- /dev/null +++ b/src/threading/scheduler/Scheduler.cpp @@ -0,0 +1,156 @@ +/* + * MIT License + * + * Copyright (c) 2024 NUClear Contributors + * + * This file is part of the NUClear codebase. + * See https://github.com/Fastcode/NUClear for further info. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#include "Scheduler.hpp" + +#include "../../dsl/word/MainThread.hpp" +#include "GroupLock.hpp" + +namespace NUClear { +namespace threading { + namespace scheduler { + + Scheduler::Scheduler(const size_t& thread_count) { + // Make the queue for the main thread + auto main_descriptor = dsl::word::MainThread::main_descriptor(); + pools[main_descriptor.pool_id] = std::make_shared(*this, main_descriptor); + + // Make the default pool with the correct number of threads + auto default_descriptor = util::ThreadPoolDescriptor{}; + default_descriptor.thread_count = thread_count; + get_pool(default_descriptor); + } + + void Scheduler::start() { + std::lock_guard lock(mutex); + + started.store(true, std::memory_order_relaxed); + + // Start all of the pools except the main thread pool + for (auto& pool : pools) { + if (pool.first != NUClear::id_t(util::ThreadPoolDescriptor::MAIN_THREAD_POOL_ID)) { + pool.second->start(); + } + } + + // Start the main thread pool which will block the main thread + pools.at(NUClear::id_t(util::ThreadPoolDescriptor::MAIN_THREAD_POOL_ID))->start(); + + // The main thread will reach this point when the PowerPlant is shutting down + // Calling stop on each pool will wait for each pool to finish processing all tasks before returning + for (auto& pool : pools) { + pool.second->join(); + } + } + + void Scheduler::stop() { + running.store(false, std::memory_order_relaxed); + for (auto& pool : pools) { + pool.second->stop(); + } + } + + void Scheduler::add_idle_task(const util::ThreadPoolDescriptor& desc, + const std::shared_ptr& reaction) { + // If this is the "ALL" pool then add it to the schedulers list of tasks + if (desc.pool_id == util::ThreadPoolDescriptor::AllPools().pool_id) { + idle_tasks.push_back(reaction); + // Notify the main thread pool just in case there were no global idle tasks and now there are + pools.at(NUClear::id_t(util::ThreadPoolDescriptor::MAIN_THREAD_POOL_ID))->notify(); + } + else { + get_pool(desc)->add_idle_task(reaction); + } + } + + void Scheduler::remove_idle_task(const util::ThreadPoolDescriptor& desc, const NUClear::id_t& id) { + // If this is the "ALL" pool then remove it from the schedulers list of tasks + if (desc.pool_id == util::ThreadPoolDescriptor::AllPools().pool_id) { + idle_tasks.erase( + std::remove_if(idle_tasks.begin(), idle_tasks.end(), [&](const auto& r) { return r->id == id; }), + idle_tasks.end()); + } + else { + get_pool(desc)->remove_idle_task(id); + } + } + + std::shared_ptr Scheduler::get_pool(const util::ThreadPoolDescriptor& desc) { + // If the pool does not exist, create it + if (pools.count(desc.pool_id) == 0) { + // Create the pool + auto pool = std::make_shared(*this, desc); + pools[desc.pool_id] = pool; + + // If the scheduler has not yet started then don't start the threads for this pool yet + if (started.load(std::memory_order_relaxed)) { + pool->start(); + } + } + + return pools.at(desc.pool_id); + } + + std::shared_ptr Scheduler::get_group(const util::GroupDescriptor& desc) { + // Default group is ungrouped + if (desc.group_id == 0) { + return nullptr; + } + + // If the group does not exist, create it + if (groups.count(desc.group_id) == 0) { + groups[desc.group_id] = std::make_shared(desc); + } + + return groups.at(desc.group_id); + } + + void Scheduler::submit(std::unique_ptr&& task, const bool& immediate) noexcept { + + // If this task should run immediately and is not grouped then run it immediately + if (immediate && task->group_descriptor.group_id == 0) { + task->run(); + return; + } + + auto group = get_group(task->group_descriptor); + if (immediate) { + GroupLock lock(group); + if (lock.lock()) { + task->run(); + return; + } + } + + auto pool = get_pool(task->thread_pool_descriptor); + + // Submit the task to the pool + if (running.load(std::memory_order_relaxed)) { + pool->submit({ + std::move(task), + group != nullptr ? std::make_unique(group, [pool] { pool->notify(); }) : nullptr, + }); + } + } + + } // namespace scheduler +} // namespace threading +} // namespace NUClear diff --git a/src/threading/scheduler/Scheduler.hpp b/src/threading/scheduler/Scheduler.hpp new file mode 100644 index 00000000..8e1bbabe --- /dev/null +++ b/src/threading/scheduler/Scheduler.hpp @@ -0,0 +1,138 @@ +/* + * MIT License + * + * Copyright (c) 2024 NUClear Contributors + * + * This file is part of the NUClear codebase. + * See https://github.com/Fastcode/NUClear for further info. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#ifndef NUCLEAR_THREADING_TASK_SCHEDULER_HPP +#define NUCLEAR_THREADING_TASK_SCHEDULER_HPP + +#include +#include +#include +#include +#include + +#include "../../util/GroupDescriptor.hpp" +#include "../../util/ThreadPoolDescriptor.hpp" +#include "../ReactionTask.hpp" +#include "Group.hpp" +#include "IdleLock.hpp" +#include "Pool.hpp" + +namespace NUClear { +namespace threading { + namespace scheduler { + + class Scheduler { + public: + Scheduler(const size_t& thread_count); + + /** + * Starts the scheduler, and begins executing tasks. + * + * The main thread will stay in this function executing tasks until the scheduler is shutdown. + */ + void start(); + + /** + * Shuts down the scheduler, all waiting threads are woken, and attempting to get a task results in an + * exception + */ + void stop(); + + /** + * Submit a new task to be executed to the Scheduler. + * + * This method submits a new task to the scheduler. This task will then be sorted into the appropriate + * queue based on it's sync type and priority. It will then wait there until it is removed by a thread to + * be processed. + * + * @param task the reaction task task to submit + * @param immediate if this task should run immediately in the current thread. If immediate execution of + * this task is not possible (e.g. due to group concurrency restrictions) this task will be queued as normal + */ + void submit(std::unique_ptr&& task, const bool& immediate) noexcept; + + /** + * Adds a task to the idle task list. + * + * This task will be executed when all pools are idle. + * + * @param desc the descriptor for the pool to add the task to or ALL for the global idle task list + * @param reaction the reaction to add to the idle task list + */ + void add_idle_task(const util::ThreadPoolDescriptor& desc, const std::shared_ptr& reaction); + + /** + * Removes a task from the idle task list. + * + * @param desc the descriptor for the pool to remove the task from or ALL for the global idle task list + * @param id the reaction id to remove from the idle task list + */ + void remove_idle_task(const util::ThreadPoolDescriptor& desc, const NUClear::id_t& id); + + private: + /** + * Gets a pointer to a specific thread pool, or creates a new one if it does not exist. + * + * If the scheduler has already started, this will also start the pool. + * Otherwise the pool will be started when the scheduler is started. + * + * @param desc the descriptor for the pool to get + * + * @return a shared pointer to the pool + */ + std::shared_ptr get_pool(const util::ThreadPoolDescriptor& desc); + + /** + * Gets a pointer to a specific group, or creates a new one if it does not exist. + * + * @param desc the descriptor for the group to get + * + * @return a shared pointer to the group + */ + std::shared_ptr get_group(const util::GroupDescriptor& desc); + + /// If running is false this means the scheduler is shutting down and no more tasks will be accepted + std::atomic running{true}; + /// If started is false pools will not be started until start is called + /// once start is called future pools will be started immediately + std::atomic started{false}; + + /// A mutex for when we are modifying the groups or pools + std::mutex mutex; + /// A map of group ids to the number of active tasks currently running in that group + std::map> groups{}; + /// A map of pool descriptor ids to pool descriptors + std::map> pools{}; + + /// A list of idle tasks to execute when all pools are idle + std::vector> idle_tasks{}; + /// The number of active threads across all pools in the scheduler + std::atomic active{0}; + + // Pool works with scheduler to manage the thread pools + friend class Pool; + }; + + } // namespace scheduler +} // namespace threading +} // namespace NUClear + +#endif // NUCLEAR_THREADING_TASK_SCHEDULER_HPP diff --git a/tests/tests/threading/Scheduler.cpp b/tests/tests/threading/Scheduler.cpp new file mode 100644 index 00000000..50ed59c6 --- /dev/null +++ b/tests/tests/threading/Scheduler.cpp @@ -0,0 +1,9 @@ +// TODO need to test the following: + +// Idle tasks + +// Multiple pools + +// Priority sorting + +// Sync groups