diff --git a/.clang-format b/.clang-format index 2644f89..26d4c8c 100644 --- a/.clang-format +++ b/.clang-format @@ -1,17 +1,15 @@ --- BasedOnStyle: Google - AccessModifierOffset: '-2' - AlignTrailingComments: 'true' - AllowAllParametersOfDeclarationOnNextLine: 'false' - AlwaysBreakTemplateDeclarations: 'No' + AccessModifierOffset: -2 + AlignTrailingComments: true + AllowAllParametersOfDeclarationOnNextLine: false BreakBeforeBraces: Attach - ColumnLimit: '100' - ConstructorInitializerAllOnOneLineOrOnePerLine: 'false' + ColumnLimit: 100 + ConstructorInitializerAllOnOneLineOrOnePerLine: false IncludeBlocks: Regroup IndentPPDirectives: AfterHash - IndentWidth: '4' + IndentWidth: 4 NamespaceIndentation: All BreakBeforeBinaryOperators: None - BreakBeforeTernaryOperators: 'true' + BreakBeforeTernaryOperators: true AlwaysBreakTemplateDeclarations: 'Yes' -... diff --git a/.github/workflows/ubuntu.yml b/.github/workflows/ubuntu.yml index ba21392..b3aab2e 100644 --- a/.github/workflows/ubuntu.yml +++ b/.github/workflows/ubuntu.yml @@ -43,10 +43,10 @@ jobs: platform: x64 - name: configure gcc - run: cmake -S . -B build -DTP_BUILD_EXAMPLES=OFF -DTP_BUILD_BENCHMARKS=OFF -DCMAKE_BUILD_TYPE=Debug + run: cmake -S . -B build -DTP_BUILD_EXAMPLES=OFF -DTP_BUILD_BENCHMARKS=OFF -DTP_THREAD_SANITIZER=OFF -DCMAKE_BUILD_TYPE=Debug - name: configure clang - run: cmake -S . -B build-clang -DTP_BUILD_EXAMPLES=OFF -DTP_BUILD_BENCHMARKS=OFF -DCMAKE_BUILD_TYPE=Debug + run: cmake -S . -B build-clang -DTP_BUILD_EXAMPLES=OFF -DTP_BUILD_BENCHMARKS=OFF -DTP_THREAD_SANITIZER=OFF -DCMAKE_BUILD_TYPE=Debug env: CC: clang CXX: clang++ diff --git a/CMakeLists.txt b/CMakeLists.txt index 489fcb1..63a5cf9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -137,19 +137,20 @@ install(FILES ${PROJECT_BINARY_DIR}/include/thread_pool/version.h option(TP_BUILD_TESTS "Turn on to build unit tests." ON) option(TP_BUILD_EXAMPLES "Turn on to build examples." ON) option(TP_BUILD_BENCHMARKS "Turn on to build benchmarks." ON) +option(TP_THREAD_SANITIZER "Turn on to build with thread sanitizer." OFF) -if(${TP_BUILD_TESTS} OR ${TP_BUILD_EXAMPLES} OR ${TP_BUILD_BENCHMARKS}) +if(TP_BUILD_TESTS OR TP_BUILD_EXAMPLES OR TP_BUILD_BENCHMARKS) # see https://github.com/TheLartians/CPM.cmake for more info include(cmake/CPM.cmake) endif() -if(${TP_BUILD_TESTS}) +if(TP_BUILD_TESTS) enable_testing() add_subdirectory(test) endif() -if(${TP_BUILD_EXAMPLES}) +if(TP_BUILD_EXAMPLES) add_subdirectory(examples) endif() -if(${TP_BUILD_BENCHMARKS}) +if(TP_BUILD_BENCHMARKS) add_subdirectory(benchmark) endif() diff --git a/CMakePresets.json b/CMakePresets.json index 01f0422..f4f9f11 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -26,13 +26,16 @@ "binaryDir": "${sourceDir}/out/build/${presetName}", "installDir": "${sourceDir}/out/install/${presetName}", "cacheVariables": { - "CMAKE_C_COMPILER": "cl", "CMAKE_CXX_COMPILER": "cl" }, "condition": { "type": "equals", "lhs": "${hostSystemName}", "rhs": "Windows" + }, + "architecture": { + "value": "x64", + "strategy": "external" } }, { @@ -40,7 +43,6 @@ "hidden": true, "inherits": "linux-base", "cacheVariables": { - "CMAKE_C_COMPILER": "gcc", "CMAKE_CXX_COMPILER": "g++" } }, @@ -48,7 +50,10 @@ "name": "gcc-debug", "inherits": "gcc-base", "displayName": "GCC Debug", - "cacheVariables": { "CMAKE_BUILD_TYPE": "Debug" } + "cacheVariables": { + "CMAKE_BUILD_TYPE": "Debug", + "TP_THREAD_SANITIZER": "ON" + } }, { "name": "gcc-release", @@ -61,7 +66,6 @@ "hidden": true, "inherits": "linux-base", "cacheVariables": { - "CMAKE_C_COMPILER": "clang", "CMAKE_CXX_COMPILER": "clang++" } }, @@ -69,7 +73,10 @@ "name": "clang-debug", "inherits": "clang-base", "displayName": "Clang Debug", - "cacheVariables": { "CMAKE_BUILD_TYPE": "Debug" } + "cacheVariables": { + "CMAKE_BUILD_TYPE": "Debug", + "TP_THREAD_SANITIZER": "ON" + } }, { "name": "clang-release", @@ -77,6 +84,15 @@ "displayName": "Clang Release", "cacheVariables": { "CMAKE_BUILD_TYPE": "Release" } }, + { + "name": "clang-release-with-debug-info", + "inherits": "clang-base", + "displayName": "Clang RelWithDebInfo", + "cacheVariables": { + "CMAKE_BUILD_TYPE": "RelWithDebInfo", + "TP_THREAD_SANITIZER": "ON" + } + }, { "name": "x64-debug", "displayName": "x64 Debug", @@ -112,10 +128,17 @@ { "name": "x64-release", "displayName": "x64 Release", - "description": "Target Windows (64-bit) with the Visual Studio development environment. (RelWithDebInfo)", + "description": "Target Windows (64-bit) with the Visual Studio development environment. (Release)", "inherits": "x64-debug", "cacheVariables": { "CMAKE_BUILD_TYPE": "Release" } }, + { + "name": "x64-release-with-debug", + "displayName": "x64 Release w/Debug", + "description": "Target Windows (64-bit) with the Visual Studio development environment. (RelWithDebInfo)", + "inherits": "x64-debug", + "cacheVariables": { "CMAKE_BUILD_TYPE": "RelWithDebInfo" } + }, { "name": "x64-windows-vcpkg", "displayName": "Install only", @@ -124,8 +147,8 @@ "cacheVariables": { "CMAKE_BUILD_TYPE": "Release", "TP_BUILD_TESTS": "OFF", - "TP_BUILD_EXAMPLES": "OFFF", - "TP_BUILD_BENCHMARKS":"OFF" + "TP_BUILD_EXAMPLES": "OFF", + "TP_BUILD_BENCHMARKS": "OFF" } } ] diff --git a/LICENSE b/LICENSE index 3d9bf92..b9ede5b 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2021 Paul Tsouchlos +Copyright (c) 2021-2023 Paul Tsouchlos Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index 6b57f59..05c3ff5 100644 --- a/README.md +++ b/README.md @@ -26,12 +26,17 @@ A simple, *fast* and functional thread pool implementation using pure C++20. `dp::thread-pool` is a header only library. All the files needed are in `include/thread_pool`. -### CMake +### vcpkg + +`dp::thread-pool` is available on `vcpkg` -`ThreadPool` defines two CMake targets: +```powershell +vcpkg install dp-thread-pool +``` + +### CMake -* `ThreadPool::ThreadPool` -* `dp::thread-pool` +`thread-pool` defines the CMake target `dp::thread-pool`. You can then use `find_package()`: diff --git a/benchmark/include/utilities.h b/benchmark/include/utilities.h index e95c1a4..f89cda5 100644 --- a/benchmark/include/utilities.h +++ b/benchmark/include/utilities.h @@ -1,5 +1,7 @@ #pragma once +#include + #include #include #include @@ -53,3 +55,12 @@ template return computations; } + +template > + requires std::is_integral_v +void generate_random_data(Seq&& seq) { + static ankerl::nanobench::Rng rng(std::random_device{}()); + std::uniform_int_distribution distribution(std::numeric_limits::min(), + std::numeric_limits::max()); + std::ranges::generate(seq, [&] { return distribution(rng); }); +} diff --git a/benchmark/source/count_primes.cpp b/benchmark/source/count_primes.cpp new file mode 100644 index 0000000..b4b7745 --- /dev/null +++ b/benchmark/source/count_primes.cpp @@ -0,0 +1,137 @@ +#include +#include +#include +#include + +#include +#include +#include +#include + +template +bool is_prime(const ValueType& value) { + if (value <= 3 && value > 1) return true; + + // no need to check above sqrt(n) + const auto n = static_cast(std::ceil(std::sqrt(value) + 1)); + + for (auto i = 2; i < n; ++i) { + if (n % i == 0) { + return false; + } + } + return true; +} + +template +void count_if_prime(const ValueType& value, std::uint64_t& count) { + if (is_prime(value)) ++count; +} + +template +void count_if_prime_tp(const ValueType& value, std::atomic& count) { + if (is_prime(value)) ++count; +} + +template +std::uint64_t count_primes(const std::vector& values) { + std::uint64_t count = 0; + for (const auto& value : values) count_if_prime(value, std::ref(count)); + return count; +} + +template +std::uint64_t count_primes_thread_pool(const std::vector& values) { + std::atomic count(0); + + { + dp::thread_pool<> pool{}; + for (const auto& value : values) { + pool.enqueue_detach(count_if_prime_tp, value, std::ref(count)); + } + } + + return count.load(); +} + +template +void run_benchmark(const std::size_t& size) { + ankerl::nanobench::Bench bench; + auto bench_title = std::string("count primes ") + std::to_string(sizeof(ValueType) * 8) + + " bit " + std::to_string(size); + bench.title(bench_title).warmup(10).relative(true); + + // generate the data + std::vector values(size); + generate_random_data(values); + + std::atomic count(0); + bench.run("dp::thread_pool", [&] { + { + dp::thread_pool<> pool{}; + for (const auto& value : values) { + pool.enqueue_detach(count_if_prime_tp, value, std::ref(count)); + } + } + }); + + count.store(0); + bench.run("BS::thread_pool_light", [&] { + BS::thread_pool_light bs_thread_pool{std::thread::hardware_concurrency()}; + for (const auto& value : values) { + bs_thread_pool.push_task(count_if_prime_tp, value, std::ref(count)); + } + }); + + count.store(0); + bench.run("riften::thief_pool", [&] { + riften::Thiefpool pool{}; + for (const auto& value : values) { + pool.enqueue_detach(count_if_prime_tp, value, std::ref(count)); + } + }); +} + +TEST_CASE("count primes") { + using namespace std::chrono_literals; + + // test sequentially and with thread pool + std::vector values(100); + generate_random_data(values); + + auto result = count_primes(values); + auto pool_result = count_primes_thread_pool(values); + + CHECK(result == pool_result); + + std::vector values2(100); + generate_random_data(values2); + + result = count_primes(values2); + pool_result = count_primes_thread_pool(values2); + + CHECK(result == pool_result); + + std::vector values3(100); + generate_random_data(values3); + + result = count_primes(values3); + pool_result = count_primes_thread_pool(values3); + + CHECK(result == pool_result); + + std::vector small_int_args = {10'000, 100'000, 1'000'000}; + for (const auto& size : small_int_args) { + run_benchmark(size); + } + + std::vector args = {100, 1000, 10'000}; + for (const auto& size : args) { + run_benchmark(size); + } + + std::vector large_int_args = {100, 1000}; + for (const auto& size : large_int_args) { + run_benchmark(size); + } +} diff --git a/benchmark/source/thread_pool_scaling.cpp b/benchmark/source/thread_pool_scaling.cpp new file mode 100644 index 0000000..15ba84a --- /dev/null +++ b/benchmark/source/thread_pool_scaling.cpp @@ -0,0 +1,79 @@ +#include +#include +#include + +#include +#include +#include + +inline void thread_task() { + int a = 0; + int b = 1; + +#pragma unroll + for (int i = 0; i < 50; ++i) { +#pragma unroll + for (int j = 0; j < 25; ++j) { + a = a + b; + b = b + a; + } + } + int result = b; + // ankerl::nanobench::doNotOptimizeAway(result); +} + +// tests how well the thread pool scales for a given task +TEST_CASE("dp::thread_pool scaling") { + using namespace std::chrono_literals; + ankerl::nanobench::Bench bench; + const auto bench_title = std::string("equilibrium 64,000"); + + // clang-format off + bench.title(bench_title) + .warmup(10) + .minEpochIterations(10) + .relative(true) + .timeUnit(1ms, "ms"); + // clang-format on + + for (unsigned int n_threads = 1; n_threads <= std::thread::hardware_concurrency(); + n_threads++) { + const std::string run_title = "dp::thread_pool n_threads: " + std::to_string(n_threads); + dp::thread_pool pool{n_threads}; + std::vector> results(64'000); + bench.run(run_title, [&] { + for (auto i = 0; i < 64'000; i++) { + results[i] = pool.enqueue(thread_task); + } + for (auto& result : results) result.get(); + }); + results.clear(); + } +} + +TEST_CASE("riften::ThiefPool scaling") { + using namespace std::chrono_literals; + ankerl::nanobench::Bench bench; + const auto bench_title = std::string("equilibrium 64,000"); + + // clang-format off + bench.title(bench_title) + .warmup(10) + .minEpochIterations(100) + .relative(true) + .timeUnit(1ms, "ms"); + // clang-format on + + for (unsigned int n_threads = 1; n_threads <= std::thread::hardware_concurrency(); + n_threads++) { + const std::string run_title = "riften::ThiefPool n_threads: " + std::to_string(n_threads); + + bench.run(run_title, [=] { + riften::Thiefpool pool(n_threads); + + for (auto i = 0; i < 64'000; i++) { + pool.enqueue_detach(thread_task); + } + }); + } +} diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index 94ca6bb..c6fcbb4 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -36,13 +36,16 @@ namespace dp { public: explicit thread_pool( const unsigned int &number_of_threads = std::thread::hardware_concurrency()) - : tasks_(number_of_threads) { + : tasks_(number_of_threads), waiting_barrier_(number_of_threads + 1) { std::size_t current_id = 0; for (std::size_t i = 0; i < number_of_threads; ++i) { - priority_queue_.push_back(size_t(current_id)); + priority_queue_.push_back(static_cast(current_id)); try { threads_.emplace_back([&, id = current_id](const std::stop_token &stop_tok) { do { + // check for a stop request before acquiring the wait signal + if (stop_tok.stop_requested()) break; + // wait until signaled tasks_[id].signal.acquire(); @@ -72,6 +75,11 @@ namespace dp { priority_queue_.rotate_to_front(id); + // once tasks are done, arrive at the barrier. + if (waiting_.load(std::memory_order_acquire)) { + waiting_barrier_.arrive_and_wait(); + } + } while (!stop_tok.stop_requested()); }); // increment the thread id @@ -79,6 +87,8 @@ namespace dp { } catch (...) { // catch all + // if an exception occurs, drop the count for the barrier + waiting_barrier_.arrive_and_drop(); // remove one item from the tasks tasks_.pop_back(); @@ -116,7 +126,7 @@ namespace dp { typename ReturnType = std::invoke_result_t> requires std::invocable [[nodiscard]] std::future enqueue(Function f, Args... args) { -#if __cpp_lib_move_only_function +#ifdef __cpp_lib_move_only_function // we can do this in C++23 because we now have support for move only functions std::promise promise; auto future = promise.get_future(); @@ -192,6 +202,18 @@ namespace dp { })); } + void wait_for_tasks() { + // first check if there are any pending tasks + if (pending_tasks_.load(std::memory_order_acquire) == 0) return; + waiting_.store(true, std::memory_order_release); + // wake all threads + for (std::size_t i = 0; i < tasks_.size(); ++i) tasks_[i].signal.release(); + // wait for all threads to arrive at the barrier + waiting_barrier_.arrive_and_wait(); + // reset the waiting flag + waiting_.store(false, std::memory_order_release); + } + [[nodiscard]] auto size() const { return threads_.size(); } private: @@ -217,6 +239,8 @@ namespace dp { std::deque tasks_; dp::thread_safe_queue priority_queue_; std::atomic_int_fast64_t pending_tasks_{}; + std::atomic_bool waiting_{}; + std::barrier<> waiting_barrier_; }; /** diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index b08afe4..9e21faa 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -40,17 +40,33 @@ target_link_libraries(${PROJECT_NAME} doctest::doctest dp::thread-pool) target_compile_options( ${PROJECT_NAME} PRIVATE $<$:--coverage> ) + target_link_options( ${PROJECT_NAME} PRIVATE $<$:--coverage> ) +if(NOT WIN32 AND TP_THREAD_SANITIZER) + message(STATUS "Setting thread sanitizer flags.") + target_compile_options( + ${PROJECT_NAME} PRIVATE $<$:-fsanitize=thread -g> + ) + target_link_options( + ${PROJECT_NAME} PRIVATE $<$:-fsanitize=thread> + ) +elseif("${CMAKE_CXX_COMPILER_ID}" MATCHES "MSVC" AND "${CMAKE_BUILD_TYPE}" MATCHES ".*Deb.*" ) + message(STATUS "Setting address sanitizer flags.") + target_compile_options( + ${PROJECT_NAME} PRIVATE $<$:/fsanitize=address> + ) +endif() + # enable compiler warnings if(NOT TEST_INSTALLED_VERSION) target_compile_options( ${PROJECT_NAME} PRIVATE $<$:-Wall -Wpedantic -Wextra -Werror> ) - target_compile_options(${PROJECT_NAME} PRIVATE $<$:/W4 /WX>) + target_compile_options(${PROJECT_NAME} PRIVATE $<$:/W4 /WX /wd4324>) target_compile_definitions( ${PROJECT_NAME} PRIVATE $<$:DOCTEST_CONFIG_USE_STD_HEADERS> ) diff --git a/test/source/thread_pool.cpp b/test/source/thread_pool.cpp index f3c20e0..a4933ca 100644 --- a/test/source/thread_pool.cpp +++ b/test/source/thread_pool.cpp @@ -13,19 +13,19 @@ auto multiply(int a, int b) { return a * b; } TEST_CASE("Multiply using global function") { - dp::thread_pool pool; + dp::thread_pool pool{}; auto result = pool.enqueue(multiply, 3, 4); CHECK_EQ(result.get(), 12); } TEST_CASE("Multiply using lambda") { - dp::thread_pool pool; + dp::thread_pool pool{}; auto result = pool.enqueue([](int a, int b) { return a * b; }, 3, 4); CHECK_EQ(result.get(), 12); } TEST_CASE("Multiply with functor") { - dp::thread_pool pool; + dp::thread_pool pool{}; auto result = pool.enqueue(std::multiplies{}, 3, 4); CHECK_EQ(result.get(), 12); } @@ -33,7 +33,7 @@ TEST_CASE("Multiply with functor") { TEST_CASE("Pass reference to pool") { int x = 2; { - dp::thread_pool pool; + dp::thread_pool pool{}; pool.enqueue_detach([](int& a) { a *= 2; }, std::ref(x)); } CHECK_EQ(x, 4); @@ -42,14 +42,14 @@ TEST_CASE("Pass reference to pool") { TEST_CASE("Pass raw reference to pool") { int x = 2; { - dp::thread_pool pool; + dp::thread_pool pool{}; pool.enqueue_detach([](int& a) { a *= 2; }, x); } CHECK_EQ(x, 2); } TEST_CASE("Support enqueue with void return type") { - dp::thread_pool pool; + dp::thread_pool pool{}; auto value = 8; auto future = pool.enqueue([](int& x) { x *= 2; }, std::ref(value)); future.wait(); @@ -149,7 +149,7 @@ TEST_CASE("Ensure task load is spread evenly across threads") { // worst case is the same thread doing the long task back to back. Tasks are assigned // sequentially in the thread pool so this would be the default execution if there was no work // stealing. - CHECK_LT(duration.count(), long_task_time * 2); + CHECK_LT(duration.count(), long_task_time * 2 + 1); } TEST_CASE("Ensure task exception doesn't kill worker thread") { @@ -165,7 +165,7 @@ TEST_CASE("Ensure task exception doesn't kill worker thread") { }; { - dp::thread_pool pool; + dp::thread_pool pool{}; auto throw_future = pool.enqueue(throw_task, 1); auto no_throw_future = pool.enqueue(regular_task, 2); @@ -251,6 +251,27 @@ TEST_CASE("Ensure work completes with fewer threads than expected.") { pool.enqueue_detach(task); } } + CHECK_EQ(counter.load(), total_tasks); +} + +TEST_CASE("Ensure wait_for_tasks() properly blocks current execution.") { + std::atomic counter = 0; + int total_tasks{}; + constexpr auto thread_count = 4; + + SUBCASE("with tasks") { total_tasks = 30; } + SUBCASE("with no tasks") { total_tasks = 0; } + SUBCASE("with task count less than thread count") { total_tasks = thread_count / 2; } + + dp::thread_pool pool(thread_count); + for (auto i = 0; i < total_tasks; i++) { + auto task = [i, &counter]() { + std::this_thread::sleep_for(std::chrono::milliseconds((i + 1) * 10)); + ++counter; + }; + pool.enqueue_detach(task); + } + pool.wait_for_tasks(); CHECK_EQ(counter.load(), total_tasks); }