Skip to content

Commit

Permalink
General Cleanup (#539)
Browse files Browse the repository at this point in the history
* Some lint fixes for the chain library CI.

* Cleanup

- Cleaned up mutex usage in await to eliminate a flag, and directly protect the element being modified.
- Fixed a memory leak in Windows if scheduling work fails in the default executor.
- Made package_task<> move-only which simplifies promise by removing promise count.
- Removed unused reduction_failed error code.
- Added a reduction test for future<future<void>>
  • Loading branch information
sean-parent authored Mar 4, 2024
1 parent bc092d5 commit facbd55
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 47 deletions.
18 changes: 6 additions & 12 deletions stlab/concurrency/await.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,12 @@ T await(future<T> x) {

std::mutex m;
std::condition_variable condition;
bool flag{false};

future<T> result;

auto hold = std::move(x).recover(immediate_executor, [&](future<T>&& r) {
result = std::move(r);
{
std::unique_lock<std::mutex> lock{m};
flag = true;
result = std::move(r);
condition.notify_one(); // must notify under lock
}
});
Expand All @@ -122,23 +119,20 @@ T await(future<T> x) {
backoff *= 2) {
{
std::unique_lock<std::mutex> lock{m};
if (condition.wait_for(lock, backoff, [&] { return flag; })) {
break;
if (condition.wait_for(lock, backoff, [&] { return result.is_ready(); })) {
return detail::_get_ready_future<T>{}(std::move(result));
}
}
detail::pts().wake(); // try to wake something to unstick.
}

#else

{
std::unique_lock<std::mutex> lock{m};
condition.wait(lock, [&] { return flag; });
}
std::unique_lock<std::mutex> lock{m};
condition.wait(lock, [&] { return result.is_ready(); });
return detail::_get_ready_future<T>{}(std::move(result));

#endif

return detail::_get_ready_future<T>{}(std::move(result));
}

namespace detail {
Expand Down
23 changes: 14 additions & 9 deletions stlab/concurrency/default_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
#include <cassert>
#include <chrono>
#include <functional>
#include <memory>
#include <type_traits>
#include <utility>

#if STLAB_TASK_SYSTEM(LIBDISPATCH)
#include <dispatch/dispatch.h>
Expand Down Expand Up @@ -94,12 +96,12 @@ struct executor_type {
using result_type = void;

template <typename F>
auto operator()(F f) const -> std::enable_if_t<std::is_nothrow_invocable_v<F>> {
using f_t = decltype(f);
auto operator()(F&& f) const -> std::enable_if_t<std::is_nothrow_invocable_v<std::decay_t<F>>> {
using f_t = std::decay_t<F>;

dispatch_group_async_f(detail::group()._group,
dispatch_get_global_queue(platform_priority(P), 0),
new f_t(std::move(f)), [](void* f_) {
new f_t(std::forward<F>(f)), [](void* f_) {
auto f = static_cast<f_t*>(f_);
(*f)();
delete f;
Expand Down Expand Up @@ -161,12 +163,13 @@ class task_system {

template <typename F>
void operator()(F&& f) {
auto work = CreateThreadpoolWork(&callback_impl<F>, new F(std::forward<F>(f)),
&_callBackEnvironment);
auto p = std::make_unique<F>(std::forward<F>(f));
auto work = CreateThreadpoolWork(&callback_impl<F>, p.get(), &_callBackEnvironment);

if (work == nullptr) {
throw std::bad_alloc();
}
p.release(); // ownership was passed to thread
SubmitThreadpoolWork(work);
}

Expand Down Expand Up @@ -461,12 +464,13 @@ template <executor_priority P = executor_priority::medium>
struct executor_type {
using result_type = void;

void operator()(task<void() noexcept>&& f) const {
template <class F>
auto operator()(F&& f) const -> std::enable_if_t<std::is_nothrow_invocable_v<std::decay_t<F>>> {
static task_system<P> only_task_system{[] {
at_pre_exit([]() noexcept { only_task_system.join(); });
return task_system<P>{};
}()};
only_task_system(std::move(f));
only_task_system(std::forward<F>(f));
}
};

Expand All @@ -476,8 +480,9 @@ template <executor_priority P = executor_priority::medium>
struct executor_type {
using result_type = void;

void operator()(task<void() noexcept>&& f) const {
pts().execute<static_cast<std::size_t>(P)>(std::move(f));
template <class F>
auto operator()(F&& f) const -> std::enable_if_t<std::is_nothrow_invocable_v<std::decay_t<F>>> {
pts().execute<static_cast<std::size_t>(P)>(std::forward<F>(f));
}
};

Expand Down
39 changes: 14 additions & 25 deletions stlab/concurrency/future.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ inline namespace v1 {

enum class future_error_codes { // names for futures errors
broken_promise = 1,
reduction_failed,
no_state
};

Expand All @@ -64,9 +63,6 @@ inline const char* Future_error_map(
case future_error_codes::no_state:
return "no state";

case future_error_codes::reduction_failed:
return "reduction failed";

default:
return nullptr;
}
Expand Down Expand Up @@ -246,8 +242,8 @@ struct shared_future {
template <typename... Args>
struct shared_task {
virtual ~shared_task() = default;

virtual void remove_promise() = 0;
virtual void add_promise() = 0;

virtual void operator()(Args... args) = 0;

Expand Down Expand Up @@ -504,24 +500,20 @@ struct shared_base<void> : std::enable_shared_from_this<shared_base<void>> {
};

template <typename R, typename... Args>
struct shared<R(Args...)> : shared_base<R>, shared_task<Args...> {
struct shared<R(Args...)> final : shared_base<R>, shared_task<Args...> {
using function_t = task<R(Args...)>;

std::atomic_size_t _promise_count{1};
function_t _f;

template <typename F>
shared(executor_t s, F&& f) : shared_base<R>(std::move(s)), _f(std::forward<F>(f)) {}

void remove_promise() override {
if ((--_promise_count == 0) && _f) {
this->set_exception(
std::make_exception_ptr(future_error(future_error_codes::broken_promise)));
}
if (!_f) return;
this->set_exception(
std::make_exception_ptr(future_error(future_error_codes::broken_promise)));
}

void add_promise() override { ++_promise_count; }

void operator()(Args... args) noexcept override {
if (!_f) return;

Expand All @@ -530,6 +522,7 @@ struct shared<R(Args...)> : shared_base<R>, shared_task<Args...> {
} catch (...) {
this->set_exception(std::current_exception());
}

_f = function_t();
}

Expand Down Expand Up @@ -571,14 +564,10 @@ class packaged_task {
if (auto p = _p.lock()) p->remove_promise();
}

packaged_task(const packaged_task& x) : _p(x._p) {
if (auto p = _p.lock()) p->add_promise();
}
packaged_task(const packaged_task&) = delete;
packaged_task& operator=(const packaged_task&) = delete;

packaged_task(packaged_task&&) noexcept = default;

packaged_task& operator=(const packaged_task& x) { return *this = packaged_task{x}; }

packaged_task& operator=(packaged_task&& x) noexcept = default;

template <typename... A>
Expand Down Expand Up @@ -1182,9 +1171,9 @@ auto apply_when_any_arg(F& f, P& p) {
}

template <std::size_t i, typename E, typename P, typename T>
void attach_when_arg_(E&& executor, std::shared_ptr<P>& p, T a) {
void attach_when_arg_(E&& executor, std::shared_ptr<P>& shared, T a) {
auto holds =
std::move(a).recover(std::forward<E>(executor), [_w = std::weak_ptr<P>(p)](auto x) {
std::move(a).recover(std::forward<E>(executor), [_w = std::weak_ptr<P>(shared)](auto x) {
auto p = _w.lock();
if (!p) return;

Expand All @@ -1194,8 +1183,8 @@ void attach_when_arg_(E&& executor, std::shared_ptr<P>& p, T a) {
p->template done<i>(std::move(x));
}
});
std::unique_lock<std::mutex> lock{p->_guard};
p->_holds[i] = std::move(holds);
std::unique_lock<std::mutex> lock{shared->_guard};
shared->_holds[i] = std::move(holds);
}

template <typename E, typename P, typename... Ts, std::size_t... I>
Expand Down Expand Up @@ -1670,8 +1659,8 @@ auto async(E executor, F&& f, Args&&... args)
executor,
std::bind<result_type>(
[_f = std::forward<F>(f)](
unwrap_reference_t<std::decay_t<Args>>&... args) mutable -> result_type {
return std::move(_f)(move_if<!is_reference_wrapper_v<std::decay_t<Args>>>(args)...);
unwrap_reference_t<std::decay_t<Args>>&... brgs) mutable -> result_type {
return std::move(_f)(move_if<!is_reference_wrapper_v<std::decay_t<Args>>>(brgs)...);
},
std::forward<Args>(args)...));

Expand Down
3 changes: 2 additions & 1 deletion test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ add_executable( stlab.test.future
future_when_any_range_tests.cpp
tuple_algorithm_test.cpp
main.cpp
future_test_helper.hpp )
future_test_helper.hpp
future_reduction_tests.cpp)

if( NOT STLAB_NO_STD_COROUTINES )
target_sources( stlab.test.future PUBLIC future_coroutine_tests.cpp )
Expand Down
25 changes: 25 additions & 0 deletions test/future_reduction_tests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
Copyright 2015 Adobe
Distributed under the Boost Software License, Version 1.0.
(See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/

/**************************************************************************************************/

#include <stlab/concurrency/future.hpp>

#include <boost/test/unit_test.hpp>

#include <stlab/concurrency/utility.hpp>

using namespace stlab;

BOOST_AUTO_TEST_SUITE(reduction_tests)

BOOST_AUTO_TEST_CASE(void_void_reduction) {
auto f = make_ready_future(immediate_executor) |
[] { return make_ready_future(immediate_executor); };
BOOST_REQUIRE(!f.exception());
}

BOOST_AUTO_TEST_SUITE_END()

0 comments on commit facbd55

Please sign in to comment.