task<> scheduler affinity (#290)
This commit implements scheduler affinity, also known as "sticky" scheduling, in `unifex::task<>`. The goal is that a child operation can never cause the current coroutine to resume on the wrong execution context.

* `task<>`-based coroutines track and propagate the current scheduler
* `at_coroutine_exit` remembers the scheduler that was current when the cleanup action was scheduled
* `schedule` always returns an instance of `sender_for<schedule, the_sender>`,
  which is also a `scheduler_provider`
* scheduler affinity when co_await-ing senders in a `task<>`-returning coroutine
* scheduler affinity when co_await-ing awaitables in a `task<>`-returning coroutine
* awaitables and senders that are `blocking_kind::always_inline` don't get a thunk
* More senders and awaitables support compile-time blocking queries
* `co_await schedule(sched)` is magic in a `task<>`-returning coroutine: it changes execution
  context and schedules a cleanup action to transition back to the original scheduler
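
The "no thunk for `always_inline`" decision above can only be made at compile time if the blocking query encodes its answer in a type. Below is a minimal, self-contained sketch of that idea. It does not use unifex: `kind_sketch`, `inline_sender_sketch`, `runtime_sender_sketch`, `cblocking_sketch` and `needs_thunk_sketch` are all hypothetical names invented for illustration, and the senders expose a plain member function instead of a `tag_invoke` customization.

```cpp
#include <type_traits>
#include <utility>

// Stand-in for unifex::blocking_kind; only the enumerator names follow the commit.
enum class kind_sketch { maybe, never, always, always_inline };

template <kind_sketch K>
using kind_constant = std::integral_constant<kind_sketch, K>;

// Hypothetical sender whose blocking kind is encoded in the query's return type.
struct inline_sender_sketch {
  constexpr kind_constant<kind_sketch::always_inline> blocking() const noexcept {
    return {};
  }
};

// Hypothetical sender that only knows its blocking kind at run time.
struct runtime_sender_sketch {
  kind_sketch kind = kind_sketch::maybe;
  constexpr kind_sketch blocking() const noexcept { return kind; }
};

// Maps the query's result type to a compile-time kind.
// A plain enum result carries no static information, so it degrades to maybe.
template <class T>
struct static_kind : kind_constant<kind_sketch::maybe> {};

template <kind_sketch K>
struct static_kind<kind_constant<K>> : kind_constant<K> {};

// Compile-time blocking query: inspects only the type, never a sender value.
template <class Sender>
constexpr kind_sketch cblocking_sketch() noexcept {
  using result_t =
      std::decay_t<decltype(std::declval<const Sender&>().blocking())>;
  return static_kind<result_t>::value;
}

// A rescheduling thunk is needed unless completion is known to be inline.
template <class Sender>
constexpr bool needs_thunk_sketch() noexcept {
  return cblocking_sketch<Sender>() != kind_sketch::always_inline;
}

static_assert(!needs_thunk_sketch<inline_sender_sketch>(),
              "statically inline senders skip the thunk");
static_assert(needs_thunk_sketch<runtime_sender_sketch>(),
              "unknown blocking kind is treated conservatively");
```

The design choice illustrated here is that a query returning an `integral_constant` keeps its information in the type system, while a query returning the bare enum is treated as `maybe`.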

Move implementation of special co_await behavior of scheduler senders out of task.hpp

Hoist untyped RAII containers for coroutine_handle<> out of task<> and its awaiter (#329)

While looking at the binary size impact of adopting coroutines with
`unifex::task<>`, I noticed that a number of operations on
`coroutine_handle<T>` are expressed in `unifex::task<>` as if they
depend on `T` when they don't.  The consequence is extra code.

This diff creates a `coro_holder` class that uniquely owns a
`coroutine_handle<>` and makes `unifex::task<>` inherit from it.  We
technically lose some type safety, but it's still correct by
construction.  This change saves about 1.5 kilobytes in one of our apps.

Similar to the above, I noticed binary duplication due to false
template parameter dependencies in `unifex::task<>`'s awaiter type.

This diff hoists a non-type-specific RAII container for a
`coroutine_handle<>` that stores the handle as a `std::uintptr_t` so
that `task`'s awaiter can use the low bit as a dirty flag.  This change
saves another ~1.5 kilobytes in one of our apps.
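
The low-bit trick described above can be sketched without coroutines. The class below is a hypothetical illustration, not the container from the commit: `flagged_handle_sketch` and its members are invented names, it stores a plain `void*` (such as the value returned by `coroutine_handle<>::address()`), and it deliberately does not destroy anything, so ownership is left out. It assumes the stored address is at least 2-byte aligned, which leaves the low bit free.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>

// Non-template holder: nothing here depends on a promise type, so the
// compiler emits this code once no matter how many task<T> types exist.
class flagged_handle_sketch {
 public:
  flagged_handle_sketch() = default;

  explicit flagged_handle_sketch(void* address) noexcept
    : bits_(reinterpret_cast<std::uintptr_t>(address)) {
    // The flag lives in bit 0, so the address itself must not use it.
    assert((bits_ & dirty_bit) == 0 && "address must be 2-byte aligned");
  }

  flagged_handle_sketch(flagged_handle_sketch&& other) noexcept
    : bits_(std::exchange(other.bits_, 0)) {}

  flagged_handle_sketch& operator=(flagged_handle_sketch&& other) noexcept {
    bits_ = std::exchange(other.bits_, 0);
    return *this;
  }

  // Masks the flag off before handing the address back.
  void* address() const noexcept {
    return reinterpret_cast<void*>(bits_ & ~dirty_bit);
  }

  bool dirty() const noexcept { return (bits_ & dirty_bit) != 0; }
  void mark_dirty() noexcept { bits_ |= dirty_bit; }

  explicit operator bool() const noexcept { return address() != nullptr; }

 private:
  static constexpr std::uintptr_t dirty_bit = 1;
  std::uintptr_t bits_ = 0;
};
```

Because the holder is not a template, a `task<T>` awaiter that derives from or embeds something like it shares one copy of these member functions across all `T`.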

Fix scheduler affinity (#405)

* Fix scheduler affinity

We have been storing a `task<>`'s scheduler as an `any_scheduler_ref`,
which has proven to be a source of use-after-free bugs.  This change
switches all the `any_scheduler_ref`s to `any_scheduler`s, fixing the
lifetime issues.
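
The difference between the two kinds of type erasure can be shown with a small model. This is a sketch under stated assumptions, not unifex code: `named_scheduler_sketch`, `scheduler_ref_sketch` and `owning_scheduler_sketch` are hypothetical names, and `describe()` stands in for whatever operation the erased scheduler exposes.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

// Hypothetical concrete "scheduler" used only for illustration.
struct named_scheduler_sketch {
  std::string name;
  std::string describe() const { return "scheduler:" + name; }
};

// Non-owning erasure: stores a pointer to the caller's object.
// Calling describe() after that object dies would be a use-after-free.
class scheduler_ref_sketch {
 public:
  template <class S>
  explicit scheduler_ref_sketch(const S& target) noexcept
    : target_(&target),
      describe_([](const void* p) {
        return static_cast<const S*>(p)->describe();
      }) {}

  std::string describe() const { return describe_(target_); }

 private:
  const void* target_;
  std::string (*describe_)(const void*);
};

// Owning erasure: stores its own copy, so the source may go away.
class owning_scheduler_sketch {
 public:
  template <class S>
  explicit owning_scheduler_sketch(const S& source)
    : describe_([copy = source] { return copy.describe(); }) {}

  std::string describe() const { return describe_(); }

 private:
  std::function<std::string()> describe_;
};

// The local scheduler is destroyed on return; the owning wrapper survives.
inline owning_scheduler_sketch make_owned_sketch() {
  named_scheduler_sketch local{"pool"};
  return owning_scheduler_sketch{local};
}

inline void owning_outlives_source_sketch() {
  owning_scheduler_sketch owned = make_owned_sketch();
  assert(owned.describe() == "scheduler:pool");
}
```

Returning a `scheduler_ref_sketch` from `make_owned_sketch()` instead would compile but leave a dangling pointer, which is the class of bug the switch to owning storage avoids.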

Make task<>'s thunk-on-resume unstoppable (#495)

* Make task<>'s thunk-on-resume unstoppable

When awaiting an async Sender that swallows done signals (such as
let_done(never_sender{}, just)), the user-level code looks like it
swallows done signals:
```
// never cancels
co_await let_done(never_sender{}, just);
```

However, `task<>`'s Scheduler affinity implementation transforms the
above code into this:
```
co_await typed_via(let_done(never_sender{}, just), <current scheduler>);
```

The `schedule()` operation inside the injected `typed_via` can emit done
if the current stop token has had stop requested, leading to very
non-obvious cancellation behaviour that can't be worked around.

This diff introduces a pair of regression tests that capture the above
scenario, and the analogous scenario of awaiting an async Awaitable that
completes with done.  The next diff will fix these failing tests.

* Change task<>'s thunk-on-resume to be unstoppable

This diff fixes the broken tests in the previous diff.

Respect blocking_kind in `let_value()` (#381)

* `let_value()` always assumed `blocking_kind::maybe`, which could cause an
  unnecessary reschedule on resumption
* replicate `blocking_kind` customization from `finally()`
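
One plausible way to derive the blocking kind of a two-stage sender (a predecessor followed by a successor) from its parts is sketched below. This is an assumption for illustration only: the rule and the names `kind_sketch` and `combine_sketch` are hypothetical and are not copied from `finally()` or `let_value()`.

```cpp
// Stand-in for unifex::blocking_kind; only the enumerator names follow the commit.
enum class kind_sketch { maybe, never, always, always_inline };

// Assumed combination rule for "run pred, then run succ":
//  - if either stage never completes inline, neither does the whole;
//  - if both stages are always_inline, so is the whole;
//  - if both stages complete before start() returns, so does the whole;
//  - otherwise nothing can be promised.
constexpr kind_sketch combine_sketch(kind_sketch pred, kind_sketch succ) noexcept {
  if (pred == kind_sketch::never || succ == kind_sketch::never) {
    return kind_sketch::never;
  }
  if (pred == kind_sketch::always_inline && succ == kind_sketch::always_inline) {
    return kind_sketch::always_inline;
  }
  const bool pred_sync =
      pred == kind_sketch::always || pred == kind_sketch::always_inline;
  const bool succ_sync =
      succ == kind_sketch::always || succ == kind_sketch::always_inline;
  if (pred_sync && succ_sync) {
    return kind_sketch::always;
  }
  return kind_sketch::maybe;
}

static_assert(combine_sketch(kind_sketch::always_inline,
                             kind_sketch::always_inline) ==
                  kind_sketch::always_inline,
              "two inline stages stay inline");
```

Under a rule like this, a `task<>` awaiting a composition of inline senders would see `always_inline` rather than `maybe` and could skip the reschedule.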

fix `variant_sender` blocking kind (#474)

add `unifex::v2::async_scope` (#463)

* simpler than `unifex::v1::async_scope` (`nest()` and `join()`)
* does not support cancellation

fixing linter error (#414)

move deduction guide to namespace scope for gcc-10

in scheduler concept, check copy-constructibility after requiring call to schedule()

work around gcc-10 bugs

avoid warning about missing braces in initializer

back out change to awaiter_type_t

Co-authored-by: Eric Niebler <[email protected]>
Co-authored-by: Ian Petersen <[email protected]>
Co-authored-by: Ondrej Lehecka <[email protected]>
4 people committed May 3, 2023
1 parent 63ce7c6 commit 69df6dc
Showing 39 changed files with 1,547 additions and 304 deletions.
5 changes: 5 additions & 0 deletions include/unifex/allocate.hpp
@@ -22,6 +22,7 @@
#include <unifex/sender_concepts.hpp>
#include <unifex/tag_invoke.hpp>
#include <unifex/bind_back.hpp>
#include <unifex/blocking.hpp>

#include <memory>
#include <type_traits>
@@ -105,6 +106,10 @@ namespace _alloc {
static_cast<Self&&>(s).sender_, (Receiver &&) r};
}

friend constexpr auto tag_invoke(tag_t<unifex::blocking>, const type& self) noexcept {
return blocking(self.sender_);
}

Sender sender_;
};
} // namespace _alloc
51 changes: 48 additions & 3 deletions include/unifex/any_scheduler.hpp
@@ -198,6 +198,10 @@ struct _with<CPOs...>::any_scheduler {
return _sender{this};
}

type_index type() const noexcept {
return _get_type_index(impl_);
}

friend _equal_to_fn;
friend bool operator==(const any_scheduler& left, const any_scheduler& right) noexcept {
return _equal_to(left.impl_, right);
@@ -210,15 +214,44 @@ struct _with<CPOs...>::any_scheduler {
any_scheduler_impl<CPOs...> impl_;
};

template <typename... ReceiverCPOs>
using any_scheduler_ref_impl = any_ref_t<_schedule_and_connect<ReceiverCPOs...>>;
template <typename... CPOs>
using any_scheduler_ref_impl =
any_ref_t<
_schedule_and_connect<CPOs...>,
_get_type_index,
overload<bool(const this_&, const any_scheduler_ref<CPOs...>&) noexcept>(_equal_to)>;

#if defined(__GLIBCXX__)
template <typename>
inline constexpr bool _is_tuple = false;

template <typename... Ts>
inline constexpr bool _is_tuple<std::tuple<Ts...>> = true;

template <typename... Ts>
inline constexpr bool _is_tuple<std::tuple<Ts...> const> = true;
#endif

template <typename... CPOs>
struct _with<CPOs...>::any_scheduler_ref {
#if !defined(__GLIBCXX__)
template (typename Scheduler)
(requires (!same_as<const Scheduler, const any_scheduler_ref>) AND scheduler<Scheduler>)
(requires (!same_as<const Scheduler, const any_scheduler_ref>) AND
scheduler<Scheduler>)
/* implicit */ any_scheduler_ref(Scheduler& sched) noexcept
: impl_(sched) {}
#else
// Under-constrained implicit tuple converting constructor from a
// single argument doesn't exclude instances of the tuple type
// itself, so it is considered for copy/move constructors, leading
// to constraint recursion with the any_scheduler_ref constructor
// below.
template (typename Scheduler)
(requires (!same_as<const Scheduler, const any_scheduler_ref>) AND
(!_is_tuple<Scheduler>) AND scheduler<Scheduler>)
/* implicit */ any_scheduler_ref(Scheduler& sched) noexcept
: impl_(sched) {}
#endif

struct _sender {
template <template <class...> class Variant, template <class...> class Tuple>
@@ -257,14 +290,26 @@ struct _with<CPOs...>::any_scheduler_ref {
return _sender{this};
}

type_index type() const noexcept {
return _get_type_index(impl_);
}

// Shallow equality comparison by default, for regularity:
friend bool operator==(const any_scheduler_ref& left, const any_scheduler_ref& right) noexcept {
return left.impl_ == right.impl_;
}
friend bool operator!=(const any_scheduler_ref& left, const any_scheduler_ref& right) noexcept {
return !(left == right);
}

// Deep equality comparison:
friend _equal_to_fn;
bool equal_to(const any_scheduler_ref& that) const noexcept {
return _equal_to(impl_, that);
}

private:

any_scheduler_ref_impl<CPOs...> impl_;
};

2 changes: 1 addition & 1 deletion include/unifex/async_trace.hpp
@@ -103,7 +103,7 @@ namespace _async_trace {
return operation<Receiver>{(Receiver &&) r};
}

friend blocking_kind tag_invoke(tag_t<blocking>, const sender&) noexcept {
friend auto tag_invoke(tag_t<blocking>, const sender&) noexcept {
return blocking_kind::always_inline;
}
};
45 changes: 38 additions & 7 deletions include/unifex/at_coroutine_exit.hpp
@@ -19,9 +19,13 @@
#include <unifex/tag_invoke.hpp>
#include <unifex/await_transform.hpp>
#include <unifex/continuations.hpp>
#include <unifex/scheduler_concepts.hpp>
#include <unifex/stop_token_concepts.hpp>
#include <unifex/unstoppable_token.hpp>
#include <unifex/get_stop_token.hpp>
#include <unifex/inline_scheduler.hpp>
#include <unifex/any_scheduler.hpp>
#include <unifex/blocking.hpp>

#if UNIFEX_NO_COROUTINES
# error "Coroutine support is required to use this header"
@@ -115,11 +119,19 @@ struct _cleanup_promise_base {
}
#endif

friend unstoppable_token tag_invoke(tag_t<get_stop_token>, const _cleanup_promise_base&) noexcept {
friend unstoppable_token
tag_invoke(tag_t<get_stop_token>, const _cleanup_promise_base&) noexcept {
return unstoppable_token{};
}

friend any_scheduler
tag_invoke(tag_t<get_scheduler>, const _cleanup_promise_base& p) noexcept {
return p.sched_;
}

inline static constexpr inline_scheduler _default_scheduler{};
continuation_handle<> continuation_{};
any_scheduler sched_{_default_scheduler};
bool isUnhandledDone_{false};
};

@@ -145,6 +157,15 @@ struct _die_on_done_rec {
UNIFEX_ASSERT(!"A cleanup action tried to cancel. Calling terminate...");
std::terminate();
}

template(typename CPO)
(requires is_receiver_query_cpo_v<CPO> AND
is_callable_v<CPO, const Receiver&>)
friend auto tag_invoke(CPO cpo, const type& p)
noexcept(is_nothrow_callable_v<CPO, const Receiver&>)
-> callable_result_t<CPO, const Receiver&> {
return cpo(p.rec_);
}
};
};

@@ -177,7 +198,7 @@ struct _die_on_done {
_die_on_done_rec_t<Receiver>{(Receiver&&) rec});
}

Sender sender_;
UNIFEX_NO_UNIQUE_ADDRESS Sender sender_;
};
};

@@ -229,7 +250,7 @@ struct _cleanup_promise : _cleanup_promise_base {
return unifex::await_transform(*this, _die_on_done_fn{}((Value&&) value));
}

std::tuple<Ts&...> args_;
UNIFEX_NO_UNIQUE_ADDRESS std::tuple<Ts&...> args_;
};

template <typename... Ts>
@@ -251,14 +272,24 @@ struct [[nodiscard]] _cleanup_task {
}

template <typename Promise>
bool await_suspend(coro::coroutine_handle<Promise> parent) noexcept {
bool await_suspend_impl_(Promise& parent) noexcept {
continuation_.promise().continuation_ =
exchange_continuation(parent.promise(), continuation_);
exchange_continuation(parent, continuation_);
continuation_.promise().sched_ = get_scheduler(parent);
return false;
}

template <typename Promise>
bool await_suspend(coro::coroutine_handle<Promise> parent) noexcept {
return await_suspend_impl_(parent.promise());
}

std::tuple<Ts&...> await_resume() noexcept {
return std::exchange(continuation_, {}).promise().args_;
return std::move(std::exchange(continuation_, {}).promise().args_);
}

friend constexpr auto tag_invoke(tag_t<blocking>, const _cleanup_task&) noexcept {
return blocking_kind::always_inline;
}

private:
@@ -275,7 +306,7 @@ namespace _at_coroutine_exit {
public:
template (typename Action, typename... Ts)
(requires callable<std::decay_t<Action>, std::decay_t<Ts>...>)
_cleanup_task<Ts...> operator()(Action&& action, Ts&&... ts) const {
_cleanup_task<std::decay_t<Ts>...> operator()(Action&& action, Ts&&... ts) const {
return _fn::at_coroutine_exit((Action&&) action, (Ts&&) ts...);
}
} at_coroutine_exit{};
17 changes: 10 additions & 7 deletions include/unifex/await_transform.hpp
@@ -85,13 +85,13 @@ struct _awaitable_base<Promise, Value>::type {
struct _rec {
public:
explicit _rec(_expected<Value>* result, coro::coroutine_handle<Promise> continuation) noexcept
: result_(result)
, continuation_(continuation)
: result_(result)
, continuation_(continuation)
{}

_rec(_rec&& r) noexcept
: result_(std::exchange(r.result_, nullptr))
, continuation_(std::exchange(r.continuation_, nullptr))
: result_(std::exchange(r.result_, nullptr))
, continuation_(std::exchange(r.continuation_, nullptr))
{}

template(class... Us)
@@ -185,13 +185,16 @@ struct _awaitable<Promise, Sender>::type
template <typename Promise, typename Sender>
using _as_awaitable = typename _awaitable<Promise, Sender>::type;

inline const struct _fn {
struct _fn {
// Call custom implementation if present.
template(typename Promise, typename Value)
(requires tag_invocable<_fn, Promise&, Value>)
auto operator()(Promise& promise, Value&& value) const
noexcept(is_nothrow_tag_invocable_v<_fn, Promise&, Value>)
-> tag_invoke_result_t<_fn, Promise&, Value> {
static_assert(detail::_awaitable<tag_invoke_result_t<_fn, Promise&, Value>>,
"The return type of a customization of unifex::await_transform() "
"must satisfy the awaitable concept.");
return unifex::tag_invoke(_fn{}, promise, (Value&&)value);
}

@@ -218,7 +221,7 @@ inline const struct _fn {
return (Value&&) value;
}
}
} await_transform {};
};

} // namespace _await_tfx

@@ -231,7 +234,7 @@ inline const struct _fn {
//
// Coroutine promise_types can implement their .await_transform() methods to
// forward to this customisation point to enable use of type customisations.
using _await_tfx::await_transform;
inline constexpr _await_tfx::_fn await_transform {};

} // namespace unifex

75 changes: 67 additions & 8 deletions include/unifex/blocking.hpp
@@ -22,10 +22,11 @@

namespace unifex {

enum class blocking_kind {
namespace _block {
enum class _enum {
// No guarantees about the timing and context on which the receiver will
// be called.
maybe,
maybe = 0,

// Always completes asynchronously.
// Guarantees that the receiver will not be called on the current thread
@@ -44,8 +45,46 @@ enum class blocking_kind {
always_inline
};

namespace _blocking {
inline const struct _fn {
struct blocking_kind {
template <_enum Kind>
using constant = std::integral_constant<_enum, Kind>;

blocking_kind() = default;

constexpr blocking_kind(_enum kind) noexcept
: value(kind)
{}

template <_enum Kind>
constexpr blocking_kind(constant<Kind>) noexcept
: value(Kind)
{}

constexpr operator _enum() const noexcept {
return value;
}

constexpr _enum operator()() const noexcept {
return value;
}

friend constexpr bool operator==(blocking_kind a, blocking_kind b) noexcept {
return a.value == b.value;
}

friend constexpr bool operator!=(blocking_kind a, blocking_kind b) noexcept {
return a.value != b.value;
}

static constexpr constant<_enum::maybe> maybe {};
static constexpr constant<_enum::never> never {};
static constexpr constant<_enum::always> always {};
static constexpr constant<_enum::always_inline> always_inline {};

_enum value{};
};

struct _fn {
template(typename Sender)
(requires tag_invocable<_fn, const Sender&>)
constexpr auto operator()(const Sender& s) const
@@ -55,12 +94,32 @@ inline const struct _fn {
}
template(typename Sender)
(requires (!tag_invocable<_fn, const Sender&>))
constexpr blocking_kind operator()(const Sender&) const noexcept {
constexpr auto operator()(const Sender&) const noexcept {
return blocking_kind::maybe;
}
};

namespace _cfn {
template <_enum Kind>
static constexpr auto _kind(blocking_kind::constant<Kind> kind) noexcept {
return kind;
}
static constexpr auto _kind(blocking_kind) noexcept {
return blocking_kind::maybe;
}
} blocking{};
} // namespace _blocking
using _blocking::blocking;

template <typename T>
constexpr auto cblocking() noexcept {
using blocking_t = remove_cvref_t<decltype(_fn{}(UNIFEX_DECLVAL(T&)))>;
return _cfn::_kind(blocking_t{});
}
}

} // namespace _block

inline constexpr _block::_fn blocking {};
using _block::_cfn::cblocking;
using _block::blocking_kind;

} // namespace unifex
