Skip to content

Commit

Permalink
Fix makeUnorderedAsyncGenerator to not require generator draining
Browse files Browse the repository at this point in the history
Summary:
makeUnorderedAsyncGeneratorImpl allocates shared state on generator stack, which means that it requires the generator to be fully drained before it's destroyed. That's a contract that's very hard to enforce since in many cases it's fairly common for generators to be destroyed early.

This diff just moves the shared state to be heap allocated and managed by a shared_ptr.

Reviewed By: wangyishuo123

Differential Revision: D64585119

fbshipit-source-id: 2a42f943d1369e1fe48f46d6f11832458498744c
  • Loading branch information
andriigrynenko authored and facebook-github-bot committed Oct 19, 2024
1 parent 1f71891 commit a714d80
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 14 deletions.
32 changes: 18 additions & 14 deletions folly/coro/Collect-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,17 @@ auto makeUnorderedAsyncGeneratorImpl(
return [](AsyncScope& scopeParam,
InputRange awaitablesParam) -> AsyncGenerator<Item&&> {
auto [results, pipe] = AsyncPipe<Item, false>::create();
const CancellationSource cancelSource;
auto guard = folly::makeGuard([&] { cancelSource.requestCancellation(); });
struct SharedState {
explicit SharedState(AsyncPipe<Item, false>&& p) : pipe(std::move(p)) {}

AsyncPipe<Item, false> pipe;
const CancellationSource cancelSource;
};
auto sharedState = std::make_shared<SharedState>(std::move(pipe));
auto cancelToken = sharedState->cancelSource.getToken();

auto guard = folly::makeGuard(
[&] { sharedState->cancelSource.requestCancellation(); });
auto ex = co_await co_current_executor;
size_t expected = 0;
// Save the initial context and restore it after starting each task
Expand All @@ -246,24 +255,19 @@ auto makeUnorderedAsyncGeneratorImpl(
const auto context = RequestContext::saveContext();

for (auto&& semiAwaitable : static_cast<InputRange&&>(awaitablesParam)) {
auto task = [](auto semiAwaitableParam,
auto& cancelSourceParam,
auto& p) -> Task<void> {
auto task = [](auto semiAwaitableParam, auto state) -> Task<void> {
auto result = co_await co_awaitTry(std::move(semiAwaitableParam));
if (!result.hasValue() && !IsTry::value) {
cancelSourceParam.requestCancellation();
state->cancelSource.requestCancellation();
}
p.write(std::move(result));
}(static_cast<decltype(semiAwaitable)&&>(semiAwaitable),
cancelSource,
pipe);
state->pipe.write(std::move(result));
}(static_cast<decltype(semiAwaitable)&&>(semiAwaitable), sharedState);
if constexpr (std::is_same_v<AsyncScope, folly::coro::AsyncScope>) {
scopeParam.add(
co_withCancellation(cancelSource.getToken(), std::move(task))
.scheduleOn(ex));
co_withCancellation(cancelToken, std::move(task)).scheduleOn(ex));
} else {
static_assert(std::is_same_v<AsyncScope, CancellableAsyncScope>);
scopeParam.add(std::move(task).scheduleOn(ex), cancelSource.getToken());
scopeParam.add(std::move(task).scheduleOn(ex), cancelToken);
}
++expected;
RequestContext::setContext(context);
Expand All @@ -272,7 +276,7 @@ auto makeUnorderedAsyncGeneratorImpl(
while (expected > 0) {
CancellationCallback cancelCallback(
co_await co_current_cancellation_token,
[&]() noexcept { cancelSource.requestCancellation(); });
[&]() noexcept { sharedState->cancelSource.requestCancellation(); });

if constexpr (!IsTry::value) {
auto result = co_await co_awaitTry(results.next());
Expand Down
28 changes: 28 additions & 0 deletions folly/coro/test/CollectTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3457,3 +3457,31 @@ TEST_F(
co_await scope.joinAsync();
}());
}

TEST(MakeUnorderedAsyncGeneratorTest, GeneratorEarlyDestroy) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
folly::coro::AsyncScope scope;
folly::CPUThreadPoolExecutor executor(2);

std::vector<folly::coro::TaskWithExecutor<int>> tasks;

tasks.push_back(folly::coro::co_invoke([]() -> folly::coro::Task<int> {
co_await folly::coro::co_reschedule_on_current_executor;
std::this_thread::sleep_for(std::chrono::seconds{2});
co_return 42;
}).scheduleOn(&executor));
tasks.push_back(folly::coro::co_invoke([]() -> folly::coro::Task<int> {
co_await folly::coro::co_reschedule_on_current_executor;
std::this_thread::sleep_for(std::chrono::seconds{1});
co_return 43;
}).scheduleOn(&executor));

{
auto gen =
folly::coro::makeUnorderedAsyncGenerator(scope, std::move(tasks));
EXPECT_EQ(43, *(co_await gen.next()));
}

co_await scope.joinAsync();
}());
}

0 comments on commit a714d80

Please sign in to comment.