diff --git a/src/PowerPlant.cpp b/src/PowerPlant.cpp index 15ecc76f..13c63a86 100644 --- a/src/PowerPlant.cpp +++ b/src/PowerPlant.cpp @@ -30,6 +30,9 @@ #include "dsl/word/Shutdown.hpp" #include "dsl/word/Startup.hpp" #include "dsl/word/emit/Direct.hpp" +#include "extension/ChronoController.hpp" +#include "extension/IOController.hpp" +#include "extension/NetworkController.hpp" #include "message/CommandLineArguments.hpp" #include "message/LogMessage.hpp" #include "threading/ReactionTask.hpp" @@ -55,6 +58,11 @@ PowerPlant::PowerPlant(Configuration config, int argc, const char* argv[]) : sch // Store our static variable powerplant = this; + // Install the extension controllers + install(); + install(); + install(); + // Emit our arguments if any. message::CommandLineArguments args; for (int i = 0; i < argc; ++i) { @@ -112,13 +120,13 @@ void PowerPlant::submit(std::unique_ptr&& task, const b if (task) { try { const std::shared_ptr t(std::move(task)); - submit(t->id, t->priority, t->group_descriptor, t->thread_pool_descriptor, immediate, [t]() { t->run(); }); + submit(t->id, t->priority, t->group_descriptor, t->pool_descriptor, immediate, [t]() { t->run(); }); } catch (const std::exception& ex) { - task->parent.reactor.log("There was an exception while submitting a reaction", ex.what()); + task->parent->reactor.log("There was an exception while submitting a reaction", ex.what()); } catch (...) { - task->parent.reactor.log("There was an unknown exception while submitting a reaction"); + task->parent->reactor.log("There was an unknown exception while submitting a reaction"); } } } @@ -130,7 +138,7 @@ void PowerPlant::log(const LogLevel& level, std::string message) { // Direct emit the log message so that any direct loggers can use it emit(std::make_unique( level, - current_task != nullptr ? current_task->parent.reactor.log_level : LogLevel::UNKNOWN, + current_task != nullptr ? current_task->parent->reactor.log_level : LogLevel::UNKNOWN, std::move(message), current_task != nullptr ? current_task->stats : nullptr)); } diff --git a/src/dsl/Parse.hpp b/src/dsl/Parse.hpp index 82df66fa..841e4018 100644 --- a/src/dsl/Parse.hpp +++ b/src/dsl/Parse.hpp @@ -40,36 +40,36 @@ namespace dsl { return DSL::template bind>(r, std::forward(args)...); } - static auto get(threading::Reaction& r) + static auto get(threading::ReactionTask& task) -> decltype(std::conditional_t::value, DSL, fusion::NoOp>::template get< - Parse>(r)) { + Parse>(task)) { return std::conditional_t::value, DSL, fusion::NoOp>::template get>( - r); + task); } - static bool precondition(threading::Reaction& r) { + static bool precondition(threading::ReactionTask& task) { return std::conditional_t::value, DSL, fusion::NoOp>::template precondition< - Parse>(r); + Parse>(task); } - static int priority(threading::Reaction& r) { + static int priority(threading::ReactionTask& task) { return std::conditional_t::value, DSL, fusion::NoOp>::template priority< - Parse>(r); + Parse>(task); } - static util::GroupDescriptor group(threading::Reaction& r) { + static util::GroupDescriptor group(threading::ReactionTask& task) { return std::conditional_t::value, DSL, fusion::NoOp>::template group< - Parse>(r); + Parse>(task); } - static util::ThreadPoolDescriptor pool(threading::Reaction& r) { + static util::ThreadPoolDescriptor pool(threading::ReactionTask& task) { return std::conditional_t::value, DSL, fusion::NoOp>::template pool< - Parse>(r); + Parse>(task); } - static void postcondition(threading::ReactionTask& r) { + static void postcondition(threading::ReactionTask& task) { std::conditional_t::value, DSL, fusion::NoOp>::template postcondition< - Parse>(r); + Parse>(task); } }; diff --git a/src/dsl/fusion/GetFusion.hpp b/src/dsl/fusion/GetFusion.hpp index 831c7a96..83f72095 100644 --- a/src/dsl/fusion/GetFusion.hpp +++ b/src/dsl/fusion/GetFusion.hpp @@ -40,8 +40,8 @@ namespace dsl { */ template struct GetCaller { - static auto call(threading::Reaction& reaction) -> decltype(Function::template get(reaction)) { - return Function::template get(reaction); + static auto call(threading::ReactionTask& task) -> decltype(Function::template get(task)) { + return Function::template get(task); } }; @@ -86,19 +86,19 @@ namespace dsl { struct GetFuser> { template - static auto get(threading::Reaction& reaction) + static auto get(threading::ReactionTask& task) -> decltype(util::FunctionFusion, - decltype(std::forward_as_tuple(reaction)), + decltype(std::forward_as_tuple(task)), GetCaller, std::tuple, - 1>::call(reaction)) { + 1>::call(task)) { // Perform our function fusion return util::FunctionFusion, - decltype(std::forward_as_tuple(reaction)), + decltype(std::forward_as_tuple(task)), GetCaller, std::tuple, - 1>::call(reaction); + 1>::call(task); } }; diff --git a/src/dsl/fusion/GroupFusion.hpp b/src/dsl/fusion/GroupFusion.hpp index 2a717d7b..6debac53 100644 --- a/src/dsl/fusion/GroupFusion.hpp +++ b/src/dsl/fusion/GroupFusion.hpp @@ -77,10 +77,10 @@ namespace dsl { struct GroupFuser> { template - static util::GroupDescriptor group(threading::Reaction& reaction) { + static util::GroupDescriptor group(threading::ReactionTask& task) { // Return our group - return Word::template group(reaction); + return Word::template group(task); } }; @@ -89,7 +89,7 @@ namespace dsl { struct GroupFuser> { template - static void group(const threading::Reaction& /*reaction*/) { + static void group(const threading::ReactionTask& /*task*/) { throw std::invalid_argument("Can not be a member of more than one group"); } }; diff --git a/src/dsl/fusion/NoOp.hpp b/src/dsl/fusion/NoOp.hpp index 3adf3e11..73984ce6 100644 --- a/src/dsl/fusion/NoOp.hpp +++ b/src/dsl/fusion/NoOp.hpp @@ -47,27 +47,27 @@ namespace dsl { } template - static std::tuple<> get(const threading::Reaction& /*reaction*/) { + static std::tuple<> get(const threading::ReactionTask& /*task*/) { return {}; } template - static bool precondition(const threading::Reaction& /*reaction*/) { + static bool precondition(const threading::ReactionTask& /*task*/) { return true; } template - static int priority(const threading::Reaction& /*reaction*/) { + static int priority(const threading::ReactionTask& /*task*/) { return word::Priority::NORMAL::value; } template - static util::GroupDescriptor group(const threading::Reaction& /*reaction*/) { - return util::GroupDescriptor{}; + static util::GroupDescriptor group(const threading::ReactionTask& /*task*/) { + return {}; } template - static util::ThreadPoolDescriptor pool(const threading::Reaction& /*reaction*/) { + static util::ThreadPoolDescriptor pool(const threading::ReactionTask& /*task*/) { return util::ThreadPoolDescriptor{}; } @@ -86,15 +86,15 @@ namespace dsl { static std::tuple<> bind(const std::shared_ptr&); - static std::tuple<> get(threading::Reaction&); + static std::tuple<> get(threading::ReactionTask&); - static bool precondition(threading::Reaction&); + static bool precondition(threading::ReactionTask&); - static int priority(threading::Reaction&); + static int priority(threading::ReactionTask&); - static util::GroupDescriptor group(threading::Reaction&); + static util::GroupDescriptor group(threading::ReactionTask&); - static util::ThreadPoolDescriptor pool(threading::Reaction&); + static util::ThreadPoolDescriptor pool(threading::ReactionTask&); static void postcondition(threading::ReactionTask&); }; diff --git a/src/dsl/fusion/PoolFusion.hpp b/src/dsl/fusion/PoolFusion.hpp index 4064dc5b..65313fe7 100644 --- a/src/dsl/fusion/PoolFusion.hpp +++ b/src/dsl/fusion/PoolFusion.hpp @@ -26,7 +26,7 @@ #include #include -#include "../../threading/Reaction.hpp" +#include "../../threading/ReactionTask.hpp" #include "../operation/DSLProxy.hpp" #include "has_pool.hpp" @@ -77,10 +77,10 @@ namespace dsl { struct PoolFuser> { template - static util::ThreadPoolDescriptor pool(threading::Reaction& reaction) { + static util::ThreadPoolDescriptor pool(threading::ReactionTask& task) { // Return our pool - return Word::template pool(reaction); + return Word::template pool(task); } }; @@ -89,7 +89,7 @@ namespace dsl { struct PoolFuser> { template - static util::ThreadPoolDescriptor pool(const threading::Reaction& /*reaction*/) { + static util::ThreadPoolDescriptor pool(const threading::ReactionTask& /*task*/) { throw std::invalid_argument("Can not be a member of more than one pool"); } }; diff --git a/src/dsl/fusion/PreconditionFusion.hpp b/src/dsl/fusion/PreconditionFusion.hpp index ae1c418e..5275d5fb 100644 --- a/src/dsl/fusion/PreconditionFusion.hpp +++ b/src/dsl/fusion/PreconditionFusion.hpp @@ -23,7 +23,7 @@ #ifndef NUCLEAR_DSL_FUSION_PRECONDITION_FUSION_HPP #define NUCLEAR_DSL_FUSION_PRECONDITION_FUSION_HPP -#include "../../threading/Reaction.hpp" +#include "../../threading/ReactionTask.hpp" #include "../operation/DSLProxy.hpp" #include "has_precondition.hpp" @@ -75,10 +75,10 @@ namespace dsl { struct PreconditionFuser> { template - static bool precondition(threading::Reaction& reaction) { + static bool precondition(threading::ReactionTask& task) { // Run our remaining precondition - return Word::template precondition(reaction); + return Word::template precondition(task); } }; @@ -87,11 +87,11 @@ namespace dsl { struct PreconditionFuser> { template - static bool precondition(threading::Reaction& reaction) { + static bool precondition(threading::ReactionTask& task) { // Perform a recursive and operation ending with the first false - return Word1::template precondition(reaction) - && PreconditionFuser>::template precondition(reaction); + return Word1::template precondition(task) + && PreconditionFuser>::template precondition(task); } }; diff --git a/src/dsl/fusion/PriorityFusion.hpp b/src/dsl/fusion/PriorityFusion.hpp index a28c78af..80f8e375 100644 --- a/src/dsl/fusion/PriorityFusion.hpp +++ b/src/dsl/fusion/PriorityFusion.hpp @@ -23,7 +23,7 @@ #ifndef NUCLEAR_DSL_FUSION_PRIORITY_FUSION_HPP #define NUCLEAR_DSL_FUSION_PRIORITY_FUSION_HPP -#include "../../threading/Reaction.hpp" +#include "../../threading/ReactionTask.hpp" #include "../operation/DSLProxy.hpp" #include "has_priority.hpp" @@ -74,10 +74,10 @@ namespace dsl { struct PriorityFuser> { template - static int priority(threading::Reaction& reaction) { + static int priority(threading::ReactionTask& task) { // Return our priority - return Word::template priority(reaction); + return Word::template priority(task); } }; @@ -86,11 +86,11 @@ namespace dsl { struct PriorityFuser> { template - static int priority(threading::Reaction& reaction) { + static int priority(threading::ReactionTask& task) { // Choose our maximum priority - return std::max(Word1::template priority(reaction), - PriorityFuser>::template priority(reaction)); + return std::max(Word1::template priority(task), + PriorityFuser>::template priority(task)); } }; diff --git a/src/dsl/fusion/has_get.hpp b/src/dsl/fusion/has_get.hpp index 0649519c..e07f2a76 100644 --- a/src/dsl/fusion/has_get.hpp +++ b/src/dsl/fusion/has_get.hpp @@ -23,7 +23,7 @@ #ifndef NUCLEAR_DSL_FUSION_HAS_GET_HPP #define NUCLEAR_DSL_FUSION_HAS_GET_HPP -#include "../../threading/Reaction.hpp" +#include "../../threading/ReactionTask.hpp" #include "NoOp.hpp" namespace NUClear { @@ -42,7 +42,8 @@ namespace dsl { using no = std::false_type; template - static auto test(int) -> decltype(U::template get(std::declval()), yes()); + static auto test(int) -> decltype(U::template get(std::declval()), + yes()); template static no test(...); diff --git a/src/dsl/fusion/has_group.hpp b/src/dsl/fusion/has_group.hpp index 6c2a83d8..e5bf5071 100644 --- a/src/dsl/fusion/has_group.hpp +++ b/src/dsl/fusion/has_group.hpp @@ -23,7 +23,7 @@ #ifndef NUCLEAR_DSL_FUSION_HAS_GROUP_HPP #define NUCLEAR_DSL_FUSION_HAS_GROUP_HPP -#include "../../threading/Reaction.hpp" +#include "../../threading/ReactionTask.hpp" #include "NoOp.hpp" namespace NUClear { @@ -42,7 +42,7 @@ namespace dsl { using no = std::false_type; template - static auto test(int) -> decltype(U::template group(std::declval()), + static auto test(int) -> decltype(U::template group(std::declval()), yes()); template static no test(...); diff --git a/src/dsl/fusion/has_pool.hpp b/src/dsl/fusion/has_pool.hpp index 9715e7b3..2f5c81e0 100644 --- a/src/dsl/fusion/has_pool.hpp +++ b/src/dsl/fusion/has_pool.hpp @@ -23,7 +23,7 @@ #ifndef NUCLEAR_DSL_FUSION_HAS_POOL_HPP #define NUCLEAR_DSL_FUSION_HAS_POOL_HPP -#include "../../threading/Reaction.hpp" +#include "../../threading/ReactionTask.hpp" #include "NoOp.hpp" namespace NUClear { @@ -42,7 +42,7 @@ namespace dsl { using no = std::false_type; template - static auto test(int) -> decltype(U::template pool(std::declval()), + static auto test(int) -> decltype(U::template pool(std::declval()), yes()); template static no test(...); diff --git a/src/dsl/fusion/has_precondition.hpp b/src/dsl/fusion/has_precondition.hpp index 4fa9e36f..5fc22554 100644 --- a/src/dsl/fusion/has_precondition.hpp +++ b/src/dsl/fusion/has_precondition.hpp @@ -23,7 +23,7 @@ #ifndef NUCLEAR_DSL_FUSION_HAS_PRECONDITION_HPP #define NUCLEAR_DSL_FUSION_HAS_PRECONDITION_HPP -#include "../../threading/Reaction.hpp" +#include "../../threading/ReactionTask.hpp" #include "NoOp.hpp" namespace NUClear { @@ -43,7 +43,7 @@ namespace dsl { template static auto test(int) - -> decltype(U::template precondition(std::declval()), yes()); + -> decltype(U::template precondition(std::declval()), yes()); template static no test(...); diff --git a/src/dsl/fusion/has_priority.hpp b/src/dsl/fusion/has_priority.hpp index e033b271..6d97e01b 100644 --- a/src/dsl/fusion/has_priority.hpp +++ b/src/dsl/fusion/has_priority.hpp @@ -23,7 +23,7 @@ #ifndef NUCLEAR_DSL_FUSION_HAS_PRIORITY_HPP #define NUCLEAR_DSL_FUSION_HAS_PRIORITY_HPP -#include "../../threading/Reaction.hpp" +#include "../../threading/ReactionTask.hpp" #include "NoOp.hpp" namespace NUClear { @@ -42,8 +42,8 @@ namespace dsl { using no = std::false_type; template - static auto test(int) -> decltype(U::template priority(std::declval()), - yes()); + static auto test(int) + -> decltype(U::template priority(std::declval()), yes()); template static no test(...); diff --git a/src/dsl/operation/CacheGet.hpp b/src/dsl/operation/CacheGet.hpp index 8d2280e4..818945b4 100644 --- a/src/dsl/operation/CacheGet.hpp +++ b/src/dsl/operation/CacheGet.hpp @@ -43,7 +43,7 @@ namespace dsl { struct CacheGet { template - static std::shared_ptr get(const threading::Reaction& /*reaction*/) { + static std::shared_ptr get(const threading::ReactionTask& /*task*/) { return store::ThreadStore>::value == nullptr ? store::DataStore::get() diff --git a/src/dsl/word/Always.hpp b/src/dsl/word/Always.hpp index 74885c7c..85ccc3a1 100644 --- a/src/dsl/word/Always.hpp +++ b/src/dsl/word/Always.hpp @@ -28,6 +28,7 @@ #include #include "../../id.hpp" +#include "../../threading/ReactionIdentifiers.hpp" #include "../../threading/ReactionTask.hpp" #include "../../util/ThreadPoolDescriptor.hpp" @@ -57,7 +58,7 @@ namespace dsl { * * @par Ensure Clean Shutdown * If the reaction associated with this task is performing a blocking operation, developers should make the - * the reaction interruptible with an on reaction. This will enforce a clean shutdown in the system. + * the reaction interruptable with an on reaction. This will enforce a clean shutdown in the system. * * @attention * Where possible, developers should avoid using this keyword. It has been provided, but should only be @@ -71,87 +72,70 @@ namespace dsl { struct Always { template - static util::ThreadPoolDescriptor pool(const threading::Reaction& reaction) { - static std::map pool_id; + static util::ThreadPoolDescriptor pool(const threading::ReactionTask& task) { + static std::map pool_ids; static std::mutex mutex; - const std::lock_guard lock(mutex); - if (pool_id.count(reaction.id) == 0) { - pool_id[reaction.id] = util::ThreadPoolDescriptor::get_unique_pool_id(); + const auto& reaction = *task.parent; + id_t pool_id = 0; + + /*mutex scope*/ { + const std::lock_guard lock(mutex); + if (pool_ids.count(reaction.id) == 0) { + pool_ids[reaction.id] = util::ThreadPoolDescriptor::get_unique_pool_id(); + } + pool_id = pool_ids.at(reaction.id); } - return util::ThreadPoolDescriptor{pool_id[reaction.id], 1, false}; + + return util::ThreadPoolDescriptor{pool_id, 1, false}; } template - static void bind(const std::shared_ptr& always_reaction) { - /** - * Static map mapping reaction id (from the always reaction) to a pair of reaction pointers -- one for - * the always reaction and one for the idle reaction that we generate in this function - * The main purpose of this map is to ensure that the always reaction pointer doesn't get destroyed - */ - static std::map, std::shared_ptr>> - reaction_store = {}; - - /** - * Generate a new reaction for an idle task. - * - * The purpose of this reaction is to ensure that the always reaction is resubmitted in the event that - * the precondition fails. (e.g. on> will fail the precondition if there are no X - * messages previously emitted) - * - * In the event that the precondition on the always reaction fails this idle task will run and resubmit - * both the always reaction and the idle reaction. - * - * The idle reaction must have a lower priority than the always reaction and must also run in the same - * thread pool and group as the always reaction. - */ - auto idle_reaction = std::make_shared( - always_reaction->reactor, - threading::ReactionIdentifiers{always_reaction->identifiers->name + " - IDLE Task", - always_reaction->identifiers->reactor, - always_reaction->identifiers->dsl, - always_reaction->identifiers->function}, - [always_reaction](threading::Reaction& ir) -> util::GeneratedCallback { - auto callback = [&ir, always_reaction](const threading::ReactionTask& /*task*/) { - // Get a task for the always reaction and submit it to the scheduler - always_reaction->reactor.powerplant.submit(always_reaction->get_task()); - - // Get a task for the idle reaction and submit it to the scheduler - ir.reactor.powerplant.submit(ir.get_task()); - }; - - // Make sure that idle reaction always has lower priority than the always reaction - return {DSL::priority(*always_reaction) - 1, - DSL::group(*always_reaction), - DSL::pool(*always_reaction), - callback}; - }); - - // Don't emit stats for the idle reaction - idle_reaction->emit_stats = false; - - // Keep this reaction handy so it doesn't go out of scope - reaction_store[always_reaction->id] = {always_reaction, idle_reaction}; + static void bind(const std::shared_ptr& reaction) { // Create an unbinder for the always reaction - always_reaction->unbinders.push_back([](threading::Reaction& r) { + reaction->unbinders.push_back([](threading::Reaction& r) { r.enabled = false; - reaction_store.erase(r.id); // TODO(Alex/Trent) Clean up thread pool too }); - // Get a task for the always reaction and submit it to the scheduler - always_reaction->reactor.powerplant.submit(always_reaction->get_task()); - - // Get a task for the idle reaction and submit it to the scheduler - idle_reaction->reactor.powerplant.submit(idle_reaction->get_task()); + // Submit the always and idle task to the scheduler + PowerPlant::powerplant->submit(reaction->get_task()); + PowerPlant::powerplant->submit(make_idle_task(reaction)); } template static void postcondition(threading::ReactionTask& task) { // Get a task for the always reaction and submit it to the scheduler - task.parent.reactor.powerplant.submit(task.parent.get_task()); + PowerPlant::powerplant->submit(task.parent->get_task()); + } + + private: + /** + * Generate an idle task for Always which will be used to resubmit the Always task if it fails + * + * @tparam DSL the DSL that the Always task is using + * @param reaction the reaction that the Always task is associated with + * + * @return a unique pointer to the idle task which will resubmit the Always task and itself + */ + template + static std::unique_ptr make_idle_task( + const std::shared_ptr& reaction) { + + auto idle_task = std::make_unique( + reaction, + [](threading::ReactionTask& task) { return DSL::priority(task) - 1; }, + DSL::pool, + DSL::group); + + idle_task->callback = [](threading::ReactionTask& task) { + // Submit the always and idle tasks to the scheduler + PowerPlant::powerplant->submit(task.parent->get_task()); + PowerPlant::powerplant->submit(make_idle_task(task.parent)); + }; + + return idle_task; } }; diff --git a/src/dsl/word/Buffer.hpp b/src/dsl/word/Buffer.hpp index 17205968..f62f47b3 100644 --- a/src/dsl/word/Buffer.hpp +++ b/src/dsl/word/Buffer.hpp @@ -47,9 +47,9 @@ namespace dsl { struct Buffer { template - static bool precondition(const threading::Reaction& reaction) { + static bool precondition(const threading::ReactionTask& task) { // We only run if there are less than the target number of active tasks - return reaction.active_tasks < (n + 1); + return task.parent->active_tasks < (n + 1); } }; diff --git a/src/dsl/word/Group.hpp b/src/dsl/word/Group.hpp index 7b6bf54c..77b02424 100644 --- a/src/dsl/word/Group.hpp +++ b/src/dsl/word/Group.hpp @@ -72,7 +72,7 @@ namespace dsl { static const util::GroupDescriptor group_descriptor; template - static util::GroupDescriptor group(const threading::Reaction& /*reaction*/) { + static util::GroupDescriptor group(const threading::ReactionTask& /*task*/) { return group_descriptor; } }; diff --git a/src/dsl/word/IO.hpp b/src/dsl/word/IO.hpp index 662d859c..bd3a75f2 100644 --- a/src/dsl/word/IO.hpp +++ b/src/dsl/word/IO.hpp @@ -142,7 +142,7 @@ namespace dsl { } template - static Event get(const threading::Reaction& /*reaction*/) { + static Event get(const threading::ReactionTask& /*task*/) { // If our thread store has a value if (ThreadEventStore::value) { @@ -155,7 +155,7 @@ namespace dsl { template static void postcondition(threading::ReactionTask& task) { - task.parent.reactor.emit(std::make_unique(task.parent.id)); + task.parent->reactor.emit(std::make_unique(task.parent->id)); } }; diff --git a/src/dsl/word/Idle.hpp b/src/dsl/word/Idle.hpp index 6d123109..a4e160f0 100644 --- a/src/dsl/word/Idle.hpp +++ b/src/dsl/word/Idle.hpp @@ -68,7 +68,10 @@ namespace dsl { struct Idle { template static void bind(const std::shared_ptr& reaction) { - bind_idle(reaction, PoolType::template pool(*reaction)); + + // Make a fake task to use for finding an appropriate descriptor + threading::ReactionTask task(reaction, DSL::priority, DSL::pool, DSL::group); + bind_idle(reaction, PoolType::template pool(task)); } }; diff --git a/src/dsl/word/Last.hpp b/src/dsl/word/Last.hpp index 8ec9bd77..49fdaab6 100644 --- a/src/dsl/word/Last.hpp +++ b/src/dsl/word/Last.hpp @@ -165,18 +165,18 @@ namespace dsl { public: template - static auto get(threading::Reaction& reaction) + static auto get(threading::ReactionTask& task) -> decltype(wrap( - Fusion::template get(reaction), + Fusion::template get(task), util::GenerateSequence< 0, - std::tuple_size::template get(reaction))>::value>())) { + std::tuple_size::template get(task))>::value>())) { // Wrap all of our data in last list wrappers - return wrap(Fusion::template get(reaction), + return wrap(Fusion::template get(task), util::GenerateSequence< 0, - std::tuple_size::template get(reaction))>::value>()); + std::tuple_size::template get(task))>::value>()); } }; diff --git a/src/dsl/word/MainThread.hpp b/src/dsl/word/MainThread.hpp index e42bdda3..d8a789bf 100644 --- a/src/dsl/word/MainThread.hpp +++ b/src/dsl/word/MainThread.hpp @@ -40,9 +40,16 @@ namespace dsl { */ struct MainThread { + /// the description of the thread pool to be used for this PoolType + static util::ThreadPoolDescriptor descriptor() { + return util::ThreadPoolDescriptor{NUClear::id_t(util::ThreadPoolDescriptor::MAIN_THREAD_POOL_ID), + 1, + true}; + } + template - static util::ThreadPoolDescriptor pool(const threading::Reaction& /*reaction*/) { - return util::ThreadPoolDescriptor{util::ThreadPoolDescriptor::MAIN_THREAD_POOL_ID, 1, true}; + static util::ThreadPoolDescriptor pool(const threading::ReactionTask& /*task*/) { + return descriptor(); } }; diff --git a/src/dsl/word/Network.hpp b/src/dsl/word/Network.hpp index 5c488f8f..4e064fc5 100644 --- a/src/dsl/word/Network.hpp +++ b/src/dsl/word/Network.hpp @@ -89,8 +89,7 @@ namespace dsl { } template - static std::tuple, NetworkData> get( - const threading::Reaction& /*reaction*/) { + static std::tuple, NetworkData> get(threading::ReactionTask& /*task*/) { auto* data = store::ThreadStore>::value; auto* source = store::ThreadStore::value; diff --git a/src/dsl/word/Once.hpp b/src/dsl/word/Once.hpp index 41a2c9df..d0bb339b 100644 --- a/src/dsl/word/Once.hpp +++ b/src/dsl/word/Once.hpp @@ -40,11 +40,10 @@ namespace dsl { struct Once : Single { // Post condition to unbind this reaction. - template - static void postcondition(threading::ReactionTask& reaction) { + static void postcondition(threading::ReactionTask& task) { // Unbind: - reaction.parent.unbind(); + task.parent->unbind(); } }; diff --git a/src/dsl/word/Optional.hpp b/src/dsl/word/Optional.hpp index e699e15c..abf6cfc3 100644 --- a/src/dsl/word/Optional.hpp +++ b/src/dsl/word/Optional.hpp @@ -74,18 +74,18 @@ namespace dsl { public: template - static auto get(threading::Reaction& reaction) + static auto get(threading::ReactionTask& task) -> decltype(wrap( - Fusion::template get(reaction), + Fusion::template get(task), util::GenerateSequence< 0, - std::tuple_size::template get(reaction))>::value>())) { + std::tuple_size::template get(task))>::value>())) { // Wrap all of our data in optional wrappers - return wrap(Fusion::template get(reaction), + return wrap(Fusion::template get(task), util::GenerateSequence< 0, - std::tuple_size::template get(reaction))>::value>()); + std::tuple_size::template get(task))>::value>()); } }; diff --git a/src/dsl/word/Pool.hpp b/src/dsl/word/Pool.hpp index 95ca8adf..b5ff11a9 100644 --- a/src/dsl/word/Pool.hpp +++ b/src/dsl/word/Pool.hpp @@ -77,7 +77,7 @@ namespace dsl { * @tparam DSL the DSL used for this reaction */ template - static util::ThreadPoolDescriptor pool(const threading::Reaction& /*reaction*/) { + static util::ThreadPoolDescriptor pool(const threading::ReactionTask& /*task*/) { return pool_descriptor; } }; @@ -86,7 +86,7 @@ namespace dsl { template <> struct Pool { template - static util::ThreadPoolDescriptor pool(const threading::Reaction& /*reaction*/) { + static util::ThreadPoolDescriptor pool(const threading::ReactionTask& /*task*/) { return util::ThreadPoolDescriptor{}; } }; diff --git a/src/dsl/word/Priority.hpp b/src/dsl/word/Priority.hpp index b4351083..38e5d954 100644 --- a/src/dsl/word/Priority.hpp +++ b/src/dsl/word/Priority.hpp @@ -74,7 +74,7 @@ namespace dsl { static constexpr int value = 1000; template - static int priority(const threading::Reaction& /*reaction*/) { + static int priority(const threading::ReactionTask& /*task*/) { return value; } }; @@ -84,7 +84,7 @@ namespace dsl { static constexpr int value = 750; template - static int priority(const threading::Reaction& /*reaction*/) { + static int priority(const threading::ReactionTask& /*task*/) { return value; } }; @@ -94,7 +94,7 @@ namespace dsl { static constexpr int value = 500; template - static int priority(const threading::Reaction& /*reaction*/) { + static int priority(const threading::ReactionTask& /*task*/) { return value; } }; @@ -104,7 +104,7 @@ namespace dsl { static constexpr int value = 250; template - static int priority(const threading::Reaction& /*reaction*/) { + static int priority(const threading::ReactionTask& /*task*/) { return value; } }; @@ -114,7 +114,7 @@ namespace dsl { static constexpr int value = 0; template - static int priority(const threading::Reaction& /*reaction*/) { + static int priority(const threading::ReactionTask& /*task*/) { return value; } }; diff --git a/src/dsl/word/TCP.hpp b/src/dsl/word/TCP.hpp index 8c0a497d..790a9ddf 100644 --- a/src/dsl/word/TCP.hpp +++ b/src/dsl/word/TCP.hpp @@ -160,10 +160,10 @@ namespace dsl { } template - static Connection get(threading::Reaction& reaction) { + static Connection get(threading::ReactionTask& task) { // Get our file descriptor from the magic cache - auto event = IO::get(reaction); + auto event = IO::get(task); // If our get is being run without an fd (something else triggered) then short circuit if (!event) { diff --git a/src/dsl/word/UDP.hpp b/src/dsl/word/UDP.hpp index b7ca39f0..6a214d69 100644 --- a/src/dsl/word/UDP.hpp +++ b/src/dsl/word/UDP.hpp @@ -337,9 +337,9 @@ namespace dsl { } template - static RecvResult read(threading::Reaction& reaction) { + static RecvResult read(threading::ReactionTask& task) { // Get our file descriptor from the magic cache - auto event = IO::get(reaction); + auto event = IO::get(task); // If our get is being run without an fd (something else triggered) then short circuit if (!event) { @@ -422,8 +422,8 @@ namespace dsl { } template - static Packet get(threading::Reaction& reaction) { - RecvResult result = read(reaction); + static Packet get(threading::ReactionTask& task) { + RecvResult result = read(task); Packet p{}; p.valid = result.valid; @@ -467,8 +467,8 @@ namespace dsl { } template - static Packet get(threading::Reaction& reaction) { - RecvResult result = read(reaction); + static Packet get(threading::ReactionTask& task) { + RecvResult result = read(task); // Broadcast is only IPv4 if (result.local.sock.sa_family == AF_INET) { @@ -514,8 +514,8 @@ namespace dsl { } template - static Packet get(threading::Reaction& reaction) { - RecvResult result = read(reaction); + static Packet get(threading::ReactionTask& task) { + RecvResult result = read(task); const auto& a = result.local; const bool multicast = @@ -538,6 +538,7 @@ namespace dsl { } }; }; + } // namespace word namespace trait { diff --git a/src/extension/IOController_Windows.ipp b/src/extension/IOController_Windows.ipp index b7c124be..1c8f39a2 100644 --- a/src/extension/IOController_Windows.ipp +++ b/src/extension/IOController_Windows.ipp @@ -137,7 +137,6 @@ namespace extension { } } - IOController::IOController(std::unique_ptr environment) : Reactor(std::move(environment)) { // Create an event to use for the notifier (used for getting out of WSAWaitForMultipleEvents()) diff --git a/src/threading/Reaction.cpp b/src/threading/Reaction.cpp index 9527aada..2b39d6d2 100644 --- a/src/threading/Reaction.cpp +++ b/src/threading/Reaction.cpp @@ -26,7 +26,6 @@ #include #include "../id.hpp" -#include "../util/GeneratedCallback.hpp" #include "ReactionIdentifiers.hpp" #include "ReactionTask.hpp" @@ -49,20 +48,8 @@ namespace threading { return nullptr; } - // Run our generator to get a functor we can run - auto callback = generator(*this); - - // If our generator returns a valid function - if (callback) { - return std::make_unique(*this, - callback.priority, - callback.group, - callback.pool, - std::move(callback.callback)); - } - - // Otherwise we return a null pointer - return nullptr; + // Return the task returned by the generator + return generator(this->shared_from_this()); } void Reaction::unbind() { diff --git a/src/threading/Reaction.hpp b/src/threading/Reaction.hpp index c930907b..243520d9 100644 --- a/src/threading/Reaction.hpp +++ b/src/threading/Reaction.hpp @@ -53,14 +53,14 @@ namespace threading { * It also holds a function which is used to generate databound Task objects. * i.e. callback with the function arguments already loaded and ready to run. */ - class Reaction { + class Reaction : public std::enable_shared_from_this { // Reaction handles are given to user code to enable and disable the reaction friend class ReactionHandle; friend class ReactionTask; public: // The type of the generator that is used to create functions for ReactionTask objects - using TaskGenerator = std::function; + using TaskGenerator = std::function(const std::shared_ptr&)>; /** * Constructs a new Reaction with the passed callback generator and options. diff --git a/src/threading/ReactionTask.cpp b/src/threading/ReactionTask.cpp index 4b9c73bb..a81988bc 100644 --- a/src/threading/ReactionTask.cpp +++ b/src/threading/ReactionTask.cpp @@ -24,11 +24,9 @@ #include #include -#include #include #include "../id.hpp" -#include "../message/ReactionStatistics.hpp" #include "../util/platform.hpp" #include "Reaction.hpp" @@ -39,38 +37,30 @@ namespace threading { return current_task; } - ReactionTask::ReactionTask(Reaction& parent, - const int& priority, - const util::GroupDescriptor& group_descriptor, - const util::ThreadPoolDescriptor& thread_pool_descriptor, - TaskFunction&& callback) - : parent(parent) - , priority(priority) - , stats(std::make_shared(parent.identifiers, - parent.id, - id, - current_task != nullptr ? current_task->parent.id : 0, - current_task != nullptr ? current_task->id : 0, - clock::now(), - clock::time_point(std::chrono::seconds(0)), - clock::time_point(std::chrono::seconds(0)), - nullptr)) - , emit_stats(parent.emit_stats && (current_task != nullptr ? current_task->emit_stats : true)) - , group_descriptor(group_descriptor) - , thread_pool_descriptor(thread_pool_descriptor) - , callback(std::move(callback)) {} + ReactionTask::~ReactionTask() { + // Decrement the number of active tasks + if (parent != nullptr) { + --parent->active_tasks; + } + } void ReactionTask::run() { + // Update the current task + auto* t = std::exchange(current_task, this); + try { + // Run our callback + callback(*this); + } + catch (...) { // NOLINT(bugprone-empty-catch) + // This shouldn't happen, but either way no exceptions should ever leave this function + // They should have all been caught and callback is noexcept + // However somehow it still happens sometimes so we need to catch it + } - // Update our current task - const std::shared_ptr lock(current_task, [](ReactionTask* t) { current_task = t; }); - current_task = this; - - // Run our callback - callback(*this); + // Restore the current task + current_task = t; } - NUClear::id_t ReactionTask::new_task_id() { static std::atomic task_id_source(0); return ++task_id_source; diff --git a/src/threading/ReactionTask.hpp b/src/threading/ReactionTask.hpp index 606840a7..44808ba0 100644 --- a/src/threading/ReactionTask.hpp +++ b/src/threading/ReactionTask.hpp @@ -28,6 +28,7 @@ #include #include +#include "../clock.hpp" #include "../id.hpp" #include "../util/GroupDescriptor.hpp" #include "../util/ThreadPoolDescriptor.hpp" @@ -56,7 +57,7 @@ namespace threading { public: /// Type of the functions that ReactionTasks execute - using TaskFunction = std::function; + using TaskFunction = std::function; /** * Gets the current executing task, or nullptr if there isn't one. @@ -68,24 +69,56 @@ namespace threading { /** * Creates a new ReactionTask object bound with the parent Reaction object (that created it) and task. * - * @param parent The Reaction object that spawned this ReactionTask - * @param priority The priority to use when executing this task - * @param group_descriptor The descriptor for the group that this task should run in - * @param thread_pool_descriptor The descriptor for the thread pool that this task should be queued in - * @param callback The data bound callback to be executed in the thread pool + * @param parent The Reaction object that spawned this ReactionTask. + * @param priority_fn A function that can be called to get the priority of this task + * @param thread_pool_fn A function that can be called to get the thread pool descriptor for this task + * @param group_fn A function that can be called to get the list of group descriptors for this task */ - ReactionTask(Reaction& parent, - const int& priority, - const util::GroupDescriptor& group_descriptor, - const util::ThreadPoolDescriptor& thread_pool_descriptor, - TaskFunction&& callback); - + template + ReactionTask(const std::shared_ptr& parent, + const GetPriority& priority_fn, + const GetThreadPool& thread_pool_fn, + const GetGroup& group_fn) + : parent(parent) + , id(new_task_id()) + , priority(priority_fn(*this)) + , pool_descriptor(thread_pool_fn(*this)) + , group_descriptor(group_fn(*this)) + // Only create a stats object if we wouldn't cause an infinite loop of stats + , stats(parent != nullptr && parent->emit_stats + && (current_task == nullptr || current_task->stats != nullptr) + ? std::make_shared( + parent->identifiers, + parent->id, + id, + current_task != nullptr ? current_task->parent->id : 0, + current_task != nullptr ? current_task->id : 0, + clock::now(), + clock::time_point(std::chrono::seconds(0)), + clock::time_point(std::chrono::seconds(0)), + nullptr) + : nullptr) { + // Increment the number of active tasks + if (parent != nullptr) { + parent->active_tasks.fetch_add(1, std::memory_order_release); + } + } + + // No copying or moving of tasks (use unique_ptrs to manage tasks) + ReactionTask(const ReactionTask&) = delete; + ReactionTask& operator=(const ReactionTask&) = delete; + ReactionTask(ReactionTask&&) = delete; + ReactionTask& operator=(ReactionTask&&) = delete; /** - * Runs the internal data bound task and times it. + * Destructor for the ReactionTask object. * - * This runs the internal data bound task and times how long the execution takes. - * These figures can then be used in a debugging context to calculate how long callbacks are taking to run. + * This will decrement the active_tasks counter on the parent Reaction object. + */ + ~ReactionTask(); + + /** + * Runs the internal data bound task. */ void run(); @@ -96,29 +129,44 @@ namespace threading { */ static NUClear::id_t new_task_id(); - /// The parent Reaction object which spawned this - Reaction& parent; + /// The parent Reaction object which spawned this, or nullptr if this is a floating task + std::shared_ptr parent; /// The task id of this task (the sequence number of this particular task) - NUClear::id_t id{new_task_id()}; + NUClear::id_t id; + /// The priority to run this task at int priority; - /// The statistics object that persists after this for information and debugging - std::shared_ptr stats; - /// If these stats are safe to emit. It should start true, and as soon as we are a reaction based on - /// reaction statistics becomes false for all created tasks. - /// This is to stop infinite loops tasks triggering tasks. - bool emit_stats; - + /// Details about the thread pool that this task will run from, this will also influence what task queue + /// the tasks will be queued on + util::ThreadPoolDescriptor pool_descriptor; /// Details about the group that this task will run in util::GroupDescriptor group_descriptor; - /// Details about the thread pool that this task will run from, this will also influence what task queue the - /// tasks will be queued on - util::ThreadPoolDescriptor thread_pool_descriptor; + /// The statistics object that records run details about this reaction task + /// This will be nullptr if this task is ineligible to emit stats (e.g. it would cause a loop) + std::shared_ptr stats; /// The data bound callback to be executed /// @attention note this must be last in the list as the this pointer is passed to the callback generator TaskFunction callback; + + /** + * This operator compares two ReactionTask objects based on their priority and ID. + * + * It sorts tasks in the order that they should be executed by comparing their priority the the order they were + * created. The task that should execute first will have the lowest sort order value and the task that should + * execute last will have the highest sort order. + * + * The task with higher priority is considered less. + * If two tasks have equal priority, the one with the lower ID is considered less. + * + * @param other The other ReactionTask object to compare with. + * + * @return true if the current object is less than the other object, false otherwise. + */ + bool operator<(const ReactionTask& other) const { + return priority == other.priority ? id < other.id : priority > other.priority; + } }; } // namespace threading diff --git a/src/util/CallbackGenerator.hpp b/src/util/CallbackGenerator.hpp index 673b6e6f..3818de88 100644 --- a/src/util/CallbackGenerator.hpp +++ b/src/util/CallbackGenerator.hpp @@ -32,7 +32,6 @@ #include "../util/apply.hpp" #include "../util/unpack.hpp" #include "../util/update_current_thread_priority.hpp" -#include "GeneratedCallback.hpp" namespace NUClear { namespace util { @@ -69,22 +68,17 @@ namespace util { std::get(data))...); } - GeneratedCallback operator()(threading::Reaction& r) { + std::unique_ptr operator()(const std::shared_ptr& r) { - // Add one to our active tasks - ++r.active_tasks; + auto task = std::make_unique(r, DSL::priority, DSL::pool, DSL::group); // Check if we should even run - if (!DSL::precondition(r)) { - // Take one from our active tasks - --r.active_tasks; - - // We cancel our execution by returning an empty function - return {}; + if (!DSL::precondition(*task)) { + return nullptr; } // Bind our data to a variable (this will run in the dispatching thread) - auto data = DSL::get(r); + auto data = DSL::get(*task); // Merge our transient data in merge_transients(data, @@ -93,49 +87,47 @@ namespace util { // Check if our data is good (all the data exists) otherwise terminate the call if (!check_data(data)) { - // Take one from our active tasks - --r.active_tasks; - - // We cancel our execution by returning an empty function - return {}; + return nullptr; } // We have to make a copy of the callback because the "this" variable can go out of scope - auto c = callback; - return GeneratedCallback(DSL::priority(r), - DSL::group(r), - DSL::pool(r), - [c, data](threading::ReactionTask& task) noexcept { - // Update our thread's priority to the correct level - update_current_thread_priority(task.priority); - - // Record our start time - task.stats->started = clock::now(); - - // We have to catch any exceptions - try { - // We call with only the relevant arguments to the passed function - util::apply_relevant(c, std::move(data)); - } - catch (...) { - // Catch our exception if it happens - task.stats->exception = std::current_exception(); - } - - // Our finish time - task.stats->finished = clock::now(); - - // Run our postconditions - DSL::postcondition(task); - - // Take one from our active tasks - --task.parent.active_tasks; - - // Emit our reaction statistics if it wouldn't cause a loop - if (task.emit_stats) { - PowerPlant::powerplant->emit_shared(task.stats); - } - }); + auto c = callback; + task->callback = [c, data](threading::ReactionTask& task) noexcept { + // Update our thread's priority to the correct level + update_current_thread_priority(task.priority); + + // Record our start time + if (task.stats != nullptr) { + task.stats->started = clock::now(); + } + + // We have to catch any exceptions + try { + // We call with only the relevant arguments to the passed function + util::apply_relevant(c, std::move(data)); + } + catch (...) { + // Catch our exception if it happens + if (task.stats != nullptr) { + task.stats->exception = std::current_exception(); + } + } + + // Our finish time + if (task.stats != nullptr) { + task.stats->finished = clock::now(); + } + + // Run our postconditions + DSL::postcondition(task); + + // Emit our reaction statistics if it wouldn't cause a loop + if (task.stats != nullptr) { + PowerPlant::powerplant->emit_shared(task.stats); + } + }; + + return task; } Function callback; diff --git a/src/util/TransientDataElements.hpp b/src/util/TransientDataElements.hpp index 65fab318..d1d0da5d 100644 --- a/src/util/TransientDataElements.hpp +++ b/src/util/TransientDataElements.hpp @@ -51,7 +51,7 @@ namespace util { }; template - struct TransientDataElements : ExtractTransient()))> {}; + struct TransientDataElements : ExtractTransient()))> {}; } // namespace util } // namespace NUClear diff --git a/tests/tests/dsl/CustomGet.cpp b/tests/tests/dsl/CustomGet.cpp index 6ddf1331..62e39c7f 100644 --- a/tests/tests/dsl/CustomGet.cpp +++ b/tests/tests/dsl/CustomGet.cpp @@ -33,7 +33,7 @@ std::vector events; // NOLINT(cppcoreguidelines-avoid-non-const-gl struct CustomGet : NUClear::dsl::operation::TypeBind { template - static std::shared_ptr get(const NUClear::threading::Reaction& /*unused*/) { + static std::shared_ptr get(const NUClear::threading::ReactionTask& /*task*/) { return std::make_shared("Data from a custom getter"); } }; diff --git a/tests/tests/dsl/Transient.cpp b/tests/tests/dsl/Transient.cpp index a04bc674..33240297 100644 --- a/tests/tests/dsl/Transient.cpp +++ b/tests/tests/dsl/Transient.cpp @@ -60,10 +60,10 @@ std::vector events; // NOLINT(cppcoreguidelines-avoid-non-const-gl struct TransientGetter : NUClear::dsl::operation::TypeBind { template - static TransientMessage get(NUClear::threading::Reaction& r) { + static TransientMessage get(NUClear::threading::ReactionTask& task) { // Get the real message and return it directly so transient can activate - auto raw = NUClear::dsl::operation::CacheGet::get(r); + auto raw = NUClear::dsl::operation::CacheGet::get(task); if (raw == nullptr) { return {}; }