Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rfa parallel #6595

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/macos_debug_fetch_hwloc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ jobs:
-DHPX_WITH_VERIFY_LOCKS=ON \
-DHPX_WITH_VERIFY_LOCKS_BACKTRACE=ON \
-DHPX_WITH_CHECK_MODULE_DEPENDENCIES=ON
ln -s "$(which aclocal)" /opt/homebrew/bin/aclocal-1.16
cd build/_deps/hwloc-src/ && autoreconf -f -i
- name: Build
shell: bash
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <hpx/functional/invoke.hpp>
#include <hpx/parallel/algorithms/detail/rfa.hpp>
#include <hpx/parallel/util/loop.hpp>
#include <hpx/type_support/pack.hpp>

#include <cstddef>
#include <cstring>
Expand All @@ -35,16 +36,16 @@ namespace hpx::parallel::detail {
{
/// TODO: Put constraint on Reduce to be a binary plus operator
(void) r;
hpx::parallel::detail::rfa::RFA_bins<T> bins;
bins.initialize_bins();
std::memcpy(rfa::__rfa_bin_host_buffer__, &bins, sizeof(bins));

// __rfa_bin_host_buffer__ should be initialized by the frontend of
// this method

hpx::parallel::detail::rfa::ReproducibleFloatingAccumulator<T> rfa;
rfa.set_max_abs_val(init);
rfa.unsafe_add(init);
rfa.renorm();
size_t count = 0;
T max_val = std::abs(*first);
T max_val = std::abs((std::numeric_limits<T>::min)());
for (auto e = first; e != last; ++e)
{
T temp_max_val = std::abs(static_cast<T>(*e));
Expand All @@ -65,6 +66,69 @@ namespace hpx::parallel::detail {
}
};

/// Customization point object that accumulates one partition of a
/// deterministic (reproducible) reduction.
///
/// Two overloads are selected by the trailing tag argument:
///  - std::true_type:  the elements are plain values of type T; they are
///    deposited into a fresh ReproducibleFloatingAccumulator<T>, which is
///    returned without being converted back to T.
///  - std::false_type: T itself is the accumulator type; the elements are
///    combined with operator+= and the combined accumulator is returned.
///
/// In both cases exactly \a partition_size elements are read, starting at
/// \a first. The execution policy argument is accepted but not used.
template <typename ExPolicy>
struct sequential_reduce_deterministic_rfa_t final
  : hpx::functional::detail::tag_fallback<
        sequential_reduce_deterministic_rfa_t<ExPolicy>>
{
private:
    /// Deposit \a partition_size values into an accumulator seeded with
    /// \a init.
    ///
    /// \note __rfa_bin_host_buffer__ should be initialized by the frontend
    ///       of this method.
    template <typename InIterB, typename T>
    friend constexpr hpx::parallel::detail::rfa::
        ReproducibleFloatingAccumulator<T>
        tag_fallback_invoke(sequential_reduce_deterministic_rfa_t,
            ExPolicy&&, InIterB first, std::size_t partition_size, T init,
            std::true_type&&)
    {
        hpx::parallel::detail::rfa::ReproducibleFloatingAccumulator<T> acc;
        acc.zero();
        acc += init;

        // Largest magnitude seen so far; the accumulator is only told
        // about a new maximum when an element exceeds this value.
        T largest_seen = std::abs((std::numeric_limits<T>::min)());

        // Number of unsafe_add() calls since the last renorm().
        std::size_t pending = 0;

        auto it = first;
        for (std::size_t i = 0; i < partition_size; ++i, ++it)
        {
            T magnitude = std::abs(static_cast<T>(*it));
            if (largest_seen < magnitude)
            {
                acc.set_max_abs_val(magnitude);
                largest_seen = magnitude;
            }

            acc.unsafe_add(*it);

            // Renormalize once endurance() deposits have been made
            // (presumably the limit before a renorm becomes necessary).
            if (++pending == acc.endurance())
            {
                acc.renorm();
                pending = 0;
            }
        }
        return acc;
    }

    /// Combine \a partition_size accumulators into one, starting from
    /// \a init. T must provide zero() and operator+=.
    ///
    /// \note __rfa_bin_host_buffer__ should be initialized by the frontend
    ///       of this method.
    template <typename InIterB, typename T>
    friend constexpr T tag_fallback_invoke(
        sequential_reduce_deterministic_rfa_t, ExPolicy&&, InIterB first,
        std::size_t partition_size, T init, std::false_type&&)
    {
        T total;
        total.zero();
        total += init;

        auto it = first;
        for (std::size_t i = 0; i < partition_size; ++i, ++it)
        {
            total += (*it);
        }
        return total;
    }
};

#if !defined(HPX_COMPUTE_DEVICE_CODE)
template <typename ExPolicy>
inline constexpr sequential_reduce_deterministic_t<ExPolicy>
Expand All @@ -80,4 +144,18 @@ namespace hpx::parallel::detail {
}
#endif

#if defined(HPX_COMPUTE_DEVICE_CODE)
    // HPX_COMPUTE_DEVICE_CODE is defined: expose the algorithm as a
    // host/device function template that builds the tag object on the
    // fly and forwards all arguments to it.
    template <typename ExPolicy, typename... Args>
    HPX_HOST_DEVICE HPX_FORCEINLINE auto sequential_reduce_deterministic_rfa(
        Args&&... args)
    {
        return sequential_reduce_deterministic_rfa_t<ExPolicy>{}(
            std::forward<Args>(args)...);
    }
#else
    // Otherwise: expose the algorithm as a constexpr function object,
    // one instance per execution policy type.
    template <typename ExPolicy>
    inline constexpr sequential_reduce_deterministic_rfa_t<ExPolicy>
        sequential_reduce_deterministic_rfa{};
#endif
} // namespace hpx::parallel::detail
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,6 @@ namespace hpx::parallel::detail::rfa {
///The number of deposits that can be performed before a renorm is necessary.
///Applies also to binned complex double precision.
static constexpr auto ENDURANCE = 1 << (MANT_DIG - BIN_WIDTH - 2);

///Return a binned floating-point reference bin
inline const ftype* binned_bins(const int x) const
{
Expand Down Expand Up @@ -825,7 +824,7 @@ namespace hpx::parallel::detail::rfa {
/// Set the binned fp to zero by assigning a zero initializer to `data`.
void zero()
{
// NOTE(review): both assignments below appear in this hunk. They look
// like the before/after lines of the change (single vs. nested braces);
// presumably only the nested-brace form is meant to remain - confirm
// against the actual file.
data = {0};
data = {{0}};
}

///Return the fold of the binned fp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ namespace hpx {

#include <algorithm>
#include <cstddef>
#include <cstring>
#include <iterator>
#include <type_traits>
#include <utility>
Expand All @@ -396,10 +397,67 @@ namespace hpx::parallel {
static constexpr T sequential(ExPolicy&& policy, InIterB first,
InIterE last, T_&& init, Reduce&& r)
{
// TODO: abstract initializing memory
hpx::parallel::detail::rfa::RFA_bins<T_> bins;
bins.initialize_bins();
std::memcpy(hpx::parallel::detail::rfa::__rfa_bin_host_buffer__,
&bins, sizeof(bins));
return hpx::parallel::detail::sequential_reduce_deterministic<
ExPolicy>(HPX_FORWARD(ExPolicy, policy), first, last,
HPX_FORWARD(T_, init), HPX_FORWARD(Reduce, r));
}

/// Parallel deterministic reduction over [first, last).
///
/// The range is split by util::partitioner. Each partition is summed into
/// its own ReproducibleFloatingAccumulator, and the per-partition
/// accumulators are then combined (together with \a init) and converted
/// back to a plain value with conv().
///
/// \param policy  execution policy forwarded to the partitioner
/// \param first   start of the input range
/// \param last    end of the input range
/// \param init    initial value; returned unchanged for an empty range
/// \param r       reduction operation; not used by this implementation
///
/// \returns the reduction result wrapped according to \a ExPolicy
template <typename ExPolicy, typename FwdIterB, typename FwdIterE,
    typename T_, typename Reduce>
static util::detail::algorithm_result_t<ExPolicy, T> parallel(
    ExPolicy&& policy, FwdIterB first, FwdIterE last, T_&& init,
    Reduce&& r)
{
    // T_ is deduced from a forwarding reference, so it is a reference
    // type whenever init is passed as an lvalue. The accumulator, the bin
    // table and the local copies need the plain value type instead.
    using value_type = std::decay_t<T_>;
    using accumulator_type =
        hpx::parallel::detail::rfa::ReproducibleFloatingAccumulator<
            value_type>;

    (void) r;
    if (first == last)
    {
        return util::detail::algorithm_result<ExPolicy, T>::get(
            HPX_FORWARD(T_, init));
    }

    // TODO: abstract initializing memory
    hpx::parallel::detail::rfa::RFA_bins<value_type> bins;
    bins.initialize_bins();
    std::memcpy(hpx::parallel::detail::rfa::__rfa_bin_host_buffer__,
        &bins, sizeof(bins));

    // Per-partition step: seed the accumulator with the first element of
    // the partition and deposit the remaining part_size - 1 elements.
    auto f1 = [policy](FwdIterB part_begin,
                  std::size_t part_size) -> accumulator_type {
        value_type val = *part_begin;
        // Assumes that __rfa_bin_host_buffer__ is initialized
        return hpx::parallel::detail::
            sequential_reduce_deterministic_rfa<ExPolicy>(
                HPX_FORWARD(ExPolicy, policy), ++part_begin,
                --part_size, HPX_MOVE(val), std::true_type{});
    };

    return util::partitioner<ExPolicy, value_type,
        accumulator_type>::call(HPX_FORWARD(ExPolicy, policy), first,
        detail::distance(first, last), HPX_MOVE(f1),
        // Final step: fold the per-partition accumulators into one that
        // starts from init, then convert back to the value type.
        hpx::unwrapping([policy, init](auto&& results) -> value_type {
            // Assumes that __rfa_bin_host_buffer__ is initialized
            accumulator_type rfa;
            rfa.zero();
            rfa += init;
            return hpx::parallel::detail::
                sequential_reduce_deterministic_rfa<ExPolicy>(
                    HPX_FORWARD(ExPolicy, policy),
                    hpx::util::begin(results),
                    hpx::util::size(results), HPX_MOVE(rfa),
                    std::false_type{})
                    .conv();
        }));
}
};
/// \endcond
} // namespace detail
Expand Down
1 change: 1 addition & 0 deletions libs/core/algorithms/tests/performance/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ set(benchmarks
benchmark_partial_sort_parallel
benchmark_partition
benchmark_partition_copy
benchmark_reduce_deterministic
benchmark_remove
benchmark_remove_if
benchmark_scan_algorithms
Expand Down
Loading
Loading