Skip to content

Commit

Permalink
feature: wip implementation of chase-lev deque
Browse files Browse the repository at this point in the history
work in progress implementation of a chase-lev, lock free, work stealing deque.
  • Loading branch information
DeveloperPaul123 committed Sep 29, 2023
1 parent 079c447 commit 3e22869
Show file tree
Hide file tree
Showing 3 changed files with 413 additions and 16 deletions.
29 changes: 13 additions & 16 deletions include/thread_pool/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
#endif

#include "thread_pool/thread_safe_queue.h"
#include "thread_pool/work_stealing_deque.h"

namespace dp {
namespace details {

#ifdef __cpp_lib_move_only_function
// TODO: use move only function, work stealing deque can't use move only types
#if 0 // __cpp_lib_move_only_function
using default_function_type = std::move_only_function<void()>;
#else
using default_function_type = std::function<void()>;
Expand All @@ -48,24 +50,19 @@ namespace dp {

do {
// invoke the task
while (auto task = tasks_[id].tasks.pop_front()) {
while (auto task = tasks_[id].tasks.pop_top()) {
try {
pending_tasks_.fetch_sub(1, std::memory_order_release);
std::invoke(std::move(task.value()));
} catch (...) {
}
}

// try to steal a task
for (std::size_t j = 1; j < tasks_.size(); ++j) {
const std::size_t index = (id + j) % tasks_.size();
if (auto task = tasks_[index].tasks.steal()) {
// steal a task
pending_tasks_.fetch_sub(1, std::memory_order_release);
std::invoke(std::move(task.value()));
// stop stealing once we have invoked a stolen task
break;
}
// try to steal a task from our donor
auto donor_index = (id + 1) % tasks_.size();
if (auto task = tasks_[donor_index].tasks.pop_top()) {
pending_tasks_.fetch_sub(1, std::memory_order_release);
std::invoke(std::move(task.value()));
}

} while (pending_tasks_.load(std::memory_order_acquire) > 0);
Expand Down Expand Up @@ -116,8 +113,8 @@ namespace dp {
typename ReturnType = std::invoke_result_t<Function &&, Args &&...>>
requires std::invocable<Function, Args...>
[[nodiscard]] std::future<ReturnType> enqueue(Function f, Args... args) {
#ifdef __cpp_lib_move_only_function
// we can do this in C++23 because we now have support for move only functions
#if 0 // __cpp_lib_move_only_function
// we can do this in C++23 because we now have support for move only functions
std::promise<ReturnType> promise;
auto future = promise.get_future();
auto task = [func = std::move(f), ... largs = std::move(args),
Expand Down Expand Up @@ -204,12 +201,12 @@ namespace dp {
}
auto i = *(i_opt);
pending_tasks_.fetch_add(1, std::memory_order_relaxed);
tasks_[i].tasks.push_back(std::forward<Function>(f));
tasks_[i].tasks.push_bottom(std::forward<Function>(f));
tasks_[i].signal.release();
}

struct task_item {
dp::thread_safe_queue<FunctionType> tasks{};
dp::work_stealing_deque<FunctionType> tasks{};
std::binary_semaphore signal{0};
};

Expand Down
194 changes: 194 additions & 0 deletions include/thread_pool/work_stealing_deque.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
#pragma once

#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <type_traits>
#include <utility>
#include <vector>

namespace dp {

#ifdef __cpp_lib_hardware_interference_size
using std::hardware_destructive_interference_size;
#else
// 64 bytes on x86-64 │ L1_CACHE_BYTES │ L1_CACHE_SHIFT │ __cacheline_aligned │ ...
inline constexpr std::size_t hardware_destructive_interference_size =
2 * sizeof(std::max_align_t);
#endif

/**
* @brief Chase-Lev work stealing queue
* @details Support single producer, multiple consumer. The producer owns the back, consumers
* own the top. Consumers can also take from the top of the queue. The queue is "lock-free" in
* that it does not directly use mutexes or locks.
*
* This is an implementation of the deque described in "Correct and Efficient Work-Stealing for
* Weak Memory Models" and "Dynamic Circular Work-Stealing Deque" by Chase,Lev.
*
*/
template <typename T>
requires std::is_destructible_v<T>
class work_stealing_deque final {
/**
* @brief Simple circular array buffer that can regrow
* TODO: Leverage std::pmr facilities to automatically allocate/reclaim memory?
*/
class circular_buffer final {
public:
explicit circular_buffer(const std::int64_t size) : size_(size), mask_(size - 1) {
// size must be a power of 2
assert((size % 2) == 0);

buffer_ = std::make_unique_for_overwrite<T[]>(size_);
pointer_.store(buffer_.get(), release);
}

[[nodiscard]] std::int64_t capacity() const noexcept { return size_; }

void store(const std::size_t index, T value, std::memory_order order = acquire) noexcept
requires std::is_move_assignable_v<T>
{
auto buf = pointer_.load(order);
buf[index & mask_] = value;
}

T load(const std::size_t index, std::memory_order order = acquire) noexcept {
auto buf = pointer_.load(order);
return buf[index & mask_];
}

/**
* @brief Resize the internal buffer. Copies [start, end) to the new buffer.
* @param start The start index
* @param end The end index
*/
circular_buffer* resize(const std::size_t start, const std::size_t end) {
auto temp = new circular_buffer(size_ * 2);
for (std::size_t i = start; i != end; ++i) {
temp->store(i, load(i));
}
return temp;
}

private:
std::int64_t size_;
std::int64_t mask_;
std::atomic<T*> pointer_;
std::unique_ptr<T[]> buffer_;
};

constexpr static std::size_t default_count = 1024;
alignas(hardware_destructive_interference_size) std::atomic_int64_t top_;
alignas(hardware_destructive_interference_size) std::atomic_int64_t bottom_;
alignas(hardware_destructive_interference_size) std::atomic<circular_buffer*> buffer_;

std::vector<std::unique_ptr<circular_buffer>> garbage_{32};

static constexpr std::memory_order relaxed = std::memory_order_relaxed;
static constexpr std::memory_order acquire = std::memory_order_acquire;
static constexpr std::memory_order consume = std::memory_order_consume;
static constexpr std::memory_order release = std::memory_order_release;
static constexpr std::memory_order seq_cst = std::memory_order_seq_cst;

public:
explicit work_stealing_deque(const std::size_t& capacity = default_count)
: top_(0), bottom_(0), buffer_(new circular_buffer(capacity)) {}

// queue is non-copyable
work_stealing_deque(work_stealing_deque&) = delete;
work_stealing_deque& operator=(work_stealing_deque&) = delete;

[[nodiscard]] std::size_t capacity() const { return buffer_.load(relaxed)->capacity(); }
[[nodiscard]] std::size_t size() const {
const auto bottom = bottom_.load(relaxed);
const auto top = top_.load(relaxed);
return static_cast<std::size_t>(bottom >= top ? bottom - top : 0);
}

[[nodiscard]] bool empty() const { return size() == 0; }
template <typename... Args>
void push_bottom(Args&&... args) {
// construct first in case it throws
T value(std::forward<Args>(args)...);
push_bottom(std::move(value));
}

void push_bottom(T value) {
auto bottom = bottom_.load(relaxed);
auto top = top_.load(acquire);
auto buffer = buffer_.load(relaxed);

if (buffer->capacity() < (bottom - top) + 1) {
garbage_.emplace_back(std::exchange(buffer, buffer->resize(top, bottom)));
buffer_.store(buffer, release);
}

buffer->store(bottom, std::move(value));

// this synchronizes with other acquire fences
// memory operations about this line cannot be reordered
std::atomic_thread_fence(release);

bottom_.store(bottom + 1, relaxed);
}

std::optional<T> take_bottom() {
auto bottom = bottom_.load(relaxed) - 1;
auto buffer = buffer_.load(relaxed);

// prevent stealing
bottom_.store(bottom, relaxed);

// this synchronizes with other release fences
// memory ops below this line cannot be reordered
std::atomic_thread_fence(acquire);

auto top = top_.load(relaxed);
if (top <= bottom) {
// queue isn't empty
if (top == bottom) {
// there is only 1 item left in the queue, we need the CAS to succeed
// since another thread may be trying to steal and could steal before we're able
// to take the bottom
if (!top_.compare_exchange_strong(top, top + 1, seq_cst, relaxed)) {
// failed race
bottom_.store(bottom + 1, relaxed);
return std::nullopt;
}
bottom_.store(bottom + 1, relaxed);
}
// there is more than one item in the queue, we can take the bottom
return buffer->load(bottom);
}
// queue is empty, reset bottom
bottom_.store(bottom + 1, relaxed);
return std::nullopt;
}

std::optional<T> pop_top() {
auto top = top_.load(acquire);
// this synchronizes with other release fences
// memory ops below this line cannot be reordered with ops above this line
std::atomic_thread_fence(acquire);
const auto bottom = bottom_.load(acquire);

if (top < bottom) {
// non-empty queue
auto buffer = buffer_.load(release);
auto temp = buffer->load(top, acquire);
if (!top_.compare_exchange_strong(top, top + 1, seq_cst, relaxed)) {
// failed the race
return std::nullopt;
}
return temp;
} else {
// deque is empty
return std::nullopt;
}
}
};
} // namespace dp
Loading

0 comments on commit 3e22869

Please sign in to comment.