From e8855799319c857c57bebf28de6a055bfd123e16 Mon Sep 17 00:00:00 2001 From: Hartmut Kaiser Date: Tue, 9 Jan 2024 15:50:33 -0600 Subject: [PATCH] Tightening up collective operation semantics - flyby: small_vector tweaks --- .../datastructures/detail/small_vector.hpp | 34 ++-- libs/core/futures/src/future_data.cpp | 53 +++--- .../include/hpx/lcos_local/and_gate.hpp | 25 ++- .../include/hpx/collectives/all_gather.hpp | 8 +- .../include/hpx/collectives/all_reduce.hpp | 8 +- .../include/hpx/collectives/all_to_all.hpp | 8 +- .../include/hpx/collectives/broadcast.hpp | 10 +- .../hpx/collectives/detail/communicator.hpp | 168 +++++++++++++++--- .../hpx/collectives/exclusive_scan.hpp | 10 +- .../include/hpx/collectives/gather.hpp | 12 +- .../hpx/collectives/inclusive_scan.hpp | 8 +- .../include/hpx/collectives/reduce.hpp | 13 +- .../include/hpx/collectives/scatter.hpp | 12 +- .../collectives/src/create_communicator.cpp | 7 +- .../tests/unit/communication_set.cpp | 2 +- 15 files changed, 270 insertions(+), 108 deletions(-) diff --git a/libs/core/datastructures/include/hpx/datastructures/detail/small_vector.hpp b/libs/core/datastructures/include/hpx/datastructures/detail/small_vector.hpp index ea7e69202f58..cebec47403c6 100644 --- a/libs/core/datastructures/include/hpx/datastructures/detail/small_vector.hpp +++ b/libs/core/datastructures/include/hpx/datastructures/detail/small_vector.hpp @@ -175,7 +175,7 @@ namespace hpx::detail { } // only void* is allowed to be converted to uintptr_t - void* ptr = ::operator new(offset_to_data + sizeof(T) * capacity); + void* ptr = ::operator new(mem); if (nullptr == ptr) { throw std::bad_alloc(); @@ -319,9 +319,13 @@ namespace hpx::detail { { // indirect -> direct auto* storage = indirect(); - uninitialized_move_and_destroy( - storage->data(), direct_data(), storage->size()); - set_direct_and_size(storage->size()); + auto const data_size = storage->size(); + if (data_size != 0) + { + uninitialized_move_and_destroy( + storage->data(), direct_data(), data_size); + set_direct_and_size(data_size); + } detail::storage::dealloc(storage); } } @@ -332,16 +336,26 @@ namespace hpx::detail { if (is_direct()) { // direct -> indirect - uninitialized_move_and_destroy(data(), - storage->data(), size()); - storage->size(size()); + auto const data_size = size(); + if (data_size != 0) + { + uninitialized_move_and_destroy( + data(), storage->data(), + data_size); + storage->size(data_size); + } } else { // indirect -> indirect - uninitialized_move_and_destroy(data(), - storage->data(), size()); - storage->size(size()); + auto const data_size = size(); + if (data_size != 0) + { + uninitialized_move_and_destroy( + data(), storage->data(), + data_size); + storage->size(data_size); + } detail::storage::dealloc(indirect()); } set_indirect(storage); diff --git a/libs/core/futures/src/future_data.cpp b/libs/core/futures/src/future_data.cpp index a05f4d577be7..28d8eda865d3 100644 --- a/libs/core/futures/src/future_data.cpp +++ b/libs/core/futures/src/future_data.cpp @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2023 Hartmut Kaiser +// Copyright (c) 2015-2024 Hartmut Kaiser // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -22,13 +22,14 @@ #include #include -#include #include #include namespace hpx::lcos::detail { - static run_on_completed_error_handler_type run_on_completed_error_handler; + namespace { + run_on_completed_error_handler_type run_on_completed_error_handler; + } void set_run_on_completed_error_handler( run_on_completed_error_handler_type f) @@ -66,16 +67,12 @@ namespace hpx::lcos::detail { /////////////////////////////////////////////////////////////////////////// template - static void run_on_completed_on_new_thread(Callback&& f) + void run_on_completed_on_new_thread(Callback&& f) { lcos::local::futures_factory p(HPX_FORWARD(Callback, f)); - bool const is_hpx_thread = nullptr != hpx::threads::get_self_ptr(); + HPX_ASSERT(nullptr != hpx::threads::get_self_ptr()); hpx::launch policy = launch::fork; - if (!is_hpx_thread) - { - policy = launch::async; - } policy.set_priority(threads::thread_priority::boost); policy.set_stacksize(threads::thread_stacksize::current); @@ -84,17 +81,12 @@ namespace hpx::lcos::detail { threads::thread_id_ref_type const tid = //-V821 p.post("run_on_completed_on_new_thread", policy); - // wait for the task to run - if (is_hpx_thread) - { - // make sure this thread is executed last - this_thread::suspend( - threads::thread_schedule_state::pending, tid.noref()); - return p.get_future().get(); - } + // make sure this thread is executed last + this_thread::suspend( + threads::thread_schedule_state::pending, tid.noref()); - // If we are not on a HPX thread, we need to return immediately, to - // allow the newly spawned thread to execute. + // wait for the task to run + return p.get_future().get(); } /////////////////////////////////////////////////////////////////////////// @@ -124,15 +116,13 @@ namespace hpx::lcos::detail { } auto const state = this->state_.load(std::memory_order_acquire); - if (state != this->empty) + if (state != future_data_base::empty) { return false; } // this thread would block on the future - - auto* thrd = get_thread_id_data(runs_child); - HPX_UNUSED(thrd); // might be unused + [[maybe_unused]] auto* thrd = get_thread_id_data(runs_child); LTM_(debug).format("task_object::get_result_void: attempting to " "directly execute child({}), description({})", @@ -161,8 +151,6 @@ namespace hpx::lcos::detail { return false; } - static util::unused_type unused_; - util::unused_type* future_data_base::get_result_void( void const* storage, error_code& ec) @@ -190,6 +178,7 @@ namespace hpx::lcos::detail { if (s == value) { + static util::unused_type unused_; return &unused_; } @@ -232,12 +221,12 @@ namespace hpx::lcos::detail { hpx::scoped_annotation annotate(on_completed); HPX_MOVE(on_completed)(); }, - [&](std::exception_ptr ep) { + [&](std::exception_ptr const& ep) { // If the completion handler throws an exception, there's // nothing we can do, report the exception and terminate. if (run_on_completed_error_handler) { - run_on_completed_error_handler(HPX_MOVE(ep)); + run_on_completed_error_handler(ep); } else { @@ -272,7 +261,9 @@ namespace hpx::lcos::detail { cnt.count_ > HPX_CONTINUATION_MAX_RECURSION_DEPTH || (hpx::threads::get_self_ptr() == nullptr); #endif - if (!recurse_asynchronously) + + bool const is_hpx_thread = nullptr != hpx::threads::get_self_ptr(); + if (!is_hpx_thread || !recurse_asynchronously) { // directly execute continuation on this thread run_on_completed(HPX_FORWARD(Callback, on_completed)); @@ -289,17 +280,17 @@ namespace hpx::lcos::detail { run_on_completed_on_new_thread(util::deferred_call( p, HPX_FORWARD(Callback, on_completed))); }, - [&](std::exception_ptr ep) { + [&](std::exception_ptr const& ep) { // If an exception while creating the new task or inside the // completion handler is thrown, there is nothing we can do... // ... but terminate and report the error if (run_on_completed_error_handler) { - run_on_completed_error_handler(HPX_MOVE(ep)); + run_on_completed_error_handler(ep); } else { - std::rethrow_exception(HPX_MOVE(ep)); + std::rethrow_exception(ep); } }); } diff --git a/libs/core/lcos_local/include/hpx/lcos_local/and_gate.hpp b/libs/core/lcos_local/include/hpx/lcos_local/and_gate.hpp index deeaeffc5e3b..8ee7ebf0a133 100644 --- a/libs/core/lcos_local/include/hpx/lcos_local/and_gate.hpp +++ b/libs/core/lcos_local/include/hpx/lcos_local/and_gate.hpp @@ -170,7 +170,7 @@ namespace hpx::lcos::local { protected: // Set the data which has to go into the segment \a which. template - bool set(std::size_t which, OuterLock outer_lock, F&& f, + bool set(std::size_t which, OuterLock& outer_lock, F&& f, error_code& ec = throws) { HPX_ASSERT_OWNS_LOCK(outer_lock); @@ -224,15 +224,12 @@ namespace hpx::lcos::local { std::decay_t>) { // invoke callback with the outer lock being held - HPX_FORWARD(F, f)(outer_lock, *this); + HPX_FORWARD(F, f)(outer_lock, *this, ec); } - outer_lock.unlock(); return true; } } - - outer_lock.unlock(); return false; } @@ -242,7 +239,7 @@ namespace hpx::lcos::local { { hpx::no_mutex mtx; std::unique_lock lk(mtx); - return set(which, HPX_MOVE(lk), HPX_FORWARD(F, f), ec); + return set(which, lk, HPX_FORWARD(F, f), ec); } protected: @@ -324,7 +321,8 @@ namespace hpx::lcos::local { public: template - std::size_t next_generation(Lock& l, std::size_t new_generation) + std::size_t next_generation( + Lock& l, std::size_t new_generation, error_code& ec = throws) { HPX_ASSERT_OWNS_LOCK(l); @@ -335,10 +333,11 @@ namespace hpx::lcos::local { if (new_generation < generation_) { l.unlock(); - HPX_THROW_EXCEPTION(hpx::error::invalid_status, + HPX_THROWS_IF(ec, hpx::error::invalid_status, "and_gate::next_generation", "sequencing error, new generational counter value too " "small"); + return generation_; } generation_ = new_generation; } @@ -351,10 +350,11 @@ namespace hpx::lcos::local { } std::size_t next_generation( - std::size_t new_generation = static_cast(-1)) + std::size_t new_generation = static_cast(-1), + error_code& ec = throws) { std::unique_lock l(mtx_); - return next_generation(l, new_generation); + return next_generation(l, new_generation, ec); } template @@ -441,11 +441,10 @@ namespace hpx::lcos::local { } template - bool set(std::size_t which, Lock l, F&& f = nullptr, + bool set(std::size_t which, Lock& l, F&& f = nullptr, error_code& ec = hpx::throws) { - return this->base_type::set( - which, HPX_MOVE(l), HPX_FORWARD(F, f), ec); + return this->base_type::set(which, l, HPX_FORWARD(F, f), ec); } template diff --git a/libs/full/collectives/include/hpx/collectives/all_gather.hpp b/libs/full/collectives/include/hpx/collectives/all_gather.hpp index 519547a15eab..650783da016f 100644 --- a/libs/full/collectives/include/hpx/collectives/all_gather.hpp +++ b/libs/full/collectives/include/hpx/collectives/all_gather.hpp @@ -149,11 +149,15 @@ namespace hpx::traits { std::size_t generation, T&& t) { return communicator.template handle_data>( + communication::communicator_name< + communication::all_gather_tag>(), which, generation, // step function (invoked for each get) - [&](auto& data) { data[which] = HPX_FORWARD(T, t); }, + [&t](auto& data, std::size_t which) { + data[which] = HPX_FORWARD(T, t); + }, // finalizer (invoked after all data has been received) - [](auto& data, auto&) { return data; }); + [](auto& data, auto&, std::size_t) { return data; }); } }; } // namespace hpx::traits diff --git a/libs/full/collectives/include/hpx/collectives/all_reduce.hpp b/libs/full/collectives/include/hpx/collectives/all_reduce.hpp index 7f2c5abaf80f..37aa440e65d2 100644 --- a/libs/full/collectives/include/hpx/collectives/all_reduce.hpp +++ b/libs/full/collectives/include/hpx/collectives/all_reduce.hpp @@ -156,13 +156,17 @@ namespace hpx::traits { std::size_t generation, T&& t, F&& op) { return communicator.template handle_data>( + communication::communicator_name< + communication::all_reduce_tag>(), which, generation, // step function (invoked for each get) - [&](auto& data) { data[which] = HPX_FORWARD(T, t); }, + [&t](auto& data, std::size_t which) { + data[which] = HPX_FORWARD(T, t); + }, // finalizer (invoked non-concurrently after all data has been // received) [op = HPX_FORWARD(F, op)]( - auto& data, bool& data_available) mutable { + auto& data, bool& data_available, std::size_t) mutable { HPX_ASSERT(!data.empty()); if (!data_available && data.size() > 1) { diff --git a/libs/full/collectives/include/hpx/collectives/all_to_all.hpp b/libs/full/collectives/include/hpx/collectives/all_to_all.hpp index 1f3c8597ad2b..353deaa705c6 100644 --- a/libs/full/collectives/include/hpx/collectives/all_to_all.hpp +++ b/libs/full/collectives/include/hpx/collectives/all_to_all.hpp @@ -151,11 +151,15 @@ namespace hpx::traits { std::size_t generation, std::vector&& t) { return communicator.template handle_data>( + communication::communicator_name< + communication::all_to_all_tag>(), which, generation, // step function (invoked for each get) - [&](auto& data) { data[which] = HPX_MOVE(t); }, + [&t](auto& data, std::size_t which) { + data[which] = HPX_MOVE(t); + }, // finalizer (invoked after all data has been received) - [which](auto& data, auto&) { + [](auto& data, auto&, std::size_t which) { // slice the overall data based on the locality id of the // requesting site std::vector result; diff --git a/libs/full/collectives/include/hpx/collectives/broadcast.hpp b/libs/full/collectives/include/hpx/collectives/broadcast.hpp index 6a8afdfa3197..3a383a26482f 100644 --- a/libs/full/collectives/include/hpx/collectives/broadcast.hpp +++ b/libs/full/collectives/include/hpx/collectives/broadcast.hpp @@ -233,11 +233,13 @@ namespace hpx::traits { using data_type = typename Result::result_type; return communicator.template handle_data( + communication::communicator_name< + communication::broadcast_tag>(), which, generation, // no step function nullptr, // finalizer (invoked after all sites have checked in) - [](auto& data, auto&) { + [](auto& data, auto&, std::size_t) { return Communicator::template handle_bool( data[0]); }, @@ -249,11 +251,13 @@ namespace hpx::traits { std::size_t generation, T&& t) { return communicator.template handle_data>( + communication::communicator_name< + communication::broadcast_tag>(), which, generation, // step function (invoked once for set) - [&](auto& data) { data[0] = HPX_FORWARD(T, t); }, + [&t](auto& data, std::size_t) { data[0] = HPX_FORWARD(T, t); }, // finalizer (invoked after all sites have checked in) - [](auto& data, auto&) { + [](auto& data, auto&, std::size_t) { return Communicator::template handle_bool>( data[0]); }, diff --git a/libs/full/collectives/include/hpx/collectives/detail/communicator.hpp b/libs/full/collectives/include/hpx/collectives/detail/communicator.hpp index d4d06612f5ba..28d85536bc42 100644 --- a/libs/full/collectives/include/hpx/collectives/detail/communicator.hpp +++ b/libs/full/collectives/include/hpx/collectives/detail/communicator.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Hartmut Kaiser +// Copyright (c) 2020-2024 Hartmut Kaiser // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -89,6 +90,8 @@ namespace hpx::collectives::detail { std::size_t, std::size_t, char const*) noexcept { } + + ~logging_helper() = default; #endif logging_helper(logging_helper const&) = delete; @@ -165,6 +168,12 @@ namespace hpx::collectives::detail { }; private: + std::size_t get_num_sites(std::size_t num_values) const noexcept + { + return num_values == static_cast(-1) ? num_sites_ : + num_values; + } + // re-initialize data template void reinitialize_data(std::size_t num_values) @@ -174,10 +183,8 @@ namespace hpx::collectives::detail { needs_initialization_ = false; data_available_ = false; - auto const new_size = - num_values == static_cast(-1) ? num_sites_ : - num_values; - auto* data = hpx::any_cast>(&data_); + auto const new_size = get_num_sites(num_values); + auto const* data = hpx::any_cast>(&data_); if (data == nullptr || data->size() < new_size) { data_ = std::vector(new_size); @@ -201,6 +208,8 @@ namespace hpx::collectives::detail { { needs_initialization_ = true; data_available_ = false; + on_ready_count_ = 0; + current_operation_ = nullptr; } } @@ -212,8 +221,7 @@ namespace hpx::collectives::detail { auto sf = gate_.get_shared_future(l); traits::detail::get_shared_state(sf)->reserve_callbacks( - capacity == static_cast(-1) ? num_sites_ : - capacity); + get_num_sites(capacity)); auto fut = sf.then(hpx::launch::sync, HPX_FORWARD(F, f)); @@ -225,30 +233,94 @@ namespace hpx::collectives::detail { return fut; } + template + struct on_exit + { + explicit constexpr on_exit(F&& f_) noexcept + : f(HPX_MOVE(f_)) + { + } + + on_exit(on_exit const&) = delete; + on_exit(on_exit&&) = delete; + on_exit& operator=(on_exit const&) = delete; + on_exit& operator=(on_exit&&) = delete; + + ~on_exit() + { + f(); + } + + F f; + }; + // Step will be invoked under lock for each site that checks in (either // set or get). // // Finalizer will be invoked under lock after all sites have checked in. template - auto handle_data(std::size_t which, std::size_t generation, - [[maybe_unused]] Step&& step, Finalizer&& finalizer, + auto handle_data(char const* operation, std::size_t which, + std::size_t generation, [[maybe_unused]] Step&& step, + Finalizer&& finalizer, std::size_t num_values = static_cast(-1)) { - auto on_ready = [this, num_values, + auto on_ready = [this, operation, which, num_values, finalizer = HPX_FORWARD(Finalizer, finalizer)]( shared_future&& f) mutable { + // This callback will be invoked once for each participating + // site after all sites have checked in. + f.get(); // propagate any exceptions + // It does not matter whether the lock will be acquired here. It + // either is still being held by the surrounding logic or is + // re-acquired here (if `on_ready` happens to run on a new + // thread asynchronously). + std::unique_lock l(mtx_, std::try_to_lock); + + // Verify that there is no overlap between different types of + // operations on the same communicator. + if (current_operation_ == nullptr || + std::strcmp(current_operation_, operation) != 0) + { + l.unlock(); + HPX_THROW_EXCEPTION(hpx::error::invalid_status, + "communicator::handle_data::on_ready", + "sequencing error, operation type mismatch: invoked " + "for {}, ongoing operation {}", + operation, + current_operation_ ? current_operation_ : "unknown"); + } + + // Verify that the number of invocations of this callback is in + // the expected range. + if (on_ready_count_ >= num_sites_) + { + l.unlock(); + HPX_THROW_EXCEPTION(hpx::error::invalid_status, + "communicator::handle_data::on_ready", + "sequencing error, an excessive number of on_ready " + "callbacks have been invoked before the end of the " + "collective {} operation. Expected count {}, received " + "count {}.", + operation, on_ready_count_, num_sites_); + } + + // On exit, keep track of number of invocations of this + // callback. + on_exit _([this] { ++on_ready_count_; }); + if constexpr (!std::is_same_v>) { // call provided finalizer return HPX_FORWARD(Finalizer, finalizer)( - access_data(num_values), data_available_); + access_data(num_values), data_available_, which); } else { HPX_UNUSED(this); + HPX_UNUSED(which); HPX_UNUSED(num_values); HPX_UNUSED(finalizer); } @@ -257,23 +329,73 @@ namespace hpx::collectives::detail { std::unique_lock l(mtx_); [[maybe_unused]] util::ignore_while_checking il(&l); + // Verify that there is no overlap between different types of + // operations on the same communicator. + if (current_operation_ == nullptr) + { + if (on_ready_count_ != 0) + { + l.unlock(); + HPX_THROW_EXCEPTION(hpx::error::invalid_status, + "communicator::handle_data", + "sequencing error, on_ready callback was already " + "invoked before the start of the collective {} " + "operation", + operation); + } + current_operation_ = operation; + } + else if (std::strcmp(current_operation_, operation) != 0) + { + l.unlock(); + HPX_THROW_EXCEPTION(hpx::error::invalid_status, + "communicator::handle_data", + "sequencing error, operation type mismatch: invoked for " + "{}, ongoing operation {}", + operation, current_operation_); + } + auto f = get_future_and_synchronize( generation, num_values, HPX_MOVE(on_ready), l); if constexpr (!std::is_same_v>) { // call provided step function for each invocation site - HPX_FORWARD(Step, step)(access_data(num_values)); + HPX_FORWARD(Step, step)(access_data(num_values), which); } // Make sure next generation is enabled only after previous // generation has finished executing. - // - // set() consumes the lock - gate_.set( - which, HPX_MOVE(l), [this, generation](auto& l, auto& gate) { - gate.next_generation(l, generation); - this->invalidate_data(l); + gate_.set(which, l, + [this, operation, generation]( + auto& l, auto& gate, error_code& ec) { + // This callback is invoked synchronously once for each + // collective operation after all data has been received and + // all (shared) futures were triggered. + + HPX_ASSERT_OWNS_LOCK(l); + + // Verify that all `on_ready` callbacks have finished + // executing at this point. + if (on_ready_count_ != num_sites_) + { + l.unlock(); + HPX_THROWS_IF(ec, hpx::error::invalid_status, + "communicator::handle_data", + "sequencing error, not all on_ready callbacks have " + "been invoked at the end of the collective {} " + "operation. Expected count {}, received count {}.", + operation, on_ready_count_, num_sites_); + return; + } + + // Reset communicator state before proceeding to the next + // generation. + invalidate_data(l); + + // Release threads possibly waiting for the next generation + // to be handled. + gate.next_generation(l, generation, ec); }); return f; @@ -281,7 +403,7 @@ namespace hpx::collectives::detail { // protect against vector idiosyncrasies template - static constexpr decltype(auto) handle_bool(Data&& data) + static constexpr decltype(auto) handle_bool(Data&& data) noexcept { if constexpr (std::is_same_v) { @@ -298,10 +420,12 @@ namespace hpx::collectives::detail { mutex_type mtx_; hpx::unique_any_nonser data_; - lcos::local::and_gate gate_; + hpx::lcos::local::and_gate gate_; std::size_t const num_sites_; - bool needs_initialization_; - bool data_available_; + std::size_t on_ready_count_ = 0; + char const* current_operation_ = nullptr; + bool needs_initialization_ = true; + bool data_available_ = false; }; } // namespace hpx::collectives::detail diff --git a/libs/full/collectives/include/hpx/collectives/exclusive_scan.hpp b/libs/full/collectives/include/hpx/collectives/exclusive_scan.hpp index c6bfbbf157fb..97665ccfe679 100644 --- a/libs/full/collectives/include/hpx/collectives/exclusive_scan.hpp +++ b/libs/full/collectives/include/hpx/collectives/exclusive_scan.hpp @@ -169,13 +169,17 @@ namespace hpx::traits { std::size_t generation, T&& t, F&& op) { return communicator.template handle_data>( + communication::communicator_name< + communication::exclusive_scan_tag>(), which, generation, // step function (invoked for each get) - [&](auto& data) { data[which] = HPX_FORWARD(T, t); }, + [&t](auto& data, std::size_t which) { + data[which] = HPX_FORWARD(T, t); + }, // finalizer (invoked non-concurrently after all data has been // received) - [which, op = HPX_FORWARD(F, op)]( - auto& data, bool& data_available) mutable { + [op = HPX_FORWARD(F, op)](auto& data, bool& data_available, + std::size_t which) mutable { if (!data_available) { std::vector> dest; diff --git a/libs/full/collectives/include/hpx/collectives/gather.hpp b/libs/full/collectives/include/hpx/collectives/gather.hpp index 0409833cc7fe..c5fcca65dbf9 100644 --- a/libs/full/collectives/include/hpx/collectives/gather.hpp +++ b/libs/full/collectives/include/hpx/collectives/gather.hpp @@ -250,11 +250,14 @@ namespace hpx::traits { std::size_t generation, T&& t) { return communicator.template handle_data>( + communication::communicator_name(), which, generation, // step function (invoked once for get) - [&](auto& data) { data[which] = HPX_FORWARD(T, t); }, + [&t](auto& data, std::size_t which) { + data[which] = HPX_FORWARD(T, t); + }, // finalizer (invoked once after all data has been received) - [](auto& data, bool&) { return HPX_MOVE(data); }); + [](auto& data, bool&, std::size_t) { return HPX_MOVE(data); }); } template @@ -262,9 +265,12 @@ namespace hpx::traits { std::size_t generation, T&& t) { return communicator.template handle_data>( + communication::communicator_name(), which, generation, // step function (invoked for each set) - [&](auto& data) { data[which] = HPX_FORWARD(T, t); }, + [&t](auto& data, std::size_t which) { + data[which] = HPX_FORWARD(T, t); + }, // no finalizer nullptr); } diff --git a/libs/full/collectives/include/hpx/collectives/inclusive_scan.hpp b/libs/full/collectives/include/hpx/collectives/inclusive_scan.hpp index 63adff323451..ddd4d23d11cf 100644 --- a/libs/full/collectives/include/hpx/collectives/inclusive_scan.hpp +++ b/libs/full/collectives/include/hpx/collectives/inclusive_scan.hpp @@ -159,10 +159,12 @@ namespace hpx::traits { return communicator.template handle_data>( which, generation, // step function (invoked for each get) - [&](auto& data) { data[which] = HPX_FORWARD(T, t); }, + [&t](auto& data, std::size_t which) { + data[which] = HPX_FORWARD(T, t); + }, // finalizer (invoked after all data has been received) - [which, op = HPX_FORWARD(F, op)]( - auto& data, bool& data_available) mutable { + [op = HPX_FORWARD(F, op)](auto& data, bool& data_available, + std::size_t which) mutable { if (!data_available) { std::vector> dest; diff --git a/libs/full/collectives/include/hpx/collectives/reduce.hpp b/libs/full/collectives/include/hpx/collectives/reduce.hpp index 670500e948fd..d8b780b73046 100644 --- a/libs/full/collectives/include/hpx/collectives/reduce.hpp +++ b/libs/full/collectives/include/hpx/collectives/reduce.hpp @@ -253,11 +253,15 @@ namespace hpx::traits { std::size_t generation, T&& t, F&& op) { return communicator.template handle_data>( + communication::communicator_name(), which, generation, // step function (invoked once for get) - [&](auto& data) { data[which] = HPX_FORWARD(T, t); }, + [&t](auto& data, std::size_t which) { + data[which] = HPX_FORWARD(T, t); + }, // finalizer (invoked once after all data has been received) - [op = HPX_FORWARD(F, op)](auto& data, bool&) mutable { + [op = HPX_FORWARD(F, op)]( + auto& data, bool&, std::size_t) mutable { HPX_ASSERT(!data.empty()); if (data.size() > 1) { @@ -276,9 +280,12 @@ namespace hpx::traits { std::size_t generation, T&& t) { return communicator.template handle_data>( + communication::communicator_name(), which, generation, // step function (invoked for each set) - [&](auto& data) { data[which] = HPX_FORWARD(T, t); }, + [t = HPX_FORWARD(T, t)](auto& data, std::size_t which) mutable { + data[which] = HPX_FORWARD(T, t); + }, // no finalizer nullptr); } diff --git a/libs/full/collectives/include/hpx/collectives/scatter.hpp b/libs/full/collectives/include/hpx/collectives/scatter.hpp index 85f2908f02a4..2b727501a0e1 100644 --- a/libs/full/collectives/include/hpx/collectives/scatter.hpp +++ b/libs/full/collectives/include/hpx/collectives/scatter.hpp @@ -244,12 +244,13 @@ namespace hpx::traits { { using data_type = typename Result::result_type; - return communicator.template handle_data(which, - generation, + return communicator.template handle_data( + communication::communicator_name(), + which, generation, // step function (invoked once for get) nullptr, // finalizer (invoked after all sites have checked in) - [which](auto& data, bool&) { + [](auto& data, bool&, std::size_t which) { return Communicator::template handle_bool( HPX_MOVE(data[which])); }); @@ -260,11 +261,12 @@ namespace hpx::traits { std::size_t generation, std::vector&& t) { return communicator.template handle_data( + communication::communicator_name(), which, generation, // step function (invoked once for set) - [&](auto& data) { data = HPX_MOVE(t); }, + [&t](auto& data, std::size_t) { data = HPX_MOVE(t); }, // finalizer (invoked after all sites have checked in) - [which](auto& data, bool&) { + [](auto& data, bool&, std::size_t which) { return Communicator::template handle_bool( HPX_MOVE(data[which])); }); diff --git a/libs/full/collectives/src/create_communicator.cpp b/libs/full/collectives/src/create_communicator.cpp index 7f13d2a17b36..c2f97c9cb0e5 100644 --- a/libs/full/collectives/src/create_communicator.cpp +++ b/libs/full/collectives/src/create_communicator.cpp @@ -50,8 +50,6 @@ namespace hpx::collectives { communicator_server::communicator_server() noexcept //-V730 : num_sites_(0) - , needs_initialization_(false) - , data_available_(false) { HPX_ASSERT(false); // shouldn't ever be called } @@ -59,10 +57,9 @@ namespace hpx::collectives { communicator_server::communicator_server(std::size_t num_sites) noexcept : gate_(num_sites) , num_sites_(num_sites) - , needs_initialization_(true) - , data_available_(false) { - HPX_ASSERT(num_sites != 0); + HPX_ASSERT( + num_sites != 0 && num_sites != static_cast(-1)); } } // namespace detail diff --git a/libs/full/collectives/tests/unit/communication_set.cpp b/libs/full/collectives/tests/unit/communication_set.cpp index e4b55f16ef14..4083353c85ef 100644 --- a/libs/full/collectives/tests/unit/communication_set.cpp +++ b/libs/full/collectives/tests/unit/communication_set.cpp @@ -119,7 +119,7 @@ namespace hpx::traits { l); which = communicator.which_++; - communicator.gate_.set(which, std::move(l)); + communicator.gate_.set(which, l); return f; }