From 9f0938288ee6bce47072af96645d5c3fbdc9bab9 Mon Sep 17 00:00:00 2001 From: Paul T Date: Mon, 20 Mar 2023 12:01:11 -0400 Subject: [PATCH 01/22] Add `wait_for_tasks()` method --- include/thread_pool/thread_pool.h | 28 +++++++++++++++++++++++++--- test/source/thread_pool.cpp | 20 ++++++++++++++++++++ 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index 94ca6bb..93ffe7c 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -36,7 +36,7 @@ namespace dp { public: explicit thread_pool( const unsigned int &number_of_threads = std::thread::hardware_concurrency()) - : tasks_(number_of_threads) { + : tasks_(number_of_threads), waiting_barrier_(number_of_threads) { std::size_t current_id = 0; for (std::size_t i = 0; i < number_of_threads; ++i) { priority_queue_.push_back(size_t(current_id)); @@ -69,8 +69,15 @@ namespace dp { } } while (pending_tasks_.load(std::memory_order_acquire) > 0); - - priority_queue_.rotate_to_front(id); + + priority_queue_.rotate_to_front(id); + // once tasks are done, arrive at the barrier. + if (waiting_.load(std::memory_order_acquire)) { + waiting_barrier_.arrive_and_wait(); + // notify the waiter + wait_signal_.release(); + } + } while (!stop_tok.stop_requested()); }); @@ -79,6 +86,8 @@ namespace dp { } catch (...) { // catch all + // if an exception occurs, drop the count for the barrier + waiting_barrier_.arrive_and_drop(); // remove one item from the tasks tasks_.pop_back(); @@ -192,6 +201,16 @@ namespace dp { })); } + void wait_for_tasks() { + // first check if there are any pending tasks + if (pending_tasks_.load(std::memory_order_acquire) == 0) return; + waiting_.store(true); + // wait for all threads to arrive at the barrier + wait_signal_.acquire(); + // reset the waiting flag + waiting_.store(false); + } + [[nodiscard]] auto size() const { return threads_.size(); } private: @@ -217,6 +236,9 @@ namespace dp { std::deque tasks_; dp::thread_safe_queue priority_queue_; std::atomic_int_fast64_t pending_tasks_{}; + std::atomic_bool waiting_{}; + std::barrier<> waiting_barrier_; + std::binary_semaphore wait_signal_{0}; }; /** diff --git a/test/source/thread_pool.cpp b/test/source/thread_pool.cpp index f3c20e0..fa4c4a3 100644 --- a/test/source/thread_pool.cpp +++ b/test/source/thread_pool.cpp @@ -251,6 +251,26 @@ TEST_CASE("Ensure work completes with fewer threads than expected.") { pool.enqueue_detach(task); } } + CHECK_EQ(counter.load(), total_tasks); +} + +TEST_CASE("Ensure wait_for_tasks() properly blocks current execution.") { + std::atomic counter = 0; + int total_tasks{}; + dp::thread_pool pool(4); + + SUBCASE("with tasks") { total_tasks = 30; } + SUBCASE("with no tasks") { total_tasks = 0; } + + for (auto i = 0; i < total_tasks; i++) { + auto task = [i, &counter]() { + std::this_thread::sleep_for(std::chrono::milliseconds((i + 1) * 100)); + ++counter; + }; + pool.enqueue_detach(task); + } + + pool.wait_for_tasks(); CHECK_EQ(counter.load(), total_tasks); } From 507bb7fb6abd672624b3cc2980b726fac5514709 Mon Sep 17 00:00:00 2001 From: Paul T Date: Mon, 20 Mar 2023 14:21:05 -0400 Subject: [PATCH 02/22] Minor fixes --- include/thread_pool/thread_pool.h | 7 ++----- test/source/thread_pool.cpp | 3 +-- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index 93ffe7c..9360003 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -36,7 +36,7 @@ namespace dp { public: explicit thread_pool( const unsigned int &number_of_threads = std::thread::hardware_concurrency()) - : tasks_(number_of_threads), waiting_barrier_(number_of_threads) { + : tasks_(number_of_threads), waiting_barrier_(number_of_threads + 1) { std::size_t current_id = 0; for (std::size_t i = 0; i < number_of_threads; ++i) { priority_queue_.push_back(size_t(current_id)); @@ -74,8 +74,6 @@ namespace dp { // once tasks are done, arrive at the barrier. if (waiting_.load(std::memory_order_acquire)) { waiting_barrier_.arrive_and_wait(); - // notify the waiter - wait_signal_.release(); } @@ -206,7 +204,7 @@ namespace dp { if (pending_tasks_.load(std::memory_order_acquire) == 0) return; waiting_.store(true); // wait for all threads to arrive at the barrier - wait_signal_.acquire(); + waiting_barrier_.arrive_and_wait(); // reset the waiting flag waiting_.store(false); } @@ -238,7 +236,6 @@ namespace dp { std::atomic_int_fast64_t pending_tasks_{}; std::atomic_bool waiting_{}; std::barrier<> waiting_barrier_; - std::binary_semaphore wait_signal_{0}; }; /** diff --git a/test/source/thread_pool.cpp b/test/source/thread_pool.cpp index fa4c4a3..3b0cbb1 100644 --- a/test/source/thread_pool.cpp +++ b/test/source/thread_pool.cpp @@ -257,11 +257,11 @@ TEST_CASE("Ensure work completes with fewer threads than expected.") { TEST_CASE("Ensure wait_for_tasks() properly blocks current execution.") { std::atomic counter = 0; int total_tasks{}; - dp::thread_pool pool(4); SUBCASE("with tasks") { total_tasks = 30; } SUBCASE("with no tasks") { total_tasks = 0; } + dp::thread_pool pool(4); for (auto i = 0; i < total_tasks; i++) { auto task = [i, &counter]() { std::this_thread::sleep_for(std::chrono::milliseconds((i + 1) * 100)); @@ -269,7 +269,6 @@ TEST_CASE("Ensure wait_for_tasks() properly blocks current execution.") { }; pool.enqueue_detach(task); } - pool.wait_for_tasks(); CHECK_EQ(counter.load(), total_tasks); From 4a547766526e906b6876b9799d9e252a30d8a45b Mon Sep 17 00:00:00 2001 From: Paul T Date: Mon, 20 Mar 2023 14:45:20 -0400 Subject: [PATCH 03/22] Trying to address race condition --- include/thread_pool/thread_pool.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index 9360003..5e20193 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -43,6 +43,9 @@ namespace dp { try { threads_.emplace_back([&, id = current_id](const std::stop_token &stop_tok) { do { + // check for a stop request before acquiring the wait signal + if(stop_tok.stop_requested()) break; + // wait until signaled tasks_[id].signal.acquire(); From 47bfd8d9ab822c74a76b388e70448042326616ca Mon Sep 17 00:00:00 2001 From: Paul T Date: Mon, 20 Mar 2023 15:49:28 -0400 Subject: [PATCH 04/22] Wake all threads when waiting This is done to ensure that the std::barrier is always hit when waiting for tasks to complete. --- include/thread_pool/thread_pool.h | 4 +++- test/source/thread_pool.cpp | 6 ++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index 5e20193..da12a83 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -44,7 +44,7 @@ namespace dp { threads_.emplace_back([&, id = current_id](const std::stop_token &stop_tok) { do { // check for a stop request before acquiring the wait signal - if(stop_tok.stop_requested()) break; + if (stop_tok.stop_requested()) break; // wait until signaled tasks_[id].signal.acquire(); @@ -206,6 +206,8 @@ namespace dp { // first check if there are any pending tasks if (pending_tasks_.load(std::memory_order_acquire) == 0) return; waiting_.store(true); + // wake all threads + for (std::size_t i = 0; i < tasks_.size(); ++i) tasks_[i].signal.release(); // wait for all threads to arrive at the barrier waiting_barrier_.arrive_and_wait(); // reset the waiting flag diff --git a/test/source/thread_pool.cpp b/test/source/thread_pool.cpp index 3b0cbb1..6f7e961 100644 --- a/test/source/thread_pool.cpp +++ b/test/source/thread_pool.cpp @@ -257,14 +257,16 @@ TEST_CASE("Ensure work completes with fewer threads than expected.") { TEST_CASE("Ensure wait_for_tasks() properly blocks current execution.") { std::atomic counter = 0; int total_tasks{}; + constexpr auto thread_count = 4; SUBCASE("with tasks") { total_tasks = 30; } SUBCASE("with no tasks") { total_tasks = 0; } + SUBCASE("with task count less than thread count") { total_tasks = thread_count / 2; } - dp::thread_pool pool(4); + dp::thread_pool pool(thread_count); for (auto i = 0; i < total_tasks; i++) { auto task = [i, &counter]() { - std::this_thread::sleep_for(std::chrono::milliseconds((i + 1) * 100)); + std::this_thread::sleep_for(std::chrono::milliseconds((i + 1) * 10)); ++counter; }; pool.enqueue_detach(task); From 79e7c7b0076aeda26663b76d79cbe0e8c4f50c5b Mon Sep 17 00:00:00 2001 From: Paul T Date: Wed, 2 Aug 2023 00:20:56 -0400 Subject: [PATCH 05/22] Minor updates --- include/thread_pool/thread_pool.h | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index da12a83..89d61b2 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -39,7 +39,7 @@ namespace dp { : tasks_(number_of_threads), waiting_barrier_(number_of_threads + 1) { std::size_t current_id = 0; for (std::size_t i = 0; i < number_of_threads; ++i) { - priority_queue_.push_back(size_t(current_id)); + priority_queue_.push_back(static_cast(current_id)); try { threads_.emplace_back([&, id = current_id](const std::stop_token &stop_tok) { do { @@ -72,13 +72,13 @@ namespace dp { } } while (pending_tasks_.load(std::memory_order_acquire) > 0); - - priority_queue_.rotate_to_front(id); + + priority_queue_.rotate_to_front(id); + // once tasks are done, arrive at the barrier. if (waiting_.load(std::memory_order_acquire)) { waiting_barrier_.arrive_and_wait(); } - } while (!stop_tok.stop_requested()); }); @@ -205,13 +205,13 @@ namespace dp { void wait_for_tasks() { // first check if there are any pending tasks if (pending_tasks_.load(std::memory_order_acquire) == 0) return; - waiting_.store(true); + waiting_.store(true, std::memory_order_release); // wake all threads for (std::size_t i = 0; i < tasks_.size(); ++i) tasks_[i].signal.release(); // wait for all threads to arrive at the barrier waiting_barrier_.arrive_and_wait(); // reset the waiting flag - waiting_.store(false); + waiting_.store(false, std::memory_order_release); } [[nodiscard]] auto size() const { return threads_.size(); } From c8245a23d580fac11fc6a83cea131ae4e0dadd39 Mon Sep 17 00:00:00 2001 From: Paul T Date: Wed, 2 Aug 2023 14:35:37 -0400 Subject: [PATCH 06/22] Update README.md Update readme with corrected build info and how to install via vcpkg --- README.md | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 6b57f59..05c3ff5 100644 --- a/README.md +++ b/README.md @@ -26,12 +26,17 @@ A simple, *fast* and functional thread pool implementation using pure C++20. `dp::thread-pool` is a header only library. All the files needed are in `include/thread_pool`. -### CMake +### vcpkg + +`dp::thread-pool` is available on `vcpkg` -`ThreadPool` defines two CMake targets: +```powershell +vcpkg install dp-thread-pool +``` + +### CMake -* `ThreadPool::ThreadPool` -* `dp::thread-pool` +`thread-pool` defines the CMake target `dp::thread-pool`. You can then use `find_package()`: From 45bfacfd3c4bdb33d974fd5376c8d0a3138b1c59 Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 17 Aug 2023 14:19:57 -0400 Subject: [PATCH 07/22] Fix minor issues with clang-format --- .clang-format | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/.clang-format b/.clang-format index 2644f89..26d4c8c 100644 --- a/.clang-format +++ b/.clang-format @@ -1,17 +1,15 @@ --- BasedOnStyle: Google - AccessModifierOffset: '-2' - AlignTrailingComments: 'true' - AllowAllParametersOfDeclarationOnNextLine: 'false' - AlwaysBreakTemplateDeclarations: 'No' + AccessModifierOffset: -2 + AlignTrailingComments: true + AllowAllParametersOfDeclarationOnNextLine: false BreakBeforeBraces: Attach - ColumnLimit: '100' - ConstructorInitializerAllOnOneLineOrOnePerLine: 'false' + ColumnLimit: 100 + ConstructorInitializerAllOnOneLineOrOnePerLine: false IncludeBlocks: Regroup IndentPPDirectives: AfterHash - IndentWidth: '4' + IndentWidth: 4 NamespaceIndentation: All BreakBeforeBinaryOperators: None - BreakBeforeTernaryOperators: 'true' + BreakBeforeTernaryOperators: true AlwaysBreakTemplateDeclarations: 'Yes' -... From 76a5927c16ac9131607c25427d1ac2506f81fe9b Mon Sep 17 00:00:00 2001 From: Paul T Date: Wed, 23 Aug 2023 07:23:21 -0400 Subject: [PATCH 08/22] Add thread scaling and count primes benchmarks --- benchmark/include/utilities.h | 11 ++ benchmark/source/count_primes.cpp | 137 +++++++++++++++++++++++ benchmark/source/thread_pool_scaling.cpp | 78 +++++++++++++ 3 files changed, 226 insertions(+) create mode 100644 benchmark/source/count_primes.cpp create mode 100644 benchmark/source/thread_pool_scaling.cpp diff --git a/benchmark/include/utilities.h b/benchmark/include/utilities.h index e95c1a4..f89cda5 100644 --- a/benchmark/include/utilities.h +++ b/benchmark/include/utilities.h @@ -1,5 +1,7 @@ #pragma once +#include + #include #include #include @@ -53,3 +55,12 @@ template return computations; } + +template > + requires std::is_integral_v +void generate_random_data(Seq&& seq) { + static ankerl::nanobench::Rng rng(std::random_device{}()); + std::uniform_int_distribution distribution(std::numeric_limits::min(), + std::numeric_limits::max()); + std::ranges::generate(seq, [&] { return distribution(rng); }); +} diff --git a/benchmark/source/count_primes.cpp b/benchmark/source/count_primes.cpp new file mode 100644 index 0000000..b4b7745 --- /dev/null +++ b/benchmark/source/count_primes.cpp @@ -0,0 +1,137 @@ +#include +#include +#include +#include + +#include +#include +#include +#include + +template +bool is_prime(const ValueType& value) { + if (value <= 3 && value > 1) return true; + + // no need to check above sqrt(n) + const auto n = static_cast(std::ceil(std::sqrt(value) + 1)); + + for (auto i = 2; i < n; ++i) { + if (n % i == 0) { + return false; + } + } + return true; +} + +template +void count_if_prime(const ValueType& value, std::uint64_t& count) { + if (is_prime(value)) ++count; +} + +template +void count_if_prime_tp(const ValueType& value, std::atomic& count) { + if (is_prime(value)) ++count; +} + +template +std::uint64_t count_primes(const std::vector& values) { + std::uint64_t count = 0; + for (const auto& value : values) count_if_prime(value, std::ref(count)); + return count; +} + +template +std::uint64_t count_primes_thread_pool(const std::vector& values) { + std::atomic count(0); + + { + dp::thread_pool<> pool{}; + for (const auto& value : values) { + pool.enqueue_detach(count_if_prime_tp, value, std::ref(count)); + } + } + + return count.load(); +} + +template +void run_benchmark(const std::size_t& size) { + ankerl::nanobench::Bench bench; + auto bench_title = std::string("count primes ") + std::to_string(sizeof(ValueType) * 8) + + " bit " + std::to_string(size); + bench.title(bench_title).warmup(10).relative(true); + + // generate the data + std::vector values(size); + generate_random_data(values); + + std::atomic count(0); + bench.run("dp::thread_pool", [&] { + { + dp::thread_pool<> pool{}; + for (const auto& value : values) { + pool.enqueue_detach(count_if_prime_tp, value, std::ref(count)); + } + } + }); + + count.store(0); + bench.run("BS::thread_pool_light", [&] { + BS::thread_pool_light bs_thread_pool{std::thread::hardware_concurrency()}; + for (const auto& value : values) { + bs_thread_pool.push_task(count_if_prime_tp, value, std::ref(count)); + } + }); + + count.store(0); + bench.run("riften::thief_pool", [&] { + riften::Thiefpool pool{}; + for (const auto& value : values) { + pool.enqueue_detach(count_if_prime_tp, value, std::ref(count)); + } + }); +} + +TEST_CASE("count primes") { + using namespace std::chrono_literals; + + // test sequentially and with thread pool + std::vector values(100); + generate_random_data(values); + + auto result = count_primes(values); + auto pool_result = count_primes_thread_pool(values); + + CHECK(result == pool_result); + + std::vector values2(100); + generate_random_data(values2); + + result = count_primes(values2); + pool_result = count_primes_thread_pool(values2); + + CHECK(result == pool_result); + + std::vector values3(100); + generate_random_data(values3); + + result = count_primes(values3); + pool_result = count_primes_thread_pool(values3); + + CHECK(result == pool_result); + + std::vector small_int_args = {10'000, 100'000, 1'000'000}; + for (const auto& size : small_int_args) { + run_benchmark(size); + } + + std::vector args = {100, 1000, 10'000}; + for (const auto& size : args) { + run_benchmark(size); + } + + std::vector large_int_args = {100, 1000}; + for (const auto& size : large_int_args) { + run_benchmark(size); + } +} diff --git a/benchmark/source/thread_pool_scaling.cpp b/benchmark/source/thread_pool_scaling.cpp new file mode 100644 index 0000000..23170a8 --- /dev/null +++ b/benchmark/source/thread_pool_scaling.cpp @@ -0,0 +1,78 @@ +#include +#include +#include + +#include +#include +#include + +const auto thread_task = [] { + int a = 0; + int b = 1; +#pragma unroll 1 + for (int i = 0; i < 50; ++i) { +#pragma unroll 1 + for (int j = 0; j < 25; ++j) { + a = a + b; + b = b + a; + } + } + int result = b; + ankerl::nanobench::doNotOptimizeAway(result); +}; + +// tests how well the thread pool scales for a given task +TEST_CASE("dp::thread_pool scaling") { + using namespace std::chrono_literals; + ankerl::nanobench::Bench bench; + const auto bench_title = std::string("equilibrium 64,000"); + + // clang-format off + bench.title(bench_title) + .warmup(10) + .minEpochIterations(10) + .relative(true) + .timeUnit(1ms, "ms"); + // clang-format on + + for (unsigned int n_threads = 1; n_threads <= std::thread::hardware_concurrency(); + n_threads++) { + const std::string run_title = "dp::thread_pool n_threads: " + std::to_string(n_threads); + dp::thread_pool> pool(n_threads); + std::vector> results(64'000); + bench.run(run_title, [&] { + for (auto i = 0; i < 64'000; i++) { + results[i] = pool.enqueue(thread_task); + } + for (auto& result : results) result.get(); + }); + results.clear(); + } +} + +TEST_CASE("riften::ThiefPool scaling") { + using namespace std::chrono_literals; + ankerl::nanobench::Bench bench; + const auto bench_title = std::string("equilibrium 64,000"); + + // clang-format off + bench.title(bench_title) + .warmup(10) + .minEpochIterations(10) + .relative(true) + .timeUnit(1ms, "ms"); + // clang-format on + + for (unsigned int n_threads = 1; n_threads <= std::thread::hardware_concurrency(); + n_threads++) { + const std::string run_title = "riften::ThiefPool n_threads: " + std::to_string(n_threads); + + bench.run(run_title, [=] { + riften::Thiefpool pool(n_threads); + + for (auto i = 0; i < 64'000; i++) { + pool.enqueue_detach(thread_task); + } + }); + } +} From a6e0b85c6992a1baa0f88254fc6a7897054cd7fc Mon Sep 17 00:00:00 2001 From: Paul T Date: Fri, 29 Sep 2023 13:21:57 -0400 Subject: [PATCH 09/22] feature: add new build preset and update benchmarks Updated thread pool slightly, fixed some issues in the tests and updated a benchmark for linear scaling of thread pool testing. Added support for thread sanitizer and address sanitizer as well --- CMakePresets.json | 17 +++++++++++++++-- benchmark/source/thread_pool_scaling.cpp | 15 ++++++++------- include/thread_pool/thread_pool.h | 2 +- test/CMakeLists.txt | 16 +++++++++++++++- test/source/thread_pool.cpp | 16 ++++++++-------- 5 files changed, 47 insertions(+), 19 deletions(-) diff --git a/CMakePresets.json b/CMakePresets.json index 01f0422..95ca434 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -77,6 +77,12 @@ "displayName": "Clang Release", "cacheVariables": { "CMAKE_BUILD_TYPE": "Release" } }, + { + "name": "clang-release-with-debug-info", + "inherits": "clang-base", + "displayName": "Clang RelWithDebInfo", + "cacheVariables": { "CMAKE_BUILD_TYPE": "RelWithDebInfo" } + }, { "name": "x64-debug", "displayName": "x64 Debug", @@ -112,10 +118,17 @@ { "name": "x64-release", "displayName": "x64 Release", - "description": "Target Windows (64-bit) with the Visual Studio development environment. (RelWithDebInfo)", + "description": "Target Windows (64-bit) with the Visual Studio development environment. (Release)", "inherits": "x64-debug", "cacheVariables": { "CMAKE_BUILD_TYPE": "Release" } }, + { + "name": "x64-release-with-debug", + "displayName": "x64 Release w/Debug", + "description": "Target Windows (64-bit) with the Visual Studio development environment. (RelWithDebInfo)", + "inherits": "x64-debug", + "cacheVariables": { "CMAKE_BUILD_TYPE": "RelWithDebInfo" } + }, { "name": "x64-windows-vcpkg", "displayName": "Install only", @@ -125,7 +138,7 @@ "CMAKE_BUILD_TYPE": "Release", "TP_BUILD_TESTS": "OFF", "TP_BUILD_EXAMPLES": "OFFF", - "TP_BUILD_BENCHMARKS":"OFF" + "TP_BUILD_BENCHMARKS": "OFF" } } ] diff --git a/benchmark/source/thread_pool_scaling.cpp b/benchmark/source/thread_pool_scaling.cpp index 23170a8..15ba84a 100644 --- a/benchmark/source/thread_pool_scaling.cpp +++ b/benchmark/source/thread_pool_scaling.cpp @@ -6,20 +6,21 @@ #include #include -const auto thread_task = [] { +inline void thread_task() { int a = 0; int b = 1; -#pragma unroll 1 + +#pragma unroll for (int i = 0; i < 50; ++i) { -#pragma unroll 1 +#pragma unroll for (int j = 0; j < 25; ++j) { a = a + b; b = b + a; } } int result = b; - ankerl::nanobench::doNotOptimizeAway(result); -}; + // ankerl::nanobench::doNotOptimizeAway(result); +} // tests how well the thread pool scales for a given task TEST_CASE("dp::thread_pool scaling") { @@ -38,7 +39,7 @@ TEST_CASE("dp::thread_pool scaling") { for (unsigned int n_threads = 1; n_threads <= std::thread::hardware_concurrency(); n_threads++) { const std::string run_title = "dp::thread_pool n_threads: " + std::to_string(n_threads); - dp::thread_pool> pool(n_threads); + dp::thread_pool pool{n_threads}; std::vector> results(64'000); bench.run(run_title, [&] { for (auto i = 0; i < 64'000; i++) { @@ -58,7 +59,7 @@ TEST_CASE("riften::ThiefPool scaling") { // clang-format off bench.title(bench_title) .warmup(10) - .minEpochIterations(10) + .minEpochIterations(100) .relative(true) .timeUnit(1ms, "ms"); // clang-format on diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index 94ca6bb..08a4735 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -116,7 +116,7 @@ namespace dp { typename ReturnType = std::invoke_result_t> requires std::invocable [[nodiscard]] std::future enqueue(Function f, Args... args) { -#if __cpp_lib_move_only_function +#ifdef __cpp_lib_move_only_function // we can do this in C++23 because we now have support for move only functions std::promise promise; auto future = promise.get_future(); diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index b08afe4..7d12125 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -40,17 +40,31 @@ target_link_libraries(${PROJECT_NAME} doctest::doctest dp::thread-pool) target_compile_options( ${PROJECT_NAME} PRIVATE $<$:--coverage> ) + target_link_options( ${PROJECT_NAME} PRIVATE $<$:--coverage> ) +if(NOT WIN32) + target_compile_options( + ${PROJECT_NAME} PRIVATE $<$:-fsanitize=thread -g> + ) + target_link_options( + ${PROJECT_NAME} PRIVATE $<$:-fsanitize=thread> + ) +elseif(MSVC AND ${CMAKE_BUILD_TYPE} MATCHES "Debug;RelWithDebInfo") + target_compile_options( + ${PROJECT_NAME} PRIVATE $<$:/fsanitize=address> + ) +endif() + # enable compiler warnings if(NOT TEST_INSTALLED_VERSION) target_compile_options( ${PROJECT_NAME} PRIVATE $<$:-Wall -Wpedantic -Wextra -Werror> ) - target_compile_options(${PROJECT_NAME} PRIVATE $<$:/W4 /WX>) + target_compile_options(${PROJECT_NAME} PRIVATE $<$:/W4 /WX /wd4324>) target_compile_definitions( ${PROJECT_NAME} PRIVATE $<$:DOCTEST_CONFIG_USE_STD_HEADERS> ) diff --git a/test/source/thread_pool.cpp b/test/source/thread_pool.cpp index f3c20e0..38a31a6 100644 --- a/test/source/thread_pool.cpp +++ b/test/source/thread_pool.cpp @@ -13,19 +13,19 @@ auto multiply(int a, int b) { return a * b; } TEST_CASE("Multiply using global function") { - dp::thread_pool pool; + dp::thread_pool pool{}; auto result = pool.enqueue(multiply, 3, 4); CHECK_EQ(result.get(), 12); } TEST_CASE("Multiply using lambda") { - dp::thread_pool pool; + dp::thread_pool pool{}; auto result = pool.enqueue([](int a, int b) { return a * b; }, 3, 4); CHECK_EQ(result.get(), 12); } TEST_CASE("Multiply with functor") { - dp::thread_pool pool; + dp::thread_pool pool{}; auto result = pool.enqueue(std::multiplies{}, 3, 4); CHECK_EQ(result.get(), 12); } @@ -33,7 +33,7 @@ TEST_CASE("Multiply with functor") { TEST_CASE("Pass reference to pool") { int x = 2; { - dp::thread_pool pool; + dp::thread_pool pool{}; pool.enqueue_detach([](int& a) { a *= 2; }, std::ref(x)); } CHECK_EQ(x, 4); @@ -42,14 +42,14 @@ TEST_CASE("Pass reference to pool") { TEST_CASE("Pass raw reference to pool") { int x = 2; { - dp::thread_pool pool; + dp::thread_pool pool{}; pool.enqueue_detach([](int& a) { a *= 2; }, x); } CHECK_EQ(x, 2); } TEST_CASE("Support enqueue with void return type") { - dp::thread_pool pool; + dp::thread_pool pool{}; auto value = 8; auto future = pool.enqueue([](int& x) { x *= 2; }, std::ref(value)); future.wait(); @@ -149,7 +149,7 @@ TEST_CASE("Ensure task load is spread evenly across threads") { // worst case is the same thread doing the long task back to back. Tasks are assigned // sequentially in the thread pool so this would be the default execution if there was no work // stealing. - CHECK_LT(duration.count(), long_task_time * 2); + CHECK_LT(duration.count(), long_task_time * 2 + 1); } TEST_CASE("Ensure task exception doesn't kill worker thread") { @@ -165,7 +165,7 @@ TEST_CASE("Ensure task exception doesn't kill worker thread") { }; { - dp::thread_pool pool; + dp::thread_pool pool{}; auto throw_future = pool.enqueue(throw_task, 1); auto no_throw_future = pool.enqueue(regular_task, 2); From b04b1655a8477c107d83f393e77a04149c5726fe Mon Sep 17 00:00:00 2001 From: Paul T Date: Fri, 29 Sep 2023 14:06:14 -0400 Subject: [PATCH 10/22] fix: build issue with flags in tests --- test/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 7d12125..9f29ba8 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -52,7 +52,7 @@ if(NOT WIN32) target_link_options( ${PROJECT_NAME} PRIVATE $<$:-fsanitize=thread> ) -elseif(MSVC AND ${CMAKE_BUILD_TYPE} MATCHES "Debug;RelWithDebInfo") +elseif("${CMAKE_CXX_COMPILER_ID}" MATCHES "MSVC" AND CMAKE_BUILD_TYPE MATCHES Debug,RelWithDebInfo ) target_compile_options( ${PROJECT_NAME} PRIVATE $<$:/fsanitize=address> ) From 4499e3e67eed7292804315125768fd38e285704f Mon Sep 17 00:00:00 2001 From: Paul T Date: Fri, 29 Sep 2023 14:29:59 -0400 Subject: [PATCH 11/22] fix: build issues with fsanitize=thread in linux --- .github/workflows/ubuntu.yml | 4 ++-- CMakeLists.txt | 9 +++++---- CMakePresets.json | 18 ++++++++++++------ test/CMakeLists.txt | 2 +- 4 files changed, 20 insertions(+), 13 deletions(-) diff --git a/.github/workflows/ubuntu.yml b/.github/workflows/ubuntu.yml index ba21392..b3aab2e 100644 --- a/.github/workflows/ubuntu.yml +++ b/.github/workflows/ubuntu.yml @@ -43,10 +43,10 @@ jobs: platform: x64 - name: configure gcc - run: cmake -S . -B build -DTP_BUILD_EXAMPLES=OFF -DTP_BUILD_BENCHMARKS=OFF -DCMAKE_BUILD_TYPE=Debug + run: cmake -S . -B build -DTP_BUILD_EXAMPLES=OFF -DTP_BUILD_BENCHMARKS=OFF -DTP_THREAD_SANITIZER=OFF -DCMAKE_BUILD_TYPE=Debug - name: configure clang - run: cmake -S . -B build-clang -DTP_BUILD_EXAMPLES=OFF -DTP_BUILD_BENCHMARKS=OFF -DCMAKE_BUILD_TYPE=Debug + run: cmake -S . -B build-clang -DTP_BUILD_EXAMPLES=OFF -DTP_BUILD_BENCHMARKS=OFF -DTP_THREAD_SANITIZER=OFF -DCMAKE_BUILD_TYPE=Debug env: CC: clang CXX: clang++ diff --git a/CMakeLists.txt b/CMakeLists.txt index 489fcb1..63a5cf9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -137,19 +137,20 @@ install(FILES ${PROJECT_BINARY_DIR}/include/thread_pool/version.h option(TP_BUILD_TESTS "Turn on to build unit tests." ON) option(TP_BUILD_EXAMPLES "Turn on to build examples." ON) option(TP_BUILD_BENCHMARKS "Turn on to build benchmarks." ON) +option(TP_THREAD_SANITIZER "Turn on to build with thread sanitizer." OFF) -if(${TP_BUILD_TESTS} OR ${TP_BUILD_EXAMPLES} OR ${TP_BUILD_BENCHMARKS}) +if(TP_BUILD_TESTS OR TP_BUILD_EXAMPLES OR TP_BUILD_BENCHMARKS) # see https://github.com/TheLartians/CPM.cmake for more info include(cmake/CPM.cmake) endif() -if(${TP_BUILD_TESTS}) +if(TP_BUILD_TESTS) enable_testing() add_subdirectory(test) endif() -if(${TP_BUILD_EXAMPLES}) +if(TP_BUILD_EXAMPLES) add_subdirectory(examples) endif() -if(${TP_BUILD_BENCHMARKS}) +if(TP_BUILD_BENCHMARKS) add_subdirectory(benchmark) endif() diff --git a/CMakePresets.json b/CMakePresets.json index 95ca434..c2c297e 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -26,7 +26,6 @@ "binaryDir": "${sourceDir}/out/build/${presetName}", "installDir": "${sourceDir}/out/install/${presetName}", "cacheVariables": { - "CMAKE_C_COMPILER": "cl", "CMAKE_CXX_COMPILER": "cl" }, "condition": { @@ -40,7 +39,6 @@ "hidden": true, "inherits": "linux-base", "cacheVariables": { - "CMAKE_C_COMPILER": "gcc", "CMAKE_CXX_COMPILER": "g++" } }, @@ -48,7 +46,10 @@ "name": "gcc-debug", "inherits": "gcc-base", "displayName": "GCC Debug", - "cacheVariables": { "CMAKE_BUILD_TYPE": "Debug" } + "cacheVariables": { + "CMAKE_BUILD_TYPE": "Debug", + "TP_THREAD_SANITIZER": "ON" + } }, { "name": "gcc-release", @@ -61,7 +62,6 @@ "hidden": true, "inherits": "linux-base", "cacheVariables": { - "CMAKE_C_COMPILER": "clang", "CMAKE_CXX_COMPILER": "clang++" } }, @@ -69,7 +69,10 @@ "name": "clang-debug", "inherits": "clang-base", "displayName": "Clang Debug", - "cacheVariables": { "CMAKE_BUILD_TYPE": "Debug" } + "cacheVariables": { + "CMAKE_BUILD_TYPE": "Debug", + "TP_THREAD_SANITIZER": "ON" + } }, { "name": "clang-release", @@ -81,7 +84,10 @@ "name": "clang-release-with-debug-info", "inherits": "clang-base", "displayName": "Clang RelWithDebInfo", - "cacheVariables": { "CMAKE_BUILD_TYPE": "RelWithDebInfo" } + "cacheVariables": { + "CMAKE_BUILD_TYPE": "RelWithDebInfo", + "TP_THREAD_SANITIZER": "ON" + } }, { "name": "x64-debug", diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 9f29ba8..9a7c6b0 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -45,7 +45,7 @@ target_link_options( ${PROJECT_NAME} PRIVATE $<$:--coverage> ) -if(NOT WIN32) +if(NOT WIN32 AND TP_THREAD_SANITIZER) target_compile_options( ${PROJECT_NAME} PRIVATE $<$:-fsanitize=thread -g> ) From 079c447bbc011038705383f30acf83e011f407e4 Mon Sep 17 00:00:00 2001 From: Paul T Date: Fri, 29 Sep 2023 14:50:54 -0400 Subject: [PATCH 12/22] chore: update license copyright --- LICENSE | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/LICENSE b/LICENSE index 3d9bf92..b9ede5b 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2021 Paul Tsouchlos +Copyright (c) 2021-2023 Paul Tsouchlos Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal From d2c964d2b75b34d9ff2e114ef8e807110a2e01f5 Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 11 Apr 2024 08:02:25 -0400 Subject: [PATCH 13/22] build: update build preset --- CMakePresets.json | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CMakePresets.json b/CMakePresets.json index c2c297e..1bd56ad 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -32,6 +32,10 @@ "type": "equals", "lhs": "${hostSystemName}", "rhs": "Windows" + }, + "architecture": { + "value": "x64", + "strategy": "external" } }, { From 91b2f1203508271291f54e69e3a443f7837c868c Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 11 Apr 2024 08:11:55 -0400 Subject: [PATCH 14/22] fix: small typo --- CMakePresets.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakePresets.json b/CMakePresets.json index 1bd56ad..f4f9f11 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -147,7 +147,7 @@ "cacheVariables": { "CMAKE_BUILD_TYPE": "Release", "TP_BUILD_TESTS": "OFF", - "TP_BUILD_EXAMPLES": "OFFF", + "TP_BUILD_EXAMPLES": "OFF", "TP_BUILD_BENCHMARKS": "OFF" } } From 1201744471ba950d5f0a941ecfa27cb9d18391ef Mon Sep 17 00:00:00 2001 From: Paul T Date: Mon, 20 Mar 2023 12:01:11 -0400 Subject: [PATCH 15/22] Add `wait_for_tasks()` method --- include/thread_pool/thread_pool.h | 28 +++++++++++++++++++++++++--- test/source/thread_pool.cpp | 20 ++++++++++++++++++++ 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index 08a4735..2c2bb7d 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -36,7 +36,7 @@ namespace dp { public: explicit thread_pool( const unsigned int &number_of_threads = std::thread::hardware_concurrency()) - : tasks_(number_of_threads) { + : tasks_(number_of_threads), waiting_barrier_(number_of_threads) { std::size_t current_id = 0; for (std::size_t i = 0; i < number_of_threads; ++i) { priority_queue_.push_back(size_t(current_id)); @@ -69,8 +69,15 @@ namespace dp { } } while (pending_tasks_.load(std::memory_order_acquire) > 0); - - priority_queue_.rotate_to_front(id); + + priority_queue_.rotate_to_front(id); + // once tasks are done, arrive at the barrier. + if (waiting_.load(std::memory_order_acquire)) { + waiting_barrier_.arrive_and_wait(); + // notify the waiter + wait_signal_.release(); + } + } while (!stop_tok.stop_requested()); }); @@ -79,6 +86,8 @@ namespace dp { } catch (...) { // catch all + // if an exception occurs, drop the count for the barrier + waiting_barrier_.arrive_and_drop(); // remove one item from the tasks tasks_.pop_back(); @@ -192,6 +201,16 @@ namespace dp { })); } + void wait_for_tasks() { + // first check if there are any pending tasks + if (pending_tasks_.load(std::memory_order_acquire) == 0) return; + waiting_.store(true); + // wait for all threads to arrive at the barrier + wait_signal_.acquire(); + // reset the waiting flag + waiting_.store(false); + } + [[nodiscard]] auto size() const { return threads_.size(); } private: @@ -217,6 +236,9 @@ namespace dp { std::deque tasks_; dp::thread_safe_queue priority_queue_; std::atomic_int_fast64_t pending_tasks_{}; + std::atomic_bool waiting_{}; + std::barrier<> waiting_barrier_; + std::binary_semaphore wait_signal_{0}; }; /** diff --git a/test/source/thread_pool.cpp b/test/source/thread_pool.cpp index 38a31a6..ad1b7c5 100644 --- a/test/source/thread_pool.cpp +++ b/test/source/thread_pool.cpp @@ -251,6 +251,26 @@ TEST_CASE("Ensure work completes with fewer threads than expected.") { pool.enqueue_detach(task); } } + CHECK_EQ(counter.load(), total_tasks); +} + +TEST_CASE("Ensure wait_for_tasks() properly blocks current execution.") { + std::atomic counter = 0; + int total_tasks{}; + dp::thread_pool pool(4); + + SUBCASE("with tasks") { total_tasks = 30; } + SUBCASE("with no tasks") { total_tasks = 0; } + + for (auto i = 0; i < total_tasks; i++) { + auto task = [i, &counter]() { + std::this_thread::sleep_for(std::chrono::milliseconds((i + 1) * 100)); + ++counter; + }; + pool.enqueue_detach(task); + } + + pool.wait_for_tasks(); CHECK_EQ(counter.load(), total_tasks); } From 91d7f6fb841cfc23530903c0b36096414100d886 Mon Sep 17 00:00:00 2001 From: Paul T Date: Mon, 20 Mar 2023 14:21:05 -0400 Subject: [PATCH 16/22] Minor fixes --- include/thread_pool/thread_pool.h | 7 ++----- test/source/thread_pool.cpp | 3 +-- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index 2c2bb7d..a09c1f2 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -36,7 +36,7 @@ namespace dp { public: explicit thread_pool( const unsigned int &number_of_threads = std::thread::hardware_concurrency()) - : tasks_(number_of_threads), waiting_barrier_(number_of_threads) { + : tasks_(number_of_threads), waiting_barrier_(number_of_threads + 1) { std::size_t current_id = 0; for (std::size_t i = 0; i < number_of_threads; ++i) { priority_queue_.push_back(size_t(current_id)); @@ -74,8 +74,6 @@ namespace dp { // once tasks are done, arrive at the barrier. if (waiting_.load(std::memory_order_acquire)) { waiting_barrier_.arrive_and_wait(); - // notify the waiter - wait_signal_.release(); } @@ -206,7 +204,7 @@ namespace dp { if (pending_tasks_.load(std::memory_order_acquire) == 0) return; waiting_.store(true); // wait for all threads to arrive at the barrier - wait_signal_.acquire(); + waiting_barrier_.arrive_and_wait(); // reset the waiting flag waiting_.store(false); } @@ -238,7 +236,6 @@ namespace dp { std::atomic_int_fast64_t pending_tasks_{}; std::atomic_bool waiting_{}; std::barrier<> waiting_barrier_; - std::binary_semaphore wait_signal_{0}; }; /** diff --git a/test/source/thread_pool.cpp b/test/source/thread_pool.cpp index ad1b7c5..24dbe76 100644 --- a/test/source/thread_pool.cpp +++ b/test/source/thread_pool.cpp @@ -257,11 +257,11 @@ TEST_CASE("Ensure work completes with fewer threads than expected.") { TEST_CASE("Ensure wait_for_tasks() properly blocks current execution.") { std::atomic counter = 0; int total_tasks{}; - dp::thread_pool pool(4); SUBCASE("with tasks") { total_tasks = 30; } SUBCASE("with no tasks") { total_tasks = 0; } + dp::thread_pool pool(4); for (auto i = 0; i < total_tasks; i++) { auto task = [i, &counter]() { std::this_thread::sleep_for(std::chrono::milliseconds((i + 1) * 100)); @@ -269,7 +269,6 @@ TEST_CASE("Ensure wait_for_tasks() properly blocks current execution.") { }; pool.enqueue_detach(task); } - pool.wait_for_tasks(); CHECK_EQ(counter.load(), total_tasks); From 29636da9863f8423ca13689c738f35115468a42a Mon Sep 17 00:00:00 2001 From: Paul T Date: Mon, 20 Mar 2023 14:45:20 -0400 Subject: [PATCH 17/22] Trying to address race condition --- include/thread_pool/thread_pool.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index a09c1f2..95899a2 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -43,6 +43,9 @@ namespace dp { try { threads_.emplace_back([&, id = current_id](const std::stop_token &stop_tok) { do { + // check for a stop request before acquiring the wait signal + if(stop_tok.stop_requested()) break; + // wait until signaled tasks_[id].signal.acquire(); From a63cb6ad2098c2aa3c359cdb0a86b57d911f5d4d Mon Sep 17 00:00:00 2001 From: Paul T Date: Mon, 20 Mar 2023 15:49:28 -0400 Subject: [PATCH 18/22] Wake all threads when waiting This is done to ensure that the std::barrier is always hit when waiting for tasks to complete. --- include/thread_pool/thread_pool.h | 4 +++- test/source/thread_pool.cpp | 6 ++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index 95899a2..366e18a 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -44,7 +44,7 @@ namespace dp { threads_.emplace_back([&, id = current_id](const std::stop_token &stop_tok) { do { // check for a stop request before acquiring the wait signal - if(stop_tok.stop_requested()) break; + if (stop_tok.stop_requested()) break; // wait until signaled tasks_[id].signal.acquire(); @@ -206,6 +206,8 @@ namespace dp { // first check if there are any pending tasks if (pending_tasks_.load(std::memory_order_acquire) == 0) return; waiting_.store(true); + // wake all threads + for (std::size_t i = 0; i < tasks_.size(); ++i) tasks_[i].signal.release(); // wait for all threads to arrive at the barrier waiting_barrier_.arrive_and_wait(); // reset the waiting flag diff --git a/test/source/thread_pool.cpp b/test/source/thread_pool.cpp index 24dbe76..b9fc48c 100644 --- a/test/source/thread_pool.cpp +++ b/test/source/thread_pool.cpp @@ -257,14 +257,16 @@ TEST_CASE("Ensure work completes with fewer threads than expected.") { TEST_CASE("Ensure wait_for_tasks() properly blocks current execution.") { std::atomic counter = 0; int total_tasks{}; + constexpr auto thread_count = 4; SUBCASE("with tasks") { total_tasks = 30; } SUBCASE("with no tasks") { total_tasks = 0; } + SUBCASE("with task count less than thread count") { total_tasks = thread_count / 2; } - dp::thread_pool pool(4); + dp::thread_pool pool(thread_count); for (auto i = 0; i < total_tasks; i++) { auto task = [i, &counter]() { - std::this_thread::sleep_for(std::chrono::milliseconds((i + 1) * 100)); + std::this_thread::sleep_for(std::chrono::milliseconds((i + 1) * 10)); ++counter; }; pool.enqueue_detach(task); From e8b744cdbd4cd7255ff4578eb21949a55ea27334 Mon Sep 17 00:00:00 2001 From: Paul T Date: Wed, 2 Aug 2023 00:20:56 -0400 Subject: [PATCH 19/22] Minor updates --- include/thread_pool/thread_pool.h | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index 366e18a..c6fcbb4 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -39,7 +39,7 @@ namespace dp { : tasks_(number_of_threads), waiting_barrier_(number_of_threads + 1) { std::size_t current_id = 0; for (std::size_t i = 0; i < number_of_threads; ++i) { - priority_queue_.push_back(size_t(current_id)); + priority_queue_.push_back(static_cast(current_id)); try { threads_.emplace_back([&, id = current_id](const std::stop_token &stop_tok) { do { @@ -72,13 +72,13 @@ namespace dp { } } while (pending_tasks_.load(std::memory_order_acquire) > 0); - - priority_queue_.rotate_to_front(id); + + priority_queue_.rotate_to_front(id); + // once tasks are done, arrive at the barrier. if (waiting_.load(std::memory_order_acquire)) { waiting_barrier_.arrive_and_wait(); } - } while (!stop_tok.stop_requested()); }); @@ -205,13 +205,13 @@ namespace dp { void wait_for_tasks() { // first check if there are any pending tasks if (pending_tasks_.load(std::memory_order_acquire) == 0) return; - waiting_.store(true); + waiting_.store(true, std::memory_order_release); // wake all threads for (std::size_t i = 0; i < tasks_.size(); ++i) tasks_[i].signal.release(); // wait for all threads to arrive at the barrier waiting_barrier_.arrive_and_wait(); // reset the waiting flag - waiting_.store(false); + waiting_.store(false, std::memory_order_release); } [[nodiscard]] auto size() const { return threads_.size(); } From c6c98d6cc7151a0a3e85bb21d683ff7e4baa1ef1 Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 11 Apr 2024 09:19:20 -0400 Subject: [PATCH 20/22] chore: add message output --- test/CMakeLists.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 9a7c6b0..c7bdc81 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -46,6 +46,7 @@ target_link_options( ) if(NOT WIN32 AND TP_THREAD_SANITIZER) + message(STATUS "Settings thread sanitizer flags.") target_compile_options( ${PROJECT_NAME} PRIVATE $<$:-fsanitize=thread -g> ) From 2f00efe23fdf7abe212aeab93bcdadd214182b53 Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 11 Apr 2024 09:23:12 -0400 Subject: [PATCH 21/22] format: auto format code --- test/source/thread_pool.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/source/thread_pool.cpp b/test/source/thread_pool.cpp index b9fc48c..a4933ca 100644 --- a/test/source/thread_pool.cpp +++ b/test/source/thread_pool.cpp @@ -251,7 +251,7 @@ TEST_CASE("Ensure work completes with fewer threads than expected.") { pool.enqueue_detach(task); } } - CHECK_EQ(counter.load(), total_tasks); + CHECK_EQ(counter.load(), total_tasks); } TEST_CASE("Ensure wait_for_tasks() properly blocks current execution.") { From 3256dd740ba117cf03711622e67734bd48a130b3 Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 11 Apr 2024 09:26:57 -0400 Subject: [PATCH 22/22] build: more output and minor fix --- test/CMakeLists.txt | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index c7bdc81..9e21faa 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -46,14 +46,15 @@ target_link_options( ) if(NOT WIN32 AND TP_THREAD_SANITIZER) - message(STATUS "Settings thread sanitizer flags.") + message(STATUS "Setting thread sanitizer flags.") target_compile_options( ${PROJECT_NAME} PRIVATE $<$:-fsanitize=thread -g> ) target_link_options( ${PROJECT_NAME} PRIVATE $<$:-fsanitize=thread> ) -elseif("${CMAKE_CXX_COMPILER_ID}" MATCHES "MSVC" AND CMAKE_BUILD_TYPE MATCHES Debug,RelWithDebInfo ) +elseif("${CMAKE_CXX_COMPILER_ID}" MATCHES "MSVC" AND "${CMAKE_BUILD_TYPE}" MATCHES ".*Deb.*" ) + message(STATUS "Setting address sanitizer flags.") target_compile_options( ${PROJECT_NAME} PRIVATE $<$:/fsanitize=address> )