Skip to content

Commit

Permalink
feat(s2n-quic-core): add USDT probes
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft committed Aug 10, 2023
1 parent 9110f50 commit db04f5a
Show file tree
Hide file tree
Showing 19 changed files with 954 additions and 31 deletions.
2 changes: 2 additions & 0 deletions quic/s2n-quic-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ checked-counters = []
event-tracing = ["tracing"]
# This feature enables support for third party congestion controller implementations
unstable-congestion-controller = []
usdt = ["dep:probe"]

[dependencies]
atomic-waker = { version = "1", optional = true }
Expand All @@ -34,6 +35,7 @@ insta = { version = ">=1.12", features = ["json"], optional = true }
num-rational = { version = "0.4", default-features = false }
num-traits = { version = "0.2", default-features = false, features = ["libm"] }
pin-project-lite = { version = "0.2" }
probe = { version = "0.5", optional = true }
s2n-codec = { version = "=0.6.1", path = "../../common/s2n-codec", default-features = false }
subtle = { version = "2", default-features = false }
tracing = { version = "0.1", default-features = false, optional = true }
Expand Down
20 changes: 20 additions & 0 deletions quic/s2n-quic-core/src/buffer/receive_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,24 @@ use bytes::BytesMut;
mod request;
mod slot;

mod probes {
crate::extern_probe!(
extern "probe" {
/// Emitted when a buffer is allocated for a particular offset
#[link_name = s2n_quic_core__buffer__receive_buffer__alloc]
pub fn alloc(offset: u64, capacity: usize);

/// Emitted when a chunk is read from the beginning of the buffer
#[link_name = s2n_quic_core__buffer__receive_buffer__pop]
pub fn pop(offset: u64, len: usize);

/// Emitted when a chunk of data is written at an offset
#[link_name = s2n_quic_core__buffer__receive_buffer__write]
pub fn write(offset: u64, len: usize);
}
);
}

#[cfg(test)]
mod tests;

Expand Down Expand Up @@ -349,6 +367,8 @@ impl ReceiveBuffer {
self.slots.pop_front();
}

probes::pop(self.start_offset, out.len());

self.start_offset += out.len() as u64;

