Skip to content

Commit

Permalink
feat(s2n-quic-platform): add unstable option for multiple sockets (#1867
Browse files Browse the repository at this point in the history
)
  • Loading branch information
camshaft authored Jul 13, 2023
1 parent 49141ce commit 2499439
Showing 1 changed file with 33 additions and 9 deletions.
42 changes: 33 additions & 9 deletions quic/s2n-quic-platform/src/io/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,21 @@ impl Io {

let mut consumers = vec![];

let (producer, consumer) = socket::ring::pair(entries, payload_len);
consumers.push(consumer);

// spawn a task that actually reads from the socket into the ring buffer
handle.spawn(task::rx(rx_socket, producer));
let rx_socket_count = parse_env("S2N_QUIC_UNSTABLE_RX_SOCKET_COUNT").unwrap_or(1);

for idx in 0usize..rx_socket_count {
let (producer, consumer) = socket::ring::pair(entries, payload_len);
consumers.push(consumer);

// spawn a task that actually reads from the socket into the ring buffer
if idx + 1 == rx_socket_count {
handle.spawn(task::rx(rx_socket, producer));
break;
} else {
let rx_socket = rx_socket.try_clone()?;
handle.spawn(task::rx(rx_socket, producer));
}
}

// construct the RX side for the endpoint event loop
let max_mtu = MaxMtu::try_from(payload_len as u16).unwrap();
Expand Down Expand Up @@ -202,11 +212,21 @@ impl Io {

let mut producers = vec![];

let (producer, consumer) = socket::ring::pair(entries, payload_len);
producers.push(producer);
let tx_socket_count = parse_env("S2N_QUIC_UNSTABLE_TX_SOCKET_COUNT").unwrap_or(1);

for idx in 0usize..tx_socket_count {
let (producer, consumer) = socket::ring::pair(entries, payload_len);
producers.push(producer);

// spawn a task that actually flushes the ring buffer to the socket
handle.spawn(task::tx(tx_socket, consumer, gso.clone()));
// spawn a task that actually flushes the ring buffer to the socket
if idx + 1 == tx_socket_count {
handle.spawn(task::tx(tx_socket, consumer, gso.clone()));
break;
} else {
let tx_socket = tx_socket.try_clone()?;
handle.spawn(task::tx(tx_socket, consumer, gso.clone()));
}
}

// construct the TX side for the endpoint event loop
socket::io::tx::Tx::new(producers, gso, max_mtu)
Expand Down Expand Up @@ -235,3 +255,7 @@ fn convert_addr_to_std(addr: socket2::SockAddr) -> io::Result<std::net::SocketAd
addr.as_socket()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "invalid domain for socket"))
}

fn parse_env<T: core::str::FromStr>(name: &str) -> Option<T> {
std::env::var(name).ok().and_then(|v| v.parse().ok())
}

0 comments on commit 2499439

Please sign in to comment.