diff --git a/.github/workflows/cmake.yml b/.github/workflows/cmake.yml index 8a1d151e6..68a36b4d0 100644 --- a/.github/workflows/cmake.yml +++ b/.github/workflows/cmake.yml @@ -41,7 +41,7 @@ jobs: -DCMAKE_CXX_STANDARD=20 steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Install prerequisite MacOS packages if: ${{ matrix.os == 'macos-latest' }} @@ -72,7 +72,7 @@ jobs: message("::set-output name=timestamp::${current_date}") - name: Setup ccache cache files - uses: actions/cache@v1.1.0 + uses: actions/cache@v4 with: path: ${{github.workspace}}/build/.ccache key: ${{ matrix.config.name }}-ccache-${{ steps.ccache_cache_timestamp.outputs.timestamp }} diff --git a/ttg/ttg/buffer.h b/ttg/ttg/buffer.h index cc17f274c..0242bba39 100644 --- a/ttg/ttg/buffer.h +++ b/ttg/ttg/buffer.h @@ -1,13 +1,15 @@ #ifndef TTG_BUFFER_H #define TTG_BUFFER_H +#include + #include "ttg/fwd.h" #include "ttg/serialization.h" #include namespace ttg { -template> +template>> using Buffer = TTG_IMPL_NS::Buffer; namespace meta { diff --git a/ttg/ttg/device/device.h b/ttg/ttg/device/device.h index 244e9c944..6dcb3c722 100644 --- a/ttg/ttg/device/device.h +++ b/ttg/ttg/device/device.h @@ -4,21 +4,12 @@ #include "ttg/execution.h" #include "ttg/impl_selector.h" #include "ttg/fwd.h" +#include "ttg/util/meta.h" namespace ttg::device { -#if defined(TTG_HAVE_CUDA) - constexpr ttg::ExecutionSpace available_execution_space = ttg::ExecutionSpace::CUDA; -#elif defined(TTG_HAVE_HIP) - constexpr ttg::ExecutionSpace available_execution_space = ttg::ExecutionSpace::HIP; -#elif defined(TTG_HAVE_LEVEL_ZERO) - constexpr ttg::ExecutionSpace available_execution_space = ttg::ExecutionSpace::L0; -#else - constexpr ttg::ExecutionSpace available_execution_space = ttg::ExecutionSpace::Invalid; -#endif - /// Represents a device in a specific execution space class Device { int m_id = 0; @@ -74,52 +65,64 @@ namespace std { } } // namespace std -#if defined(TTG_HAVE_CUDA) -#include - namespace ttg::device { - namespace detail { - inline thread_local ttg::device::Device current_device_ts = {}; - inline thread_local cudaStream_t current_stream_ts = 0; // default stream - - inline void reset_current() { - current_device_ts = {}; - current_stream_ts = 0; - } - inline void set_current(int device, cudaStream_t stream) { - current_device_ts = ttg::device::Device(device, ttg::ExecutionSpace::CUDA); - current_stream_ts = stream; - } + namespace detail { + template + struct default_stream { + static constexpr const Stream value = 0; + }; + template + constexpr const Stream default_stream_v = default_stream::value; } // namespace detail - inline - Device current_device() { - return detail::current_device_ts; - } - - inline - cudaStream_t current_stream() { - return detail::current_stream_ts; - } } // namespace ttg +#if defined(TTG_HAVE_CUDA) +#include +namespace ttg::device { + constexpr ttg::ExecutionSpace available_execution_space = ttg::ExecutionSpace::CUDA; + using Stream = cudaStream_t; +} // namespace ttg::device #elif defined(TTG_HAVE_HIP) - #include +namespace ttg::device { + constexpr ttg::ExecutionSpace available_execution_space = ttg::ExecutionSpace::HIP; + using Stream = hipStream_t; +} // namespace ttg::device +#elif defined(TTG_HAVE_LEVEL_ZERO) +#include +namespace ttg::device { + constexpr ttg::ExecutionSpace available_execution_space = ttg::ExecutionSpace::L0; + using Stream = std::add_reference_t; +} // namespace ttg::device +#else +namespace ttg::device { + struct Stream { }; + namespace detail { + template<> + struct 
default_stream { + static constexpr const Stream value = {}; + }; + } // namespace detail + constexpr ttg::ExecutionSpace available_execution_space = ttg::ExecutionSpace::Host; +} // namespace ttg::device +#endif namespace ttg::device { + +#if !defined(TTG_HAVE_LEVEL_ZERO) namespace detail { inline thread_local ttg::device::Device current_device_ts = {}; - inline thread_local hipStream_t current_stream_ts = 0; // default stream + inline thread_local Stream current_stream_ts = detail::default_stream_v; // default stream inline void reset_current() { current_device_ts = {}; - current_stream_ts = 0; + current_stream_ts = detail::default_stream_v; } - inline void set_current(int device, hipStream_t stream) { - current_device_ts = ttg::device::Device(device, ttg::ExecutionSpace::HIP); + inline void set_current(int device, Stream stream) { + current_device_ts = ttg::device::Device(device, available_execution_space); current_stream_ts = stream; } } // namespace detail @@ -130,16 +133,16 @@ namespace ttg::device { } inline - hipStream_t current_stream() { + Stream current_stream() { return detail::current_stream_ts; } -} // namespace ttg - -#elif defined(TTG_HAVE_LEVEL_ZERO) -#include + inline int num_devices() { + return TTG_IMPL_NS::num_devices(); + } -namespace ttg::device { +#else // TTG_HAVE_LEVEL_ZERO + /* SYCL needs special treatment because it uses pointers/references */ namespace detail { inline thread_local ttg::device::Device current_device_ts = {}; inline thread_local sycl::queue* current_stream_ts = nullptr; // default stream @@ -165,26 +168,6 @@ namespace ttg::device { sycl::queue& current_stream() { return *detail::current_stream_ts; } -} // namespace ttg - -#else +#endif // TTG_HAVE_LEVEL_ZERO -namespace ttg::device { - inline Device current_device() { - return {}; - } - - template - inline const void* current_stream() { - static_assert(Space != ttg::ExecutionSpace::Invalid, - "TTG was built without any known device support so we cannot provide a current stream!"); - return nullptr; - } } // namespace ttg -#endif // defined(TTG_HAVE_HIP) - -namespace ttg::device { - inline int num_devices() { - return TTG_IMPL_NS::num_devices(); - } -} diff --git a/ttg/ttg/device/task.h b/ttg/ttg/device/task.h index 8e2d14cfc..476119886 100644 --- a/ttg/ttg/device/task.h +++ b/ttg/ttg/device/task.h @@ -46,7 +46,7 @@ namespace ttg::device { template struct wait_kernel_t { - std::tuple ties; + std::tuple...> ties; /* always suspend */ constexpr bool await_ready() const noexcept { return false; } @@ -270,7 +270,7 @@ namespace ttg::device { /* overload for iterable types that extracts the type of the first element */ template struct broadcast_keylist_trait>> { - using key_type = decltype(*std::begin(std::get<0>(std::declval()))); + using key_type = decltype(*std::begin(std::declval())); }; template + template inline void broadcast(const std::tuple &keylists, valueT &&value) { using key_t = typename broadcast_keylist_trait< std::tuple_element_t...>> @@ -364,6 +363,70 @@ namespace ttg::device { ttg::device::detail::broadcast<0, I, Is...>(std::tie(kl), std::forward>(value)); } } + + /** + * broadcastk + */ + + template + inline void broadcastk(const std::tuple &keylists, + std::tuple...> &t) { + std::get(t).broadcast(std::get(keylists)); + if constexpr (sizeof...(Is) > 0) { + detail::broadcastk(keylists, t); + } + } + + template + inline void broadcastk(const std::tuple &keylists) { + using key_t = typename broadcast_keylist_trait< + std::tuple_element_t...>> + >::key_type; + auto *terminal_ptr = 
ttg::detail::get_out_terminal(I, "ttg::device::broadcastk(keylists)"); + terminal_ptr->broadcast(std::get(keylists)); + if constexpr (sizeof...(Is) > 0) { + ttg::device::detail::broadcastk(keylists); + } + } + + /* overload with explicit terminals */ + template + inline send_coro_state + broadcastk_coro(RangesT &&keylists, + std::tuple...> &t) { + RangesT kl = std::forward(keylists); // capture the keylist(s) + if constexpr (ttg::meta::is_tuple_v) { + // treat as tuple + co_await ttg::Void{}; // we'll come back once the task is done + ttg::device::detail::broadcastk<0, I, Is...>(kl, t); + } else if constexpr (!ttg::meta::is_tuple_v) { + // create a tie to the captured keylist + co_await ttg::Void{}; // we'll come back once the task is done + ttg::device::detail::broadcastk<0, I, Is...>(std::tie(kl), t); + } + } + + /* overload with implicit terminals */ + template + inline send_coro_state + broadcastk_coro(RangesT &&keylists) { + RangesT kl = std::forward(keylists); // capture the keylist(s) + if constexpr (ttg::meta::is_tuple_v) { + // treat as tuple + static_assert(sizeof...(Is)+1 == std::tuple_size_v, + "Size of keylist tuple must match the number of output terminals"); + co_await ttg::Void{}; // we'll come back once the task is done + ttg::device::detail::broadcastk<0, I, Is...>(kl); + } else if constexpr (!ttg::meta::is_tuple_v) { + // create a tie to the captured keylist + co_await ttg::Void{}; // we'll come back once the task is done + ttg::device::detail::broadcastk<0, I, Is...>(std::tie(kl)); + } + } } // namespace detail /* overload with explicit terminals and keylist passed by const reference */ @@ -389,11 +452,33 @@ namespace ttg::device { std::move(copy_handler))}; } + /* overload with explicit terminals and keylist passed by const reference */ + template + [[nodiscard]] + inline detail::send_t broadcastk(rangeT &&keylist, + std::tuple...> &t) { + ttg::detail::value_copy_handler copy_handler; + return detail::send_t{ + detail::broadcastk_coro(std::forward(keylist), t)}; + } + + /* overload with implicit terminals and keylist passed by const reference */ + template + inline detail::send_t broadcastk(rangeT &&keylist) { + if constexpr (std::is_rvalue_reference_v) { + return detail::send_t{detail::broadcastk_coro(std::forward(keylist))}; + } else { + return detail::send_t{detail::broadcastk_coro(std::tie(keylist))}; + } + } + template [[nodiscard]] std::vector forward(Args&&... args) { // TODO: check the cost of this! 
- return std::vector{std::forward(args)...}; + return std::vector{std::forward(args)...}; } /******************************************* diff --git a/ttg/ttg/func.h b/ttg/ttg/func.h index 4273e7c66..9e9830b0b 100644 --- a/ttg/ttg/func.h +++ b/ttg/ttg/func.h @@ -416,8 +416,7 @@ namespace ttg { std::get(t).broadcast(keylist, copy_handler(std::forward(value))); } - template + template inline void broadcast(std::size_t i, const rangeT &keylist, valueT &&value) { detail::value_copy_handler copy_handler; using key_t = decltype(*std::begin(keylist)); @@ -425,8 +424,7 @@ namespace ttg { terminal_ptr->broadcast(keylist, copy_handler(std::forward(value))); } - template + template inline void broadcast(const rangeT &keylist, valueT &&value) { broadcast(i, keylist, std::forward(value)); } @@ -505,7 +503,7 @@ namespace ttg { terminal_ptr->set_size(size); } - template + template inline std::enable_if_t, void> set_size(const keyT &key, const std::size_t size) { set_size(i, key, size); } diff --git a/ttg/ttg/parsec/buffer.h b/ttg/ttg/parsec/buffer.h index 1f658077c..3e76af2b3 100644 --- a/ttg/ttg/parsec/buffer.h +++ b/ttg/ttg/parsec/buffer.h @@ -110,65 +110,6 @@ namespace detail { } }; -#if 0 - /** - * derived from parsec_data_t managing an allocator for host memory - * and creating a host copy through a callback - */ - struct data_type : public parsec_data_t { - private: - [[no_unique_address]] - Allocator host_allocator; - - allocator_type& get_allocator_reference() { return static_cast(*this); } - - element_type* do_allocate(std::size_t n) { - return allocator_traits::allocate(get_allocator_reference(), n); - } - - public: - - explicit data_type(const Allocator& host_alloc = Allocator()) - : host_allocator(host_alloc) - { } - - void* allocate(std::size_t size) { - return do_allocate(size); - } - - void deallocate(void* ptr, std::size_t size) { - allocator_traits::deallocate(get_allocator_reference(), ptr, size); - } - }; - - /* Allocate the host copy for a given data_t */ - static parsec_data_copy_t* data_copy_allocate(parsec_data_t* pdata, int device) { - if (device != 0) return nullptr; // we only allocate for the host - data_type* data = static_cast(pdata); // downcast - data_copy_type* copy = PARSEC_OBJ_NEW(data_copy_type); - copy->device_private = data->allocate(data->nb_elts); - return copy; - } - /** - * Create the PaRSEC object infrastructure for the data copy type - */ - inline void data_construct(data_type* obj) - { - obj->allocate_cb = &data_copy_allocate; - } - - static void data_destruct(data_type* obj) - { - /* cleanup alloctor instance */ - obj->~data_type(); - } - - static constexpr PARSEC_OBJ_CLASS_INSTANCE(data_type, parsec_data_t, - data_copy_construct, - data_copy_destruct); - -#endif // 0 - /** * Create the PaRSEC object infrastructure for the data copy type */ @@ -187,6 +128,7 @@ namespace detail { data_copy_destruct); static parsec_data_t * create_data(std::size_t size, + ttg::scope scope, const allocator_type& allocator = allocator_type()) { parsec_data_t *data = PARSEC_OBJ_NEW(parsec_data_t); data->owner_device = 0; @@ -200,12 +142,13 @@ namespace detail { /* adjust data flags */ data->device_copies[0]->flags |= PARSEC_DATA_FLAG_PARSEC_MANAGED; data->device_copies[0]->coherency_state = PARSEC_DATA_COHERENCY_SHARED; - data->device_copies[0]->version = 1; + /* set the version to 0 if we don't want to transfer data to the device initially */ + data->device_copies[0]->version = (ttg::scope::SyncIn == scope) ? 
1 : 0; return data; } - static parsec_data_t * create_data(PtrT& ptr, std::size_t size) { + static parsec_data_t * create_data(PtrT& ptr, std::size_t size, ttg::scope scope) { parsec_data_t *data = PARSEC_OBJ_NEW(parsec_data_t); data->owner_device = 0; data->nb_elts = size; @@ -218,7 +161,8 @@ namespace detail { /* adjust data flags */ data->device_copies[0]->flags |= PARSEC_DATA_FLAG_PARSEC_MANAGED; data->device_copies[0]->coherency_state = PARSEC_DATA_COHERENCY_SHARED; - data->device_copies[0]->version = 1; + /* set the version to 0 if we don't want to transfer data to the device initially */ + data->device_copies[0]->version = (ttg::scope::SyncIn == scope) ? 1 : 0; return data; } @@ -239,12 +183,14 @@ namespace detail { template struct Buffer { + /* TODO: add overloads for T[]? */ + using value_type = std::remove_all_extents_t; + using pointer_type = std::add_pointer_t; + using const_pointer_type = const std::remove_const_t*; using element_type = std::decay_t; static_assert(std::is_trivially_copyable_v, "Only trivially copyable types are supported for devices."); - static_assert(std::is_default_constructible_v, - "Only default constructible types are supported for devices."); private: using delete_fn_t = std::function; @@ -270,8 +216,8 @@ struct Buffer { Buffer() { } - Buffer(std::size_t n) - : m_data(detail::ttg_parsec_data_types::create_data(n*sizeof(element_type))) + Buffer(std::size_t n, ttg::scope scope = ttg::scope::SyncIn) + : m_data(detail::ttg_parsec_data_types::create_data(n*sizeof(element_type), scope)) , m_count(n) { } @@ -280,18 +226,18 @@ struct Buffer { * The shared_ptr will ensure that the memory is not free'd before * the runtime has released all of its references. */ - Buffer(std::shared_ptr ptr, std::size_t n) + Buffer(std::shared_ptr ptr, std::size_t n, ttg::scope scope = ttg::scope::SyncIn) : m_data(detail::ttg_parsec_data_types, detail::empty_allocator> - ::create_data(ptr, n*sizeof(element_type))) + ::create_data(ptr, n*sizeof(element_type), scope)) , m_count(n) { } template - Buffer(std::unique_ptr ptr, std::size_t n) + Buffer(std::unique_ptr ptr, std::size_t n, ttg::scope scope = ttg::scope::SyncIn) : m_data(detail::ttg_parsec_data_types, detail::empty_allocator> - ::create_data(ptr, n*sizeof(element_type))) + ::create_data(ptr, n*sizeof(element_type), scope)) , m_count(n) { } @@ -352,55 +298,55 @@ struct Buffer { } /* Get the pointer on the currently active device. */ - element_type* current_device_ptr() { + pointer_type current_device_ptr() { assert(is_valid()); int device_id = detail::ttg_device_to_parsec_device(ttg::device::current_device()); - return static_cast(m_data->device_copies[device_id]->device_private); + return static_cast(m_data->device_copies[device_id]->device_private); } /* Get the pointer on the currently active device. */ - const element_type* current_device_ptr() const { + const_pointer_type current_device_ptr() const { assert(is_valid()); int device_id = detail::ttg_device_to_parsec_device(ttg::device::current_device()); - return static_cast(m_data->device_copies[device_id]->device_private); + return static_cast(m_data->device_copies[device_id]->device_private); } /* Get the pointer on the owning device. * @note: This may not be the device assigned to the currently executing task. * See \ref ttg::device::current_device for that. 
*/ - element_type* owner_device_ptr() { + pointer_type owner_device_ptr() { assert(is_valid()); - return static_cast(m_data->device_copies[m_data->owner_device]->device_private); + return static_cast(m_data->device_copies[m_data->owner_device]->device_private); } /* get the current device pointer */ - const element_type* owner_device_ptr() const { + const_pointer_type owner_device_ptr() const { assert(is_valid()); - return static_cast(m_data->device_copies[m_data->owner_device]->device_private); + return static_cast(m_data->device_copies[m_data->owner_device]->device_private); } /* get the device pointer at the given device */ - element_type* device_ptr_on(const ttg::device::Device& device) { + pointer_type device_ptr_on(const ttg::device::Device& device) { assert(is_valid()); int device_id = detail::ttg_device_to_parsec_device(device); - return static_cast(parsec_data_get_ptr(m_data, device_id)); + return static_cast(parsec_data_get_ptr(m_data, device_id)); } /* get the device pointer at the given device */ - const element_type* device_ptr_on(const ttg::device::Device& device) const { + const_pointer_type device_ptr_on(const ttg::device::Device& device) const { assert(is_valid()); int device_id = detail::ttg_device_to_parsec_device(device); - return static_cast(parsec_data_get_ptr(m_data, device_id)); + return static_cast(parsec_data_get_ptr(m_data, device_id)); } - element_type* host_ptr() { - return static_cast(parsec_data_get_ptr(m_data, 0)); + pointer_type host_ptr() { + return static_cast(parsec_data_get_ptr(m_data, 0)); } - const element_type* host_ptr() const { - return static_cast(parsec_data_get_ptr(m_data, 0)); + const_pointer_type host_ptr() const { + return static_cast(parsec_data_get_ptr(m_data, 0)); } bool is_valid_on(const ttg::device::Device& device) const { @@ -455,16 +401,26 @@ struct Buffer { return m_count; } - /* Reallocate the buffer with count elements */ - void reset(std::size_t n) { - release_data(); - - if (n > 0) { - m_data = detail::ttg_parsec_data_types - ::create_data(n*sizeof(element_type)); + /** + * Resets the scope of the buffer. + * If scope is SyncIn then the next time + * the buffer is made available on a device the host + * data will be copied from the host. + * If scope is Allocate then no data will be moved. 
+ */ + void reset_scope(ttg::scope scope) { + if (scope == ttg::scope::Allocate) { + m_data->device_copies[0]->version = 0; + } else { + m_data->device_copies[0]->version = 1; + /* reset all other copies to force a sync-in */ + for (int i = 0; i < parsec_nb_devices; ++i) { + if (m_data->device_copies[i] != nullptr) { + m_data->device_copies[i]->version = 0; + } + } + m_data->owner_device = 0; } - - m_count = n; } void prefer_device(ttg::device::Device dev) { @@ -475,6 +431,17 @@ struct Buffer { } } + void add_device(ttg::device::Device dev, pointer_type ptr, bool is_current = false) { + if (is_valid_on(dev)) { + throw std::runtime_error("Unable to add device that has already a buffer set!"); + } + add_copy(detail::ttg_device_to_parsec_device(dev), ptr); + if (is_current) { + // mark the data as being current on the new device + parsec_data()->owner_device = detail::ttg_device_to_parsec_device(dev); + } + } + /* serialization support */ #ifdef TTG_SERIALIZATION_SUPPORTS_BOOST diff --git a/ttg/ttg/parsec/devicefunc.h b/ttg/ttg/parsec/devicefunc.h index d4a488673..080660205 100644 --- a/ttg/ttg/parsec/devicefunc.h +++ b/ttg/ttg/parsec/devicefunc.h @@ -35,38 +35,47 @@ namespace ttg_parsec { parsec_data_t* data = detail::get_parsec_data(view); /* TODO: check whether the device is current */ - auto access = PARSEC_FLOW_ACCESS_RW; - if constexpr (std::is_const_v) { - // keep the flow at RW if it was RW to make sure we pull the data back out eventually - //if (flows[I].flow_flags != PARSEC_FLOW_ACCESS_RW) { + + if (nullptr != data) { + auto access = PARSEC_FLOW_ACCESS_RW; + if constexpr (std::is_const_v) { + // keep the flow at RW if it was RW to make sure we pull the data back out eventually access = PARSEC_FLOW_ACCESS_READ; - //} - } else if constexpr (ttg::meta::is_devicescratch_v) { - if (view.scope() == ttg::scope::Allocate) { - access = PARSEC_FLOW_ACCESS_WRITE; + } else if constexpr (ttg::meta::is_devicescratch_v) { + if (view.scope() == ttg::scope::Allocate) { + access = PARSEC_FLOW_ACCESS_WRITE; + } } - } - - //std::cout << "register_device_memory task " << detail::parsec_ttg_caller << " data " << I << " " - // << data << " size " << data->nb_elts << std::endl; - - /* build the flow */ - /* TODO: reuse the flows of the task class? How can we control the sync direction then? */ - flows[I] = parsec_flow_t{.name = nullptr, - .sym_type = PARSEC_SYM_INOUT, - .flow_flags = static_cast(access), - .flow_index = I, - .flow_datatype_mask = ~0 }; - gpu_task->flow_nb_elts[I] = data->nb_elts; // size in bytes - gpu_task->flow[I] = &flows[I]; - - /* set the input data copy, parsec will take care of the transfer - * and the buffer will look at the parsec_data_t for the current pointer */ - //detail::parsec_ttg_caller->parsec_task.data[I].data_in = data->device_copies[data->owner_device]; - assert(nullptr != data->device_copies[0]->original); - caller->parsec_task.data[I].data_in = data->device_copies[0]; - caller->parsec_task.data[I].source_repo_entry = NULL; + /* build the flow */ + /* TODO: reuse the flows of the task class? How can we control the sync direction then? 
*/ + flows[I] = parsec_flow_t{.name = nullptr, + .sym_type = PARSEC_SYM_INOUT, + .flow_flags = static_cast(access), + .flow_index = I, + .flow_datatype_mask = ~0 }; + + gpu_task->flow_nb_elts[I] = data->nb_elts; // size in bytes + gpu_task->flow[I] = &flows[I]; + + /* set the input data copy, parsec will take care of the transfer + * and the buffer will look at the parsec_data_t for the current pointer */ + //detail::parsec_ttg_caller->parsec_task.data[I].data_in = data->device_copies[data->owner_device]; + assert(nullptr != data->device_copies[0]->original); + caller->parsec_task.data[I].data_in = data->device_copies[0]; + caller->parsec_task.data[I].source_repo_entry = NULL; + + } else { + /* ignore the flow */ + flows[I] = parsec_flow_t{.name = nullptr, + .sym_type = PARSEC_FLOW_ACCESS_NONE, + .flow_flags = 0, + .flow_index = I, + .flow_datatype_mask = ~0 }; + gpu_task->flow[I] = &flows[I]; + gpu_task->flow_nb_elts[I] = 0; // size in bytes + caller->parsec_task.data[I].data_in = nullptr; + } if constexpr (sizeof...(Is) > 0) { is_current |= register_device_memory(views, std::index_sequence{}); @@ -114,16 +123,16 @@ namespace ttg_parsec { /* get_parsec_data is overloaded for buffer and devicescratch */ parsec_data_t* data = detail::get_parsec_data(view); - parsec_gpu_task_t *gpu_task = detail::parsec_ttg_caller->dev_ptr->gpu_task; parsec_gpu_exec_stream_t *stream = detail::parsec_ttg_caller->dev_ptr->stream; /* enqueue the transfer into the compute stream to come back once the compute and transfer are complete */ - parsec_device_gpu_module_t *device_module = detail::parsec_ttg_caller->dev_ptr->device; - device_module->memcpy_async(device_module, stream, - data->device_copies[0]->device_private, - data->device_copies[data->owner_device]->device_private, - data->nb_elts, parsec_device_gpu_transfer_direction_d2h); - + if (data->owner_device != 0) { + parsec_device_gpu_module_t *device_module = detail::parsec_ttg_caller->dev_ptr->device; + device_module->memcpy_async(device_module, stream, + data->device_copies[0]->device_private, + data->device_copies[data->owner_device]->device_private, + data->nb_elts, parsec_device_gpu_transfer_direction_d2h); + } if constexpr (sizeof...(Is) > 0) { // recursion mark_device_out(views, std::index_sequence{}); diff --git a/ttg/ttg/parsec/fwd.h b/ttg/ttg/parsec/fwd.h index 0cd798e87..de7996962 100644 --- a/ttg/ttg/parsec/fwd.h +++ b/ttg/ttg/parsec/fwd.h @@ -55,7 +55,7 @@ namespace ttg_parsec { static void ttg_broadcast(ttg::World world, T &data, int source_rank); /* device definitions */ - template> + template>> struct Buffer; template diff --git a/ttg/ttg/parsec/task.h b/ttg/ttg/parsec/task.h index 656117b94..58ee528ad 100644 --- a/ttg/ttg/parsec/task.h +++ b/ttg/ttg/parsec/task.h @@ -162,7 +162,10 @@ namespace ttg_parsec { parsec_task.priority = 0; // TODO: can we avoid this? - for (int i = 0; i < MAX_PARAM_COUNT; ++i) { this->parsec_task.data[i].data_in = nullptr; } + for (int i = 0; i < MAX_PARAM_COUNT; ++i) { + this->parsec_task.data[i].data_in = nullptr; + this->parsec_task.data[i].data_out = nullptr; + } } parsec_ttg_task_base_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, @@ -184,7 +187,10 @@ namespace ttg_parsec { parsec_task.chore_mask = 1<<0; // TODO: can we avoid this? 
- for (int i = 0; i < MAX_PARAM_COUNT; ++i) { this->parsec_task.data[i].data_in = nullptr; } + for (int i = 0; i < MAX_PARAM_COUNT; ++i) { + this->parsec_task.data[i].data_in = nullptr; + this->parsec_task.data[i].data_out = nullptr; + } } public: @@ -308,6 +314,15 @@ namespace ttg_parsec { } } + template + parsec_hook_return_t invoke_evaluate() { + if constexpr (Space == ttg::ExecutionSpace::Host) { + return PARSEC_HOOK_RETURN_DONE; + } else { + return TT::template device_static_evaluate(&this->parsec_task); + } + } + parsec_key_t pkey() { return 0; } }; diff --git a/ttg/ttg/parsec/ttg.h b/ttg/ttg/parsec/ttg.h index 0e956ce56..2a2b852bb 100644 --- a/ttg/ttg/parsec/ttg.h +++ b/ttg/ttg/parsec/ttg.h @@ -147,8 +147,8 @@ namespace ttg_parsec { MSG_SET_ARGSTREAM_SIZE = 1, MSG_FINALIZE_ARGSTREAM_SIZE = 2, MSG_GET_FROM_PULL = 3 } fn_id_t; - uint32_t taskpool_id = -1; - uint64_t op_id = -1; + uint32_t taskpool_id = std::numeric_limits::max(); + uint64_t op_id = std::numeric_limits::max(); std::size_t key_offset = 0; fn_id_t fn_id = MSG_INVALID; std::int8_t num_iovecs = 0; @@ -335,7 +335,7 @@ namespace ttg_parsec { void create_tpool() { assert(nullptr == tpool); tpool = PARSEC_OBJ_NEW(parsec_taskpool_t); - tpool->taskpool_id = -1; + tpool->taskpool_id = std::numeric_limits::max(); tpool->update_nb_runtime_task = parsec_add_fetch_runtime_task; tpool->taskpool_type = PARSEC_TASKPOOL_TYPE_TTG; tpool->taskpool_name = strdup("TTG Taskpool"); @@ -702,13 +702,14 @@ namespace ttg_parsec { template inline ttg_data_copy_t *create_new_datacopy(Value &&value) { using value_type = std::decay_t; - ttg_data_copy_t *copy; - if constexpr (std::is_base_of_v, value_type>) { + ttg_data_copy_t *copy = nullptr; + if constexpr (std::is_base_of_v, value_type> && + std::is_constructible_v) { copy = new value_type(std::forward(value)); - } else if constexpr (std::is_rvalue_reference_v || - std::is_copy_constructible_v>) { + } else if constexpr (std::is_constructible_v, decltype(value)>) { copy = new ttg_data_value_copy_t(std::forward(value)); } else { + /* we have no way to create a new copy from this value */ throw std::logic_error("Trying to copy-construct data that is not copy-constructible!"); } #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND) @@ -947,9 +948,12 @@ namespace ttg_parsec { ttg_data_copy_t *copy_res = copy_in; bool replace = false; int32_t readers = copy_in->num_readers(); - assert(readers != 0); + /* try hard to defer writers if we cannot make copies + * if deferral fails we have to bail out */ + bool defer_writer = (!std::is_copy_constructible_v>) || task->defer_writer; + if (readonly && !copy_in->is_mutable()) { /* simply increment the number of readers */ readers = copy_in->increment_readers(); @@ -988,7 +992,7 @@ namespace ttg_parsec { * (current task) or there are others, in which we case won't * touch it. 
*/ - if (1 == copy_in->num_readers() && !task->defer_writer) { + if (1 == copy_in->num_readers() && !defer_writer) { /** * no other readers, mark copy as mutable and defer the release * of the task @@ -998,9 +1002,10 @@ namespace ttg_parsec { std::atomic_thread_fence(std::memory_order_release); copy_in->mark_mutable(); } else { - if (task->defer_writer && nullptr == copy_in->get_next_task()) { + if (defer_writer && nullptr == copy_in->get_next_task()) { /* we're the first writer and want to wait for all readers to complete */ copy_res->set_next_task(&task->parsec_task); + task->defer_writer = true; } else { /* there are writers and/or waiting already of this copy already, make a copy that we can mutate */ copy_res = NULL; @@ -1479,36 +1484,6 @@ namespace ttg_parsec { return rc; } - static void - static_device_stage_in(parsec_gpu_task_t *gtask, - uint32_t flow_mask, - parsec_gpu_exec_stream_t *gpu_stream) { - /* register any memory that hasn't been registered yet */ - for (int i = 0; i < MAX_PARAM_COUNT; ++i) { - if (flow_mask & (1<ec; - parsec_data_copy_t *copy = task->parsec_task.data[i].data_in; - if (0 == (copy->flags & TTG_PARSEC_DATA_FLAG_REGISTERED)) { -#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) - // register host memory for faster device access - cudaError_t status; - //status = cudaHostRegister(copy->device_private, gtask->flow_nb_elts[i], cudaHostRegisterPortable); - //assert(cudaSuccess == status); -#endif // PARSEC_HAVE_DEV_CUDA_SUPPORT - //copy->flags |= TTG_PARSEC_DATA_FLAG_REGISTERED; - } - } - } - } - - static int - static_device_stage_in_hook(parsec_gpu_task_t *gtask, - uint32_t flow_mask, - parsec_gpu_exec_stream_t *gpu_stream) { - static_device_stage_in(gtask, flow_mask, gpu_stream); - return parsec_default_gpu_stage_in(gtask, flow_mask, gpu_stream); - } - template static parsec_hook_return_t device_static_evaluate(parsec_task_t* parsec_task) { @@ -1522,7 +1497,7 @@ namespace ttg_parsec { PARSEC_OBJ_CONSTRUCT(gpu_task, parsec_list_item_t); gpu_task->ec = parsec_task; gpu_task->task_type = 0; // user task - gpu_task->last_data_check_epoch = -1; // used internally + gpu_task->last_data_check_epoch = 0; // used internally gpu_task->pushout = 0; gpu_task->submit = &TT::device_static_submit; @@ -1611,8 +1586,11 @@ namespace ttg_parsec { // get the promise which contains the views ttg::device::detail::device_task_promise_type& dev_data = dev_task.promise(); - /* for now make sure we're waiting for transfers and the coro hasn't skipped this step */ - assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER); + if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT || + dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE) { + /* the task jumped directly to send or returned so we're done */ + return PARSEC_HOOK_RETURN_DONE; + } parsec_device_gpu_module_t *device = (parsec_device_gpu_module_t*)task->parsec_task.selected_device; assert(NULL != device); @@ -1628,7 +1606,7 @@ namespace ttg_parsec { if constexpr (Space == ttg::ExecutionSpace::CUDA) { /* TODO: we need custom staging functions because PaRSEC looks at the * task-class to determine the number of flows. 
*/ - gpu_task->stage_in = static_device_stage_in_hook; + gpu_task->stage_in = parsec_default_gpu_stage_in; gpu_task->stage_out = parsec_default_gpu_stage_out; return parsec_device_kernel_scheduler(&device->super, es, gpu_task); } @@ -1637,7 +1615,7 @@ namespace ttg_parsec { #if defined(PARSEC_HAVE_DEV_HIP_SUPPORT) case PARSEC_DEV_HIP: if constexpr (Space == ttg::ExecutionSpace::HIP) { - gpu_task->stage_in = static_device_stage_in_hook; + gpu_task->stage_in = parsec_default_gpu_stage_in; gpu_task->stage_out = parsec_default_gpu_stage_out; return parsec_device_kernel_scheduler(&device->super, es, gpu_task); } @@ -1646,7 +1624,7 @@ namespace ttg_parsec { #if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT) case PARSEC_DEV_LEVEL_ZERO: if constexpr (Space == ttg::ExecutionSpace::L0) { - gpu_task->stage_in = static_device_stage_in_hook; + gpu_task->stage_in = parsec_default_gpu_stage_in; gpu_task->stage_out = parsec_default_gpu_stage_out; return parsec_device_kernel_scheduler(&device->super, es, gpu_task); } @@ -2408,7 +2386,9 @@ namespace ttg_parsec { auto &reducer = std::get(input_reducers); bool release = false; bool remove_from_hash = true; +#if defined(PARSEC_PROF_GRAPHER) bool discover_task = true; +#endif bool get_pull_data = false; bool has_lock = false; /* If we have only one input and no reducer on that input we can skip the hash table */ @@ -2802,7 +2782,6 @@ namespace ttg_parsec { num_iovecs = std::distance(std::begin(iovs), std::end(iovs)); /* pack the metadata */ auto metadata = descr.get_metadata(*const_cast(value_ptr)); - size_t metadata_size = sizeof(metadata); pos = pack(metadata, msg->bytes, pos); //std::cout << "set_arg_impl splitmd num_iovecs " << num_iovecs << std::endl; write_header_fn(); @@ -2978,7 +2957,6 @@ namespace ttg_parsec { ttg::SplitMetadataDescriptor descr; /* pack the metadata */ auto metadata = descr.get_metadata(value); - size_t metadata_size = sizeof(metadata); pos = pack(metadata, msg->bytes, pos); auto iovs = descr.get_data(*const_cast(&value)); num_iovs = std::distance(std::begin(iovs), std::end(iovs)); @@ -3536,7 +3514,7 @@ namespace ttg_parsec { // Registers the callback for the i'th input terminal template void register_input_callback(terminalT &input) { - using valueT = typename terminalT::value_type; + using valueT = std::decay_t; if (input.is_pull_terminal) { num_pullins++; } @@ -3548,20 +3526,10 @@ namespace ttg_parsec { set_arg(key, std::forward(value)); }; auto send_callback = [this](const keyT &key, const valueT &value) { - if constexpr (std::is_copy_constructible_v) { - set_arg(key, value); - } - else { - throw std::logic_error(std::string("TTG::PaRSEC: send_callback is invoked on datum of type ") + boost::typeindex::type_id().pretty_name() + " which is not copy constructible, std::move datum into send statement"); - } + set_arg(key, value); }; auto broadcast_callback = [this](const ttg::span &keylist, const valueT &value) { - if constexpr (std::is_copy_constructible_v) { - broadcast_arg(keylist, value); - } - else { - throw std::logic_error(std::string("TTG::PaRSEC: broadcast_callback is invoked on datum of type ") + boost::typeindex::type_id().pretty_name() + " which is not copy constructible, broadcast is not possible with move-only type"); - } + broadcast_arg(keylist, value); }; auto prepare_send_callback = [this](const ttg::span &keylist, const valueT &value) { prepare_send(keylist, value); @@ -3760,14 +3728,17 @@ namespace ttg_parsec { // get the promise which contains the views auto dev_data = dev_task.promise(); - /* for now make sure we're 
waiting for the kernel to complete and the coro hasn't skipped this step */ - assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT); + assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT || + dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE); /* execute the sends we stored */ if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT) { /* set the current task, needed inside the sends */ detail::parsec_ttg_caller = task; - dev_data.do_sends(); + auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor(); + task->tt->set_outputs_tls_ptr(); + dev_data.do_sends(); // all sends happen here + task->tt->set_outputs_tls_ptr(old_output_tls_ptr); detail::parsec_ttg_caller = nullptr; } } @@ -4391,15 +4362,16 @@ struct ttg::detail::value_copy_handler { template inline std::conditional_t,Value,Value&&> operator()(Value &&value) { constexpr auto value_is_rvref = std::is_rvalue_reference_v; + using value_type = std::remove_reference_t; static_assert(value_is_rvref || std::is_copy_constructible_v>, "Data sent without being moved must be copy-constructible!"); auto caller = ttg_parsec::detail::parsec_ttg_caller; if (nullptr == caller) { - ttg::print("ERROR: ttg::send or ttg::broadcast called outside of a task!\n"); + throw std::runtime_error("ERROR: ttg::send or ttg::broadcast called outside of a task!"); } - using value_type = std::remove_reference_t; + ttg_parsec::detail::ttg_data_copy_t *copy; copy = ttg_parsec::detail::find_copy_in_task(caller, &value); value_type *value_ptr = &value; @@ -4432,7 +4404,7 @@ struct ttg::detail::value_copy_handler { inline std::add_lvalue_reference_t operator()(ttg_parsec::detail::persistent_value_ref vref) { auto caller = ttg_parsec::detail::parsec_ttg_caller; if (nullptr == caller) { - ttg::print("ERROR: ttg::send or ttg::broadcast called outside of a task!\n"); + throw std::runtime_error("ERROR: ttg::send or ttg::broadcast called outside of a task!"); } ttg_parsec::detail::ttg_data_copy_t *copy; copy = ttg_parsec::detail::find_copy_in_task(caller, &vref.value_ref); @@ -4450,11 +4422,9 @@ struct ttg::detail::value_copy_handler { template inline const Value &operator()(const Value &value) { - static_assert(std::is_copy_constructible_v>, - "Data sent without being moved must be copy-constructible!"); auto caller = ttg_parsec::detail::parsec_ttg_caller; if (nullptr == caller) { - ttg::print("ERROR: ttg::send or ttg::broadcast called outside of a task!\n"); + throw std::runtime_error("ERROR: ttg::send or ttg::broadcast called outside of a task!"); } ttg_parsec::detail::ttg_data_copy_t *copy; copy = ttg_parsec::detail::find_copy_in_task(caller, &value); diff --git a/ttg/ttg/parsec/ttg_data_copy.h b/ttg/ttg/parsec/ttg_data_copy.h index 484dcc538..b79f794a2 100644 --- a/ttg/ttg/parsec/ttg_data_copy.h +++ b/ttg/ttg/parsec/ttg_data_copy.h @@ -206,6 +206,7 @@ namespace ttg_parsec { value_type m_value; template + requires(std::constructible_from) ttg_data_value_copy_t(T&& value) : ttg_data_copy_container_setter(this) , ttg_data_copy_t() diff --git a/ttg/ttg/util/dot.h b/ttg/ttg/util/dot.h index 5e0dea7f6..f562dcb49 100644 --- a/ttg/ttg/util/dot.h +++ b/ttg/ttg/util/dot.h @@ -57,13 +57,12 @@ namespace ttg { void ttfunc(TTBase *tt) { std::string ttnm = nodename(tt); - bool is_ttg = true; const TTBase *ttc = reinterpret_cast(tt); build_ttg_hierarchy(ttc); if(!tt->is_ttg()) { std::stringstream ttss; - + ttss << " " << ttnm << " [shape=record,style=filled,fillcolor=gray90,label=\"{"; size_t count = 0;
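
Usage note (illustrative sketch, not part of the patch): the snippet below assumes a TTG build with the ttg_parsec backend and device support; the `ttg.h` umbrella include and the free-standing sketch function are assumptions, not code from this diff. It exercises the APIs this change introduces or reworks — the `ttg::scope` argument on `Buffer` construction, `Buffer::reset_scope`, and the backend-agnostic `ttg::device::Stream`/`current_stream()`/`num_devices()` accessors.

    #include <cstddef>
    #include <ttg.h>  // assumed umbrella header providing ttg::Buffer, ttg::scope, ttg::device

    // Sketch only: assumes ttg::initialize(...) has already been called so the
    // PaRSEC data structures behind ttg::Buffer can be created.
    void buffer_scope_sketch() {
      // Host-backed buffer whose contents are staged to a device the first time
      // it is used there: SyncIn is the default scope, written out for clarity.
      ttg::Buffer<double> input(1024, ttg::scope::SyncIn);

      // Device-only scratch space: memory is allocated on the device but no
      // host-to-device transfer is triggered (the host copy's version stays 0,
      // as set up by create_data() in this diff).
      ttg::Buffer<double> scratch(1024, ttg::scope::Allocate);

      // Reuse `input` later without re-uploading (possibly stale) host data.
      input.reset_scope(ttg::scope::Allocate);

      // Inside a device task the buffer exposes typed device/host pointers and
      // the backend-agnostic stream handle introduced by this change, e.g.:
      //   double*             dptr = input.current_device_ptr();
      //   ttg::device::Stream s    = ttg::device::current_stream(); // cudaStream_t under CUDA
      //   ttg::device::Device dev  = ttg::device::current_device();
      //   int                 n    = ttg::device::num_devices();
    }
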