diff --git a/examples/potrf/potrf.h b/examples/potrf/potrf.h index 00452f78f..fb0c4f9b4 100644 --- a/examples/potrf/potrf.h +++ b/examples/potrf/potrf.h @@ -10,11 +10,11 @@ #if defined(TTG_HAVE_CUDART) #define ES ttg::ExecutionSpace::CUDA -#define TASKRET -> ttg::device_task +#define TASKRET -> ttg::device::Task #include #elif defined(TTG_HAVE_HIP) #define ES ttg::ExecutionSpace::HIP -#define TASKRET -> ttg::device_task +#define TASKRET -> ttg::device::Task #include #include #else @@ -134,14 +134,14 @@ namespace potrf { //auto norms_s = ttg::make_scratch(norms.data(), ttg::scope::Allocate, norms.size()); /* the workspace and the devInfo must be device-level pointers */ //co_await ttg::to_device(tile_kk.buffer(), devWS, devInfo, norms_s); - co_await ttg::to_device(tile_kk.buffer(), devWS, devInfo); + co_await ttg::device::select(tile_kk.buffer(), devWS, devInfo); /* compute the norm at input */ static_assert(std::is_same_v, "Norm debugging only implementation for T=double"); device_norm(tile_kk, &norms[0]); #else /* the workspace and the devInfo must be device-level pointers */ - co_await ttg::to_device(tile_kk.buffer(), devWS, devInfo); + co_await ttg::device::select(tile_kk.buffer(), devWS, devInfo); #endif // DEBUG_TILES_VALUES int device = ttg::device::current_device(); @@ -159,14 +159,14 @@ namespace potrf { static_assert(std::is_same_v, "Verification only implementation for T=double"); device_norm(tile_kk, &norms[1]); /* wait for the kernel to complete */ - co_await ttg::wait_kernel(devInfo); + co_await ttg::device::wait(devInfo); // check that we got the input tile we expected assert(check_norm(tile_kk.norm(), norms[0])); // set the new norm tile_kk.set_norm(norms[1]); #else /* wait for the kernel to complete */ - co_await ttg::wait_kernel(devInfo); + co_await ttg::device::wait(devInfo); #endif // DEBUG_TILES_VALUES delete[] hostWS; @@ -268,9 +268,9 @@ namespace potrf { #ifdef DEBUG_TILES_VALUES std::array norms; // input for tile_kk & tile_mk and output //auto norms_s = ttg::make_scratch(norms.data(), ttg::scope::Allocate, norms.size()); - co_await ttg::to_device(tile_kk.buffer(), tile_mk.buffer()); + co_await ttg::device::select(tile_kk.buffer(), tile_mk.buffer()); #else - co_await ttg::to_device(tile_kk.buffer(), tile_mk.buffer()); + co_await ttg::device::select(tile_kk.buffer(), tile_mk.buffer()); #endif // DEBUG_TILES_VALUES int device = ttg::device::current_device(); @@ -306,7 +306,7 @@ namespace potrf { /* compute the norms at input */ device_norm(tile_mk, &norms[2]); /* wait for the kernel to complete */ - co_await ttg::wait_kernel(); + co_await ttg::device::wait(); // check that we got the input tiles we expected assert(check_norm(tile_kk.norm(), norms[0])); assert(check_norm(tile_mk.norm(), norms[1])); @@ -400,12 +400,12 @@ namespace potrf { #ifdef DEBUG_TILES_VALUES std::array norms; // input for tile_kk & tile_mk and output //auto norms_s = ttg::make_scratch(norms.data(), ttg::scope::Allocate, norms.size()); - co_await ttg::to_device(tile_kk.buffer(), tile_mk.buffer()); + co_await ttg::device::select(tile_kk.buffer(), tile_mk.buffer()); /* compute the norms at input */ device_norm(tile_mk, &norms[0]); device_norm(tile_kk, &norms[1]); #else - co_await ttg::to_device(tile_kk.buffer(), tile_mk.buffer()); + co_await ttg::device::select(tile_kk.buffer(), tile_mk.buffer()); #endif // DEBUG_TILES_VALUES int device = ttg::device::current_device(); @@ -435,7 +435,7 @@ namespace potrf { /* compute the norm at output */ device_norm(tile_kk, &norms[2]); /* wait for the kernel to 
complete */ - co_await ttg::wait_kernel(); + co_await ttg::device::wait(); // check that we got the input tiles we expected assert(check_norm(tile_mk.norm(), norms[0])); assert(check_norm(tile_kk.norm(), norms[1])); @@ -526,14 +526,14 @@ namespace potrf { #ifdef DEBUG_TILES_VALUES std::array norms; // input for tile_mk & tile_nk & tile_mn and output //auto norms_s = ttg::make_scratch(norms.data(), ttg::scope::Allocate, norms.size()); - co_await ttg::to_device(tile_mk.buffer(), tile_nk.buffer(), tile_mn.buffer()); + co_await ttg::device::select(tile_mk.buffer(), tile_nk.buffer(), tile_mn.buffer()); /* compute the norms at input */ device_norm(tile_mk, &norms[0]); device_norm(tile_nk, &norms[1]); device_norm(tile_mn, &norms[2]); #else - co_await ttg::to_device(tile_mk.buffer(), tile_nk.buffer(), tile_mn.buffer()); + co_await ttg::device::select(tile_mk.buffer(), tile_nk.buffer(), tile_mn.buffer()); #endif // DEBUG_TILES_VALUES int device = ttg::device::current_device(); @@ -563,7 +563,7 @@ namespace potrf { /* compute the norm at output */ device_norm(tile_mn, &norms[3]); /* wait for the kernel to complete */ - co_await ttg::wait_kernel(); + co_await ttg::device::wait(); // check that we got the input tiles we expected assert(check_norm(tile_mk.norm(), norms[0])); assert(check_norm(tile_nk.norm(), norms[1])); diff --git a/examples/spmm/spmm_cuda.cc b/examples/spmm/spmm_cuda.cc index 5386d7da5..6235554b2 100644 --- a/examples/spmm/spmm_cuda.cc +++ b/examples/spmm/spmm_cuda.cc @@ -812,7 +812,7 @@ class SpMM25D { } } - ttg::device_task op(const Key<3> &ijk, typename baseT::input_refs_tuple_type &&_ijk, + ttg::device::Task op(const Key<3> &ijk, typename baseT::input_refs_tuple_type &&_ijk, std::tuple, Blk>, Out, Blk>> &result) { const auto i = ijk[0]; const auto j = ijk[1]; @@ -830,7 +830,7 @@ class SpMM25D { } /* pull all buffers onto the device */ - co_await ttg::to_device(A.b, B.b, C.b); + co_await ttg::device::select(A.b, B.b, C.b); /* everything is on the device, call the gemm */ device_gemm(C, A, B); @@ -844,7 +844,7 @@ class SpMM25D { (have_next_k ? std::to_string(next_k) : "does not exist")); /* wait for the kernel to complete */ - co_await ttg::wait_kernel(); + co_await ttg::device::wait(); // compute the contrib, pass the running total to the next flow, if needed diff --git a/ttg/ttg/device/task.h b/ttg/ttg/device/task.h index 1b099b87f..48ded0b4b 100644 --- a/ttg/ttg/device/task.h +++ b/ttg/ttg/device/task.h @@ -9,22 +9,7 @@ #include "ttg/impl_selector.h" #include "ttg/ptr.h" -namespace ttg { - - - /* yielded when waiting on a kernel to complete */ - struct device_op_wait_kernel - { }; - - enum ttg_device_coro_state { - TTG_DEVICE_CORO_STATE_NONE, - TTG_DEVICE_CORO_INIT, - TTG_DEVICE_CORO_WAIT_TRANSFER, - TTG_DEVICE_CORO_WAIT_KERNEL, - TTG_DEVICE_CORO_SENDOUT, - TTG_DEVICE_CORO_COMPLETE - }; - +namespace ttg::device { namespace detail { template @@ -33,13 +18,31 @@ namespace ttg { }; } // namespace detail + /** + * Select a device to execute on based on the provided buffer and scratchspace objects. + * Returns an object that should be awaited on using \c co_await. + * Upon resume, the device is selected (i.e., \sa ttg::device::current_device and + * \sa ttg::device::current_stream are available) and the buffers are available on the + * selected device. + */ template [[nodiscard]] - inline auto to_device(Args&&... args) { + inline auto select(Args&&... 
args) { return detail::to_device_t...>{std::tie(std::forward(args)...)}; } namespace detail { + + enum ttg_device_coro_state { + TTG_DEVICE_CORO_STATE_NONE, + TTG_DEVICE_CORO_INIT, + TTG_DEVICE_CORO_WAIT_TRANSFER, + TTG_DEVICE_CORO_WAIT_KERNEL, + TTG_DEVICE_CORO_SENDOUT, + TTG_DEVICE_CORO_COMPLETE + }; + + template struct wait_kernel_t { std::tuple ties; @@ -59,374 +62,369 @@ namespace ttg { }; } // namespace detail - /* Wait for kernel to complete and provided ttg::Buffer and ttg::devicescratch - * to be transferred back to host */ + /** + * Wait for previously submitted kernels to complete and provided + * ttg::Buffer and ttg::devicescratch to be transferred back to host. + * Must only be called after awaiting \sa ttg::device::select has resumed. + */ template [[nodiscard]] - inline auto wait_kernel(Buffers&&... args) { + inline auto wait(Buffers&&... args) { static_assert(((ttg::detail::is_buffer_v> ||ttg::detail::is_devicescratch_v>)&&...), "Only ttg::Buffer and ttg::devicescratch can be waited on!"); return detail::wait_kernel_t...>{std::tie(std::forward(args)...)}; } + /****************************** + * Send/Broadcast handling + * We pass the value returned by the backend's copy handler into a coroutine + * and execute the first part (prepare), before suspending it. + * The second part (send/broadcast) is executed after the task completed. + ******************************/ - /* TODO: move all device code into ttg::device */ - namespace device { - - /****************************** - * Send/Broadcast handling - * We pass the value returned by the backend's copy handler into a coroutine - * and execute the first part (prepare), before suspending it. - * The second part (send/broadcast) is executed after the task completed. - ******************************/ + namespace detail { + struct send_coro_promise_type; - namespace detail { - struct send_coro_promise_type; + using send_coro_handle_type = TTG_CXX_COROUTINE_NAMESPACE::coroutine_handle; - using send_coro_handle_type = TTG_CXX_COROUTINE_NAMESPACE::coroutine_handle; + /// task that can be resumed after some events occur + struct send_coro_state : public send_coro_handle_type { + using base_type = send_coro_handle_type; - /// task that can be resumed after some events occur - struct send_coro_state : public send_coro_handle_type { - using base_type = send_coro_handle_type; + /// these are members mandated by the promise_type concept + ///@{ - /// these are members mandated by the promise_type concept - ///@{ + using promise_type = send_coro_promise_type; - using promise_type = send_coro_promise_type; + ///@} - ///@} + send_coro_state(base_type base) : base_type(std::move(base)) {} - send_coro_state(base_type base) : base_type(std::move(base)) {} + base_type& handle() { return *this; } - base_type& handle() { return *this; } + /// @return true if ready to resume + inline bool ready() { + return true; + } - /// @return true if ready to resume - inline bool ready() { - return true; - } + /// @return true if task completed and can be destroyed + inline bool completed(); + }; - /// @return true if task completed and can be destroyed - inline bool completed(); - }; + struct send_coro_promise_type { - struct send_coro_promise_type { + /* do not suspend the coroutine on first invocation, we want to run + * the coroutine immediately and suspend only once. 
+ */ + TTG_CXX_COROUTINE_NAMESPACE::suspend_never initial_suspend() { + return {}; + } - /* do not suspend the coroutine on first invocation, we want to run - * the coroutine immediately and suspend only once. - */ - TTG_CXX_COROUTINE_NAMESPACE::suspend_never initial_suspend() { - return {}; - } + /* we don't suspend the coroutine at the end. + * it can be destroyed once the send/broadcast is done + */ + TTG_CXX_COROUTINE_NAMESPACE::suspend_never final_suspend() noexcept { + return {}; + } - /* we don't suspend the coroutine at the end. - * it can be destroyed once the send/broadcast is done - */ - TTG_CXX_COROUTINE_NAMESPACE::suspend_never final_suspend() noexcept { - return {}; - } + send_coro_state get_return_object() { return send_coro_state{send_coro_handle_type::from_promise(*this)}; } - send_coro_state get_return_object() { return send_coro_state{send_coro_handle_type::from_promise(*this)}; } + /* the send coros only have an empty co_await */ + TTG_CXX_COROUTINE_NAMESPACE::suspend_always await_transform(ttg::Void) { + return {}; + } - /* the send coros only have an empty co_await */ - TTG_CXX_COROUTINE_NAMESPACE::suspend_always await_transform(ttg::Void) { - return {}; - } + void unhandled_exception() { + std::cerr << "Send coroutine caught an unhandled exception!" << std::endl; + throw; // fwd + } - void unhandled_exception() { - std::cerr << "Send coroutine caught an unhandled exception!" << std::endl; - throw; // fwd - } + }; - }; + template + inline send_coro_state send_coro (const Key& key, Value&& value, ttg::Out> &t, + ttg::detail::value_copy_handler& ch) { + ttg::detail::value_copy_handler copy_handler = std::move(ch); // destroyed at the end of the coro + Key k = key; + t.prepare_send(k, std::forward(value)); + co_await ttg::Void{}; // we'll come back once the task is done + t.send(k, std::forward(value)); + }; - template - inline send_coro_state send_coro (const Key& key, Value&& value, ttg::Out> &t, + template + inline send_coro_state sendv_coro (Value&& value, ttg::Out> &t, ttg::detail::value_copy_handler& ch) { - ttg::detail::value_copy_handler copy_handler = std::move(ch); // destroyed at the end of the coro - Key k = key; - t.prepare_send(k, std::forward(value)); - co_await ttg::Void{}; // we'll come back once the task is done - t.send(k, std::forward(value)); - }; - - template - inline send_coro_state sendv_coro (Value&& value, ttg::Out> &t, - ttg::detail::value_copy_handler& ch) { - ttg::detail::value_copy_handler copy_handler = std::move(ch); // destroyed at the end of the coro - t.prepare_send(std::forward(value)); - co_await ttg::Void{}; // we'll come back once the task is done - t.sendv(std::forward(value)); - }; + ttg::detail::value_copy_handler copy_handler = std::move(ch); // destroyed at the end of the coro + t.prepare_send(std::forward(value)); + co_await ttg::Void{}; // we'll come back once the task is done + t.sendv(std::forward(value)); + }; - template - inline send_coro_state sendk_coro (const Key& key, ttg::Out &t) { - // no need to prepare the send but we have to suspend once - Key k = key; - co_await ttg::Void{}; // we'll come back once the task is done - t.sendk(k); - }; + template + inline send_coro_state sendk_coro (const Key& key, ttg::Out &t) { + // no need to prepare the send but we have to suspend once + Key k = key; + co_await ttg::Void{}; // we'll come back once the task is done + t.sendk(k); + }; - template - inline send_coro_state send_coro (ttg::Out &t) { - // no need to prepare the send but we have to suspend once - co_await ttg::Void{}; 
// we'll come back once the task is done - t.send(); - }; - } // namespace detail + template + inline send_coro_state send_coro (ttg::Out &t) { + // no need to prepare the send but we have to suspend once + co_await ttg::Void{}; // we'll come back once the task is done + t.send(); + }; + struct send_t { + send_coro_state coro; + }; + } // namespace detail - /* functionality to deal with sending/broadcasting */ - namespace detail { - struct send_t { - send_coro_state coro; - }; - } // namespace detail + template + inline detail::send_t send(const keyT &key, valueT &&value, + std::tuple...> &t) { + ttg::detail::value_copy_handler copy_handler; + return detail::send_t{detail::send_coro(key, copy_handler(std::forward(value)), std::get(t), copy_handler)}; + } - template - inline detail::send_t send(const keyT &key, valueT &&value, - std::tuple...> &t) { - ttg::detail::value_copy_handler copy_handler; - return detail::send_t{detail::send_coro(key, copy_handler(std::forward(value)), std::get(t), copy_handler)}; - } + template + inline detail::send_t sendv( + valueT &&value, std::tuple...> &t) { + ttg::detail::value_copy_handler copy_handler; + return detail::send_t{detail::sendv_coro(copy_handler(std::forward(value)), std::get(t), copy_handler)}; + } - template - inline detail::send_t sendv( - valueT &&value, std::tuple...> &t) { - ttg::detail::value_copy_handler copy_handler; - return detail::send_t{detail::sendv_coro(copy_handler(std::forward(value)), std::get(t), copy_handler)}; - } + template + inline detail::send_t sendk( + const Key& key, std::tuple...> &t) { + return detail::send_t{detail::sendk_coro(key, std::get(t))}; + } - template - inline detail::send_t sendk( - const Key& key, std::tuple...> &t) { - return detail::send_t{detail::sendk_coro(key, std::get(t))}; - } + // clang-format off + /// \brief Sends a task id and a value to the template tasks attached to the output terminal of this template task + /// \param[in] i Identifies which output terminal of this template task to select for sending + /// \param[in] key: the id of the task(s) receiving the value + /// \param[in] value: the value to send to the receiving task(s) + // clang-format on + template + inline detail::send_t send(size_t i, const keyT &key, valueT &&value) { + ttg::detail::value_copy_handler copy_handler; + auto *terminal_ptr = ttg::detail::get_out_terminal(i, "ttg::device::send(i, key, value)"); + return detail::send_t{detail::send_coro(key, copy_handler(std::forward(value)), *terminal_ptr, copy_handler)}; + } - // clang-format off - /// \brief Sends a task id and a value to the template tasks attached to the output terminal of this template task - /// \param[in] i Identifies which output terminal of this template task to select for sending - /// \param[in] key: the id of the task(s) receiving the value - /// \param[in] value: the value to send to the receiving task(s) - // clang-format on - template - inline detail::send_t send(size_t i, const keyT &key, valueT &&value) { - ttg::detail::value_copy_handler copy_handler; - auto *terminal_ptr = ttg::detail::get_out_terminal(i, "ttg::device::send(i, key, value)"); - return detail::send_t{detail::send_coro(key, copy_handler(std::forward(value)), *terminal_ptr, copy_handler)}; - } + // clang-format off + /// \brief Sends a task id and a value to the template tasks attached to the output terminal of this template task + /// \note this is provided to support `send` with and without explicitly-passed terminal tuple + /// \tparam Identifies which output terminal of this 
template task to select for sending + /// \param[in] key: the id of the task(s) receiving the value + /// \param[in] value: the value to send to the receiving task(s) + // clang-format on + template + inline auto send(const keyT &key, valueT &&value) { + return ttg::device::send(i, key, std::forward(value)); + } - // clang-format off - /// \brief Sends a task id and a value to the template tasks attached to the output terminal of this template task - /// \note this is provided to support `send` with and without explicitly-passed terminal tuple - /// \tparam Identifies which output terminal of this template task to select for sending - /// \param[in] key: the id of the task(s) receiving the value - /// \param[in] value: the value to send to the receiving task(s) - // clang-format on - template - inline auto send(const keyT &key, valueT &&value) { - return ttg::device::send(i, key, std::forward(value)); - } + template + inline detail::send_t sendv(std::size_t i, valueT &&value) { + auto *terminal_ptr = ttg::detail::get_out_terminal(i, "ttg::device::send(i, key, value)"); + ttg::detail::value_copy_handler copy_handler; + return detail::send_t{detail::sendv_coro(copy_handler(std::forward(value)), *terminal_ptr, copy_handler)}; + } - template - inline detail::send_t sendv(std::size_t i, valueT &&value) { - auto *terminal_ptr = ttg::detail::get_out_terminal(i, "ttg::device::send(i, key, value)"); - ttg::detail::value_copy_handler copy_handler; - return detail::send_t{detail::sendv_coro(copy_handler(std::forward(value)), *terminal_ptr, copy_handler)}; - } + template + inline detail::send_t sendk(std::size_t i, const Key& key) { + auto *terminal_ptr = ttg::detail::get_out_terminal(i, "ttg::device::send(i, key, value)"); + return detail::send_t{detail::sendk_coro(key, *terminal_ptr)}; + } - template - inline detail::send_t sendk(std::size_t i, const Key& key) { - auto *terminal_ptr = ttg::detail::get_out_terminal(i, "ttg::device::send(i, key, value)"); - return detail::send_t{detail::sendk_coro(key, *terminal_ptr)}; - } + template + inline detail::send_t send(std::size_t i) { + auto *terminal_ptr = ttg::detail::get_out_terminal(i, "ttg::device::send(i, key, value)"); + return detail::send_t{detail::send_coro(*terminal_ptr)}; + } - template - inline detail::send_t send(std::size_t i) { - auto *terminal_ptr = ttg::detail::get_out_terminal(i, "ttg::device::send(i, key, value)"); - return detail::send_t{detail::send_coro(*terminal_ptr)}; - } + template + inline detail::send_t sendv(valueT &&value) { + return sendv(i, std::forward(value)); + } - template - inline detail::send_t sendv(valueT &&value) { - return sendv(i, std::forward(value)); - } + template + inline detail::send_t sendk(const Key& key) { + return sendk(i, key); + } - template - inline detail::send_t sendk(const Key& key) { - return sendk(i, key); - } + template + inline detail::send_t sendk() { + return send(i); + } - template - inline detail::send_t sendk() { - return send(i); - } + namespace detail { - namespace detail { - - template - struct broadcast_keylist_trait { - using type = T; - }; - - /* overload for iterable types that extracts the type of the first element */ - template - struct broadcast_keylist_trait>> { - using key_type = decltype(*std::begin(std::get<0>(std::declval()))); - }; - - template - inline void prepare_broadcast(const std::tuple &keylists, valueT &&value, - std::tuple...> &t) { - std::get(t).prepare_send(std::get(keylists), std::forward(value)); - if constexpr (sizeof...(Is) > 0) { - prepare_broadcast(keylists, 
std::forward(value), t); - } - } + template + struct broadcast_keylist_trait { + using type = T; + }; - template - inline void prepare_broadcast(const std::tuple &keylists, valueT &&value) { - using key_t = typename broadcast_keylist_trait< - std::tuple_element_t...>> - >::key_type; - auto *terminal_ptr = ttg::detail::get_out_terminal(I, "ttg::device::broadcast(keylists, value)"); - terminal_ptr->prepare_send(std::get(keylists), value); - if constexpr (sizeof...(Is) > 0) { - prepare_broadcast(keylists, std::forward(value)); - } - } + /* overload for iterable types that extracts the type of the first element */ + template + struct broadcast_keylist_trait>> { + using key_type = decltype(*std::begin(std::get<0>(std::declval()))); + }; - template - inline void broadcast(const std::tuple &keylists, valueT &&value, - std::tuple...> &t) { - std::get(t).broadcast(std::get(keylists), std::forward(value)); - if constexpr (sizeof...(Is) > 0) { - detail::broadcast(keylists, std::forward(value), t); - } + template + inline void prepare_broadcast(const std::tuple &keylists, valueT &&value, + std::tuple...> &t) { + std::get(t).prepare_send(std::get(keylists), std::forward(value)); + if constexpr (sizeof...(Is) > 0) { + prepare_broadcast(keylists, std::forward(value), t); } + } - template - inline void broadcast(const std::tuple &keylists, valueT &&value) { - using key_t = typename broadcast_keylist_trait< - std::tuple_element_t...>> - >::key_type; - auto *terminal_ptr = ttg::detail::get_out_terminal(I, "ttg::device::broadcast(keylists, value)"); - terminal_ptr->broadcast(std::get(keylists), value); - if constexpr (sizeof...(Is) > 0) { - ttg::device::detail::broadcast(keylists, std::forward(value)); - } + template + inline void prepare_broadcast(const std::tuple &keylists, valueT &&value) { + using key_t = typename broadcast_keylist_trait< + std::tuple_element_t...>> + >::key_type; + auto *terminal_ptr = ttg::detail::get_out_terminal(I, "ttg::device::broadcast(keylists, value)"); + terminal_ptr->prepare_send(std::get(keylists), value); + if constexpr (sizeof...(Is) > 0) { + prepare_broadcast(keylists, std::forward(value)); } + } - /* overload with explicit terminals */ - template - inline send_coro_state - broadcast_coro(RangesT &&keylists, valueT &&value, - std::tuple...> &t, - ttg::detail::value_copy_handler&& ch) { - ttg::detail::value_copy_handler copy_handler = std::move(ch); // destroyed at the end of the coro - RangesT kl = std::forward(keylists); // capture the keylist(s) - if constexpr (ttg::meta::is_tuple_v) { - // treat as tuple - prepare_broadcast<0, I, Is...>(kl, std::forward>(value), t); - co_await ttg::Void{}; // we'll come back once the task is done - ttg::device::detail::broadcast<0, I, Is...>(kl, std::forward>(value), t); - } else if constexpr (!ttg::meta::is_tuple_v) { - // create a tie to the captured keylist - prepare_broadcast<0, I, Is...>(std::tie(kl), std::forward>(value), t); - co_await ttg::Void{}; // we'll come back once the task is done - ttg::device::detail::broadcast<0, I, Is...>(std::tie(kl), std::forward>(value), t); - } + template + inline void broadcast(const std::tuple &keylists, valueT &&value, + std::tuple...> &t) { + std::get(t).broadcast(std::get(keylists), std::forward(value)); + if constexpr (sizeof...(Is) > 0) { + detail::broadcast(keylists, std::forward(value), t); } + } - /* overload with implicit terminals */ - template - inline send_coro_state - broadcast_coro(RangesT &&keylists, valueT &&value, - ttg::detail::value_copy_handler&& ch) { - 
ttg::detail::value_copy_handler copy_handler = std::move(ch); // destroyed at the end of the coro - RangesT kl = std::forward(keylists); // capture the keylist(s) - if constexpr (ttg::meta::is_tuple_v) { - // treat as tuple - static_assert(sizeof...(Is)+1 == std::tuple_size_v, - "Size of keylist tuple must match the number of output terminals"); - prepare_broadcast<0, I, Is...>(kl, std::forward>(value)); - co_await ttg::Void{}; // we'll come back once the task is done - ttg::device::detail::broadcast<0, I, Is...>(kl, std::forward>(value)); - } else if constexpr (!ttg::meta::is_tuple_v) { - // create a tie to the captured keylist - prepare_broadcast<0, I, Is...>(std::tie(kl), std::forward>(value)); - co_await ttg::Void{}; // we'll come back once the task is done - ttg::device::detail::broadcast<0, I, Is...>(std::tie(kl), std::forward>(value)); - } + template + inline void broadcast(const std::tuple &keylists, valueT &&value) { + using key_t = typename broadcast_keylist_trait< + std::tuple_element_t...>> + >::key_type; + auto *terminal_ptr = ttg::detail::get_out_terminal(I, "ttg::device::broadcast(keylists, value)"); + terminal_ptr->broadcast(std::get(keylists), value); + if constexpr (sizeof...(Is) > 0) { + ttg::device::detail::broadcast(keylists, std::forward(value)); } - } // namespace detail + } - /* overload with explicit terminals and keylist passed by const reference */ - template - [[nodiscard]] - inline detail::send_t broadcast(rangeT &&keylist, - valueT &&value, - std::tuple...> &t) { - ttg::detail::value_copy_handler copy_handler; - return detail::send_t{ - detail::broadcast_coro(std::forward(keylist), - copy_handler(std::forward(value)), - t, std::move(copy_handler))}; + inline send_coro_state + broadcast_coro(RangesT &&keylists, valueT &&value, + std::tuple...> &t, + ttg::detail::value_copy_handler&& ch) { + ttg::detail::value_copy_handler copy_handler = std::move(ch); // destroyed at the end of the coro + RangesT kl = std::forward(keylists); // capture the keylist(s) + if constexpr (ttg::meta::is_tuple_v) { + // treat as tuple + prepare_broadcast<0, I, Is...>(kl, std::forward>(value), t); + co_await ttg::Void{}; // we'll come back once the task is done + ttg::device::detail::broadcast<0, I, Is...>(kl, std::forward>(value), t); + } else if constexpr (!ttg::meta::is_tuple_v) { + // create a tie to the captured keylist + prepare_broadcast<0, I, Is...>(std::tie(kl), std::forward>(value), t); + co_await ttg::Void{}; // we'll come back once the task is done + ttg::device::detail::broadcast<0, I, Is...>(std::tie(kl), std::forward>(value), t); + } } - /* overload with implicit terminals and keylist passed by const reference */ - template - inline detail::send_t broadcast(rangeT &&keylist, valueT &&value) { - ttg::detail::value_copy_handler copy_handler; - return detail::send_t{broadcast_coro(std::tie(keylist), copy_handler(std::forward(value)), - std::move(copy_handler))}; + inline send_coro_state + broadcast_coro(RangesT &&keylists, valueT &&value, + ttg::detail::value_copy_handler&& ch) { + ttg::detail::value_copy_handler copy_handler = std::move(ch); // destroyed at the end of the coro + RangesT kl = std::forward(keylists); // capture the keylist(s) + if constexpr (ttg::meta::is_tuple_v) { + // treat as tuple + static_assert(sizeof...(Is)+1 == std::tuple_size_v, + "Size of keylist tuple must match the number of output terminals"); + prepare_broadcast<0, I, Is...>(kl, std::forward>(value)); + co_await ttg::Void{}; // we'll come back once the task is done + 
ttg::device::detail::broadcast<0, I, Is...>(kl, std::forward>(value)); + } else if constexpr (!ttg::meta::is_tuple_v) { + // create a tie to the captured keylist + prepare_broadcast<0, I, Is...>(std::tie(kl), std::forward>(value)); + co_await ttg::Void{}; // we'll come back once the task is done + ttg::device::detail::broadcast<0, I, Is...>(std::tie(kl), std::forward>(value)); + } } + } // namespace detail - template - [[nodiscard]] - std::vector forward(Args&&... args) { - // TODO: check the cost of this! - return std::vector{std::forward(args)...}; - } + /* overload with explicit terminals and keylist passed by const reference */ + template + [[nodiscard]] + inline detail::send_t broadcast(rangeT &&keylist, + valueT &&value, + std::tuple...> &t) { + ttg::detail::value_copy_handler copy_handler; + return detail::send_t{ + detail::broadcast_coro(std::forward(keylist), + copy_handler(std::forward(value)), + t, std::move(copy_handler))}; + } - } // namespace device + /* overload with implicit terminals and keylist passed by const reference */ + template + inline detail::send_t broadcast(rangeT &&keylist, valueT &&value) { + ttg::detail::value_copy_handler copy_handler; + return detail::send_t{broadcast_coro(std::tie(keylist), copy_handler(std::forward(value)), + std::move(copy_handler))}; + } + + template + [[nodiscard]] + std::vector forward(Args&&... args) { + // TODO: check the cost of this! + return std::vector{std::forward(args)...}; + } /******************************************* * Device task promise and coroutine handle *******************************************/ - - struct device_task_promise_type; - - using device_task_handle_type = TTG_CXX_COROUTINE_NAMESPACE::coroutine_handle; + namespace detail { + // fwd-decl + struct device_task_promise_type; + // base type for ttg::device::Task + using device_task_handle_type = TTG_CXX_COROUTINE_NAMESPACE::coroutine_handle; + } // namespace detail /// task that can be resumed after some events occur - struct device_task : public device_task_handle_type { - using base_type = device_task_handle_type; + struct Task : public detail::device_task_handle_type { + using base_type = detail::device_task_handle_type; /// these are members mandated by the promise_type concept ///@{ - using promise_type = device_task_promise_type; + using promise_type = detail::device_task_promise_type; ///@} - device_task(base_type base) : base_type(std::move(base)) {} + Task(base_type base) : base_type(std::move(base)) {} base_type& handle() { return *this; } @@ -439,176 +437,113 @@ namespace ttg { inline bool completed(); }; - /* The promise type that stores the views provided by the - * application task coroutine on the first co_yield. It subsequently - * tracks the state of the task when it moves from waiting for transfers - * to waiting for the submitted kernel to complete. */ - struct device_task_promise_type { - - /* do not suspend the coroutine on first invocation, we want to run - * the coroutine immediately and suspend when we get the device transfers. - */ - TTG_CXX_COROUTINE_NAMESPACE::suspend_never initial_suspend() { - m_state = TTG_DEVICE_CORO_INIT; - return {}; - } - - /* suspend the coroutine at the end of the execution - * so we can access the promise. - * TODO: necessary? 
maybe we can save one suspend here - */ - TTG_CXX_COROUTINE_NAMESPACE::suspend_always final_suspend() noexcept { - m_state = TTG_DEVICE_CORO_COMPLETE; - return {}; - } + namespace detail { -#if 0 - /* waiting for transfers to complete should always suspend - * TODO: as an optimization, we could check here if all data - * is already available and avoid suspending... - */ - template - TTG_CXX_COROUTINE_NAMESPACE::suspend_always yield_value(std::tuple &views) { - /* gather all the views (host object + view spans, type-punned) into a vector */ - constexpr static std::size_t view_count = std::tuple_size_v>; - std::cout << "yield_value: views" << std::endl; - m_spans.clear(); // in case we ever come back here - m_spans.reserve(view_count); - if constexpr(view_count > 0) { - auto unpack_lambda = [&](Views&... view){ - ((m_spans.push_back(device_obj_view(&view.get_host_object(), - view.view_spans()))), - ...); - }; - std::apply(unpack_lambda, views); + /* The promise type that stores the views provided by the + * application task coroutine on the first co_yield. It subsequently + * tracks the state of the task when it moves from waiting for transfers + * to waiting for the submitted kernel to complete. */ + struct device_task_promise_type { + + /* do not suspend the coroutine on first invocation, we want to run + * the coroutine immediately and suspend when we get the device transfers. + */ + TTG_CXX_COROUTINE_NAMESPACE::suspend_never initial_suspend() { + m_state = ttg::device::detail::TTG_DEVICE_CORO_INIT; + return {}; } - m_state = TTG_DEVICE_CORO_WAIT_TRANSFER; - return {}; - } - - /* convenience-function to yield a single view */ - template - TTG_CXX_COROUTINE_NAMESPACE::suspend_always yield_value(View &view) { - auto tmp_tuple = std::tie(view); - return yield_value(tmp_tuple); - } - - /* convenience-function to yield a single view */ - template - TTG_CXX_COROUTINE_NAMESPACE::suspend_always yield_value(PersistentView &view) { - auto tmp_tuple = std::tie(view); - return yield_value(tmp_tuple); - } -#endif // 0 - /* waiting for the kernel to complete should always suspend */ - TTG_CXX_COROUTINE_NAMESPACE::suspend_always yield_value(device_op_wait_kernel) { - std::cout << "yield_value: device_op_wait_kernel" << std::endl; - m_state = TTG_DEVICE_CORO_WAIT_KERNEL; - return {}; - } - - /* Allow co_await on a tuple */ - template - TTG_CXX_COROUTINE_NAMESPACE::suspend_always await_transform(std::tuple &views) { - return yield_value(views); - } - -#if 0 - /* convenience-function to await a single view */ - template - TTG_CXX_COROUTINE_NAMESPACE::suspend_always await_transform(View &view) { - auto tmp_tuple = std::tie(view); - return yield_value(tmp_tuple); - } + /* suspend the coroutine at the end of the execution + * so we can access the promise. + * TODO: necessary? 
maybe we can save one suspend here + */ + TTG_CXX_COROUTINE_NAMESPACE::suspend_always final_suspend() noexcept { + m_state = ttg::device::detail::TTG_DEVICE_CORO_COMPLETE; + return {}; + } - /* convenience-function to await a single view */ - template - TTG_CXX_COROUTINE_NAMESPACE::suspend_always await_transform(PersistentView &view) { - auto tmp_tuple = std::tie(view); - return yield_value(tmp_tuple); - } -#endif // 0 + /* Allow co_await on a tuple */ + template + TTG_CXX_COROUTINE_NAMESPACE::suspend_always await_transform(std::tuple &views) { + return yield_value(views); + } - /* co_await for the kernel to complete should always suspend */ - TTG_CXX_COROUTINE_NAMESPACE::suspend_always await_transform(device_op_wait_kernel) { - std::cout << "yield_value: device_op_wait_kernel" << std::endl; - m_state = TTG_DEVICE_CORO_WAIT_KERNEL; - return {}; - } + template + TTG_CXX_COROUTINE_NAMESPACE::suspend_always await_transform(detail::to_device_t&& a) { + bool need_transfer = !(TTG_IMPL_NS::register_device_memory(a.ties)); + /* TODO: are we allowed to not suspend here and launch the kernel directly? */ + m_state = ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER; + return {}; + } - template - TTG_CXX_COROUTINE_NAMESPACE::suspend_always await_transform(detail::to_device_t&& a) { - bool need_transfer = !(TTG_IMPL_NS::register_device_memory(a.ties)); - /* TODO: are we allowed to not suspend here and launch the kernel directly? */ - m_state = TTG_DEVICE_CORO_WAIT_TRANSFER; - return {}; - } + template + auto await_transform(detail::wait_kernel_t&& a) { + //std::cout << "yield_value: wait_kernel_t" << std::endl; + if constexpr (sizeof...(Ts) > 0) { + TTG_IMPL_NS::mark_device_out(a.ties); + } + m_state = ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL; + return a; + } - template - auto await_transform(detail::wait_kernel_t&& a) { - //std::cout << "yield_value: wait_kernel_t" << std::endl; - if constexpr (sizeof...(Ts) > 0) { - TTG_IMPL_NS::mark_device_out(a.ties); + TTG_CXX_COROUTINE_NAMESPACE::suspend_always await_transform(std::vector&& v) { + m_sends = std::forward>(v); + m_state = ttg::device::detail::TTG_DEVICE_CORO_SENDOUT; + return {}; } - m_state = TTG_DEVICE_CORO_WAIT_KERNEL; - return a; - } - TTG_CXX_COROUTINE_NAMESPACE::suspend_always await_transform(std::vector&& v) { - m_sends = std::forward>(v); - m_state = TTG_DEVICE_CORO_SENDOUT; - return {}; - } + TTG_CXX_COROUTINE_NAMESPACE::suspend_always await_transform(device::detail::send_t&& v) { + m_sends.clear(); + m_sends.push_back(std::forward(v)); + m_state = ttg::device::detail::TTG_DEVICE_CORO_SENDOUT; + return {}; + } - TTG_CXX_COROUTINE_NAMESPACE::suspend_always await_transform(device::detail::send_t&& v) { - m_sends.clear(); - m_sends.push_back(std::forward(v)); - m_state = TTG_DEVICE_CORO_SENDOUT; - return {}; - } + void return_void() { + m_state = ttg::device::detail::TTG_DEVICE_CORO_COMPLETE; + } - void return_void() { - m_state = TTG_DEVICE_CORO_COMPLETE; - } + bool complete() const { + return m_state == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE; + } - bool complete() const { - return m_state == TTG_DEVICE_CORO_COMPLETE; - } + ttg::device::Task get_return_object() { return {detail::device_task_handle_type::from_promise(*this)}; } - device_task get_return_object() { return device_task{device_task_handle_type::from_promise(*this)}; } + void unhandled_exception() { + std::cerr << "Task coroutine caught an unhandled exception!" 
<< std::endl; + throw; // fwd + } - void unhandled_exception() { - std::cerr << "Task coroutine caught an unhandled exception!" << std::endl; - throw; // fwd - } + //using iterator = std::vector::iterator; - //using iterator = std::vector::iterator; + /* execute all pending send and broadcast operations */ + void do_sends() { + for (auto& send : m_sends) { + send.coro(); + } + m_sends.clear(); + } - /* execute all pending send and broadcast operations */ - void do_sends() { - for (auto& send : m_sends) { - send.coro(); + auto state() { + return m_state; } - m_sends.clear(); - } - auto state() { - return m_state; - } + private: + std::vector m_sends; + ttg_device_coro_state m_state = ttg::device::detail::TTG_DEVICE_CORO_STATE_NONE; - private: - std::vector m_sends; - ttg_device_coro_state m_state = TTG_DEVICE_CORO_STATE_NONE; + }; - }; + } // namespace detail - bool device_task::completed() { return base_type::promise().state() == TTG_DEVICE_CORO_COMPLETE; } + bool Task::completed() { return base_type::promise().state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE; } struct device_wait_kernel { }; + /* NOTE: below is preliminary for reductions on the device, which is not available yet */ +#if 0 /************************** * Device reduction coros * **************************/ @@ -643,25 +578,25 @@ namespace ttg { /* The promise type that stores the views provided by the - * application task coroutine on the first co_yield. It subsequently - * tracks the state of the task when it moves from waiting for transfers - * to waiting for the submitted kernel to complete. */ + * application task coroutine on the first co_yield. It subsequently + * tracks the state of the task when it moves from waiting for transfers + * to waiting for the submitted kernel to complete. */ struct device_reducer_promise_type { /* do not suspend the coroutine on first invocation, we want to run - * the coroutine immediately and suspend when we get the device transfers. - */ + * the coroutine immediately and suspend when we get the device transfers. + */ TTG_CXX_COROUTINE_NAMESPACE::suspend_never initial_suspend() { - m_state = TTG_DEVICE_CORO_INIT; + m_state = ttg::device::detail::TTG_DEVICE_CORO_INIT; return {}; } /* suspend the coroutine at the end of the execution - * so we can access the promise. - * TODO: necessary? maybe we can save one suspend here - */ + * so we can access the promise. + * TODO: necessary? maybe we can save one suspend here + */ TTG_CXX_COROUTINE_NAMESPACE::suspend_always final_suspend() noexcept { - m_state = TTG_DEVICE_CORO_COMPLETE; + m_state = ttg::device::detail::TTG_DEVICE_CORO_COMPLETE; return {}; } @@ -669,16 +604,16 @@ namespace ttg { TTG_CXX_COROUTINE_NAMESPACE::suspend_always await_transform(detail::to_device_t&& a) { bool need_transfer = !(TTG_IMPL_NS::register_device_memory(a.ties)); /* TODO: are we allowed to not suspend here and launch the kernel directly? 
*/ - m_state = TTG_DEVICE_CORO_WAIT_TRANSFER; + m_state = ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER; return {}; } void return_void() { - m_state = TTG_DEVICE_CORO_COMPLETE; + m_state = ttg::device::detail::TTG_DEVICE_CORO_COMPLETE; } bool complete() const { - return m_state == TTG_DEVICE_CORO_COMPLETE; + return m_state == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE; } device_reducer get_return_object() { return device_reducer{device_reducer_handle_type::from_promise(*this)}; } @@ -691,12 +626,13 @@ namespace ttg { private: - ttg_device_coro_state m_state = TTG_DEVICE_CORO_STATE_NONE; + ttg::device::detail::ttg_device_coro_state m_state = ttg::device::detail::TTG_DEVICE_CORO_STATE_NONE; }; - bool device_reducer::completed() { return base_type::promise().state() == TTG_DEVICE_CORO_COMPLETE; } + bool device_reducer::completed() { return base_type::promise().state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE; } +#endif // 0 -} // namespace ttg +} // namespace ttg::device #endif // TTG_DEVICE_TASK_H \ No newline at end of file diff --git a/ttg/ttg/make_tt.h b/ttg/ttg/make_tt.h index 8ba7f39b0..4d1f060b4 100644 --- a/ttg/ttg/make_tt.h +++ b/ttg/ttg/make_tt.h @@ -153,8 +153,8 @@ class CallableWrapTTArgs std::conditional_t, ttg::coroutine_handle<>, #ifdef TTG_HAVE_DEVICE - std::conditional_t, - ttg::device_task::base_type, + std::conditional_t, + ttg::device::Task::base_type, void> #else // TTG_HAVE_DEVICE void @@ -188,8 +188,8 @@ class CallableWrapTTArgs return coro_handle; } else #ifdef TTG_HAVE_DEVICE - if constexpr (std::is_same_v) { - ttg::device_task::base_type coro_handle = ret; + if constexpr (std::is_same_v) { + ttg::device::Task::base_type coro_handle = ret; return coro_handle; } #else // TTG_HAVE_DEVICE @@ -197,7 +197,7 @@ class CallableWrapTTArgs #endif // TTG_HAVE_DEVICE if constexpr (!(std::is_same_v #ifdef TTG_HAVE_DEVICE - || std::is_same_v + || std::is_same_v #endif // TTG_HAVE_DEVICE )) #endif diff --git a/ttg/ttg/parsec/ttg.h b/ttg/ttg/parsec/ttg.h index 3355f648a..bed06d535 100644 --- a/ttg/ttg/parsec/ttg.h +++ b/ttg/ttg/parsec/ttg.h @@ -1347,7 +1347,7 @@ namespace ttg_parsec { task_t *task = (task_t*)gpu_task->ec; // get the device task from the coroutine handle - ttg::device_task dev_task = ttg::device_task_handle_type::from_address(task->suspended_task_address); + ttg::device::Task dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address); task->dev_ptr->stream = gpu_stream; @@ -1357,8 +1357,8 @@ namespace ttg_parsec { auto dev_data = dev_task.promise(); /* we should still be waiting for the transfer to complete */ - assert(dev_data.state() == ttg::TTG_DEVICE_CORO_WAIT_TRANSFER || - dev_data.state() == ttg::TTG_DEVICE_CORO_WAIT_KERNEL); + assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER || + dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL); #if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) && defined(TTG_HAVE_CUDA) { @@ -1394,15 +1394,15 @@ namespace ttg_parsec { int rc = PARSEC_HOOK_RETURN_DONE; if (nullptr != task->suspended_task_address) { /* Get a new handle for the promise*/ - dev_task = ttg::device_task_handle_type::from_address(task->suspended_task_address); + dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address); dev_data = dev_task.promise(); - assert(dev_data.state() == ttg::TTG_DEVICE_CORO_WAIT_KERNEL || - dev_data.state() == ttg::TTG_DEVICE_CORO_SENDOUT || - dev_data.state() == ttg::TTG_DEVICE_CORO_COMPLETE); +
assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL || + dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT || + dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE); - if (ttg::TTG_DEVICE_CORO_SENDOUT == dev_data.state() || - ttg::TTG_DEVICE_CORO_COMPLETE == dev_data.state()) { + if (ttg::device::detail::TTG_DEVICE_CORO_SENDOUT == dev_data.state() || + ttg::device::detail::TTG_DEVICE_CORO_COMPLETE == dev_data.state()) { /* the task started sending so we won't come back here */ //std::cout << "device_static_submit task " << task << " complete" << std::endl; } else { @@ -1479,13 +1479,13 @@ namespace ttg_parsec { /* when we come back here, the flows in gpu_task are set (see register_device_memory) */ // get the device task from the coroutine handle - auto dev_task = ttg::device_task_handle_type::from_address(task->suspended_task_address); + auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address); // get the promise which contains the views - ttg::device_task_promise_type& dev_data = dev_task.promise(); + ttg::device::detail::device_task_promise_type& dev_data = dev_task.promise(); /* for now make sure we're waiting for transfers and the coro hasn't skipped this step */ - assert(dev_data.state() == ttg::TTG_DEVICE_CORO_WAIT_TRANSFER); + assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER); /* set up a temporary task-class to correctly specify the flows */ parsec_task_class_t tc = *task->parsec_task.task_class; @@ -1603,7 +1603,7 @@ namespace ttg_parsec { } else { // resume the suspended coroutine #ifdef TTG_HAVE_DEVICE - auto coro = static_cast(ttg::device_task_handle_type::from_address(suspended_task_address)); + ttg::device::Task coro = ttg::device::detail::device_task_handle_type::from_address(suspended_task_address); assert(detail::parsec_ttg_caller == nullptr); detail::parsec_ttg_caller = static_cast(task); // TODO: unify the outputs tls handling @@ -3554,16 +3554,16 @@ ttg::abort(); // should not happen //increment_data_versions(task, std::make_index_sequence>{}); // get the device task from the coroutine handle - auto dev_task = ttg::device_task_handle_type::from_address(task->suspended_task_address); + auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address); // get the promise which contains the views auto dev_data = dev_task.promise(); /* for now make sure we're waiting for the kernel to complete and the coro hasn't skipped this step */ - assert(dev_data.state() == ttg::TTG_DEVICE_CORO_SENDOUT); + assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT); /* execute the sends we stored */ - if (dev_data.state() == ttg::TTG_DEVICE_CORO_SENDOUT) { + if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT) { /* set the current task, needed inside the sends */ detail::parsec_ttg_caller = task; dev_data.do_sends();
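
For reference, a minimal sketch of what a user-written device task looks like against the renamed coroutine API introduced by this patch (ttg::device::Task, ttg::device::select, ttg::device::wait, ttg::device::send). The Tile type, the terminal wiring, and launch_scale_kernel are hypothetical placeholders invented for illustration; buffer().current_device_ptr() and ttg::device::current_stream() are assumed to be available as in the potrf example touched above.

// sketch only: Tile and launch_scale_kernel are placeholders, not part of this patch
struct Tile {
  ttg::Buffer<double> b;                       // device-capable storage
  ttg::Buffer<double> &buffer() { return b; }  // mirrors MatrixTile::buffer() in the examples
};

static ttg::device::Task scale_task(const int &key, Tile &&tile,
                                    std::tuple<ttg::Out<int, Tile>> &out) {
  // select a device and make the tile's buffer resident there
  // (previously: co_await ttg::to_device(tile.buffer());)
  co_await ttg::device::select(tile.buffer());

  // submit work on the stream of the selected device (hypothetical kernel launcher)
  launch_scale_kernel(tile.buffer().current_device_ptr(), ttg::device::current_stream());

  // wait for the kernel to finish and bring the buffer back to the host
  // (previously: co_await ttg::wait_kernel(tile.buffer());)
  co_await ttg::device::wait(tile.buffer());

  // stage the send; the runtime executes it after the task has completed
  co_await ttg::device::send<0>(key + 1, std::move(tile), out);
}

Such a coroutine would then be wrapped into a TT via ttg::make_tt as usual; the make_tt.h changes above are what teach the wrapper to recognize the ttg::device::Task return type.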