-
Notifications
You must be signed in to change notification settings - Fork 254
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use tpu client next in validator with receiver #3893
base: master
Are you sure you want to change the base?
Use tpu client next in validator with receiver #3893
Conversation
If this PR represents a change to the public RPC API:
Thank you for keeping the RPC clients in sync with the server API @KirillLykov. |
core/src/validator.rs
Outdated
}; | ||
|
||
static GLOBAL_RUNTIME: LazyLock<TokioRuntime> = LazyLock::new(|| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably, it would be better to define it validator/src/main.rs and pass to the Validator::new
. But doing so would require changing new which means a lot of changes in the codebase. I propose to do so in separate PR which is not necessary to backport
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
static is fine, but this needs a better name imo. The validator already uses a bunch
of runtimes, GLOBAL_RUNTIME suggests that this is "the" runtime, while this is
only used by the tpu client as far as I can tell.
STS_CLIENT_RUNTIME
?
9a2de64
to
4670c01
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good! left some minor comments
core/src/validator.rs
Outdated
}; | ||
|
||
static GLOBAL_RUNTIME: LazyLock<TokioRuntime> = LazyLock::new(|| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
static is fine, but this needs a better name imo. The validator already uses a bunch
of runtimes, GLOBAL_RUNTIME suggests that this is "the" runtime, while this is
only used by the tpu client as far as I can tell.
STS_CLIENT_RUNTIME
?
core/src/validator.rs
Outdated
}); | ||
|
||
// Function to get a handle to the runtime | ||
fn get_runtime_handle() -> tokio::runtime::Handle { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has one caller in new, I probably wouldn't make it a function, just inline
in the call site.
.my_contact_info() | ||
.tpu(connection_cache.protocol()) | ||
.unwrap(); | ||
let my_tpu_address = cluster_info.my_contact_info().tpu(Protocol::QUIC).unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This now hardcodes QUIC - did the old code actually work with UDP?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the old code checks DEFAULT_CONNECTION_CACHE_USE_QUIC
which is true
. So the old code could work with UDP if this DEFAULT_CONNECTION_CACHE_USE_QUIC
modified in code to be false
(which means never).
What I think is sufficient to do for tests is to add check that client protocol is always QUIC
so that if someone changes the implementation of ConnectionCache to use UDP, tests will not work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that this is testing RPC and so it doesn't matter what the underlying
protocol is
core/src/validator.rs
Outdated
@@ -989,6 +1008,9 @@ impl Validator { | |||
|
|||
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); | |||
|
|||
// ConnectionCache might be used for JsonRpc and for Forwarding. Since | |||
// the later is not migrated yet to the tpu-client-next, create |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: latter
core/src/validator.rs
Outdated
key_notifies.push(client_updater); | ||
} else { | ||
// add connection_cache because it is still used in Forwarder. | ||
key_notifies.push(connection_cache); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doesn't this need to be added even if tpu_client_next is used for STS, or the
forwarder identity won't change?
let handle = self.join_and_cancel.clone(); | ||
|
||
let join_handle = { | ||
let Ok(mut lock) = handle.lock() else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: let Ok((token, handle)) = ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry but I don't understand what do you propose here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad you can't destructure there because the guard needs to be alive. But
below you should do:
let (handle, token) = lock;
token.cancel();
handle.take();
if you use lock.1 and lock.0 I as a reader have no idea what's what.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can take it:
let (handle, token) = std::mem::take(&mut *lock);
bf56164
to
5bca5cd
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
almost there, just a few more comments
.my_contact_info() | ||
.tpu(connection_cache.protocol()) | ||
.unwrap(); | ||
let my_tpu_address = cluster_info.my_contact_info().tpu(Protocol::QUIC).unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that this is testing RPC and so it doesn't matter what the underlying
protocol is
rpc/src/rpc.rs
Outdated
} | ||
|
||
#[tokio::test(flavor = "multi_thread")] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extra newline? (how does this pass CI?)
let handle = self.join_and_cancel.clone(); | ||
|
||
let join_handle = { | ||
let Ok(mut lock) = handle.lock() else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad you can't destructure there because the guard needs to be alive. But
below you should do:
let (handle, token) = lock;
token.cancel();
handle.take();
if you use lock.1 and lock.0 I as a reader have no idea what's what.
}; | ||
|
||
if let Some(join_handle) = join_handle { | ||
let Ok(result) = join_handle.await else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: you don't need let else ... and then a match below. You can do everything
in the match.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something like:
if let Some(Ok(join_handle)) = join_handle.map(async |handle| handle.await) {
would be possible if async closures existed. But even in this case, I would like to have this log message about panic
.
5bca5cd
to
67a4ea3
Compare
67a4ea3
to
50fd073
Compare
@alessandrod added a commit with graceful exit of the tpu-client-next and also added a test for admin_rpc that checks that change of the identity and exit rpc calls work as expected. |
Problem
Summary of Changes