multitask quic socket operations #601
Backports to the stable branch are to be avoided unless absolutely necessary for fixing bugs, security issues, and perf regressions. Changes intended for backport should be structured such that a minimum effective diff can be committed separately from any refactoring, plumbing, cleanup, etc that are not strictly necessary to achieve the goal. Any of the latter should go only into master and ride the normal stabilization schedule.
Backports to the beta branch are to be avoided unless absolutely necessary for fixing bugs, security issues, and perf regressions. Changes intended for backport should be structured such that a minimum effective diff can be committed separately from any refactoring, plumbing, cleanup, etc that are not strictly necessary to achieve the goal. Any of the latter should go only into master and ride the normal stabilization schedule. Exceptions include CI/metrics changes, CLI improvements and documentation updates on a case by case basis.
this is pretty much straight from @alessandrod's original work. i took the liberty of removing some debug statements, making the net-utils changes less invasive and a few other things noted below
@@ -2794,6 +2795,8 @@ pub struct NodeConfig {
    pub public_tpu_forwards_addr: Option<SocketAddr>,
}

const QUIC_ENDPOINTS: usize = 10;
didn't seem like this needed to be part of public api, so i moved it out of sdk
@@ -384,29 +384,67 @@ pub fn is_host_port(string: String) -> Result<(), String> {
    parse_host_port(&string).map(|_| ())
}

#[derive(Clone, Debug)]
kinda regretting not deriving Copy after typing .clone() more times than anticipated...
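For illustration, a minimal sketch of what deriving Copy would change at call sites. The struct here is a hypothetical single-field stand-in, not the PR's actual SocketConfig, and `describe` / `describe_twice` are invented helpers.

```rust
// Hypothetical stand-in for the config type discussed above.
// Deriving Copy is only possible because every field is itself Copy.
#[derive(Clone, Copy, Debug, PartialEq)]
struct SocketConfig {
    reuseaddr: bool,
}

// Takes the config by value, like the bind helpers in the diff do.
fn describe(config: SocketConfig) -> String {
    format!("reuseaddr={}", config.reuseaddr)
}

// Because SocketConfig is Copy, passing it by value twice needs no
// explicit .clone(); each call receives its own bitwise copy.
fn describe_twice(config: SocketConfig) -> (String, String) {
    (describe(config), describe(config))
}
```

The trade-off is that Copy becomes part of the type's contract: adding a non-Copy field later (a String, say) would force the derive to be dropped and the clones to come back.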
}

impl Default for SocketConfig {
    #[allow(clippy::derivable_impls)]
so we can be explicit
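A sketch of the pattern being described, assuming a two-field config. The `reuseport` field and both default values are assumptions for illustration; only `reuseaddr` appears in the hunks quoted in this thread.

```rust
#[derive(Clone, Debug, PartialEq)]
pub struct SocketConfig {
    pub reuseaddr: bool,
    // Assumed field, not confirmed by the quoted hunks.
    pub reuseport: bool,
}

impl Default for SocketConfig {
    // Clippy would suggest #[derive(Default)] since every field takes its
    // type's default value. Spelling it out keeps the chosen defaults
    // visible at the definition site, so the lint is silenced on purpose.
    #[allow(clippy::derivable_impls)]
    fn default() -> Self {
        Self {
            reuseaddr: false,
            reuseport: false,
        }
    }
}
```

With this in place, `SocketConfig::default()` and a derived implementation would behave the same; the explicit form only changes what a reader sees.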
std::iter::once(Ok(socket))
    .chain((1..num).map(|_| bind_to_with_config(ip, port, config.clone())))
    .collect()
made this functional instead of imperative
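To show why the functional form works, here is a self-contained sketch of the same shape with the socket calls abstracted away. `bind_more_with` and its `bind` closure parameter are hypothetical names, not functions from the PR; the closure stands in for `bind_to_with_config`.

```rust
// Starts from one already-created item, then produces `num - 1` more via
// `bind`. Collecting an iterator of Result<T, E> into Result<Vec<T>, E>
// stops at the first Err and returns it, so no explicit loop or early
// return is needed.
fn bind_more_with<T, E>(
    first: T,
    num: usize,
    mut bind: impl FnMut() -> Result<T, E>,
) -> Result<Vec<T>, E> {
    std::iter::once(Ok(first))
        .chain((1..num).map(|_| bind()))
        .collect()
}
```

On success the returned Vec holds `num` items with the original one first. On failure, `bind` is not called again after the first error, and the items created so far are dropped.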
@@ -37,6 +38,7 @@ x509-parser = { workspace = true }

[dev-dependencies]
assert_matches = { workspace = true }
socket2 = { workspace = true }
moved from normal deps
@@ -2794,6 +2795,8 @@ pub struct NodeConfig {
    pub public_tpu_forwards_addr: Option<SocketAddr>,
}

const QUIC_ENDPOINTS: usize = 10;
we're going with 10?
I tried with 4 and saw about a 20% degradation in throughput. I think 10 is an okay choice for now. I have not seen much of a memory usage increase under the spamming tool.
)
.unwrap();
let quic_config = SocketConfig {
    reuseaddr: false,
Why do we need to redeclare when it was cloned at 2831?
);
let tpu_forwards_quic =
    bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap();
no need for clone for the last one
#[cfg(any(windows, target_os = "ios"))]
fn udp_socket(_reuseaddr: bool) -> io::Result<Socket> {
    let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?;
    Ok(sock)
}

#[cfg(any(windows, target_os = "ios"))]
fn udp_socket_with_config(config: SocketConfig) -> io::Result<Socket> {
config not used? -> _config
@@ -185,8 +224,37 @@ async fn run_server(
    stats.clone(),
    coalesce,
));

let mut accepts = incoming
Can you please add some comments here explaining why we are doing this select!? It is not immediately obvious.
@@ -1274,15 +1366,29 @@ pub mod test {
    SocketAddr,
    Arc<StreamStats>,
) {
    let s = UdpSocket::bind("127.0.0.1:0").unwrap();
    let sockets = (0..10)
Make this a constant
while !exit.load(Ordering::Relaxed) {
    let timeout_connection = timeout(WAIT_FOR_CONNECTION_TIMEOUT, incoming.accept()).await;
    let timeout_connection = select! {
I think what we're looking for is just to await the FuturesUnordered, or use JoinSet. This can also be made to work with a timeout, by simply adding a timeout future to the FuturesUnordered or JoinSet (and having the futures return an option/result).
SO_REUSEPORT does not load balance as you would expect on RHEL 8
closing in favor of #611
@ripatel-fd do you have a citation for this? We looked into the git history, and stickiness has been there since day 1
No, this is behavior observed when I tested on my box. I can write up a repro C program and post a log of it running on my box. I'll send it to you in Discord once I have it. (I do believe you with the stickiness. Glad to hear it's there... maybe I just messed up my test infra)
Problem
quinn uses a single task to do socket operations. not great
Summary of Changes
use SO_REUSEPORT to fan out socket operations to multiple quinn::Endpoints