Skip to content

Commit

Permalink
Always return outermost thread id
Browse files Browse the repository at this point in the history
-flyby: deprecate get_outer_self_id
  • Loading branch information
hkaiser committed Jan 16, 2024
1 parent 7b35448 commit e2e17a1
Show file tree
Hide file tree
Showing 24 changed files with 366 additions and 293 deletions.
11 changes: 5 additions & 6 deletions components/iostreams/src/server/output_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace hpx::iostreams::detail {
ar << valid;
if (valid)
{
ar& data_;
ar & data_;
}
}

Expand All @@ -44,7 +44,7 @@ namespace hpx::iostreams::detail {
ar >> valid;
if (valid)
{
ar& data_;
ar & data_;
}
}
} // namespace hpx::iostreams::detail
Expand Down Expand Up @@ -89,10 +89,9 @@ namespace hpx::iostreams::server {
{ // {{{
// Perform the IO in another OS thread.
detail::buffer in(buf_in);
hpx::get_thread_pool("io_pool")->get_io_service().post(
hpx::bind_front(&output_stream::call_write_sync, this, locality_id,
count, std::ref(in),
threads::thread_id_ref_type(threads::get_outer_self_id())));
hpx::get_thread_pool("io_pool")->get_io_service().post(hpx::bind_front(
&output_stream::call_write_sync, this, locality_id, count,
std::ref(in), threads::thread_id_ref_type(threads::get_self_id())));

// Sleep until the worker thread wakes us up.
this_thread::suspend(threads::thread_schedule_state::suspended,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2023 Hartmut Kaiser
// Copyright (c) 2007-2024 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down Expand Up @@ -63,6 +63,9 @@ namespace examples::server {
}
~reset_id()
{
auto const mtx = outer_.mtx_;
std::lock_guard<hpx::mutex> l(*mtx);

[[maybe_unused]] hpx::thread::id const old_value = outer_.id_;
outer_.id_ = hpx::thread::id();
HPX_ASSERT(old_value != hpx::thread::id());
Expand Down Expand Up @@ -104,9 +107,14 @@ namespace examples::server {
});

auto const mtx = mtx_;

std::lock_guard<hpx::mutex> l(*mtx);
HPX_ASSERT(id_ != hpx::thread::id());
hpx::thread::interrupt(id_);
auto const id = id_;

if (id != hpx::thread::id())
{
hpx::thread::interrupt(id);
}
}

HPX_DEFINE_COMPONENT_ACTION(cancelable_action, do_it, do_it_action)
Expand Down
33 changes: 23 additions & 10 deletions libs/core/execution/tests/unit/bulk_async.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright (c) 2015 Daniel Bourgeois
// Copyright (c) 2024 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand All @@ -16,16 +17,16 @@
#include <vector>

////////////////////////////////////////////////////////////////////////////////
int bulk_test(
hpx::thread::id tid, int value, bool is_par, int passed_through) //-V813
int bulk_test(hpx::thread::id const& tid, int value, bool is_par,
int passed_through) //-V813
{
HPX_TEST_EQ(is_par, (tid != hpx::this_thread::get_id()));
HPX_TEST_EQ(passed_through, 42);
return value;
}

template <typename Executor>
void test_bulk_sync(Executor& exec)
void test_bulk_sync(Executor&& exec)
{
hpx::thread::id tid = hpx::this_thread::get_id();

Expand All @@ -35,14 +36,15 @@ void test_bulk_sync(Executor& exec)
using hpx::placeholders::_1;
using hpx::placeholders::_2;

std::vector<int> results = hpx::parallel::execution::bulk_sync_execute(
exec, hpx::bind(&bulk_test, tid, _1, false, _2), v, 42);
std::vector<int> results =
hpx::parallel::execution::bulk_sync_execute(HPX_FORWARD(Executor, exec),
hpx::bind(&bulk_test, tid, _1, false, _2), v, 42);

HPX_TEST(std::equal(std::begin(results), std::end(results), std::begin(v)));
}

template <typename Executor>
void test_bulk_async(Executor& exec)
void test_bulk_async(Executor&& exec)
{
hpx::thread::id tid = hpx::this_thread::get_id();

Expand All @@ -54,14 +56,25 @@ void test_bulk_async(Executor& exec)

std::vector<hpx::future<int>> results =
hpx::parallel::execution::bulk_async_execute(
exec, hpx::bind(&bulk_test, tid, _1, true, _2), v, 42);
HPX_FORWARD(Executor, exec),
hpx::bind(&bulk_test, tid, _1, true, _2), v, 42);

HPX_TEST(std::equal(std::begin(results), std::end(results), std::begin(v),
[](hpx::future<int>& lhs, const int& rhs) {
return lhs.get() == rhs;
}));
}

template <typename Executor>
decltype(auto) disable_run_as_child(Executor&& exec)
{
auto hint = hpx::execution::experimental::get_hint(exec);
hint.runs_as_child_mode(hpx::threads::thread_execution_hint::none);

return hpx::experimental::prefer(hpx::execution::experimental::with_hint,
HPX_FORWARD(Executor, exec), hint);
}

