diff --git a/.github/workflows/cmake.yml b/.github/workflows/cmake.yml index 5e311df1f..ae87e0f63 100644 --- a/.github/workflows/cmake.yml +++ b/.github/workflows/cmake.yml @@ -110,8 +110,10 @@ jobs: working-directory: ${{github.workspace}}/build shell: bash run: | - cmake -S $GITHUB_WORKSPACE/doc/dox/dev/devsamp/main -B test_install_devsamp -DCMAKE_PREFIX_PATH=${{github.workspace}}/install || (cat test_install_devsamp/CMakeFiles/CMakeOutput.log && cat test_install_devsamp/CMakeFiles/CMakeError.log) - cmake --build test_install_devsamp + cmake -S $GITHUB_WORKSPACE/doc/dox/dev/devsamp/main -B test_install_devsamp_main -DCMAKE_PREFIX_PATH=${{github.workspace}}/install || (cat test_install_devsamp_main/CMakeFiles/CMakeOutput.log && cat test_install_devsamp_main/CMakeFiles/CMakeError.log) + cmake --build test_install_devsamp_main + cmake -S $GITHUB_WORKSPACE/doc/dox/dev/devsamp/fibonacci -B test_install_devsamp_fibonacci -DCMAKE_PREFIX_PATH=${{github.workspace}}/install || (cat test_install_devsamp_fibonacci/CMakeFiles/CMakeOutput.log && cat test_install_devsamp_fibonacci/CMakeFiles/CMakeError.log) + cmake --build test_install_devsamp_fibonacci cmake -E make_directory test_install_userexamples cat > test_install_userexamples/CMakeLists.txt < @@ -38,7 +43,8 @@ int main(int argc, char *argv[]) { ``` `CMakeLists.txt` -~~~~~~~~~~~~~{.cmake} + +```cmake cmake_minimum_required(VERSION 3.19) project(TTG-HW CXX) @@ -52,10 +58,11 @@ endif() add_executable(hw-parsec helloworld.cpp) target_link_libraries(hw-parsec PRIVATE ttg-parsec) target_compile_definitions(hw-parsec PRIVATE TTG_USE_PARSEC=1) -~~~~~~~~~~~~~ +``` Configure + build: -```shell + +```sh > cmake -S . -B build && cmake --build build --target hw-parsec ``` @@ -70,6 +77,7 @@ The basic model of computation is built around a Template Task Graph (TTG). A TT Thus, task creation is a byproduct of messages traveling through one or more TTGs. 
What makes the model powerful is the ability to encode large DAGs of tasks compactly. Before proceeding further, let's refine the few concepts used to define the programming model above: + - `TaskId` (aka `Key`): A unique identifier for each task. It must be _perfectly_ hashable. - `Terminal`: A port for receiving (input) and sending (output) messages. Each message consists of a (potentially void) `TaskId` and an (optional) datum. Terminals are strongly-typed. An {in,out}put terminal can be connected to one or more {out,in}put terminal (as long as the `TaskId` and datum types match). Input terminals are programmable (e.g., incoming messages can be optionally reduced). - `TemplateTask` (aka `TT`): This is a _template_ for creating tasks. Task template creates a task associated with a given `TaskId` when every input terminal received messages for the given `TaskId`. @@ -80,6 +88,7 @@ Due to its simplicity only template tasks appear in the "Hello, World!" program. ## Structure of a Minimal TTG Program Every TTG program must: + - select the TTG backend, - initialize the TTG runtime, - construct a TTG by declaring its constituent nodes, @@ -92,25 +101,31 @@ Let's go over each of these steps using the "Hello, World!" example. TTG C++ implementation is currently supported by 2 backends providing task scheduling, data transfer, and resource management. While it is possible to use specific TTG backend explicitly, by using the appropriate namespaces, it is recommended to write backend-neutral programs that can be specialized to a particular backend as follows. -1. By defining one (and only one) of the following macros, via the command-line argument to the compiler (recommended) or as an explicit `#define` statement in the source code: - - `TTG_USE_PARSEC`: selects the PaRSEC backend as the default; - - `TTG_USE_MADNESS`: selects the MADNESS backend as the default (expert-use only). 
- Following the definition of this macro it is safe to include the top-level TTG header file: -```cpp + 1. By defining one (and only one) of the following macros, via the command-line argument to the compiler (recommended) or as an explicit `#define` statement in the source code: + - `TTG_USE_PARSEC`: selects the PaRSEC backend as the default; + - `TTG_USE_MADNESS`: selects the MADNESS backend as the default (expert-use only). + + Following the definition of this macro it is safe to include the top-level TTG header file: + + ```cpp #include -``` -2. By including the corresponding backend-specific header directly: + ``` + + 2. By including the corresponding backend-specific header directly: - to use PaRSEC backend only, add: -```cpp + + ```cpp #include -``` + ``` + - to use the MADNESS backend only, add: -```cpp + + ```cpp #include -``` + ``` - This approach does not require inclusion of the top-level TTG header or definition of a backend selection macro. + This approach does not require inclusion of the top-level TTG header or definition of a backend selection macro. ### Initialize @@ -121,6 +136,7 @@ To initialize TTG runtime invoke `ttg::initialize(argc, argv)`; there are severa To make a TTG create and connect one or more TTs. The simplest TTG consists of a single TT. The "Hello, World!" example contains a single TT that executes a single task (hence, task ID can be omitted, i.e., void) that does not take and produce any data. The easiest way to make such a TT is by wrapping a callable (e.g., a lambda) with `ttg::make_tt`: + ```cpp auto tt = ttg::make_tt([]() { std::cout << "Hello, World!"; }); ``` @@ -128,22 +144,26 @@ The "Hello, World!" example contains a single TT that executes a single task (he ## Execute TTG To execute a TTG we must make it executable (this will declare the TTG complete). 
To execute the TTG its root TT must receive at least one message; since in this case the task does not receive either task ID or data the message is empty (i.e., void): + ```cpp ttg::make_graph_executable(tt); ttg::execute(); if (ttg::get_default_world().rank() == 0) tt->invoke(); ``` + Note that we must ensure that only one such message must be generated. Since TTG execution uses the Single Program Multiple Data (SPMD) model, when launching the TTG program as multiple processes only the first process (rank) gets to send the message. ## Finalize TTG Since TTG program is executed asynchronously, we must ensure that all tasks are finished: + ```cpp ttg::fence(); ``` Before exiting `main()` the TTG runtime should be finalized: + ```cpp ttg::finalize(); ``` @@ -163,9 +183,10 @@ Although the example lacks opportunity for parallelism, the point here is not p This example illustrates how to compute a particular element of the Fibonacci sequence defined by recurrence -. +$F_N = F_{N-1} + F_{N-2}, F_0=0, F_1=1$. `nth-fibonacci.cpp` + ```cpp #include @@ -203,28 +224,233 @@ int main(int argc, char *argv[]) { The TTG consists of 2 TTs, one (`fib`) that implements the Fibonacci recurrence and another (`print`) that prints the result to `std::cout`: -- `fib` computes from and - and either sends and to the next (`n+1`) - instance of `fib`, or, if `n==N`, sends to `print`. Thus `fib` - needs 2 input terminals and 3 output terminals (for better efficiency instead of - sending individual Fibonacci numbers, each over an individual edge, it is better to send - a pair of Fibonacci numbers over a single edge). -- `print` receives a single unannotated datum and produces no data, so it needs a single input terminal and no output terminals. - -Execution of the program starts by explicitly instantiating `fib` for `n=2`. -In total 20 tasks will be executed: 19 instances of `fib` with `n=2..20` and the single instance of `print`. 
- -Note that unlike typical task-based implementations in the literature which construct tasks _recursively_, -i.e., the task for -computing -is created before the task computing , -the TTG implementation constructs the tasks in the order of increasing `n`. This is because -parametric dataflow of TTG naturally expresses inductive (push) computation patterns rather than -recursive (pull) computation patterns. However, it is easy to implement proper recursion by -separating the downward flow of control (task creation, -) + + - `fib` computes $F_{n}$ from $F_{n-1}$ and $F_{n-2}$ and either sends $F_{n}$ and $F_{n-1}$ to the next ($n+1$) + instance of `fib`, or, if $n=N$, sends $F_{n}$ to `print`. Thus `fib` + needs 2 input terminals and 3 output terminals (for better efficiency instead of sending individual Fibonacci numbers, each over an individual edge, it is better to send a pair of Fibonacci numbers over a single edge). + - `print` receives a single unannotated datum and produces no data, so it needs a single input terminal and no output terminals. + +Execution of the program starts by explicitly instantiating `fib` for $n=2$. +In total 20 tasks will be executed: 19 instances of `fib` with $n=2\dots20$ and the single instance of `print`. + +Note that unlike typical task-based implementations in the literature which construct tasks _recursively_, i.e., the task for +computing $F_{n}$ is created before the task computing $F_{n-1}$, the TTG implementation constructs the tasks in the order of increasing $n$. This is because parametric dataflow of TTG naturally expresses inductive (push) computation patterns rather than recursive (pull) computation patterns. However, it is easy to implement proper recursion by separating the downward flow of control (task creation, $F_{n} \to F_{n-1},F_{n-2}$) from the upward flow of data (task evaluation, -). +$F_{n-1},F_{n-2} \to F_{n}$). 
+ +## Data-Dependent Example : Largest Fibonacci Number < N + +To illustrate the real power of TTG let's tweak the problem slightly: instead of computing first $N$ Fibonacci numbers let's find the largest Fibonacci number smaller than some $N$. The key difference in the latter case is that, unlike the former, the number of tasks is NOT known a priori; furthermore, to make a decision whether we need to compute next Fibonacci number we must examine the value returned by the previous task. This is an example of data-dependent tasking, where the decision which (if any) task to execute next depends on the values produced by previous tasks. The ability to compose regular as well as data-dependent task graphs is a distinguishing strength of TTG. + +To make things even more interesting, we will demonstrate how to implement such program both for execution on CPUs as well as on accelerators (GPUs). + +### The CPU Version + +```cpp +#include <ttg.h> +#include "ttg/serialization.h" + +/// N.B. contains values of F_n and F_{n-1} +struct Fn { + int64_t F[2]; // F[0] = F_n, F[1] = F_{n-1} + Fn() { F[0] = 1; F[1] = 0; } + template <typename Archive> + void serialize(Archive& ar) { + ar & F; + } + template <typename Archive> + void serialize(Archive& ar, const unsigned int) { + ar & F; + } +}; + +auto make_ttg_fib_lt(const int64_t F_n_max) { + ttg::Edge<int64_t, Fn> f2f; + ttg::Edge<void, Fn> f2p; + + auto fib = ttg::make_tt( + [=](int64_t n, Fn&& f_n) { + int64_t next_f_n = f_n.F[0] + f_n.F[1]; + f_n.F[1] = f_n.F[0]; + f_n.F[0] = next_f_n; + if (next_f_n < F_n_max) { + ttg::send<0>(n + 1, f_n); + } else { + ttg::sendv<1>(f_n); + } + }, + ttg::edges(f2f), ttg::edges(f2f, f2p), "fib"); + + auto print = ttg::make_tt( + [=](Fn&& f_n) { + std::cout << "The largest Fibonacci number smaller than " << F_n_max << " is " << f_n.F[1] << std::endl; + }, + ttg::edges(f2p), ttg::edges(), "print"); + auto ins = std::make_tuple(fib->template in<0>()); + std::vector<std::unique_ptr<ttg::TTBase>> ops; + ops.emplace_back(std::move(fib)); + ops.emplace_back(std::move(print)); + return 
make_ttg(std::move(ops), ins, std::make_tuple(), "Fib_n < N"); +} + +int main(int argc, char* argv[]) { + ttg::initialize(argc, argv, -1); + int64_t N = 1000; + if (argc > 1) N = std::atol(argv[1]); + + auto fib = make_ttg_fib_lt(N); + ttg::make_graph_executable(fib.get()); + if (ttg::default_execution_context().rank() == 0) + fib->template in<0>()->send(1, Fn{}); + + ttg::execute(); + ttg::fence(); + + ttg::finalize(); + return 0; +} +``` + +[//]: # (TODO: walk through the example, key things to emphasize:) + +[//]: # (- `Fn` aggregates 2 pieces of data that were separate before in preparation for aggregating datums into single contiguous chunks that can be allocated on GPU more efficiently) + +[//]: # (- `make_ttg_fib_lt` creates a TTG composed of multiple TTs, whereas before we had disparate TTs connected to each other (i.e. there was no explicit graph object). This allows to support composition of multiple TTGs together, as described in Herault et al DOI 10.1109/PAW-ATM56565.2022.00008) + +#### Utility of `Fn` struct +`Fn` aggregates 2 pieces of data that were separate before in preparation for aggregating datums into single contiguous chunks that can be allocated on GPU more efficiently. This arrangement allows each task to access and modify both current and previous Fibonacci values without the need for separate data fields or additional communication overhead. + +- `F[0]` and `F[1]` store the current ($F_n$) and previous ($F_{n-1}$) Fibonacci numbers, respectively. +- The default constructor starts the iteration by initializing `F[0]=1` and `F[1]=0`. + +Because `Fn` is now a user-defined type, for TTG to be able to copy/move it between tasks it needs to know how to serialize and deserialize it. + The `serialize` functions are useful to communicate the struct among the tasks. TTG leverages these functions to serialize and deserialize the data as it is sent and received through the task graph. + +#### Why `make_ttg_fib_lt`? 
+ +Until now we have constructed individual TTs and linked them together; i.e., the TTG until now was implicit. Function `make_ttg_fib_lt` instead explicitly creates a graph of TTs (a TTG). This seemingly small step helps improve composability by allowing entire TTGs to be used as components of other graphs, stitched together with other TTs or TTGs. + +[//]: ![Fibonacci_TTG_example](doc/images/fibonacci_ttg.png) + +### Device Version + +It is currently not possible to have a general-purpose task runtime execute purely on device, hence TTG and the underlying runtimes execute tasks on the host (CPU), and these tasks launch device _kernels_. For technical reasons it is necessary to split the code into the host-only part, which looks remarkably like the CPU-only version above, and the device-specific part that implements the core part of the computation on the device. In the future it _may_ become possible to have single-source programs that contain both host and device parts in the same source file. + +#### Host-side Code + +The host-only part is completely independent of the type of the device programming model. 
+ +```cpp +struct Fn : public ttg::TTValue { + std::unique_ptr F; // F[0] = F_n, F[1] = F_{n-1} + ttg::Buffer b; + Fn() : F(std::make_unique(2)), b(F.get(), 2) { F[0] = 1; F[1] = 0; } + Fn(const Fn&) = delete; + Fn(Fn&& other) = default; + Fn& operator=(const Fn& other) = delete; + Fn& operator=(Fn&& other) = default; + template + void serialize(Archive& ar) { + ttg::ttg_abort(); + } + template + void serialize(Archive& ar, const unsigned int) { + ttg::ttg_abort(); + } +}; + +auto make_ttg_fib_lt(const int64_t F_n_max = 1000) { + ttg::Edge f2f; + ttg::Edge f2p; + + auto fib = ttg::make_tt( + [=](int64_t n, Fn&& f_n) -> ttg::device::Task { + assert(n > 0); + ttg::trace("in fib: n=", n, " F_n=", f_n.F[0]); + + co_await ttg::device::select(f_n.b); + + next_value(f_n.b.current_device_ptr()); + + // wait for the task to complete and the values to be brought back to the host + co_await ttg::device::wait(f_n.b); + + if (f_n.F[0] < F_n_max) { + co_await ttg::device::forward(ttg::device::send<0>(n + 1, std::move(f_n))); + } else { + co_await ttg::device::forward(ttg::device::sendv<1>(std::move(f_n))); + } + }, + ttg::edges(f2f), ttg::edges(f2f, f2p), "fib"); + auto print = ttg::make_tt( + [=](Fn&& f_n) { + std::cout << "The largest Fibonacci number smaller than " << F_n_max << " is " << f_n.F[1] << std::endl; + }, + ttg::edges(f2p), ttg::edges(), "print"); + + auto ins = std::make_tuple(fib->template in<0>()); + std::vector> ops; + ops.emplace_back(std::move(fib)); + ops.emplace_back(std::move(print)); + return make_ttg(std::move(ops), ins, std::make_tuple(), "Fib_n < N"); +} +``` + +Although the structure of the device-capable program is nearly identical to the CPU version, there are important differences: + + - `Fn`'s data must exist on the host side (where the task is executed). To automate moving of the data between host and device memories `Fn` is implemented with the help of helper classes `TTValue` and `Buffer`. 
+ - task functions become _coroutines_ (as indicated by their return type `device::Task`) to deal with the asynchrony of the host-device interactions (kernel launch, memory allocation and transfers) + - the target execution space is specified as a template argument of type `ExecutionSpace` to `make_tt` + +##### `TTValue` + +For optimal performance low-level runtime that manages the data motion across the memory hierarchy (host-to-host (i.e., between MPI ranks), host-to-device, and device-to-device) must be able to _track_ each datum as it orchestrates the computation. For example, when a TTG task `send`'s a datum to an output terminal connected to multiple consumers the runtime may avoid unnecessary copies, e.g. by recognizing that all consumers will only need read-only access to the data, hence reference to the same datum can be passed to all consumers. This requires being able to map pointer to a C++ object to the control block that describes that object to the runtime. Deriving C++ type `T` from `TTValue` makes it possible to track objects `T` by embedding the control block into each object. This is particularly important for the data that has to travel to the device. + +##### `Buffer` +`Buffer` is a view of a contiguous sequence of objects of type `T` in the host memory that can be automatically moved by the runtime to/from the device memory. Here `Fn::b` is a view of the 2-element sequence pointed to by `Fn::F`; once it's constructed the content of `Fn::F` will be moved to/from the device by the runtime. The subsequent actions of `Fn::b` cause the automatic transfers of data to (`device::select(f_n.b)`) and from (`ttg::device::wait(f_n.b)`) the device. + +##### `device::Task` + +The key challenge of device programming models is that they are fundamentally _asynchronous_ to hide the large latency of interacting with the device. 
Kernel launches, unlike function calls on CPU, take 1000s of CPU cycles to occur, and the asynchrony helps amortize these costs by overlapping kernel launch and execution. Task programming models are a seemingly good match for device programming, but the key challenge is how to make device-capable task code look most like standard host-only task code. TTG's ability to use _C++ coroutines_ as task bodies allows it to deal with asynchronous calls inside the tasks (the use of coroutines is the primary reason why TTG requires C++20 support by the C++ compiler). Roughly speaking, coroutines are resumable functions; they can return to the caller via a `co_await` statement and be resumed at that point once some condition (typically, completion of submitted actions) has been satisfied. Device tasks `co_await` at every point where further progress requires completion of preceding device tasks: + + - First `co_await` ensures that contents of `f_n.F[]` are available on the device. During the first invocation the data resides on the host, hence this allocates memory on the device and transfers the contents of `f_n.F[]` from host to device. During subsequent invocations the contents of `f_n.F[]` are likely already available on the device (unless the runtime decides to compute $F_{n+1}$ on a different device than $F_n$), thus this `co_await` may become a no-op. + - Second `co_await` ensures that the kernel launched by `next_value` has completed and the contents of `f_n.F[]` changed by that kernel are available on the host. This always causes device-to-host transfer. + - Third set of `co_await`'s ensures that the corresponding `device::send`, which sends the data located in the device memory, has completed. Since `device::send` within a task will typically return a local variable, exiting the coroutine would destroy such variables prematurely, hence instead of a `co_return` the coroutine concludes by waiting for the `device::send` to complete before exiting. 
+ +##### `ExecutionSpace` + +TTG and its underlying runtime needs to be told in which _execution space_ the task code will operate. The current choices are denoted by the `ExecutionSpace` enumeration: + +- `ExecutionSpace::Host`: host processor (default) +- `ExecutionSpace::CUDA`: an NVIDIA CUDA device +- `ExecutionSpace::HIP`: an AMD HIP device +- `ExecutionSpace::L0`: an Intel L0 device + + + +[//]: # (Walk through the key differences ... potentially we could show both side by side ... not sure how to do that in Markdown though ...) + +#### Device Kernel + +Here's the CUDA version of the device kernel and its host-side wrapper; ROCm and SYCL/Level0 variants will be very similar to the CUDA version: + +```cpp +#include "fibonacci_cuda_kernel.h" +#ifdef TTG_HAVE_CUDA + __global__ void cu_next_value(int64_t* fn_and_fnm1) { + int64_t fnp1 = fn_and_fnm1[0] + fn_and_fnm1[1]; + fn_and_fnm1[1] = fn_and_fnm1[0]; + fn_and_fnm1[0] = fnp1; + } + void next_value(int64_t* fn_and_fnm1) { + cu_next_value<<<1, 1>>>(fn_and_fnm1); + } +#endif // TTG_HAVE_CUDA +``` + +`cu_next_value` is the device kernel that evaluates $F_{n+1}$ from $F_{n}$ and $F_{n-1}$. `next_value` is a host function that launches `cu_next_value`; this is the function called in the `fib` task. + +The complete example, including the CMake build harness, can be found in [dox examples](https://github.com/TESSEorg/ttg/tree/master/doc/dox/dev/devsamp/fibonacci). ## Debugging TTG Programs @@ -290,7 +516,9 @@ For example, executing the Fibonacci program described above using 2 MPI process ![Fibonacci_traces_example](doc/images/nth-fib-trace-2proc-2thr.png) # TTG reference documentation -TTG API documentation is available for the following versions:0 + +TTG API documentation is available for the following versions: + - [master branch](https://tesseorg.github.io/ttg/dox-master) . 
# Cite @@ -301,5 +529,6 @@ When referring to TTG in an academic setting please cite the following publicati # Acknowledgment The development of TTG was made possible by: + - [The EPEXA project](https://tesseorg.github.io/), currently supported by the National Science Foundation under grants [1931387](https://www.nsf.gov/awardsearch/showAward?AWD_ID=1931387) at Stony Brook University, [1931347](https://www.nsf.gov/awardsearch/showAward?AWD_ID=1931347) at Virginia Tech, and [1931384](https://www.nsf.gov/awardsearch/showAward?AWD_ID=1931384) at the University of Tennesse, Knoxville. - The TESSE project, supported by the National Science Foundation under grants [1450344](https://www.nsf.gov/awardsearch/showAward?AWD_ID=1450344) at Stony Brook University, [1450262](https://www.nsf.gov/awardsearch/showAward?AWD_ID=1450262) at Virginia Tech, and [1450300](https://www.nsf.gov/awardsearch/showAward?AWD_ID=1450300) at the University of Tennesse, Knoxville. diff --git a/doc/dox/dev/devsamp/fibonacci/CMakeLists.txt b/doc/dox/dev/devsamp/fibonacci/CMakeLists.txt new file mode 100644 index 000000000..e5058cb43 --- /dev/null +++ b/doc/dox/dev/devsamp/fibonacci/CMakeLists.txt @@ -0,0 +1,14 @@ +cmake_minimum_required(VERSION 3.14) +project(ttg-devsample-fibonacci) + +find_package(ttg REQUIRED) + +add_ttg_executable(fibonacci fibonacci.cc NOT_EXCLUDE_FROM_ALL) +# Fib device test +if (TTG_HAVE_CUDA) + add_ttg_executable(fibonacci_cuda + fibonacci_device.cc + fibonacci_cuda_kernel.h + fibonacci_cuda_kernel.cu + LINK_LIBRARIES std::coroutine RUNTIMES "parsec" NOT_EXCLUDE_FROM_ALL) +endif() \ No newline at end of file diff --git a/doc/dox/dev/devsamp/fibonacci/fibonacci.cc b/doc/dox/dev/devsamp/fibonacci/fibonacci.cc new file mode 100644 index 000000000..d2d829c45 --- /dev/null +++ b/doc/dox/dev/devsamp/fibonacci/fibonacci.cc @@ -0,0 +1,60 @@ +#include +#include "ttg/serialization.h" + +/// N.B. 
contains values of F_n and F_{n-1} +struct Fn { + int64_t F[2]; // F[0] = F_n, F[1] = F_{n-1} + Fn() { F[0] = 1; F[1] = 0; } + template + void serialize(Archive& ar) { + ar & F; + } + template + void serialize(Archive& ar, const unsigned int) { + ar & F; + } +}; +auto make_ttg_fib_lt(const int64_t F_n_max = 1000) { + ttg::Edge f2f; + ttg::Edge f2p; + + auto fib = ttg::make_tt( + [=](int64_t n, Fn&& f_n) { + int64_t next_f_n = f_n.F[0] + f_n.F[1]; + f_n.F[1] = f_n.F[0]; + f_n.F[0] = next_f_n; + if (next_f_n < F_n_max) { + ttg::send<0>(n + 1, f_n); + } else { + ttg::sendv<1>(f_n); + } + }, + ttg::edges(f2f), ttg::edges(f2f, f2p), "fib"); + + auto print = ttg::make_tt( + [=](Fn&& f_n) { + std::cout << "The largest Fibonacci number smaller than " << F_n_max << " is " << f_n.F[1] << std::endl; + }, + ttg::edges(f2p), ttg::edges(), "print"); + auto ins = std::make_tuple(fib->template in<0>()); + std::vector> ops; + ops.emplace_back(std::move(fib)); + ops.emplace_back(std::move(print)); + return make_ttg(std::move(ops), ins, std::make_tuple(), "Fib_n < N"); +} + +int main(int argc, char* argv[]) { + ttg::initialize(argc, argv, -1); + int64_t N = (argc > 1) ? 
std::atol(argv[1]) : 1000; + + auto fib = make_ttg_fib_lt(N); + ttg::make_graph_executable(fib.get()); + if (ttg::default_execution_context().rank() == 0) + fib->template in<0>()->send(1, Fn{});; + + ttg::execute(); + ttg::fence(); + + ttg::finalize(); + return 0; +} \ No newline at end of file diff --git a/doc/dox/dev/devsamp/fibonacci/fibonacci_cuda_kernel.cu b/doc/dox/dev/devsamp/fibonacci/fibonacci_cuda_kernel.cu new file mode 100644 index 000000000..6fa316468 --- /dev/null +++ b/doc/dox/dev/devsamp/fibonacci/fibonacci_cuda_kernel.cu @@ -0,0 +1,15 @@ +#include "fibonacci_cuda_kernel.h" + +#ifdef TTG_HAVE_CUDA + +__global__ void cu_next_value(int64_t* fn_and_fnm1) { + int64_t fnp1 = fn_and_fnm1[0] + fn_and_fnm1[1]; + fn_and_fnm1[1] = fn_and_fnm1[0]; + fn_and_fnm1[0] = fnp1; +} + +void next_value(int64_t* fn_and_fnm1) { + cu_next_value<<<1, 1>>>(fn_and_fnm1); +} + +#endif // TTG_HAVE_CUDA diff --git a/doc/dox/dev/devsamp/fibonacci/fibonacci_cuda_kernel.h b/doc/dox/dev/devsamp/fibonacci/fibonacci_cuda_kernel.h new file mode 100644 index 000000000..a096ec3f1 --- /dev/null +++ b/doc/dox/dev/devsamp/fibonacci/fibonacci_cuda_kernel.h @@ -0,0 +1,4 @@ +#include "ttg/config.h" +#include + +void next_value(int64_t* fn_and_fnm1); \ No newline at end of file diff --git a/doc/dox/dev/devsamp/fibonacci/fibonacci_device.cc b/doc/dox/dev/devsamp/fibonacci/fibonacci_device.cc new file mode 100644 index 000000000..a1603cb58 --- /dev/null +++ b/doc/dox/dev/devsamp/fibonacci/fibonacci_device.cc @@ -0,0 +1,88 @@ +#include + +#if defined(TTG_HAVE_CUDA) +#define ES ttg::ExecutionSpace::CUDA +#include "cuda_runtime.h" +#include "fibonacci_cuda_kernel.h" +#else +#error " CUDA is required to build this test!" +#endif + +#include "ttg/serialization.h" + +const int64_t F_n_max = 1000; +/// N.B. 
contains values of F_n and F_{n-1} +struct Fn : public ttg::TTValue { + std::unique_ptr F; // F[0] = F_n, F[1] = F_{n-1} + ttg::Buffer b; + + Fn() : F(std::make_unique(2)), b(F.get(), 2) { F[0] = 1; F[1] = 0; } + + Fn(const Fn&) = delete; + Fn(Fn&& other) = default; + Fn& operator=(const Fn& other) = delete; + Fn& operator=(Fn&& other) = default; + + template + void serialize(Archive& ar) { + ttg::ttg_abort(); + } + template + void serialize(Archive& ar, const unsigned int) { + ttg::ttg_abort(); + } +}; + +auto make_ttg_fib_lt(const int64_t F_n_max = 1000) { + ttg::Edge f2f; + ttg::Edge f2p; + + auto fib = ttg::make_tt( + [=](int64_t n, Fn&& f_n) -> ttg::device::Task { + assert(n > 0); + ttg::trace("in fib: n=", n, " F_n=", f_n.F[0]); + + co_await ttg::device::select(f_n.b); + + next_value(f_n.b.current_device_ptr()); + + // wait for the task to complete and the values to be brought back to the host + co_await ttg::device::wait(f_n.b); + + if (f_n.F[0] < F_n_max) { + co_await ttg::device::forward(ttg::device::send<0>(n + 1, std::move(f_n))); + } else { + co_await ttg::device::forward(ttg::device::sendv<1>(std::move(f_n))); + } + }, + ttg::edges(f2f), ttg::edges(f2f, f2p), "fib"); + auto print = ttg::make_tt( + [=](Fn&& f_n) { + std::cout << "The largest Fibonacci number smaller than " << F_n_max << " is " << f_n.F[1] << std::endl; + }, + ttg::edges(f2p), ttg::edges(), "print"); + + auto ins = std::make_tuple(fib->template in<0>()); + std::vector> ops; + ops.emplace_back(std::move(fib)); + ops.emplace_back(std::move(print)); + return make_ttg(std::move(ops), ins, std::make_tuple(), "Fib_n < N"); +} + +int main(int argc, char* argv[]) { + ttg::initialize(argc, argv, -1); + ttg::trace_on(); + int64_t N = 1000; + if (argc > 1) N = std::atol(argv[1]); + auto fib = make_ttg_fib_lt(N); // computes largest F_n < N + + ttg::make_graph_executable(fib.get()); + if (ttg::default_execution_context().rank() == 0) + fib->template in<0>()->send(1, Fn{});; + + 
ttg::execute(ttg::ttg_default_execution_context()); + ttg::fence(ttg::ttg_default_execution_context()); + + ttg::finalize(); + return 0; +} diff --git a/doc/dox/dev/devsamp/main/CMakeLists.txt b/doc/dox/dev/devsamp/main/CMakeLists.txt index 84b2eb865..5a127cd97 100644 --- a/doc/dox/dev/devsamp/main/CMakeLists.txt +++ b/doc/dox/dev/devsamp/main/CMakeLists.txt @@ -1,5 +1,5 @@ cmake_minimum_required(VERSION 3.14) -project(test) +project(ttg-devsample-main) find_package(ttg REQUIRED) diff --git a/doc/images/fibonacci_ttg.png b/doc/images/fibonacci_ttg.png new file mode 100644 index 000000000..e97fac19a Binary files /dev/null and b/doc/images/fibonacci_ttg.png differ diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 7f25dd6fe..5c3d4012c 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -27,11 +27,18 @@ add_ttg_executable(core-unittests-ttg "${ut_src}" LINK_LIBRARIES "${ut_libs}" CO add_ttg_executable(serialization serialization.cc unit_main.cpp LINK_LIBRARIES Catch2::Catch2 ttg-serialization $ COMPILE_DEFINITIONS $<$:TTG_HAS_BTAS=1>) -#target_link_libraries(serialization "Catch2::Catch2;ttg-serialization") -#if (TARGET BTAS::BTAS) -# target_link_libraries(serialization BTAS::BTAS) -# target_compile_definitions(serialization PRIVATE TTG_HAS_BTAS=1) -#endif (TARGET BTAS::BTAS) + +# Boost serialization test: checks low-level codegen +add_ttg_executable(serialization_boost serialization_boost.cc + LINK_LIBRARIES ttg-serialization-boost RUNTIMES "parsec") + +# Fib device test +if (TTG_HAVE_CUDA) + add_ttg_executable(fibonacci_device fibonacci_device.cc + fibonacci_cuda_kernel.h + fibonacci_cuda_kernel.cu + LINK_LIBRARIES std::coroutine RUNTIMES "parsec") +endif() # TODO: convert into unit test #if (TARGET MADworld) diff --git a/tests/unit/fibonacci_cuda_kernel.cu b/tests/unit/fibonacci_cuda_kernel.cu new file mode 100644 index 000000000..6fa316468 --- /dev/null +++ b/tests/unit/fibonacci_cuda_kernel.cu @@ -0,0 +1,15 @@ +#include 
"fibonacci_cuda_kernel.h" + +#ifdef TTG_HAVE_CUDA + +__global__ void cu_next_value(int64_t* fn_and_fnm1) { + int64_t fnp1 = fn_and_fnm1[0] + fn_and_fnm1[1]; + fn_and_fnm1[1] = fn_and_fnm1[0]; + fn_and_fnm1[0] = fnp1; +} + +void next_value(int64_t* fn_and_fnm1) { + cu_next_value<<<1, 1>>>(fn_and_fnm1); +} + +#endif // TTG_HAVE_CUDA diff --git a/tests/unit/fibonacci_cuda_kernel.h b/tests/unit/fibonacci_cuda_kernel.h new file mode 100644 index 000000000..a096ec3f1 --- /dev/null +++ b/tests/unit/fibonacci_cuda_kernel.h @@ -0,0 +1,4 @@ +#include "ttg/config.h" +#include + +void next_value(int64_t* fn_and_fnm1); \ No newline at end of file diff --git a/tests/unit/fibonacci_device.cc b/tests/unit/fibonacci_device.cc new file mode 100644 index 000000000..a1603cb58 --- /dev/null +++ b/tests/unit/fibonacci_device.cc @@ -0,0 +1,88 @@ +#include + +#if defined(TTG_HAVE_CUDA) +#define ES ttg::ExecutionSpace::CUDA +#include "cuda_runtime.h" +#include "fibonacci_cuda_kernel.h" +#else +#error " CUDA is required to build this test!" +#endif + +#include "ttg/serialization.h" + +const int64_t F_n_max = 1000; +/// N.B. 
contains values of F_n and F_{n-1} +struct Fn : public ttg::TTValue { + std::unique_ptr F; // F[0] = F_n, F[1] = F_{n-1} + ttg::Buffer b; + + Fn() : F(std::make_unique(2)), b(F.get(), 2) { F[0] = 1; F[1] = 0; } + + Fn(const Fn&) = delete; + Fn(Fn&& other) = default; + Fn& operator=(const Fn& other) = delete; + Fn& operator=(Fn&& other) = default; + + template + void serialize(Archive& ar) { + ttg::ttg_abort(); + } + template + void serialize(Archive& ar, const unsigned int) { + ttg::ttg_abort(); + } +}; + +auto make_ttg_fib_lt(const int64_t F_n_max = 1000) { + ttg::Edge f2f; + ttg::Edge f2p; + + auto fib = ttg::make_tt( + [=](int64_t n, Fn&& f_n) -> ttg::device::Task { + assert(n > 0); + ttg::trace("in fib: n=", n, " F_n=", f_n.F[0]); + + co_await ttg::device::select(f_n.b); + + next_value(f_n.b.current_device_ptr()); + + // wait for the task to complete and the values to be brought back to the host + co_await ttg::device::wait(f_n.b); + + if (f_n.F[0] < F_n_max) { + co_await ttg::device::forward(ttg::device::send<0>(n + 1, std::move(f_n))); + } else { + co_await ttg::device::forward(ttg::device::sendv<1>(std::move(f_n))); + } + }, + ttg::edges(f2f), ttg::edges(f2f, f2p), "fib"); + auto print = ttg::make_tt( + [=](Fn&& f_n) { + std::cout << "The largest Fibonacci number smaller than " << F_n_max << " is " << f_n.F[1] << std::endl; + }, + ttg::edges(f2p), ttg::edges(), "print"); + + auto ins = std::make_tuple(fib->template in<0>()); + std::vector> ops; + ops.emplace_back(std::move(fib)); + ops.emplace_back(std::move(print)); + return make_ttg(std::move(ops), ins, std::make_tuple(), "Fib_n < N"); +} + +int main(int argc, char* argv[]) { + ttg::initialize(argc, argv, -1); + ttg::trace_on(); + int64_t N = 1000; + if (argc > 1) N = std::atol(argv[1]); + auto fib = make_ttg_fib_lt(N); // computes largest F_n < N + + ttg::make_graph_executable(fib.get()); + if (ttg::default_execution_context().rank() == 0) + fib->template in<0>()->send(1, Fn{});; + + 
ttg::execute(ttg::ttg_default_execution_context()); + ttg::fence(ttg::ttg_default_execution_context()); + + ttg::finalize(); + return 0; +} diff --git a/tests/unit/serialization_boost.cc b/tests/unit/serialization_boost.cc new file mode 100644 index 000000000..954a8cc46 --- /dev/null +++ b/tests/unit/serialization_boost.cc @@ -0,0 +1,64 @@ +// +// Created by Eduard Valeyev on 2/27/24. +// + +#include "ttg/serialization.h" + +#include "ttg/util/meta.h" + +#include "ttg/serialization/data_descriptor.h" + +struct pod { + double a; + int b; + float c[3]; + friend bool operator==(const pod& lhs, const pod& rhs) { + return lhs.a == rhs.a && lhs.b == rhs.b && lhs.c[0] == rhs.c[0] && lhs.c[1] == rhs.c[1] && lhs.c[2] == rhs.c[2]; + } +}; + +BOOST_CLASS_IMPLEMENTATION(pod, primitive_type) +BOOST_IS_BITWISE_SERIALIZABLE(pod) + +#include "ttg/serialization/std/vector.h" +#include "ttg/serialization/std/array.h" + +static_assert(ttg::detail::is_memcpyable_v); +static_assert(ttg::detail::is_boost_buffer_serializable_v>); + +template +void save_to_buffer(const T& t, char* buffer, std::size_t buffer_size) { + ttg::detail::byte_ostreambuf oabuf(buffer, buffer_size); + ttg::detail::boost_byte_oarchive oa(oabuf); + oa << t; +} + +int main() { + + std::array buf; + + constexpr auto N = 10; + pod x{1., 2, {3., 4., 5.}}; + std::vector vx(N,x); + std::array ax{{x, x, x, x, x}}; + +// const ttg_data_descriptor* pod_dd = ttg::get_data_descriptor(); +// auto x_size = pod_dd->payload_size(&x); + + auto vx_size = ttg::default_data_descriptor::pack_payload(&vx, size(buf), 0, data(buf)); + auto ax_size = ttg::default_data_descriptor::pack_payload(&ax, size(buf)-vx_size, vx_size, data(buf)); + + decltype(vx) vx_copy; + decltype(ax) ax_copy; + auto vx_copy_size = ttg::default_data_descriptor::unpack_payload(&vx_copy, size(buf), 0, data(buf)); + assert(vx_copy == vx); + ttg::default_data_descriptor::unpack_payload(&ax_copy, size(buf)-vx_copy_size, vx_copy_size, data(buf)); + assert(ax_copy == ax); 
+ +// constexpr std::size_t buffer_size = 4096; +// char buffer[buffer_size]; +// save_to_buffer(vx, buffer, buffer_size); +// save_to_buffer(ax, buffer, buffer_size); + + return 0; +} \ No newline at end of file diff --git a/ttg/ttg/func.h b/ttg/ttg/func.h index 28b75ace9..4273e7c66 100644 --- a/ttg/ttg/func.h +++ b/ttg/ttg/func.h @@ -211,6 +211,8 @@ namespace ttg { template inline std::enable_if_t>, void> send(size_t i, const keyT &key, valueT &&value) { + // to avoid mixups due to value being a terminal tuple + static_assert(!meta::is_output_terminal_v> && !meta::is_output_terminal_tuple_v>, "ttg::send(i, key, x) - invalid invocation, x cannot be a terminal or a tuple of terminals; did you mean to call another version of send, e.g. sendk(key, x)?"); detail::value_copy_handler copy_handler; auto *terminal_ptr = detail::get_out_terminal(i, "ttg::send(i, key, value)"); terminal_ptr->send(key, copy_handler(std::forward(value))); @@ -226,6 +228,8 @@ namespace ttg { template inline std::enable_if_t>, void> send(const keyT &key, valueT &&value) { + // to avoid mixups due to value being a terminal tuple + static_assert(!meta::is_output_terminal_v> && !meta::is_output_terminal_tuple_v>, "ttg::send(key, x) - invalid invocation, x cannot be a terminal or a tuple of terminals; did you mean to call another version of send, e.g. sendk(key, x)?"); send(i, key, std::forward(value)); } @@ -248,6 +252,8 @@ namespace ttg { // clang-format on template inline std::enable_if_t, void> sendk(std::size_t i, const keyT &key) { + // to avoid mixups due to key being a terminal tuple + static_assert(!meta::is_output_terminal_v> && !meta::is_output_terminal_tuple_v>, "ttg::sendk(i, x) - invalid invocation, x cannot be a terminal or a tuple of terminals; did you mean to call another version of send, e.g. 
send(x)?"); auto *terminal_ptr = detail::get_out_terminal(i, "ttg::sendk(i, key)"); terminal_ptr->sendk(key); } @@ -260,6 +266,8 @@ namespace ttg { // clang-format on template inline std::enable_if_t, void> sendk(const keyT &key) { + // to avoid mixups due to key being a terminal tuple + static_assert(!meta::is_output_terminal_v> && !meta::is_output_terminal_tuple_v>, "ttg::sendk(x) - invalid invocation, x cannot be a terminal or a tuple of terminals; did you mean to call another version of send, e.g. send(x)?"); sendk(i, key); } @@ -284,6 +292,8 @@ namespace ttg { // clang-format on template inline std::enable_if_t>, void> sendv(std::size_t i, valueT &&value) { + // to avoid mixups due to key being a terminal tuple + static_assert(!meta::is_output_terminal_v> && !meta::is_output_terminal_tuple_v>, "ttg::sendv(i, x) - invalid invocation, x cannot be a terminal or a tuple of terminals; did you mean to call another version of send, e.g. send(x)?"); detail::value_copy_handler copy_handler; auto *terminal_ptr = detail::get_out_terminal(i, "ttg::sendv(i, value)"); terminal_ptr->sendv(copy_handler(std::forward(value))); @@ -297,6 +307,8 @@ namespace ttg { // clang-format on template inline std::enable_if_t>, void> sendv(valueT &&value) { + // to avoid mixups due to key being a terminal tuple + static_assert(!meta::is_output_terminal_v> && !meta::is_output_terminal_tuple_v>, "ttg::sendv(x) - invalid invocation, x cannot be a terminal or a tuple of terminals; did you mean to call another version of send, e.g. 
send(x)?"); sendv(i, std::forward(value)); } diff --git a/ttg/ttg/parsec/ttg.h b/ttg/ttg/parsec/ttg.h index 6071d83cc..0a0ddefcb 100644 --- a/ttg/ttg/parsec/ttg.h +++ b/ttg/ttg/parsec/ttg.h @@ -120,6 +120,8 @@ #include "ttg/device/device.h" +#include + #undef TTG_PARSEC_DEBUG_TRACK_DATA_COPIES /* PaRSEC function declarations */ @@ -998,28 +1000,34 @@ namespace ttg_parsec { } if (NULL == copy_res) { - ttg_data_copy_t *new_copy = detail::create_new_datacopy(*static_cast(copy_in->get_ptr())); - if (replace && nullptr != copy_in->get_next_task()) { - /* replace the task that was deferred */ - parsec_ttg_task_base_t *deferred_op = (parsec_ttg_task_base_t *)copy_in->get_next_task(); - new_copy->mark_mutable(); - /* replace the copy in the deferred task */ - for (int i = 0; i < deferred_op->data_count; ++i) { - if (deferred_op->copies[i] == copy_in) { - deferred_op->copies[i] = new_copy; - break; - } - } - copy_in->set_next_task(nullptr); - deferred_op->release_task(); - copy_in->reset_readers(); // set the copy back to being read-only - copy_in->increment_readers(); // register as reader - copy_res = copy_in; // return the copy we were passed - } else { - if (!readonly) { + // can only make a copy if Value is copy-constructible ... 
so this codepath should never be hit + if constexpr (std::is_copy_constructible_v>) { + ttg_data_copy_t *new_copy = detail::create_new_datacopy(*static_cast(copy_in->get_ptr())); + if (replace && nullptr != copy_in->get_next_task()) { + /* replace the task that was deferred */ + parsec_ttg_task_base_t *deferred_op = (parsec_ttg_task_base_t *)copy_in->get_next_task(); new_copy->mark_mutable(); + /* replace the copy in the deferred task */ + for (int i = 0; i < deferred_op->data_count; ++i) { + if (deferred_op->copies[i] == copy_in) { + deferred_op->copies[i] = new_copy; + break; + } + } + copy_in->set_next_task(nullptr); + deferred_op->release_task(); + copy_in->reset_readers(); // set the copy back to being read-only + copy_in->increment_readers(); // register as reader + copy_res = copy_in; // return the copy we were passed + } else { + if (!readonly) { + new_copy->mark_mutable(); + } + copy_res = new_copy; // return the new copy } - copy_res = new_copy; // return the new copy + } + else { + throw std::logic_error(std::string("TTG::PaRSEC: need to copy a datum of type") + boost::typeindex::type_id>().pretty_name() + " but the type is not copyable"); } } return copy_res; @@ -2005,7 +2013,19 @@ namespace ttg_parsec { /* iterate over the keys and have them use the copy we made */ parsec_task_t *task_ring = nullptr; for (auto &&key : keylist) { - set_arg_local_impl(key, *reinterpret_cast(copy->get_ptr()), copy, &task_ring); + // copy-constructible? can broadcast to any number of keys + if constexpr (std::is_copy_constructible_v) { + set_arg_local_impl(key, *reinterpret_cast(copy->get_ptr()), copy, &task_ring); + } + else { + // not copy-constructible? 
can move, but only to single key + static_assert(!std::is_reference_v); + if (std::size(keylist) == 1) + set_arg_local_impl(key, std::move(*reinterpret_cast(copy->get_ptr())), copy, &task_ring); + else { + throw std::logic_error(std::string("TTG::PaRSEC: need to copy a datum of type") + boost::typeindex::type_id>().pretty_name() + " but the type is not copyable"); + } + } } if (nullptr != task_ring) { @@ -3496,10 +3516,20 @@ namespace ttg_parsec { set_arg(key, std::forward(value)); }; auto send_callback = [this](const keyT &key, const valueT &value) { - set_arg(key, value); + if constexpr (std::is_copy_constructible_v) { + set_arg(key, value); + } + else { + throw std::logic_error(std::string("TTG::PaRSEC: send_callback is invoked on datum of type ") + boost::typeindex::type_id().pretty_name() + " which is not copy constructible, std::move datum into send statement"); + } }; auto broadcast_callback = [this](const ttg::span &keylist, const valueT &value) { + if constexpr (std::is_copy_constructible_v) { broadcast_arg(keylist, value); + } + else { + throw std::logic_error(std::string("TTG::PaRSEC: broadcast_callback is invoked on datum of type ") + boost::typeindex::type_id().pretty_name() + " which is not copy constructible, broadcast is not possible with move-only type"); + } }; auto prepare_send_callback = [this](const ttg::span &keylist, const valueT &value) { prepare_send(keylist, value); @@ -3527,7 +3557,14 @@ namespace ttg_parsec { ////////////////////////////////////////////////////////////////// else if constexpr (ttg::meta::is_void_v && !std::is_void_v) { auto move_callback = [this](valueT &&value) { set_arg(std::forward(value)); }; - auto send_callback = [this](const valueT &value) { set_arg(value); }; + auto send_callback = [this](const valueT &value) { + if constexpr (std::is_copy_constructible_v) { + set_arg(value); + } + else { + throw std::logic_error(std::string("TTG::PaRSEC: send_callback is invoked on datum of type ") + 
boost::typeindex::type_id().pretty_name() + " which is not copy constructible, std::move datum into send/broadcast statement"); + } + }; auto setsize_callback = [this](std::size_t size) { set_argstream_size(size); }; auto finalize_callback = [this]() { finalize_argstream(); }; auto prepare_send_callback = [this](const valueT &value) { @@ -4340,8 +4377,9 @@ struct ttg::detail::value_copy_handler { } template - inline std::add_lvalue_reference_t operator()(Value &&value) { - static_assert(std::is_rvalue_reference_v || + inline std::conditional_t,Value,Value&&> operator()(Value &&value) { + constexpr auto value_is_rvref = std::is_rvalue_reference_v; + static_assert(value_is_rvref || std::is_copy_constructible_v>, "Data sent without being moved must be copy-constructible!"); @@ -4364,7 +4402,7 @@ struct ttg::detail::value_copy_handler { value_ptr = reinterpret_cast(copy->get_ptr()); copy_to_remove = copy; } else { - if constexpr (std::is_rvalue_reference_v) { + if constexpr (value_is_rvref) { /* this copy won't be modified anymore so mark it as read-only */ copy->reset_readers(); } @@ -4372,7 +4410,10 @@ struct ttg::detail::value_copy_handler { /* We're coming from a writer so mark the data as modified. * That way we can force a pushout in prepare_send if we move to read-only tasks (needed by PaRSEC). 
*/ caller->data_flags = ttg_parsec::detail::ttg_parsec_data_flags::IS_MODIFIED; - return *value_ptr; + if constexpr (value_is_rvref) + return std::move(*value_ptr); + else + return *value_ptr; } template diff --git a/ttg/ttg/terminal.h b/ttg/ttg/terminal.h index dc9f4b08b..57c287ed5 100644 --- a/ttg/ttg/terminal.h +++ b/ttg/ttg/terminal.h @@ -202,38 +202,51 @@ namespace ttg { } template - std::enable_if_t, void> send(const Key &key, const Value &value) { - if (!send_callback) throw std::runtime_error("send callback not initialized"); - send_callback(key, value); - } - - template - std::enable_if_t && std::is_same_v>, void> + std::enable_if_t, void> send(const Key &key, Value &&value) { - if (!move_callback) throw std::runtime_error("move callback not initialized"); - move_callback(key, std::forward(value)); + static_assert(meta::is_none_void_v, "ttg::send<>() sending to a terminal expecting void key and value; use ttg::send<>() instead"); + static_assert(!meta::is_void_v, "ttg::send<>(key,value) sending to a terminal expecting void key; use ttg::sendv(value) instead"); + static_assert(!meta::is_void_v, "ttg::send<>(key,value) sending to a terminal expecting void value; use ttg::sendk(key) instead"); + constexpr auto value_is_rvref = !std::is_reference_v; + if constexpr (value_is_rvref) { + if (!move_callback) throw std::runtime_error("move callback not initialized"); + move_callback(key, std::move(value)); + } else { + if (!send_callback) throw std::runtime_error("send callback not initialized"); + send_callback(key, value); + } } template std::enable_if_t, void> sendk(const Key &key) { + static_assert(!meta::is_void_v && meta::is_void_v, "ttg::sendk<>(key) sending to a terminal expecting void key and nonvoid value; use ttg::sendv<>(value) instead"); + static_assert(!meta::is_void_v, "ttg::sendk<>(key) sending to a terminal expecting void key; use ttg::send() instead"); + static_assert(meta::is_void_v, "ttg::sendk<>(key) sending to a terminal expecting nonvoid 
value; use ttg::send(key,value) instead"); if (!send_callback) throw std::runtime_error("send callback not initialized"); send_callback(key); } template - std::enable_if_t, void> sendv(const Value &value) { - if (!send_callback) throw std::runtime_error("send callback not initialized"); - send_callback(value); - } - - template - std::enable_if_t && std::is_same_v>, void> sendv( + std::enable_if_t, void> sendv( Value &&value) { - if (!move_callback) throw std::runtime_error("move callback not initialized"); - move_callback(std::forward(value)); + static_assert(meta::is_void_v && !meta::is_void_v, "ttg::sendv<>(value) sending to a terminal expecting nonvoid key and void value; use ttg::sendk<>(key) instead"); + static_assert(meta::is_void_v, "ttg::sendv<>(value) sending to a terminal expecting nonvoid key; use ttg::send(key, value) instead"); + static_assert(!meta::is_void_v, "ttg::sendv<>(value) sending to a terminal expecting void value; use ttg::send() instead"); + constexpr auto value_is_rvref = !std::is_reference_v; + if constexpr (value_is_rvref) { + if (!move_callback) throw std::runtime_error("move callback not initialized"); + move_callback(std::move(value)); + } + else { + if (!send_callback) throw std::runtime_error("send callback not initialized"); + send_callback(value); + } } void send() { + static_assert(!meta::is_none_void_v, "ttg::send<>() sending to a terminal expecting nonvoid key and value; use ttg::send<>(key,value) instead"); + static_assert(meta::is_void_v, "ttg::send<>() sending to a terminal expecting nonvoid key; use ttg::sendk<>(key) instead"); + static_assert(meta::is_void_v, "ttg::send<>() sending to a terminal expecting nonvoid value; use ttg::sendv<>(value) instead"); if (!send_callback) throw std::runtime_error("send callback not initialized"); send_callback(); } @@ -304,7 +317,7 @@ namespace ttg { template void prepare_send(const rangeT &keylist, Value &&value) { - const Value &v = value; + const std::remove_reference_t &v = value; if 
(prepare_send_callback) { if constexpr (ttg::meta::is_iterable_v) { prepare_send_callback(ttg::span(&(*std::begin(keylist)), @@ -317,16 +330,11 @@ namespace ttg { } } - template + template void prepare_send(Value &&value) { - const Value &v = value; + const std::remove_reference_t &v = value; if (prepare_send_callback) { - if constexpr (ttg::meta::is_iterable_v) { prepare_send_callback(v); - } else { - /* got something we cannot iterate over (single element?) so put one element in the span */ - prepare_send_callback(v); - } } } }; @@ -463,18 +471,6 @@ namespace ttg { in->connect_pull(this); } - template - std::enable_if_t,void> send(const Key &key, const Value &value) { - for (auto && successor : this->successors()) { - assert(successor->get_type() != TerminalBase::Type::Write); - if (successor->get_type() == TerminalBase::Type::Read) { - static_cast> *>(successor)->send(key, value); - } else if (successor->get_type() == TerminalBase::Type::Consume) { - static_cast *>(successor)->send(key, value); - } - } - } - template std::enable_if_t && meta::is_void_v, void> sendk(const Key &key) { for (auto &&successor : this->successors()) { @@ -488,15 +484,30 @@ namespace ttg { } template - std::enable_if_t && !meta::is_void_v, void> sendv(const Value &value) { - for (auto &&successor : this->successors()) { - assert(successor->get_type() != TerminalBase::Type::Write); + std::enable_if_t && !meta::is_void_v, void> sendv(Value&& value) { + const std::size_t N = this->nsuccessors(); + TerminalBase *move_successor = nullptr; + // send copies to every terminal except the one we will move the results to + for (std::size_t i = 0; i != N; ++i) { + TerminalBase *successor = this->successors().at(i); if (successor->get_type() == TerminalBase::Type::Read) { - static_cast> *>(successor)->sendv(value); + // if only have 1 successor forward value even if successor is read-only, so we can deal with move-only types + auto* read_successor = static_cast> *>(successor); + if (N != 1) + 
read_successor->sendv(value); + else + read_successor->sendv(std::forward(value)); } else if (successor->get_type() == TerminalBase::Type::Consume) { - static_cast *>(successor)->sendv(value); + if (nullptr == move_successor) { + move_successor = successor; + } else { + static_cast *>(successor)->sendv(value); + } } } + if (nullptr != move_successor) { + static_cast *>(move_successor)->sendv(std::forward(value)); + } } template @@ -517,7 +528,7 @@ namespace ttg { } template - std::enable_if_t && std::is_same_v>, void> + std::enable_if_t, void> send(const Key &key, Value &&value) { const std::size_t N = this->nsuccessors(); TerminalBase *move_successor = nullptr; @@ -525,7 +536,12 @@ namespace ttg { for (std::size_t i = 0; i != N; ++i) { TerminalBase *successor = this->successors().at(i); if (successor->get_type() == TerminalBase::Type::Read) { - static_cast> *>(successor)->send(key, value); + // if only have 1 successor forward value even if successor is read-only, so we can deal with move-only types + auto* read_successor = static_cast> *>(successor); + if (N != 1) + read_successor->send(key, value); + else + read_successor->send(key, std::forward(value)); } else if (successor->get_type() == TerminalBase::Type::Consume) { if (nullptr == move_successor) { move_successor = successor; @@ -579,8 +595,8 @@ namespace ttg { } } - template - std::enable_if_t && !meta::is_void_v, void> + template + std::enable_if_t && !meta::is_void_v, void> prepare_send(const Value &value) { for (auto &&successor : this->successors()) { assert(successor->get_type() != TerminalBase::Type::Write); diff --git a/ttg/ttg/util/env.h b/ttg/ttg/util/env.h index 4e07bff7b..f78e03640 100644 --- a/ttg/ttg/util/env.h +++ b/ttg/ttg/util/env.h @@ -21,7 +21,7 @@ namespace ttg { /// using the unofficial extension MPIX_Query_cuda_support). 
However, since not all MPI implementations /// support this extension, users can force the use of device buffers in communication by setting /// `TTG_FORCE_DEVICE_COMM` to a non-negative number. - /// @return true if the user wants to force the use of device-side buffers in communicaton. + /// @return true if the user wants to force the use of device-side buffers in communication. bool force_device_comm(); } // namespace detail