self.check_consistency();
Expand Down
2 changes: 2 additions & 0 deletions quic/s2n-quic-core/src/buffer/receive_buffer/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ impl<'a> Request<'a> {

#[inline]
pub fn write(self, buffer: &mut BytesMut) {
super::probes::write(self.offset, self.data.len());

let chunk = buffer.chunk_mut();
unsafe {
let len = self.data.len();
Expand Down
2 changes: 2 additions & 0 deletions quic/s2n-quic-core/src/buffer/receive_buffer/slot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ pub struct Outcome<'a> {
}

impl Slot {
#[inline]
pub fn new(start: u64, end: u64, data: BytesMut) -> Self {
super::probes::alloc(start, data.capacity());
Self { start, end, data }
}

Expand Down
97 changes: 97 additions & 0 deletions quic/s2n-quic-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,103 @@ macro_rules! assume {
};
}

#[macro_export]
macro_rules! extern_probe {
(extern "probe" {
$(
$(
#[doc = $doc:tt]
)*
#[link_name = $link_name:ident]
$vis:vis fn $fun:ident($($arg:ident: $arg_t:ty),* $(,)?);
)*
}) => {
$(
$(
#[doc = $doc]
)*
#[inline(always)]
$vis fn $fun($($arg: $arg_t),*) {
#[cfg(s2n_quic_probe_print)]
{
struct Args {
$(
$arg: $arg_t,
)*
}

impl ::core::fmt::Display for Args {
#[inline]
fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result {
let mut is_first = true;
$(
if !core::mem::take(&mut is_first) {
write!(f, ", ")?;
}
write!(f, "{}: {:?}", stringify!($arg), self.$arg)?;
)*
Ok(())
}
}

let args = Args { $($arg),* };

::std::eprintln!("{}: {}", stringify!($link_name), args);
}

#[cfg(test)]
{
let expected = concat!(module_path!(), "::", stringify!($fun))
.split("::")
.filter(|v| *v != "probes")
.collect::<Vec<_>>().join("__");

let actual = stringify!($link_name);

assert_eq!(&expected, actual);
}

$crate::probe!(
s2n_quic,
$link_name,
$(
$arg
),*
);
}
)*
}
}

#[cfg(feature = "usdt")]
#[doc(hidden)]
pub use probe::probe as __probe;

#[cfg(feature = "usdt")]
#[macro_export]
macro_rules! probe {
($provider:ident, $name:ident $(, $arg:expr)* $(,)?) => {{
// define a function with inline(never) to consolidate probes
let probe = {
#[inline(never)]
|| {
$crate::__probe!($provider, $name, $($arg),*);
}
};
probe();
}}
}

#[cfg(not(feature = "usdt"))]
#[macro_export]
macro_rules! probe {
($provider:ident, $name:ident $(, $arg:expr)* $(,)?) => {{
$(
let _ = $arg;
)*
}}
}

/// Implements a future that wraps `T::poll_ready` and yields after ready
macro_rules! impl_ready_future {
($name:ident, $fut:ident, $output:ty) => {
Expand Down
35 changes: 35 additions & 0 deletions quic/s2n-quic-core/src/sync/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,23 @@ impl<T: Copy> Builder<T> {
}
}

#[derive(Clone, Copy)]
pub struct AbsoluteIndex {
index: Wrapping<u32>,
mask: u32,
}

impl AbsoluteIndex {
/// Returns the absolute index for the given relative index
#[inline]
pub fn from_relative(&self, relative_index: u32) -> u32 {
// Wrap the cursor around the size of the ring
//
// Masking with a `2^N - 1` value is the same as a mod operation, just more efficient
(self.index + Wrapping(relative_index)).0 & self.mask
}
}

/// A structure for tracking a ring shared between a producer and consumer
///
/// See [xsk.h](https://github.com/xdp-project/xdp-tools/blob/a76e7a2b156b8cfe38992206abe9df1df0a29e38/headers/xdp/xsk.h#L34-L42).
Expand Down Expand Up @@ -188,6 +205,15 @@ impl<T: Copy> Cursor<T> {
self.cached_producer.0 & self.mask
}

/// Returns the absolute producer index for the relative index
#[inline]
pub fn producer_abs_index(&self) -> AbsoluteIndex {
AbsoluteIndex {
index: self.cached_producer,
mask: self.mask,
}
}

/// Returns the cached number of available entries for the consumer
///
/// See [xsk.h](https://github.com/xdp-project/xdp-tools/blob/a76e7a2b156b8cfe38992206abe9df1df0a29e38/headers/xdp/xsk.h#L94).
Expand Down Expand Up @@ -258,6 +284,15 @@ impl<T: Copy> Cursor<T> {
self.cached_consumer.0 & self.mask
}

/// Returns the absolute consumer index for the relative index
#[inline]
pub fn consumer_abs_index(&self) -> AbsoluteIndex {
AbsoluteIndex {
index: self.cached_consumer,
mask: self.mask,
}
}

/// Returns the cached number of available entries for the consumer
///
/// See [xsk.h](https://github.com/xdp-project/xdp-tools/blob/a76e7a2b156b8cfe38992206abe9df1df0a29e38/headers/xdp/xsk.h#L114).
Expand Down
62 changes: 58 additions & 4 deletions quic/s2n-quic-platform/src/socket/io/rx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,38 @@ use s2n_quic_core::{
path::{LocalAddress, MaxMtu},
};

mod probes {
s2n_quic_core::extern_probe!(
extern "probe" {
/// Emitted when a channel tries to acquire messages
#[link_name = s2n_quic_platform__socket__io__rx__acquire]
pub fn acquire(channel: *const (), count: u32);

/// Emitted when a message is read from a channel
#[link_name = s2n_quic_platform__socket__io__rx__read]
pub fn read(
channel: *const (),
message: u32,
segment_index: usize,
segment_size: usize,
);

/// Emitted when a message is finished being read from a channel
#[link_name = s2n_quic_platform__socket__io__rx__finish_read]
pub fn finish_read(
channel: *const (),
message: u32,
segment_count: usize,
segment_size: usize,
);

/// Emitted when all acquired messages are read from the channel
#[link_name = s2n_quic_platform__socket__io__rx__release]
pub fn release(channel: *const (), count: u32);
}
);
}

/// Structure for receiving messages from consumer channels
pub struct Rx<T: Message> {
channels: Vec<Consumer<T>>,
Expand Down Expand Up @@ -39,13 +71,15 @@ impl<T: Message> rx::Rx for Rx<T> {
let mut is_all_closed = true;

// try to acquire any messages we can from the set of channels
for channel in &mut self.channels {
for channel in self.channels.iter_mut() {
match channel.poll_acquire(u32::MAX, cx) {
Poll::Ready(_) => {
Poll::Ready(count) => {
probes::acquire(channel.as_ptr(), count);
is_all_closed = false;
is_any_ready = true;
}
Poll::Pending => {
probes::acquire(channel.as_ptr(), 0);
is_all_closed &= !channel.is_open();
}
}
Expand Down Expand Up @@ -114,17 +148,36 @@ impl<'a, T: Message> rx::Queue for RxQueue<'a, T> {
for channel in self.channels.iter_mut() {
// one last effort to acquire items if some were received since we last polled
let len = channel.acquire(u32::MAX);
let channel_ptr = channel.as_ptr();

probes::acquire(channel_ptr, len);

let absolute_index = channel.absolute_index();

let data = channel.data();
debug_assert_eq!(data.len(), len as usize);

for message in data {
for (message_idx, message) in data.iter_mut().enumerate() {
let message_idx = absolute_index.from_relative(message_idx as _);

// call the `on_packet` function for each message received
//
// NOTE: it's important that we process all of the messages in the queue as the
// channel is completely drained here.
if let Some(message) = message.rx_read(self.local_address) {
message.for_each(&mut on_packet);
let mut segment_index = 0;
let mut segment_size = 0;
message.for_each(|header, payload| {
probes::read(channel_ptr, message_idx, segment_index, payload.len());

segment_size = segment_size.max(payload.len());

on_packet(header, payload);

segment_index += 1;
});

probes::finish_read(channel_ptr, message_idx, segment_index, segment_size as _);
}

unsafe {
Expand All @@ -134,6 +187,7 @@ impl<'a, T: Message> rx::Queue for RxQueue<'a, T> {
}

// release the messages back to the producer
probes::release(channel_ptr, len);
channel.release(len);
}
}
Expand Down
Loading

0 comments on commit db04f5a

Please sign in to comment.