Skip to content

Commit

Permalink
Just use epoll for qcmp (#1012)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jake-Shadle authored Sep 5, 2024
1 parent 9acd7c0 commit 3392942
Showing 1 changed file with 53 additions and 228 deletions.
281 changes: 53 additions & 228 deletions src/codec/qcmp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use crate::{
net::{
phoenix::{DistanceMeasure, Measurement},
DualStackEpollSocket, DualStackLocalSocket,
DualStackEpollSocket,
},
time::{DurationNanos, UtcTimestamp},
};
Expand Down Expand Up @@ -192,248 +192,73 @@ impl Measurement for QcmpMeasurement {
}
}

/// Starts the QCMP ping responder on targets other than Linux.
///
/// Only the port of `socket` is read here; the spawned future binds its own
/// `DualStackLocalSocket` on that port. It then loops, answering each QCMP
/// ping with a ping reply, until `shutdown_rx` reports a change. Packets that
/// are not QCMP, are malformed, or are not pings are logged and dropped.
///
/// This function itself always returns `Ok(())`. If the socket cannot be
/// created, the `unwrap` inside the spawned future panics.
#[cfg(not(target_os = "linux"))]
pub fn spawn(socket: socket2::Socket, mut shutdown_rx: crate::ShutdownRx) -> crate::Result<()> {
    let port = crate::net::socket_port(&socket);

    uring_spawn!(uring_span!(tracing::debug_span!("qcmp")), async move {
        let mut input_buf = vec![0; 1 << 16];
        let socket = DualStackLocalSocket::new(port).unwrap();
        let mut output_buf = QcmpPacket::default();

        loop {
            // The socket takes the buffer by value and hands it back next to the
            // result, so reclaim it before looking at the outcome.
            let (received, returned_buf) = tokio::select! {
                outcome = socket.recv_from(input_buf) => outcome,
                _ = shutdown_rx.changed() => return,
            };
            input_buf = returned_buf;

            let (size, source) = match received {
                Ok(datagram) => datagram,
                Err(error) => {
                    tracing::warn!(%error, "error receiving packet");
                    continue;
                }
            };
            let received_at = UtcTimestamp::now();

            // Anything other than a well-formed ping is dropped.
            let (client_timestamp, nonce) = match Protocol::parse(&input_buf[..size]) {
                Ok(Some(Protocol::Ping {
                    client_timestamp,
                    nonce,
                })) => (client_timestamp, nonce),
                Ok(Some(_)) => {
                    tracing::warn!("rejected unsupported QCMP packet");
                    continue;
                }
                Ok(None) => {
                    tracing::debug!("rejected non-qcmp packet");
                    continue;
                }
                Err(error) => {
                    tracing::debug!(%error, "rejected malformed packet");
                    continue;
                }
            };

            Protocol::ping_reply(nonce, client_timestamp, received_at).encode(&mut output_buf);

            tracing::debug!("sending ping reply {:?}", &output_buf.buf[..output_buf.len]);

            // As with receiving, the output buffer is returned whether or not the send worked.
            let (sent, returned_packet) = socket.send_to(output_buf, source).await;
            if let Err(error) = sent {
                tracing::warn!(%error, "error responding to ping");
            }
            output_buf = returned_packet;
        }
    });

    Ok(())
}

