Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(s2n-quic-transport): decrement handshake count for non-accepted finalized connections #1911

Merged
merged 1 commit into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 29 additions & 20 deletions quic/s2n-quic-transport/src/connection/connection_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,13 @@ impl<C: connection::Trait, L: connection::Lock<C>> InterestLists<C, L> {
}
}

if node.inner.read(|conn| !conn.is_accepted())? {
// Decrement the inflight handshakes because the connection has
// been finalized before it was handed back to the application
// and thus this count was not decremented previously
self.handshake_connections -= 1;
}

insert_interest!(done_connections, push_back);
} else {
unreachable!("Done connections should never report not done later");
Expand Down Expand Up @@ -867,35 +874,24 @@ impl<C: connection::Trait, L: connection::Lock<C>> ConnectionContainer<C, L> {
self.interest_lists.handshake_connections = self.count_handshaking_connections();
}

self.ensure_counter_consistency();
self.finalize_done_connections();
self.ensure_counter_consistency();

Some((result, interests))
}

/// Removes all Connections in the `done` state from the `ConnectionContainer`.
pub fn finalize_done_connections(&mut self) {
fn finalize_done_connections(&mut self) {
debug_assert_eq!(
self.interest_lists.handshake_connections + self.count_done_handshaking_connections(),
self.count_handshaking_connections()
);

for connection in self.interest_lists.done_connections.take() {
self.remove_node(&connection);

// If the connection is still handshaking then it must have timed out.
let result = connection.inner.read(|conn| conn.is_handshaking());
match result {
Ok(true) => {
self.interest_lists.handshake_connections -= 1;
self.ensure_counter_consistency();
}
Ok(false) => {
// nothing to do
}
Err(_) => {
// The connection panicked so we need to recompute all of the handshaking
// connections
self.interest_lists.handshake_connections =
self.count_handshaking_connections();
}
}
}

debug_assert_eq!(0, self.count_done_handshaking_connections());
}

fn count_handshaking_connections(&self) -> usize {
Expand All @@ -910,6 +906,19 @@ impl<C: connection::Trait, L: connection::Lock<C>> ConnectionContainer<C, L> {
.count()
}

fn count_done_handshaking_connections(&self) -> usize {
self.interest_lists
.done_connections
.iter()
.filter(|conn| {
conn.inner
.read(|conn| conn.is_handshaking())
.ok()
.unwrap_or(false)
})
.count()
}

fn ensure_counter_consistency(&self) {
if cfg!(debug_assertions) {
let expected = self.count_handshaking_connections();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
use super::*;
use crate::{
connection::{
self, connection_interests::ConnectionInterests,
self, connection_impl::AcceptState, connection_interests::ConnectionInterests,
internal_connection_id::InternalConnectionId, InternalConnectionIdGenerator,
ProcessingError,
ProcessingError, Trait,
},
endpoint, path, stream,
};
Expand Down Expand Up @@ -36,8 +36,7 @@ use s2n_quic_core::{
use std::sync::Mutex;

struct TestConnection {
is_handshaking: bool,
has_been_accepted: bool,
accept_state: AcceptState,
is_closed: bool,
interests: ConnectionInterests,
close_timer: Timer,
Expand All @@ -46,8 +45,7 @@ struct TestConnection {
impl Default for TestConnection {
fn default() -> Self {
Self {
is_handshaking: true,
has_been_accepted: false,
accept_state: AcceptState::Handshaking,
is_closed: false,
interests: ConnectionInterests {
transmission: true,
Expand All @@ -70,7 +68,11 @@ impl connection::Trait for TestConnection {
}

fn is_handshaking(&self) -> bool {
self.is_handshaking
self.accept_state == AcceptState::Handshaking
}

fn is_accepted(&self) -> bool {
self.accept_state == AcceptState::Active
}

fn close(
Expand All @@ -88,8 +90,8 @@ impl connection::Trait for TestConnection {
}

fn mark_as_accepted(&mut self) {
assert!(!self.has_been_accepted);
self.has_been_accepted = true;
assert!(!self.is_accepted());
self.accept_state = AcceptState::Active;
self.interests.accept = false;
}

Expand Down Expand Up @@ -391,6 +393,10 @@ enum Operation {
timeout: Option<u16>,
},
CloseApp,
HandshakeCompleted {
index: usize,
closed: bool,
},
Receive,
Timeout(u16),
Transmit(u16),
Expand Down Expand Up @@ -421,12 +427,6 @@ fn container_test() {
let connection = TestConnection::default();
container.insert_connection(connection, id);
connections.push(id);

let mut was_called = false;
container.with_connection(id, |_conn| {
was_called = true;
});
assert!(was_called);
}
Operation::UpdateInterests {
index,
Expand Down Expand Up @@ -459,12 +459,9 @@ fn container_test() {
}

i.closing = *closing;
if !conn.has_been_accepted {
if conn.accept_state == AcceptState::HandshakeCompleted {
i.accept = *accept;
}
if *accept {
conn.is_handshaking = false;
}
i.transmission = *transmission;
i.new_connection_id = *new_connection_id;
i.timeout = timeout.map(|ms| now + Duration::from_millis(ms as _));
Expand All @@ -484,6 +481,34 @@ fn container_test() {
Operation::CloseApp => {
handle = None;
}
Operation::HandshakeCompleted { index, closed } => {
if connections.is_empty() {
continue;
}
let index = index % connections.len();
let id = connections[index];

let mut was_called = false;
container.with_connection(id, |conn| {
if conn.is_handshaking() {
conn.accept_state = AcceptState::HandshakeCompleted;
if *closed {
// The connection was closed immediately following the
// handshake being completed and before it could be accepted
conn.is_closed = true;
conn.interests = ConnectionInterests {
finalization: true,
..Default::default()
};
connections.remove(index);
} else {
conn.interests.accept = true;
}
}
was_called = true;
});
assert!(was_called);
}
Operation::Receive => {
if let Some(handle) = handle.as_mut() {
while let Poll::Ready(Some(_accepted)) = handle
Expand Down Expand Up @@ -566,7 +591,7 @@ fn container_test() {
let mut cursor = container.connection_map.front();

while let Some(conn) = cursor.get() {
assert_eq!(conn.internal_connection_id, connections.next().unwrap());
assert_eq!(Some(conn.internal_connection_id), connections.next());
cursor.move_next();
}

Expand Down
6 changes: 5 additions & 1 deletion quic/s2n-quic-transport/src/connection/connection_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ use s2n_quic_core::{
/// Possible states for handing over a connection from the endpoint to the
/// application.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum AcceptState {
pub(crate) enum AcceptState {
/// The connection is handshaking on the server side and not yet visible
/// to the application.
Handshaking,
Expand Down Expand Up @@ -555,6 +555,10 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
self.accept_state == AcceptState::Handshaking
}

fn is_accepted(&self) -> bool {
self.accept_state == AcceptState::Active
}

/// Creates a new `Connection` instance with the given configuration
fn new(parameters: ConnectionParameters<Self::Config>) -> Result<Self, connection::Error> {
let mut event_context = EventContext {
Expand Down
4 changes: 4 additions & 0 deletions quic/s2n-quic-transport/src/connection/connection_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ pub trait ConnectionTrait: 'static + Send + Sized {
/// Returns whether the connection is in the handshake state
fn is_handshaking(&self) -> bool;

/// Returns true if the handshake has completed and the connection
/// has been handed off to the application
fn is_accepted(&self) -> bool;

/// Initiates closing the connection as described in
/// https://www.rfc-editor.org/rfc/rfc9000#section-10
fn close(
Expand Down
Loading