Skip to content

Commit

Permalink
Fix typo
Browse files Browse the repository at this point in the history
  • Loading branch information
Jake-Shadle committed Aug 8, 2024
1 parent b94b360 commit ce225a3
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 75 deletions.
4 changes: 2 additions & 2 deletions src/codec/qcmp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crate::{
},
time::{DurationNanos, UtcTimestamp},
};
use eyre::Context as _;
use std::sync::Arc;
#[cfg(test)]
use std::time::Duration;
Expand Down Expand Up @@ -258,6 +257,7 @@ pub fn spawn(socket: socket2::Socket, mut shutdown_rx: crate::ShutdownRx) -> cra

#[cfg(target_os = "linux")]
pub fn spawn(socket: socket2::Socket, mut shutdown_rx: crate::ShutdownRx) -> crate::Result<()> {
use eyre::Context as _;
use std::os::fd::{AsRawFd, FromRawFd};

let port = crate::net::socket_port(&socket);
Expand Down Expand Up @@ -400,7 +400,7 @@ pub fn spawn(socket: socket2::Socket, mut shutdown_rx: crate::ShutdownRx) -> cra
tracing::debug!("sending QCMP ping reply");

// Update the iovec with the actual length of the pong
iov.iov_len = buf.buf.len();
iov.iov_len = buf.len;

// Note we don't have to do anything else with the msghdr
// as the recv has already filled in the socket address
Expand Down
50 changes: 30 additions & 20 deletions src/components/proxy/packet_router/reference.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
//! The reference implementation is used for non-Linux targets
impl super::DownstreamReceiveWorkerConfig {
pub async fn spawn(self) -> eyre::Result<tokio::sync::oneshot::Receiver<crate::Result<()>>> {
pub async fn spawn(
self,
_shutdown: crate::ShutdownRx,
) -> eyre::Result<tokio::sync::oneshot::Receiver<()>> {
let Self {
worker_id,
upstream_receiver,
Expand All @@ -27,7 +30,7 @@ impl super::DownstreamReceiveWorkerConfig {
let send_socket = socket.clone();

let inner_task = async move {
tx.send(Ok(()));
tx.send(());

loop {
tokio::select! {
Expand All @@ -38,32 +41,36 @@ impl super::DownstreamReceiveWorkerConfig {
crate::metrics::errors_total(
crate::metrics::WRITE,
&error.to_string(),
None,
&crate::metrics::EMPTY,
)
.inc();
}
Ok((data, asn_info, send_addr)) => {
let (result, _) = send_socket.send_to(data, send_addr).await;
let asn_info = asn_info.as_ref();
Ok(crate::components::proxy::SendPacket {
destination,
asn_info,
data,
}) => {
let (result, _) = send_socket.send_to(data, destination).await;
let asn_info = asn_info.as_ref().into();
match result {
Ok(size) => {
crate::metrics::packets_total(crate::metrics::WRITE, asn_info)
crate::metrics::packets_total(crate::metrics::WRITE, &asn_info)
.inc();
crate::metrics::bytes_total(crate::metrics::WRITE, asn_info)
crate::metrics::bytes_total(crate::metrics::WRITE, &asn_info)
.inc_by(size as u64);
}
Err(error) => {
let source = error.to_string();
crate::metrics::errors_total(
crate::metrics::WRITE,
&source,
asn_info,
&asn_info,
)
.inc();
crate::metrics::packets_dropped_total(
crate::metrics::WRITE,
&source,
asn_info,
&asn_info,
)
.inc();
}
Expand All @@ -83,29 +90,32 @@ impl super::DownstreamReceiveWorkerConfig {
}
}

let mut error_acc =
crate::components::proxy::error::ErrorAccumulator::new(error_sender);

loop {
// Initialize a buffer for the UDP packet. We use the maximum size of a UDP
// packet, which is the maximum value of 16 a bit integer.
let buffer = buffer_pool.clone().alloc();

let (result, contents) = socket.recv_from(buffer).await;
let received_at = crate::time::UtcTimestamp::now();

match result {
Ok((_size, mut source)) => {
source.set_ip(source.ip().to_canonical());
let packet = super::DownstreamPacket {
received_at: crate::time::UtcTimestamp::now(),
contents,
source,
};
let packet = super::DownstreamPacket { contents, source };

if let Some(last_received_at) = last_received_at {
crate::metrics::packet_jitter(crate::metrics::READ, None)
.set((packet.received_at - last_received_at).nanos());
crate::metrics::packet_jitter(
crate::metrics::READ,
&crate::metrics::EMPTY,
)
.set((received_at - last_received_at).nanos());
}
last_received_at = Some(packet.received_at);
last_received_at = Some(received_at);

Self::process_task(packet, worker_id, &config, &sessions, &error_sender)
Self::process_task(packet, worker_id, &config, &sessions, &mut error_acc)
.await;
}
Err(error) => {
Expand All @@ -117,7 +127,7 @@ impl super::DownstreamReceiveWorkerConfig {
});

use eyre::WrapErr as _;
worker.await.context("failed to spawn receiver task")??;
worker.await.context("failed to spawn receiver task")?;
Ok(rx)
}
}
23 changes: 13 additions & 10 deletions src/components/proxy/sessions/reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ impl super::SessionPool {
self: std::sync::Arc<Self>,
raw_socket: socket2::Socket,
port: u16,
downstream_receiver: tokio::sync::mpsc::Receiver<crate::components::proxy::SendPacket>,
mut downstream_receiver: tokio::sync::mpsc::Receiver<crate::components::proxy::SendPacket>,
) -> Result<tokio::sync::oneshot::Receiver<()>, crate::components::proxy::PipelineError> {
let pool = self;

Expand All @@ -25,7 +25,7 @@ impl super::SessionPool {
crate::metrics::errors_total(
crate::metrics::WRITE,
"downstream channel closed",
None,
&crate::metrics::EMPTY,
)
.inc();
break;
Expand All @@ -35,32 +35,35 @@ impl super::SessionPool {
data,
asn_info,
}) => {
tracing::trace!(%dest, length = data.len(), "sending packet upstream");
tracing::trace!(%destination, length = data.len(), "sending packet upstream");
let (result, _) = socket2.send_to(data, destination).await;
let asn_info = asn_info.as_ref();
let asn_info = asn_info.as_ref().into();
match result {
Ok(size) => {
crate::metrics::packets_total(
crate::metrics::READ,
asn_info,
&asn_info,
)
.inc();
crate::metrics::bytes_total(crate::metrics::READ, asn_info)
.inc_by(size as u64);
crate::metrics::bytes_total(
crate::metrics::READ,
&asn_info,
)
.inc_by(size as u64);
}
Err(error) => {
tracing::trace!(%error, "sending packet upstream failed");
let source = error.to_string();
crate::metrics::errors_total(
crate::metrics::READ,
&source,
asn_info,
&asn_info,
)
.inc();
crate::metrics::packets_dropped_total(
crate::metrics::READ,
&source,
asn_info,
&asn_info,
)
.inc();
}
Expand All @@ -80,7 +83,7 @@ impl super::SessionPool {
match result {
Err(error) => {
tracing::trace!(%error, "error receiving packet");
crate::metrics::errors_total(crate::metrics::WRITE, &error.to_string(), None).inc();
crate::metrics::errors_total(crate::metrics::WRITE, &error.to_string(), &crate::metrics::EMPTY).inc();
},
Ok((_size, recv_addr)) => pool.process_received_upstream_packet(buf, recv_addr, port, &mut last_received_at).await,
}
Expand Down
56 changes: 13 additions & 43 deletions src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,44 +18,20 @@
#[cfg(not(target_os = "linux"))]
macro_rules! uring_spawn {
($span:expr, $future:expr) => {{
let (tx, rx) = tokio::sync::oneshot::channel::<Result<(), std::io::Error>>();
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
use tracing::Instrument as _;

cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
let dispatcher = tracing::dispatcher::get_default(|d| d.clone());
std::thread::Builder::new().name("io-uring".into()).spawn(move || {
let _guard = tracing::dispatcher::set_default(&dispatcher);

match tokio_uring::Runtime::new(&tokio_uring::builder().entries(2048)) {
Ok(runtime) => {
let _ = tx.send(Ok(()));

if let Some(span) = $span {
runtime.block_on($future.instrument(span));
} else {
runtime.block_on($future);
}
}
Err(error) => {
let _ = tx.send(Err(error));
}
};
}).expect("failed to spawn io-uring thread");
} else {
use tracing::instrument::WithSubscriber as _;

let fut = async move {
let _ = tx.send(Ok(()));
$future.await
};

if let Some(span) = $span {
tokio::spawn(fut.instrument(span).with_current_subscriber());
} else {
tokio::spawn(fut.with_current_subscriber());
}
}
use tracing::instrument::WithSubscriber as _;

let fut = async move {
let _ = tx.send(());
$future.await
};

if let Some(span) = $span {
tokio::spawn(fut.instrument(span).with_current_subscriber());
} else {
tokio::spawn(fut.with_current_subscriber());
}
rx
}};
Expand All @@ -65,13 +41,7 @@ macro_rules! uring_spawn {
#[cfg(not(target_os = "linux"))]
macro_rules! uring_inner_spawn {
($future:expr) => {
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
tokio_uring::spawn($future);
} else {
tokio::spawn($future);
}
}
tokio::spawn($future);
};
}

Expand Down

0 comments on commit ce225a3

Please sign in to comment.