Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s2n-quic-dc): update MTU on dc path when MTU is updated #2327

Merged
merged 10 commits into from
Oct 10, 2024
28 changes: 20 additions & 8 deletions dc/s2n-quic-dc/src/path/secret/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ impl Map {
) -> Option<(seal::Once, Credentials, ApplicationParams)> {
let state = self.state.peers.get_by_key(&peer)?;
let (sealer, credentials) = state.uni_sealer();
Some((sealer, credentials, state.parameters))
Some((sealer, credentials, state.parameters.clone()))
}

pub fn open_once(
Expand All @@ -319,7 +319,7 @@ impl Map {
let state = self.state.peers.get_by_key(&peer)?;
let keys = state.bidi_local(features);

Some((keys, state.parameters))
Some((keys, state.parameters.clone()))
}

pub fn pair_for_credentials(
Expand All @@ -330,7 +330,7 @@ impl Map {
) -> Option<(Bidirectional, ApplicationParams)> {
let state = self.pre_authentication(credentials, control_out)?;

let params = state.parameters;
let params = state.parameters.clone();
let keys = state.bidi_remote(self.clone(), credentials, features);

Some((keys, params))
Expand Down Expand Up @@ -684,13 +684,13 @@ impl Entry {
secret: schedule::Secret,
sender: sender::State,
receiver: receiver::State,
mut parameters: ApplicationParams,
parameters: ApplicationParams,
rehandshake_time: Duration,
) -> Self {
// clamp max datagram size to a well-known value
parameters.max_datagram_size = parameters
parameters
.max_datagram_size
.min(crate::stream::MAX_DATAGRAM_SIZE as _);
.fetch_min(crate::stream::MAX_DATAGRAM_SIZE as _, Ordering::Relaxed);

assert!(rehandshake_time.as_secs() <= u32::MAX as u64);
Self {
Expand Down Expand Up @@ -911,6 +911,7 @@ pub struct HandshakingPath {
parameters: ApplicationParams,
endpoint_type: s2n_quic_core::endpoint::Type,
secret: Option<schedule::Secret>,
entry: Option<Arc<Entry>>,
map: Map,
}

Expand All @@ -924,9 +925,10 @@ impl HandshakingPath {
Self {
peer: connection_info.remote_address.clone().into(),
dc_version: connection_info.dc_version,
parameters: connection_info.application_params,
parameters: connection_info.application_params.clone(),
endpoint_type,
secret: None,
entry: None,
map,
}
}
Expand Down Expand Up @@ -1018,12 +1020,22 @@ impl dc::Path for HandshakingPath {
.expect("peer tokens are only received after secrets are ready"),
sender,
receiver,
self.parameters,
self.parameters.clone(),
self.map.state.rehandshake_period,
);
let entry = Arc::new(entry);
self.entry = Some(entry.clone());
self.map.insert(entry);
}

fn on_mtu_updated(&mut self, mtu: u16) {
if let Some(entry) = self.entry.as_ref() {
entry
.parameters
.max_datagram_size
.store(mtu, Ordering::Relaxed);
}
}
}

#[cfg(test)]
Expand Down
7 changes: 5 additions & 2 deletions dc/s2n-quic-dc/src/stream/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use s2n_quic_core::{
inet::{ExplicitCongestionNotification, SocketAddress},
varint::VarInt,
};
use std::{io, sync::Arc};
use std::{
io,
sync::{atomic::Ordering, Arc},
};
use tracing::{debug_span, Instrument as _};

type Result<T = (), E = io::Error> = core::result::Result<T, E>;
Expand Down Expand Up @@ -193,7 +196,7 @@ where
let flow = flow::non_blocking::State::new(flow_offset);

let path = send::path::Info {
max_datagram_size: parameters.max_datagram_size,
max_datagram_size: parameters.max_datagram_size.load(Ordering::Relaxed),
send_quantum,
ecn: ExplicitCongestionNotification::Ect0,
next_expected_control_packet: VarInt::ZERO,
Expand Down
7 changes: 5 additions & 2 deletions dc/s2n-quic-dc/src/stream/send/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ use s2n_quic_core::{
varint::VarInt,
};
use slotmap::SlotMap;
use std::collections::{BinaryHeap, VecDeque};
use std::{
collections::{BinaryHeap, VecDeque},
sync::atomic::Ordering,
};
use tracing::{debug, trace};

pub mod probe;
Expand Down Expand Up @@ -118,7 +121,7 @@ pub struct PeerActivity {
impl State {
#[inline]
pub fn new(stream_id: stream::Id, params: &ApplicationParams) -> Self {
let max_datagram_size = params.max_datagram_size;
let max_datagram_size = params.max_datagram_size.load(Ordering::Relaxed);
let initial_max_data = params.remote_max_data;
let local_max_data = params.local_send_max_data;

Expand Down
73 changes: 69 additions & 4 deletions quic/s2n-quic-core/src/dc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ use crate::{
transport::parameters::{DcSupportedVersions, InitialFlowControlLimits},
varint::VarInt,
};
use core::{num::NonZeroU32, time::Duration};
use core::{
num::NonZeroU32,
sync::atomic::{AtomicU16, Ordering},
time::Duration,
};

mod disabled;
mod traits;
Expand Down Expand Up @@ -91,25 +95,37 @@ impl<'a> DatagramInfo<'a> {
}

/// Various settings relevant to the dc path
#[derive(Clone, Copy, Debug)]
#[derive(Debug)]
#[non_exhaustive]
pub struct ApplicationParams {
pub max_datagram_size: u16,
pub max_datagram_size: AtomicU16,
pub remote_max_data: VarInt,
pub local_send_max_data: VarInt,
pub local_recv_max_data: VarInt,
// Actually a Duration, stored as milliseconds to shrink this struct
pub max_idle_timeout: Option<NonZeroU32>,
}

impl Clone for ApplicationParams {
fn clone(&self) -> Self {
Self {
max_datagram_size: AtomicU16::new(self.max_datagram_size.load(Ordering::Relaxed)),
remote_max_data: self.remote_max_data,
local_send_max_data: self.local_send_max_data,
local_recv_max_data: self.local_recv_max_data,
max_idle_timeout: self.max_idle_timeout,
}
}
}

impl ApplicationParams {
pub fn new(
max_datagram_size: u16,
peer_flow_control_limits: &InitialFlowControlLimits,
limits: &Limits,
) -> Self {
Self {
max_datagram_size,
max_datagram_size: AtomicU16::new(max_datagram_size),
remote_max_data: peer_flow_control_limits.max_data,
local_send_max_data: limits.initial_stream_limits().max_data_bidi_local,
local_recv_max_data: limits.initial_stream_limits().max_data_bidi_remote,
Expand All @@ -125,3 +141,52 @@ impl ApplicationParams {
Some(Duration::from_millis(self.max_idle_timeout?.get() as u64))
}
}

#[cfg(test)]
mod tests {
use crate::{
connection::Limits, dc::ApplicationParams, transport::parameters::InitialFlowControlLimits,
varint::VarInt,
};
use std::{sync::atomic::Ordering, time::Duration};

#[test]
fn clone() {
let initial_flow_control_limits = InitialFlowControlLimits {
max_data: VarInt::from_u32(2222),
..Default::default()
};

let limits = Limits {
bidirectional_local_data_window: 1234.try_into().unwrap(),
bidirectional_remote_data_window: 6789.try_into().unwrap(),
max_idle_timeout: Duration::from_millis(999).try_into().unwrap(),
..Default::default()
};

let params = ApplicationParams::new(9000, &initial_flow_control_limits, &limits);

assert_eq!(9000, params.max_datagram_size.load(Ordering::Relaxed));
assert_eq!(limits.max_idle_timeout(), params.max_idle_timeout());
assert_eq!(1234, params.local_send_max_data.as_u64());
assert_eq!(6789, params.local_recv_max_data.as_u64());
assert_eq!(2222, params.remote_max_data.as_u64());

let cloned_params = params.clone();

assert_eq!(
params.max_datagram_size.load(Ordering::Relaxed),
cloned_params.max_datagram_size.load(Ordering::Relaxed)
);
assert_eq!(params.max_idle_timeout, cloned_params.max_idle_timeout);
assert_eq!(
params.local_send_max_data,
cloned_params.local_send_max_data
);
assert_eq!(
params.local_recv_max_data,
cloned_params.local_recv_max_data
);
assert_eq!(params.remote_max_data, cloned_params.remote_max_data);
}
}
4 changes: 4 additions & 0 deletions quic/s2n-quic-core/src/dc/disabled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,8 @@ impl Path for () {
) {
unimplemented!()
}

fn on_mtu_updated(&mut self, _mtu: u16) {
unimplemented!()
}
}
16 changes: 13 additions & 3 deletions quic/s2n-quic-core/src/dc/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
};
use core::{num::NonZeroU32, time::Duration};
use std::sync::{
atomic::{AtomicU8, Ordering},
atomic::{AtomicU16, AtomicU8, Ordering},
Arc,
};

Expand All @@ -36,14 +36,19 @@ pub struct MockDcPath {
pub on_peer_stateless_reset_tokens_count: u8,
pub stateless_reset_tokens: Vec<stateless_reset::Token>,
pub peer_stateless_reset_tokens: Vec<stateless_reset::Token>,
pub mtu: u16,
}

impl dc::Endpoint for MockDcEndpoint {
type Path = MockDcPath;

fn new_path(&mut self, _connection_info: &ConnectionInfo) -> Option<Self::Path> {
fn new_path(&mut self, connection_info: &ConnectionInfo) -> Option<Self::Path> {
Some(MockDcPath {
stateless_reset_tokens: self.stateless_reset_tokens.clone(),
mtu: connection_info
.application_params
.max_datagram_size
.load(Ordering::Relaxed),
..Default::default()
})
}
Expand Down Expand Up @@ -76,10 +81,15 @@ impl dc::Path for MockDcPath {
self.peer_stateless_reset_tokens
.extend(stateless_reset_tokens);
}

fn on_mtu_updated(&mut self, mtu: u16) {
self.mtu = mtu
}
}

#[allow(clippy::declare_interior_mutable_const)]
pub const TEST_APPLICATION_PARAMS: ApplicationParams = ApplicationParams {
max_datagram_size: 1472,
max_datagram_size: AtomicU16::new(1472),
remote_max_data: VarInt::from_u32(1u32 << 25),
local_send_max_data: VarInt::from_u32(1u32 << 25),
local_recv_max_data: VarInt::from_u32(1u32 << 25),
Expand Down
10 changes: 10 additions & 0 deletions quic/s2n-quic-core/src/dc/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ pub trait Path: 'static + Send {
&mut self,
stateless_reset_tokens: impl Iterator<Item = &'a stateless_reset::Token>,
);

/// Called when the MTU has been updated for the path
fn on_mtu_updated(&mut self, mtu: u16);
}

impl<P: Path> Path for Option<P> {
Expand All @@ -69,4 +72,11 @@ impl<P: Path> Path for Option<P> {
path.on_peer_stateless_reset_tokens(stateless_reset_tokens)
}
}

#[inline]
fn on_mtu_updated(&mut self, max_datagram_size: u16) {
if let Some(path) = self {
path.on_mtu_updated(max_datagram_size)
}
}
}
Loading
Loading