Skip to content

Commit

Permalink
feat(s2n-quic-platform): emit socket events
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft committed Oct 25, 2024
1 parent ed9db08 commit 25f2f9b
Show file tree
Hide file tree
Showing 19 changed files with 682 additions and 260 deletions.
148 changes: 142 additions & 6 deletions quic/s2n-quic-core/src/event/generated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,16 @@ pub mod api {
pub struct PlatformTx {
#[doc = " The number of packets sent"]
pub count: usize,
#[doc = " The number of syscalls performed"]
pub syscalls: usize,
#[doc = " The number of syscalls that got blocked"]
pub blocked_syscalls: usize,
#[doc = " The total number of errors encountered since the last event"]
pub total_errors: usize,
#[doc = " The number of specific error codes dropped"]
#[doc = ""]
#[doc = " This can happen when a burst of errors exceeds the capacity of the recorder"]
pub dropped_errors: usize,
}
impl Event for PlatformTx {
const NAME: &'static str = "platform:tx";
Expand All @@ -1168,6 +1178,16 @@ pub mod api {
pub struct PlatformRx {
#[doc = " The number of packets received"]
pub count: usize,
#[doc = " The number of syscalls performed"]
pub syscalls: usize,
#[doc = " The number of syscalls that got blocked"]
pub blocked_syscalls: usize,
#[doc = " The total number of errors encountered since the last event"]
pub total_errors: usize,
#[doc = " The number of specific error codes dropped"]
#[doc = ""]
#[doc = " This can happen when a burst of errors exceeds the capacity of the recorder"]
pub dropped_errors: usize,
}
impl Event for PlatformRx {
const NAME: &'static str = "platform:rx";
Expand Down Expand Up @@ -1215,6 +1235,15 @@ pub mod api {
}
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct PlatformStarted<'a> {
#[doc = " The local address of the socket"]
pub local_address: SocketAddress<'a>,
}
impl<'a> Event for PlatformStarted<'a> {
const NAME: &'static str = "platform:started";
}
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum PlatformFeatureConfiguration {
#[non_exhaustive]
#[doc = " Emitted when segment offload was configured"]
Expand Down Expand Up @@ -2397,8 +2426,14 @@ pub mod tracing {
api::EndpointType::Client {} => self.client.id(),
api::EndpointType::Server {} => self.server.id(),
};
let api::PlatformTx { count } = event;
tracing :: event ! (target : "platform_tx" , parent : parent , tracing :: Level :: DEBUG , count = tracing :: field :: debug (count));
let api::PlatformTx {
count,
syscalls,
blocked_syscalls,
total_errors,
dropped_errors,
} = event;
tracing :: event ! (target : "platform_tx" , parent : parent , tracing :: Level :: DEBUG , count = tracing :: field :: debug (count) , syscalls = tracing :: field :: debug (syscalls) , blocked_syscalls = tracing :: field :: debug (blocked_syscalls) , total_errors = tracing :: field :: debug (total_errors) , dropped_errors = tracing :: field :: debug (dropped_errors));
}
#[inline]
fn on_platform_tx_error(&mut self, meta: &api::EndpointMeta, event: &api::PlatformTxError) {
Expand All @@ -2415,8 +2450,14 @@ pub mod tracing {
api::EndpointType::Client {} => self.client.id(),
api::EndpointType::Server {} => self.server.id(),
};
let api::PlatformRx { count } = event;
tracing :: event ! (target : "platform_rx" , parent : parent , tracing :: Level :: DEBUG , count = tracing :: field :: debug (count));
let api::PlatformRx {
count,
syscalls,
blocked_syscalls,
total_errors,
dropped_errors,
} = event;
tracing :: event ! (target : "platform_rx" , parent : parent , tracing :: Level :: DEBUG , count = tracing :: field :: debug (count) , syscalls = tracing :: field :: debug (syscalls) , blocked_syscalls = tracing :: field :: debug (blocked_syscalls) , total_errors = tracing :: field :: debug (total_errors) , dropped_errors = tracing :: field :: debug (dropped_errors));
}
#[inline]
fn on_platform_rx_error(&mut self, meta: &api::EndpointMeta, event: &api::PlatformRxError) {
Expand Down Expand Up @@ -2474,6 +2515,15 @@ pub mod tracing {
} = event;
tracing :: event ! (target : "platform_event_loop_sleep" , parent : parent , tracing :: Level :: DEBUG , timeout = tracing :: field :: debug (timeout) , processing_duration = tracing :: field :: debug (processing_duration));
}
#[inline]
fn on_platform_started(&mut self, meta: &api::EndpointMeta, event: &api::PlatformStarted) {
let parent = match meta.endpoint_type {
api::EndpointType::Client {} => self.client.id(),
api::EndpointType::Server {} => self.server.id(),
};
let api::PlatformStarted { local_address } = event;
tracing :: event ! (target : "platform_started" , parent : parent , tracing :: Level :: DEBUG , local_address = tracing :: field :: debug (local_address));
}
}
}
pub mod builder {
Expand Down Expand Up @@ -4512,13 +4562,33 @@ pub mod builder {
pub struct PlatformTx {
#[doc = " The number of packets sent"]
pub count: usize,
#[doc = " The number of syscalls performed"]
pub syscalls: usize,
#[doc = " The number of syscalls that got blocked"]
pub blocked_syscalls: usize,
#[doc = " The total number of errors encountered since the last event"]
pub total_errors: usize,
#[doc = " The number of specific error codes dropped"]
#[doc = ""]
#[doc = " This can happen when a burst of errors exceeds the capacity of the recorder"]
pub dropped_errors: usize,
}
impl IntoEvent<api::PlatformTx> for PlatformTx {
#[inline]
fn into_event(self) -> api::PlatformTx {
let PlatformTx { count } = self;
let PlatformTx {
count,
syscalls,
blocked_syscalls,
total_errors,
dropped_errors,
} = self;
api::PlatformTx {
count: count.into_event(),
syscalls: syscalls.into_event(),
blocked_syscalls: blocked_syscalls.into_event(),
total_errors: total_errors.into_event(),
dropped_errors: dropped_errors.into_event(),
}
}
}
Expand All @@ -4542,13 +4612,33 @@ pub mod builder {
pub struct PlatformRx {
#[doc = " The number of packets received"]
pub count: usize,
#[doc = " The number of syscalls performed"]
pub syscalls: usize,
#[doc = " The number of syscalls that got blocked"]
pub blocked_syscalls: usize,
#[doc = " The total number of errors encountered since the last event"]
pub total_errors: usize,
#[doc = " The number of specific error codes dropped"]
#[doc = ""]
#[doc = " This can happen when a burst of errors exceeds the capacity of the recorder"]
pub dropped_errors: usize,
}
impl IntoEvent<api::PlatformRx> for PlatformRx {
#[inline]
fn into_event(self) -> api::PlatformRx {
let PlatformRx { count } = self;
let PlatformRx {
count,
syscalls,
blocked_syscalls,
total_errors,
dropped_errors,
} = self;
api::PlatformRx {
count: count.into_event(),
syscalls: syscalls.into_event(),
blocked_syscalls: blocked_syscalls.into_event(),
total_errors: total_errors.into_event(),
dropped_errors: dropped_errors.into_event(),
}
}
}
Expand Down Expand Up @@ -4626,6 +4716,20 @@ pub mod builder {
}
}
#[derive(Clone, Debug)]
pub struct PlatformStarted<'a> {
#[doc = " The local address of the socket"]
pub local_address: SocketAddress<'a>,
}
impl<'a> IntoEvent<api::PlatformStarted<'a>> for PlatformStarted<'a> {
#[inline]
fn into_event(self) -> api::PlatformStarted<'a> {
let PlatformStarted { local_address } = self;
api::PlatformStarted {
local_address: local_address.into_event(),
}
}
}
#[derive(Clone, Debug)]
pub enum PlatformFeatureConfiguration {
#[doc = " Emitted when segment offload was configured"]
Gso {
Expand Down Expand Up @@ -5478,6 +5582,12 @@ mod traits {
let _ = meta;
let _ = event;
}
#[doc = "Called when the `PlatformStarted` event is triggered"]
#[inline]
fn on_platform_started(&mut self, meta: &EndpointMeta, event: &PlatformStarted) {
let _ = meta;
let _ = event;
}
#[doc = r" Called for each event that relates to the endpoint and all connections"]
#[inline]
fn on_event<M: Meta, E: Event>(&mut self, meta: &M, event: &E) {
Expand Down Expand Up @@ -6106,6 +6216,11 @@ mod traits {
(self.1).on_platform_event_loop_sleep(meta, event);
}
#[inline]
fn on_platform_started(&mut self, meta: &EndpointMeta, event: &PlatformStarted) {
(self.0).on_platform_started(meta, event);
(self.1).on_platform_started(meta, event);
}
#[inline]
fn on_event<M: Meta, E: Event>(&mut self, meta: &M, event: &E) {
self.0.on_event(meta, event);
self.1.on_event(meta, event);
Expand Down Expand Up @@ -6173,6 +6288,8 @@ mod traits {
fn on_platform_event_loop_wakeup(&mut self, event: builder::PlatformEventLoopWakeup);
#[doc = "Publishes a `PlatformEventLoopSleep` event to the publisher's subscriber"]
fn on_platform_event_loop_sleep(&mut self, event: builder::PlatformEventLoopSleep);
#[doc = "Publishes a `PlatformStarted` event to the publisher's subscriber"]
fn on_platform_started(&mut self, event: builder::PlatformStarted);
#[doc = r" Returns the QUIC version, if any"]
fn quic_version(&self) -> Option<u32>;
}
Expand Down Expand Up @@ -6300,6 +6417,12 @@ mod traits {
self.subscriber.on_event(&self.meta, &event);
}
#[inline]
fn on_platform_started(&mut self, event: builder::PlatformStarted) {
let event = event.into_event();
self.subscriber.on_platform_started(&self.meta, &event);
self.subscriber.on_event(&self.meta, &event);
}
#[inline]
fn quic_version(&self) -> Option<u32> {
self.quic_version
}
Expand Down Expand Up @@ -7595,6 +7718,7 @@ pub mod testing {
pub platform_feature_configured: u32,
pub platform_event_loop_wakeup: u32,
pub platform_event_loop_sleep: u32,
pub platform_started: u32,
}
impl Drop for Subscriber {
fn drop(&mut self) {
Expand Down Expand Up @@ -7676,6 +7800,7 @@ pub mod testing {
platform_feature_configured: 0,
platform_event_loop_wakeup: 0,
platform_event_loop_sleep: 0,
platform_started: 0,
}
}
}
Expand Down Expand Up @@ -8257,6 +8382,10 @@ pub mod testing {
self.platform_event_loop_sleep += 1;
self.output.push(format!("{meta:?} {event:?}"));
}
fn on_platform_started(&mut self, meta: &api::EndpointMeta, event: &api::PlatformStarted) {
self.platform_started += 1;
self.output.push(format!("{meta:?} {event:?}"));
}
}
#[derive(Clone, Debug)]
pub struct Publisher {
Expand Down Expand Up @@ -8319,6 +8448,7 @@ pub mod testing {
pub platform_feature_configured: u32,
pub platform_event_loop_wakeup: u32,
pub platform_event_loop_sleep: u32,
pub platform_started: u32,
}
impl Publisher {
#[doc = r" Creates a publisher with snapshot assertions enabled"]
Expand Down Expand Up @@ -8390,6 +8520,7 @@ pub mod testing {
platform_feature_configured: 0,
platform_event_loop_wakeup: 0,
platform_event_loop_sleep: 0,
platform_started: 0,
}
}
}
Expand Down Expand Up @@ -8467,6 +8598,11 @@ pub mod testing {
let event = event.into_event();
self.output.push(format!("{event:?}"));
}
fn on_platform_started(&mut self, event: builder::PlatformStarted) {
self.platform_started += 1;
let event = event.into_event();
self.output.push(format!("{event:?}"));
}
fn quic_version(&self) -> Option<u32> {
Some(1)
}
Expand Down
32 changes: 24 additions & 8 deletions quic/s2n-quic-core/src/io/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

use crate::{
endpoint::Endpoint,
event::{self, EndpointPublisher},
event::{self, EndpointPublisher, IntoEvent as _},
inet::SocketAddress,
io::{rx::Rx, tx::Tx},
task::cooldown::Cooldown,
time::clock::{ClockWithTimer, Timer},
Expand All @@ -13,29 +14,36 @@ use core::pin::Pin;
pub mod select;
use select::Select;

pub struct EventLoop<E, C, R, T> {
pub trait Stats {
fn publish<P: event::EndpointPublisher>(&mut self, publisher: &mut P);
}

pub struct EventLoop<E, C, R, T, S> {
pub endpoint: E,
pub clock: C,
pub rx: R,
pub tx: T,
pub cooldown: Cooldown,
pub stats: S,
}

impl<E, C, R, T> EventLoop<E, C, R, T>
impl<E, C, R, T, S> EventLoop<E, C, R, T, S>
where
E: Endpoint,
C: ClockWithTimer,
R: Rx<PathHandle = E::PathHandle>,
T: Tx<PathHandle = E::PathHandle>,
S: Stats,
{
/// Starts running the endpoint event loop in an async task
pub async fn start(self) {
pub async fn start(self, local_addr: SocketAddress) {
let Self {
mut endpoint,
clock,
mut rx,
mut tx,
mut cooldown,
mut stats,
} = self;

/// Creates a event publisher with the endpoint's subscriber
Expand All @@ -54,6 +62,10 @@ where
}};
}

publisher!(clock.get_time()).on_platform_started(event::builder::PlatformStarted {
local_address: local_addr.into_event(),
});

let mut timer = clock.timer();

loop {
Expand Down Expand Up @@ -97,14 +109,18 @@ where

// notify the application that we woke up and why
let wakeup_timestamp = clock.get_time();
publisher!(wakeup_timestamp).on_platform_event_loop_wakeup(
event::builder::PlatformEventLoopWakeup {
{
let mut publisher = publisher!(wakeup_timestamp);

publisher.on_platform_event_loop_wakeup(event::builder::PlatformEventLoopWakeup {
timeout_expired,
rx_ready: rx_result.is_some(),
tx_ready: tx_result.is_some(),
application_wakeup,
},
);
});

stats.publish(&mut publisher);
}

match rx_result {
Some(Ok(())) => {
Expand Down
Loading

0 comments on commit 25f2f9b

Please sign in to comment.