From 1599e91f5d6d253c49748903ceebf12f07cab5d8 Mon Sep 17 00:00:00 2001 From: Trent Houliston Date: Wed, 21 Feb 2024 15:51:19 +1100 Subject: [PATCH] Add a DSL word that runs when a specific pool or all pools are idle (#100) Adds a new DSL word Idle that can be used to execute a reaction when the pool does not have a current task. There are 4 main variants that this implements ```cpp on> ``` Will execute when all non always threads are idle ```cpp on> ``` Will execute when the default thread pool is idle ```cpp on> ``` Will execute when the main thread's pool is idle ```cpp on> ``` Will execute when that specific pool is idle - [x] Fix up the deadlock caused by the recursive mutex acquisition --------- Co-authored-by: Thomas O'Brien <41043317+Tom0Brien@users.noreply.github.com> Co-authored-by: Tom0Brien --- .github/workflows/gcc.yaml | 4 - src/PowerPlant.cpp | 10 +++ src/PowerPlant.hpp | 26 ++++++ src/Reactor.hpp | 16 ++++ src/dsl/word/Always.hpp | 2 +- src/dsl/word/Idle.hpp | 89 +++++++++++++++++++ src/dsl/word/MainThread.hpp | 2 +- src/dsl/word/Pool.hpp | 15 +++- src/threading/TaskScheduler.cpp | 126 ++++++++++++++++++++++----- src/threading/TaskScheduler.hpp | 50 ++++++++++- src/util/GeneratedCallback.hpp | 2 +- src/util/ThreadPoolDescriptor.hpp | 10 +++ tests/dsl/Idle.cpp | 139 ++++++++++++++++++++++++++++++ tests/dsl/IdleSync.cpp | 83 ++++++++++++++++++ tests/dsl/MainThread.cpp | 2 +- tests/dsl/Sync.cpp | 2 +- tests/test_util/TestBase.hpp | 9 +- 17 files changed, 547 insertions(+), 40 deletions(-) create mode 100644 src/dsl/word/Idle.hpp create mode 100644 tests/dsl/Idle.cpp create mode 100644 tests/dsl/IdleSync.cpp diff --git a/.github/workflows/gcc.yaml b/.github/workflows/gcc.yaml index 4bec64c00..5f784c201 100644 --- a/.github/workflows/gcc.yaml +++ b/.github/workflows/gcc.yaml @@ -34,10 +34,6 @@ jobs: version: "8" - container: ubuntu:20.04 version: "7" - - container: ubuntu:18.04 - version: "6" - - container: ubuntu:18.04 - version: "5" name: Linux GCC-${{ matrix.toolchain.version }} runs-on: ubuntu-latest diff --git a/src/PowerPlant.cpp b/src/PowerPlant.cpp index 4e20eb7e7..f4f271e8b 100644 --- a/src/PowerPlant.cpp +++ b/src/PowerPlant.cpp @@ -75,6 +75,16 @@ void PowerPlant::submit(std::unique_ptr&& task, const b } } +void PowerPlant::add_idle_task(const NUClear::id_t& id, + const util::ThreadPoolDescriptor& pool_descriptor, + std::function&& task) { + scheduler.add_idle_task(id, pool_descriptor, std::move(task)); +} + +void PowerPlant::remove_idle_task(const NUClear::id_t& id, const util::ThreadPoolDescriptor& pool_descriptor) { + scheduler.remove_idle_task(id, pool_descriptor); +} + void PowerPlant::shutdown() { // Stop running before we emit the Shutdown event diff --git a/src/PowerPlant.hpp b/src/PowerPlant.hpp index ca1807483..ec327ec81 100644 --- a/src/PowerPlant.hpp +++ b/src/PowerPlant.hpp @@ -131,6 +131,32 @@ class PowerPlant { template T& install(Args&&... args); + /** + * @brief Adds an idle task to the task scheduler. + * + * This function adds an idle task to the task scheduler, which will be executed when the thread pool associated + * with the given `pool_id` has no other tasks to execute. The `task` parameter is a callable object that + * represents the idle task to be executed. + * + * @param id The ID of the task. + * @param pool_descriptor The descriptor for the thread pool to test for idle + * @param task The idle task to be executed. + */ + void add_idle_task(const NUClear::id_t& id, + const util::ThreadPoolDescriptor& pool_descriptor, + std::function&& task); + + /** + * @brief Removes an idle task from the task scheduler. + * + * This function removes an idle task from the task scheduler. The `id` and `pool_id` parameters are used to + * identify the idle task to be removed. + * + * @param id The ID of the task. + * @param pool_descriptor The descriptor for the thread pool to test for idle + */ + void remove_idle_task(const NUClear::id_t& id, const util::ThreadPoolDescriptor& pool_descriptor); + /** * @brief Generic submit function for submitting tasks to the thread pool. * diff --git a/src/Reactor.hpp b/src/Reactor.hpp index 9d8dea7cb..cbff255d5 100644 --- a/src/Reactor.hpp +++ b/src/Reactor.hpp @@ -54,6 +54,9 @@ namespace dsl { struct Priority; + template + struct Idle; + struct IO; struct UDP; @@ -190,6 +193,10 @@ class Reactor { /// @copydoc dsl::word::Once using Once = dsl::word::Once; + /// @copydoc dsl::word::Idle + template + using Idle = dsl::word::Idle; + /// @copydoc dsl::word::IO using IO = dsl::word::IO; @@ -227,6 +234,14 @@ class Reactor { /// @copydoc dsl::word::Shutdown using Shutdown = dsl::word::Shutdown; + /// @copydoc dsl::word::Pool + template + using Pool = dsl::word::Pool; + + /// @copydoc dsl::word::Group + template + using Group = dsl::word::Group; + /// @copydoc dsl::word::Every template using Every = dsl::word::Every; @@ -426,6 +441,7 @@ class Reactor { #include "dsl/word/Every.hpp" #include "dsl/word/Group.hpp" #include "dsl/word/IO.hpp" +#include "dsl/word/Idle.hpp" #include "dsl/word/Last.hpp" #include "dsl/word/MainThread.hpp" #include "dsl/word/Network.hpp" diff --git a/src/dsl/word/Always.hpp b/src/dsl/word/Always.hpp index d23a06d47..70e328268 100644 --- a/src/dsl/word/Always.hpp +++ b/src/dsl/word/Always.hpp @@ -81,7 +81,7 @@ namespace dsl { if (pool_id.count(reaction.id) == 0) { pool_id[reaction.id] = util::ThreadPoolDescriptor::get_unique_pool_id(); } - return util::ThreadPoolDescriptor{pool_id[reaction.id], 1}; + return util::ThreadPoolDescriptor{pool_id[reaction.id], 1, false}; } template diff --git a/src/dsl/word/Idle.hpp b/src/dsl/word/Idle.hpp new file mode 100644 index 000000000..beb64f90b --- /dev/null +++ b/src/dsl/word/Idle.hpp @@ -0,0 +1,89 @@ +/* + * MIT License + * + * Copyright (c) 2023 NUClear Contributors + * + * This file is part of the NUClear codebase. + * See https://github.com/Fastcode/NUClear for further info. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#ifndef NUCLEAR_DSL_WORD_IDLE_HPP +#define NUCLEAR_DSL_WORD_IDLE_HPP + +#include +#include +#include + +#include "../../threading/ReactionTask.hpp" +#include "../fusion/NoOp.hpp" +#include "MainThread.hpp" +#include "Pool.hpp" + +namespace NUClear { +namespace dsl { + namespace word { + + /** + * @brief A base type to handle the common code for idling after turning the pool descriptor into an id. + */ + inline void bind_idle(const std::shared_ptr& reaction, + const util::ThreadPoolDescriptor& pool_descriptor) { + + // Our unbinder to remove this reaction + reaction->unbinders.push_back([pool_descriptor](const threading::Reaction& r) { // + r.reactor.powerplant.remove_idle_task(r.id, pool_descriptor); + }); + + reaction->reactor.powerplant.add_idle_task(reaction->id, pool_descriptor, [reaction] { + reaction->reactor.powerplant.submit(reaction->get_task()); + }); + } + + /** + * @brief Execute a task when there is nothing currently running on the thread pool. + * + * @details + * @code on>() @endcode + * When the thread pool is idle, this task will be executed. This is use + * + * @par Implements + * Bind + * + * @tparam PoolType the descriptor that was used to create the thread pool + * void for the default pool + * MainThread for the main thread pool + */ + template + struct Idle { + template + static inline void bind(const std::shared_ptr& reaction) { + bind_idle(reaction, PoolType::template pool(*reaction)); + } + }; + + template <> + struct Idle { + template + static inline void bind(const std::shared_ptr& reaction) { + bind_idle(reaction, util::ThreadPoolDescriptor::AllPools()); + } + }; + + } // namespace word +} // namespace dsl +} // namespace NUClear + +#endif // NUCLEAR_DSL_WORD_IDLE_HPP diff --git a/src/dsl/word/MainThread.hpp b/src/dsl/word/MainThread.hpp index 29c91b795..de4830f71 100644 --- a/src/dsl/word/MainThread.hpp +++ b/src/dsl/word/MainThread.hpp @@ -42,7 +42,7 @@ namespace dsl { template static inline util::ThreadPoolDescriptor pool(const threading::Reaction& /*reaction*/) { - return util::ThreadPoolDescriptor{util::ThreadPoolDescriptor::MAIN_THREAD_POOL_ID, 1}; + return util::ThreadPoolDescriptor{util::ThreadPoolDescriptor::MAIN_THREAD_POOL_ID, 1, true}; } }; diff --git a/src/dsl/word/Pool.hpp b/src/dsl/word/Pool.hpp index 63c539199..7f7844807 100644 --- a/src/dsl/word/Pool.hpp +++ b/src/dsl/word/Pool.hpp @@ -65,7 +65,7 @@ namespace dsl { * }; * @endcode */ - template + template struct Pool { static_assert(PoolType::thread_count > 0, "Can not have a thread pool with less than 1 thread"); @@ -84,11 +84,22 @@ namespace dsl { } }; + // When given void as the pool type we use the default thread pool + template <> + struct Pool { + template + static inline util::ThreadPoolDescriptor pool(const threading::Reaction& /*reaction*/) { + return util::ThreadPoolDescriptor{}; + } + }; + // Initialise the thread pool descriptor template const util::ThreadPoolDescriptor Pool::pool_descriptor = { util::ThreadPoolDescriptor::get_unique_pool_id(), - PoolType::thread_count}; + PoolType::thread_count, + true, + }; } // namespace word } // namespace dsl diff --git a/src/threading/TaskScheduler.cpp b/src/threading/TaskScheduler.cpp index 96d8ac326..bba1154ee 100644 --- a/src/threading/TaskScheduler.cpp +++ b/src/threading/TaskScheduler.cpp @@ -79,6 +79,10 @@ namespace threading { } catch (...) { } + if (pool->pool_descriptor.counts_for_idle) { + --global_runnable_tasks; + --pool->runnable_tasks; + } } // Clear the current queue @@ -87,17 +91,18 @@ namespace threading { TaskScheduler::TaskScheduler(const size_t& thread_count) { // Make the queue for the main thread - pool_queues[util::ThreadPoolDescriptor::MAIN_THREAD_POOL_ID] = - std::make_shared(util::ThreadPoolDescriptor{util::ThreadPoolDescriptor::MAIN_THREAD_POOL_ID, 1}); + pool_queues[util::ThreadPoolDescriptor::MAIN_THREAD_POOL_ID] = std::make_shared( + util::ThreadPoolDescriptor{util::ThreadPoolDescriptor::MAIN_THREAD_POOL_ID, 1, true}); // Make the default pool with the correct number of threads - get_pool_queue(util::ThreadPoolDescriptor{util::ThreadPoolDescriptor::DEFAULT_THREAD_POOL_ID, thread_count}); + get_pool_queue( + util::ThreadPoolDescriptor{util::ThreadPoolDescriptor::DEFAULT_THREAD_POOL_ID, thread_count, true}); } void TaskScheduler::start_threads(const std::shared_ptr& pool) { // The main thread never needs to be started if (pool->pool_descriptor.pool_id != util::ThreadPoolDescriptor::MAIN_THREAD_POOL_ID) { - const std::lock_guard lock(pool->mutex); + const std::lock_guard lock(pool->mutex); while (pool->threads.size() < pool->pool_descriptor.thread_count) { pool->threads.emplace_back(std::make_unique(&TaskScheduler::pool_func, this, pool)); } @@ -142,7 +147,7 @@ namespace threading { // Poke all of the threads to make sure they are awake and then wait for them to finish for (auto& pool : pool_queues) { /* mutex scope */ { - const std::lock_guard queue_lock(pool.second->mutex); + const std::lock_guard queue_lock(pool.second->mutex); pool.second->condition.notify_all(); } for (auto& thread : pool.second->threads) { @@ -163,11 +168,21 @@ namespace threading { started.store(false); running.store(false); for (auto& pool : pool_queues) { - const std::lock_guard lock(pool.second->mutex); + const std::lock_guard lock(pool.second->mutex); pool.second->condition.notify_all(); } } + void TaskScheduler::PoolQueue::submit(Task&& task) { + const std::lock_guard lock(mutex); + + // Insert in sorted order + queue.insert(std::lower_bound(queue.begin(), queue.end(), task), std::move(task)); + + // Notify a single thread that there is a new task + condition.notify_one(); + } + void TaskScheduler::submit(const NUClear::id_t& id, const int& priority, const util::GroupDescriptor& group_descriptor, @@ -194,17 +209,51 @@ namespace threading { // We do not accept new tasks once we are shutdown if (running.load()) { - // Get the appropiate pool for this task const std::shared_ptr pool = get_pool_queue(task.thread_pool_descriptor); + if (pool->pool_descriptor.counts_for_idle) { + ++global_runnable_tasks; + ++pool->runnable_tasks; + } + // Get the appropiate pool for this task and submit + pool->submit(std::move(task)); + } + } + + void TaskScheduler::add_idle_task(const NUClear::id_t& id, + const util::ThreadPoolDescriptor& pool_descriptor, + std::function&& task) { + + if (pool_descriptor.pool_id == NUClear::id_t(-1)) { + const std::lock_guard lock(pool_mutex); + idle_tasks.emplace(id, std::move(task)); + } + else { + // Get the appropiate pool for this task + const std::shared_ptr pool = get_pool_queue(pool_descriptor); // Find where to insert the new task to maintain task order - const std::lock_guard queue_lock(pool->mutex); - auto& queue = pool->queue; - auto it = std::lower_bound(queue.begin(), queue.end(), task); - queue.insert(it, std::move(task)); + const std::lock_guard lock(pool->mutex); + pool->idle_tasks.emplace(id, std::move(task)); + } + } + + void TaskScheduler::remove_idle_task(const NUClear::id_t& id, const util::ThreadPoolDescriptor& pool_descriptor) { - // Notify a single thread that there is a new task - pool->condition.notify_one(); + if (pool_descriptor.pool_id == NUClear::id_t(-1)) { + const std::lock_guard lock(pool_mutex); + if (idle_tasks.count(id) > 0) { + idle_tasks.erase(id); + } + } + else { + // Get the appropiate pool for this task + const std::shared_ptr pool = get_pool_queue(pool_descriptor); + + // Find the idle task with the given id and remove it if it exists + const std::lock_guard lock(pool->mutex); + if (pool->idle_tasks.count(id) > 0) { + pool->idle_tasks.erase(id); + } } } @@ -223,7 +272,8 @@ namespace threading { auto& condition = pool->condition; // Keep looking for tasks while the scheduler is still running, or while there are still tasks to process - std::unique_lock lock(pool->mutex); + std::unique_lock lock(pool->mutex); + bool idle = false; while (running.load() || !queue.empty()) { // Only one thread can be checking group concurrency at a time otherwise the ordering might not be correct @@ -235,16 +285,26 @@ namespace threading { // Check if we can run the task if (is_runnable(it->group_descriptor)) { - // Move the task out of the queue Task task = std::move(*it); - // Erase the old position in the queue queue.erase(it); + if (pool->pool_descriptor.counts_for_idle && task.checked_runnable) { + ++global_runnable_tasks; + ++pool->runnable_tasks; + } + // Return the task return task; } + if (!it->checked_runnable) { + if (pool->pool_descriptor.counts_for_idle) { + --global_runnable_tasks; + --pool->runnable_tasks; + } + it->checked_runnable = true; + } } // If pool concurrency is greater than group concurrency some threads can be left with nothing to do. @@ -256,11 +316,35 @@ namespace threading { } } - // Wait for something to happen! - // We don't have a condition on this lock as the check would be this doing this loop again to see if there - // are any tasks we can execute (checking all the groups) so therefore we already did the entry predicate. - // Putting a condition on this would stop spurious wakeups of which the cost would be equal to the loop. - condition.wait(lock); // NOSONAR + if (pool->pool_descriptor.counts_for_idle && !idle) { + idle = true; + + /* mutex scope */ { + const std::lock_guard idle_lock(idle_mutex); + // Local idle tasks + if (pool->runnable_tasks == 0) { + for (auto& t : pool->idle_tasks) { + t.second(); + } + } + + // Global idle tasks + if (global_runnable_tasks == 0) { + for (auto& t : idle_tasks) { + t.second(); + } + } + } + } + else { + + // Wait for something to happen! + // We don't have a condition on this lock as the check would be this doing this loop again to see if + // there are any tasks we can execute (checking all the groups) so therefore we already did the entry + // predicate. Putting a condition on this would stop spurious wakeups of which the cost would be equal + // to the loop. + condition.wait(lock); // NOSONAR + } } // If we get out here then we are finished running. diff --git a/src/threading/TaskScheduler.hpp b/src/threading/TaskScheduler.hpp index 3859e0af2..937d855ad 100644 --- a/src/threading/TaskScheduler.hpp +++ b/src/threading/TaskScheduler.hpp @@ -103,6 +103,8 @@ namespace threading { util::ThreadPoolDescriptor thread_pool_descriptor; /// @brief The callback to be executed std::function run; + /// @brief If task has been checked for runnable + bool checked_runnable{false}; /** * @brief Compare tasks based on their priority @@ -124,12 +126,23 @@ namespace threading { const util::ThreadPoolDescriptor pool_descriptor; /// @brief The threads which are running in this thread pool std::vector> threads; + /// @brief The number of runnable tasks in this thread pool + size_t runnable_tasks{0}; /// @brief The queue of tasks for this specific thread pool std::vector queue; /// @brief The mutex which protects the queue - std::mutex mutex; + std::recursive_mutex mutex; /// @brief The condition variable which threads wait on if they can't get a task - std::condition_variable condition; + std::condition_variable_any condition; + /// @brief The map of idle tasks for this thread pool + std::map> idle_tasks; + + /** + * @brief Submit a new task to this thread pool + * + * @param task the task to submit + */ + void submit(Task&& task); }; public: @@ -176,6 +189,32 @@ namespace threading { const bool& immediate, std::function&& func); + /** + * @brief Adds an idle task to the task scheduler. + * + * This function adds an idle task to the task scheduler, which will be executed when the thread pool associated + * with the given `pool_id` has no other tasks to execute. The `task` parameter is a callable object that + * represents the idle task to be executed. + * + * @param id The ID of the task. + * @param pool_descriptor The descriptor for the thread pool to test for idle + * @param task The idle task to be executed. + */ + void add_idle_task(const NUClear::id_t& id, + const util::ThreadPoolDescriptor& pool_descriptor, + std::function&& task); + + /** + * @brief Removes an idle task from the task scheduler. + * + * This function removes an idle task from the task scheduler. The `id` and `pool_id` parameters are used to + * identify the idle task to be removed. + * + * @param id The ID of the task + * @param pool_descriptor The descriptor for the thread pool to test for idle + */ + void remove_idle_task(const NUClear::id_t& id, const util::ThreadPoolDescriptor& pool_descriptor); + private: /** * @brief Get a task object to be executed by a thread. @@ -247,6 +286,13 @@ namespace threading { /// @brief mutex for the group map std::mutex group_mutex; + /// @brief mutex for the idle tasks + std::mutex idle_mutex; + /// @brief global idle tasks to be executed when no other tasks are running + std::map> idle_tasks{}; + /// @brief the total number of threads that have runnable tasks + std::atomic global_runnable_tasks{0}; + /// @brief A map of pool descriptor ids to pool descriptors std::map> pool_queues{}; /// @brief a mutex for when we are modifying the pool_queues map diff --git a/src/util/GeneratedCallback.hpp b/src/util/GeneratedCallback.hpp index af4a2987f..ef2bb58ab 100644 --- a/src/util/GeneratedCallback.hpp +++ b/src/util/GeneratedCallback.hpp @@ -47,7 +47,7 @@ namespace util { /// @brief the descriptor for the group the task should run in GroupDescriptor group{0, std::numeric_limits::max()}; /// @brief the descriptor the thread pool and task queue that the should run in - ThreadPoolDescriptor pool{util::ThreadPoolDescriptor::DEFAULT_THREAD_POOL_ID, 0}; + ThreadPoolDescriptor pool{util::ThreadPoolDescriptor::DEFAULT_THREAD_POOL_ID, 0, true}; /// @brief the function that should be executed in order to run the task threading::ReactionTask::TaskFunction callback{}; diff --git a/src/util/ThreadPoolDescriptor.hpp b/src/util/ThreadPoolDescriptor.hpp index f6de67408..1c868a479 100644 --- a/src/util/ThreadPoolDescriptor.hpp +++ b/src/util/ThreadPoolDescriptor.hpp @@ -36,11 +36,21 @@ namespace util { * @brief A description of a thread pool */ struct ThreadPoolDescriptor { + ThreadPoolDescriptor() = default; + ThreadPoolDescriptor(const NUClear::id_t& pool_id, size_t thread_count, bool counts_for_idle) + : pool_id(pool_id), thread_count(thread_count), counts_for_idle(counts_for_idle) {} + + static ThreadPoolDescriptor AllPools() { + return ThreadPoolDescriptor{NUClear::id_t(-1), size_t(-1), false}; + } + /// @brief a unique identifier for this pool NUClear::id_t pool_id{ThreadPoolDescriptor::DEFAULT_THREAD_POOL_ID}; /// @brief the number of threads this thread pool will use size_t thread_count{0}; + /// @brief if these threads count towards system idle + bool counts_for_idle{true}; /// @brief the ID of the main thread pool (not to be confused with the ID of the main thread) static const NUClear::id_t MAIN_THREAD_POOL_ID; diff --git a/tests/dsl/Idle.cpp b/tests/dsl/Idle.cpp new file mode 100644 index 000000000..3b5620d06 --- /dev/null +++ b/tests/dsl/Idle.cpp @@ -0,0 +1,139 @@ +/* + * MIT License + * + * Copyright (c) 2024 NUClear Contributors + * + * This file is part of the NUClear codebase. + * See https://github.com/Fastcode/NUClear for further info. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#include +#include + +#include "test_util/TestBase.hpp" + +namespace { + +/// @brief A vector of events that have happened +std::vector events; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables) + +struct SimpleMessage { + SimpleMessage(int data) : data(data) {} + int data; +}; + +constexpr int time_step = 50; + +class TestReactor : public test_util::TestBase { +public: + template + struct CustomPool { + static constexpr int thread_count = 2; + }; + + template + void do_step(const std::string& name) { + std::this_thread::sleep_until(start_time + std::chrono::milliseconds(time_step * N)); + events.push_back(name + " " + std::to_string(N)); + emit(std::make_unique>()); + } + + TestReactor(std::unique_ptr environment) : TestBase(std::move(environment), false) { + + start_time = NUClear::clock::now(); + + // Idle testing for default thread + on>>().then([this] { do_step<1>("Default Startup"); }); + on>>().then([this] { do_step<2>("Default Step"); }); + on>>().then([this] { do_step<3>("Default Step"); }); + drh = on>>().then([this] { do_step<4>("Default Idle"); }); + on>>().then([this] { do_step<5>("Default Step"); }); + on>>().then([this] { do_step<6>("Default Step"); }); + on>>().then([this] { do_step<7>("Default Step"); }); + on>>().then([this] { drh.unbind(); }); + + // Idle testing for main thread + on>, MainThread>().then([this] { do_step<9>("Main Startup"); }); + on>, MainThread>().then([this] { do_step<10>("Main Step"); }); + on>, MainThread>().then([this] { do_step<11>("Main Step"); }); + mrh = on>().then([this] { do_step<12>("Main Idle"); }); + on>, MainThread>().then([this] { do_step<13>("Main Step"); }); + on>, MainThread>().then([this] { do_step<14>("Main Step"); }); + on>, MainThread>().then([this] { do_step<15>("Main Step"); }); + on>, MainThread>().then([this] { mrh.unbind(); }); + + // Idle testing for custom pool + on>, Pool>>().then([this] { do_step<17>("Custom<1> Startup"); }); + on>, Pool>>().then([this] { do_step<18>("Custom<1> Step"); }); + on>, Pool>>().then([this] { do_step<19>("Custom<1> Step"); }); + crh = on>>>().then([this] { do_step<20>("Custom<1> Idle"); }); + on>, Pool>>().then([this] { do_step<21>("Custom<1> Step"); }); + on>, Pool>>().then([this] { do_step<22>("Custom<1> Step"); }); + on>, Pool>>().then([this] { do_step<23>("Custom<1> Step"); }); + on>, Pool>>().then([this] { crh.unbind(); }); + + // Idle testing for global + on>, Pool>>().then([this] { do_step<25>("Custom<2> Startup"); }); + on>, Pool>>().then([this] { do_step<26>("Custom<2> Step"); }); + on>, Pool>>().then([this] { do_step<27>("Custom<2> Step"); }); + grh = on>().then([this] { do_step<28>("Global Idle"); }); + on>, Pool>>().then([this] { do_step<29>("Custom<2> Step"); }); + on>, Pool>>().then([this] { do_step<30>("Custom<2> Step"); }); + on>, Pool>>().then([this] { do_step<31>("Custom<2> Step"); }); + on>, Pool>>().then([this] { powerplant.shutdown(); }); + + on().then([this] { + emit(std::make_unique>()); + emit(std::make_unique>()); + emit(std::make_unique>()); + emit(std::make_unique>()); + }); + } + +private: + NUClear::clock::time_point start_time; + NUClear::threading::ReactionHandle drh; + NUClear::threading::ReactionHandle mrh; + NUClear::threading::ReactionHandle crh; + NUClear::threading::ReactionHandle grh; +}; + +} // namespace + + +TEST_CASE("Test that pool idle triggers when nothing is running", "[api][idle]") { + + NUClear::Configuration config; + config.thread_count = 4; + NUClear::PowerPlant plant(config); + plant.install(); + plant.start(); + + const std::vector expected = { + "Default Startup 1", "Default Step 2", "Default Step 3", "Default Idle 4", "Default Step 5", + "Default Step 6", "Default Step 7", "Main Startup 9", "Main Step 10", "Main Step 11", + "Main Idle 12", "Main Step 13", "Main Step 14", "Main Step 15", "Custom<1> Startup 17", + "Custom<1> Step 18", "Custom<1> Step 19", "Custom<1> Idle 20", "Custom<1> Step 21", "Custom<1> Step 22", + "Custom<1> Step 23", "Custom<2> Startup 25", "Custom<2> Step 26", "Custom<2> Step 27", "Global Idle 28", + "Custom<2> Step 29", "Custom<2> Step 30", "Custom<2> Step 31", + }; + + // Make an info print the diff in an easy to read way if we fail + INFO(test_util::diff_string(expected, events)); + + // Check the events fired in order and only those events + REQUIRE(events == expected); +} diff --git a/tests/dsl/IdleSync.cpp b/tests/dsl/IdleSync.cpp new file mode 100644 index 000000000..9cfe8f1fc --- /dev/null +++ b/tests/dsl/IdleSync.cpp @@ -0,0 +1,83 @@ +/* + * MIT License + * + * Copyright (c) 2024 NUClear Contributors + * + * This file is part of the NUClear codebase. + * See https://github.com/Fastcode/NUClear for further info. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#include +#include + +#include "test_util/TestBase.hpp" + +namespace { + +/// @brief A vector of events that have happened +std::vector events; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables) + +class TestReactor : public test_util::TestBase { +public: + TestReactor(std::unique_ptr environment) : TestBase(std::move(environment), false) { + + on>, MainThread>().then([this] { + emit(std::make_unique>()); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + emit(std::make_unique>()); + }); + + // Idle testing for default thread + on>, Sync>().then([] { + events.push_back("Default Start"); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + events.push_back("Default End"); + }); + + on>, Sync, MainThread>().then([] { events.push_back("Main Task"); }); + + on>().then([this] { + events.push_back("Idle Main Thread"); + powerplant.shutdown(); + }); + + on().then([this] { emit(std::make_unique>()); }); + } +}; + +} // namespace + +TEST_CASE("Test that pool idle triggers when a waiting task prevents running", "[api][idle]") { + + NUClear::Configuration config; + config.thread_count = 4; + NUClear::PowerPlant plant(config); + plant.install(); + plant.start(); + + const std::vector expected = { + "Default Start", + "Idle Main Thread", + "Default End", + "Main Task", + }; + + // Make an info print the diff in an easy to read way if we fail + INFO(test_util::diff_string(expected, events)); + + // Check the events fired in order and only those events + REQUIRE(events == expected); +} diff --git a/tests/dsl/MainThread.cpp b/tests/dsl/MainThread.cpp index 084ddcae4..05aa84000 100644 --- a/tests/dsl/MainThread.cpp +++ b/tests/dsl/MainThread.cpp @@ -35,7 +35,7 @@ struct MessageB {}; class TestReactor : public test_util::TestBase { public: - TestReactor(std::unique_ptr environment) : TestBase(std::move(environment), false) { + TestReactor(std::unique_ptr environment) : TestBase(std::move(environment)) { // Run a task without MainThread to make sure it isn't on the main thread on>().then("Non-MainThread reaction", [this] { diff --git a/tests/dsl/Sync.cpp b/tests/dsl/Sync.cpp index a63ad26ea..c2658d743 100644 --- a/tests/dsl/Sync.cpp +++ b/tests/dsl/Sync.cpp @@ -38,7 +38,7 @@ struct Message { class TestReactor : public test_util::TestBase { public: - TestReactor(std::unique_ptr environment) : TestBase(std::move(environment), false) { + TestReactor(std::unique_ptr environment) : TestBase(std::move(environment)) { on>, Sync>().then([this](const Message<0>& m) { events.push_back("Sync A " + m.data); diff --git a/tests/test_util/TestBase.hpp b/tests/test_util/TestBase.hpp index 59b530fe6..ffadda856 100644 --- a/tests/test_util/TestBase.hpp +++ b/tests/test_util/TestBase.hpp @@ -53,12 +53,9 @@ class TestBase : public NUClear::Reactor { : Reactor(std::move(environment)) { // Shutdown if the system is idle - on, Priority::IDLE>().then([this] { powerplant.shutdown(); }); - on().then([this, shutdown_on_idle] { - if (shutdown_on_idle) { - emit(std::make_unique()); - } - }); + if (shutdown_on_idle) { + on>().then([this] { powerplant.shutdown(); }); + } // Timeout if the test doesn't complete in time on, MainThread>().then([this] {