Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issues with wait_for_tasks() #68

Merged
merged 15 commits into from
Jul 5, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
chore: use std::atomic_bool instead of a barrier
DeveloperPaul123 committed Jul 5, 2024
commit d586238d193ee183bfa4a139739156a280092435
21 changes: 6 additions & 15 deletions include/thread_pool/thread_pool.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include <atomic>
#include <barrier>
#include <concepts>
#include <deque>
#include <functional>
@@ -23,10 +22,8 @@ namespace dp {

#ifdef __cpp_lib_move_only_function
using default_function_type = std::move_only_function<void()>;
using barrier_function_type = std::move_only_function<void() noexcept>;
#else
using default_function_type = std::function<void()>;
using barrier_function_type = []() noexcept {};
#endif
} // namespace details

@@ -38,14 +35,11 @@ namespace dp {
public:
template <typename InitializationFunction = std::function<void(std::size_t)>>
requires std::invocable<InitializationFunction, std::size_t> &&
std::is_same_v<void,
std::invoke_result_t<InitializationFunction, std::size_t>>
std::is_same_v<void, std::invoke_result_t<InitializationFunction, std::size_t>>
explicit thread_pool(
const unsigned int &number_of_threads = std::thread::hardware_concurrency(),
InitializationFunction init = [](std::size_t) {})
: tasks_(number_of_threads), threads_done_(number_of_threads, [this]() noexcept {
threads_complete_signal_.release();
}) {
: tasks_(number_of_threads) {
std::size_t current_id = 0;
for (std::size_t i = 0; i < number_of_threads; ++i) {
priority_queue_.push_back(size_t(current_id));
@@ -94,7 +88,8 @@ namespace dp {
// check if all tasks are completed and release the barrier (binary
// semaphore)
if (in_flight_tasks_.load(std::memory_order_acquire) == 0) {
threads_complete_signal_.release();
threads_complete_signal_ = true;
DeveloperPaul123 marked this conversation as resolved.
Show resolved Hide resolved
threads_complete_signal_.notify_one();
}

} while (!stop_tok.stop_requested());
@@ -110,9 +105,6 @@ namespace dp {

// remove our thread from the priority queue
std::ignore = priority_queue_.pop_back();

// remove one item from the barrier
threads_done_.arrive_and_drop();
}
}
}
@@ -237,7 +229,7 @@ namespace dp {
void wait_for_tasks() {
if (in_flight_tasks_.load(std::memory_order_acquire) > 0) {
// wait for all tasks to finish
threads_complete_signal_.acquire();
threads_complete_signal_.wait(false);
}
}

@@ -265,8 +257,7 @@ namespace dp {
std::deque<task_item> tasks_;
dp::thread_safe_queue<std::size_t> priority_queue_;
std::atomic_int_fast64_t unassigned_tasks_{}, in_flight_tasks_{};
std::barrier<details::barrier_function_type> threads_done_;
std::binary_semaphore threads_complete_signal_{0};
std::atomic_bool threads_complete_signal_{false};
};

/**