Skip to content

Commit

Permalink
Feature: add adaptive primitives for both fiber and pthread context
Browse files Browse the repository at this point in the history
- Add FiberMutex primitive
- Add FiberConditionVariable primitive
- Add FiberSharedMutex primitive
- Add FiberSleepFor primitive
- Add FiberSleepUntil primitive
- Add FiberLatch primitive
- Add FiberEvent primitive
- Add futex_notifier_test
  • Loading branch information
hzlushiliang committed Dec 7, 2023
1 parent f99f77d commit 157b4c3
Show file tree
Hide file tree
Showing 22 changed files with 1,120 additions and 173 deletions.
1 change: 1 addition & 0 deletions trpc/coroutine/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ cc_test(
srcs = ["fiber_test.cc"],
deps = [
"//trpc/coroutine/testing:fiber_runtime_test",
"//trpc/util:latch",
"@com_google_googletest//:gtest",
"@com_google_googletest//:gtest_main",
],
Expand Down
15 changes: 13 additions & 2 deletions trpc/coroutine/fiber.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "trpc/coroutine/fiber.h"

#include <thread>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -151,8 +152,18 @@ void FiberYield() {
}

void FiberSleepUntil(const std::chrono::steady_clock::time_point& expires_at) {
fiber::detail::WaitableTimer wt(expires_at);
wt.wait();
if (trpc::fiber::detail::IsFiberContextPresent()) {
fiber::detail::WaitableTimer wt(expires_at);
wt.wait();
return;
}

auto now = ReadSteadyClock();
if (expires_at <= now) {
return;
}

std::this_thread::sleep_for(expires_at - now);
}

