-
Notifications
You must be signed in to change notification settings - Fork 160
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Change: add Mutex to AsyncRuntime #1213
Conversation
@drmingdrmer Looks like that issue has already been in your "bother" list 🥹 |
In Openraft T does not have to have a lifetime, so making T a From c72f15b7f6bd59f1667900c0cf6f2d8f9f0aa409 Mon Sep 17 00:00:00 2001
From: =?utf-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= <[email protected]>
Date: Wed, 31 Jul 2024 16:10:21 +0800
Subject: [PATCH] M
openraft/src/type_config/async_runtime/impls/tokio_runtime.rs
---
.../type_config/async_runtime/impls/tokio_runtime.rs | 10 ++++------
openraft/src/type_config/async_runtime/mod.rs | 2 +-
openraft/src/type_config/async_runtime/mutex.rs | 9 +++------
3 files changed, 8 insertions(+), 13 deletions(-)
diff --git openraft/src/type_config/async_runtime/impls/tokio_runtime.rs openraft/src/type_config/async_runtime/impls/tokio_runtime.rs
index d9e0475e..627fa910 100644
--- openraft/src/type_config/async_runtime/impls/tokio_runtime.rs
+++ openraft/src/type_config/async_runtime/impls/tokio_runtime.rs
@@ -70,21 +70,21 @@ impl AsyncRuntime for TokioRuntime {
}
#[inline]
fn thread_rng() -> Self::ThreadLocalRng {
rand::thread_rng()
}
type MpscUnbounded = TokioMpscUnbounded;
type Watch = TokioWatch;
type Oneshot = TokioOneshot;
- type Mutex<T: OptionalSend> = TokioMutex<T>;
+ type Mutex<T: OptionalSend + 'static> = TokioMutex<T>;
}
pub struct TokioMpscUnbounded;
impl MpscUnbounded for TokioMpscUnbounded {
type Sender<T: OptionalSend> = mpsc::UnboundedSender<T>;
type Receiver<T: OptionalSend> = mpsc::UnboundedReceiver<T>;
type WeakSender<T: OptionalSend> = mpsc::WeakUnboundedSender<T>;
/// Creates an unbounded mpsc channel for communicating between asynchronous
@@ -196,24 +196,22 @@ where T: OptionalSend
{
#[inline]
fn send(self, t: T) -> Result<(), T> {
self.send(t)
}
}
type TokioMutex<T> = tokio::sync::Mutex<T>;
impl<T> mutex::Mutex<T> for TokioMutex<T>
-where T: OptionalSend
+where T: OptionalSend + 'static
{
- type Guard<'a> = tokio::sync::MutexGuard<'a, T>
- where T:'a;
+ type Guard<'a> = tokio::sync::MutexGuard<'a, T>;
fn new(value: T) -> Self {
TokioMutex::new(value)
}
- fn lock<'a>(&'a self) -> impl Future<Output = Self::Guard<'a>> + OptionalSend
- where T: 'a {
+ fn lock<'a>(&'a self) -> impl Future<Output = Self::Guard<'a>> + OptionalSend {
self.lock()
}
}
diff --git openraft/src/type_config/async_runtime/mod.rs openraft/src/type_config/async_runtime/mod.rs
index 18b42e1a..69fcbed2 100644
--- openraft/src/type_config/async_runtime/mod.rs
+++ openraft/src/type_config/async_runtime/mod.rs
@@ -97,12 +97,12 @@ pub trait AsyncRuntime: Debug + Default + PartialEq + Eq + OptionalSend + Option
/// This is a per-thread instance, which cannot be shared across threads or
/// sent to another thread.
fn thread_rng() -> Self::ThreadLocalRng;
type MpscUnbounded: MpscUnbounded;
type Watch: Watch;
type Oneshot: Oneshot;
- type Mutex<T: OptionalSend>: Mutex<T>;
+ type Mutex<T: OptionalSend + 'static>: Mutex<T>;
}
diff --git openraft/src/type_config/async_runtime/mutex.rs openraft/src/type_config/async_runtime/mutex.rs
index 070fa593..4f075601 100644
--- openraft/src/type_config/async_runtime/mutex.rs
+++ openraft/src/type_config/async_runtime/mutex.rs
@@ -1,21 +1,18 @@
use std::future::Future;
use std::ops::DerefMut;
use crate::OptionalSend;
use crate::OptionalSync;
/// Represents an implementation of an asynchronous Mutex.
-pub trait Mutex<T: OptionalSend>: OptionalSend + OptionalSync {
+pub trait Mutex<T: OptionalSend + 'static>: OptionalSend + OptionalSync {
/// Handle to an acquired lock, should release it when dropped.
type Guard<'a>: DerefMut<Target = T> + OptionalSend
- where
- T: 'a,
- Self: 'a;
+ where Self: 'a;
/// Creates a new lock.
fn new(value: T) -> Self;
/// Locks this Mutex.
- fn lock<'a>(&'a self) -> impl Future<Output = Self::Guard<'a>> + OptionalSend
- where T: 'a;
+ fn lock<'a>(&'a self) -> impl Future<Output = Self::Guard<'a>> + OptionalSend;
}
--
2.34.1
|
Awesome! Thanks for the help! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 5 of 8 files at r1, 3 of 3 files at r2, all commit messages.
Reviewable status: complete! all files reviewed, all discussions resolved (waiting on @drmingdrmer)
openraft/src/type_config/util.rs
line 99 at r2 (raw file):
/// This is just a wrapper of /// [`AsyncRuntime::Mutex::new()`](`crate::async_runtime::Mutex::new`). fn mutex_lock<T>(value: T) -> MutexOf<Self, T>
Why not just mutex()
?
At the usage site it sounds like you are already locking the mutex.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! all files reviewed, all discussions resolved (waiting on @drmingdrmer)
openraft/src/type_config/util.rs
line 99 at r2 (raw file):
Previously, schreter wrote…
Why not just
mutex()
?At the usage site it sounds like you are already locking the mutex.
Let me rename it to mutex()
:)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 3 of 8 files at r1, 2 of 3 files at r2.
Reviewable status: 5 of 8 files reviewed, 2 unresolved discussions (waiting on @schreter and @SteveLauC)
openraft/src/type_config/util.rs
line 99 at r2 (raw file):
/// This is just a wrapper of /// [`AsyncRuntime::Mutex::new()`](`crate::async_runtime::Mutex::new`). fn mutex_lock<T>(value: T) -> MutexOf<Self, T>
The function name mutex_lock()
could be misleading as it suggests an action of locking a mutex. A more descriptive name such as mutex_new()
or simply mutex()
would more clearly indicate that the function is used for creating a new mutex instance.
openraft/src/type_config/async_runtime/mutex.rs
line 10 at r2 (raw file):
pub trait Mutex<T: OptionalSend + 'static>: OptionalSend + OptionalSync { /// Handle to an acquired lock, should release it when dropped. type Guard<'a>: DerefMut<Target = T> + OptionalSend
The Guard needs to be Sync
to across .await
point. The following code does not compile without Sync
:
fn test<C: RaftTypeConfig>() {
let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap();
rt.block_on(async move { tokio::spawn(foo::<C>()) });
}
async fn foo<C: RaftTypeConfig>() {
let mutex = C::mutex_lock(());
let guard = mutex.lock().await;
let r = &guard;
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
println!("{:?}", **r);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 1 of 8 files reviewed, 2 unresolved discussions (waiting on @drmingdrmer and @schreter)
openraft/src/type_config/util.rs
line 99 at r2 (raw file):
Previously, drmingdrmer (张炎泼) wrote…
The function name
mutex_lock()
could be misleading as it suggests an action of locking a mutex. A more descriptive name such asmutex_new()
or simplymutex()
would more clearly indicate that the function is used for creating a new mutex instance.
Renamed to mutex()
openraft/src/type_config/async_runtime/mutex.rs
line 10 at r2 (raw file):
Previously, drmingdrmer (张炎泼) wrote…
The Guard needs to be
Sync
to across.await
point. The following code does not compile withoutSync
:fn test<C: RaftTypeConfig>() { let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap(); rt.block_on(async move { tokio::spawn(foo::<C>()) }); } async fn foo<C: RaftTypeConfig>() { let mutex = C::mutex_lock(()); let guard = mutex.lock().await; let r = &guard; tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; println!("{:?}", **r); }
Good catch for this usage let r = &guard;
.
I have added the trait bound, though doing so will also require T
to be OptionalSync
(Required by the Sync impl of tokio::sync::MutexGuard
, but it should be applicable to all the MutexGuard impls), which in turn requires RaftTypeConfig::SnapshotData
to be OptionalSync
as it has been wrapped in a Mutex, see this commit:)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 2 of 3 files at r3, 5 of 5 files at r4, all commit messages.
Reviewable status: all files reviewed, 6 unresolved discussions (waiting on @schreter and @SteveLauC)
openraft/src/type_config.rs
line 75 at r4 (raw file):
/// /// [sto]: crate::docs::getting_started#3-implement-raftlogstorage-and-raftstatemachine type SnapshotData: OptionalSend + OptionalSync + 'static;
Hmm? why does it need to be Sync? Is there any compilation error without it?
openraft/src/type_config/util.rs
line 100 at r4 (raw file):
/// [`AsyncRuntime::Mutex::new()`](`crate::async_runtime::Mutex::new`). fn mutex<T>(value: T) -> MutexOf<Self, T> where T: OptionalSend + OptionalSync {
T does not need to bo Sync. Mutex
does not allow multiple threads to share the reference to the T. At any time, there is only one thread using T, even when a Guard
cross .await
.
openraft/src/type_config/async_runtime/mod.rs
line 107 at r4 (raw file):
type Oneshot: Oneshot; type Mutex<T: OptionalSend + OptionalSync + 'static>: Mutex<T>;
T does not need to be Sync
openraft/src/type_config/async_runtime/mutex.rs
line 10 at r2 (raw file):
Previously, SteveLauC (SteveLauC) wrote…
Good catch for this usage
let r = &guard;
.I have added the trait bound, though doing so will also require
T
to beOptionalSync
(Required by the Sync impl oftokio::sync::MutexGuard
, but it should be applicable to all the MutexGuard impls), which in turn requiresRaftTypeConfig::SnapshotData
to beOptionalSync
as it has been wrapped in a Mutex, see this commit:)
T does not need to be Sync for Guard
to be Sync
. See my above comments.
openraft/src/type_config/async_runtime/impls/tokio_runtime.rs
line 80 at r4 (raw file):
type Watch = TokioWatch; type Oneshot = TokioOneshot; type Mutex<T: OptionalSend + OptionalSync + 'static> = TokioMutex<T>;
T does not need Sync
openraft/src/type_config/async_runtime/impls/tokio_runtime.rs
line 206 at r4 (raw file):
impl<T> mutex::Mutex<T> for TokioMutex<T> where T: OptionalSend + OptionalSync + 'static
T does not need Sync
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 2 of 3 files at r3, 5 of 5 files at r4, all commit messages.
Reviewable status: all files reviewed, 10 unresolved discussions (waiting on @SteveLauC)
openraft/src/type_config.rs
line 75 at r4 (raw file):
/// /// [sto]: crate::docs::getting_started#3-implement-raftlogstorage-and-raftstatemachine type SnapshotData: OptionalSend + OptionalSync + 'static;
This should not be needed.
openraft/src/type_config/util.rs
line 100 at r4 (raw file):
/// [`AsyncRuntime::Mutex::new()`](`crate::async_runtime::Mutex::new`). fn mutex<T>(value: T) -> MutexOf<Self, T> where T: OptionalSend + OptionalSync {
T
doesn't have to be Sync
, since the mutex guarantees that only a single thread will access it at a time.
openraft/src/type_config/async_runtime/mod.rs
line 107 at r4 (raw file):
type Oneshot: Oneshot; type Mutex<T: OptionalSend + OptionalSync + 'static>: Mutex<T>;
Same here
openraft/src/type_config/async_runtime/mutex.rs
line 8 at r4 (raw file):
/// Represents an implementation of an asynchronous Mutex. pub trait Mutex<T: OptionalSend + OptionalSync + 'static>: OptionalSend + OptionalSync {
and here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 5 of 5 files at r5, all commit messages.
Reviewable status: all files reviewed, 10 unresolved discussions (waiting on @SteveLauC)
Previously, drmingdrmer (张炎泼) wrote…
This is the error I got, as you can see, to make $ cargo c
Checking openraft v0.10.0 (openraft/openraft)
error[E0277]: `T` cannot be shared between threads safely
--> openraft/src/type_config/async_runtime/impls/tokio_runtime.rs:208:22
|
208 | type Guard<'a> = tokio::sync::MutexGuard<'a, T>;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ `T` cannot be shared between threads safely
|
= note: required for `tokio::sync::MutexGuard<'a, T>` to implement `Sync`
note: required for `<tokio::sync::Mutex<T> as mutex::Mutex<T>>::Guard<'a>` to implement `OptionalSync`
--> openraft/src/base/mod.rs:20:15
|
20 | pub trait OptionalSync: Sync {}
| ^^^^^^^^^^^^
note: required by a bound in `mutex::Mutex::Guard`
--> openraft/src/type_config/async_runtime/mutex.rs:10:59
|
10 | type Guard<'a>: DerefMut<Target = T> + OptionalSend + OptionalSync
| ^^^^^^^^^^^^ required by this bound in `Mutex::Guard`
help: consider further restricting this bound
|
206 | where T: OptionalSend + 'static + std::marker::Sync
| +++++++++++++++++++
For more information about this error, try `rustc --explain E0277`.
error: could not compile `openraft` (lib) due to 1 previous error
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: all files reviewed, 10 unresolved discussions (waiting on @SteveLauC)
openraft/src/type_config.rs
line 75 at r4 (raw file):
Previously, SteveLauC (SteveLauC) wrote…
This is the error I got, as you can see, to make
tokio::sync::MutexGuard
Sync,T
has to beSync
:$ cargo c Checking openraft v0.10.0 (openraft/openraft) error[E0277]: `T` cannot be shared between threads safely --> openraft/src/type_config/async_runtime/impls/tokio_runtime.rs:208:22 | 208 | type Guard<'a> = tokio::sync::MutexGuard<'a, T>; | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ `T` cannot be shared between threads safely | = note: required for `tokio::sync::MutexGuard<'a, T>` to implement `Sync` note: required for `<tokio::sync::Mutex<T> as mutex::Mutex<T>>::Guard<'a>` to implement `OptionalSync` --> openraft/src/base/mod.rs:20:15 | 20 | pub trait OptionalSync: Sync {} | ^^^^^^^^^^^^ note: required by a bound in `mutex::Mutex::Guard` --> openraft/src/type_config/async_runtime/mutex.rs:10:59 | 10 | type Guard<'a>: DerefMut<Target = T> + OptionalSend + OptionalSync | ^^^^^^^^^^^^ required by this bound in `Mutex::Guard` help: consider further restricting this bound | 206 | where T: OptionalSend + 'static + std::marker::Sync | +++++++++++++++++++ For more information about this error, try `rustc --explain E0277`. error: could not compile `openraft` (lib) due to 1 previous error
Hmmm... I do not quite get it. But It looks like T must be Sync then :(
@schreter would love to hear your thought as well:D |
And interestingly, that |
Right. It looks like it allows to send a reference of MutexGuard to another thread. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And interestingly, that
T: Sync
bound required bytokio::sync::MutexGuard
is added manually: https://docs.rs/tokio/latest/src/tokio/sync/mutex.rs.html#260
No, this only says that if T
is Sync
, so is MutexGuard
.
Reviewed 5 of 5 files at r5, all commit messages.
Reviewable status: all files reviewed, 10 unresolved discussions (waiting on @drmingdrmer and @SteveLauC)
openraft/src/type_config.rs
line 75 at r4 (raw file):
Previously, drmingdrmer (张炎泼) wrote…
Hmmm... I do not quite get it. But It looks like T must be Sync then :(
Looking into Tokio code, the MutexGuard
is Sync
only if the T
is sync. Do we need the OptionalSync
bound here for the Guard
at all? Sending the Guard
to another thread is not a problem. Holding it across await point still just requires Send
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: all files reviewed, 10 unresolved discussions (waiting on @schreter and @SteveLauC)
openraft/src/type_config.rs
line 75 at r4 (raw file):
Previously, schreter wrote…
Looking into Tokio code, the
MutexGuard
isSync
only if theT
is sync. Do we need theOptionalSync
bound here for theGuard
at all? Sending theGuard
to another thread is not a problem. Holding it across await point still just requiresSend
.
It looks like Sync is not required 🤔 by Openraft for now.
Remove OptionalSync bound and dd a Sync bound in future if it is required? @SteveLauC
Previously, drmingdrmer (张炎泼) wrote…
done |
Will squash the commits if you both think the PR is ready for merge:) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be OK now.
Reviewed 1 of 1 files at r6, all commit messages.
Reviewable status: all files reviewed, 10 unresolved discussions (waiting on @drmingdrmer and @SteveLauC)
What does this PR do
As discussed in #1208, add the
Mutex
primitive to traitAsyncRuntime
.It is still a draft PR because I hit this issue Unexpected higher-ranked lifetime error in GAT usage and am still trying to find a workaround that works for us:Checklist
This change is