Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add QCMP support in XDP I/O loop #1084

Merged
merged 7 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 1 addition & 64 deletions crates/ebpf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/ebpf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ edition = "2021"

[dependencies]
aya-ebpf = "0.1.1"
aya-log-ebpf = "0.1.1"
network-types = "0.0.7"

[[bin]]
Expand Down
4 changes: 4 additions & 0 deletions crates/ebpf/src/ebpf-main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ static mut COUNTER: u32 = 0;
/// The external port used by clients. Network order.
#[no_mangle]
static EXTERNAL_PORT_NO: u16 = u16::to_be(7777);
/// The port used to respond to QCMP messages. Network order.
#[no_mangle]
static QCMP_PORT_NO: u16 = u16::to_be(7600);

/// The beginning of the port range quilkin will use for server sessions, we
/// take advantage of the fact that, by default, the range Linux uses for
Expand Down Expand Up @@ -116,6 +119,7 @@ pub fn packet_router(ctx: &XdpContext) -> Result<(), ()> {

if dest_port == unsafe { core::ptr::read_volatile(&EXTERNAL_PORT_NO) }
|| u16::from_be(dest_port) >= EPHEMERAL_PORT_START
|| dest_port == unsafe { core::ptr::read_volatile(&QCMP_PORT_NO) }
{
Ok(())
} else {
Expand Down
131 changes: 128 additions & 3 deletions crates/test/tests/xdp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use quilkin::{
filters::{self, StaticFilter as _},
net::{
self,
xdp::process::{self, xdp},
xdp::process::{
self,
xdp::{self, packet::net_types as nt},
},
},
};
use std::{
Expand Down Expand Up @@ -56,6 +59,7 @@ async fn simple_forwarding() {

let mut state = process::State {
external_port: PROXY.port().into(),
qcmp_port: 0.into(),
config: Arc::new(config),
destinations: Vec::with_capacity(1),
sessions: Arc::new(Default::default()),
Expand Down Expand Up @@ -138,6 +142,7 @@ async fn changes_ip_version() {

let mut state = process::State {
external_port: PROXY4.port().into(),
qcmp_port: 0.into(),
config: Arc::new(config),
destinations: Vec::with_capacity(1),
sessions: Arc::new(Default::default()),
Expand Down Expand Up @@ -269,6 +274,7 @@ async fn packet_manipulation() {

let mut state = process::State {
external_port: PROXY.port().into(),
qcmp_port: 0.into(),
config: Arc::new(config),
destinations: Vec::with_capacity(1),
sessions: Arc::new(Default::default()),
Expand Down Expand Up @@ -330,6 +336,7 @@ async fn packet_manipulation() {

let mut state = process::State {
external_port: PROXY.port().into(),
qcmp_port: 0.into(),
config: Arc::new(config),
destinations: Vec::with_capacity(1),
sessions: Arc::new(Default::default()),
Expand Down Expand Up @@ -399,6 +406,7 @@ async fn packet_manipulation() {

let mut state = process::State {
external_port: PROXY.port().into(),
qcmp_port: 0.into(),
config: Arc::new(config),
destinations: Vec::with_capacity(1),
sessions: Arc::new(Default::default()),
Expand Down Expand Up @@ -491,6 +499,7 @@ async fn multiple_servers() {

let mut state = process::State {
external_port: PROXY.port().into(),
qcmp_port: 0.into(),
config: Arc::new(config),
destinations: Vec::with_capacity(1),
sessions: Arc::new(Default::default()),
Expand Down Expand Up @@ -542,8 +551,6 @@ async fn multiple_servers() {
/// Ensures that surpassing the session limits doesn't completely break
#[tokio::test]
async fn many_sessions() {
use xdp::packet::net_types as nt;

const SERVER: SocketAddrV4 = SocketAddrV4::new(Ipv4Addr::new(1, 1, 1, 1), 1111);
const PROXY: SocketAddrV4 = SocketAddrV4::new(Ipv4Addr::new(2, 2, 2, 2), 7777);

Expand All @@ -568,6 +575,7 @@ async fn many_sessions() {

let mut state = process::State {
external_port: PROXY.port().into(),
qcmp_port: 0.into(),
config: Arc::new(config),
destinations: Vec::with_capacity(1),
sessions: Arc::new(Default::default()),
Expand Down Expand Up @@ -695,6 +703,7 @@ async fn frees_dropped_packets() {

let mut state = process::State {
external_port: PROXY4.port().into(),
qcmp_port: 0.into(),
config: Arc::new(config),
destinations: Vec::with_capacity(1),
sessions: Arc::new(Default::default()),
Expand Down Expand Up @@ -777,3 +786,119 @@ async fn frees_dropped_packets() {
unsafe { umem.alloc().expect("umem should have available memory") };
}
}

/// Validates we can process QCMP packets with the same loop as regular packets
#[tokio::test]
async fn qcmp() {
use quilkin::{codec::qcmp, time::UtcTimestamp};

const PROXY: SocketAddrV4 = SocketAddrV4::new(Ipv4Addr::new(2, 2, 2, 2), 2020);
const CLIENT: SocketAddrV4 = SocketAddrV4::new(Ipv4Addr::new(1, 2, 3, 4), 9999);

let mut state = process::State {
external_port: 7777.into(),
qcmp_port: PROXY.port().into(),
config: Arc::new(quilkin::Config::default_non_agent()),
destinations: Vec::with_capacity(1),
sessions: Arc::new(Default::default()),
local_ipv4: *PROXY.ip(),
local_ipv6: Ipv6Addr::from_bits(0),
};

let mut umem = xdp::Umem::map(
xdp::umem::UmemCfgBuilder {
frame_size: xdp::umem::FrameSize::TwoK,
head_room: 0,
frame_count: 1,
tx_metadata: false,
}
.build()
.unwrap(),
)
.unwrap();

let mut rx_slab = xdp::HeapSlab::with_capacity(1);
let mut tx_slab = xdp::HeapSlab::with_capacity(1);

// sanity check the umem won't allow more than 1 packet at a time
unsafe {
let first = umem.alloc().unwrap();
assert!(umem.alloc().is_none());
umem.free_packet(first);
};

let mut qp = qcmp::QcmpPacket::default();

let ping_time = UtcTimestamp::from_nanos(100000);

// Valid ping packet
{
// If this fails, the dropped packet wasn't freed
let mut ping_packet = unsafe { umem.alloc().expect("umem has no available packets") };

let ping = qcmp::Protocol::Ping {
client_timestamp: ping_time,
nonce: 99,
};

ping.encode(&mut qp);

etherparse::PacketBuilder::ethernet2([3, 3, 3, 3, 3, 3], [4, 4, 4, 4, 4, 4])
.ipv4(CLIENT.ip().octets(), PROXY.ip().octets(), 64)
.udp(CLIENT.port(), PROXY.port())
.write(&mut ping_packet, &qp)
.unwrap();

rx_slab.push_front(ping_packet);
process::process_packets(&mut rx_slab, &mut umem, &mut tx_slab, &mut state);

let pong_packet = tx_slab.pop_back().unwrap();
let udp = nt::UdpPacket::parse_packet(&pong_packet).unwrap().unwrap();
let pong = qcmp::Protocol::parse(
pong_packet
.slice_at_offset(udp.data_offset, udp.data_length)
.unwrap(),
)
.unwrap()
.unwrap();

match pong {
qcmp::Protocol::PingReply {
client_timestamp,
nonce,
..
} => {
assert_eq!(ping_time, client_timestamp);
assert_eq!(nonce, 99);
}
_ => unreachable!(),
}

umem.free_packet(pong_packet);
}

// A pong packet, should be rejected
{
let mut bad_packet = unsafe { umem.alloc().expect("umem has no available packets") };

let pong = qcmp::Protocol::PingReply {
client_timestamp: ping_time,
nonce: 200,
server_start_timestamp: UtcTimestamp::from_nanos(100001),
server_transmit_timestamp: UtcTimestamp::from_nanos(100002),
};
pong.encode(&mut qp);

etherparse::PacketBuilder::ethernet2([3, 3, 3, 3, 3, 3], [4, 4, 4, 4, 4, 4])
.ipv4(CLIENT.ip().octets(), PROXY.ip().octets(), 64)
.udp(CLIENT.port(), PROXY.port())
.write(&mut bad_packet, &qp)
.unwrap();

rx_slab.push_front(bad_packet);
process::process_packets(&mut rx_slab, &mut umem, &mut tx_slab, &mut state);

assert!(tx_slab.is_empty());
unsafe { umem.alloc().expect("umem should have available memory") };
}
}
Binary file modified crates/xdp/bin/packet-router.bin
Binary file not shown.
8 changes: 7 additions & 1 deletion crates/xdp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,23 @@ pub struct EbpfProgram {
/// be the same port used in the I/O loop to determine if the packet is sent
/// from a client or a server
pub external_port: xdp::packet::net_types::NetworkU16,
/// The port QCMP packets are sent to
pub qcmp_port: xdp::packet::net_types::NetworkU16,
}

impl EbpfProgram {
/// Loads the XDP program.
///
/// The external port, the port used by clients, must be passed in due to
/// how globals work in eBPF.
pub fn load(external_port: u16) -> Result<Self, LoadError> {
pub fn load(external_port: u16, qcmp_port: u16) -> Result<Self, LoadError> {
let mut loader = aya::EbpfLoader::new();
let external_port_no = external_port.to_be();
loader.set_global("EXTERNAL_PORT_NO", &external_port_no, true);

let qcmp_port_no = qcmp_port.to_be();
loader.set_global("QCMP_PORT_NO", &qcmp_port_no, true);

// We exploit the fact that Linux by default does not assign ephemeral
// ports in the full range allowed by IANA, but we want to sanity check
// it here, as otherwise something else could have been assigned an
Expand Down Expand Up @@ -133,6 +138,7 @@ impl EbpfProgram {
Ok(Self {
bpf: loader.load(PROGRAM)?,
external_port: xdp::packet::net_types::NetworkU16(external_port_no),
qcmp_port: xdp::packet::net_types::NetworkU16(qcmp_port_no),
})
}

Expand Down
Loading
Loading