
Merge #6309
6309: Making sure changed number of cores is propagated to executor r=hkaiser a=hkaiser



Co-authored-by: Hartmut Kaiser <[email protected]>
StellarBot and hkaiser committed Aug 20, 2023
2 parents 557b87d + 3d39b6e commit 38d5bf9
Showing 35 changed files with 1,044 additions and 405 deletions.
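
In short: once the chunking heuristic (or an explicit num_cores / with_processing_units_count request) settles on a number of cores, that number is now written back into the execution policy before the bulk work is launched, so the executor that runs the work actually honors it. A minimal sketch of the user-visible behavior, modeled on the regression test added below (the for_each call is only an example workload, not part of this commit):

    #include <hpx/algorithm.hpp>
    #include <hpx/execution.hpp>
    #include <hpx/init.hpp>

    #include <vector>

    int hpx_main()
    {
        // Ask for two cores via the num_cores executor parameter.
        hpx::execution::experimental::num_cores nc(2);
        auto policy = hpx::execution::par.with(nc);

        // With this change the partitioner propagates the requested core
        // count into the executor before launching the bulk work, so the
        // algorithm really runs on (at most) two worker threads.
        std::vector<int> v(1000, 0);
        hpx::for_each(policy, v.begin(), v.end(), [](int& x) { ++x; });

        return hpx::local::finalize();
    }

    int main(int argc, char* argv[])
    {
        return hpx::local::init(hpx_main, argc, argv);
    }
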
@@ -508,7 +508,7 @@ namespace hpx { namespace ranges {

namespace hpx::ranges {

inline constexpr struct is_sorted_t final
inline constexpr struct is_sorted_t
: hpx::detail::tag_parallel_algorithm<is_sorted_t>
{
private:
@@ -8,6 +8,7 @@

#include <hpx/config.hpp>
#include <hpx/assert.hpp>
#include <hpx/async_base/scheduling_properties.hpp>
#include <hpx/datastructures/tuple.hpp>
#include <hpx/execution/algorithms/detail/is_negative.hpp>
#include <hpx/execution/algorithms/detail/predicates.hpp>
@@ -16,6 +17,7 @@
#include <hpx/futures/future.hpp>
#include <hpx/iterator_support/iterator_range.hpp>
#include <hpx/parallel/util/detail/chunk_size_iterator.hpp>
#include <hpx/properties/property.hpp>

#include <algorithm>
#include <cstddef>
@@ -132,7 +134,7 @@ namespace hpx::parallel::util::detail {
template <typename ExPolicy, typename IterOrR,
typename Stride = std::size_t>
hpx::util::iterator_range<chunk_size_iterator<IterOrR>>
get_bulk_iteration_shape(ExPolicy&& policy, IterOrR& it_or_r,
get_bulk_iteration_shape(ExPolicy& policy, IterOrR& it_or_r,
std::size_t& count, Stride s = Stride(1))
{
if (count == 0)
@@ -166,6 +168,10 @@
// clang-format on
}

// update executor with new values
policy = hpx::experimental::prefer(
execution::with_processing_units_count, policy, cores);

auto shape_begin = chunk_size_iterator(it_or_r, chunk_size, count);
auto shape_end = chunk_size_iterator(last, chunk_size, count, count);
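
The block just added ("update executor with new values") is the heart of the patch: after the heuristic above has settled on cores, the policy is rebound through hpx::experimental::prefer, which applies the with_processing_units_count property when the executor supports it and leaves the policy untouched otherwise. The same pattern in isolation, as a sketch (the header choices mirror the includes added above and are an assumption, not a prescription):

    #include <hpx/async_base/scheduling_properties.hpp>
    #include <hpx/execution.hpp>
    #include <hpx/properties/property.hpp>

    #include <cstddef>

    // Sketch: write an adjusted core count back into an execution policy.
    // prefer() degrades gracefully: if the executor does not support
    // with_processing_units_count, the policy is returned unchanged.
    template <typename ExPolicy>
    ExPolicy& apply_core_count(ExPolicy& policy, std::size_t cores)
    {
        policy = hpx::experimental::prefer(
            hpx::parallel::execution::with_processing_units_count, policy,
            cores);
        return policy;
    }
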

@@ -175,7 +181,7 @@
template <typename ExPolicy, typename Future, typename F1, typename IterOrR,
typename Stride = std::size_t>
hpx::util::iterator_range<chunk_size_iterator<IterOrR>>
get_bulk_iteration_shape(ExPolicy&& policy, std::vector<Future>& workitems,
get_bulk_iteration_shape(ExPolicy& policy, std::vector<Future>& workitems,
F1&& f1, IterOrR& it_or_r, std::size_t& count, Stride s = Stride(1))
{
if (count == 0)
@@ -241,6 +247,10 @@
// clang-format on
}

// update executor with new values
policy = hpx::experimental::prefer(
execution::with_processing_units_count, policy, cores);

auto shape_begin = chunk_size_iterator(it_or_r, chunk_size, count);
auto shape_end = chunk_size_iterator(last, chunk_size, count, count);

@@ -250,7 +260,7 @@
template <typename ExPolicy, typename IterOrR,
typename Stride = std::size_t>
std::vector<hpx::tuple<IterOrR, std::size_t>>
get_bulk_iteration_shape_variable(ExPolicy&& policy, IterOrR& it_or_r,
get_bulk_iteration_shape_variable(ExPolicy& policy, IterOrR& it_or_r,
std::size_t& count, Stride s = Stride(1))
{
using tuple_type = hpx::tuple<IterOrR, std::size_t>;
@@ -308,27 +318,31 @@
}
// clang-format on

// update executor with new values
policy = hpx::experimental::prefer(
execution::with_processing_units_count, policy, cores);

return shape;
}

template <typename ExPolicy, typename Future, typename F1, typename FwdIter,
typename Stride = std::size_t>
decltype(auto) get_bulk_iteration_shape(std::false_type, ExPolicy&& policy,
decltype(auto) get_bulk_iteration_shape(std::false_type, ExPolicy& policy,
std::vector<Future>& workitems, F1&& f1, FwdIter& begin,
std::size_t& count, Stride s = Stride(1))
{
return get_bulk_iteration_shape(HPX_FORWARD(ExPolicy, policy),
workitems, HPX_FORWARD(F1, f1), begin, count, s);
return get_bulk_iteration_shape(
policy, workitems, HPX_FORWARD(F1, f1), begin, count, s);
}

template <typename ExPolicy, typename Future, typename F1, typename FwdIter,
typename Stride = std::size_t>
decltype(auto) get_bulk_iteration_shape(std::true_type, ExPolicy&& policy,
decltype(auto) get_bulk_iteration_shape(std::true_type, ExPolicy& policy,
std::vector<Future>& workitems, F1&& f1, FwdIter& begin,
std::size_t& count, Stride s = Stride(1))
{
return get_bulk_iteration_shape_variable(HPX_FORWARD(ExPolicy, policy),
workitems, HPX_FORWARD(F1, f1), begin, count, s);
return get_bulk_iteration_shape_variable(
policy, workitems, HPX_FORWARD(F1, f1), begin, count, s);
}

///////////////////////////////////////////////////////////////////////////
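
Throughout this header the shape helpers change their first parameter from ExPolicy&& to ExPolicy&, and the std::false_type/std::true_type dispatchers stop forwarding the policy. The reason is that the prefer(...) write-back has to land in the caller's policy object rather than in a temporary that is discarded when the helper returns. A stripped-down illustration of the calling convention (toy types, not the real HPX interfaces):

    #include <cstddef>

    // Stand-in for an execution policy that carries a processing-units count.
    struct toy_policy
    {
        std::size_t cores = 0;
    };

    // Takes the policy by lvalue reference (the ExPolicy&& -> ExPolicy&
    // change), so the adjustment it makes is visible to the caller.
    inline void get_shape(toy_policy& policy, std::size_t count)
    {
        policy.cores = count < 128 ? 1 : 4;    // stand-in for the heuristic
    }

    // The partitioner works on its own copy of the policy, lets get_shape()
    // update that copy, and only then reads the executor (here: the core
    // count) from it.
    inline std::size_t partition(toy_policy policy, std::size_t count)
    {
        get_shape(policy, count);
        return policy.cores;
    }
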
@@ -360,7 +374,7 @@ namespace hpx::parallel::util::detail {
typename Stride = std::size_t>
hpx::util::iterator_range<
parallel::util::detail::chunk_size_idx_iterator<FwdIter>>
get_bulk_iteration_shape_idx(ExPolicy&& policy, FwdIter begin,
get_bulk_iteration_shape_idx(ExPolicy& policy, FwdIter begin,
std::size_t count, Stride s = Stride(1))
{
using iterator =
@@ -397,6 +411,13 @@
// clang-format on
}

// update executor with new values
policy = hpx::experimental::prefer(
execution::with_processing_units_count, policy, cores);

using iterator =
parallel::util::detail::chunk_size_idx_iterator<FwdIter>;

iterator shape_begin(begin, chunk_size, count, 0, 0);
iterator shape_end(last, chunk_size, count, count, 0);

@@ -407,7 +428,7 @@
typename Stride = std::size_t>
hpx::util::iterator_range<
parallel::util::detail::chunk_size_idx_iterator<FwdIter>>
get_bulk_iteration_shape_idx(ExPolicy&& policy,
get_bulk_iteration_shape_idx(ExPolicy& policy,
std::vector<Future>& workitems, F1&& f1, FwdIter begin,
std::size_t count, Stride s = Stride(1))
{
@@ -475,6 +496,13 @@
// clang-format on
}

// update executor with new values
policy = hpx::experimental::prefer(
execution::with_processing_units_count, policy, cores);

using iterator =
parallel::util::detail::chunk_size_idx_iterator<FwdIter>;

iterator shape_begin(begin, chunk_size, count, 0, base_idx);
iterator shape_end(last, chunk_size, count, count, base_idx);

@@ -484,7 +512,7 @@
template <typename ExPolicy, typename FwdIter,
typename Stride = std::size_t>
std::vector<hpx::tuple<FwdIter, std::size_t, std::size_t>>
get_bulk_iteration_shape_idx_variable(ExPolicy&& policy, FwdIter first,
get_bulk_iteration_shape_idx_variable(ExPolicy& policy, FwdIter first,
std::size_t count, Stride s = Stride(1))
{
using tuple_type = hpx::tuple<FwdIter, std::size_t, std::size_t>;
@@ -543,6 +571,10 @@
}
// clang-format on

// update executor with new values
policy = hpx::experimental::prefer(
execution::with_processing_units_count, policy, cores);

return shape;
}
} // namespace hpx::parallel::util::detail
@@ -36,7 +36,7 @@ namespace hpx::parallel::util::detail {

template <typename Result, typename ExPolicy, typename FwdIter, typename F>
auto foreach_partition(
ExPolicy&& policy, FwdIter first, std::size_t count, F&& f)
ExPolicy policy, FwdIter first, std::size_t count, F&& f)
{
// estimate a chunk size based on number of cores used
using parameters_type =
@@ -53,16 +53,16 @@
"has_variable_chunk_size and invokes_testing_function");

auto&& shape = detail::get_bulk_iteration_shape_idx_variable(
HPX_FORWARD(ExPolicy, policy), first, count);
policy, first, count);

return execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
HPX_MOVE(shape));
}
else if constexpr (!invokes_testing_function)
{
auto&& shape = detail::get_bulk_iteration_shape_idx(
HPX_FORWARD(ExPolicy, policy), first, count);
auto&& shape =
detail::get_bulk_iteration_shape_idx(policy, first, count);

return execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
@@ -72,7 +72,7 @@
{
std::vector<hpx::future<Result>> inititems;
auto&& shape = detail::get_bulk_iteration_shape_idx(
HPX_FORWARD(ExPolicy, policy), inititems, f, first, count);
policy, inititems, f, first, count);

auto&& workitems = execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
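
Note the matching change on the partitioner side: foreach_partition (and partition / partition_with_index in partitioner.hpp below) now accepts the policy by value instead of as a forwarding reference. The partitioner keeps a private copy, get_bulk_iteration_shape_idx updates that copy in place, and the subsequent bulk_async_execute(policy.executor(), ...) therefore launches the work on an executor that already carries the adjusted processing-units count.
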
21 changes: 10 additions & 11 deletions libs/core/algorithms/include/hpx/parallel/util/partitioner.hpp
@@ -40,7 +40,7 @@
namespace hpx::parallel::util::detail {

template <typename Result, typename ExPolicy, typename IterOrR, typename F>
auto partition(ExPolicy&& policy, IterOrR it_or_r, std::size_t count, F&& f)
auto partition(ExPolicy policy, IterOrR it_or_r, std::size_t count, F&& f)
{
// estimate a chunk size based on number of cores used
using parameters_type =
@@ -57,16 +57,16 @@
"has_variable_chunk_size and invokes_testing_function");

auto&& shape = detail::get_bulk_iteration_shape_variable(
HPX_FORWARD(ExPolicy, policy), it_or_r, count);
policy, it_or_r, count);

return execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
HPX_MOVE(shape));
}
else if constexpr (!invokes_testing_function)
{
auto&& shape = detail::get_bulk_iteration_shape(
HPX_FORWARD(ExPolicy, policy), it_or_r, count);
auto&& shape =
detail::get_bulk_iteration_shape(policy, it_or_r, count);

return execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
@@ -76,7 +76,7 @@
{
std::vector<hpx::future<Result>> inititems;
auto&& shape = detail::get_bulk_iteration_shape(
HPX_FORWARD(ExPolicy, policy), inititems, f, it_or_r, count);
policy, inititems, f, it_or_r, count);

auto&& workitems = execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
@@ -88,8 +88,8 @@

template <typename Result, typename ExPolicy, typename FwdIter,
typename Stride, typename F>
auto partition_with_index(ExPolicy&& policy, FwdIter first,
std::size_t count, Stride stride, F&& f)
auto partition_with_index(
ExPolicy policy, FwdIter first, std::size_t count, Stride stride, F&& f)
{
// estimate a chunk size based on number of cores used
using parameters_type =
@@ -106,7 +106,7 @@
"has_variable_chunk_size and invokes_testing_function");

auto&& shape = detail::get_bulk_iteration_shape_idx_variable(
HPX_FORWARD(ExPolicy, policy), first, count, stride);
policy, first, count, stride);

return execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
@@ -115,7 +115,7 @@
else if constexpr (!invokes_testing_function)
{
auto&& shape = detail::get_bulk_iteration_shape_idx(
HPX_FORWARD(ExPolicy, policy), first, count, stride);
policy, first, count, stride);

return execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
@@ -125,8 +125,7 @@
{
std::vector<hpx::future<Result>> inititems;
auto&& shape = detail::get_bulk_iteration_shape_idx(
HPX_FORWARD(ExPolicy, policy), inititems, f, first, count,
stride);
policy, inititems, f, first, count, stride);

auto&& workitems = execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
1 change: 1 addition & 0 deletions libs/core/algorithms/tests/regressions/CMakeLists.txt
@@ -12,6 +12,7 @@ set(tests
for_loop_5735
for_loop_with_auto_chunk_size
minimal_findend
num_cores
reduce_3641
scan_different_inits
scan_non_commutative
41 changes: 41 additions & 0 deletions libs/core/algorithms/tests/regressions/num_cores.cpp
@@ -0,0 +1,41 @@
// Copyright (c) 2023 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/algorithm.hpp>
#include <hpx/chrono.hpp>
#include <hpx/execution.hpp>
#include <hpx/init.hpp>
#include <hpx/modules/testing.hpp>

#include <cstddef>

int hpx_main()
{
hpx::execution::experimental::num_cores nc(2);
auto policy = hpx::execution::par.with(nc);

HPX_TEST_EQ(
hpx::parallel::execution::processing_units_count(policy.parameters(),
policy.executor(), hpx::chrono::null_duration, 0),
static_cast<std::size_t>(2));

auto policy2 =
hpx::parallel::execution::with_processing_units_count(policy, 2);
HPX_TEST_EQ(hpx::parallel::execution::processing_units_count(
hpx::execution::par.parameters(), policy2.executor(),
hpx::chrono::null_duration, 0),
static_cast<std::size_t>(2));

return hpx::local::finalize();
}

int main(int argc, char* argv[])
{
HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv), 0,
"HPX main exited with non-zero status");

return hpx::util::report_errors();
}
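
The first assertion verifies that a num_cores(2) parameter attached with .with() is reported back by processing_units_count when queried through the policy's own parameters and executor; the second verifies that with_processing_units_count applied to the whole policy rebinds its executor, so the count of 2 is reported even when the query is made with the default par parameters object.
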
8 changes: 4 additions & 4 deletions libs/core/algorithms/tests/unit/algorithms/rotate_sender.cpp
@@ -46,7 +46,7 @@ void test_rotate_direct(Policy l, ExPolicy&& policy, IteratorTag)
std::iota(std::begin(c), std::end(c), std::rand());
std::copy(std::begin(c), std::end(c), std::back_inserter(d1));

std::size_t mid_pos = std::rand() % c.size(); //-V104
std::size_t const mid_pos = std::rand() % c.size(); //-V104
base_iterator mid = std::begin(c);
std::advance(mid, mid_pos);

@@ -88,7 +88,7 @@ void test_rotate(Policy l, ExPolicy&& policy, IteratorTag)
std::iota(std::begin(c), std::end(c), std::rand());
std::copy(std::begin(c), std::end(c), std::back_inserter(d1));

std::size_t mid_pos = std::rand() % c.size(); //-V104
std::size_t const mid_pos = std::rand() % c.size(); //-V104
base_iterator mid = std::begin(c);
std::advance(mid, mid_pos);

@@ -127,7 +127,7 @@ void test_rotate_async_direct(Policy l, ExPolicy&& p, IteratorTag)
std::iota(std::begin(c), std::end(c), std::rand());
std::copy(std::begin(c), std::end(c), std::back_inserter(d1));

std::size_t mid_pos = std::rand() % c.size(); //-V104
std::size_t const mid_pos = std::rand() % c.size(); //-V104

base_iterator mid = std::begin(c);
std::advance(mid, mid_pos);
@@ -191,7 +191,7 @@ void rotate_test()

int hpx_main(hpx::program_options::variables_map& vm)
{
unsigned int seed = (unsigned int) std::time(nullptr);
unsigned int seed = static_cast<unsigned int>(std::time(nullptr));
if (vm.count("seed"))
seed = vm["seed"].as<unsigned int>();
