diff --git a/quic/s2n-quic-core/src/datagram/default.rs b/quic/s2n-quic-core/src/datagram/default.rs index 0f0ea7e9f5..8f2a95d67b 100644 --- a/quic/s2n-quic-core/src/datagram/default.rs +++ b/quic/s2n-quic-core/src/datagram/default.rs @@ -253,6 +253,7 @@ pub struct Sender { capacity: usize, min_packet_space: usize, max_packet_space: usize, + dropped_datagrams: u64, smoothed_packet_size: f64, waker: Option, max_datagram_payload: u64, @@ -449,43 +450,64 @@ impl Sender { pub fn smoothed_packet_space(&self) -> usize { self.smoothed_packet_size as usize } + + /// Returns the number of datagrams that have been dropped by the sender + /// + /// The cause of drops is due to the datagrams being larger than the current path MTU. If this + /// number is non-zero, applications should try to send smaller datagrams. + #[inline] + pub fn dropped_datagrams(&self) -> u64 { + self.dropped_datagrams + } } impl super::Sender for Sender { + #[inline] fn on_transmit(&mut self, packet: &mut P) { // Cede space to stream data when datagrams are not prioritized if packet.has_pending_streams() && !packet.datagrams_prioritized() { return; } + self.record_capacity_stats(packet.remaining_capacity()); + let mut has_written = false; + while packet.remaining_capacity() > 0 { - if let Some(datagram) = self.queue.pop_front() { - // Ensure there is enough space in the packet to send a datagram - if packet.remaining_capacity() >= datagram.data.len() { - match packet.write_datagram(&datagram.data) { - Ok(()) => has_written = true, - Err(_error) => { - continue; - } - } - // Since a datagram was popped off the queue, wake the - // stored waker if we have one to let the application know - // that there is space on the queue for more datagrams. - if let Some(w) = self.waker.take() { - w.wake(); - } - } else { - // This check keeps us from popping all the datagrams off the - // queue when packet space remaining is smaller than the datagram. - if has_written { - self.queue.push_front(datagram); - return; - } + let Some(datagram) = self.queue.pop_front() else { + break; + }; + + // Ensure there is enough space in the packet to send a datagram + if packet.remaining_capacity() < datagram.data.len() { + // This check keeps us from popping all the datagrams off the + // queue when packet space remaining is smaller than the datagram. + if has_written { + self.queue.push_front(datagram); + break; + } + + // the datagram is too large for the current packet and unlikely to ever fit so + // record a metric and try the next datagram in the queue + self.dropped_datagrams += 1; + continue; + } + + match packet.write_datagram(&datagram.data) { + Ok(()) => has_written = true, + Err(_error) => { + // TODO log this + self.dropped_datagrams += 1; + continue; } - } else { - // If there are no datagrams on the queue we return - return; + } + } + + // If we now have additional capacity wake the stored waker if we have one to + // let the application know that there is space on the queue for more datagrams. + if self.capacity > self.queue.len() { + if let Some(w) = self.waker.take() { + w.wake(); } } } @@ -540,6 +562,7 @@ impl SenderBuilder { queue: VecDeque::with_capacity(self.queue_capacity), capacity: self.queue_capacity, max_datagram_payload: self.max_datagram_payload, + dropped_datagrams: 0, max_packet_space: 0, min_packet_space: 0, smoothed_packet_size: 0.0, @@ -780,6 +803,50 @@ mod tests { assert!(!default_sender.queue.is_empty()); } + /// Ensures the application waker is called when capacity becomes available + #[test] + fn wake_with_capacity() { + let (waker, wake_count) = new_count_waker(); + let mut cx = Context::from_waker(&waker); + + let conn_info = ConnectionInfo::new(100, waker.clone()); + + let mut default_sender = Sender::builder() + .with_capacity(1) + .with_connection_info(&conn_info) + .build() + .unwrap(); + + let datagram = bytes::Bytes::from_static(&[1, 2, 3]); + + assert!(default_sender + .poll_send_datagram(&mut datagram.clone(), &mut cx) + .is_ready()); + assert!(default_sender + .poll_send_datagram(&mut datagram.clone(), &mut cx) + .is_pending()); + + assert_eq!(wake_count.get(), 0); + + // Packet size is just enough to write the first datagram with some + // room left over, but not enough to write the second. + let mut packet = MockPacket { + remaining_capacity: 2, + has_pending_streams: false, + datagrams_prioritized: false, + }; + crate::datagram::Sender::on_transmit(&mut default_sender, &mut packet); + + // Packet capacity has not changed + assert_eq!(packet.remaining_capacity, 2); + // Send queue is completely depleted + assert!(default_sender.queue.is_empty()); + // The waker was called since we now have capacity + assert_eq!(wake_count.get(), 1); + // The sender should record the number of dropped datagrams + assert_eq!(default_sender.dropped_datagrams(), 1); + } + fn fake_receive_context() -> crate::datagram::ReceiveContext<'static> { crate::datagram::ReceiveContext { path: crate::event::api::Path {