Skip to content

Commit

Permalink
add Connection::send_datagram_wait
Browse files Browse the repository at this point in the history
  • Loading branch information
devsnek committed Jan 18, 2024
1 parent 7723cbc commit 55376a3
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 25 deletions.
41 changes: 29 additions & 12 deletions quinn-proto/src/connection/datagrams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,40 @@ pub struct Datagrams<'a> {
impl<'a> Datagrams<'a> {
/// Queue an unreliable, unordered datagram for immediate transmission
///
/// Returns `Err` iff a `len`-byte datagram cannot currently be sent
pub fn send(&mut self, data: Bytes) -> Result<(), SendDatagramError> {
/// If `drop` is true, previously queued datagrams which are still unsent may be discarded to
/// make space for this datagram, in order of oldest to newest. If `drop` is false, and there
/// isn't enough space due to previously queued datagrams, this function will return
/// `SendDatagramError::Blocked`. `Event::DatagramsUnblocked` will be emitted once datagrams
/// have been sent.
///
/// Returns `Err` iff a `len`-byte datagram cannot currently be sent.
pub fn send(&mut self, data: Bytes, drop: bool) -> Result<(), SendDatagramError> {
if self.conn.config.datagram_receive_buffer_size.is_none() {
return Err(SendDatagramError::Disabled);
}
let max = self
.max_size()
.ok_or(SendDatagramError::UnsupportedByPeer)?;
while self.conn.datagrams.outgoing_total > self.conn.config.datagram_send_buffer_size {
let prev = self
.conn
.datagrams
.outgoing
.pop_front()
.expect("datagrams.outgoing_total desynchronized");
trace!(len = prev.data.len(), "dropping outgoing datagram");
self.conn.datagrams.outgoing_total -= prev.data.len();
}
if data.len() > max {
return Err(SendDatagramError::TooLarge);
}
if drop {
while self.conn.datagrams.outgoing_total > self.conn.config.datagram_send_buffer_size {
let prev = self
.conn
.datagrams
.outgoing
.pop_front()
.expect("datagrams.outgoing_total desynchronized");
trace!(len = prev.data.len(), "dropping outgoing datagram");
self.conn.datagrams.outgoing_total -= prev.data.len();
}
} else if self.conn.datagrams.outgoing_total + data.len()
> self.conn.config.datagram_send_buffer_size
{
self.conn.datagrams.send_blocked = true;
return Err(SendDatagramError::Blocked(data));
}
self.conn.datagrams.outgoing_total += data.len();
self.conn.datagrams.outgoing.push_back(Datagram { data });
Ok(())
Expand Down Expand Up @@ -95,6 +108,7 @@ pub(super) struct DatagramState {
pub(super) incoming: VecDeque<Datagram>,
pub(super) outgoing: VecDeque<Datagram>,
pub(super) outgoing_total: usize,
pub(super) send_blocked: bool,
}

impl DatagramState {
Expand Down Expand Up @@ -167,4 +181,7 @@ pub enum SendDatagramError {
/// exceeded.
#[error("datagram too large")]
TooLarge,
/// Send would block
#[error("datagram send blocked")]
Blocked(Bytes),
}
8 changes: 8 additions & 0 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3095,15 +3095,21 @@ impl Connection {
}

// DATAGRAM
let mut sent_datagrams = false;
while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
match self.datagrams.write(buf, max_size) {
true => {
sent_datagrams = true;
sent.non_retransmits = true;
self.stats.frame_tx.datagram += 1;
}
false => break,
}
}
if self.datagrams.send_blocked && sent_datagrams {
self.events.push_back(Event::DatagramsUnblocked);
self.datagrams.send_blocked = false;
}

// STREAM
if space_id == SpaceId::Data {
Expand Down Expand Up @@ -3596,6 +3602,8 @@ pub enum Event {
Stream(StreamEvent),
/// One or more application datagrams have been received
DatagramReceived,
/// One or more application datagrams have been sent after blocking
DatagramsUnblocked,
}

struct PathResponse {
Expand Down
22 changes: 16 additions & 6 deletions quinn-proto/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1596,7 +1596,9 @@ fn datagram_send_recv() {
assert_matches!(pair.client_datagrams(client_ch).max_size(), Some(x) if x > 0);

const DATA: &[u8] = b"whee";
pair.client_datagrams(client_ch).send(DATA.into()).unwrap();
pair.client_datagrams(client_ch)
.send(DATA.into(), true)
.unwrap();
pair.drive();
assert_matches!(
pair.server_conn_mut(server_ch).poll(),
Expand Down Expand Up @@ -1628,9 +1630,15 @@ fn datagram_recv_buffer_overflow() {
const DATA1: &[u8] = &[0xAB; (WINDOW / 3) + 1];
const DATA2: &[u8] = &[0xBC; (WINDOW / 3) + 1];
const DATA3: &[u8] = &[0xCD; (WINDOW / 3) + 1];
pair.client_datagrams(client_ch).send(DATA1.into()).unwrap();
pair.client_datagrams(client_ch).send(DATA2.into()).unwrap();
pair.client_datagrams(client_ch).send(DATA3.into()).unwrap();
pair.client_datagrams(client_ch)
.send(DATA1.into(), true)
.unwrap();
pair.client_datagrams(client_ch)
.send(DATA2.into(), true)
.unwrap();
pair.client_datagrams(client_ch)
.send(DATA3.into(), true)
.unwrap();
pair.drive();
assert_matches!(
pair.server_conn_mut(server_ch).poll(),
Expand All @@ -1640,7 +1648,9 @@ fn datagram_recv_buffer_overflow() {
assert_eq!(pair.server_datagrams(server_ch).recv().unwrap(), DATA3);
assert_matches!(pair.server_datagrams(server_ch).recv(), None);

pair.client_datagrams(client_ch).send(DATA1.into()).unwrap();
pair.client_datagrams(client_ch)
.send(DATA1.into(), true)
.unwrap();
pair.drive();
assert_eq!(pair.server_datagrams(server_ch).recv().unwrap(), DATA1);
assert_matches!(pair.server_datagrams(server_ch).recv(), None);
Expand All @@ -1661,7 +1671,7 @@ fn datagram_unsupported() {
assert_matches!(pair.server_conn_mut(server_ch).poll(), None);
assert_matches!(pair.client_datagrams(client_ch).max_size(), None);

match pair.client_datagrams(client_ch).send(Bytes::new()) {
match pair.client_datagrams(client_ch).send(Bytes::new(), true) {
Err(SendDatagramError::UnsupportedByPeer) => {}
Err(e) => panic!("unexpected error: {e}"),
Ok(_) => panic!("unexpected success"),
Expand Down
85 changes: 79 additions & 6 deletions quinn/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ impl Connection {
pub fn read_datagram(&self) -> ReadDatagram<'_> {
ReadDatagram {
conn: &self.0,
notify: self.0.shared.datagrams.notified(),
notify: self.0.shared.datagram_received.notified(),
}
}

Expand Down Expand Up @@ -392,19 +392,36 @@ impl Connection {
return Err(SendDatagramError::ConnectionLost(x.clone()));
}
use proto::SendDatagramError::*;
match conn.inner.datagrams().send(data) {
match conn.inner.datagrams().send(data, true) {
Ok(()) => {
conn.wake();
Ok(())
}
Err(e) => Err(match e {
Blocked(..) => unreachable!(),
UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
Disabled => SendDatagramError::Disabled,
TooLarge => SendDatagramError::TooLarge,
}),
}
}

/// Transmit `data` as an unreliable, unordered application datagram
///
/// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
/// conditions, which effectively prioritizes old datagrams over new datagrams.
///
/// See [`send_datagram()`] for details.
///
/// [`send_datagram()`]: Connection::send_datagram
pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
SendDatagram {
conn: &self.0,
data: Some(data),
notify: self.0.shared.datagrams_unblocked.notified(),
}
}

/// Compute the maximum size of datagrams that may be passed to [`send_datagram()`].
///
/// Returns `None` if datagrams are unsupported by the peer or disabled locally.
Expand Down Expand Up @@ -744,12 +761,63 @@ impl Future for ReadDatagram<'_> {
// `state` lock ensures we didn't race with readiness
Poll::Pending => return Poll::Pending,
// Spurious wakeup, get a new future
Poll::Ready(()) => this.notify.set(this.conn.shared.datagrams.notified()),
Poll::Ready(()) => this
.notify
.set(this.conn.shared.datagram_received.notified()),
}
}
}
}

pin_project! {
/// Future produced by [`Connection::send_datagram_wait`]
pub struct SendDatagram<'a> {
conn: &'a ConnectionRef,
data: Option<Bytes>,
#[pin]
notify: Notified<'a>,
}
}

impl Future for SendDatagram<'_> {
type Output = Result<(), SendDatagramError>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
let mut state = this.conn.state.lock("SendDatagram::poll");
if let Some(ref e) = state.error {
return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
}
use proto::SendDatagramError::*;
match state
.inner
.datagrams()
.send(this.data.take().unwrap(), false)
{
Ok(()) => {
state.wake();
Poll::Ready(Ok(()))
}
Err(e) => Poll::Ready(Err(match e {
Blocked(data) => {
this.data.replace(data);
loop {
match this.notify.as_mut().poll(ctx) {
Poll::Pending => return Poll::Pending,
// Spurious wakeup, get a new future
Poll::Ready(()) => this
.notify
.set(this.conn.shared.datagrams_unblocked.notified()),
}
}
}
UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
Disabled => SendDatagramError::Disabled,
TooLarge => SendDatagramError::TooLarge,
})),
}
}
}

#[derive(Debug)]
pub(crate) struct ConnectionRef(Arc<ConnectionInner>);

Expand Down Expand Up @@ -838,7 +906,8 @@ pub(crate) struct Shared {
stream_budget_available: [Notify; 2],
/// Notified when the peer has initiated a new stream
stream_incoming: [Notify; 2],
datagrams: Notify,
datagrams_unblocked: Notify,
datagram_received: Notify,
closed: Notify,
}

Expand Down Expand Up @@ -968,8 +1037,11 @@ impl State {
Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
shared.stream_incoming[Dir::Bi as usize].notify_waiters();
}
DatagramsUnblocked => {
shared.datagrams_unblocked.notify_waiters();
}
DatagramReceived => {
shared.datagrams.notify_waiters();
shared.datagram_received.notify_waiters();
}
Stream(StreamEvent::Readable { id }) => {
if let Some(reader) = self.blocked_readers.remove(&id) {
Expand Down Expand Up @@ -1077,7 +1149,8 @@ impl State {
shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
shared.stream_incoming[Dir::Uni as usize].notify_waiters();
shared.stream_incoming[Dir::Bi as usize].notify_waiters();
shared.datagrams.notify_waiters();
shared.datagrams_unblocked.notify_waiters();
shared.datagram_received.notify_waiters();
for (_, x) in self.finishing.drain() {
let _ = x.send(Some(WriteError::ConnectionLost(reason.clone())));
}
Expand Down
2 changes: 1 addition & 1 deletion quinn/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ async fn two_datagram_readers() {
async {
server.send_datagram(b"one"[..].into()).unwrap();
done.notified().await;
server.send_datagram(b"two"[..].into()).unwrap();
server.send_datagram_wait(b"two"[..].into()).await.unwrap();
}
);
assert!(*a == *b"one" || *b == *b"one");
Expand Down

0 comments on commit 55376a3

Please sign in to comment.