// NOTE(review): everything from here to the final `Ok(())` is commit-diff output whose +/-
// markers were lost: lines of the removed io_uring loop and of the tokio-based replacement
// are interleaved, so the span does not read as one function. Comments marked NOTE(review)
// below are orientation aids added in review and should be confirmed against the real file.
#[cfg(target_os = "linux")]
pub fn spawn(socket: socket2::Socket, mut shutdown_rx: crate::ShutdownRx) -> crate::Result<()> {
use crate::components::proxy::io_uring_shared::EventFd;
use eyre::Context as _;
use tracing::{instrument::WithSubscriber as _, Instrument as _};

let port = crate::net::socket_port(&socket);

// Create an eventfd so we can signal to the qcmp loop when we want to exit
let mut shutdown_event = EventFd::new()?;
let shutdown = shutdown_event.writer();

// Spawn a task on the main loop whose sole purpose is to signal the eventfd
tokio::task::spawn(async move {
let _ = shutdown_rx.changed().await;
shutdown.write(1);
});

let _thread_span = uring_span!(tracing::debug_span!("qcmp").or_current());
let dispatcher = tracing::dispatcher::get_default(|d| d.clone());

// NOTE(review): dedicated-thread io_uring variant (uses `submitter`, `sq`, `cq`, `cqe`) —
// presumably the deleted side of the diff.
std::thread::Builder::new()
.name("qcmp".into())
.spawn(move || -> eyre::Result<()> {
let _guard = tracing::dispatcher::set_default(&dispatcher);

let mut ring = io_uring::IoUring::new(3).context("unable to create io uring")?;
let (submitter, mut sq, mut cq) = ring.split();

const RECV: u64 = 0;
const SEND: u64 = 1;
const SHUTDOWN: u64 = 2;

// Queue the read from the shutdown eventfd used to signal when the loop
// should exit
let entry = shutdown_event.io_uring_entry().user_data(SHUTDOWN);
// SAFETY: the memory being written to is located on the stack inside the shutdown event, and is alive
// at least as long as the uring loop
unsafe {
sq.push(&entry).context("unable to insert io-uring entry")?;
}

// Our loop is simple and only ever processes one ping/pong pair at a time
// so we just reuse the same buffer for both receives and sends
let mut buf = QcmpPacket::default();
// SAFETY: msghdr is POD
let mut msghdr: libc::msghdr = unsafe { std::mem::zeroed() };
// SAFETY: msghdr is POD
let addr = unsafe {
socket2::SockAddr::new(
std::mem::zeroed(),
std::mem::size_of::<libc::sockaddr_storage>() as _,
)
};

let mut iov = libc::iovec {
iov_base: buf.buf.as_mut_ptr() as *mut _,
iov_len: 0,
};

msghdr.msg_iov = std::ptr::addr_of_mut!(iov);
msghdr.msg_iovlen = 1;
msghdr.msg_name = addr.as_ptr() as *mut libc::sockaddr_storage as *mut _;
msghdr.msg_namelen = addr.len();

let msghdr_mut = std::ptr::addr_of_mut!(msghdr);

let socket = DualStackLocalSocket::new(port)
.context("failed to create already bound qcmp socket")?;
let socket_fd = socket.raw_fd();

let enqueue_recv =
|sq: &mut io_uring::SubmissionQueue, iov: &mut libc::iovec| -> eyre::Result<()> {
iov.iov_len = MAX_QCMP_PACKET_LEN;
let entry = io_uring::opcode::RecvMsg::new(socket_fd, msghdr_mut)
.build()
.user_data(RECV);
// SAFETY: the memory being written to is located on the stack and outlives the uring loop
unsafe { sq.push(&entry) }.context("unable to insert io-uring entry")?;
Ok(())
};

enqueue_recv(&mut sq, &mut iov)?;

sq.sync();
// NOTE(review): tokio-task variant built on `DualStackEpollSocket` — presumably the added
// side of the diff; it loops on `recv_from` and returns once `shutdown_rx` changes.
tokio::task::spawn(
async move {
let mut input_buf = [0u8; MAX_QCMP_PACKET_LEN];
let socket = DualStackEpollSocket::new(port).unwrap();
let mut output_buf = QcmpPacket::default();

loop {
match submitter.submit_and_wait(1) {
Ok(_) => {}
Err(ref err) if err.raw_os_error() == Some(libc::EBUSY) => {}
Err(err) => {
return Err(err).context("failed to submit io-uring operations");
}
}
cq.sync();

let mut has_pending_send = false;
for cqe in &mut cq {
let ret = cqe.result();

match cqe.user_data() {
RECV => {
if ret < 0 {
let error = std::io::Error::from_raw_os_error(-ret).to_string();
// NOTE(review): this sits under the `RECV` arm yet reports a failed *send* — looks like
// a copy-paste of the `SEND` arm's message; confirm.
tracing::error!(%error, "failed to send QCMP response");
let result = tokio::select! {
result = socket.recv_from(&mut input_buf) => result,
_ = shutdown_rx.changed() => return,
};
match result {
Ok((size, source)) => {
let received_at = UtcTimestamp::now();
let command = match Protocol::parse(&input_buf[..size]) {
Ok(Some(command)) => command,
Ok(None) => {
tracing::debug!("rejected non-qcmp packet");
continue;
}

buf.len = ret as _;
let received_at = UtcTimestamp::now();
let command = match Protocol::parse(&buf) {
Ok(Some(command)) => command,
Ok(None) => {
tracing::debug!("rejected non-QCMP packet");
continue;
}
Err(error) => {
tracing::debug!(%error, "rejected malformed packet");
continue;
}
};

let Protocol::Ping {
client_timestamp,
nonce,
} = command
else {
tracing::warn!("rejected unsupported QCMP packet");
Err(error) => {
tracing::debug!(%error, "rejected malformed packet");
continue;
};

Protocol::ping_reply(nonce, client_timestamp, received_at)
.encode(&mut buf);

tracing::debug!("sending QCMP ping reply");

// Update the iovec with the actual length of the pong
iov.iov_len = buf.len;

// Note we don't have to do anything else with the msghdr
// as the recv has already filled in the socket address
// of the sender, which is also our destination

{
let entry = io_uring::opcode::SendMsg::new(
socket_fd,
std::ptr::addr_of!(msghdr),
)
.build()
.user_data(SEND);
// SAFETY: the memory being read from is located on the stack and outlives the uring loop
if unsafe { sq.push(&entry) }.is_err() {
tracing::error!("failed to enqueue QCMP pong response");
continue;
}
}
};

let Protocol::Ping {
client_timestamp,
nonce,
} = command
else {
tracing::warn!("rejected unsupported QCMP packet");
continue;
};

has_pending_send = true;
}
SEND => {
if ret < 0 {
let error = std::io::Error::from_raw_os_error(-ret).to_string();
tracing::error!(%error, "failed to send QCMP response");
Protocol::ping_reply(nonce, client_timestamp, received_at)
.encode(&mut output_buf);

tracing::debug!(
"sending QCMP pong",
);

match socket.send_to(&output_buf, source).await {
Ok(len) => {
if len != output_buf.len() {
tracing::error!("failed to send entire QCMP pong response, expected {} but only sent {len}", output_buf.len());
}
}
Err(error) => {
tracing::warn!(%error, "error responding to ping");
}
}
SHUTDOWN => {
tracing::info!("QCMP thread was signaled to shutdown");
return Ok(());
}
ud => unreachable!("io-uring user data {ud} is invalid"),
}
}

if !has_pending_send {
enqueue_recv(&mut sq, &mut iov)?;
}

sq.sync();
Err(error) => {
tracing::warn!(%error, "error receiving packet");
}
};
}
})?;
}
.instrument(tracing::debug_span!("qcmp"))
.with_current_subscriber(),
);

Ok(())
}
Expand Down

0 comments on commit 3392942

Please sign in to comment.