Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s2n-quic-core): add USDT probes #1866

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions quic/s2n-quic-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ checked-counters = []
event-tracing = ["tracing"]
# This feature enables support for third party congestion controller implementations
unstable-congestion-controller = []
usdt = ["dep:probe"]

[dependencies]
atomic-waker = { version = "1", optional = true }
Expand All @@ -34,6 +35,7 @@ insta = { version = ">=1.12", features = ["json"], optional = true }
num-rational = { version = "0.4", default-features = false }
num-traits = { version = "0.2", default-features = false, features = ["libm"] }
pin-project-lite = { version = "0.2" }
probe = { version = "0.5", optional = true }
s2n-codec = { version = "=0.6.1", path = "../../common/s2n-codec", default-features = false }
subtle = { version = "2", default-features = false }
tracing = { version = "0.1", default-features = false, optional = true }
Expand Down
20 changes: 20 additions & 0 deletions quic/s2n-quic-core/src/buffer/receive_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,24 @@ use bytes::BytesMut;
mod request;
mod slot;

mod probes {
crate::extern_probe!(
extern "probe" {
/// Emitted when a buffer is allocated for a particular offset
#[link_name = s2n_quic_core__buffer__receive_buffer__alloc]
pub fn alloc(offset: u64, capacity: usize);

/// Emitted when a chunk is read from the beginning of the buffer
#[link_name = s2n_quic_core__buffer__receive_buffer__pop]
pub fn pop(offset: u64, len: usize);

/// Emitted when a chunk of data is written at an offset
#[link_name = s2n_quic_core__buffer__receive_buffer__write]
pub fn write(offset: u64, len: usize);
}
);
}

#[cfg(test)]
mod tests;

Expand Down Expand Up @@ -349,6 +367,8 @@ impl ReceiveBuffer {
self.slots.pop_front();
}

probes::pop(self.start_offset, out.len());

self.start_offset += out.len() as u64;

