From 2b5cb23856018dc527870431d7b0998e180ed7c8 Mon Sep 17 00:00:00 2001
From: Cameron Bytheway
Date: Fri, 11 Aug 2023 12:17:22 -0600
Subject: [PATCH] feat(s2n-quic-core): add task cooldown module (#1862)

* feat(s2n-quic-core): add task cooldown module

* add cooldown disabled test
---
 quic/s2n-quic-core/src/io/event_loop.rs      |   5 +
 quic/s2n-quic-core/src/lib.rs                |   1 +
 quic/s2n-quic-core/src/task.rs               |   4 +
 quic/s2n-quic-core/src/task/cooldown.rs      | 178 ++++++++++++++++++
 quic/s2n-quic-platform/src/io/testing.rs     |   1 +
 .../src/io/testing/socket.rs                 |   4 +-
 quic/s2n-quic-platform/src/io/tokio.rs       |  29 ++-
 quic/s2n-quic-platform/src/io/tokio/task.rs  |   7 +-
 .../src/io/tokio/task/simple.rs              |   7 +-
 .../src/io/tokio/task/unix.rs                |   7 +-
 quic/s2n-quic-platform/src/io/turmoil.rs     |   1 +
 quic/s2n-quic-platform/src/io/xdp.rs         |   5 +-
 quic/s2n-quic-platform/src/socket/io/tx.rs   |  20 +-
 quic/s2n-quic-platform/src/socket/ring.rs    |  30 ++-
 quic/s2n-quic-platform/src/socket/task/rx.rs |  82 ++++----
 quic/s2n-quic-platform/src/socket/task/tx.rs |  81 ++++----
 quic/s2n-quic-qns/src/client/interop.rs      |   6 +
 quic/s2n-quic-qns/src/client/perf.rs         |   2 +-
 quic/s2n-quic-qns/src/io.rs                  |  14 ++
 quic/s2n-quic-qns/src/limits.rs              |  51 +++++
 quic/s2n-quic-qns/src/main.rs                |   1 +
 quic/s2n-quic-qns/src/perf.rs                |  38 ----
 quic/s2n-quic-qns/src/server/interop.rs      |  10 +-
 quic/s2n-quic-qns/src/server/perf.rs         |   2 +-
 quic/s2n-quic-qns/src/xdp.rs                 |  15 +-
 tools/xdp/s2n-quic-xdp/src/io/rx.rs          |  35 ++++
 26 files changed, 500 insertions(+), 136 deletions(-)
 create mode 100644 quic/s2n-quic-core/src/task.rs
 create mode 100644 quic/s2n-quic-core/src/task/cooldown.rs
 create mode 100644 quic/s2n-quic-qns/src/limits.rs

diff --git a/quic/s2n-quic-core/src/io/event_loop.rs b/quic/s2n-quic-core/src/io/event_loop.rs
index 67c97d2f87..365d48ba01 100644
--- a/quic/s2n-quic-core/src/io/event_loop.rs
+++ b/quic/s2n-quic-core/src/io/event_loop.rs
@@ -5,6 +5,7 @@ use crate::{
     endpoint::Endpoint,
     event::{self, EndpointPublisher},
     io::{rx::Rx, tx::Tx},
+    task::cooldown::Cooldown,
     time::clock::{ClockWithTimer, Timer},
 };
 use core::pin::Pin;
@@ -17,6 +18,7 @@ pub struct EventLoop<E, C, R, T> {
     pub clock: C,
     pub rx: R,
     pub tx: T,
+    pub cooldown: Cooldown,
 }
 
 impl<E, C, R, T> EventLoop<E, C, R, T>
 where
@@ -33,6 +35,7 @@ where
             clock,
             mut rx,
             mut tx,
+            mut cooldown,
         } = self;
 
         /// Creates a event publisher with the endpoint's subscriber
@@ -78,6 +81,8 @@ where
             // Concurrently poll all of the futures and wake up on the first one that's ready
             let select = Select::new(rx_ready, tx_ready, wakeups, timer_ready);
 
+            let select = cooldown.wrap(select);
+
             let select::Outcome {
                 rx_result,
                 tx_result,
diff --git a/quic/s2n-quic-core/src/lib.rs b/quic/s2n-quic-core/src/lib.rs
index b1d56bdc88..87a904c8d2 100644
--- a/quic/s2n-quic-core/src/lib.rs
+++ b/quic/s2n-quic-core/src/lib.rs
@@ -77,6 +77,7 @@ pub mod slice;
 pub mod stateless_reset;
 pub mod stream;
 pub mod sync;
+pub mod task;
 pub mod time;
 pub mod token;
 pub mod transmission;
diff --git a/quic/s2n-quic-core/src/task.rs b/quic/s2n-quic-core/src/task.rs
new file mode 100644
index 0000000000..8971175c39
--- /dev/null
+++ b/quic/s2n-quic-core/src/task.rs
@@ -0,0 +1,4 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+pub mod cooldown;
diff --git a/quic/s2n-quic-core/src/task/cooldown.rs b/quic/s2n-quic-core/src/task/cooldown.rs
new file mode 100644
index 0000000000..4ea6a1f95f
--- /dev/null
+++ b/quic/s2n-quic-core/src/task/cooldown.rs
@@ -0,0 +1,178 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+use core::{
+    future::Future,
+    pin::Pin,
+    task::{Context, Poll},
+};
+use pin_project_lite::pin_project;
+
+#[derive(Clone, Debug, Default)]
+pub struct Cooldown {
+    credits: u16,
+    limit: u16,
+}
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum Outcome {
+    /// The task should loop
+    Loop,
+    /// The task should return Pending and wait for an actual wake notification
+    Sleep,
+}
+
+impl Outcome {
+    #[inline]
+    pub fn is_loop(&self) -> bool {
+        matches!(self, Self::Loop)
+    }
+
+    #[inline]
+    pub fn is_sleep(&self) -> bool {
+        matches!(self, Self::Sleep)
+    }
+}
+
+impl Cooldown {
+    #[inline]
+    pub fn new(limit: u16) -> Self {
+        Self {
+            limit,
+            credits: limit,
+        }
+    }
+
+    #[inline]
+    pub fn state(&self) -> Outcome {
+        if self.credits > 0 {
+            Outcome::Loop
+        } else {
+            Outcome::Sleep
+        }
+    }
+
+    /// Notifies the cooldown that the poll operation was ready
+    ///
+    /// This resets the cooldown period until another `Pending` result.
+    #[inline]
+    pub fn on_ready(&mut self) {
+        // reset the pending count
+        self.credits = self.limit;
+    }
+
+    /// Notifies the cooldown that the poll operation was pending
+    ///
+    /// This consumes a cooldown credit until they are exhausted at which point the task should
+    /// sleep.
+    #[inline]
+    pub fn on_pending(&mut self) -> Outcome {
+        if self.credits > 0 {
+            self.credits -= 1;
+            return Outcome::Loop;
+        }
+
+        Outcome::Sleep
+    }
+
+    #[inline]
+    pub fn on_pending_task(&mut self, cx: &mut core::task::Context) -> Outcome {
+        let outcome = self.on_pending();
+
+        if outcome.is_loop() {
+            cx.waker().wake_by_ref();
+        }
+
+        outcome
+    }
+
+    #[inline]
+    pub async fn wrap<F>(&mut self, fut: F) -> F::Output
+    where
+        F: Future + Unpin,
+    {
+        Wrapped {
+            fut,
+            cooldown: self,
+        }
+        .await
+    }
+}
+
+pin_project!(
+    struct Wrapped<'a, F>
+    where
+        F: core::future::Future,
+    {
+        #[pin]
+        fut: F,
+        cooldown: &'a mut Cooldown,
+    }
+);
+
+impl<'a, F> Future for Wrapped<'a, F>
+where
+    F: Future,
+{
+    type Output = F::Output;
+
+    #[inline]
+    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+        let this = self.project();
+        match this.fut.poll(cx) {
+            Poll::Ready(v) => {
+                this.cooldown.on_ready();
+                Poll::Ready(v)
+            }
+            Poll::Pending => {
+                this.cooldown.on_pending_task(cx);
+                Poll::Pending
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn cooldown_test() {
+        let mut cooldown = Cooldown::new(2);
+
+        assert_eq!(cooldown.on_pending(), Outcome::Loop);
+        assert_eq!(cooldown.on_pending(), Outcome::Loop);
+        assert_eq!(cooldown.on_pending(), Outcome::Sleep);
+        assert_eq!(cooldown.on_pending(), Outcome::Sleep);
+
+        // call on ready to restore credits
+        cooldown.on_ready();
+
+        assert_eq!(cooldown.on_pending(), Outcome::Loop);
+        assert_eq!(cooldown.on_pending(), Outcome::Loop);
+        assert_eq!(cooldown.on_pending(), Outcome::Sleep);
+        assert_eq!(cooldown.on_pending(), Outcome::Sleep);
+
+        cooldown.on_ready();
+
+        // call on ready while we're still looping
+        assert_eq!(cooldown.on_pending(), Outcome::Loop);
+        cooldown.on_ready();
+
+        assert_eq!(cooldown.on_pending(), Outcome::Loop);
+        assert_eq!(cooldown.on_pending(), Outcome::Loop);
+        assert_eq!(cooldown.on_pending(), Outcome::Sleep);
+        assert_eq!(cooldown.on_pending(), Outcome::Sleep);
+    }
+
+    #[test]
+    fn disabled_test() {
+        let mut cooldown = Cooldown::new(0);
+
+        // with cooldown disabled, it should always return sleep
+        assert_eq!(cooldown.on_pending(), Outcome::Sleep);
+
+        cooldown.on_ready();
+        assert_eq!(cooldown.on_pending(), Outcome::Sleep);
+    }
+}
diff --git a/quic/s2n-quic-platform/src/io/testing.rs b/quic/s2n-quic-platform/src/io/testing.rs
index 037f359792..7ff3c7840c 100644
--- a/quic/s2n-quic-platform/src/io/testing.rs
+++ b/quic/s2n-quic-platform/src/io/testing.rs
@@ -316,6 +316,7 @@ impl Io {
             clock,
             tx,
             rx,
+            cooldown: Default::default(),
         };
         let join = executor.spawn(event_loop.start());
         Ok((join, handle))
diff --git a/quic/s2n-quic-platform/src/io/testing/socket.rs b/quic/s2n-quic-platform/src/io/testing/socket.rs
index 6822799bfa..176f08eb9d 100644
--- a/quic/s2n-quic-platform/src/io/testing/socket.rs
+++ b/quic/s2n-quic-platform/src/io/testing/socket.rs
@@ -18,7 +18,7 @@ use std::{io, sync::Arc};
 
 /// A task to receive on a socket
 pub async fn rx(socket: Socket, producer: ring::Producer) -> io::Result<()> {
-    let result = task::Receiver::new(producer, socket).await;
+    let result = task::Receiver::new(producer, socket, Default::default()).await;
     if let Some(err) = result {
         Err(err)
     } else {
@@ -28,7 +28,7 @@ pub async fn rx(socket: Socket, producer: ring::Producer) -> io::Result
 
 /// A task to send on a socket
 pub async fn tx(socket: Socket, consumer: ring::Consumer, gso: Gso) -> io::Result<()> {
-    let result = task::Sender::new(consumer, socket, gso).await;
+    let result = task::Sender::new(consumer, socket, gso, Default::default()).await;
     if let Some(err) = result {
         Err(err)
     } else {
diff --git a/quic/s2n-quic-platform/src/io/tokio.rs b/quic/s2n-quic-platform/src/io/tokio.rs
index 3d0a20c28f..2c4ef6b012 100644
--- a/quic/s2n-quic-platform/src/io/tokio.rs
+++ b/quic/s2n-quic-platform/src/io/tokio.rs
@@ -8,6 +8,7 @@ use s2n_quic_core::{
     inet::{self, SocketAddress},
     io::event_loop::EventLoop,
     path::MaxMtu,
+    task::cooldown::Cooldown,
     time::Clock as ClockTrait,
 };
 use std::{convert::TryInto, io, io::ErrorKind};
@@ -171,17 +172,21 @@ impl Io {
         let rx_socket_count = parse_env("S2N_QUIC_UNSTABLE_RX_SOCKET_COUNT").unwrap_or(1);
 
+        // configure the number of self-wakes before "cooling down" and waiting for epoll to
+        // complete
+        let rx_cooldown = cooldown("RX");
+
         for idx in 0usize..rx_socket_count {
             let (producer, consumer) = socket::ring::pair(entries, payload_len);
             consumers.push(consumer);
 
             // spawn a task that actually reads from the socket into the ring buffer
             if idx + 1 == rx_socket_count {
-                handle.spawn(task::rx(rx_socket, producer));
+                handle.spawn(task::rx(rx_socket, producer, rx_cooldown));
                 break;
             } else {
                 let rx_socket = rx_socket.try_clone()?;
-                handle.spawn(task::rx(rx_socket, producer));
+                handle.spawn(task::rx(rx_socket, producer, rx_cooldown.clone()));
             }
         }
 
@@ -214,17 +219,26 @@ impl Io {
         let tx_socket_count = parse_env("S2N_QUIC_UNSTABLE_TX_SOCKET_COUNT").unwrap_or(1);
 
+        // configure the number of self-wakes before "cooling down" and waiting for epoll to
+        // complete
+        let tx_cooldown = cooldown("TX");
+
         for idx in 0usize..tx_socket_count {
             let (producer, consumer) = socket::ring::pair(entries, payload_len);
             producers.push(producer);
 
             // spawn a task that actually flushes the ring buffer to the socket
             if idx + 1 == tx_socket_count {
-                handle.spawn(task::tx(tx_socket, consumer, gso.clone()));
+                handle.spawn(task::tx(tx_socket, consumer, gso.clone(), tx_cooldown));
                 break;
             } else {
                 let tx_socket = tx_socket.try_clone()?;
-                handle.spawn(task::tx(tx_socket, consumer, gso.clone()));
+                handle.spawn(task::tx(
+                    tx_socket,
+                    consumer,
+                    gso.clone(),
+                    tx_cooldown.clone(),
+                ));
             }
         }
 
@@ -241,6 +255,7 @@ impl Io {
                 clock,
                 rx,
                 tx,
+                cooldown: cooldown("ENDPOINT"),
             }
             .start(),
         );
@@ -259,3 +274,9 @@ fn
convert_addr_to_std(addr: socket2::SockAddr) -> io::Result(name: &str) -> Option { std::env::var(name).ok().and_then(|v| v.parse().ok()) } + +pub fn cooldown(direction: &str) -> Cooldown { + let name = format!("S2N_QUIC_UNSTABLE_COOLDOWN_{direction}"); + let limit = parse_env(&name).unwrap_or(0); + Cooldown::new(limit) +} diff --git a/quic/s2n-quic-platform/src/io/tokio/task.rs b/quic/s2n-quic-platform/src/io/tokio/task.rs index ab82fb65f0..a8b37c0fe8 100644 --- a/quic/s2n-quic-platform/src/io/tokio/task.rs +++ b/quic/s2n-quic-platform/src/io/tokio/task.rs @@ -24,20 +24,23 @@ macro_rules! libc_msg { mod $message { use super::unix; use crate::{features::Gso, message::$message::Message, socket::ring}; + use s2n_quic_core::task::cooldown::Cooldown; pub async fn rx>( socket: S, producer: ring::Producer, + cooldown: Cooldown, ) -> std::io::Result<()> { - unix::rx(socket, producer).await + unix::rx(socket, producer, cooldown).await } pub async fn tx>( socket: S, consumer: ring::Consumer, gso: Gso, + cooldown: Cooldown, ) -> std::io::Result<()> { - unix::tx(socket, consumer, gso).await + unix::tx(socket, consumer, gso, cooldown).await } } }; diff --git a/quic/s2n-quic-platform/src/io/tokio/task/simple.rs b/quic/s2n-quic-platform/src/io/tokio/task/simple.rs index 43ea714043..90f0137cdb 100644 --- a/quic/s2n-quic-platform/src/io/tokio/task/simple.rs +++ b/quic/s2n-quic-platform/src/io/tokio/task/simple.rs @@ -11,17 +11,19 @@ use crate::{ syscall::SocketEvents, }; use core::task::{Context, Poll}; +use s2n_quic_core::task::cooldown::Cooldown; use tokio::{io, net::UdpSocket}; pub async fn rx>( socket: S, producer: ring::Producer, + cooldown: Cooldown, ) -> io::Result<()> { let socket = socket.into(); socket.set_nonblocking(true).unwrap(); let socket = UdpSocket::from_std(socket).unwrap(); - let result = task::Receiver::new(producer, socket).await; + let result = task::Receiver::new(producer, socket, cooldown).await; if let Some(err) = result { Err(err) } else { @@ -33,12 +35,13 @@ pub async fn tx>( socket: S, consumer: ring::Consumer, gso: Gso, + cooldown: Cooldown, ) -> io::Result<()> { let socket = socket.into(); socket.set_nonblocking(true).unwrap(); let socket = UdpSocket::from_std(socket).unwrap(); - let result = task::Sender::new(consumer, socket, gso).await; + let result = task::Sender::new(consumer, socket, gso, cooldown).await; if let Some(err) = result { Err(err) } else { diff --git a/quic/s2n-quic-platform/src/io/tokio/task/unix.rs b/quic/s2n-quic-platform/src/io/tokio/task/unix.rs index ef649cd40e..07fa08f739 100644 --- a/quic/s2n-quic-platform/src/io/tokio/task/unix.rs +++ b/quic/s2n-quic-platform/src/io/tokio/task/unix.rs @@ -10,18 +10,20 @@ use crate::{ syscall::{SocketType, UnixMessage}, }; use core::task::{Context, Poll}; +use s2n_quic_core::task::cooldown::Cooldown; use std::{io, os::unix::io::AsRawFd}; use tokio::io::unix::AsyncFd; pub async fn rx, M: UnixMessage + Unpin>( socket: S, producer: ring::Producer, + cooldown: Cooldown, ) -> io::Result<()> { let socket = socket.into(); socket.set_nonblocking(true).unwrap(); let socket = AsyncFd::new(socket).unwrap(); - let result = rx::Receiver::new(producer, socket).await; + let result = rx::Receiver::new(producer, socket, cooldown).await; if let Some(err) = result { Err(err) } else { @@ -33,12 +35,13 @@ pub async fn tx, M: UnixMessage + Unpin>( socket: S, consumer: ring::Consumer, gso: Gso, + cooldown: Cooldown, ) -> io::Result<()> { let socket = socket.into(); socket.set_nonblocking(true).unwrap(); let socket = 
AsyncFd::new(socket).unwrap(); - let result = tx::Sender::new(consumer, socket, gso).await; + let result = tx::Sender::new(consumer, socket, gso, cooldown).await; if let Some(err) = result { Err(err) } else { diff --git a/quic/s2n-quic-platform/src/io/turmoil.rs b/quic/s2n-quic-platform/src/io/turmoil.rs index e1b87f9e4b..d568cc4114 100644 --- a/quic/s2n-quic-platform/src/io/turmoil.rs +++ b/quic/s2n-quic-platform/src/io/turmoil.rs @@ -115,6 +115,7 @@ impl Io { rx, tx, endpoint, + cooldown: Default::default(), } .start(); diff --git a/quic/s2n-quic-platform/src/io/xdp.rs b/quic/s2n-quic-platform/src/io/xdp.rs index 62c4dc484e..7bf5ba3842 100644 --- a/quic/s2n-quic-platform/src/io/xdp.rs +++ b/quic/s2n-quic-platform/src/io/xdp.rs @@ -5,7 +5,6 @@ use crate::io::tokio::Clock; use s2n_quic_core::{ endpoint::Endpoint, inet::SocketAddress, io::event_loop::EventLoop, path::MaxMtu, }; - pub use s2n_quic_core::{ io::rx, sync::{spsc, worker}, @@ -76,7 +75,8 @@ pub mod tx { producers.push(producer); // spawn a task that actually flushes the ring buffer to the socket - let task = crate::io::tokio::task::tx(socket, consumer, gso.clone()); + let cooldown = s2n_quic_core::task::cooldown::Cooldown::default(); + let task = crate::io::tokio::task::tx(socket, consumer, gso.clone(), cooldown); // construct the TX side for the endpoint event loop let io = crate::socket::io::tx::Tx::new(producers, gso, max_mtu); @@ -154,6 +154,7 @@ where clock, rx, tx, + cooldown: crate::io::tokio::cooldown("ENDPOINT"), }; // spawn the event loop on to the tokio handle diff --git a/quic/s2n-quic-platform/src/socket/io/tx.rs b/quic/s2n-quic-platform/src/socket/io/tx.rs index 33a4305fb5..9399caace6 100644 --- a/quic/s2n-quic-platform/src/socket/io/tx.rs +++ b/quic/s2n-quic-platform/src/socket/io/tx.rs @@ -352,17 +352,27 @@ impl<'a, T: Message> TxQueue<'a, T> { fn release_message(&mut self) { self.capacity -= 1; *self.is_full = self.capacity == 0; - self.message_index += 1; + + let channel = unsafe { + // Safety: the channel_index should always be in-bound if gso_segment is set + s2n_quic_core::assume!(self.channels.len() > self.channel_index); + &mut self.channels[self.channel_index] + }; + + channel.release_no_wake(1); + self.pending_release += 1; } /// Flushes the current channel and releases any pending messages #[inline] fn flush_channel(&mut self) { - if let Some(channel) = self.channels.get_mut(self.channel_index) { - channel.release(self.pending_release); - self.message_index = 0; - self.pending_release = 0; + if self.pending_release > 0 { + if let Some(channel) = self.channels.get_mut(self.channel_index) { + channel.wake(); + self.message_index = 0; + self.pending_release = 0; + } } } } diff --git a/quic/s2n-quic-platform/src/socket/ring.rs b/quic/s2n-quic-platform/src/socket/ring.rs index 0c42dae4a0..1e7c83b267 100644 --- a/quic/s2n-quic-platform/src/socket/ring.rs +++ b/quic/s2n-quic-platform/src/socket/ring.rs @@ -156,9 +156,19 @@ impl Consumer { /// Releases consumed messages to the producer #[inline] pub fn release(&mut self, release_len: u32) { + self.release_no_wake(release_len); + self.wake(); + } + + /// Releases consumed messages to the producer without waking the producer + #[inline] + pub fn release_no_wake(&mut self, release_len: u32) { self.cursor.release_consumer(release_len); + } - self.wakers.wake(); + #[inline] + pub fn wake(&self) { + self.wakers.wake() } /// Returns the currently acquired messages @@ -227,11 +237,23 @@ impl Producer { /// Releases ready-to-consume messages to the consumer #[inline] + 
#[allow(dead_code)] // even though this isn't used, it's kept for completeness pub fn release(&mut self, release_len: u32) { if release_len == 0 { return; } + self.release_no_wake(release_len); + self.wake(); + } + + /// Releases consumed messages to the producer without waking the producer + #[inline] + pub fn release_no_wake(&mut self, release_len: u32) { + if release_len == 0 { + return; + } + debug_assert!( release_len <= self.cursor.cached_producer_len(), "cannot release more messages than acquired" @@ -293,9 +315,11 @@ impl Producer { // finally release the len to the consumer self.cursor.release_producer(release_len); + } - // wake up the consumer to notify it of progress - self.wakers.wake(); + #[inline] + pub fn wake(&self) { + self.wakers.wake() } /// Returns the empty messages for the producer diff --git a/quic/s2n-quic-platform/src/socket/task/rx.rs b/quic/s2n-quic-platform/src/socket/task/rx.rs index 2e6477988a..39bbc266d2 100644 --- a/quic/s2n-quic-platform/src/socket/task/rx.rs +++ b/quic/s2n-quic-platform/src/socket/task/rx.rs @@ -10,7 +10,7 @@ use core::{ pin::Pin, task::{Context, Poll}, }; -use futures::ready; +use s2n_quic_core::task::cooldown::Cooldown; pub use events::RxEvents as Events; @@ -29,10 +29,8 @@ pub struct Receiver> { ring: Producer, /// Implementation of a socket that fills free slots in the ring buffer rx: S, - /// The number of messages that have been filled but not yet released to the consumer. - /// - /// This value is to avoid calling `release` too much and excessively waking up the consumer. - pending: u32, + ring_cooldown: Cooldown, + io_cooldown: Cooldown, } impl Receiver @@ -41,45 +39,42 @@ where S: Socket + Unpin, { #[inline] - pub fn new(ring: Producer, rx: S) -> Self { + pub fn new(ring: Producer, rx: S, cooldown: Cooldown) -> Self { Self { ring, rx, - pending: 0, + ring_cooldown: cooldown.clone(), + io_cooldown: cooldown.clone(), } } #[inline] fn poll_ring(&mut self, watermark: u32, cx: &mut Context) -> Poll> { loop { - let count = match self.ring.poll_acquire(watermark, cx) { - Poll::Ready(count) => count, - Poll::Pending if self.pending == 0 => { - return if !self.ring.is_open() { - Err(()).into() - } else { - Poll::Pending - }; + let is_loop = self.ring_cooldown.state().is_loop(); + + let count = if is_loop { + self.ring.acquire(watermark) + } else { + match self.ring.poll_acquire(watermark, cx) { + Poll::Ready(count) => count, + Poll::Pending if !self.ring.is_open() => return Err(()).into(), + Poll::Pending => 0, } - Poll::Pending => 0, }; // if the number of free slots increased since last time then yield - if count > self.pending { + if count > 0 { + self.ring_cooldown.on_ready(); return Ok(()).into(); } - // If there is no additional capacity available (i.e. we have filled all slots), - // then release those filled slots for the consumer to read from. Once - // the consumer reads, we will have spare capacity to populate again. 
- self.release(); - } - } + if is_loop && self.ring_cooldown.on_pending_task(cx).is_sleep() { + continue; + } - #[inline] - fn release(&mut self) { - let to_release = core::mem::take(&mut self.pending); - self.ring.release(to_release); + return Poll::Pending; + } } } @@ -96,26 +91,43 @@ where let mut events = Events::default(); + let mut pending_wake = false; + while !events.take_blocked() { - if ready!(this.poll_ring(u32::MAX, cx)).is_err() { - return None.into(); + match this.poll_ring(u32::MAX, cx) { + Poll::Ready(Ok(_)) => {} + Poll::Ready(Err(_)) => return None.into(), + Poll::Pending => { + if pending_wake { + this.ring.wake(); + } + return Poll::Pending; + } } - // slice the ring data by the number of slots we've already filled - let entries = &mut this.ring.data()[this.pending as usize..]; + let entries = this.ring.data(); // perform the recv syscall match this.rx.recv(cx, entries, &mut events) { - Ok(()) => { + Ok(_) => { // increment the number of received messages - this.pending += events.take_count() as u32 + let count = events.take_count() as u32; + + if count > 0 { + this.ring.release_no_wake(count); + this.io_cooldown.on_ready(); + pending_wake = true; + } } Err(err) => return Some(err).into(), } } - // release any of the messages we wrote back to the consumer - this.release(); + this.io_cooldown.on_pending_task(cx); + + if pending_wake { + this.ring.wake(); + } Poll::Pending } diff --git a/quic/s2n-quic-platform/src/socket/task/tx.rs b/quic/s2n-quic-platform/src/socket/task/tx.rs index 6fcdbf1792..63e238a58c 100644 --- a/quic/s2n-quic-platform/src/socket/task/tx.rs +++ b/quic/s2n-quic-platform/src/socket/task/tx.rs @@ -11,7 +11,7 @@ use core::{ pin::Pin, task::{Context, Poll}, }; -use futures::ready; +use s2n_quic_core::task::cooldown::Cooldown; pub use events::TxEvents as Events; @@ -30,11 +30,9 @@ pub struct Sender> { ring: Consumer, /// Implementation of a socket that transmits filled slots in the ring buffer tx: S, - /// The number of messages that have been transmitted but not yet released to the producer. - /// - /// This value is to avoid calling `release` too much and excessively waking up the producer. - pending: u32, events: Events, + ring_cooldown: Cooldown, + io_cooldown: Cooldown, } impl Sender @@ -43,46 +41,43 @@ where S: Socket + Unpin, { #[inline] - pub fn new(ring: Consumer, tx: S, gso: Gso) -> Self { + pub fn new(ring: Consumer, tx: S, gso: Gso, cooldown: Cooldown) -> Self { Self { ring, tx, - pending: 0, events: Events::new(gso), + ring_cooldown: cooldown.clone(), + io_cooldown: cooldown.clone(), } } #[inline] fn poll_ring(&mut self, watermark: u32, cx: &mut Context) -> Poll> { loop { - let count = match self.ring.poll_acquire(watermark, cx) { - Poll::Ready(count) => count, - Poll::Pending if self.pending == 0 => { - return if !self.ring.is_open() { - Err(()).into() - } else { - Poll::Pending - }; + let is_loop = self.ring_cooldown.state().is_loop(); + + let count = if is_loop { + self.ring.acquire(watermark) + } else { + match self.ring.poll_acquire(watermark, cx) { + Poll::Ready(count) => count, + Poll::Pending if !self.ring.is_open() => return Err(()).into(), + Poll::Pending => 0, } - Poll::Pending => 0, }; // if the number of free slots increased since last time then yield - if count > self.pending { + if count > 0 { + self.ring_cooldown.on_ready(); return Ok(()).into(); } - // If there is no additional capacity available (i.e. we have filled all slots), - // then release those filled slots for the consumer to read from. 
Once - // the consumer reads, we will have spare capacity to populate again. - self.release(); - } - } + if is_loop && self.ring_cooldown.on_pending_task(cx).is_sleep() { + continue; + } - #[inline] - fn release(&mut self) { - let to_release = core::mem::take(&mut self.pending); - self.ring.release(to_release); + return Poll::Pending; + } } } @@ -97,26 +92,44 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { let this = self.get_mut(); + let mut pending_wake = false; + while !this.events.take_blocked() { - if ready!(this.poll_ring(u32::MAX, cx)).is_err() { - return None.into(); + match this.poll_ring(u32::MAX, cx) { + Poll::Ready(Ok(_)) => {} + Poll::Ready(Err(_)) => return None.into(), + Poll::Pending => { + if pending_wake { + this.ring.wake(); + } + return Poll::Pending; + } } // slice the ring data by the number of items we've already received - let entries = &mut this.ring.data()[this.pending as usize..]; + let entries = this.ring.data(); // perform the send syscall match this.tx.send(cx, entries, &mut this.events) { - Ok(()) => { + Ok(_) => { // increment the number of received messages - this.pending += this.events.take_count() as u32 + let count = this.events.take_count() as u32; + + if count > 0 { + this.ring.release_no_wake(count); + this.io_cooldown.on_ready(); + pending_wake = true; + } } Err(err) => return Some(err).into(), } } - // release any of the messages we wrote back to the consumer - this.release(); + this.io_cooldown.on_pending_task(cx); + + if pending_wake { + this.ring.wake(); + } Poll::Pending } diff --git a/quic/s2n-quic-qns/src/client/interop.rs b/quic/s2n-quic-qns/src/client/interop.rs index ca5ae78065..f019687862 100644 --- a/quic/s2n-quic-qns/src/client/interop.rs +++ b/quic/s2n-quic-qns/src/client/interop.rs @@ -43,6 +43,9 @@ pub struct Interop { #[structopt(min_values = 1, required = true)] requests: Vec, + #[structopt(flatten)] + limits: crate::limits::Limits, + #[structopt(flatten)] io: crate::io::Client, @@ -147,8 +150,11 @@ impl Interop { fn client(&self) -> Result { let io = self.io.build()?; + let limits = self.limits.limits(); + let client = Client::builder() .with_io(io)? + .with_limits(limits)? .with_event(event::tracing::Subscriber::default())?; let client = self.tls.build(client, &self.application_protocols)?; diff --git a/quic/s2n-quic-qns/src/client/perf.rs b/quic/s2n-quic-qns/src/client/perf.rs index 367d6dadfb..6bb6eabac2 100644 --- a/quic/s2n-quic-qns/src/client/perf.rs +++ b/quic/s2n-quic-qns/src/client/perf.rs @@ -39,7 +39,7 @@ pub struct Perf { streams: u64, #[structopt(flatten)] - limits: perf::Limits, + limits: crate::limits::Limits, /// Logs statistics for the endpoint #[structopt(long)] diff --git a/quic/s2n-quic-qns/src/io.rs b/quic/s2n-quic-qns/src/io.rs index 8ce7355ebd..9a7983b139 100644 --- a/quic/s2n-quic-qns/src/io.rs +++ b/quic/s2n-quic-qns/src/io.rs @@ -72,6 +72,12 @@ pub struct Client { #[structopt(long, default_value = "9000")] pub max_mtu: u16, + #[structopt(long)] + pub queue_recv_buffer_size: Option, + + #[structopt(long)] + pub queue_send_buffer_size: Option, + #[structopt(short, long, default_value = "::")] pub local_ip: std::net::IpAddr, @@ -102,6 +108,14 @@ impl Client { io_builder = io_builder.with_gso_disabled()?; } + if let Some(size) = self.queue_send_buffer_size { + io_builder = io_builder.with_internal_send_buffer_size(size)?; + } + + if let Some(size) = self.queue_recv_buffer_size { + io_builder = io_builder.with_internal_recv_buffer_size(size)?; + } + Ok(io_builder.build()?) 
} } diff --git a/quic/s2n-quic-qns/src/limits.rs b/quic/s2n-quic-qns/src/limits.rs new file mode 100644 index 0000000000..abfd8ee9b9 --- /dev/null +++ b/quic/s2n-quic-qns/src/limits.rs @@ -0,0 +1,51 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +#[derive(Debug, structopt::StructOpt)] +pub struct Limits { + /// The maximum bits/sec for each connection + #[structopt(long, default_value = "150")] + pub max_throughput: u64, + + /// The expected RTT in milliseconds + #[structopt(long, default_value = "100")] + pub expected_rtt: u64, + + #[structopt(long)] + pub stream_send_buffer_size: Option, +} + +impl Limits { + pub fn limits(&self) -> s2n_quic::provider::limits::Limits { + let data_window = self.data_window(); + + let mut limits = s2n_quic::provider::limits::Limits::default(); + + limits = limits + .with_data_window(data_window) + .unwrap() + .with_max_send_buffer_size(data_window.min(u32::MAX as _) as _) + .unwrap() + .with_bidirectional_local_data_window(data_window) + .unwrap() + .with_bidirectional_remote_data_window(data_window) + .unwrap() + .with_unidirectional_data_window(data_window) + .unwrap(); + + if let Some(size) = self.stream_send_buffer_size { + limits = limits.with_max_send_buffer_size(size).unwrap(); + } + + limits + } + + fn data_window(&self) -> u64 { + s2n_quic_core::transport::parameters::compute_data_window( + self.max_throughput, + core::time::Duration::from_millis(self.expected_rtt), + 2, + ) + .as_u64() + } +} diff --git a/quic/s2n-quic-qns/src/main.rs b/quic/s2n-quic-qns/src/main.rs index c096516151..98ffa895a1 100644 --- a/quic/s2n-quic-qns/src/main.rs +++ b/quic/s2n-quic-qns/src/main.rs @@ -10,6 +10,7 @@ mod client; mod file; mod interop; mod io; +mod limits; mod perf; mod runtime; mod server; diff --git a/quic/s2n-quic-qns/src/perf.rs b/quic/s2n-quic-qns/src/perf.rs index e87e62c671..706a00c10a 100644 --- a/quic/s2n-quic-qns/src/perf.rs +++ b/quic/s2n-quic-qns/src/perf.rs @@ -88,41 +88,3 @@ pub async fn read_stream_size(stream: &mut ReceiveStream) -> Result<(u64, Bytes) Ok((id, chunk)) } - -#[derive(Debug, structopt::StructOpt)] -pub struct Limits { - /// The maximum bits/sec for each connection - #[structopt(long, default_value = "150")] - pub max_throughput: u64, - - /// The expected RTT in milliseconds - #[structopt(long, default_value = "100")] - pub expected_rtt: u64, -} - -impl Limits { - pub fn limits(&self) -> s2n_quic::provider::limits::Limits { - let data_window = self.data_window(); - - s2n_quic::provider::limits::Limits::default() - .with_data_window(data_window) - .unwrap() - .with_max_send_buffer_size(data_window.min(u32::MAX as _) as _) - .unwrap() - .with_bidirectional_local_data_window(data_window) - .unwrap() - .with_bidirectional_remote_data_window(data_window) - .unwrap() - .with_unidirectional_data_window(data_window) - .unwrap() - } - - fn data_window(&self) -> u64 { - s2n_quic_core::transport::parameters::compute_data_window( - self.max_throughput, - core::time::Duration::from_millis(self.expected_rtt), - 2, - ) - .as_u64() - } -} diff --git a/quic/s2n-quic-qns/src/server/interop.rs b/quic/s2n-quic-qns/src/server/interop.rs index 4901c9c4ec..4d02489b81 100644 --- a/quic/s2n-quic-qns/src/server/interop.rs +++ b/quic/s2n-quic-qns/src/server/interop.rs @@ -31,6 +31,9 @@ pub struct Interop { #[structopt(long, env = "TESTCASE", possible_values = &Testcase::supported(is_supported_testcase))] testcase: Option, + #[structopt(flatten)] + limits: crate::limits::Limits, + 
#[structopt(flatten)] tls: tls::Server, @@ -81,15 +84,18 @@ impl Interop { max_handshakes = 0; } - let limits = endpoint_limits::Default::builder() + let endpoint_limits = endpoint_limits::Default::builder() .with_inflight_handshake_limit(max_handshakes)? .build()?; + let limits = self.limits.limits(); + let io = self.io.build()?; let server = Server::builder() .with_io(io)? - .with_endpoint_limits(limits)? + .with_endpoint_limits(endpoint_limits)? + .with_limits(limits)? .with_event(( EventSubscriber(1), s2n_quic::provider::event::tracing::Subscriber::default(), diff --git a/quic/s2n-quic-qns/src/server/perf.rs b/quic/s2n-quic-qns/src/server/perf.rs index 4f5eedd099..6f6403f153 100644 --- a/quic/s2n-quic-qns/src/server/perf.rs +++ b/quic/s2n-quic-qns/src/server/perf.rs @@ -22,7 +22,7 @@ pub struct Perf { connections: Option, #[structopt(flatten)] - limits: perf::Limits, + limits: crate::limits::Limits, /// Logs statistics for the endpoint #[structopt(long)] diff --git a/quic/s2n-quic-qns/src/xdp.rs b/quic/s2n-quic-qns/src/xdp.rs index b84f817487..335f479fe7 100644 --- a/quic/s2n-quic-qns/src/xdp.rs +++ b/quic/s2n-quic-qns/src/xdp.rs @@ -11,12 +11,16 @@ use s2n_quic::provider::io::{ xdp::{ bpf, encoder, if_xdp::{self, XdpFlags}, - io::{self as xdp_io}, + io::{ + self as xdp_io, + rx::{Driver as _, WithCooldown}, + }, ring, socket, syscall, tx::{self, TxExt as _}, umem, Provider, }, }; +use s2n_quic_core::task::cooldown::Cooldown; use std::{ffi::CString, net::SocketAddr, os::unix::io::AsRawFd, sync::Arc}; use structopt::StructOpt; use tokio::{io::unix::AsyncFd, net::UdpSocket}; @@ -47,6 +51,9 @@ pub struct Xdp { #[structopt(long)] no_checksum: bool, + + #[structopt(long, default_value)] + rx_cooldown: u16, } #[derive(Clone, Copy, Debug)] @@ -88,7 +95,7 @@ impl From for programs::xdp::XdpFlags { type SetupResult = Result<( umem::Umem, - Vec>>>, + Vec>>>>, Vec<(u32, socket::Fd)>, Vec>, )>; @@ -158,10 +165,12 @@ impl Xdp { // put descriptors in the Fill queue fill.init((&mut desc).take(rx_queue_len as _)); + let cooldown = Cooldown::new(self.rx_cooldown); + rx_channels.push(xdp_io::rx::Channel { rx, fill, - driver: async_fd.clone(), + driver: async_fd.clone().with_cooldown(cooldown), }); }; diff --git a/tools/xdp/s2n-quic-xdp/src/io/rx.rs b/tools/xdp/s2n-quic-xdp/src/io/rx.rs index 84eb4f17f2..39e710c389 100644 --- a/tools/xdp/s2n-quic-xdp/src/io/rx.rs +++ b/tools/xdp/s2n-quic-xdp/src/io/rx.rs @@ -10,6 +10,7 @@ use s2n_quic_core::{ io::rx, slice::zip, sync::atomic_waker, + task::cooldown::{self, Cooldown}, xdp::{decoder, path}, }; @@ -28,6 +29,16 @@ pub trait ErrorLogger: Send { /// Drives the Rx and Fill rings forward pub trait Driver: 'static { fn poll(&mut self, rx: &mut ring::Rx, fill: &mut ring::Fill, cx: &mut Context) -> Option; + + fn with_cooldown(self, cooldown: Cooldown) -> WithCooldown + where + Self: Sized, + { + WithCooldown { + driver: self, + cooldown, + } + } } impl Driver for atomic_waker::Handle { @@ -65,6 +76,30 @@ impl Driver for atomic_waker::Handle { } } +pub struct WithCooldown { + driver: D, + cooldown: Cooldown, +} + +impl Driver for WithCooldown { + #[inline] + fn poll(&mut self, rx: &mut ring::Rx, fill: &mut ring::Fill, cx: &mut Context) -> Option { + let mut count = rx.acquire(u32::MAX); + count = fill.acquire(count).min(count); + + // we have items to receive and fill so return + if count > 0 { + self.cooldown.on_ready(); + return Some(count); + } + + match self.cooldown.on_pending_task(cx) { + cooldown::Outcome::Loop => return Some(0), + cooldown::Outcome::Sleep 
=> return self.driver.poll(rx, fill, cx), + } + } +} + pub struct Channel { pub rx: ring::Rx, pub fill: ring::Fill,
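
A minimal usage sketch, not part of the patch above: it mirrors what the Wrapped future and the rx/tx tasks do with the Cooldown API added in quic/s2n-quic-core/src/task/cooldown.rs. The helper name poll_with_cooldown and the limit of 10 are illustrative assumptions; only Cooldown::new, on_ready, and on_pending_task come from the patch.

use core::task::{Context, Poll};
use s2n_quic_core::task::cooldown::Cooldown;

// Polls a source once. On `Ready` the spin credits are refilled; on `Pending`
// a credit is consumed and, while credits remain, `on_pending_task` schedules
// an immediate self-wake so the executor polls the task again instead of
// parking it and waiting for an external wake (e.g. epoll readiness).
fn poll_with_cooldown<T>(
    cooldown: &mut Cooldown,
    cx: &mut Context,
    poll_source: impl FnOnce(&mut Context) -> Poll<T>,
) -> Poll<T> {
    match poll_source(cx) {
        Poll::Ready(value) => {
            cooldown.on_ready();
            Poll::Ready(value)
        }
        Poll::Pending => {
            cooldown.on_pending_task(cx);
            Poll::Pending
        }
    }
}

// Example: allow up to 10 busy-poll iterations before sleeping, the same knob
// the patch exposes through S2N_QUIC_UNSTABLE_COOLDOWN_{RX,TX,ENDPOINT} in
// quic/s2n-quic-platform/src/io/tokio.rs.
// let mut cooldown = Cooldown::new(10);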