Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Connection::send_datagram_wait #1740

Merged
merged 3 commits into from
Apr 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 29 additions & 12 deletions quinn-proto/src/connection/datagrams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,40 @@ pub struct Datagrams<'a> {
impl<'a> Datagrams<'a> {
/// Queue an unreliable, unordered datagram for immediate transmission
///
/// Returns `Err` iff a `len`-byte datagram cannot currently be sent
pub fn send(&mut self, data: Bytes) -> Result<(), SendDatagramError> {
/// If `drop` is true, previously queued datagrams which are still unsent may be discarded to
devsnek marked this conversation as resolved.
Show resolved Hide resolved
/// make space for this datagram, in order of oldest to newest. If `drop` is false, and there
/// isn't enough space due to previously queued datagrams, this function will return
/// `SendDatagramError::Blocked`. `Event::DatagramsUnblocked` will be emitted once datagrams
/// have been sent.
///
/// Returns `Err` iff a `len`-byte datagram cannot currently be sent.
pub fn send(&mut self, data: Bytes, drop: bool) -> Result<(), SendDatagramError> {
Ralith marked this conversation as resolved.
Show resolved Hide resolved
if self.conn.config.datagram_receive_buffer_size.is_none() {
return Err(SendDatagramError::Disabled);
}
let max = self
.max_size()
.ok_or(SendDatagramError::UnsupportedByPeer)?;
while self.conn.datagrams.outgoing_total > self.conn.config.datagram_send_buffer_size {
let prev = self
.conn
.datagrams
.outgoing
.pop_front()
.expect("datagrams.outgoing_total desynchronized");
trace!(len = prev.data.len(), "dropping outgoing datagram");
self.conn.datagrams.outgoing_total -= prev.data.len();
}
if data.len() > max {
return Err(SendDatagramError::TooLarge);
}
if drop {
while self.conn.datagrams.outgoing_total > self.conn.config.datagram_send_buffer_size {
let prev = self
.conn
.datagrams
.outgoing
.pop_front()
.expect("datagrams.outgoing_total desynchronized");
trace!(len = prev.data.len(), "dropping outgoing datagram");
self.conn.datagrams.outgoing_total -= prev.data.len();
}
} else if self.conn.datagrams.outgoing_total + data.len()
> self.conn.config.datagram_send_buffer_size
{
self.conn.datagrams.send_blocked = true;
return Err(SendDatagramError::Blocked(data));
}
self.conn.datagrams.outgoing_total += data.len();
self.conn.datagrams.outgoing.push_back(Datagram { data });
Ok(())
Expand Down Expand Up @@ -95,6 +108,7 @@ pub(super) struct DatagramState {
pub(super) incoming: VecDeque<Datagram>,
pub(super) outgoing: VecDeque<Datagram>,
pub(super) outgoing_total: usize,
pub(super) send_blocked: bool,
}

impl DatagramState {
Expand Down Expand Up @@ -167,4 +181,7 @@ pub enum SendDatagramError {
/// exceeded.
#[error("datagram too large")]
TooLarge,
/// Send would block
#[error("datagram send blocked")]
Blocked(Bytes),
}
8 changes: 8 additions & 0 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3149,15 +3149,21 @@ impl Connection {
}

// DATAGRAM
let mut sent_datagrams = false;
while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
match self.datagrams.write(buf, max_size) {
true => {
sent_datagrams = true;
sent.non_retransmits = true;
self.stats.frame_tx.datagram += 1;
}
false => break,
}
}
if self.datagrams.send_blocked && sent_datagrams {
self.events.push_back(Event::DatagramsUnblocked);
self.datagrams.send_blocked = false;
}

// STREAM
if space_id == SpaceId::Data {
Expand Down Expand Up @@ -3632,6 +3638,8 @@ pub enum Event {
Stream(StreamEvent),
/// One or more application datagrams have been received
DatagramReceived,
/// One or more application datagrams have been sent after blocking
DatagramsUnblocked,
}

fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
Expand Down
24 changes: 17 additions & 7 deletions quinn-proto/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1633,7 +1633,9 @@ fn datagram_send_recv() {
assert_matches!(pair.client_datagrams(client_ch).max_size(), Some(x) if x > 0);

const DATA: &[u8] = b"whee";
pair.client_datagrams(client_ch).send(DATA.into()).unwrap();
pair.client_datagrams(client_ch)
.send(DATA.into(), true)
.unwrap();
pair.drive();
assert_matches!(
pair.server_conn_mut(server_ch).poll(),
Expand Down Expand Up @@ -1665,9 +1667,15 @@ fn datagram_recv_buffer_overflow() {
const DATA1: &[u8] = &[0xAB; (WINDOW / 3) + 1];
const DATA2: &[u8] = &[0xBC; (WINDOW / 3) + 1];
const DATA3: &[u8] = &[0xCD; (WINDOW / 3) + 1];
pair.client_datagrams(client_ch).send(DATA1.into()).unwrap();
pair.client_datagrams(client_ch).send(DATA2.into()).unwrap();
pair.client_datagrams(client_ch).send(DATA3.into()).unwrap();
pair.client_datagrams(client_ch)
.send(DATA1.into(), true)
.unwrap();
pair.client_datagrams(client_ch)
.send(DATA2.into(), true)
.unwrap();
pair.client_datagrams(client_ch)
.send(DATA3.into(), true)
.unwrap();
pair.drive();
assert_matches!(
pair.server_conn_mut(server_ch).poll(),
Expand All @@ -1677,7 +1685,9 @@ fn datagram_recv_buffer_overflow() {
assert_eq!(pair.server_datagrams(server_ch).recv().unwrap(), DATA3);
assert_matches!(pair.server_datagrams(server_ch).recv(), None);

pair.client_datagrams(client_ch).send(DATA1.into()).unwrap();
pair.client_datagrams(client_ch)
.send(DATA1.into(), true)
.unwrap();
pair.drive();
assert_eq!(pair.server_datagrams(server_ch).recv().unwrap(), DATA1);
assert_matches!(pair.server_datagrams(server_ch).recv(), None);
Expand All @@ -1698,7 +1708,7 @@ fn datagram_unsupported() {
assert_matches!(pair.server_conn_mut(server_ch).poll(), None);
assert_matches!(pair.client_datagrams(client_ch).max_size(), None);

match pair.client_datagrams(client_ch).send(Bytes::new()) {
match pair.client_datagrams(client_ch).send(Bytes::new(), true) {
Err(SendDatagramError::UnsupportedByPeer) => {}
Err(e) => panic!("unexpected error: {e}"),
Ok(_) => panic!("unexpected success"),
Expand Down Expand Up @@ -2799,7 +2809,7 @@ fn pure_sender_voluntarily_acks() {
for _ in 0..100 {
const MSG: &[u8] = b"hello";
pair.client_datagrams(client_ch)
.send(Bytes::from_static(MSG))
.send(Bytes::from_static(MSG), true)
.unwrap();
pair.drive();
assert_eq!(pair.server_datagrams(server_ch).recv().unwrap(), MSG);
Expand Down
85 changes: 79 additions & 6 deletions quinn/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ impl Connection {
pub fn read_datagram(&self) -> ReadDatagram<'_> {
ReadDatagram {
conn: &self.0,
notify: self.0.shared.datagrams.notified(),
notify: self.0.shared.datagram_received.notified(),
}
}

Expand Down Expand Up @@ -392,19 +392,36 @@ impl Connection {
return Err(SendDatagramError::ConnectionLost(x.clone()));
}
use proto::SendDatagramError::*;
match conn.inner.datagrams().send(data) {
match conn.inner.datagrams().send(data, true) {
Ok(()) => {
conn.wake();
Ok(())
}
Err(e) => Err(match e {
Blocked(..) => unreachable!(),
UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
Disabled => SendDatagramError::Disabled,
TooLarge => SendDatagramError::TooLarge,
}),
}
}

/// Transmit `data` as an unreliable, unordered application datagram
///
/// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
/// conditions, which effectively prioritizes old datagrams over new datagrams.
///
/// See [`send_datagram()`] for details.
///
/// [`send_datagram()`]: Connection::send_datagram
pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
Ralith marked this conversation as resolved.
Show resolved Hide resolved
SendDatagram {
conn: &self.0,
data: Some(data),
notify: self.0.shared.datagrams_unblocked.notified(),
}
}

/// Compute the maximum size of datagrams that may be passed to [`send_datagram()`].
///
/// Returns `None` if datagrams are unsupported by the peer or disabled locally.
Expand Down Expand Up @@ -744,12 +761,63 @@ impl Future for ReadDatagram<'_> {
// `state` lock ensures we didn't race with readiness
Poll::Pending => return Poll::Pending,
// Spurious wakeup, get a new future
Poll::Ready(()) => this.notify.set(this.conn.shared.datagrams.notified()),
Poll::Ready(()) => this
.notify
.set(this.conn.shared.datagram_received.notified()),
}
}
}
}

pin_project! {
/// Future produced by [`Connection::send_datagram_wait`]
pub struct SendDatagram<'a> {
conn: &'a ConnectionRef,
data: Option<Bytes>,
#[pin]
notify: Notified<'a>,
}
}

impl Future for SendDatagram<'_> {
type Output = Result<(), SendDatagramError>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
let mut state = this.conn.state.lock("SendDatagram::poll");
if let Some(ref e) = state.error {
return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
}
use proto::SendDatagramError::*;
match state
.inner
.datagrams()
.send(this.data.take().unwrap(), false)
{
Ok(()) => {
state.wake();
Poll::Ready(Ok(()))
}
Err(e) => Poll::Ready(Err(match e {
Blocked(data) => {
this.data.replace(data);
loop {
match this.notify.as_mut().poll(ctx) {
Poll::Pending => return Poll::Pending,
// Spurious wakeup, get a new future
Poll::Ready(()) => this
.notify
.set(this.conn.shared.datagrams_unblocked.notified()),
}
}
}
UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
Disabled => SendDatagramError::Disabled,
TooLarge => SendDatagramError::TooLarge,
})),
}
}
}

#[derive(Debug)]
pub(crate) struct ConnectionRef(Arc<ConnectionInner>);

Expand Down Expand Up @@ -838,7 +906,8 @@ pub(crate) struct Shared {
stream_budget_available: [Notify; 2],
/// Notified when the peer has initiated a new stream
stream_incoming: [Notify; 2],
datagrams: Notify,
datagram_received: Notify,
datagrams_unblocked: Notify,
closed: Notify,
}

Expand Down Expand Up @@ -969,7 +1038,10 @@ impl State {
shared.stream_incoming[Dir::Bi as usize].notify_waiters();
}
DatagramReceived => {
shared.datagrams.notify_waiters();
shared.datagram_received.notify_waiters();
}
DatagramsUnblocked => {
shared.datagrams_unblocked.notify_waiters();
}
Stream(StreamEvent::Readable { id }) => {
if let Some(reader) = self.blocked_readers.remove(&id) {
Expand Down Expand Up @@ -1077,7 +1149,8 @@ impl State {
shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
shared.stream_incoming[Dir::Uni as usize].notify_waiters();
shared.stream_incoming[Dir::Bi as usize].notify_waiters();
shared.datagrams.notify_waiters();
shared.datagram_received.notify_waiters();
shared.datagrams_unblocked.notify_waiters();
for (_, x) in self.finishing.drain() {
let _ = x.send(Some(WriteError::ConnectionLost(reason.clone())));
}
Expand Down
2 changes: 1 addition & 1 deletion quinn/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ async fn two_datagram_readers() {
async {
server.send_datagram(b"one"[..].into()).unwrap();
done.notified().await;
server.send_datagram(b"two"[..].into()).unwrap();
server.send_datagram_wait(b"two"[..].into()).await.unwrap();
}
);
assert!(*a == *b"one" || *b == *b"one");
Expand Down