void FiberSleepFor(const std::chrono::nanoseconds& expires_in) {
Expand Down
12 changes: 6 additions & 6 deletions trpc/coroutine/fiber.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,23 +183,23 @@ bool BatchStartFiberDetached(std::vector<Function<void()>>&& start_procs);
/// @note It only uses in fiber runtime.
void FiberYield();

/// @brief Block calling fiber until `expires_at`.
/// @note It only uses in fiber runtime.
/// @brief Block calling pthread or calling fiber until `expires_at`.
/// @note It can be used in pthread context and fiber context.
void FiberSleepUntil(const std::chrono::steady_clock::time_point& expires_at);

/// @brief Block calling fiber for `expires_in`.
/// @note It only uses in fiber runtime.
/// @brief Block calling pthread or calling fiber for `expires_in`.
/// @note It can be used in pthread context and fiber context.
void FiberSleepFor(const std::chrono::nanoseconds& expires_in);

/// @brief `SleepUntil` for clocks other than `std::steady_clock`.
/// @note It only uses in fiber runtime.
/// @note It can be used in pthread context and fiber context.
template <class Clock, class Duration>
void FiberSleepUntil(const std::chrono::time_point<Clock, Duration>& expires_at) {
return FiberSleepUntil(ReadSteadyClock() + (expires_at - Clock::now()));
}

/// @brief `SleepFor` for durations other than `std::chrono::nanoseconds`.
/// @note It only uses in fiber runtime.
/// @note It can be used in pthread context and fiber context.
template <class Rep, class Period>
void FiberSleepFor(const std::chrono::duration<Rep, Period>& expires_in) {
return FiberSleepFor(static_cast<std::chrono::nanoseconds>(expires_in));
Expand Down
3 changes: 1 addition & 2 deletions trpc/coroutine/fiber_condition_variable.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@

namespace trpc {

/// @brief Analogous to `std::condition_variable`, but it's for fiber.
/// @note It only uses in fiber runtime.
/// @brief Adaptive condition variable primitive for both fiber and pthread context.
class FiberConditionVariable {
public:
/// @brief Wake up one waiter.
Expand Down
123 changes: 122 additions & 1 deletion trpc/coroutine/fiber_condition_variable_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

namespace trpc {

TEST(FiberConditionVariable, All) {
TEST(FiberConditionVariable, UseInFiberContext) {
RunAsFiber([] {
for (int k = 0; k != 10; ++k) {
constexpr auto N = 600;
Expand Down Expand Up @@ -64,4 +64,125 @@ TEST(FiberConditionVariable, All) {
});
}

TEST(FiberConditionVariable, UseInPthreadContext) {
constexpr auto N = 64;
std::atomic<std::size_t> run{0};
FiberMutex lock[N];
FiberConditionVariable cv[N];
bool set[N] = {false};
std::vector<std::thread> prod(N);
std::vector<std::thread> cons(N);

for (int i = 0; i != N; ++i) {
prod[i] = std::thread([&run, i, &cv, &lock, &set] {
FiberSleepFor(Random(20) * std::chrono::milliseconds(1));
std::unique_lock lk(lock[i]);
cv[i].wait(lk, [&] { return set[i]; });
++run;
});

cons[i] = std::thread([&run, i, &cv, &lock, &set] {
FiberSleepFor(Random(20) * std::chrono::milliseconds(1));
std::scoped_lock _(lock[i]);
set[i] = true;
cv[i].notify_one();
++run;
});
}

for (auto&& e : prod) {
ASSERT_TRUE(e.joinable());
e.join();
}

for (auto&& e : cons) {
ASSERT_TRUE(e.joinable());
e.join();
}

ASSERT_EQ(N * 2, run);
}

TEST(FiberConditionVariable, NotifyPthreadFromFiber) {
RunAsFiber([] {
constexpr auto N = 64;
std::atomic<std::size_t> run{0};
FiberMutex lock[N];
FiberConditionVariable cv[N];
bool set[N] = {false};
std::vector<std::thread> prod(N);
std::vector<Fiber> cons(N);

for (int i = 0; i != N; ++i) {
prod[i] = std::thread([&run, i, &cv, &lock, &set] {
FiberSleepFor(Random(20) * std::chrono::milliseconds(1));
std::unique_lock lk(lock[i]);
cv[i].wait(lk, [&] { return set[i]; });
++run;
});

cons[i] = Fiber([&run, i, &cv, &lock, &set] {
FiberSleepFor(Random(20) * std::chrono::milliseconds(1));
std::scoped_lock _(lock[i]);
set[i] = true;
cv[i].notify_one();
++run;
});
}

for (auto&& e : prod) {
ASSERT_TRUE(e.joinable());
e.join();
}

for (auto&& e : cons) {
ASSERT_TRUE(e.Joinable());
e.Join();
}

ASSERT_EQ(N * 2, run);
});
}

TEST(FiberConditionVariable, NotifyFiberFromPthread) {
RunAsFiber([] {
constexpr auto N = 64;
std::atomic<std::size_t> run{0};
FiberMutex lock[N];
FiberConditionVariable cv[N];
bool set[N] = {false};
std::vector<Fiber> prod(N);
std::vector<std::thread> cons(N);

for (int i = 0; i != N; ++i) {
prod[i] = Fiber([&run, i, &cv, &lock, &set] {
FiberSleepFor(Random(20) * std::chrono::milliseconds(1));
std::unique_lock lk(lock[i]);
cv[i].wait(lk, [&] { return set[i]; });
++run;
});

cons[i] = std::thread([&run, i, &cv, &lock, &set] {
FiberSleepFor(Random(20) * std::chrono::milliseconds(1));
std::scoped_lock _(lock[i]);
set[i] = true;
cv[i].notify_one();
++run;
});
}

for (auto&& e : prod) {
ASSERT_TRUE(e.Joinable());
e.Join();
}

for (auto&& e : cons) {
ASSERT_TRUE(e.joinable());
e.join();
}

ASSERT_EQ(N * 2, run);
});
}

} // namespace trpc
6 changes: 2 additions & 4 deletions trpc/coroutine/fiber_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,14 @@

namespace trpc {

/// @brief Event for fiber.
/// @brief Adaptive event primitive for both fiber and pthread context.
class FiberEvent {
public:
/// @brief Wait until `Set()` is called.
/// If `Set()` is called before `Wait()`, this method returns immediately.
/// @note This method only uses in fiber runtime.
void Wait() { event_.Wait(); }

/// @brief Wake up fibers blockings on `Wait()`.
/// @note You can call this method outside of fiber runtime.
/// @brief Wake up fibers and pthreads blockings on `Wait()`.
void Set() { event_.Set(); }

private:
Expand Down
57 changes: 54 additions & 3 deletions trpc/coroutine/fiber_event_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,70 @@

#include "gtest/gtest.h"

#include "trpc/coroutine/fiber.h"
#include "trpc/coroutine/testing/fiber_runtime.h"
#include "trpc/util/algorithm/random.h"

namespace trpc::testing {

TEST(FiberEvent, EventOnWakeup) {
TEST(FiberEvent, WaitInFiberSetFromPthread) {
RunAsFiber([]() {
for (int i = 0; i != 1000; ++i) {
auto ev = std::make_unique<FiberEvent>();
std::thread t([&] { ev->Set(); });
auto ev = std::make_unique<trpc::FiberEvent>();
std::thread t([&] {
// Random sleep to make Set before Wait or after Wait.
trpc::FiberSleepFor(Random(10) * std::chrono::milliseconds(1));
ev->Set();
});
// Random sleep to make Wait return immediately or awakened by Set.
trpc::FiberSleepFor(Random(10) * std::chrono::milliseconds(1));
ev->Wait();
t.join();
}
});
}

TEST(FiberEvent, WaitInFiberSetFromFiber) {
RunAsFiber([]() {
for (int i = 0; i != 1000; ++i) {
auto ev = std::make_unique<trpc::FiberEvent>();
trpc::Fiber f = trpc::Fiber([&] {
trpc::FiberSleepFor(Random(10) * std::chrono::milliseconds(1));
ev->Wait();
});
trpc::FiberSleepFor(Random(10) * std::chrono::milliseconds(1));
ev->Set();
f.Join();
}
});
}

TEST(FiberEvent, WaitInPthreadSetFromFiber) {
RunAsFiber([]() {
for (int i = 0; i != 1000; ++i) {
auto ev = std::make_unique<trpc::FiberEvent>();
std::thread t([&] {
trpc::FiberSleepFor(Random(10) * std::chrono::milliseconds(1));
ev->Wait();
});
trpc::FiberSleepFor(Random(10) * std::chrono::milliseconds(1));
ev->Set();
t.join();
}
});
}

TEST(FiberEvent, WaitInPthreadSetFromPthread) {
for (int i = 0; i != 1000; ++i) {
auto ev = std::make_unique<trpc::FiberEvent>();
std::thread t([&] {
trpc::FiberSleepFor(Random(10) * std::chrono::milliseconds(1));
ev->Wait();
});
trpc::FiberSleepFor(Random(10) * std::chrono::milliseconds(1));
ev->Set();
t.join();
}
}

} // namespace trpc::testing
3 changes: 1 addition & 2 deletions trpc/coroutine/fiber_latch.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

namespace trpc {

/// @brief Analogous to `std::latch`, but it's for fiber.
/// @note It only uses in fiber runtime.
/// @brief Adaptive latch primitive for both fiber and pthread context.
class FiberLatch {
public:
explicit FiberLatch(std::ptrdiff_t count);
Expand Down
Loading

0 comments on commit 157b4c3

Please sign in to comment.