Skip to content

Commit

Permalink
Replace Direct with Inline which can be controlled at both ends (#143)
Browse files Browse the repository at this point in the history
Emit DIRECT was sometimes hard to work with as the other end may not
want to be executed inline, or the other end may always want to be
executed inline.

This replaces DIRECT with INLINE which has DSL words at both ends.
This way a Log handler can run in a separate thread rather than
executing inline where the log line happens.

Or alternatively the on functions that interact with DSL bindings can
themselves specify that they should be executed inline in the current
thread.
  • Loading branch information
TrentHouliston authored Aug 30, 2024
1 parent 8a18e32 commit 5ff730b
Show file tree
Hide file tree
Showing 35 changed files with 551 additions and 99 deletions.
4 changes: 2 additions & 2 deletions docs/dsl.rst
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,9 @@ Scope::LOCAL
````````````
.. doxygenstruct:: NUClear::dsl::word::emit::Local

Scope::DIRECT
Scope::INLINE
`````````````
.. doxygenstruct:: NUClear::dsl::word::emit::Direct
.. doxygenstruct:: NUClear::dsl::word::emit::Inline

Scope::Initialise
``````````````````
Expand Down
6 changes: 3 additions & 3 deletions docs/extension.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ passed in. It is important to note that the type will only be considered by NUCl
attributes need to be stored in the DSL word type template it and use static variables, see `Sync`.

There are DSL words that are not meant to be used directly but as a part of other words, see `CacheGet` and `TypeBind`.
`TypeBind` adds the reaction to the list of reactions to be run when a `Local` or `Direct` emit is called for the data
`TypeBind` adds the reaction to the list of reactions to be run when a `Local` or `Inline` emit is called for the data
type. `CacheGet` gets the last value from a thread-local cache (see `ThreadSore` below) this cache is usually populated
in the last a `Local` or `Direct` emit call for the data type.
in the last a `Local` or `Inline` emit call for the data type.

If the type you want to become a DSL extension word is not defined within your control specialise `DSLProxy<>` with the
type. Provide the template methods to the specialisation of `DSLProxy<>` as if it were the type.
Expand All @@ -56,7 +56,7 @@ destructor.
e.g. for the `IO` word we have
.. codeblock:: c++
reaction->unbinders.push_back([](const threading::Reaction& r) {
r.reactor.emit<emit::Direct>(std::make_unique<operation::Unbind<IO>>(r.id));
r.reactor.emit<emit::Inline>(std::make_unique<operation::Unbind<IO>>(r.id));
});

which will tell the extension reactor that this reaction no longer exists.
Expand Down
10 changes: 5 additions & 5 deletions docs/startup.rst
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ Data Emission Statements

As the system is single threaded at this time, the order in which reactors are installed can be significantly important.
This is pertinent when dealing with any data emissions during reactor construction which are NOT emitted under
:ref:`Scope::Initialise`. For example; data emission during the construction of a reactor using :ref:`Scope::DIRECT`,
:ref:`Scope::Initialise`. For example; data emission during the construction of a reactor using :ref:`Scope::INLINE`,
:ref:`Scope::UDP`, or :ref:`Scope::Network` will trigger any necessary activity to run inline. Should any reactions be
defined to run as a result of the emission, the task will be generated and also run inline. It is here where the order
in which reactors are installed becomes important. Suppose Reactor1 were to emit under :ref:`Scope::DIRECT`, and
in which reactors are installed becomes important. Suppose Reactor1 were to emit under :ref:`Scope::INLINE`, and
Reactor2 had a reaction defined to run on the associated datatype. In this case, the reaction defined by Reactor2 would
not run, as it was not yet defined at the time of data emission. However, should the roles be reserved, then the
reaction would run.
Expand Down Expand Up @@ -95,7 +95,7 @@ reaction would run.
to use :ref:`Scope::Initialise`. This will put a hold on the data emission, until the next step in the process
:ref:`Initialise Scope Tasks`, ensuring that any reactions subscribed to the emission will run.

Anything else?** Emissions during the construction of reactors using :ref:`Scope::DIRECT`, :ref:`Scope::UDP` and
Anything else?** Emissions during the construction of reactors using :ref:`Scope::INLINE`, :ref:`Scope::UDP` and
:ref:`Scope::Network` will trigger any reactions (which have already been defined - before the data emission) and
force any associated tasks to run inline.

Expand Down Expand Up @@ -164,7 +164,7 @@ Any on<:ref:`Shutdown`>() reaction requests will then be queued (in the order in
:ref:`Priority`::IDLE.

Note that during this phase, any other task which would normally be scheduled as a result of a non-direct emission will
be silently dropped, while any tasks which would occur as a result of a :ref:`Scope::DIRECT` emission will interrupt the
be silently dropped, while any tasks which would occur as a result of a :ref:`Scope::INLINE` emission will interrupt the
shutdown process and run as normal.

.. todo::
Expand All @@ -184,7 +184,7 @@ Emissions Scope Table
+==========================+=============================================================================================================================================================================================================================================================================================================================================+======================================================================================================================================================================================================================+=====================================================================================================================================================================================================================+
| :ref:`Scope::LOCAL` | Schedules any tasks for reactions which are currently loaded and bound to the emission data. Adds to the queue of tasks to start running when the system shifts to the :ref:`Execution Phase (multithreaded)` | Schedules any tasks for reactions which are bound to the emission data. Adds to the queue of tasks based on the desired :ref:`Priority` level | Any emissions under this scope while the system is in the shutdown phase are ignored. |
+--------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| :ref:`Scope::DIRECT` | Schedules any tasks for reactions which are currently loaded and bound to the emission data. Pauses the initialization phase, and runs the task in-line. The initialization phase continues upon task completion. | Schedules any tasks for reactions which are currently loaded and bound to the emission data. Pauses the task currently executing and runs the new task in-line. The execution phase continues upon task completion. | Schedules any tasks for reactions which are currently loaded and bound to the emission data. Pauses the task currently executing and runs the new task in-line. The shutdown phase continues upon task completion. |
| :ref:`Scope::INLINE` | Schedules any tasks for reactions which are currently loaded and bound to the emission data. Pauses the initialization phase, and runs the task in-line. The initialization phase continues upon task completion. | Schedules any tasks for reactions which are currently loaded and bound to the emission data. Pauses the task currently executing and runs the new task in-line. The execution phase continues upon task completion. | Schedules any tasks for reactions which are currently loaded and bound to the emission data. Pauses the task currently executing and runs the new task in-line. The shutdown phase continues upon task completion. |
+--------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| :ref:`Scope::Initialise` | Data emitted under this scope during this phase will wait until all reactors have been installed into the powerPlant before triggering any reactions. Any tasks generated as a result of this emission type are the first tasks to run when the powerPlant starts. This is the recommended emission type for this phase of system startup. | Any emissions under this scope while the system is in the execution phase are ignored. | Any emissions under this scope while the system is in the shutdown phase are ignored. |
+--------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Expand Down
16 changes: 8 additions & 8 deletions src/PowerPlant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include "dsl/store/DataStore.hpp"
#include "dsl/word/Shutdown.hpp"
#include "dsl/word/Startup.hpp"
#include "dsl/word/emit/Direct.hpp"
#include "dsl/word/emit/Inline.hpp"
#include "extension/ChronoController.hpp"
#include "extension/IOController.hpp"
#include "extension/NetworkController.hpp"
Expand Down Expand Up @@ -80,9 +80,9 @@ PowerPlant::~PowerPlant() {

void PowerPlant::start() {

// Direct emit startup event and command line arguments
emit<dsl::word::emit::Direct>(std::make_unique<dsl::word::Startup>());
emit_shared<dsl::word::emit::Direct>(dsl::store::DataStore<message::CommandLineArguments>::get());
// Inline emit startup event and command line arguments
emit<dsl::word::emit::Inline>(std::make_unique<dsl::word::Startup>());
emit_shared<dsl::word::emit::Inline>(dsl::store::DataStore<message::CommandLineArguments>::get());

// Start all of the threads
scheduler.start();
Expand All @@ -98,16 +98,16 @@ void PowerPlant::remove_idle_task(const NUClear::id_t& id,
scheduler.remove_idle_task(id, pool_descriptor);
}

void PowerPlant::submit(std::unique_ptr<threading::ReactionTask>&& task, const bool& immediate) noexcept {
scheduler.submit(std::move(task), immediate);
void PowerPlant::submit(std::unique_ptr<threading::ReactionTask>&& task) noexcept {
scheduler.submit(std::move(task));
}

void PowerPlant::log(const LogLevel& level, std::string message) {
// Get the current task
const auto* current_task = threading::ReactionTask::get_current_task();

// Direct emit the log message so that any direct loggers can use it
emit<dsl::word::emit::Direct>(std::make_unique<message::LogMessage>(
// Inline emit the log message to default handlers to pause the current task until the log message is processed
emit<dsl::word::emit::Inline>(std::make_unique<message::LogMessage>(
level,
current_task != nullptr ? current_task->parent->reactor.log_level : LogLevel::UNKNOWN,
std::move(message),
Expand Down
3 changes: 1 addition & 2 deletions src/PowerPlant.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,8 @@ class PowerPlant {
* Submits a new task to the ThreadPool to be queued and then executed.
*
* @param task The Reaction task to be executed in the thread pool
* @param immediate If this task should run immediately in the current thread
*/
void submit(std::unique_ptr<threading::ReactionTask>&& task, const bool& immediate = false) noexcept;
void submit(std::unique_ptr<threading::ReactionTask>&& task) noexcept;

/**
* Log a message through NUClear's system.
Expand Down
14 changes: 10 additions & 4 deletions src/Reactor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ namespace dsl {

struct NetworkSource;

struct Inline;

template <typename>
struct Trigger;

Expand Down Expand Up @@ -112,7 +114,7 @@ namespace dsl {
template <typename T>
struct Local;
template <typename T>
struct Direct;
struct Inline;
template <typename T>
struct Delay;
template <typename T>
Expand Down Expand Up @@ -233,6 +235,9 @@ class Reactor {
/// @copydoc dsl::word::Network
using NetworkSource = dsl::word::NetworkSource;

/// @copydoc dsl::word::Inline
using Inline = dsl::word::Inline;

/// @copydoc dsl::word::Shutdown
using Shutdown = dsl::word::Shutdown;

Expand Down Expand Up @@ -281,9 +286,9 @@ class Reactor {
template <typename T>
using LOCAL = dsl::word::emit::Local<T>;

/// @copydoc dsl::word::emit::Direct
/// @copydoc dsl::word::emit::Inline
template <typename T>
using DIRECT = dsl::word::emit::Direct<T>;
using INLINE = dsl::word::emit::Inline<T>;

/// @copydoc dsl::word::emit::Delay
template <typename T>
Expand Down Expand Up @@ -456,6 +461,7 @@ class Reactor {
#include "dsl/word/Group.hpp"
#include "dsl/word/IO.hpp"
#include "dsl/word/Idle.hpp"
#include "dsl/word/Inline.hpp"
#include "dsl/word/Last.hpp"
#include "dsl/word/MainThread.hpp"
#include "dsl/word/Network.hpp"
Expand All @@ -473,8 +479,8 @@ class Reactor {
#include "dsl/word/Watchdog.hpp"
#include "dsl/word/With.hpp"
#include "dsl/word/emit/Delay.hpp"
#include "dsl/word/emit/Direct.hpp"
#include "dsl/word/emit/Initialise.hpp"
#include "dsl/word/emit/Inline.hpp"
#include "dsl/word/emit/Local.hpp"
#include "dsl/word/emit/Network.hpp"
#include "dsl/word/emit/UDP.hpp"
Expand Down
4 changes: 3 additions & 1 deletion src/dsl/Fusion.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "fusion/BindFusion.hpp"
#include "fusion/GetFusion.hpp"
#include "fusion/GroupFusion.hpp"
#include "fusion/InlineFusion.hpp"
#include "fusion/PoolFusion.hpp"
#include "fusion/PostconditionFusion.hpp"
#include "fusion/PreconditionFusion.hpp"
Expand All @@ -40,9 +41,10 @@ namespace dsl {
struct Fusion
: fusion::BindFusion<Words...>
, fusion::GetFusion<Words...>
, fusion::GroupFusion<Words...>
, fusion::InlineFusion<Words...>
, fusion::PreconditionFusion<Words...>
, fusion::PriorityFusion<Words...>
, fusion::GroupFusion<Words...>
, fusion::PoolFusion<Words...>
, fusion::PostconditionFusion<Words...> {};

Expand Down
15 changes: 10 additions & 5 deletions src/dsl/Parse.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ namespace dsl {
task);
}

static std::set<std::shared_ptr<const util::GroupDescriptor>> group(threading::ReactionTask& task) {
return std::conditional_t<fusion::has_group<DSL>::value, DSL, fusion::NoOp>::template group<
Parse<Sentence...>>(task);
}

static util::Inline run_inline(threading::ReactionTask& task) {
return std::conditional_t<fusion::has_run_inline<DSL>::value, DSL, fusion::NoOp>::template run_inline<
Parse<Sentence...>>(task);
}

static bool precondition(threading::ReactionTask& task) {
return std::conditional_t<fusion::has_precondition<DSL>::value, DSL, fusion::NoOp>::template precondition<
Parse<Sentence...>>(task);
Expand All @@ -57,11 +67,6 @@ namespace dsl {
Parse<Sentence...>>(task);
}

static std::set<std::shared_ptr<const util::GroupDescriptor>> group(threading::ReactionTask& task) {
return std::conditional_t<fusion::has_group<DSL>::value, DSL, fusion::NoOp>::template group<
Parse<Sentence...>>(task);
}

static std::shared_ptr<const util::ThreadPoolDescriptor> pool(threading::ReactionTask& task) {
return std::conditional_t<fusion::has_pool<DSL>::value, DSL, fusion::NoOp>::template pool<
Parse<Sentence...>>(task);
Expand Down
79 changes: 79 additions & 0 deletions src/dsl/fusion/InlineFusion.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* MIT License
*
* Copyright (c) 2014 NUClear Contributors
*
* This file is part of the NUClear codebase.
* See https://github.com/Fastcode/NUClear for further info.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
* Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

#ifndef NUCLEAR_DSL_FUSION_INLINABLE_FUSION_HPP
#define NUCLEAR_DSL_FUSION_INLINABLE_FUSION_HPP

#include "../../threading/ReactionTask.hpp"
#include "../../util/Inline.hpp"
#include "../operation/DSLProxy.hpp"
#include "FindWords.hpp"
#include "has_run_inline.hpp"

namespace NUClear {
namespace dsl {
namespace fusion {

// Default case where there are no Inline words
template <typename Words>
struct InlineFuser {};

// Case where there is only a single word remaining
template <typename Word>
struct InlineFuser<std::tuple<Word>> {

template <typename DSL>
static util::Inline run_inline(threading::ReactionTask& task) {

// Run our remaining run_inline
return Word::template run_inline<DSL>(task);
}
};

// Case where there is more 2 more more words remaining
template <typename Word1, typename Word2, typename... WordN>
struct InlineFuser<std::tuple<Word1, Word2, WordN...>> {

template <typename DSL>
static util::Inline run_inline(threading::ReactionTask& task) {
auto a = Word1::template run_inline<DSL>(task);
auto b = InlineFuser<std::tuple<Word2, WordN...>>::template run_inline<DSL>(task);

// Must agree or make a choice
if ((a == util::Inline::ALWAYS && b == util::Inline::NEVER)
|| (a == util::Inline::NEVER && b == util::Inline::ALWAYS)) {
throw std::logic_error("Cannot both always and never inline a reaction");
}

// Otherwise return the one that made a choice
return a == util::Inline::NEUTRAL ? b : a;
}
};

template <typename Word1, typename... WordN>
struct InlineFusion : InlineFuser<FindWords<has_run_inline, Word1, WordN...>> {};

} // namespace fusion
} // namespace dsl
} // namespace NUClear

#endif // NUCLEAR_DSL_FUSION_INLINABLE_FUSION_HPP
Loading

0 comments on commit 5ff730b

Please sign in to comment.