////////////////////////////////////////////////////////////////////////////////
int hpx_main()
{
Expand All @@ -70,15 +83,15 @@ int hpx_main()

hpx::execution::parallel_executor par_exec;
hpx::execution::parallel_executor par_fork_exec(hpx::launch::fork);
test_bulk_async(par_exec);
test_bulk_async(par_fork_exec);
test_bulk_async(disable_run_as_child(par_exec));
test_bulk_async(disable_run_as_child(par_fork_exec));

return hpx::local::finalize();
}

int main(int argc, char* argv[])
{
// By default this test should run on all available cores
// By default, this test should run on all available cores
std::vector<std::string> const cfg = {"hpx.os_threads=all"};

// Initialize and run HPX
Expand Down
57 changes: 35 additions & 22 deletions libs/core/executors/tests/unit/parallel_policy_executor.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2019 Hartmut Kaiser
// Copyright (c) 2007-2024 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand All @@ -17,6 +17,16 @@
#include <vector>

///////////////////////////////////////////////////////////////////////////////
template <typename Executor>
decltype(auto) disable_run_as_child(Executor&& exec)
{
auto hint = hpx::execution::experimental::get_hint(exec);
hint.runs_as_child_mode(hpx::threads::thread_execution_hint::none);

return hpx::experimental::prefer(hpx::execution::experimental::with_hint,
HPX_FORWARD(Executor, exec), hint);
}

hpx::thread::id test(int passed_through)
{
HPX_TEST_EQ(passed_through, 42);
Expand All @@ -26,7 +36,7 @@ hpx::thread::id test(int passed_through)
template <typename Policy>
void test_sync()
{
typedef hpx::execution::parallel_policy_executor<Policy> executor;
using executor = hpx::execution::parallel_policy_executor<Policy>;

executor exec;
HPX_TEST(hpx::parallel::execution::sync_execute(exec, &test, 42) ==
Expand All @@ -36,12 +46,12 @@ void test_sync()
template <typename Policy>
void test_async(bool sync)
{
typedef hpx::execution::parallel_policy_executor<Policy> executor;
using executor = hpx::execution::parallel_policy_executor<Policy>;

executor exec;
bool result =
hpx::parallel::execution::async_execute(exec, &test, 42).get() ==
hpx::this_thread::get_id();
bool const result = hpx::parallel::execution::async_execute(
disable_run_as_child(exec), &test, 42)
.get() == hpx::this_thread::get_id();

HPX_TEST_EQ(sync, result);
}
Expand All @@ -60,26 +70,26 @@ hpx::thread::id test_f(hpx::future<void> f, int passed_through)
template <typename Policy>
void test_then(bool sync)
{
typedef hpx::execution::parallel_policy_executor<Policy> executor;
using executor = hpx::execution::parallel_policy_executor<Policy>;

hpx::future<void> f = hpx::make_ready_future();

executor exec;
bool result =
bool const result =
hpx::parallel::execution::then_execute(exec, &test_f, f, 42).get() ==
hpx::this_thread::get_id();

HPX_TEST_EQ(sync, result);
}

///////////////////////////////////////////////////////////////////////////////
void bulk_test_s(int, hpx::thread::id tid, int passed_through) //-V813
void bulk_test_s(int, hpx::thread::id const& tid, int passed_through) //-V813
{
HPX_TEST_EQ(tid, hpx::this_thread::get_id());
HPX_TEST_EQ(passed_through, 42);
}

void bulk_test_a(int, hpx::thread::id tid, int passed_through) //-V813
void bulk_test_a(int, hpx::thread::id const& tid, int passed_through) //-V813
{
HPX_TEST_NEQ(tid, hpx::this_thread::get_id());
HPX_TEST_EQ(passed_through, 42);
Expand All @@ -88,7 +98,7 @@ void bulk_test_a(int, hpx::thread::id tid, int passed_through) //-V813
template <typename Policy>
void test_bulk_sync(bool sync)
{
typedef hpx::execution::parallel_policy_executor<Policy> executor;
using executor = hpx::execution::parallel_policy_executor<Policy>;

hpx::thread::id tid = hpx::this_thread::get_id();

Expand All @@ -99,17 +109,17 @@ void test_bulk_sync(bool sync)
using hpx::placeholders::_2;

executor exec;
hpx::parallel::execution::bulk_sync_execute(exec,
hpx::parallel::execution::bulk_sync_execute(disable_run_as_child(exec),
hpx::bind(sync ? &bulk_test_s : &bulk_test_a, _1, tid, _2), v, 42);
hpx::parallel::execution::bulk_sync_execute(
exec, sync ? &bulk_test_s : &bulk_test_a, v, tid, 42);
hpx::parallel::execution::bulk_sync_execute(disable_run_as_child(exec),
sync ? &bulk_test_s : &bulk_test_a, v, tid, 42);
}

///////////////////////////////////////////////////////////////////////////////
template <typename Policy>
void test_bulk_async(bool sync)
{
typedef hpx::execution::parallel_policy_executor<Policy> executor;
using executor = hpx::execution::parallel_policy_executor<Policy>;

hpx::thread::id tid = hpx::this_thread::get_id();

Expand All @@ -121,16 +131,18 @@ void test_bulk_async(bool sync)

executor exec;
hpx::when_all(
hpx::parallel::execution::bulk_async_execute(exec,
hpx::parallel::execution::bulk_async_execute(disable_run_as_child(exec),
hpx::bind(sync ? &bulk_test_s : &bulk_test_a, _1, tid, _2), v, 42))
.get();
hpx::when_all(hpx::parallel::execution::bulk_async_execute(
exec, sync ? &bulk_test_s : &bulk_test_a, v, tid, 42))
hpx::when_all(
hpx::parallel::execution::bulk_async_execute(disable_run_as_child(exec),
sync ? &bulk_test_s : &bulk_test_a, v, tid, 42))
.get();
}

///////////////////////////////////////////////////////////////////////////////
void bulk_test_f_s(int, hpx::shared_future<void> f, hpx::thread::id tid,
void bulk_test_f_s(int, hpx::shared_future<void> const& f,
hpx::thread::id const& tid,
int passed_through) //-V813
{
HPX_TEST(f.is_ready()); // make sure, future is ready
Expand All @@ -141,7 +153,8 @@ void bulk_test_f_s(int, hpx::shared_future<void> f, hpx::thread::id tid,
HPX_TEST_EQ(passed_through, 42);
}

void bulk_test_f_a(int, hpx::shared_future<void> f, hpx::thread::id tid,
void bulk_test_f_a(int, hpx::shared_future<void> const& f,
hpx::thread::id const& tid,
int passed_through) //-V813
{
HPX_TEST(f.is_ready()); // make sure, future is ready
Expand All @@ -155,7 +168,7 @@ void bulk_test_f_a(int, hpx::shared_future<void> f, hpx::thread::id tid,
template <typename Policy>
void test_bulk_then(bool sync)
{
typedef hpx::execution::parallel_policy_executor<Policy> executor;
using executor = hpx::execution::parallel_policy_executor<Policy>;

hpx::thread::id tid = hpx::this_thread::get_id();

Expand Down Expand Up @@ -223,7 +236,7 @@ int hpx_main()

int main(int argc, char* argv[])
{
// By default this test should run on all available cores
// By default, this test should run on all available cores
std::vector<std::string> const cfg = {"hpx.os_threads=all"};

// Initialize and run HPX
Expand Down
23 changes: 17 additions & 6 deletions libs/core/executors/tests/unit/shared_parallel_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ namespace hpx::parallel::execution {
} // namespace hpx::parallel::execution

///////////////////////////////////////////////////////////////////////////////
template <typename Executor>
decltype(auto) disable_run_as_child(Executor&& exec)
{
auto hint = hpx::execution::experimental::get_hint(exec);
hint.runs_as_child_mode(hpx::threads::thread_execution_hint::none);

return hpx::experimental::prefer(hpx::execution::experimental::with_hint,
HPX_FORWARD(Executor, exec), hint);
}

hpx::thread::id test(int passed_through)
{
HPX_TEST_EQ(passed_through, 42);
Expand All @@ -61,7 +71,8 @@ void test_async()
executor exec;

hpx::shared_future<hpx::thread::id> fut =
hpx::parallel::execution::async_execute(exec, &test, 42);
hpx::parallel::execution::async_execute(
disable_run_as_child(exec), &test, 42);

HPX_TEST_NEQ(fut.get(), hpx::this_thread::get_id());
}
Expand Down Expand Up @@ -105,12 +116,12 @@ void test_bulk_async()

executor exec;
std::vector<hpx::shared_future<void>> futs =
hpx::parallel::execution::bulk_async_execute(
exec, hpx::bind(&bulk_test, _1, tid, _2), v, 42);
hpx::parallel::execution::bulk_async_execute(disable_run_as_child(exec),
hpx::bind(&bulk_test, _1, tid, _2), v, 42);
hpx::when_all(futs).get();

futs = hpx::parallel::execution::bulk_async_execute(
exec, &bulk_test, v, tid, 42);
disable_run_as_child(exec), &bulk_test, v, tid, 42);
hpx::when_all(futs).get();
}

Expand All @@ -133,8 +144,8 @@ void test_async_void()
using executor = shared_parallel_executor;

executor exec;
hpx::shared_future<void> fut =
hpx::parallel::execution::async_execute(exec, &void_test, 42);
hpx::shared_future<void> fut = hpx::parallel::execution::async_execute(
disable_run_as_child(exec), &void_test, 42);
fut.get();
}

Expand Down
Loading

0 comments on commit e2e17a1

Please sign in to comment.