self.check_consistency();
Expand Down
2 changes: 2 additions & 0 deletions quic/s2n-quic-core/src/buffer/receive_buffer/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ impl<'a> Request<'a> {

#[inline]
pub fn write(self, buffer: &mut BytesMut) {
super::probes::write(self.offset, self.data.len());

let chunk = buffer.chunk_mut();
unsafe {
let len = self.data.len();
Expand Down
2 changes: 2 additions & 0 deletions quic/s2n-quic-core/src/buffer/receive_buffer/slot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ pub struct Outcome<'a> {
}

impl Slot {
#[inline]
pub fn new(start: u64, end: u64, data: BytesMut) -> Self {
super::probes::alloc(start, data.capacity());
Self { start, end, data }
}

Expand Down
97 changes: 97 additions & 0 deletions quic/s2n-quic-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,103 @@ macro_rules! assume {
};
}

#[macro_export]
macro_rules! extern_probe {
(extern "probe" {
$(
$(
#[doc = $doc:tt]
)*
#[link_name = $link_name:ident]
$vis:vis fn $fun:ident($($arg:ident: $arg_t:ty),* $(,)?);
)*
}) => {
$(
$(
#[doc = $doc]
)*
#[inline(always)]
$vis fn $fun($($arg: $arg_t),*) {
#[cfg(s2n_quic_probe_print)]
{
struct Args {
$(
$arg: $arg_t,
)*
}

impl ::core::fmt::Display for Args {
#[inline]
fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result {
let mut is_first = true;
$(
if !core::mem::take(&mut is_first) {
write!(f, ", ")?;
}
write!(f, "{}: {:?}", stringify!($arg), self.$arg)?;
)*
Ok(())
}
}

let args = Args { $($arg),* };

::std::eprintln!("{}: {}", stringify!($link_name), args);
}

#[cfg(test)]
{
let expected = concat!(module_path!(), "::", stringify!($fun))
.split("::")
.filter(|v| *v != "probes")
.collect::<Vec<_>>().join("__");

let actual = stringify!($link_name);

assert_eq!(&expected, actual);
}

$crate::probe!(
s2n_quic,
$link_name,
$(
$arg
),*
);
}
)*
}
}

#[cfg(feature = "usdt")]
#[doc(hidden)]
pub use probe::probe as __probe;

#[cfg(feature = "usdt")]
#[macro_export]
macro_rules! probe {
($provider:ident, $name:ident $(, $arg:expr)* $(,)?) => {{
// define a function with inline(never) to consolidate probes
let probe = {
#[inline(never)]
|| {
$crate::__probe!($provider, $name, $($arg),*);
}
};
probe();
}}
}

#[cfg(not(feature = "usdt"))]
#[macro_export]
macro_rules! probe {
($provider:ident, $name:ident $(, $arg:expr)* $(,)?) => {{
$(
let _ = $arg;
)*
}}
}

/// Implements a future that wraps `T::poll_ready` and yields after ready
macro_rules! impl_ready_future {
($name:ident, $fut:ident, $output:ty) => {
Expand Down
35 changes: 35 additions & 0 deletions quic/s2n-quic-core/src/sync/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,23 @@ impl<T: Copy> Builder<T> {
}
}

#[derive(Clone, Copy)]
pub struct AbsoluteIndex {
index: Wrapping<u32>,
mask: u32,
}

impl AbsoluteIndex {
/// Returns the absolute index for the given relative index
#[inline]
pub fn from_relative(&self, relative_index: u32) -> u32 {
// Wrap the cursor around the size of the ring
//
// Masking with a `2^N - 1` value is the same as a mod operation, just more efficient
(self.index + Wrapping(relative_index)).0 & self.mask
}
}

/// A structure for tracking a ring shared between a producer and consumer
///
/// See [xsk.h](https://github.com/xdp-project/xdp-tools/blob/a76e7a2b156b8cfe38992206abe9df1df0a29e38/headers/xdp/xsk.h#L34-L42).
Expand Down Expand Up @@ -188,6 +205,15 @@ impl<T: Copy> Cursor<T> {
self.cached_producer.0 & self.mask
}

/// Returns the absolute producer index for the relative index
#[inline]
pub fn producer_abs_index(&self) -> AbsoluteIndex {
AbsoluteIndex {
index: self.cached_producer,
mask: self.mask,
}
}

/// Returns the cached number of available entries for the consumer
///
/// See [xsk.h](https://github.com/xdp-project/xdp-tools/blob/a76e7a2b156b8cfe38992206abe9df1df0a29e38/headers/xdp/xsk.h#L94).
Expand Down Expand Up @@ -258,6 +284,15 @@ impl<T: Copy> Cursor<T> {
self.cached_consumer.0 & self.mask
}

/// Returns the absolute consumer index for the relative index
#[inline]
pub fn consumer_abs_index(&self) -> AbsoluteIndex {
AbsoluteIndex {
index: self.cached_consumer,
mask: self.mask,
}
}

/// Returns the cached number of available entries for the consumer
///
/// See [xsk.h](https://github.com/xdp-project/xdp-tools/blob/a76e7a2b156b8cfe38992206abe9df1df0a29e38/headers/xdp/xsk.h#L114).
Expand Down
62 changes: 58 additions & 4 deletions quic/s2n-quic-platform/src/socket/io/rx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,38 @@ use s2n_quic_core::{
path::{LocalAddress, MaxMtu},
};

mod probes {
s2n_quic_core::extern_probe!(
extern "probe" {
/// Emitted when a channel tries to acquire messages
#[link_name = s2n_quic_platform__socket__io__rx__acquire]
pub fn acquire(channel: *const (), count: u32);

/// Emitted when a message is read from a channel
#[link_name = s2n_quic_platform__socket__io__rx__read]
pub fn read(
channel: *const (),
message: u32,
segment_index: usize,
segment_size: usize,
);

/// Emitted when a message is finished being read from a channel
#[link_name = s2n_quic_platform__socket__io__rx__finish_read]
pub fn finish_read(
channel: *const (),
message: u32,
segment_count: usize,
segment_size: usize,
);

/// Emitted when all acquired messages are read from the channel
#[link_name = s2n_quic_platform__socket__io__rx__release]
pub fn release(channel: *const (), count: u32);
}
);
}

/// Structure for receiving messages from consumer channels
pub struct Rx<T: Message> {
channels: Vec<Consumer<T>>,
Expand Down Expand Up @@ -39,13 +71,15 @@ impl<T: Message> rx::Rx for Rx<T> {
let mut is_all_closed = true;

// try to acquire any messages we can from the set of channels
for channel in &mut self.channels {
for channel in self.channels.iter_mut() {
match channel.poll_acquire(u32::MAX, cx) {
Poll::Ready(_) => {
Poll::Ready(count) => {
probes::acquire(channel.as_ptr(), count);
is_all_closed = false;
is_any_ready = true;
}
Poll::Pending => {
probes::acquire(channel.as_ptr(), 0);
is_all_closed &= !channel.is_open();
}
}
Expand Down Expand Up @@ -114,17 +148,36 @@ impl<'a, T: Message> rx::Queue for RxQueue<'a, T> {
for channel in self.channels.iter_mut() {
// one last effort to acquire items if some were received since we last polled
let len = channel.acquire(u32::MAX);
let channel_ptr = channel.as_ptr();

probes::acquire(channel_ptr, len);

let absolute_index = channel.absolute_index();

let data = channel.data();
debug_assert_eq!(data.len(), len as usize);

for message in data {
for (message_idx, message) in data.iter_mut().enumerate() {
let message_idx = absolute_index.from_relative(message_idx as _);

// call the `on_packet` function for each message received
//
// NOTE: it's important that we process all of the messages in the queue as the
// channel is completely drained here.
if let Some(message) = message.rx_read(self.local_address) {
message.for_each(&mut on_packet);
let mut segment_index = 0;
let mut segment_size = 0;
message.for_each(|header, payload| {
probes::read(channel_ptr, message_idx, segment_index, payload.len());

segment_size = segment_size.max(payload.len());

on_packet(header, payload);

segment_index += 1;
});

probes::finish_read(channel_ptr, message_idx, segment_index, segment_size as _);
}

unsafe {
Expand All @@ -134,6 +187,7 @@ impl<'a, T: Message> rx::Queue for RxQueue<'a, T> {
}

// release the messages back to the producer
probes::release(channel_ptr, len);
channel.release(len);
}
}
Expand Down
Loading
Loading