From afc56cd96b7693179bce0e2bc110f184db1f109c Mon Sep 17 00:00:00 2001 From: Cameron Bytheway Date: Thu, 31 Oct 2024 13:33:53 -0600 Subject: [PATCH] refactor(s2n-quic-dc): put map impl behind trait (#2361) --- dc/s2n-quic-dc/src/fixed_map.rs | 8 +- dc/s2n-quic-dc/src/path/secret/map.rs | 1007 +---------------- dc/s2n-quic-dc/src/path/secret/map/cleaner.rs | 116 ++ dc/s2n-quic-dc/src/path/secret/map/entry.rs | 299 +++++ .../src/path/secret/map/entry/tests.rs | 21 + .../src/path/secret/map/handshake.rs | 156 +++ dc/s2n-quic-dc/src/path/secret/map/size_of.rs | 33 + dc/s2n-quic-dc/src/path/secret/map/state.rs | 317 ++++++ .../secret/map/{test.rs => state/tests.rs} | 139 +-- dc/s2n-quic-dc/src/path/secret/map/status.rs | 118 ++ dc/s2n-quic-dc/src/path/secret/map/store.rs | 90 ++ 11 files changed, 1260 insertions(+), 1044 deletions(-) create mode 100644 dc/s2n-quic-dc/src/path/secret/map/cleaner.rs create mode 100644 dc/s2n-quic-dc/src/path/secret/map/entry.rs create mode 100644 dc/s2n-quic-dc/src/path/secret/map/entry/tests.rs create mode 100644 dc/s2n-quic-dc/src/path/secret/map/handshake.rs create mode 100644 dc/s2n-quic-dc/src/path/secret/map/size_of.rs create mode 100644 dc/s2n-quic-dc/src/path/secret/map/state.rs rename dc/s2n-quic-dc/src/path/secret/map/{test.rs => state/tests.rs} (66%) create mode 100644 dc/s2n-quic-dc/src/path/secret/map/status.rs create mode 100644 dc/s2n-quic-dc/src/path/secret/map/store.rs diff --git a/dc/s2n-quic-dc/src/fixed_map.rs b/dc/s2n-quic-dc/src/fixed_map.rs index 18848b973..845b94b26 100644 --- a/dc/s2n-quic-dc/src/fixed_map.rs +++ b/dc/s2n-quic-dc/src/fixed_map.rs @@ -12,9 +12,11 @@ use core::{ hash::Hash, sync::atomic::{AtomicU8, Ordering}, }; -use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockUpgradableReadGuard}; +use parking_lot::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard}; use std::{collections::hash_map::RandomState, hash::BuildHasher}; +pub use parking_lot::MappedRwLockReadGuard as ReadGuard; + pub struct Map { slots: Box<[Slot]>, hash_builder: S, @@ -94,7 +96,7 @@ where self.get_by_key(key).is_some() } - pub fn get_by_key(&self, key: &K) -> Option> { + pub fn get_by_key(&self, key: &K) -> Option> { self.slot_by_hash(key).get_by_key(key) } } @@ -148,7 +150,7 @@ where None } - fn get_by_key(&self, needle: &K) -> Option> { + fn get_by_key(&self, needle: &K) -> Option> { // Scan each value and check if our requested needle is present. let values = self.values.read(); for (value_idx, value) in values.iter().enumerate() { diff --git a/dc/s2n-quic-dc/src/path/secret/map.rs b/dc/s2n-quic-dc/src/path/secret/map.rs index b25a8e4b1..22deb37a7 100644 --- a/dc/s2n-quic-dc/src/path/secret/map.rs +++ b/dc/s2n-quic-dc/src/path/secret/map.rs @@ -1,40 +1,30 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -use super::{ - open, receiver, - schedule::{self, Initiator}, - seal, sender, stateless_reset, -}; use crate::{ credentials::{Credentials, Id}, - crypto, fixed_map, - packet::{secret_control as control, Packet, WireVersion}, + packet::{secret_control as control, Packet}, + path::secret::{open, seal, stateless_reset}, stream::TransportFeatures, }; -use rand::Rng as _; -use s2n_codec::EncoderBuffer; -use s2n_quic_core::{ - dc::{self, ApplicationParams, DatagramInfo}, - ensure, - event::api::EndpointType, - varint::VarInt, -}; -use std::{ - fmt, - hash::{BuildHasherDefault, Hasher}, - net::{Ipv4Addr, SocketAddr}, - sync::{ - atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, - Arc, Mutex, - }, - time::{Duration, Instant}, -}; -use zeroize::Zeroizing; +use s2n_quic_core::dc; +use std::{net::SocketAddr, sync::Arc}; + +mod cleaner; +mod entry; +mod handshake; +mod size_of; +mod state; +mod status; +mod store; -const TLS_EXPORTER_LABEL: &str = "EXPERIMENTAL EXPORTER s2n-quic-dc"; -const TLS_EXPORTER_CONTEXT: &str = ""; -const TLS_EXPORTER_LENGTH: usize = schedule::EXPORT_SECRET_LEN; +use entry::Entry; +use store::Store; + +pub use entry::{ApplicationPair, Bidirectional, ControlPair}; + +pub(crate) use size_of::SizeOf; +pub(crate) use status::Dedup; // FIXME: Most of this comment is not true today, we're expecting to implement the details // contained here. This is presented as a roadmap. @@ -49,275 +39,47 @@ const TLS_EXPORTER_LENGTH: usize = schedule::EXPORT_SECRET_LEN; /// maximum rate (corresponding to no false positives in replay prevention for 15 seconds). #[derive(Clone)] pub struct Map { - pub(super) state: Arc, -} - -#[derive(Default)] -pub(super) struct NoopIdHasher(Option); - -impl Hasher for NoopIdHasher { - fn finish(&self) -> u64 { - self.0.unwrap() - } - - fn write(&mut self, _bytes: &[u8]) { - unimplemented!() - } - - fn write_u64(&mut self, x: u64) { - debug_assert!(self.0.is_none()); - self.0 = Some(x); - } -} - -// # Managing memory consumption -// -// For regular rotation with live peers, we retain at most two secrets: one derived from the most -// recent locally initiated handshake and the most recent remote initiated handshake (from our -// perspective). We guarantee that at most one handshake is ongoing for a given peer pair at a -// time, so both sides will have at least one mutually trusted entry after the handshake. If a peer -// is only acting as a client or only as a server, then one of the peer maps will always be empty. -// -// Previous entries can safely be removed after a grace period (EVICTION_TIME). EVICTION_TIME -// is only needed because a stream/datagram might be opening/sent concurrently with the new -// handshake (e.g., during regular rotation), and we don't want that to fail spuriously. -// -// We also need to manage secrets for no longer existing peers. These are peers where typically the -// underlying host has gone away and/or the address for it has changed. At 95% occupancy for the -// maximum size allowed, we will remove least recently used secrets (1% of these per minute). Usage -// is defined by access to the entry in the map. Unfortunately we lack any good way to authenticate -// a peer as *not* having credentials, especially after the peer is gone. It's possible that in the -// future information could also come from the TLS provider. -pub(super) struct State { - // This is in number of entries. - max_capacity: usize, - - rehandshake_period: Duration, - - // peers is the most recent entry originating from a locally *or* remote initiated handshake. - // - // Handshakes use s2n-quic and the SocketAddr is the address of the handshake socket. Since - // s2n-quic only has Client or Server endpoints, a given SocketAddr can only be used for - // exactly one of a locally initiated handshake or a remote initiated handshake. As a result we - // can use a single map to store both kinds and treat them identically. - // - // In the future it's likely we'll want to build bidirectional support in which case splitting - // this into two maps (per the discussion in "Managing memory consumption" above) will be - // needed. - pub(super) peers: fixed_map::Map>, - - // Stores the set of SocketAddr for which we received a UnknownPathSecret packet. - // When handshake_with is called we will allow a new handshake if this contains a socket, this - // is a temporary solution until we implement proper background handshaking. - pub(super) requested_handshakes: flurry::HashSet, - - // All known entries. - pub(super) ids: fixed_map::Map, BuildHasherDefault>, - - pub(super) signer: stateless_reset::Signer, - - // This socket is used *only* for sending secret control packets. - // FIXME: This will get replaced with sending on a handshake socket associated with the map. - pub(super) control_socket: std::net::UdpSocket, - - pub(super) receiver_shared: Arc, - - handled_control_packets: AtomicUsize, - - cleaner: Cleaner, -} - -struct Cleaner { - should_stop: AtomicBool, - thread: Mutex>>, - epoch: AtomicU64, -} - -impl Drop for Cleaner { - fn drop(&mut self) { - self.stop(); - } -} - -impl Cleaner { - fn new() -> Cleaner { - Cleaner { - should_stop: AtomicBool::new(false), - thread: Mutex::new(None), - epoch: AtomicU64::new(1), - } - } - - fn stop(&self) { - self.should_stop.store(true, Ordering::Relaxed); - if let Some(thread) = - std::mem::take(&mut *self.thread.lock().unwrap_or_else(|e| e.into_inner())) - { - thread.thread().unpark(); - - // If this isn't getting dropped on the cleaner thread, - // then wait for the background thread to finish exiting. - if std::thread::current().id() != thread.thread().id() { - // We expect this to terminate very quickly. - thread.join().unwrap(); - } - } - } - - fn spawn_thread(&self, state: Arc) { - let state = Arc::downgrade(&state); - let handle = std::thread::spawn(move || loop { - let Some(state) = state.upgrade() else { - break; - }; - if state.cleaner.should_stop.load(Ordering::Relaxed) { - break; - } - state.cleaner.clean(&state, EVICTION_CYCLES); - let pause = rand::thread_rng().gen_range(5..60); - drop(state); - std::thread::park_timeout(Duration::from_secs(pause)); - }); - *self.thread.lock().unwrap() = Some(handle); - } - - /// Periodic maintenance for various maps. - fn clean(&self, state: &State, eviction_cycles: u64) { - let current_epoch = self.epoch.fetch_add(1, Ordering::Relaxed); - let now = Instant::now(); - - // For non-retired entries, if it's time for them to handshake again, request a - // handshake to happen. This handshake will currently happen on the next request for this - // particular peer. - state.ids.retain(|_, entry| { - let retired_at = entry.retired.0.load(Ordering::Relaxed); - if retired_at == 0 { - if entry.rehandshake_time() <= now { - state.request_handshake(entry.peer); - } - - // always retain - true - } else { - // retain if we aren't yet ready to evict. - current_epoch.saturating_sub(retired_at) < eviction_cycles - } - }); - - // Drop IP entries if we no longer have the path secret ID entry. - // FIXME: Don't require a loop to do this. This is likely somewhat slow since it takes a - // write lock + read lock essentially per-entry, but should be near-constant-time. - state - .peers - .retain(|_, entry| state.ids.contains_key(entry.secret.id())); - - // Iteration order should be effectively random, so this effectively just prunes the list - // periodically. 5000 is chosen arbitrarily to make sure this isn't a memory leak. Note - // that peers the application is actively interested in will typically bypass this list, so - // this is mostly a risk of delaying regular re-handshaking with very large cardinalities. - // - // FIXME: Long or mid-term it likely makes sense to replace this data structure with a - // fuzzy set of some kind and/or just moving to immediate background handshake attempts. - let mut count = 0; - state.requested_handshakes.pin().retain(|_| { - count += 1; - count < 5000 - }); - } - - fn epoch(&self) -> u64 { - self.epoch.load(Ordering::Relaxed) - } -} - -const EVICTION_CYCLES: u64 = if cfg!(test) { 0 } else { 10 }; - -impl State { - fn request_handshake(&self, peer: SocketAddr) { - // The length is reset as part of cleanup to 5000. - let handshakes = self.requested_handshakes.pin(); - if handshakes.len() <= 6000 { - handshakes.insert(peer); - } - } - - // for tests - #[allow(unused)] - fn set_max_capacity(&mut self, new: usize) { - self.max_capacity = new; - self.peers = fixed_map::Map::with_capacity(new, Default::default()); - self.ids = fixed_map::Map::with_capacity(new, Default::default()); - } + store: Arc, } impl Map { pub fn new(signer: stateless_reset::Signer, capacity: usize) -> Self { - // FIXME: Avoid unwrap and the whole socket. - // - // We only ever send on this socket - but we really should be sending on the same - // socket as used by an associated s2n-quic handshake runtime, and receiving control packets - // from that socket as well. Not exactly clear on how to achieve that yet though (both - // ownership wise since the map doesn't have direct access to handshakes and in terms - // of implementation). - let control_socket = std::net::UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).unwrap(); - control_socket.set_nonblocking(true).unwrap(); - let state = State { - // This is around 500MB with current entry size. - max_capacity: capacity, - // FIXME: Allow configuring the rehandshake_period. - rehandshake_period: Duration::from_secs(3600 * 24), - peers: fixed_map::Map::with_capacity(capacity, Default::default()), - ids: fixed_map::Map::with_capacity(capacity, Default::default()), - requested_handshakes: Default::default(), - cleaner: Cleaner::new(), - signer, - - receiver_shared: receiver::Shared::new(), - - handled_control_packets: AtomicUsize::new(0), - control_socket, - }; - - let state = Arc::new(state); - - state.cleaner.spawn_thread(state.clone()); - - Self { state } + // TODO add the subscriber + let state = state::State::new(signer, capacity); + Self { store: state } } /// The number of trusted secrets. pub fn secrets_len(&self) -> usize { - self.state.ids.len() + self.store.secrets_len() } /// The number of trusted peers. /// /// This should be smaller than `secrets_len` (modulo momentary churn). pub fn peers_len(&self) -> usize { - self.state.peers.len() + self.store.peers_len() } pub fn secrets_capacity(&self) -> usize { - self.state.max_capacity + self.store.secrets_capacity() } pub fn drop_state(&self) { - self.state.peers.clear(); - self.state.ids.clear(); + self.store.drop_state(); } pub fn contains(&self, peer: SocketAddr) -> bool { - self.state.peers.contains_key(&peer) - && !self.state.requested_handshakes.pin().contains(&peer) + self.store.contains(peer) } pub fn seal_once( &self, peer: SocketAddr, - ) -> Option<(seal::Once, Credentials, ApplicationParams)> { - let state = self.state.peers.get_by_key(&peer)?; - let (sealer, credentials) = state.uni_sealer(); - Some((sealer, credentials, state.parameters.clone())) + ) -> Option<(seal::Once, Credentials, dc::ApplicationParams)> { + let entry = self.store.get_by_addr(&peer)?; + let (sealer, credentials) = entry.uni_sealer(); + Some((sealer, credentials, entry.parameters())) } /// Retrieve a sealer by path secret ID. @@ -327,10 +89,10 @@ impl Map { /// /// Note that unlike by-IP lookup this should typically not be done significantly after the /// original secret was used for decryption. - pub fn seal_once_id(&self, id: Id) -> Option<(seal::Once, Credentials, ApplicationParams)> { - let state = self.state.ids.get_by_key(&id)?; - let (sealer, credentials) = state.uni_sealer(); - Some((sealer, credentials, state.parameters.clone())) + pub fn seal_once_id(&self, id: Id) -> Option<(seal::Once, Credentials, dc::ApplicationParams)> { + let entry = self.store.get_by_id(&id)?; + let (sealer, credentials) = entry.uni_sealer(); + Some((sealer, credentials, entry.parameters())) } pub fn open_once( @@ -338,8 +100,8 @@ impl Map { credentials: &Credentials, control_out: &mut Vec, ) -> Option { - let state = self.pre_authentication(credentials, control_out)?; - let opener = state.uni_opener(self.clone(), credentials); + let entry = self.store.pre_authentication(credentials, control_out)?; + let opener = entry.uni_opener(self.clone(), credentials); Some(opener) } @@ -347,11 +109,11 @@ impl Map { &self, peer: SocketAddr, features: &TransportFeatures, - ) -> Option<(Bidirectional, ApplicationParams)> { - let state = self.state.peers.get_by_key(&peer)?; - let keys = state.bidi_local(features); + ) -> Option<(entry::Bidirectional, dc::ApplicationParams)> { + let entry = self.store.get_by_addr(&peer)?; + let keys = entry.bidi_local(features); - Some((keys, state.parameters.clone())) + Some((keys, entry.parameters())) } pub fn pair_for_credentials( @@ -359,11 +121,11 @@ impl Map { credentials: &Credentials, features: &TransportFeatures, control_out: &mut Vec, - ) -> Option<(Bidirectional, ApplicationParams)> { - let state = self.pre_authentication(credentials, control_out)?; + ) -> Option<(entry::Bidirectional, dc::ApplicationParams)> { + let entry = self.store.pre_authentication(credentials, control_out)?; - let params = state.parameters.clone(); - let keys = state.bidi_remote(self.clone(), credentials, features); + let params = entry.parameters(); + let keys = entry.bidi_remote(self.clone(), credentials, features); Some((keys, params)) } @@ -373,152 +135,24 @@ impl Map { /// For secret control packets, this will process those. /// For other packets, the map may collect metrics but will otherwise drop the packets. pub fn handle_unexpected_packet(&self, packet: &Packet) { - match packet { - Packet::Stream(_) => { - // no action for now. FIXME: Add metrics. - } - Packet::Datagram(_) => { - // no action for now. FIXME: Add metrics. - } - Packet::Control(_) => { - // no action for now. FIXME: Add metrics. - } - Packet::StaleKey(packet) => self.handle_control_packet(&(*packet).into()), - Packet::ReplayDetected(packet) => self.handle_control_packet(&(*packet).into()), - Packet::UnknownPathSecret(packet) => self.handle_control_packet(&(*packet).into()), - } - } - - pub fn handle_unknown_secret_packet(&self, packet: &control::unknown_path_secret::Packet) { - let Some(state) = self.state.ids.get_by_key(packet.credential_id()) else { - return; - }; - // Do not mark as live, this is lightly authenticated. - - // ensure the packet is authentic - if packet.authenticate(&state.sender.stateless_reset).is_none() { - return; - } - - self.state - .handled_control_packets - .fetch_add(1, Ordering::Relaxed); - - // FIXME: More actively schedule a new handshake. - // See comment on requested_handshakes for details. - self.state.request_handshake(state.peer); + self.store.handle_unexpected_packet(packet); } pub fn handle_control_packet(&self, packet: &control::Packet) { - if let control::Packet::UnknownPathSecret(ref packet) = &packet { - return self.handle_unknown_secret_packet(packet); - } - - let Some(state) = self.state.ids.get_by_key(packet.credential_id()) else { - // If we get a control packet we don't have a registered path secret for, ignore the - // packet. - return; - }; - - let key = state.sender.control_secret(&state.secret); - - match packet { - control::Packet::StaleKey(packet) => { - let Some(packet) = packet.authenticate(&key) else { - return; - }; - state.sender.update_for_stale_key(packet.min_key_id); - self.state - .handled_control_packets - .fetch_add(1, Ordering::Relaxed); - } - control::Packet::ReplayDetected(packet) => { - let Some(_packet) = packet.authenticate(&key) else { - return; - }; - self.state - .handled_control_packets - .fetch_add(1, Ordering::Relaxed); - - // If we see replay then we're going to assume that we should re-handshake in the - // background with this peer. Currently we can't handshake in the background (only - // in the foreground on next handshake_with). - // - // Note that there's no good way for us to prevent an attacker causing us to hit - // this code: they can always trivially replay a packet we send. At most we could - // de-duplicate *receiving* so there's one handshake per sent packet at most, but - // that's not particularly useful: we expect to send a lot of new packets that - // could be harvested. - // - // Handshaking will be rate limited per destination peer (and at least - // de-duplicated). - self.state.request_handshake(state.peer); - } - control::Packet::UnknownPathSecret(_) => unreachable!(), - } - } - - fn pre_authentication( - &self, - identity: &Credentials, - control_out: &mut Vec, - ) -> Option> { - let Some(state) = self.state.ids.get_by_key(&identity.id) else { - let packet = control::UnknownPathSecret { - wire_version: WireVersion::ZERO, - credential_id: identity.id, - }; - control_out.resize(control::UnknownPathSecret::PACKET_SIZE, 0); - let stateless_reset = self.state.signer.sign(&identity.id); - let encoder = EncoderBuffer::new(control_out); - packet.encode(encoder, &stateless_reset); - return None; - }; - - match state.receiver.pre_authentication(identity) { - Ok(()) => {} - Err(e) => { - self.send_control(&state, identity, e); - control_out.resize(control::UnknownPathSecret::PACKET_SIZE, 0); - - return None; - } - } - - Some(state.clone()) - } - - pub(super) fn on_new_path_secrets(&self, entry: Arc) { - // On insert clear our interest in a handshake. - self.state.requested_handshakes.pin().remove(&entry.peer); - if self.state.ids.insert(*entry.secret.id(), entry).is_some() { - // FIXME: Make insertion fallible and fail handshakes instead? - panic!("inserting a path secret ID twice"); - } - } - - pub(super) fn on_handshake_complete(&self, entry: Arc) { - let id = *entry.secret.id(); - - if let Some(prev) = self.state.peers.insert(entry.peer, entry) { - // This shouldn't happen due to the panic in on_new_path_secrets, but just - // in case something went wrong with the secret map we double check here. - // FIXME: Make insertion fallible and fail handshakes instead? - assert_ne!(*prev.secret.id(), id, "duplicate path secret id"); - - prev.retire(self.state.cleaner.epoch()); - } - } - - pub(super) fn signer(&self) -> &stateless_reset::Signer { - &self.state.signer + self.store.handle_control_packet(packet) } #[doc(hidden)] #[cfg(any(test, feature = "testing"))] pub fn for_test_with_peers( - peers: Vec<(schedule::Ciphersuite, dc::Version, SocketAddr)>, + peers: Vec<( + crate::path::secret::schedule::Ciphersuite, + dc::Version, + SocketAddr, + )>, ) -> (Self, Vec) { + use crate::path::secret::{receiver, schedule, sender}; + let provider = Self::new(stateless_reset::Signer::random(), peers.len() * 3); let mut secret = [0; 32]; aws_lc_rs::rand::fill(&mut secret).unwrap(); @@ -548,8 +182,7 @@ impl Map { dc::testing::TEST_REHANDSHAKE_PERIOD, ); let entry = Arc::new(entry); - provider.on_new_path_secrets(entry.clone()); - provider.on_handshake_complete(entry); + provider.store.test_insert(entry); } (provider, ids) @@ -558,530 +191,8 @@ impl Map { #[doc(hidden)] #[cfg(any(test, feature = "testing"))] pub fn test_insert(&self, peer: SocketAddr) { - let mut secret = [0; 32]; - aws_lc_rs::rand::fill(&mut secret).unwrap(); - let secret = schedule::Secret::new( - schedule::Ciphersuite::AES_GCM_128_SHA256, - dc::SUPPORTED_VERSIONS[0], - s2n_quic_core::endpoint::Type::Client, - &secret, - ); - let sender = sender::State::new([0; control::TAG_LEN]); - let receiver = self.state.receiver_shared.clone().new_receiver(); - let entry = Entry::new( - peer, - secret, - sender, - receiver, - dc::testing::TEST_APPLICATION_PARAMS, - dc::testing::TEST_REHANDSHAKE_PERIOD, - ); - let entry = Arc::new(entry); - self.on_new_path_secrets(entry.clone()); - self.on_handshake_complete(entry); - } - - fn send_control(&self, entry: &Entry, credentials: &Credentials, error: receiver::Error) { - let mut buffer = [0; control::MAX_PACKET_SIZE]; - let buffer = error.to_packet(entry, credentials, &mut buffer); - let dst = entry.peer; - self.send_control_packet(dst, buffer); - } - - pub(crate) fn send_control_packet(&self, dst: SocketAddr, buffer: &[u8]) { - match self.state.control_socket.send_to(buffer, dst) { - Ok(_) => { - // all done - } - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - // ignore would block -- we're not going to queue up control packet messages. - } - Err(e) => { - tracing::warn!("Failed to send control packet to {:?}: {:?}", dst, e); - } - } - } - - #[doc(hidden)] - #[cfg(any(test, feature = "testing"))] - pub fn handled_control_packets(&self) -> usize { - self.state.handled_control_packets.load(Ordering::Relaxed) - } -} - -impl receiver::Error { - pub(super) fn to_packet<'buffer>( - self, - entry: &Entry, - credentials: &Credentials, - buffer: &'buffer mut [u8; control::MAX_PACKET_SIZE], - ) -> &'buffer [u8] { - debug_assert_eq!(entry.secret.id(), &credentials.id); - let encoder = EncoderBuffer::new(&mut buffer[..]); - let length = match self { - receiver::Error::AlreadyExists => control::ReplayDetected { - wire_version: WireVersion::ZERO, - credential_id: credentials.id, - rejected_key_id: credentials.key_id, - } - .encode(encoder, &entry.secret.control_sealer()), - receiver::Error::Unknown => control::StaleKey { - wire_version: WireVersion::ZERO, - credential_id: credentials.id, - min_key_id: entry.receiver.minimum_unseen_key_id(), - } - .encode(encoder, &entry.secret.control_sealer()), - }; - &buffer[..length] - } -} - -#[derive(Debug)] -pub(super) struct Entry { - creation_time: Instant, - rehandshake_delta_secs: u32, - peer: SocketAddr, - secret: schedule::Secret, - retired: IsRetired, - sender: sender::State, - receiver: receiver::State, - parameters: ApplicationParams, -} - -impl SizeOf for Instant {} -impl SizeOf for u32 {} -impl SizeOf for SocketAddr {} -impl SizeOf for AtomicU64 {} - -impl SizeOf for IsRetired {} -impl SizeOf for ApplicationParams {} - -impl SizeOf for Entry { - fn size(&self) -> usize { - let Entry { - creation_time, - rehandshake_delta_secs, - peer, - secret, - retired, - sender, - receiver, - parameters, - } = self; - creation_time.size() - + rehandshake_delta_secs.size() - + peer.size() - + secret.size() - + retired.size() - + sender.size() - + receiver.size() - + parameters.size() - } -} - -/// Provide an approximation of the size of Self, including any heap indirection (e.g., a vec -/// backed by a megabyte is a megabyte in `size`, not 24 bytes). -/// -/// Approximation because we don't currently attempt to account for (as an example) padding. It's -/// too annoying to do that. -#[cfg_attr(not(test), allow(unused))] -pub(crate) trait SizeOf: Sized { - fn size(&self) -> usize { - // If we don't need drop, it's very likely that this type is fully contained in size_of - // Self. This simplifies implementing this trait for e.g. std types. - assert!( - !std::mem::needs_drop::(), - "{:?} requires custom SizeOf impl", - std::any::type_name::() - ); - std::mem::size_of::() - } -} - -// Retired is 0 if not yet retired. Otherwise it stores the background cleaner epoch at which it -// retired; that epoch increments roughly once per minute. -#[derive(Default)] -struct IsRetired(AtomicU64); - -impl fmt::Debug for IsRetired { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("IsRetired").field(&self.retired()).finish() - } -} - -impl IsRetired { - fn retired(&self) -> bool { - self.0.load(Ordering::Relaxed) != 0 - } -} - -impl Entry { - pub fn new( - peer: SocketAddr, - secret: schedule::Secret, - sender: sender::State, - receiver: receiver::State, - parameters: ApplicationParams, - rehandshake_time: Duration, - ) -> Self { - // clamp max datagram size to a well-known value - parameters - .max_datagram_size - .fetch_min(crate::stream::MAX_DATAGRAM_SIZE as _, Ordering::Relaxed); - - assert!(rehandshake_time.as_secs() <= u32::MAX as u64); - Self { - creation_time: Instant::now(), - // Schedule another handshake sometime in [5 minutes, rehandshake_time] from now. - rehandshake_delta_secs: rand::thread_rng().gen_range( - std::cmp::min(rehandshake_time.as_secs(), 360)..rehandshake_time.as_secs(), - ) as u32, - peer, - secret, - retired: Default::default(), - sender, - receiver, - parameters, - } - } - - fn retire(&self, at_epoch: u64) { - self.retired.0.store(at_epoch, Ordering::Relaxed); - } - - fn uni_sealer(&self) -> (seal::Once, Credentials) { - let key_id = self.sender.next_key_id(); - let credentials = Credentials { - id: *self.secret.id(), - key_id, - }; - let sealer = self.secret.application_sealer(key_id); - let sealer = seal::Once::new(sealer); - - (sealer, credentials) - } - - fn uni_opener(self: Arc, map: Map, credentials: &Credentials) -> open::Once { - let key_id = credentials.key_id; - let opener = self.secret.application_opener(key_id); - let dedup = Dedup::new(self, key_id, map); - open::Once::new(opener, dedup) - } - - fn bidi_local(&self, features: &TransportFeatures) -> Bidirectional { - let key_id = self.sender.next_key_id(); - let initiator = Initiator::Local; - - let application = ApplicationPair::new( - &self.secret, - key_id, - initiator, - // we don't need to dedup locally-initiated openers - Dedup::disabled(), - ); - - let control = if features.is_reliable() { - None - } else { - Some(ControlPair::new(&self.secret, key_id, initiator)) - }; - - Bidirectional { - credentials: Credentials { - id: *self.secret.id(), - key_id, - }, - application, - control, - } - } - - fn bidi_remote( - self: &Arc, - map: Map, - credentials: &Credentials, - features: &TransportFeatures, - ) -> Bidirectional { - let key_id = credentials.key_id; - let initiator = Initiator::Remote; - - let application = ApplicationPair::new( - &self.secret, - key_id, - initiator, - // Remote application keys need to be de-duplicated - Dedup::new(self.clone(), key_id, map), - ); - - let control = if features.is_reliable() { - None - } else { - Some(ControlPair::new(&self.secret, key_id, initiator)) - }; - - Bidirectional { - credentials: *credentials, - application, - control, - } - } - - fn rehandshake_time(&self) -> Instant { - self.creation_time + Duration::from_secs(u64::from(self.rehandshake_delta_secs)) - } -} - -pub struct Bidirectional { - pub credentials: Credentials, - pub application: ApplicationPair, - pub control: Option, -} - -pub struct ApplicationPair { - pub sealer: seal::Application, - pub opener: open::Application, -} - -impl ApplicationPair { - fn new(secret: &schedule::Secret, key_id: VarInt, initiator: Initiator, dedup: Dedup) -> Self { - let (sealer, sealer_ku, opener, opener_ku) = secret.application_pair(key_id, initiator); - - let sealer = seal::Application::new(sealer, sealer_ku); - - let opener = open::Application::new(opener, opener_ku, dedup); - - Self { sealer, opener } - } -} - -pub struct ControlPair { - pub sealer: seal::control::Stream, - pub opener: open::control::Stream, -} - -impl ControlPair { - fn new(secret: &schedule::Secret, key_id: VarInt, initiator: Initiator) -> Self { - let (sealer, opener) = secret.control_pair(key_id, initiator); - - Self { sealer, opener } - } -} - -pub struct Dedup { - cell: once_cell::sync::OnceCell, - init: core::cell::Cell, VarInt, Map)>>, -} - -/// SAFETY: `init` cell is synchronized by `OnceCell` -unsafe impl Sync for Dedup {} - -impl Dedup { - #[inline] - fn new(entry: Arc, key_id: VarInt, map: Map) -> Self { - // TODO potentially record a timestamp of when this was created to try and detect long - // delays of processing the first packet. - Self { - cell: Default::default(), - init: core::cell::Cell::new(Some((entry, key_id, map))), - } - } - - #[inline] - pub(crate) fn disabled() -> Self { - Self { - cell: once_cell::sync::OnceCell::with_value(Ok(())), - init: core::cell::Cell::new(None), - } - } - - #[inline] - pub(crate) fn disable(&self) { - // TODO - } - - #[inline] - pub fn check(&self) -> crypto::open::Result { - *self.cell.get_or_init(|| { - match self.init.take() { - Some((entry, key_id, map)) => { - let creds = &Credentials { - id: *entry.secret.id(), - key_id, - }; - match entry.receiver.post_authentication(creds) { - Ok(()) => Ok(()), - Err(receiver::Error::AlreadyExists) => { - map.send_control(&entry, creds, receiver::Error::AlreadyExists); - Err(crypto::open::Error::ReplayDefinitelyDetected) - } - Err(receiver::Error::Unknown) => { - map.send_control(&entry, creds, receiver::Error::Unknown); - Err(crypto::open::Error::ReplayPotentiallyDetected { - gap: Some( - (*entry.receiver.minimum_unseen_key_id()) - // This should never be negative, but saturate anyway to avoid - // wildly large numbers. - .saturating_sub(*creds.key_id), - ), - }) - } - } - } - None => { - // Dedup has been poisoned! TODO log this - Err(crypto::open::Error::ReplayPotentiallyDetected { gap: None }) - } - } - }) - } -} - -impl fmt::Debug for Dedup { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Dedup").field("cell", &self.cell).finish() + let receiver = self.store.receiver().clone().new_receiver(); + let entry = Entry::fake(peer, Some(receiver)); + self.store.test_insert(entry); } } - -pub struct HandshakingPath { - peer: SocketAddr, - dc_version: dc::Version, - parameters: ApplicationParams, - endpoint_type: s2n_quic_core::endpoint::Type, - secret: Option, - entry: Option>, - map: Map, -} - -impl HandshakingPath { - fn new(connection_info: &dc::ConnectionInfo, map: Map) -> Self { - let endpoint_type = match connection_info.endpoint_type { - EndpointType::Server { .. } => s2n_quic_core::endpoint::Type::Server, - EndpointType::Client { .. } => s2n_quic_core::endpoint::Type::Client, - }; - - Self { - peer: connection_info.remote_address.clone().into(), - dc_version: connection_info.dc_version, - parameters: connection_info.application_params.clone(), - endpoint_type, - secret: None, - entry: None, - map, - } - } -} - -impl dc::Endpoint for Map { - type Path = HandshakingPath; - - fn new_path(&mut self, connection_info: &dc::ConnectionInfo) -> Option { - Some(HandshakingPath::new(connection_info, self.clone())) - } - - fn on_possible_secret_control_packet( - &mut self, - // TODO: Maybe we should confirm that the sender IP at least matches the IP for the - // corresponding control secret? - _datagram_info: &DatagramInfo, - payload: &mut [u8], - ) -> bool { - let payload = s2n_codec::DecoderBufferMut::new(payload); - match control::Packet::decode(payload) { - Ok((packet, tail)) => { - // Probably a bug somewhere? There shouldn't be anything trailing in the buffer - // after we decode a secret control packet. - ensure!(tail.is_empty(), false); - - // If we successfully decoded a control packet, pass it into our map to handle. - self.handle_control_packet(&packet); - - true - } - Err(_) => false, - } - } -} - -impl dc::Path for HandshakingPath { - fn on_path_secrets_ready( - &mut self, - session: &impl s2n_quic_core::crypto::tls::TlsSession, - ) -> Result, s2n_quic_core::transport::Error> { - let mut material = Zeroizing::new([0; TLS_EXPORTER_LENGTH]); - session - .tls_exporter( - TLS_EXPORTER_LABEL.as_bytes(), - TLS_EXPORTER_CONTEXT.as_bytes(), - &mut *material, - ) - .unwrap(); - - let cipher_suite = match session.cipher_suite() { - s2n_quic_core::crypto::tls::CipherSuite::TLS_AES_128_GCM_SHA256 => { - schedule::Ciphersuite::AES_GCM_128_SHA256 - } - s2n_quic_core::crypto::tls::CipherSuite::TLS_AES_256_GCM_SHA384 => { - schedule::Ciphersuite::AES_GCM_256_SHA384 - } - _ => return Err(s2n_quic_core::transport::Error::INTERNAL_ERROR), - }; - - let secret = - schedule::Secret::new(cipher_suite, self.dc_version, self.endpoint_type, &material); - - let stateless_reset = self.map.signer().sign(secret.id()); - self.secret = Some(secret); - - Ok(vec![stateless_reset.into()]) - } - - fn on_peer_stateless_reset_tokens<'a>( - &mut self, - stateless_reset_tokens: impl Iterator, - ) { - // TODO: support multiple stateless reset tokens - let sender = sender::State::new( - stateless_reset_tokens - .into_iter() - .next() - .unwrap() - .into_inner(), - ); - - let receiver = self.map.state.receiver_shared.clone().new_receiver(); - - let entry = Entry::new( - self.peer, - self.secret - .take() - .expect("peer tokens are only received after secrets are ready"), - sender, - receiver, - self.parameters.clone(), - self.map.state.rehandshake_period, - ); - let entry = Arc::new(entry); - self.entry = Some(entry.clone()); - self.map.on_new_path_secrets(entry); - } - - fn on_dc_handshake_complete(&mut self) { - let entry = self.entry.clone().expect( - "the dc handshake cannot be complete without \ - on_peer_stateless_reset_tokens creating a map entry", - ); - self.map.on_handshake_complete(entry); - } - - fn on_mtu_updated(&mut self, mtu: u16) { - if let Some(entry) = self.entry.as_ref() { - entry - .parameters - .max_datagram_size - .store(mtu, Ordering::Relaxed); - } - } -} - -#[cfg(test)] -mod test; diff --git a/dc/s2n-quic-dc/src/path/secret/map/cleaner.rs b/dc/s2n-quic-dc/src/path/secret/map/cleaner.rs new file mode 100644 index 000000000..a5081407f --- /dev/null +++ b/dc/s2n-quic-dc/src/path/secret/map/cleaner.rs @@ -0,0 +1,116 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::state::State; +use rand::Rng as _; +use std::{ + sync::{ + atomic::{AtomicBool, AtomicU64, Ordering}, + Arc, Mutex, + }, + time::{Duration, Instant}, +}; + +const EVICTION_CYCLES: u64 = if cfg!(test) { 0 } else { 10 }; + +pub struct Cleaner { + should_stop: AtomicBool, + thread: Mutex>>, + epoch: AtomicU64, +} + +impl Drop for Cleaner { + fn drop(&mut self) { + self.stop(); + } +} + +impl Cleaner { + pub fn new() -> Cleaner { + Cleaner { + should_stop: AtomicBool::new(false), + thread: Mutex::new(None), + epoch: AtomicU64::new(1), + } + } + + pub fn stop(&self) { + self.should_stop.store(true, Ordering::Relaxed); + if let Some(thread) = + std::mem::take(&mut *self.thread.lock().unwrap_or_else(|e| e.into_inner())) + { + thread.thread().unpark(); + + // If this isn't getting dropped on the cleaner thread, + // then wait for the background thread to finish exiting. + if std::thread::current().id() != thread.thread().id() { + // We expect this to terminate very quickly. + thread.join().unwrap(); + } + } + } + + pub fn spawn_thread(&self, state: Arc) { + let state = Arc::downgrade(&state); + let handle = std::thread::spawn(move || loop { + let Some(state) = state.upgrade() else { + break; + }; + if state.cleaner().should_stop.load(Ordering::Relaxed) { + break; + } + state.cleaner().clean(&state, EVICTION_CYCLES); + let pause = rand::thread_rng().gen_range(5..60); + drop(state); + std::thread::park_timeout(Duration::from_secs(pause)); + }); + *self.thread.lock().unwrap() = Some(handle); + } + + /// Periodic maintenance for various maps. + pub fn clean(&self, state: &State, eviction_cycles: u64) { + let current_epoch = self.epoch.fetch_add(1, Ordering::Relaxed); + let now = Instant::now(); + + // For non-retired entries, if it's time for them to handshake again, request a + // handshake to happen. This handshake will currently happen on the next request for this + // particular peer. + state.ids.retain(|_, entry| { + if let Some(retired_at) = entry.retired_at() { + // retain if we aren't yet ready to evict. + current_epoch.saturating_sub(retired_at) < eviction_cycles + } else { + if entry.rehandshake_time() <= now { + state.request_handshake(*entry.peer()); + } + + // always retain + true + } + }); + + // Drop IP entries if we no longer have the path secret ID entry. + // FIXME: Don't require a loop to do this. This is likely somewhat slow since it takes a + // write lock + read lock essentially per-entry, but should be near-constant-time. + state + .peers + .retain(|_, entry| state.ids.contains_key(entry.id())); + + // Iteration order should be effectively random, so this effectively just prunes the list + // periodically. 5000 is chosen arbitrarily to make sure this isn't a memory leak. Note + // that peers the application is actively interested in will typically bypass this list, so + // this is mostly a risk of delaying regular re-handshaking with very large cardinalities. + // + // FIXME: Long or mid-term it likely makes sense to replace this data structure with a + // fuzzy set of some kind and/or just moving to immediate background handshake attempts. + let mut count = 0; + state.requested_handshakes.pin().retain(|_| { + count += 1; + count < 5000 + }); + } + + pub fn epoch(&self) -> u64 { + self.epoch.load(Ordering::Relaxed) + } +} diff --git a/dc/s2n-quic-dc/src/path/secret/map/entry.rs b/dc/s2n-quic-dc/src/path/secret/map/entry.rs new file mode 100644 index 000000000..59c22ae8e --- /dev/null +++ b/dc/s2n-quic-dc/src/path/secret/map/entry.rs @@ -0,0 +1,299 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{ + size_of::SizeOf, + status::{Dedup, IsRetired}, + Map, +}; +use crate::{ + credentials::{self, Credentials}, + packet::{secret_control as control, WireVersion}, + path::secret::{ + open, receiver, + schedule::{self, Initiator}, + seal, sender, + }, + stream::TransportFeatures, +}; +use rand::Rng as _; +use s2n_codec::EncoderBuffer; +use s2n_quic_core::{dc, varint::VarInt}; +use std::{ + net::SocketAddr, + sync::{atomic::Ordering, Arc}, + time::{Duration, Instant}, +}; + +#[cfg(test)] +mod tests; + +#[derive(Debug)] +pub(super) struct Entry { + creation_time: Instant, + rehandshake_delta_secs: u32, + peer: SocketAddr, + secret: schedule::Secret, + retired: IsRetired, + sender: sender::State, + receiver: receiver::State, + parameters: dc::ApplicationParams, +} + +impl SizeOf for Entry { + fn size(&self) -> usize { + let Entry { + creation_time, + rehandshake_delta_secs, + peer, + secret, + retired, + sender, + receiver, + parameters, + } = self; + creation_time.size() + + rehandshake_delta_secs.size() + + peer.size() + + secret.size() + + retired.size() + + sender.size() + + receiver.size() + + parameters.size() + } +} + +impl Entry { + pub fn new( + peer: SocketAddr, + secret: schedule::Secret, + sender: sender::State, + receiver: receiver::State, + parameters: dc::ApplicationParams, + rehandshake_time: Duration, + ) -> Self { + // clamp max datagram size to a well-known value + parameters + .max_datagram_size + .fetch_min(crate::stream::MAX_DATAGRAM_SIZE as _, Ordering::Relaxed); + + assert!(rehandshake_time.as_secs() <= u32::MAX as u64); + Self { + creation_time: Instant::now(), + // Schedule another handshake sometime in [5 minutes, rehandshake_time] from now. + rehandshake_delta_secs: rand::thread_rng().gen_range( + std::cmp::min(rehandshake_time.as_secs(), 360)..rehandshake_time.as_secs(), + ) as u32, + peer, + secret, + retired: Default::default(), + sender, + receiver, + parameters, + } + } + + #[cfg(any(test, feature = "testing"))] + pub fn fake(peer: SocketAddr, receiver: Option) -> Arc { + let receiver = receiver.unwrap_or_else(receiver::State::without_shared); + + let mut secret = [0; 32]; + aws_lc_rs::rand::fill(&mut secret).unwrap(); + + Arc::new(Entry::new( + peer, + schedule::Secret::new( + schedule::Ciphersuite::AES_GCM_128_SHA256, + dc::SUPPORTED_VERSIONS[0], + s2n_quic_core::endpoint::Type::Client, + &secret, + ), + sender::State::new([0; control::TAG_LEN]), + receiver, + dc::testing::TEST_APPLICATION_PARAMS, + dc::testing::TEST_REHANDSHAKE_PERIOD, + )) + } + + pub fn peer(&self) -> &SocketAddr { + &self.peer + } + + pub fn id(&self) -> &credentials::Id { + self.secret.id() + } + + pub fn retire(&self, at_epoch: u64) { + self.retired.retire(at_epoch); + } + + pub fn retired_at(&self) -> Option { + self.retired.retired_at() + } + + pub fn uni_sealer(&self) -> (seal::Once, Credentials) { + let key_id = self.sender.next_key_id(); + let credentials = Credentials { + id: *self.secret.id(), + key_id, + }; + let sealer = self.secret.application_sealer(key_id); + let sealer = seal::Once::new(sealer); + + (sealer, credentials) + } + + pub fn uni_opener(self: Arc, map: Map, credentials: &Credentials) -> open::Once { + let key_id = credentials.key_id; + let opener = self.secret.application_opener(key_id); + let dedup = Dedup::new(self, key_id, map); + open::Once::new(opener, dedup) + } + + pub fn bidi_local(&self, features: &TransportFeatures) -> Bidirectional { + let key_id = self.sender.next_key_id(); + let initiator = Initiator::Local; + + let application = ApplicationPair::new( + &self.secret, + key_id, + initiator, + // we don't need to dedup locally-initiated openers + Dedup::disabled(), + ); + + let control = if features.is_reliable() { + None + } else { + Some(ControlPair::new(&self.secret, key_id, initiator)) + }; + + Bidirectional { + credentials: Credentials { + id: *self.secret.id(), + key_id, + }, + application, + control, + } + } + + pub fn bidi_remote( + self: &Arc, + map: Map, + credentials: &Credentials, + features: &TransportFeatures, + ) -> Bidirectional { + let key_id = credentials.key_id; + let initiator = Initiator::Remote; + + let application = ApplicationPair::new( + &self.secret, + key_id, + initiator, + // Remote application keys need to be de-duplicated + Dedup::new(self.clone(), key_id, map), + ); + + let control = if features.is_reliable() { + None + } else { + Some(ControlPair::new(&self.secret, key_id, initiator)) + }; + + Bidirectional { + credentials: *credentials, + application, + control, + } + } + + pub fn parameters(&self) -> dc::ApplicationParams { + self.parameters.clone() + } + + pub fn update_max_datagram_size(&self, mtu: u16) { + self.parameters + .max_datagram_size + .store(mtu, Ordering::Relaxed); + } + + pub fn rehandshake_time(&self) -> Instant { + self.creation_time + Duration::from_secs(u64::from(self.rehandshake_delta_secs)) + } + + pub fn receiver(&self) -> &receiver::State { + &self.receiver + } + + pub fn sender(&self) -> &sender::State { + &self.sender + } + + pub fn control_secret(&self) -> crate::crypto::awslc::open::control::Secret { + self.sender.control_secret(&self.secret) + } +} + +impl receiver::Error { + pub(super) fn to_packet<'buffer>( + self, + entry: &Entry, + credentials: &Credentials, + buffer: &'buffer mut [u8; control::MAX_PACKET_SIZE], + ) -> &'buffer [u8] { + debug_assert_eq!(entry.secret.id(), &credentials.id); + let encoder = EncoderBuffer::new(&mut buffer[..]); + let length = match self { + receiver::Error::AlreadyExists => control::ReplayDetected { + wire_version: WireVersion::ZERO, + credential_id: credentials.id, + rejected_key_id: credentials.key_id, + } + .encode(encoder, &entry.secret.control_sealer()), + receiver::Error::Unknown => control::StaleKey { + wire_version: WireVersion::ZERO, + credential_id: credentials.id, + min_key_id: entry.receiver.minimum_unseen_key_id(), + } + .encode(encoder, &entry.secret.control_sealer()), + }; + &buffer[..length] + } +} + +pub struct Bidirectional { + pub credentials: Credentials, + pub application: ApplicationPair, + pub control: Option, +} + +pub struct ApplicationPair { + pub sealer: seal::Application, + pub opener: open::Application, +} + +impl ApplicationPair { + fn new(secret: &schedule::Secret, key_id: VarInt, initiator: Initiator, dedup: Dedup) -> Self { + let (sealer, sealer_ku, opener, opener_ku) = secret.application_pair(key_id, initiator); + + let sealer = seal::Application::new(sealer, sealer_ku); + + let opener = open::Application::new(opener, opener_ku, dedup); + + Self { sealer, opener } + } +} + +pub struct ControlPair { + pub sealer: seal::control::Stream, + pub opener: open::control::Stream, +} + +impl ControlPair { + fn new(secret: &schedule::Secret, key_id: VarInt, initiator: Initiator) -> Self { + let (sealer, opener) = secret.control_pair(key_id, initiator); + + Self { sealer, opener } + } +} diff --git a/dc/s2n-quic-dc/src/path/secret/map/entry/tests.rs b/dc/s2n-quic-dc/src/path/secret/map/entry/tests.rs new file mode 100644 index 000000000..451a193fc --- /dev/null +++ b/dc/s2n-quic-dc/src/path/secret/map/entry/tests.rs @@ -0,0 +1,21 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::*; + +#[test] +fn entry_size() { + let mut should_check = true; + + should_check &= cfg!(target_pointer_width = "64"); + should_check &= cfg!(target_os = "linux"); + should_check &= std::env::var("S2N_QUIC_RUN_VERSION_SPECIFIC_TESTS").is_ok(); + + // This gates to running only on specific GHA to reduce false positives. + if should_check { + assert_eq!( + Entry::fake((std::net::Ipv4Addr::LOCALHOST, 0).into(), None).size(), + 238 + ); + } +} diff --git a/dc/s2n-quic-dc/src/path/secret/map/handshake.rs b/dc/s2n-quic-dc/src/path/secret/map/handshake.rs new file mode 100644 index 000000000..cb9ccf7aa --- /dev/null +++ b/dc/s2n-quic-dc/src/path/secret/map/handshake.rs @@ -0,0 +1,156 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Entry, Map}; +use crate::{ + packet::secret_control as control, + path::secret::{schedule, sender}, +}; +use s2n_quic_core::{ + dc::{self, ApplicationParams, DatagramInfo}, + endpoint, ensure, event, +}; +use std::{net::SocketAddr, sync::Arc}; +use zeroize::Zeroizing; + +const TLS_EXPORTER_LABEL: &str = "EXPERIMENTAL EXPORTER s2n-quic-dc"; +const TLS_EXPORTER_CONTEXT: &str = ""; +const TLS_EXPORTER_LENGTH: usize = schedule::EXPORT_SECRET_LEN; + +pub struct HandshakingPath { + peer: SocketAddr, + dc_version: dc::Version, + parameters: ApplicationParams, + endpoint_type: s2n_quic_core::endpoint::Type, + secret: Option, + entry: Option>, + map: Map, +} + +impl HandshakingPath { + fn new(connection_info: &dc::ConnectionInfo, map: Map) -> Self { + let endpoint_type = match connection_info.endpoint_type { + event::api::EndpointType::Server { .. } => endpoint::Type::Server, + event::api::EndpointType::Client { .. } => endpoint::Type::Client, + }; + + Self { + peer: connection_info.remote_address.clone().into(), + dc_version: connection_info.dc_version, + parameters: connection_info.application_params.clone(), + endpoint_type, + secret: None, + entry: None, + map, + } + } +} + +impl dc::Endpoint for Map { + type Path = HandshakingPath; + + fn new_path(&mut self, connection_info: &dc::ConnectionInfo) -> Option { + Some(HandshakingPath::new(connection_info, self.clone())) + } + + fn on_possible_secret_control_packet( + &mut self, + // TODO: Maybe we should confirm that the sender IP at least matches the IP for the + // corresponding control secret? + _datagram_info: &DatagramInfo, + payload: &mut [u8], + ) -> bool { + let payload = s2n_codec::DecoderBufferMut::new(payload); + match control::Packet::decode(payload) { + Ok((packet, tail)) => { + // Probably a bug somewhere? There shouldn't be anything trailing in the buffer + // after we decode a secret control packet. + ensure!(tail.is_empty(), false); + + // If we successfully decoded a control packet, pass it into our map to handle. + self.handle_control_packet(&packet); + + true + } + Err(_) => false, + } + } +} + +impl dc::Path for HandshakingPath { + fn on_path_secrets_ready( + &mut self, + session: &impl s2n_quic_core::crypto::tls::TlsSession, + ) -> Result, s2n_quic_core::transport::Error> { + let mut material = Zeroizing::new([0; TLS_EXPORTER_LENGTH]); + session + .tls_exporter( + TLS_EXPORTER_LABEL.as_bytes(), + TLS_EXPORTER_CONTEXT.as_bytes(), + &mut *material, + ) + .unwrap(); + + let cipher_suite = match session.cipher_suite() { + s2n_quic_core::crypto::tls::CipherSuite::TLS_AES_128_GCM_SHA256 => { + schedule::Ciphersuite::AES_GCM_128_SHA256 + } + s2n_quic_core::crypto::tls::CipherSuite::TLS_AES_256_GCM_SHA384 => { + schedule::Ciphersuite::AES_GCM_256_SHA384 + } + _ => return Err(s2n_quic_core::transport::Error::INTERNAL_ERROR), + }; + + let secret = + schedule::Secret::new(cipher_suite, self.dc_version, self.endpoint_type, &material); + + let stateless_reset = self.map.store.signer().sign(secret.id()); + self.secret = Some(secret); + + Ok(vec![stateless_reset.into()]) + } + + fn on_peer_stateless_reset_tokens<'a>( + &mut self, + stateless_reset_tokens: impl Iterator, + ) { + // TODO: support multiple stateless reset tokens + let sender = sender::State::new( + stateless_reset_tokens + .into_iter() + .next() + .unwrap() + .into_inner(), + ); + + let receiver = self.map.store.receiver().clone().new_receiver(); + + let entry = Entry::new( + self.peer, + self.secret + .take() + .expect("peer tokens are only received after secrets are ready"), + sender, + receiver, + self.parameters.clone(), + self.map.store.rehandshake_period(), + ); + let entry = Arc::new(entry); + self.entry = Some(entry.clone()); + self.map.store.on_new_path_secrets(entry); + } + + fn on_dc_handshake_complete(&mut self) { + let entry = self.entry.clone().expect( + "the dc handshake cannot be complete without \ + on_peer_stateless_reset_tokens creating a map entry", + ); + self.map.store.on_handshake_complete(entry); + } + + fn on_mtu_updated(&mut self, mtu: u16) { + if let Some(entry) = self.entry.as_ref() { + entry.update_max_datagram_size(mtu); + } + } +} diff --git a/dc/s2n-quic-dc/src/path/secret/map/size_of.rs b/dc/s2n-quic-dc/src/path/secret/map/size_of.rs new file mode 100644 index 000000000..a41dc70f1 --- /dev/null +++ b/dc/s2n-quic-dc/src/path/secret/map/size_of.rs @@ -0,0 +1,33 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::status::IsRetired; +use s2n_quic_core::dc; +use std::{net::SocketAddr, sync::atomic::AtomicU64, time::Instant}; + +/// Provide an approximation of the size of Self, including any heap indirection (e.g., a vec +/// backed by a megabyte is a megabyte in `size`, not 24 bytes). +/// +/// Approximation because we don't currently attempt to account for (as an example) padding. It's +/// too annoying to do that. +#[cfg_attr(not(test), allow(unused))] +pub(crate) trait SizeOf: Sized { + fn size(&self) -> usize { + // If we don't need drop, it's very likely that this type is fully contained in size_of + // Self. This simplifies implementing this trait for e.g. std types. + assert!( + !std::mem::needs_drop::(), + "{:?} requires custom SizeOf impl", + std::any::type_name::() + ); + std::mem::size_of::() + } +} + +impl SizeOf for Instant {} +impl SizeOf for u32 {} +impl SizeOf for SocketAddr {} +impl SizeOf for AtomicU64 {} + +impl SizeOf for IsRetired {} +impl SizeOf for dc::ApplicationParams {} diff --git a/dc/s2n-quic-dc/src/path/secret/map/state.rs b/dc/s2n-quic-dc/src/path/secret/map/state.rs new file mode 100644 index 000000000..141236f86 --- /dev/null +++ b/dc/s2n-quic-dc/src/path/secret/map/state.rs @@ -0,0 +1,317 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{cleaner::Cleaner, stateless_reset, Entry, Store}; +use crate::{ + credentials::Id, + fixed_map::{self, ReadGuard}, + packet::{secret_control as control, Packet}, + path::secret::receiver, +}; +use std::{ + hash::{BuildHasherDefault, Hasher}, + net::{Ipv4Addr, SocketAddr}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, +}; + +#[cfg(test)] +mod tests; + +// # Managing memory consumption +// +// For regular rotation with live peers, we retain at most two secrets: one derived from the most +// recent locally initiated handshake and the most recent remote initiated handshake (from our +// perspective). We guarantee that at most one handshake is ongoing for a given peer pair at a +// time, so both sides will have at least one mutually trusted entry after the handshake. If a peer +// is only acting as a client or only as a server, then one of the peer maps will always be empty. +// +// Previous entries can safely be removed after a grace period (EVICTION_TIME). EVICTION_TIME +// is only needed because a stream/datagram might be opening/sent concurrently with the new +// handshake (e.g., during regular rotation), and we don't want that to fail spuriously. +// +// We also need to manage secrets for no longer existing peers. These are peers where typically the +// underlying host has gone away and/or the address for it has changed. At 95% occupancy for the +// maximum size allowed, we will remove least recently used secrets (1% of these per minute). Usage +// is defined by access to the entry in the map. Unfortunately we lack any good way to authenticate +// a peer as *not* having credentials, especially after the peer is gone. It's possible that in the +// future information could also come from the TLS provider. +pub(super) struct State { + // This is in number of entries. + max_capacity: usize, + + rehandshake_period: Duration, + + // peers is the most recent entry originating from a locally *or* remote initiated handshake. + // + // Handshakes use s2n-quic and the SocketAddr is the address of the handshake socket. Since + // s2n-quic only has Client or Server endpoints, a given SocketAddr can only be used for + // exactly one of a locally initiated handshake or a remote initiated handshake. As a result we + // can use a single map to store both kinds and treat them identically. + // + // In the future it's likely we'll want to build bidirectional support in which case splitting + // this into two maps (per the discussion in "Managing memory consumption" above) will be + // needed. + pub(super) peers: fixed_map::Map>, + + // Stores the set of SocketAddr for which we received a UnknownPathSecret packet. + // When handshake_with is called we will allow a new handshake if this contains a socket, this + // is a temporary solution until we implement proper background handshaking. + pub(super) requested_handshakes: flurry::HashSet, + + // All known entries. + pub(super) ids: fixed_map::Map, BuildHasherDefault>, + + pub(super) signer: stateless_reset::Signer, + + // This socket is used *only* for sending secret control packets. + // FIXME: This will get replaced with sending on a handshake socket associated with the map. + pub(super) control_socket: std::net::UdpSocket, + + pub(super) receiver_shared: Arc, + + handled_control_packets: AtomicUsize, + + cleaner: Cleaner, +} + +impl State { + pub fn new(signer: stateless_reset::Signer, capacity: usize) -> Arc { + // FIXME: Avoid unwrap and the whole socket. + // + // We only ever send on this socket - but we really should be sending on the same + // socket as used by an associated s2n-quic handshake runtime, and receiving control packets + // from that socket as well. Not exactly clear on how to achieve that yet though (both + // ownership wise since the map doesn't have direct access to handshakes and in terms + // of implementation). + let control_socket = std::net::UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).unwrap(); + control_socket.set_nonblocking(true).unwrap(); + + let state = Self { + // This is around 500MB with current entry size. + max_capacity: capacity, + // FIXME: Allow configuring the rehandshake_period. + rehandshake_period: Duration::from_secs(3600 * 24), + peers: fixed_map::Map::with_capacity(capacity, Default::default()), + ids: fixed_map::Map::with_capacity(capacity, Default::default()), + requested_handshakes: Default::default(), + cleaner: Cleaner::new(), + signer, + + receiver_shared: receiver::Shared::new(), + + handled_control_packets: AtomicUsize::new(0), + control_socket, + }; + + let state = Arc::new(state); + + state.cleaner.spawn_thread(state.clone()); + + state + } + + pub fn request_handshake(&self, peer: SocketAddr) { + // The length is reset as part of cleanup to 5000. + let handshakes = self.requested_handshakes.pin(); + if handshakes.len() <= 6000 { + handshakes.insert(peer); + } + } + + fn handle_unknown_secret_packet(&self, packet: &control::unknown_path_secret::Packet) { + let Some(entry) = self.get_by_id(packet.credential_id()) else { + return; + }; + // Do not mark as live, this is lightly authenticated. + + // ensure the packet is authentic + if packet + .authenticate(&entry.sender().stateless_reset) + .is_none() + { + return; + } + + self.handled_control_packets.fetch_add(1, Ordering::Relaxed); + + // FIXME: More actively schedule a new handshake. + // See comment on requested_handshakes for details. + self.request_handshake(*entry.peer()); + } + + pub fn cleaner(&self) -> &Cleaner { + &self.cleaner + } + + // for tests + #[allow(unused)] + fn set_max_capacity(&mut self, new: usize) { + self.max_capacity = new; + self.peers = fixed_map::Map::with_capacity(new, Default::default()); + self.ids = fixed_map::Map::with_capacity(new, Default::default()); + } +} + +impl Store for State { + fn secrets_len(&self) -> usize { + self.ids.len() + } + + fn peers_len(&self) -> usize { + self.peers.len() + } + + fn secrets_capacity(&self) -> usize { + self.max_capacity + } + + fn drop_state(&self) { + self.ids.clear(); + self.peers.clear(); + } + + fn contains(&self, peer: SocketAddr) -> bool { + self.peers.contains_key(&peer) && !self.requested_handshakes.pin().contains(&peer) + } + + fn on_new_path_secrets(&self, entry: Arc) { + // On insert clear our interest in a handshake. + self.requested_handshakes.pin().remove(entry.peer()); + let id = *entry.id(); + if self.ids.insert(id, entry.clone()).is_some() { + // FIXME: Make insertion fallible and fail handshakes instead? + panic!("inserting a path secret ID twice"); + } + } + + fn on_handshake_complete(&self, entry: Arc) { + let id = *entry.id(); + let peer = *entry.peer(); + if let Some(prev) = self.peers.insert(peer, entry) { + // This shouldn't happen due to the panic in on_new_path_secrets, but just + // in case something went wrong with the secret map we double check here. + // FIXME: Make insertion fallible and fail handshakes instead? + assert_ne!(*prev.id(), id, "duplicate path secret id"); + + prev.retire(self.cleaner.epoch()); + } + } + + fn get_by_addr(&self, peer: &SocketAddr) -> Option>> { + self.peers.get_by_key(peer) + } + + fn get_by_id(&self, id: &Id) -> Option>> { + self.ids.get_by_key(id) + } + + fn handle_control_packet(&self, packet: &control::Packet) { + if let control::Packet::UnknownPathSecret(ref packet) = &packet { + return self.handle_unknown_secret_packet(packet); + } + + let Some(entry) = self.ids.get_by_key(packet.credential_id()) else { + // If we get a control packet we don't have a registered path secret for, ignore the + // packet. + return; + }; + + let key = entry.control_secret(); + + match packet { + control::Packet::StaleKey(packet) => { + let Some(packet) = packet.authenticate(&key) else { + return; + }; + entry.sender().update_for_stale_key(packet.min_key_id); + self.handled_control_packets.fetch_add(1, Ordering::Relaxed); + } + control::Packet::ReplayDetected(packet) => { + let Some(_packet) = packet.authenticate(&key) else { + return; + }; + self.handled_control_packets.fetch_add(1, Ordering::Relaxed); + + // If we see replay then we're going to assume that we should re-handshake in the + // background with this peer. Currently we can't handshake in the background (only + // in the foreground on next handshake_with). + // + // Note that there's no good way for us to prevent an attacker causing us to hit + // this code: they can always trivially replay a packet we send. At most we could + // de-duplicate *receiving* so there's one handshake per sent packet at most, but + // that's not particularly useful: we expect to send a lot of new packets that + // could be harvested. + // + // Handshaking will be rate limited per destination peer (and at least + // de-duplicated). + self.request_handshake(*entry.peer()); + } + control::Packet::UnknownPathSecret(_) => unreachable!(), + } + } + + fn handle_unexpected_packet(&self, packet: &Packet) { + match packet { + Packet::Stream(_) => { + // no action for now. FIXME: Add metrics. + } + Packet::Datagram(_) => { + // no action for now. FIXME: Add metrics. + } + Packet::Control(_) => { + // no action for now. FIXME: Add metrics. + } + Packet::StaleKey(packet) => self.handle_control_packet(&(*packet).into()), + Packet::ReplayDetected(packet) => self.handle_control_packet(&(*packet).into()), + Packet::UnknownPathSecret(packet) => self.handle_control_packet(&(*packet).into()), + } + } + + fn signer(&self) -> &stateless_reset::Signer { + &self.signer + } + + fn receiver(&self) -> &Arc { + &self.receiver_shared + } + + fn send_control_packet(&self, dst: &SocketAddr, buffer: &[u8]) { + match self.control_socket.send_to(buffer, dst) { + Ok(_) => { + // all done + } + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + // ignore would block -- we're not going to queue up control packet messages. + } + Err(e) => { + tracing::warn!("Failed to send control packet to {:?}: {:?}", dst, e); + } + } + } + + fn rehandshake_period(&self) -> Duration { + self.rehandshake_period + } +} + +#[derive(Default)] +pub(super) struct NoopIdHasher(Option); + +impl Hasher for NoopIdHasher { + fn finish(&self) -> u64 { + self.0.unwrap() + } + + fn write(&mut self, _bytes: &[u8]) { + unimplemented!() + } + + fn write_u64(&mut self, x: u64) { + debug_assert!(self.0.is_none()); + self.0 = Some(x); + } +} diff --git a/dc/s2n-quic-dc/src/path/secret/map/test.rs b/dc/s2n-quic-dc/src/path/secret/map/state/tests.rs similarity index 66% rename from dc/s2n-quic-dc/src/path/secret/map/test.rs rename to dc/s2n-quic-dc/src/path/secret/map/state/tests.rs index db9fac18e..7d43489aa 100644 --- a/dc/s2n-quic-dc/src/path/secret/map/test.rs +++ b/dc/s2n-quic-dc/src/path/secret/map/state/tests.rs @@ -1,70 +1,52 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -use super::{receiver, sender}; +use super::*; +use crate::path::secret::{schedule, sender}; +use s2n_quic_core::dc; use std::{ collections::HashSet, + fmt, net::{Ipv4Addr, SocketAddrV4}, }; -use super::*; - -const VERSION: dc::Version = dc::SUPPORTED_VERSIONS[0]; - -fn fake_entry(peer: u16) -> Arc { - let mut secret = [0; 32]; - aws_lc_rs::rand::fill(&mut secret).unwrap(); - Arc::new(Entry::new( - SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, peer)), - schedule::Secret::new( - schedule::Ciphersuite::AES_GCM_128_SHA256, - VERSION, - s2n_quic_core::endpoint::Type::Client, - &secret, - ), - sender::State::new([0; control::TAG_LEN]), - receiver::State::without_shared(), - dc::testing::TEST_APPLICATION_PARAMS, - dc::testing::TEST_REHANDSHAKE_PERIOD, - )) +fn fake_entry(port: u16) -> Arc { + Entry::fake((Ipv4Addr::LOCALHOST, port).into(), None) } #[test] fn cleans_after_delay() { let signer = stateless_reset::Signer::new(b"secret"); - let map = Map::new(signer, 50); + let map = State::new(signer, 50); // Stop background processing. We expect to manually invoke clean, and a background worker // might interfere with our state. - map.state.cleaner.stop(); + map.cleaner.stop(); let first = fake_entry(1); let second = fake_entry(1); let third = fake_entry(1); - map.on_new_path_secrets(first.clone()); - map.on_handshake_complete(first.clone()); - map.on_new_path_secrets(second.clone()); - map.on_handshake_complete(second.clone()); + map.test_insert(first.clone()); + map.test_insert(second.clone()); - assert!(map.state.ids.contains_key(first.secret.id())); - assert!(map.state.ids.contains_key(second.secret.id())); + assert!(map.ids.contains_key(first.id())); + assert!(map.ids.contains_key(second.id())); - map.state.cleaner.clean(&map.state, 1); - map.state.cleaner.clean(&map.state, 1); + map.cleaner.clean(&map, 1); + map.cleaner.clean(&map, 1); - map.on_new_path_secrets(third.clone()); - map.on_handshake_complete(third.clone()); + map.test_insert(third.clone()); - assert!(!map.state.ids.contains_key(first.secret.id())); - assert!(map.state.ids.contains_key(second.secret.id())); - assert!(map.state.ids.contains_key(third.secret.id())); + assert!(!map.ids.contains_key(first.id())); + assert!(map.ids.contains_key(second.id())); + assert!(map.ids.contains_key(third.id())); } #[test] fn thread_shutdown() { let signer = stateless_reset::Signer::new(b"secret"); - let map = Map::new(signer, 10); - let state = Arc::downgrade(&map.state); + let map = State::new(signer, 10); + let state = Arc::downgrade(&map); drop(map); let iterations = 10; @@ -89,10 +71,9 @@ struct Model { #[derive(bolero::TypeGenerator, Debug, Copy, Clone)] enum Operation { - NewPathSecret { ip: u8, path_secret_id: TestId }, + Insert { ip: u8, path_secret_id: TestId }, AdvanceTime, ReceiveUnknown { path_secret_id: TestId }, - HandshakeComplete { path_secret_id: TestId }, } #[derive(bolero::TypeGenerator, PartialEq, Eq, Hash, Copy, Clone)] @@ -113,7 +94,7 @@ impl TestId { export_secret[0] = self.0; schedule::Secret::new( schedule::Ciphersuite::AES_GCM_128_SHA256, - VERSION, + dc::SUPPORTED_VERSIONS[0], s2n_quic_core::endpoint::Type::Client, &export_secret, ) @@ -132,42 +113,33 @@ enum Invariant { } impl Model { - fn perform(&mut self, operation: Operation, state: &Map) { + fn perform(&mut self, operation: Operation, state: &State) { match operation { - Operation::NewPathSecret { ip, path_secret_id } => { + Operation::Insert { ip, path_secret_id } => { let ip = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from([0, 0, 0, ip]), 0)); let secret = path_secret_id.secret(); let id = *secret.id(); - let stateless_reset = state.state.signer.sign(&id); - state.on_new_path_secrets(Arc::new(Entry::new( + let stateless_reset = state.signer().sign(&id); + state.test_insert(Arc::new(Entry::new( ip, secret, sender::State::new(stateless_reset), - state.state.receiver_shared.clone().new_receiver(), + state.receiver().clone().new_receiver(), dc::testing::TEST_APPLICATION_PARAMS, dc::testing::TEST_REHANDSHAKE_PERIOD, ))); + self.invariants.insert(Invariant::ContainsIp(ip)); self.invariants.insert(Invariant::ContainsId(id)); } - Operation::HandshakeComplete { path_secret_id } => { - if let Some(entry) = state.state.ids.get_by_key(&path_secret_id.id()) { - if !state.state.peers.contains_key(&entry.peer) { - state.on_handshake_complete(entry.clone()); - } - self.invariants.insert(Invariant::ContainsIp(entry.peer)); - } - } Operation::AdvanceTime => { let mut invalidated = Vec::new(); self.invariants.retain(|invariant| { if let Invariant::ContainsId(id) = invariant { if state - .state - .ids - .get_by_key(id) - .map_or(true, |v| v.retired.retired()) + .get_by_id(id) + .map_or(true, |v| v.retired_at().is_some()) { invalidated.push(*id); return false; @@ -181,13 +153,13 @@ impl Model { } // Evict all stale records *now*. - state.state.cleaner.clean(&state.state, 0); + state.cleaner.clean(state, 0); } Operation::ReceiveUnknown { path_secret_id } => { let id = path_secret_id.id(); // This is signing with the "wrong" signer, but currently all of the signers used // in this test are keyed the same way so it doesn't matter. - let stateless_reset = state.state.signer.sign(&id); + let stateless_reset = state.signer.sign(&id); let packet = crate::packet::secret_control::unknown_path_secret::Packet::new_for_test( id, @@ -243,7 +215,7 @@ fn has_duplicate_pids(ops: &[Operation]) -> bool { let mut ids = HashSet::new(); for op in ops.iter() { match op { - Operation::NewPathSecret { + Operation::Insert { ip: _, path_secret_id, } => { @@ -255,10 +227,6 @@ fn has_duplicate_pids(ops: &[Operation]) -> bool { Operation::ReceiveUnknown { path_secret_id: _ } => { // no-op, we're fine receiving unknown pids. } - Operation::HandshakeComplete { .. } => { - // no-op, a handshake complete for the same pid as a - // new path secret is expected - } } } @@ -278,18 +246,18 @@ fn check_invariants() { let mut model = Model::default(); let signer = stateless_reset::Signer::new(b"secret"); - let mut map = Map::new(signer, 10_000); + let mut map = State::new(signer, 10_000); // Avoid background work interfering with testing. - map.state.cleaner.stop(); + map.cleaner.stop(); - Arc::get_mut(&mut map.state).unwrap().set_max_capacity(5); + Arc::get_mut(&mut map).unwrap().set_max_capacity(5); - model.check_invariants(&map.state); + model.check_invariants(&map); for op in input { model.perform(*op, &map); - model.check_invariants(&map.state); + model.check_invariants(&map); } }) } @@ -308,16 +276,16 @@ fn check_invariants_no_overflow() { let mut model = Model::default(); let signer = stateless_reset::Signer::new(b"secret"); - let map = Map::new(signer, 10_000); + let map = State::new(signer, 10_000); // Avoid background work interfering with testing. - map.state.cleaner.stop(); + map.cleaner.stop(); - model.check_invariants(&map.state); + model.check_invariants(&map); for op in input { model.perform(*op, &map); - model.check_invariants(&map.state); + model.check_invariants(&map); } }) } @@ -331,26 +299,11 @@ fn check_invariants_no_overflow() { #[ignore = "memory growth takes a long time to run"] fn no_memory_growth() { let signer = stateless_reset::Signer::new(b"secret"); - let map = Map::new(signer, 100_000); - map.state.cleaner.stop(); + let map = State::new(signer, 100_000); + map.cleaner.stop(); + for idx in 0..500_000 { // FIXME: this ends up 2**16 peers in the `peers` map - let entry = fake_entry(idx as u16); - map.on_new_path_secrets(entry.clone()); - map.on_handshake_complete(entry) - } -} - -#[test] -fn entry_size() { - let mut should_check = true; - - should_check &= cfg!(target_pointer_width = "64"); - should_check &= cfg!(target_os = "linux"); - should_check &= std::env::var("S2N_QUIC_RUN_VERSION_SPECIFIC_TESTS").is_ok(); - - // This gates to running only on specific GHA to reduce false positives. - if should_check { - assert_eq!(fake_entry(0).size(), 238); + map.test_insert(fake_entry(idx as u16)); } } diff --git a/dc/s2n-quic-dc/src/path/secret/map/status.rs b/dc/s2n-quic-dc/src/path/secret/map/status.rs new file mode 100644 index 000000000..76df63ccf --- /dev/null +++ b/dc/s2n-quic-dc/src/path/secret/map/status.rs @@ -0,0 +1,118 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Entry, Map}; +use crate::{credentials::Credentials, crypto, path::secret::receiver}; +use core::{ + fmt, + sync::atomic::{AtomicU64, Ordering}, +}; +use s2n_quic_core::varint::VarInt; +use std::sync::Arc; + +// Retired is 0 if not yet retired. Otherwise it stores the background cleaner epoch at which it +// retired; that epoch increments roughly once per minute. +#[derive(Default)] +pub struct IsRetired(AtomicU64); + +impl fmt::Debug for IsRetired { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("IsRetired") + .field(&self.is_retired()) + .finish() + } +} + +impl IsRetired { + pub fn retire(&self, at_epoch: u64) { + self.0.store(at_epoch, Ordering::Relaxed); + } + + pub fn retired_at(&self) -> Option { + Some(self.0.load(Ordering::Relaxed)).filter(|v| *v > 0) + } + + pub fn is_retired(&self) -> bool { + self.retired_at().is_some() + } +} + +pub struct Dedup { + cell: once_cell::sync::OnceCell, + init: core::cell::Cell, VarInt, Map)>>, +} + +/// SAFETY: `init` cell is synchronized by `OnceCell` +unsafe impl Sync for Dedup {} + +impl Dedup { + #[inline] + pub(super) fn new(entry: Arc, key_id: VarInt, map: Map) -> Self { + // TODO potentially record a timestamp of when this was created to try and detect long + // delays of processing the first packet. + Self { + cell: Default::default(), + init: core::cell::Cell::new(Some((entry, key_id, map))), + } + } + + #[inline] + pub(crate) fn disabled() -> Self { + Self { + cell: once_cell::sync::OnceCell::with_value(Ok(())), + init: core::cell::Cell::new(None), + } + } + + #[inline] + pub(crate) fn disable(&self) { + // TODO + } + + #[inline] + pub fn check(&self) -> crypto::open::Result { + *self.cell.get_or_init(|| { + match self.init.take() { + Some((entry, key_id, map)) => { + let creds = &Credentials { + id: *entry.id(), + key_id, + }; + match entry.receiver().post_authentication(creds) { + Ok(()) => Ok(()), + Err(receiver::Error::AlreadyExists) => { + map.store.send_control_error( + &entry, + creds, + receiver::Error::AlreadyExists, + ); + Err(crypto::open::Error::ReplayDefinitelyDetected) + } + Err(receiver::Error::Unknown) => { + map.store + .send_control_error(&entry, creds, receiver::Error::Unknown); + Err(crypto::open::Error::ReplayPotentiallyDetected { + gap: Some( + (*entry.receiver().minimum_unseen_key_id()) + // This should never be negative, but saturate anyway to avoid + // wildly large numbers. + .saturating_sub(*creds.key_id), + ), + }) + } + } + } + None => { + // Dedup has been poisoned! TODO log this + Err(crypto::open::Error::ReplayPotentiallyDetected { gap: None }) + } + } + }) + } +} + +impl fmt::Debug for Dedup { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Dedup").field("cell", &self.cell).finish() + } +} diff --git a/dc/s2n-quic-dc/src/path/secret/map/store.rs b/dc/s2n-quic-dc/src/path/secret/map/store.rs new file mode 100644 index 000000000..40f15aa45 --- /dev/null +++ b/dc/s2n-quic-dc/src/path/secret/map/store.rs @@ -0,0 +1,90 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::Entry; +use crate::{ + credentials::{Credentials, Id}, + fixed_map::ReadGuard, + packet::{secret_control as control, Packet, WireVersion}, + path::secret::{receiver, stateless_reset}, +}; +use core::time::Duration; +use s2n_codec::EncoderBuffer; +use std::{net::SocketAddr, sync::Arc}; + +pub trait Store: 'static + Send + Sync { + fn secrets_len(&self) -> usize; + + fn peers_len(&self) -> usize; + + fn secrets_capacity(&self) -> usize; + + fn drop_state(&self); + + fn contains(&self, peer: SocketAddr) -> bool; + + fn on_new_path_secrets(&self, entry: Arc); + + fn on_handshake_complete(&self, entry: Arc); + + #[cfg(any(test, feature = "testing"))] + fn test_insert(&self, entry: Arc) { + self.on_new_path_secrets(entry.clone()); + self.on_handshake_complete(entry); + } + + fn get_by_addr(&self, peer: &SocketAddr) -> Option>>; + + fn get_by_id(&self, id: &Id) -> Option>>; + + fn handle_unexpected_packet(&self, packet: &Packet); + + fn handle_control_packet(&self, packet: &control::Packet); + + fn signer(&self) -> &stateless_reset::Signer; + + fn receiver(&self) -> &Arc; + + fn send_control_packet(&self, dst: &SocketAddr, buffer: &[u8]); + + fn rehandshake_period(&self) -> Duration; + + #[inline] + fn send_control_error(&self, entry: &Entry, credentials: &Credentials, error: receiver::Error) { + let mut buffer = [0; control::MAX_PACKET_SIZE]; + let buffer = error.to_packet(entry, credentials, &mut buffer); + let dst = entry.peer(); + self.send_control_packet(dst, buffer); + } + + #[inline] + fn pre_authentication( + &self, + identity: &Credentials, + control_out: &mut Vec, + ) -> Option> { + let Some(state) = self.get_by_id(&identity.id) else { + let packet = control::UnknownPathSecret { + wire_version: WireVersion::ZERO, + credential_id: identity.id, + }; + control_out.resize(control::UnknownPathSecret::PACKET_SIZE, 0); + let stateless_reset = self.signer().sign(&identity.id); + let encoder = EncoderBuffer::new(control_out); + packet.encode(encoder, &stateless_reset); + return None; + }; + + match state.receiver().pre_authentication(identity) { + Ok(()) => {} + Err(e) => { + self.send_control_error(&state, identity, e); + control_out.resize(control::UnknownPathSecret::PACKET_SIZE, 0); + + return None; + } + } + + Some(state.clone()) + } +}