diff --git a/quic/s2n-quic-platform/src/io/tokio.rs b/quic/s2n-quic-platform/src/io/tokio.rs index a6b3ed6c17..d203b9ca06 100644 --- a/quic/s2n-quic-platform/src/io/tokio.rs +++ b/quic/s2n-quic-platform/src/io/tokio.rs @@ -168,11 +168,21 @@ impl Io { let mut consumers = vec![]; - let (producer, consumer) = socket::ring::pair(entries, payload_len); - consumers.push(consumer); - - // spawn a task that actually reads from the socket into the ring buffer - handle.spawn(task::rx(rx_socket, producer)); + let rx_socket_count = parse_env("S2N_QUIC_UNSTABLE_RX_SOCKET_COUNT").unwrap_or(1); + + for idx in 0usize..rx_socket_count { + let (producer, consumer) = socket::ring::pair(entries, payload_len); + consumers.push(consumer); + + // spawn a task that actually reads from the socket into the ring buffer + if idx + 1 == rx_socket_count { + handle.spawn(task::rx(rx_socket, producer)); + break; + } else { + let rx_socket = rx_socket.try_clone()?; + handle.spawn(task::rx(rx_socket, producer)); + } + } // construct the RX side for the endpoint event loop let max_mtu = MaxMtu::try_from(payload_len as u16).unwrap(); @@ -201,11 +211,21 @@ impl Io { let mut producers = vec![]; - let (producer, consumer) = socket::ring::pair(entries, payload_len); - producers.push(producer); + let tx_socket_count = parse_env("S2N_QUIC_UNSTABLE_TX_SOCKET_COUNT").unwrap_or(1); + + for idx in 0usize..tx_socket_count { + let (producer, consumer) = socket::ring::pair(entries, payload_len); + producers.push(producer); - // spawn a task that actually flushes the ring buffer to the socket - handle.spawn(task::tx(tx_socket, consumer, gso.clone())); + // spawn a task that actually flushes the ring buffer to the socket + if idx + 1 == tx_socket_count { + handle.spawn(task::tx(tx_socket, consumer, gso.clone())); + break; + } else { + let tx_socket = tx_socket.try_clone()?; + handle.spawn(task::tx(tx_socket, consumer, gso.clone())); + } + } // construct the TX side for the endpoint event loop socket::io::tx::Tx::new(producers, gso, max_mtu) @@ -234,3 +254,7 @@ fn convert_addr_to_std(addr: socket2::SockAddr) -> io::Result(name: &str) -> Option { + std::env::var(name).ok().and_then(|v| v.parse().ok()) +}