Skip to content

Commit

Permalink
Tightening up collective operation semantics
Browse files Browse the repository at this point in the history
- flyby: small_vector tweaks
  • Loading branch information
hkaiser committed Jan 11, 2024
1 parent 7b35448 commit e885579
Show file tree
Hide file tree
Showing 15 changed files with 270 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ namespace hpx::detail {
}

// only void* is allowed to be converted to uintptr_t
void* ptr = ::operator new(offset_to_data + sizeof(T) * capacity);
void* ptr = ::operator new(mem);
if (nullptr == ptr)
{
throw std::bad_alloc();
Expand Down Expand Up @@ -319,9 +319,13 @@ namespace hpx::detail {
{
// indirect -> direct
auto* storage = indirect();
uninitialized_move_and_destroy(
storage->data(), direct_data(), storage->size());
set_direct_and_size(storage->size());
auto const data_size = storage->size();
if (data_size != 0)
{
uninitialized_move_and_destroy(
storage->data(), direct_data(), data_size);
set_direct_and_size(data_size);
}
detail::storage<T>::dealloc(storage);
}
}
Expand All @@ -332,16 +336,26 @@ namespace hpx::detail {
if (is_direct())
{
// direct -> indirect
uninitialized_move_and_destroy(data<direction::direct>(),
storage->data(), size<direction::direct>());
storage->size(size<direction::direct>());
auto const data_size = size<direction::direct>();
if (data_size != 0)
{
uninitialized_move_and_destroy(
data<direction::direct>(), storage->data(),
data_size);
storage->size(data_size);
}
}
else
{
// indirect -> indirect
uninitialized_move_and_destroy(data<direction::indirect>(),
storage->data(), size<direction::indirect>());
storage->size(size<direction::indirect>());
auto const data_size = size<direction::indirect>();
if (data_size != 0)
{
uninitialized_move_and_destroy(
data<direction::indirect>(), storage->data(),
data_size);
storage->size(data_size);
}
detail::storage<T>::dealloc(indirect());
}
set_indirect(storage);
Expand Down
53 changes: 22 additions & 31 deletions libs/core/futures/src/future_data.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2015-2023 Hartmut Kaiser
// Copyright (c) 2015-2024 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand All @@ -22,13 +22,14 @@

#include <cstddef>
#include <exception>
#include <functional>
#include <mutex>
#include <utility>

namespace hpx::lcos::detail {

static run_on_completed_error_handler_type run_on_completed_error_handler;
namespace {
run_on_completed_error_handler_type run_on_completed_error_handler;
}

void set_run_on_completed_error_handler(
run_on_completed_error_handler_type f)
Expand Down Expand Up @@ -66,16 +67,12 @@ namespace hpx::lcos::detail {

///////////////////////////////////////////////////////////////////////////
template <typename Callback>
static void run_on_completed_on_new_thread(Callback&& f)
void run_on_completed_on_new_thread(Callback&& f)
{
lcos::local::futures_factory<void()> p(HPX_FORWARD(Callback, f));

bool const is_hpx_thread = nullptr != hpx::threads::get_self_ptr();
HPX_ASSERT(nullptr != hpx::threads::get_self_ptr());
hpx::launch policy = launch::fork;
if (!is_hpx_thread)
{
policy = launch::async;
}

policy.set_priority(threads::thread_priority::boost);
policy.set_stacksize(threads::thread_stacksize::current);
Expand All @@ -84,17 +81,12 @@ namespace hpx::lcos::detail {
threads::thread_id_ref_type const tid = //-V821
p.post("run_on_completed_on_new_thread", policy);

// wait for the task to run
if (is_hpx_thread)
{
// make sure this thread is executed last
this_thread::suspend(
threads::thread_schedule_state::pending, tid.noref());
return p.get_future().get();
}
// make sure this thread is executed last
this_thread::suspend(
threads::thread_schedule_state::pending, tid.noref());

// If we are not on a HPX thread, we need to return immediately, to
// allow the newly spawned thread to execute.
// wait for the task to run
return p.get_future().get();
}

///////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -124,15 +116,13 @@ namespace hpx::lcos::detail {
}

auto const state = this->state_.load(std::memory_order_acquire);
if (state != this->empty)
if (state != future_data_base::empty)
{
return false;
}

// this thread would block on the future

auto* thrd = get_thread_id_data(runs_child);
HPX_UNUSED(thrd); // might be unused
[[maybe_unused]] auto* thrd = get_thread_id_data(runs_child);

LTM_(debug).format("task_object::get_result_void: attempting to "
"directly execute child({}), description({})",
Expand Down Expand Up @@ -161,8 +151,6 @@ namespace hpx::lcos::detail {
return false;
}

static util::unused_type unused_;

util::unused_type*
future_data_base<traits::detail::future_data_void>::get_result_void(
void const* storage, error_code& ec)
Expand Down Expand Up @@ -190,6 +178,7 @@ namespace hpx::lcos::detail {

if (s == value)
{
static util::unused_type unused_;
return &unused_;
}

Expand Down Expand Up @@ -232,12 +221,12 @@ namespace hpx::lcos::detail {
hpx::scoped_annotation annotate(on_completed);
HPX_MOVE(on_completed)();
},
[&](std::exception_ptr ep) {
[&](std::exception_ptr const& ep) {
// If the completion handler throws an exception, there's
// nothing we can do, report the exception and terminate.
if (run_on_completed_error_handler)
{
run_on_completed_error_handler(HPX_MOVE(ep));
run_on_completed_error_handler(ep);
}
else
{
Expand Down Expand Up @@ -272,7 +261,9 @@ namespace hpx::lcos::detail {
cnt.count_ > HPX_CONTINUATION_MAX_RECURSION_DEPTH ||
(hpx::threads::get_self_ptr() == nullptr);
#endif
if (!recurse_asynchronously)

bool const is_hpx_thread = nullptr != hpx::threads::get_self_ptr();
if (!is_hpx_thread || !recurse_asynchronously)
{
// directly execute continuation on this thread
run_on_completed(HPX_FORWARD(Callback, on_completed));
Expand All @@ -289,17 +280,17 @@ namespace hpx::lcos::detail {
run_on_completed_on_new_thread(util::deferred_call(
p, HPX_FORWARD(Callback, on_completed)));
},
[&](std::exception_ptr ep) {
[&](std::exception_ptr const& ep) {
// If an exception while creating the new task or inside the
// completion handler is thrown, there is nothing we can do...
// ... but terminate and report the error
if (run_on_completed_error_handler)
{
run_on_completed_error_handler(HPX_MOVE(ep));
run_on_completed_error_handler(ep);
}
else
{
std::rethrow_exception(HPX_MOVE(ep));
std::rethrow_exception(ep);
}
});
}
Expand Down
25 changes: 12 additions & 13 deletions libs/core/lcos_local/include/hpx/lcos_local/and_gate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ namespace hpx::lcos::local {
protected:
// Set the data which has to go into the segment \a which.
template <typename OuterLock, typename F>
bool set(std::size_t which, OuterLock outer_lock, F&& f,
bool set(std::size_t which, OuterLock& outer_lock, F&& f,
error_code& ec = throws)
{
HPX_ASSERT_OWNS_LOCK(outer_lock);
Expand Down Expand Up @@ -224,15 +224,12 @@ namespace hpx::lcos::local {
std::decay_t<F>>)
{
// invoke callback with the outer lock being held
HPX_FORWARD(F, f)(outer_lock, *this);
HPX_FORWARD(F, f)(outer_lock, *this, ec);
}

outer_lock.unlock();
return true;
}
}

outer_lock.unlock();
return false;
}

Expand All @@ -242,7 +239,7 @@ namespace hpx::lcos::local {
{
hpx::no_mutex mtx;
std::unique_lock<hpx::no_mutex> lk(mtx);
return set(which, HPX_MOVE(lk), HPX_FORWARD(F, f), ec);
return set(which, lk, HPX_FORWARD(F, f), ec);
}

protected:
Expand Down Expand Up @@ -324,7 +321,8 @@ namespace hpx::lcos::local {

public:
template <typename Lock>
std::size_t next_generation(Lock& l, std::size_t new_generation)
std::size_t next_generation(
Lock& l, std::size_t new_generation, error_code& ec = throws)
{
HPX_ASSERT_OWNS_LOCK(l);

Expand All @@ -335,10 +333,11 @@ namespace hpx::lcos::local {
if (new_generation < generation_)
{
l.unlock();
HPX_THROW_EXCEPTION(hpx::error::invalid_status,
HPX_THROWS_IF(ec, hpx::error::invalid_status,
"and_gate::next_generation",
"sequencing error, new generational counter value too "
"small");
return generation_;
}
generation_ = new_generation;
}
Expand All @@ -351,10 +350,11 @@ namespace hpx::lcos::local {
}

std::size_t next_generation(
std::size_t new_generation = static_cast<std::size_t>(-1))
std::size_t new_generation = static_cast<std::size_t>(-1),
error_code& ec = throws)
{
std::unique_lock<mutex_type> l(mtx_);
return next_generation(l, new_generation);
return next_generation(l, new_generation, ec);
}

template <typename Lock>
Expand Down Expand Up @@ -441,11 +441,10 @@ namespace hpx::lcos::local {
}

template <typename Lock, typename F = std::nullptr_t>
bool set(std::size_t which, Lock l, F&& f = nullptr,
bool set(std::size_t which, Lock& l, F&& f = nullptr,
error_code& ec = hpx::throws)
{
return this->base_type::set(
which, HPX_MOVE(l), HPX_FORWARD(F, f), ec);
return this->base_type::set(which, l, HPX_FORWARD(F, f), ec);
}

template <typename Lock>
Expand Down
8 changes: 6 additions & 2 deletions libs/full/collectives/include/hpx/collectives/all_gather.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,15 @@ namespace hpx::traits {
std::size_t generation, T&& t)
{
return communicator.template handle_data<std::decay_t<T>>(
communication::communicator_name<
communication::all_gather_tag>(),
which, generation,
// step function (invoked for each get)
[&](auto& data) { data[which] = HPX_FORWARD(T, t); },
[&t](auto& data, std::size_t which) {
data[which] = HPX_FORWARD(T, t);
},
// finalizer (invoked after all data has been received)
[](auto& data, auto&) { return data; });
[](auto& data, auto&, std::size_t) { return data; });
}
};
} // namespace hpx::traits
Expand Down
8 changes: 6 additions & 2 deletions libs/full/collectives/include/hpx/collectives/all_reduce.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,17 @@ namespace hpx::traits {
std::size_t generation, T&& t, F&& op)
{
return communicator.template handle_data<std::decay_t<T>>(
communication::communicator_name<
communication::all_reduce_tag>(),
which, generation,
// step function (invoked for each get)
[&](auto& data) { data[which] = HPX_FORWARD(T, t); },
[&t](auto& data, std::size_t which) {
data[which] = HPX_FORWARD(T, t);
},
// finalizer (invoked non-concurrently after all data has been
// received)
[op = HPX_FORWARD(F, op)](
auto& data, bool& data_available) mutable {
auto& data, bool& data_available, std::size_t) mutable {
HPX_ASSERT(!data.empty());
if (!data_available && data.size() > 1)
{
Expand Down
8 changes: 6 additions & 2 deletions libs/full/collectives/include/hpx/collectives/all_to_all.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,15 @@ namespace hpx::traits {
std::size_t generation, std::vector<T>&& t)
{
return communicator.template handle_data<std::vector<T>>(
communication::communicator_name<
communication::all_to_all_tag>(),
which, generation,
// step function (invoked for each get)
[&](auto& data) { data[which] = HPX_MOVE(t); },
[&t](auto& data, std::size_t which) {
data[which] = HPX_MOVE(t);
},
// finalizer (invoked after all data has been received)
[which](auto& data, auto&) {
[](auto& data, auto&, std::size_t which) {
// slice the overall data based on the locality id of the
// requesting site
std::vector<T> result;
Expand Down
10 changes: 7 additions & 3 deletions libs/full/collectives/include/hpx/collectives/broadcast.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,13 @@ namespace hpx::traits {
using data_type = typename Result::result_type;

return communicator.template handle_data<data_type>(
communication::communicator_name<
communication::broadcast_tag>(),
which, generation,
// no step function
nullptr,
// finalizer (invoked after all sites have checked in)
[](auto& data, auto&) {
[](auto& data, auto&, std::size_t) {
return Communicator::template handle_bool<data_type>(
data[0]);
},
Expand All @@ -249,11 +251,13 @@ namespace hpx::traits {
std::size_t generation, T&& t)
{
return communicator.template handle_data<std::decay_t<T>>(
communication::communicator_name<
communication::broadcast_tag>(),
which, generation,
// step function (invoked once for set)
[&](auto& data) { data[0] = HPX_FORWARD(T, t); },
[&t](auto& data, std::size_t) { data[0] = HPX_FORWARD(T, t); },
// finalizer (invoked after all sites have checked in)
[](auto& data, auto&) {
[](auto& data, auto&, std::size_t) {
return Communicator::template handle_bool<std::decay_t<T>>(
data[0]);
},
Expand Down
Loading

0 comments on commit e885579

Please sign in to comment.