diff --git a/src/components/proxy/io_uring_shared.rs b/src/components/proxy/io_uring_shared.rs index 33f879055..934d53eca 100644 --- a/src/components/proxy/io_uring_shared.rs +++ b/src/components/proxy/io_uring_shared.rs @@ -252,18 +252,20 @@ pub enum PacketProcessorCtx { Router { config: Arc, sessions: Arc, - error_sender: super::error::ErrorSender, - /// Receiver for upstream packets being sent to this downstream - upstream_receiver: crate::components::proxy::sessions::DownstreamReceiver, + error_acc: super::error::ErrorAccumulator, worker_id: usize, }, SessionPool { pool: Arc, - downstream_receiver: tokio::sync::mpsc::Receiver, port: u16, }, } +pub enum PacketReceiver { + Router(crate::components::proxy::sessions::DownstreamReceiver), + SessionPool(tokio::sync::mpsc::Receiver), +} + /// Spawns worker tasks /// /// One task processes received packets, notifying the io-uring loop when a @@ -271,14 +273,11 @@ pub enum PacketProcessorCtx { /// the io-uring loop when there are 1 or more packets available to be sent fn spawn_workers( rt: &tokio::runtime::Runtime, - ctx: PacketProcessorCtx, + receiver: PacketReceiver, pending_sends: PendingSends, - packet_processed_event: EventFdWriter, mut shutdown_rx: crate::ShutdownRx, shutdown_event: EventFdWriter, -) -> tokio::sync::mpsc::Sender { - let (tx, mut rx) = tokio::sync::mpsc::channel::(1); - +) { // Spawn a task that just monitors the shutdown receiver to notify the io-uring loop to exit rt.spawn(async move { // The result is uninteresting, either a shutdown has been signalled, or all senders have been dropped @@ -287,44 +286,8 @@ fn spawn_workers( shutdown_event.write(1); }); - match ctx { - PacketProcessorCtx::Router { - config, - sessions, - error_sender, - worker_id, - upstream_receiver, - } => { - rt.spawn(async move { - let mut last_received_at = None; - - let mut error_acc = super::error::ErrorAccumulator::new(error_sender); - - while let Some(packet) = rx.recv().await { - let received_at = UtcTimestamp::now(); - if let Some(last_received_at) = last_received_at { - metrics::packet_jitter(metrics::READ, &metrics::EMPTY) - .set((received_at - last_received_at).nanos()); - } - last_received_at = Some(received_at); - - let ds_packet = proxy::packet_router::DownstreamPacket { - contents: packet.buffer, - source: packet.source, - }; - - crate::components::proxy::packet_router::DownstreamReceiveWorkerConfig::process_task( - ds_packet, - worker_id, - &config, - &sessions, - &mut error_acc, - ); - - packet_processed_event.write(1); - } - }); - + match receiver { + PacketReceiver::Router(upstream_receiver) => { rt.spawn(async move { while let Ok(packet) = upstream_receiver.recv().await { let packet = SendPacket { @@ -336,27 +299,7 @@ fn spawn_workers( } }); } - PacketProcessorCtx::SessionPool { - pool, - port, - mut downstream_receiver, - } => { - rt.spawn(async move { - let mut last_received_at = None; - - while let Some(packet) = rx.recv().await { - pool.process_received_upstream_packet( - packet.buffer, - packet.source, - port, - &mut last_received_at, - ) - .await; - - packet_processed_event.write(1); - } - }); - + PacketReceiver::SessionPool(mut downstream_receiver) => { rt.spawn(async move { while let Some(packet) = downstream_receiver.recv().await { let packet = SendPacket { @@ -369,8 +312,52 @@ fn spawn_workers( }); } } +} - tx +fn process_packet( + ctx: &mut PacketProcessorCtx, + packet_processed_event: &EventFdWriter, + packet: RecvPacket, + last_received_at: &mut Option, +) { + match ctx { + PacketProcessorCtx::Router { + config, + sessions, + worker_id, + error_acc, + } => { + let received_at = UtcTimestamp::now(); + if let Some(last_received_at) = last_received_at { + metrics::packet_jitter(metrics::READ, &metrics::EMPTY) + .set((received_at - *last_received_at).nanos()); + } + *last_received_at = Some(received_at); + + let ds_packet = proxy::packet_router::DownstreamPacket { + contents: packet.buffer, + source: packet.source, + }; + + crate::components::proxy::packet_router::DownstreamReceiveWorkerConfig::process_task( + ds_packet, *worker_id, config, sessions, error_acc, + ); + + packet_processed_event.write(1); + } + PacketProcessorCtx::SessionPool { pool, port, .. } => { + let mut last_received_at = None; + + pool.process_received_upstream_packet( + packet.buffer, + packet.source, + *port, + &mut last_received_at, + ); + + packet_processed_event.write(1); + } + } } #[inline] @@ -542,7 +529,8 @@ impl IoUringLoop { pub fn spawn( self, thread_name: String, - ctx: PacketProcessorCtx, + mut ctx: PacketProcessorCtx, + receiver: PacketReceiver, buffer_pool: Arc, shutdown: crate::ShutdownRx, ) -> Result, PipelineError> { @@ -588,11 +576,10 @@ impl IoUringLoop { // Spawn the worker tasks that process in an async context unlike // our io-uring loop below - let process_packet_tx = spawn_workers( + spawn_workers( &rt, - ctx, + receiver, pending_sends.clone(), - process_event.writer(), shutdown, shutdown_event.writer(), ); @@ -618,6 +605,8 @@ impl IoUringLoop { // Notify that we have set everything up let _ = tx.send(()); + let mut last_received_at = None; + let process_event_writer = process_event.writer(); // The core io uring loop 'io: loop { @@ -659,9 +648,12 @@ impl IoUringLoop { } let packet = packet.finalize_recv(ret as usize); - if process_packet_tx.blocking_send(packet).is_err() { - unreachable!("packet process thread has a pending packet"); - } + process_packet( + &mut ctx, + &process_event_writer, + packet, + &mut last_received_at, + ); // Queue the wait for the processing of the packet to finish loop_ctx.push_with_token( diff --git a/src/components/proxy/packet_router/io_uring.rs b/src/components/proxy/packet_router/io_uring.rs index 54475ea7c..22244b4fd 100644 --- a/src/components/proxy/packet_router/io_uring.rs +++ b/src/components/proxy/packet_router/io_uring.rs @@ -43,10 +43,10 @@ impl super::DownstreamReceiveWorkerConfig { io_uring_shared::PacketProcessorCtx::Router { config, sessions, - error_sender, - upstream_receiver, + error_acc: super::super::error::ErrorAccumulator::new(error_sender), worker_id, }, + io_uring_shared::PacketReceiver::Router(upstream_receiver), buffer_pool, shutdown, ) diff --git a/src/components/proxy/sessions.rs b/src/components/proxy/sessions.rs index b926bc55b..f9678bbb7 100644 --- a/src/components/proxy/sessions.rs +++ b/src/components/proxy/sessions.rs @@ -159,7 +159,7 @@ impl SessionPool { self.create_session_from_existing_socket(key, downstream_sender, port) } - pub(crate) async fn process_received_upstream_packet( + pub(crate) fn process_received_upstream_packet( self: &Arc, packet: PoolBuffer, mut recv_addr: SocketAddr, diff --git a/src/components/proxy/sessions/io_uring.rs b/src/components/proxy/sessions/io_uring.rs index 2ad33a610..ce709f8e4 100644 --- a/src/components/proxy/sessions/io_uring.rs +++ b/src/components/proxy/sessions/io_uring.rs @@ -40,11 +40,8 @@ impl super::SessionPool { io_loop.spawn( format!("session-{id}"), - io_uring_shared::PacketProcessorCtx::SessionPool { - pool, - downstream_receiver, - port, - }, + io_uring_shared::PacketProcessorCtx::SessionPool { pool, port }, + io_uring_shared::PacketReceiver::SessionPool(downstream_receiver), buffer_pool, shutdown, )