From 50b3cc3b53ef4cb6a10f1b18e7d68ff6729ee404 Mon Sep 17 00:00:00 2001 From: Hartmut Kaiser Date: Tue, 9 Jan 2024 15:50:33 -0600 Subject: [PATCH] Tightening up collective operation semantics - flyby: small_vector tweaks --- .../datastructures/detail/small_vector.hpp | 38 ++-- .../tests/unit/small_vector.cpp | 73 ++++---- libs/core/futures/src/future_data.cpp | 53 +++--- .../include/hpx/lcos_local/and_gate.hpp | 25 ++- .../include/hpx/collectives/all_gather.hpp | 12 +- .../include/hpx/collectives/all_reduce.hpp | 12 +- .../include/hpx/collectives/all_to_all.hpp | 12 +- .../include/hpx/collectives/broadcast.hpp | 14 +- .../hpx/collectives/detail/communicator.hpp | 168 +++++++++++++++--- .../hpx/collectives/exclusive_scan.hpp | 14 +- .../include/hpx/collectives/gather.hpp | 16 +- .../hpx/collectives/inclusive_scan.hpp | 14 +- .../include/hpx/collectives/reduce.hpp | 17 +- .../include/hpx/collectives/scatter.hpp | 16 +- .../collectives/src/create_communicator.cpp | 7 +- .../tests/unit/communication_set.cpp | 2 +- 16 files changed, 329 insertions(+), 164 deletions(-) diff --git a/libs/core/datastructures/include/hpx/datastructures/detail/small_vector.hpp b/libs/core/datastructures/include/hpx/datastructures/detail/small_vector.hpp index ea7e69202f58..d6379378d138 100644 --- a/libs/core/datastructures/include/hpx/datastructures/detail/small_vector.hpp +++ b/libs/core/datastructures/include/hpx/datastructures/detail/small_vector.hpp @@ -175,7 +175,7 @@ namespace hpx::detail { } // only void* is allowed to be converted to uintptr_t - void* ptr = ::operator new(offset_to_data + sizeof(T) * capacity); + void* ptr = ::operator new(mem); if (nullptr == ptr) { throw std::bad_alloc(); @@ -319,9 +319,13 @@ namespace hpx::detail { { // indirect -> direct auto* storage = indirect(); - uninitialized_move_and_destroy( - storage->data(), direct_data(), storage->size()); - set_direct_and_size(storage->size()); + auto const data_size = storage->size(); + if (data_size != 0) + { + uninitialized_move_and_destroy( + storage->data(), direct_data(), data_size); + } + set_direct_and_size(data_size); detail::storage::dealloc(storage); } } @@ -332,16 +336,26 @@ namespace hpx::detail { if (is_direct()) { // direct -> indirect - uninitialized_move_and_destroy(data(), - storage->data(), size()); - storage->size(size()); + auto const data_size = size(); + if (data_size != 0) + { + uninitialized_move_and_destroy( + data(), storage->data(), + data_size); + } + storage->size(data_size); } else { // indirect -> indirect - uninitialized_move_and_destroy(data(), - storage->data(), size()); - storage->size(size()); + auto const data_size = size(); + if (data_size != 0) + { + uninitialized_move_and_destroy( + data(), storage->data(), + data_size); + } + storage->size(data_size); detail::storage::dealloc(indirect()); } set_indirect(storage); @@ -657,7 +671,7 @@ namespace hpx::detail { set_direct_and_size(0); } - // performs a const_cast so we don't need this implementation twice + // performs a const_cast, so we don't need this implementation twice template [[nodiscard]] auto at(std::size_t idx) const -> T& { @@ -785,7 +799,7 @@ namespace hpx::detail { if (&other == this) { // It doesn't seem to be required to do self-check, but let's do - // it anyways to be safe + // it anyway to be safe return *this; } diff --git a/libs/core/datastructures/tests/unit/small_vector.cpp b/libs/core/datastructures/tests/unit/small_vector.cpp index 765005d12d27..61eea3bebb12 100644 --- a/libs/core/datastructures/tests/unit/small_vector.cpp +++ b/libs/core/datastructures/tests/unit/small_vector.cpp @@ -1,4 +1,4 @@ -// Copyright (c) 2021-2022 Hartmut Kaiser +// Copyright (c) 2021-2024 Hartmut Kaiser // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -36,18 +36,18 @@ namespace test { simple_allocator() noexcept = default; template - simple_allocator(simple_allocator const&) noexcept + explicit simple_allocator(simple_allocator const&) noexcept { } - T* allocate(std::size_t n) + static T* allocate(std::size_t n) { return reinterpret_cast(::new char[sizeof(T) * n]); } - void deallocate(T* p, std::size_t) noexcept + static void deallocate(T* p, std::size_t) noexcept { - delete[](reinterpret_cast(p)); + delete[] (reinterpret_cast(p)); } friend bool operator==( @@ -204,7 +204,7 @@ namespace test { void small_vector_test() { - // basic test with less elements than static size + // basic test with fewer elements than static size { using sm5_t = hpx::detail::small_vector; static_assert( @@ -214,7 +214,7 @@ namespace test { sm5.push_back(1); HPX_TEST_EQ(sm5[0], 1); - sm5_t sm5_copy(sm5); + sm5_t const sm5_copy(sm5); HPX_TEST(sm5 == sm5_copy); } { @@ -226,7 +226,7 @@ namespace test { sm7.push_back(1); HPX_TEST_EQ(sm7[0], 1); - sm7_t sm7_copy(sm7); + sm7_t const sm7_copy(sm7); HPX_TEST(sm7 == sm7_copy); } { @@ -240,7 +240,7 @@ namespace test { sm5.push_back(2); HPX_TEST_EQ(sm5[1], 2); - HPX_TEST_EQ(sm5.size(), std::size_t(2)); + HPX_TEST_EQ(sm5.size(), static_cast(2)); sm5_copy = sm5; HPX_TEST(sm5 == sm5_copy); @@ -274,7 +274,7 @@ namespace test { sm2.push_back(3); HPX_TEST_EQ(sm2[1], 2); HPX_TEST_EQ(sm2[2], 3); - HPX_TEST_EQ(sm2.size(), std::size_t(3)); + HPX_TEST_EQ(sm2.size(), static_cast(3)); sm2_copy = sm2; HPX_TEST(sm2 == sm2_copy); @@ -307,13 +307,13 @@ namespace test { for (std::size_t i = 0, max = v.capacity() + 1; i != max; ++i) { - v.push_back(int(i)); + v.push_back(static_cast(i)); } vec w; - vec v_copy(v); - vec w_copy(w); + vec const v_copy(v); + vec const w_copy(w); v.swap(w); HPX_TEST(v == w_copy); @@ -324,13 +324,13 @@ namespace test { vec v; for (std::size_t i = 0, max = v.capacity() - 1; i != max; ++i) { - v.push_back(int(i)); + v.push_back(static_cast(i)); } vec w; - vec v_copy(v); - vec w_copy(w); + vec const v_copy(v); + vec const w_copy(w); v.swap(w); HPX_TEST(v == w_copy); @@ -341,17 +341,17 @@ namespace test { vec v; for (std::size_t i = 0, max = v.capacity() - 1; i != max; ++i) { - v.push_back(int(i)); + v.push_back(static_cast(i)); } vec w; for (std::size_t i = 0, max = v.capacity() / 2; i != max; ++i) { - w.push_back(int(i)); + w.push_back(static_cast(i)); } - vec v_copy(v); - vec w_copy(w); + vec const v_copy(v); + vec const w_copy(w); v.swap(w); HPX_TEST(v == w_copy); @@ -362,17 +362,17 @@ namespace test { vec v; for (std::size_t i = 0, max = v.capacity() + 1; i != max; ++i) { - v.push_back(int(i)); + v.push_back(static_cast(i)); } vec w; for (std::size_t i = 0, max = v.capacity() * 2; i != max; ++i) { - w.push_back(int(i)); + w.push_back(static_cast(i)); } - vec v_copy(v); - vec w_copy(w); + vec const v_copy(v); + vec const w_copy(w); v.swap(w); HPX_TEST(v == w_copy); @@ -448,7 +448,6 @@ namespace test { void vector_test() { using value_type = typename Vector::value_type; - constexpr int max = 100; test_range_insertion(); @@ -480,6 +479,7 @@ namespace test { test::check_equal_containers(vector2, v); } { + constexpr int max = 100; Vector vector; std::vector v; @@ -534,8 +534,9 @@ namespace test { auto insert_it = vector.insert(vector.end(), &aux_vect[0], aux_vect + 50); - HPX_TEST_EQ(std::size_t(std::distance(insert_it, vector.end())), - std::size_t(50)); + HPX_TEST_EQ(static_cast( + std::distance(insert_it, vector.end())), + static_cast(50)); v.insert(v.end(), aux_vect2, aux_vect2 + 50); test::check_equal_containers(vector, v); @@ -581,9 +582,9 @@ namespace test { { //push_back with not enough capacity value_type push_back_this(1); vector.push_back(std::move(push_back_this)); - v.push_back(int(1)); + v.push_back(static_cast(1)); vector.push_back(value_type(1)); - v.push_back(int(1)); + v.push_back(static_cast(1)); test::check_equal_containers(vector, v); } @@ -601,9 +602,9 @@ namespace test { value_type push_back_this(1); vector.push_back(std::move(push_back_this)); - v.push_back(int(1)); + v.push_back(static_cast(1)); vector.push_back(value_type(1)); - v.push_back(int(1)); + v.push_back(static_cast(1)); test::check_equal_containers(vector, v); } @@ -617,7 +618,7 @@ namespace test { vector.insert(vector.begin(), std::move(insert_this)); v.insert(v.begin(), i); vector.insert(vector.begin(), value_type(i)); - v.insert(v.begin(), int(i)); + v.insert(v.begin(), static_cast(i)); } test::check_equal_containers(vector, v); @@ -631,7 +632,7 @@ namespace test { // Test insertion from list { - std::list l(50, int(1)); + std::list l(50, static_cast(1)); auto it_insert = vector.insert(vector.begin(), l.begin(), l.end()); HPX_TEST(vector.begin() == it_insert); @@ -643,7 +644,7 @@ namespace test { v.assign(l.begin(), l.end()); test::check_equal_containers(vector, v); - std::forward_list fl(50, int(1)); + std::forward_list fl(50, static_cast(1)); vector.clear(); v.clear(); vector.assign(fl.begin(), fl.end()); @@ -663,7 +664,7 @@ namespace test { class emplace_int { public: - emplace_int( + explicit emplace_int( int a = 0, int b = 0, int c = 0, int d = 0, int e = 0) noexcept : a_(a) , b_(b) @@ -726,7 +727,7 @@ namespace test { } int a_, b_, c_, d_, e_; - int padding[6]; + int padding[6] = {}; }; static emplace_int expected[10]; diff --git a/libs/core/futures/src/future_data.cpp b/libs/core/futures/src/future_data.cpp index a05f4d577be7..28d8eda865d3 100644 --- a/libs/core/futures/src/future_data.cpp +++ b/libs/core/futures/src/future_data.cpp @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2023 Hartmut Kaiser +// Copyright (c) 2015-2024 Hartmut Kaiser // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -22,13 +22,14 @@ #include #include -#include #include #include namespace hpx::lcos::detail { - static run_on_completed_error_handler_type run_on_completed_error_handler; + namespace { + run_on_completed_error_handler_type run_on_completed_error_handler; + } void set_run_on_completed_error_handler( run_on_completed_error_handler_type f) @@ -66,16 +67,12 @@ namespace hpx::lcos::detail { /////////////////////////////////////////////////////////////////////////// template - static void run_on_completed_on_new_thread(Callback&& f) + void run_on_completed_on_new_thread(Callback&& f) { lcos::local::futures_factory p(HPX_FORWARD(Callback, f)); - bool const is_hpx_thread = nullptr != hpx::threads::get_self_ptr(); + HPX_ASSERT(nullptr != hpx::threads::get_self_ptr()); hpx::launch policy = launch::fork; - if (!is_hpx_thread) - { - policy = launch::async; - } policy.set_priority(threads::thread_priority::boost); policy.set_stacksize(threads::thread_stacksize::current); @@ -84,17 +81,12 @@ namespace hpx::lcos::detail { threads::thread_id_ref_type const tid = //-V821 p.post("run_on_completed_on_new_thread", policy); - // wait for the task to run - if (is_hpx_thread) - { - // make sure this thread is executed last - this_thread::suspend( - threads::thread_schedule_state::pending, tid.noref()); - return p.get_future().get(); - } + // make sure this thread is executed last + this_thread::suspend( + threads::thread_schedule_state::pending, tid.noref()); - // If we are not on a HPX thread, we need to return immediately, to - // allow the newly spawned thread to execute. + // wait for the task to run + return p.get_future().get(); } /////////////////////////////////////////////////////////////////////////// @@ -124,15 +116,13 @@ namespace hpx::lcos::detail { } auto const state = this->state_.load(std::memory_order_acquire); - if (state != this->empty) + if (state != future_data_base::empty) { return false; } // this thread would block on the future - - auto* thrd = get_thread_id_data(runs_child); - HPX_UNUSED(thrd); // might be unused + [[maybe_unused]] auto* thrd = get_thread_id_data(runs_child); LTM_(debug).format("task_object::get_result_void: attempting to " "directly execute child({}), description({})", @@ -161,8 +151,6 @@ namespace hpx::lcos::detail { return false; } - static util::unused_type unused_; - util::unused_type* future_data_base::get_result_void( void const* storage, error_code& ec) @@ -190,6 +178,7 @@ namespace hpx::lcos::detail { if (s == value) { + static util::unused_type unused_; return &unused_; } @@ -232,12 +221,12 @@ namespace hpx::lcos::detail { hpx::scoped_annotation annotate(on_completed); HPX_MOVE(on_completed)(); }, - [&](std::exception_ptr ep) { + [&](std::exception_ptr const& ep) { // If the completion handler throws an exception, there's // nothing we can do, report the exception and terminate. if (run_on_completed_error_handler) { - run_on_completed_error_handler(HPX_MOVE(ep)); + run_on_completed_error_handler(ep); } else { @@ -272,7 +261,9 @@ namespace hpx::lcos::detail { cnt.count_ > HPX_CONTINUATION_MAX_RECURSION_DEPTH || (hpx::threads::get_self_ptr() == nullptr); #endif - if (!recurse_asynchronously) + + bool const is_hpx_thread = nullptr != hpx::threads::get_self_ptr(); + if (!is_hpx_thread || !recurse_asynchronously) { // directly execute continuation on this thread run_on_completed(HPX_FORWARD(Callback, on_completed)); @@ -289,17 +280,17 @@ namespace hpx::lcos::detail { run_on_completed_on_new_thread(util::deferred_call( p, HPX_FORWARD(Callback, on_completed))); }, - [&](std::exception_ptr ep) { + [&](std::exception_ptr const& ep) { // If an exception while creating the new task or inside the // completion handler is thrown, there is nothing we can do... // ... but terminate and report the error if (run_on_completed_error_handler) { - run_on_completed_error_handler(HPX_MOVE(ep)); + run_on_completed_error_handler(ep); } else { - std::rethrow_exception(HPX_MOVE(ep)); + std::rethrow_exception(ep); } }); } diff --git a/libs/core/lcos_local/include/hpx/lcos_local/and_gate.hpp b/libs/core/lcos_local/include/hpx/lcos_local/and_gate.hpp index deeaeffc5e3b..8ee7ebf0a133 100644 --- a/libs/core/lcos_local/include/hpx/lcos_local/and_gate.hpp +++ b/libs/core/lcos_local/include/hpx/lcos_local/and_gate.hpp @@ -170,7 +170,7 @@ namespace hpx::lcos::local { protected: // Set the data which has to go into the segment \a which. template - bool set(std::size_t which, OuterLock outer_lock, F&& f, + bool set(std::size_t which, OuterLock& outer_lock, F&& f, error_code& ec = throws) { HPX_ASSERT_OWNS_LOCK(outer_lock); @@ -224,15 +224,12 @@ namespace hpx::lcos::local { std::decay_t>) { // invoke callback with the outer lock being held - HPX_FORWARD(F, f)(outer_lock, *this); + HPX_FORWARD(F, f)(outer_lock, *this, ec); } - outer_lock.unlock(); return true; } } - - outer_lock.unlock(); return false; } @@ -242,7 +239,7 @@ namespace hpx::lcos::local { { hpx::no_mutex mtx; std::unique_lock lk(mtx); - return set(which, HPX_MOVE(lk), HPX_FORWARD(F, f), ec); + return set(which, lk, HPX_FORWARD(F, f), ec); } protected: @@ -324,7 +321,8 @@ namespace hpx::lcos::local { public: template - std::size_t next_generation(Lock& l, std::size_t new_generation) + std::size_t next_generation( + Lock& l, std::size_t new_generation, error_code& ec = throws) { HPX_ASSERT_OWNS_LOCK(l); @@ -335,10 +333,11 @@ namespace hpx::lcos::local { if (new_generation < generation_) { l.unlock(); - HPX_THROW_EXCEPTION(hpx::error::invalid_status, + HPX_THROWS_IF(ec, hpx::error::invalid_status, "and_gate::next_generation", "sequencing error, new generational counter value too " "small"); + return generation_; } generation_ = new_generation; } @@ -351,10 +350,11 @@ namespace hpx::lcos::local { } std::size_t next_generation( - std::size_t new_generation = static_cast(-1)) + std::size_t new_generation = static_cast(-1), + error_code& ec = throws) { std::unique_lock l(mtx_); - return next_generation(l, new_generation); + return next_generation(l, new_generation, ec); } template @@ -441,11 +441,10 @@ namespace hpx::lcos::local { } template - bool set(std::size_t which, Lock l, F&& f = nullptr, + bool set(std::size_t which, Lock& l, F&& f = nullptr, error_code& ec = hpx::throws) { - return this->base_type::set( - which, HPX_MOVE(l), HPX_FORWARD(F, f), ec); + return this->base_type::set(which, l, HPX_FORWARD(F, f), ec); } template diff --git a/libs/full/collectives/include/hpx/collectives/all_gather.hpp b/libs/full/collectives/include/hpx/collectives/all_gather.hpp index 519547a15eab..59c3aa2bf7b0 100644 --- a/libs/full/collectives/include/hpx/collectives/all_gather.hpp +++ b/libs/full/collectives/include/hpx/collectives/all_gather.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2019-2023 Hartmut Kaiser +// Copyright (c) 2019-2024 Hartmut Kaiser // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -149,11 +149,15 @@ namespace hpx::traits { std::size_t generation, T&& t) { return communicator.template handle_data>( + communication::communicator_name< + communication::all_gather_tag>(), which, generation, // step function (invoked for each get) - [&](auto& data) { data[which] = HPX_FORWARD(T, t); }, + [&t](auto& data, std::size_t which) { + data[which] = HPX_FORWARD(T, t); + }, // finalizer (invoked after all data has been received) - [](auto& data, auto&) { return data; }); + [](auto& data, auto&, std::size_t) { return data; }); } }; } // namespace hpx::traits @@ -199,7 +203,7 @@ namespace hpx::collectives { { // make sure id is kept alive as long as the returned future traits::detail::get_shared_state(result)->set_on_completed( - [client = HPX_MOVE(c)]() { HPX_UNUSED(client); }); + [client = HPX_MOVE(c)] { HPX_UNUSED(client); }); } return result; diff --git a/libs/full/collectives/include/hpx/collectives/all_reduce.hpp b/libs/full/collectives/include/hpx/collectives/all_reduce.hpp index 7f2c5abaf80f..cb659bd2006b 100644 --- a/libs/full/collectives/include/hpx/collectives/all_reduce.hpp +++ b/libs/full/collectives/include/hpx/collectives/all_reduce.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2019-2023 Hartmut Kaiser +// Copyright (c) 2019-2024 Hartmut Kaiser // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -156,13 +156,17 @@ namespace hpx::traits { std::size_t generation, T&& t, F&& op) { return communicator.template handle_data>( + communication::communicator_name< + communication::all_reduce_tag>(), which, generation, // step function (invoked for each get) - [&](auto& data) { data[which] = HPX_FORWARD(T, t); }, + [&t](auto& data, std::size_t which) { + data[which] = HPX_FORWARD(T, t); + }, // finalizer (invoked non-concurrently after all data has been // received) [op = HPX_FORWARD(F, op)]( - auto& data, bool& data_available) mutable { + auto& data, bool& data_available, std::size_t) mutable { HPX_ASSERT(!data.empty()); if (!data_available && data.size() > 1) { @@ -219,7 +223,7 @@ namespace hpx::collectives { { // make sure id is kept alive as long as the returned future traits::detail::get_shared_state(result)->set_on_completed( - [client = HPX_MOVE(c)]() { HPX_UNUSED(client); }); + [client = HPX_MOVE(c)] { HPX_UNUSED(client); }); } return result; diff --git a/libs/full/collectives/include/hpx/collectives/all_to_all.hpp b/libs/full/collectives/include/hpx/collectives/all_to_all.hpp index 1f3c8597ad2b..3f7467939e1b 100644 --- a/libs/full/collectives/include/hpx/collectives/all_to_all.hpp +++ b/libs/full/collectives/include/hpx/collectives/all_to_all.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2019-2023 Hartmut Kaiser +// Copyright (c) 2019-2024 Hartmut Kaiser // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -151,11 +151,15 @@ namespace hpx::traits { std::size_t generation, std::vector&& t) { return communicator.template handle_data>( + communication::communicator_name< + communication::all_to_all_tag>(), which, generation, // step function (invoked for each get) - [&](auto& data) { data[which] = HPX_MOVE(t); }, + [&t](auto& data, std::size_t which) { + data[which] = HPX_MOVE(t); + }, // finalizer (invoked after all data has been received) - [which](auto& data, auto&) { + [](auto& data, auto&, std::size_t which) { // slice the overall data based on the locality id of the // requesting site std::vector result; @@ -209,7 +213,7 @@ namespace hpx::collectives { { // make sure id is kept alive as long as the returned future traits::detail::get_shared_state(result)->set_on_completed( - [client = HPX_MOVE(c)]() { HPX_UNUSED(client); }); + [client = HPX_MOVE(c)] { HPX_UNUSED(client); }); } return result; diff --git a/libs/full/collectives/include/hpx/collectives/broadcast.hpp b/libs/full/collectives/include/hpx/collectives/broadcast.hpp index 6a8afdfa3197..b424b6ddfaa1 100644 --- a/libs/full/collectives/include/hpx/collectives/broadcast.hpp +++ b/libs/full/collectives/include/hpx/collectives/broadcast.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Hartmut Kaiser +// Copyright (c) 2020-2024 Hartmut Kaiser // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -233,11 +233,13 @@ namespace hpx::traits { using data_type = typename Result::result_type; return communicator.template handle_data( + communication::communicator_name< + communication::broadcast_tag>(), which, generation, // no step function nullptr, // finalizer (invoked after all sites have checked in) - [](auto& data, auto&) { + [](auto& data, auto&, std::size_t) { return Communicator::template handle_bool( data[0]); }, @@ -249,11 +251,13 @@ namespace hpx::traits { std::size_t generation, T&& t) { return communicator.template handle_data>( + communication::communicator_name< + communication::broadcast_tag>(), which, generation, // step function (invoked once for set) - [&](auto& data) { data[0] = HPX_FORWARD(T, t); }, + [&t](auto& data, std::size_t) { data[0] = HPX_FORWARD(T, t); }, // finalizer (invoked after all sites have checked in) - [](auto& data, auto&) { + [](auto& data, auto&, std::size_t) { return Communicator::template handle_bool>( data[0]); }, @@ -298,7 +302,7 @@ namespace hpx::collectives { { // make sure id is kept alive as long as the returned future traits::detail::get_shared_state(result)->set_on_completed( - [client = HPX_MOVE(c)]() { HPX_UNUSED(client); }); + [client = HPX_MOVE(c)] { HPX_UNUSED(client); }); } return result; diff --git a/libs/full/collectives/include/hpx/collectives/detail/communicator.hpp b/libs/full/collectives/include/hpx/collectives/detail/communicator.hpp index d4d06612f5ba..28d85536bc42 100644 --- a/libs/full/collectives/include/hpx/collectives/detail/communicator.hpp +++ b/libs/full/collectives/include/hpx/collectives/detail/communicator.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Hartmut Kaiser +// Copyright (c) 2020-2024 Hartmut Kaiser // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -89,6 +90,8 @@ namespace hpx::collectives::detail { std::size_t, std::size_t, char const*) noexcept { } + + ~logging_helper() = default; #endif logging_helper(logging_helper const&) = delete; @@ -165,6 +168,12 @@ namespace hpx::collectives::detail { }; private: + std::size_t get_num_sites(std::size_t num_values) const noexcept + { + return num_values == static_cast(-1) ? num_sites_ : + num_values; + } + // re-initialize data template void reinitialize_data(std::size_t num_values) @@ -174,10 +183,8 @@ namespace hpx::collectives::detail { needs_initialization_ = false; data_available_ = false; - auto const new_size = - num_values == static_cast(-1) ? num_sites_ : - num_values; - auto* data = hpx::any_cast>(&data_); + auto const new_size = get_num_sites(num_values); + auto const* data = hpx::any_cast>(&data_); if (data == nullptr || data->size() < new_size) { data_ = std::vector(new_size); @@ -201,6 +208,8 @@ namespace hpx::collectives::detail { { needs_initialization_ = true; data_available_ = false; + on_ready_count_ = 0; + current_operation_ = nullptr; } } @@ -212,8 +221,7 @@ namespace hpx::collectives::detail { auto sf = gate_.get_shared_future(l); traits::detail::get_shared_state(sf)->reserve_callbacks( - capacity == static_cast(-1) ? num_sites_ : - capacity); + get_num_sites(capacity)); auto fut = sf.then(hpx::launch::sync, HPX_FORWARD(F, f)); @@ -225,30 +233,94 @@ namespace hpx::collectives::detail { return fut; } + template + struct on_exit + { + explicit constexpr on_exit(F&& f_) noexcept + : f(HPX_MOVE(f_)) + { + } + + on_exit(on_exit const&) = delete; + on_exit(on_exit&&) = delete; + on_exit& operator=(on_exit const&) = delete; + on_exit& operator=(on_exit&&) = delete; + + ~on_exit() + { + f(); + } + + F f; + }; + // Step will be invoked under lock for each site that checks in (either // set or get). // // Finalizer will be invoked under lock after all sites have checked in. template - auto handle_data(std::size_t which, std::size_t generation, - [[maybe_unused]] Step&& step, Finalizer&& finalizer, + auto handle_data(char const* operation, std::size_t which, + std::size_t generation, [[maybe_unused]] Step&& step, + Finalizer&& finalizer, std::size_t num_values = static_cast(-1)) { - auto on_ready = [this, num_values, + auto on_ready = [this, operation, which, num_values, finalizer = HPX_FORWARD(Finalizer, finalizer)]( shared_future&& f) mutable { + // This callback will be invoked once for each participating + // site after all sites have checked in. + f.get(); // propagate any exceptions + // It does not matter whether the lock will be acquired here. It + // either is still being held by the surrounding logic or is + // re-acquired here (if `on_ready` happens to run on a new + // thread asynchronously). + std::unique_lock l(mtx_, std::try_to_lock); + + // Verify that there is no overlap between different types of + // operations on the same communicator. + if (current_operation_ == nullptr || + std::strcmp(current_operation_, operation) != 0) + { + l.unlock(); + HPX_THROW_EXCEPTION(hpx::error::invalid_status, + "communicator::handle_data::on_ready", + "sequencing error, operation type mismatch: invoked " + "for {}, ongoing operation {}", + operation, + current_operation_ ? current_operation_ : "unknown"); + } + + // Verify that the number of invocations of this callback is in + // the expected range. + if (on_ready_count_ >= num_sites_) + { + l.unlock(); + HPX_THROW_EXCEPTION(hpx::error::invalid_status, + "communicator::handle_data::on_ready", + "sequencing error, an excessive number of on_ready " + "callbacks have been invoked before the end of the " + "collective {} operation. Expected count {}, received " + "count {}.", + operation, on_ready_count_, num_sites_); + } + + // On exit, keep track of number of invocations of this + // callback. + on_exit _([this] { ++on_ready_count_; }); + if constexpr (!std::is_same_v>) { // call provided finalizer return HPX_FORWARD(Finalizer, finalizer)( - access_data(num_values), data_available_); + access_data(num_values), data_available_, which); } else { HPX_UNUSED(this); + HPX_UNUSED(which); HPX_UNUSED(num_values); HPX_UNUSED(finalizer); } @@ -257,23 +329,73 @@ namespace hpx::collectives::detail { std::unique_lock l(mtx_); [[maybe_unused]] util::ignore_while_checking il(&l); + // Verify that there is no overlap between different types of + // operations on the same communicator. + if (current_operation_ == nullptr) + { + if (on_ready_count_ != 0) + { + l.unlock(); + HPX_THROW_EXCEPTION(hpx::error::invalid_status, + "communicator::handle_data", + "sequencing error, on_ready callback was already " + "invoked before the start of the collective {} " + "operation", + operation); + } + current_operation_ = operation; + } + else if (std::strcmp(current_operation_, operation) != 0) + { + l.unlock(); + HPX_THROW_EXCEPTION(hpx::error::invalid_status, + "communicator::handle_data", + "sequencing error, operation type mismatch: invoked for " + "{}, ongoing operation {}", + operation, current_operation_); + } + auto f = get_future_and_synchronize( generation, num_values, HPX_MOVE(on_ready), l); if constexpr (!std::is_same_v>) { // call provided step function for each invocation site - HPX_FORWARD(Step, step)(access_data(num_values)); + HPX_FORWARD(Step, step)(access_data(num_values), which); } // Make sure next generation is enabled only after previous // generation has finished executing. - // - // set() consumes the lock - gate_.set( - which, HPX_MOVE(l), [this, generation](auto& l, auto& gate) { - gate.next_generation(l, generation); - this->invalidate_data(l); + gate_.set(which, l, + [this, operation, generation]( + auto& l, auto& gate, error_code& ec) { + // This callback is invoked synchronously once for each + // collective operation after all data has been received and + // all (shared) futures were triggered. + + HPX_ASSERT_OWNS_LOCK(l); + + // Verify that all `on_ready` callbacks have finished + // executing at this point. + if (on_ready_count_ != num_sites_) + { + l.unlock(); + HPX_THROWS_IF(ec, hpx::error::invalid_status, + "communicator::handle_data", + "sequencing error, not all on_ready callbacks have " + "been invoked at the end of the collective {} " + "operation. Expected count {}, received count {}.", + operation, on_ready_count_, num_sites_); + return; + } + + // Reset communicator state before proceeding to the next + // generation. + invalidate_data(l); + + // Release threads possibly waiting for the next generation + // to be handled. + gate.next_generation(l, generation, ec); }); return f; @@ -281,7 +403,7 @@ namespace hpx::collectives::detail { // protect against vector idiosyncrasies template - static constexpr decltype(auto) handle_bool(Data&& data) + static constexpr decltype(auto) handle_bool(Data&& data) noexcept { if constexpr (std::is_same_v) { @@ -298,10 +420,12 @@ namespace hpx::collectives::detail { mutex_type mtx_; hpx::unique_any_nonser data_; - lcos::local::and_gate gate_; + hpx::lcos::local::and_gate gate_; std::size_t const num_sites_; - bool needs_initialization_; - bool data_available_; + std::size_t on_ready_count_ = 0; + char const* current_operation_ = nullptr; + bool needs_initialization_ = true; + bool data_available_ = false; }; } // namespace hpx::collectives::detail diff --git a/libs/full/collectives/include/hpx/collectives/exclusive_scan.hpp b/libs/full/collectives/include/hpx/collectives/exclusive_scan.hpp index c6bfbbf157fb..121bd9e7983c 100644 --- a/libs/full/collectives/include/hpx/collectives/exclusive_scan.hpp +++ b/libs/full/collectives/include/hpx/collectives/exclusive_scan.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2019-2023 Hartmut Kaiser +// Copyright (c) 2019-2024 Hartmut Kaiser // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -169,13 +169,17 @@ namespace hpx::traits { std::size_t generation, T&& t, F&& op) { return communicator.template handle_data>( + communication::communicator_name< + communication::exclusive_scan_tag>(), which, generation, // step function (invoked for each get) - [&](auto& data) { data[which] = HPX_FORWARD(T, t); }, + [&t](auto& data, std::size_t which) { + data[which] = HPX_FORWARD(T, t); + }, // finalizer (invoked non-concurrently after all data has been // received) - [which, op = HPX_FORWARD(F, op)]( - auto& data, bool& data_available) mutable { + [op = HPX_FORWARD(F, op)](auto& data, bool& data_available, + std::size_t which) mutable { if (!data_available) { std::vector> dest; @@ -236,7 +240,7 @@ namespace hpx::collectives { { // make sure id is kept alive as long as the returned future traits::detail::get_shared_state(result)->set_on_completed( - [client = HPX_MOVE(c)]() { HPX_UNUSED(client); }); + [client = HPX_MOVE(c)] { HPX_UNUSED(client); }); } return result; diff --git a/libs/full/collectives/include/hpx/collectives/gather.hpp b/libs/full/collectives/include/hpx/collectives/gather.hpp index 0409833cc7fe..ac85cea4e128 100644 --- a/libs/full/collectives/include/hpx/collectives/gather.hpp +++ b/libs/full/collectives/include/hpx/collectives/gather.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2014-2023 Hartmut Kaiser +// Copyright (c) 2014-2024 Hartmut Kaiser // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -250,11 +250,14 @@ namespace hpx::traits { std::size_t generation, T&& t) { return communicator.template handle_data>( + communication::communicator_name(), which, generation, // step function (invoked once for get) - [&](auto& data) { data[which] = HPX_FORWARD(T, t); }, + [&t](auto& data, std::size_t which) { + data[which] = HPX_FORWARD(T, t); + }, // finalizer (invoked once after all data has been received) - [](auto& data, bool&) { return HPX_MOVE(data); }); + [](auto& data, bool&, std::size_t) { return HPX_MOVE(data); }); } template @@ -262,9 +265,12 @@ namespace hpx::traits { std::size_t generation, T&& t) { return communicator.template handle_data>( + communication::communicator_name(), which, generation, // step function (invoked for each set) - [&](auto& data) { data[which] = HPX_FORWARD(T, t); }, + [&t](auto& data, std::size_t which) { + data[which] = HPX_FORWARD(T, t); + }, // no finalizer nullptr); } @@ -312,7 +318,7 @@ namespace hpx::collectives { { // make sure id is kept alive as long as the returned future traits::detail::get_shared_state(result)->set_on_completed( - [client = HPX_MOVE(c)]() { HPX_UNUSED(client); }); + [client = HPX_MOVE(c)] { HPX_UNUSED(client); }); } return result; diff --git a/libs/full/collectives/include/hpx/collectives/inclusive_scan.hpp b/libs/full/collectives/include/hpx/collectives/inclusive_scan.hpp index 63adff323451..df3c5ce7ec87 100644 --- a/libs/full/collectives/include/hpx/collectives/inclusive_scan.hpp +++ b/libs/full/collectives/include/hpx/collectives/inclusive_scan.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2019-2023 Hartmut Kaiser +// Copyright (c) 2019-2024 Hartmut Kaiser // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -157,12 +157,16 @@ namespace hpx::traits { std::size_t generation, T&& t, F&& op) { return communicator.template handle_data>( + communication::communicator_name< + communication::inclusive_scan_tag>(), which, generation, // step function (invoked for each get) - [&](auto& data) { data[which] = HPX_FORWARD(T, t); }, + [&t](auto& data, std::size_t which) { + data[which] = HPX_FORWARD(T, t); + }, // finalizer (invoked after all data has been received) - [which, op = HPX_FORWARD(F, op)]( - auto& data, bool& data_available) mutable { + [op = HPX_FORWARD(F, op)](auto& data, bool& data_available, + std::size_t which) mutable { if (!data_available) { std::vector> dest; @@ -221,7 +225,7 @@ namespace hpx::collectives { { // make sure id is kept alive as long as the returned future traits::detail::get_shared_state(result)->set_on_completed( - [client = HPX_MOVE(c)]() { HPX_UNUSED(client); }); + [client = HPX_MOVE(c)] { HPX_UNUSED(client); }); } return result; diff --git a/libs/full/collectives/include/hpx/collectives/reduce.hpp b/libs/full/collectives/include/hpx/collectives/reduce.hpp index 670500e948fd..a0a28b8eb8d6 100644 --- a/libs/full/collectives/include/hpx/collectives/reduce.hpp +++ b/libs/full/collectives/include/hpx/collectives/reduce.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2019-2023 Hartmut Kaiser +// Copyright (c) 2019-2024 Hartmut Kaiser // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -253,11 +253,15 @@ namespace hpx::traits { std::size_t generation, T&& t, F&& op) { return communicator.template handle_data>( + communication::communicator_name(), which, generation, // step function (invoked once for get) - [&](auto& data) { data[which] = HPX_FORWARD(T, t); }, + [&t](auto& data, std::size_t which) { + data[which] = HPX_FORWARD(T, t); + }, // finalizer (invoked once after all data has been received) - [op = HPX_FORWARD(F, op)](auto& data, bool&) mutable { + [op = HPX_FORWARD(F, op)]( + auto& data, bool&, std::size_t) mutable { HPX_ASSERT(!data.empty()); if (data.size() > 1) { @@ -276,9 +280,12 @@ namespace hpx::traits { std::size_t generation, T&& t) { return communicator.template handle_data>( + communication::communicator_name(), which, generation, // step function (invoked for each set) - [&](auto& data) { data[which] = HPX_FORWARD(T, t); }, + [t = HPX_FORWARD(T, t)](auto& data, std::size_t which) mutable { + data[which] = HPX_FORWARD(T, t); + }, // no finalizer nullptr); } @@ -326,7 +333,7 @@ namespace hpx::collectives { { // make sure id is kept alive as long as the returned future traits::detail::get_shared_state(result)->set_on_completed( - [client = HPX_MOVE(c)]() { HPX_UNUSED(client); }); + [client = HPX_MOVE(c)] { HPX_UNUSED(client); }); } return result; diff --git a/libs/full/collectives/include/hpx/collectives/scatter.hpp b/libs/full/collectives/include/hpx/collectives/scatter.hpp index 85f2908f02a4..e8d3985105b9 100644 --- a/libs/full/collectives/include/hpx/collectives/scatter.hpp +++ b/libs/full/collectives/include/hpx/collectives/scatter.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2014-2023 Hartmut Kaiser +// Copyright (c) 2014-2024 Hartmut Kaiser // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -244,12 +244,13 @@ namespace hpx::traits { { using data_type = typename Result::result_type; - return communicator.template handle_data(which, - generation, + return communicator.template handle_data( + communication::communicator_name(), + which, generation, // step function (invoked once for get) nullptr, // finalizer (invoked after all sites have checked in) - [which](auto& data, bool&) { + [](auto& data, bool&, std::size_t which) { return Communicator::template handle_bool( HPX_MOVE(data[which])); }); @@ -260,11 +261,12 @@ namespace hpx::traits { std::size_t generation, std::vector&& t) { return communicator.template handle_data( + communication::communicator_name(), which, generation, // step function (invoked once for set) - [&](auto& data) { data = HPX_MOVE(t); }, + [&t](auto& data, std::size_t) { data = HPX_MOVE(t); }, // finalizer (invoked after all sites have checked in) - [which](auto& data, bool&) { + [](auto& data, bool&, std::size_t which) { return Communicator::template handle_bool( HPX_MOVE(data[which])); }); @@ -306,7 +308,7 @@ namespace hpx::collectives { { // make sure id is kept alive as long as the returned future traits::detail::get_shared_state(result)->set_on_completed( - [client = HPX_MOVE(c)]() { HPX_UNUSED(client); }); + [client = HPX_MOVE(c)] { HPX_UNUSED(client); }); } return result; diff --git a/libs/full/collectives/src/create_communicator.cpp b/libs/full/collectives/src/create_communicator.cpp index 7f13d2a17b36..c2f97c9cb0e5 100644 --- a/libs/full/collectives/src/create_communicator.cpp +++ b/libs/full/collectives/src/create_communicator.cpp @@ -50,8 +50,6 @@ namespace hpx::collectives { communicator_server::communicator_server() noexcept //-V730 : num_sites_(0) - , needs_initialization_(false) - , data_available_(false) { HPX_ASSERT(false); // shouldn't ever be called } @@ -59,10 +57,9 @@ namespace hpx::collectives { communicator_server::communicator_server(std::size_t num_sites) noexcept : gate_(num_sites) , num_sites_(num_sites) - , needs_initialization_(true) - , data_available_(false) { - HPX_ASSERT(num_sites != 0); + HPX_ASSERT( + num_sites != 0 && num_sites != static_cast(-1)); } } // namespace detail diff --git a/libs/full/collectives/tests/unit/communication_set.cpp b/libs/full/collectives/tests/unit/communication_set.cpp index e4b55f16ef14..4083353c85ef 100644 --- a/libs/full/collectives/tests/unit/communication_set.cpp +++ b/libs/full/collectives/tests/unit/communication_set.cpp @@ -119,7 +119,7 @@ namespace hpx::traits { l); which = communicator.which_++; - communicator.gate_.set(which, std::move(l)); + communicator.gate_.set(which, l); return f; }