diff --git a/src/proxy.rs b/src/proxy.rs index 25895721bb..73edd51362 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -70,7 +70,8 @@ impl DownstreamReceiveWorkerConfig { tokio::select! { result = socket.recv_from(&mut buf) => { match result { - Ok((size, source)) => { + Ok((size, mut source)) => { + crate::utils::net::to_canonical(&mut source); let packet = DownstreamPacket { received_at: chrono::Utc::now().timestamp_nanos_opt().unwrap(), asn_info: crate::maxmind_db::MaxmindDb::lookup(source.ip()), diff --git a/src/proxy/sessions.rs b/src/proxy/sessions.rs index 97a726f389..a581d65a48 100644 --- a/src/proxy/sessions.rs +++ b/src/proxy/sessions.rs @@ -117,10 +117,10 @@ impl SessionPool { }, Ok((size, mut recv_addr)) => { let received_at = chrono::Utc::now().timestamp_nanos_opt().unwrap(); + crate::utils::net::to_canonical(&mut recv_addr); tracing::trace!(%recv_addr, %size, "received packet"); let (downstream_addr, asn_info): (SocketAddr, Option) = { let storage = pool.storage.read(); - to_canonical(&mut recv_addr); let Some(downstream_addr) = storage.destination_to_sources.get(&(recv_addr, port)) else { tracing::warn!(address=%recv_addr, "received traffic from a server that has no downstream"); continue; @@ -315,11 +315,10 @@ impl SessionPool { /// Sends packet data to the appropiate session based on its `key`. pub async fn send( self: &Arc, - mut key: SessionKey, + key: SessionKey, asn_info: Option, packet: &[u8], ) -> Result { - to_canonical(&mut key.source); self.get(key, asn_info) .await? .send(packet) @@ -484,21 +483,6 @@ impl Loggable for Error { } } -fn to_canonical(addr: &mut SocketAddr) { - let ip = match addr.ip() { - std::net::IpAddr::V6(ip) => { - if let Some(mapped) = ip.to_ipv4_mapped() { - std::net::IpAddr::V4(mapped) - } else { - std::net::IpAddr::V6(ip) - } - } - addr => addr, - }; - - addr.set_ip(ip); -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/test_utils.rs b/src/test_utils.rs index b61e44c5a8..e09d847ddc 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -239,7 +239,8 @@ impl TestHelper { { let socket = create_socket().await; // sometimes give ipv6, sometimes ipv4. - let addr = get_address(address_type, &socket); + let mut addr = get_address(address_type, &socket); + crate::test_utils::map_addr_to_localhost(&mut addr); let mut shutdown = self.get_shutdown_subscriber().await; let local_addr = addr; tokio::spawn(async move { diff --git a/src/utils/net.rs b/src/utils/net.rs index 38e11ee2b4..8d41afe8da 100644 --- a/src/utils/net.rs +++ b/src/utils/net.rs @@ -205,3 +205,21 @@ mod tests { ); } } + +/// Converts a a socket address to its canonical version. +/// This is just a copy of the method available in std but that is currently +/// nightly only. +pub fn to_canonical(addr: &mut SocketAddr) { + let ip = match addr.ip() { + std::net::IpAddr::V6(ip) => { + if let Some(mapped) = ip.to_ipv4_mapped() { + std::net::IpAddr::V4(mapped) + } else { + std::net::IpAddr::V6(ip) + } + } + addr => addr, + }; + + addr.set_ip(ip); +} diff --git a/tests/capture.rs b/tests/capture.rs index 344cdefba4..626166df7d 100644 --- a/tests/capture.rs +++ b/tests/capture.rs @@ -88,7 +88,7 @@ async fn token_router() { assert_eq!( "helloabc", - timeout(Duration::from_secs(5), recv_chan.recv()) + timeout(Duration::from_millis(500), recv_chan.recv()) .await .expect("should have received a packet") .unwrap() @@ -98,6 +98,6 @@ async fn token_router() { let msg = b"helloxyz"; socket.send_to(msg, &local_addr).await.unwrap(); - let result = timeout(Duration::from_secs(3), recv_chan.recv()).await; + let result = timeout(Duration::from_millis(500), recv_chan.recv()).await; assert!(result.is_err(), "should not have received a packet"); } diff --git a/tests/compress.rs b/tests/compress.rs index d537e0364e..6c9d05da62 100644 --- a/tests/compress.rs +++ b/tests/compress.rs @@ -29,7 +29,8 @@ async fn client_and_server() { let echo = t.run_echo_server(&AddressType::Random).await; // create server configuration as - let server_addr = available_addr(&AddressType::Random).await; + let mut server_addr = available_addr(&AddressType::Random).await; + quilkin::test_utils::map_addr_to_localhost(&mut server_addr); let yaml = " on_read: DECOMPRESS on_write: COMPRESS @@ -84,7 +85,7 @@ on_write: DECOMPRESS let (mut rx, tx) = t.open_socket_and_recv_multiple_packets().await; tx.send_to(b"hello", &client_addr).await.unwrap(); - let expected = timeout(Duration::from_secs(5), rx.recv()) + let expected = timeout(Duration::from_millis(500), rx.recv()) .await .expect("should have received a packet") .unwrap(); diff --git a/tests/filter_order.rs b/tests/filter_order.rs index 23e99a389d..58401267b1 100644 --- a/tests/filter_order.rs +++ b/tests/filter_order.rs @@ -44,7 +44,7 @@ on_read: COMPRESS on_write: DECOMPRESS "; - let echo = t + let mut echo = t .run_echo_server_with_tap(&AddressType::Random, move |_, bytes, _| { assert!( from_utf8(bytes).is_err(), @@ -53,6 +53,7 @@ on_write: DECOMPRESS }) .await; + quilkin::test_utils::map_to_localhost(&mut echo).await; let server_port = 12346; let server_proxy = quilkin::cli::Proxy { port: server_port, @@ -94,7 +95,7 @@ on_write: DECOMPRESS assert_eq!( "helloxyzabc", - timeout(Duration::from_secs(5), recv_chan.recv()) + timeout(Duration::from_millis(500), recv_chan.recv()) .await .expect("should have received a packet") .unwrap() diff --git a/tests/firewall.rs b/tests/firewall.rs index bdc8a19cb9..10ac11f882 100644 --- a/tests/firewall.rs +++ b/tests/firewall.rs @@ -51,7 +51,7 @@ on_write: assert_eq!( "hello", - timeout(Duration::from_secs(5), rx.recv()) + timeout(Duration::from_millis(500), rx.recv()) .await .expect("should have received a packet") .unwrap() @@ -81,7 +81,7 @@ on_write: assert_eq!( "hello", - timeout(Duration::from_secs(5), rx.recv()) + timeout(Duration::from_millis(500), rx.recv()) .await .expect("should have received a packet") .unwrap() @@ -109,7 +109,7 @@ on_write: "; let mut rx = test(&mut t, port, yaml, &address_type).await; - let result = timeout(Duration::from_secs(3), rx.recv()).await; + let result = timeout(Duration::from_millis(500), rx.recv()).await; assert!(result.is_err(), "should not have received a packet"); } @@ -134,7 +134,7 @@ on_write: "; let mut rx = test(&mut t, port, yaml, &address_type).await; - let result = timeout(Duration::from_secs(3), rx.recv()).await; + let result = timeout(Duration::from_millis(500), rx.recv()).await; assert!(result.is_err(), "should not have received a packet"); } @@ -159,7 +159,7 @@ on_write: "; let mut rx = test(&mut t, port, yaml, &address_type).await; - let result = timeout(Duration::from_secs(3), rx.recv()).await; + let result = timeout(Duration::from_millis(500), rx.recv()).await; assert!(result.is_err(), "should not have received a packet"); } @@ -184,7 +184,7 @@ on_write: "; let mut rx = test(&mut t, port, yaml, &address_type).await; - let result = timeout(Duration::from_secs(3), rx.recv()).await; + let result = timeout(Duration::from_millis(500), rx.recv()).await; assert!(result.is_err(), "should not have received a packet"); } diff --git a/tests/local_rate_limit.rs b/tests/local_rate_limit.rs index fb2cf0f8e8..7b93007643 100644 --- a/tests/local_rate_limit.rs +++ b/tests/local_rate_limit.rs @@ -54,6 +54,7 @@ period: 1 .unwrap(), ); t.run_server(server_config, server_proxy, None); + tokio::time::sleep(Duration::from_millis(50)).await; let msg = "hello"; let (mut rx, socket) = t.open_socket_and_recv_multiple_packets().await; diff --git a/tests/match.rs b/tests/match.rs index 6462b9693c..265920c654 100644 --- a/tests/match.rs +++ b/tests/match.rs @@ -95,7 +95,7 @@ on_read: assert_eq!( "helloxyz", - timeout(Duration::from_secs(5), recv_chan.recv()) + timeout(Duration::from_millis(500), recv_chan.recv()) .await .expect("should have received a packet") .unwrap() @@ -107,7 +107,7 @@ on_read: assert_eq!( "helloabc", - timeout(Duration::from_secs(5), recv_chan.recv()) + timeout(Duration::from_millis(500), recv_chan.recv()) .await .expect("should have received a packet") .unwrap() @@ -119,7 +119,7 @@ on_read: assert_eq!( "hellodef", - timeout(Duration::from_secs(5), recv_chan.recv()) + timeout(Duration::from_millis(500), recv_chan.recv()) .await .expect("should have received a packet") .unwrap() @@ -131,7 +131,7 @@ on_read: assert_eq!( "hellodef", - timeout(Duration::from_secs(5), recv_chan.recv()) + timeout(Duration::from_millis(500), recv_chan.recv()) .await .expect("should have received a packet") .unwrap() diff --git a/tests/metrics.rs b/tests/metrics.rs index a04b3c6a69..708b44f921 100644 --- a/tests/metrics.rs +++ b/tests/metrics.rs @@ -32,7 +32,8 @@ async fn metrics_server() { .port(); // create server configuration - let server_addr = quilkin::test_utils::available_addr(&AddressType::Random).await; + let mut server_addr = quilkin::test_utils::available_addr(&AddressType::Random).await; + quilkin::test_utils::map_addr_to_localhost(&mut server_addr); let server_proxy = quilkin::cli::Proxy { port: server_addr.port(), ..<_>::default() @@ -44,7 +45,7 @@ async fn metrics_server() { t.run_server( server_config, server_proxy, - Some(Some((std::net::Ipv6Addr::UNSPECIFIED, metrics_port).into())), + Some(Some((std::net::Ipv4Addr::UNSPECIFIED, metrics_port).into())), ); // create a local client diff --git a/tests/no_filter.rs b/tests/no_filter.rs index 022eefbf41..a034d7a5bf 100644 --- a/tests/no_filter.rs +++ b/tests/no_filter.rs @@ -53,19 +53,19 @@ async fn echo() { let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; socket.send_to(b"hello", &local_addr).await.unwrap(); - let value = timeout(Duration::from_secs(5), recv_chan.recv()) + let value = timeout(Duration::from_millis(500), recv_chan.recv()) .await .unwrap() .unwrap(); assert_eq!("hello", value); - let value = timeout(Duration::from_secs(5), recv_chan.recv()) + let value = timeout(Duration::from_millis(500), recv_chan.recv()) .await .unwrap() .unwrap(); assert_eq!("hello", value); // should only be two returned items - assert!(timeout(Duration::from_secs(2), recv_chan.recv()) + assert!(timeout(Duration::from_millis(500), recv_chan.recv()) .await .is_err()); } diff --git a/tests/token_router.rs b/tests/token_router.rs index 2e6f4edfcd..8d52a3f7ea 100644 --- a/tests/token_router.rs +++ b/tests/token_router.rs @@ -90,7 +90,7 @@ quilkin.dev: assert_eq!( "hello", - timeout(Duration::from_secs(5), recv_chan.recv()) + timeout(Duration::from_millis(500), recv_chan.recv()) .await .expect("should have received a packet") .unwrap() @@ -100,6 +100,6 @@ quilkin.dev: let msg = b"helloxyz"; socket.send_to(msg, &local_addr).await.unwrap(); - let result = timeout(Duration::from_secs(3), recv_chan.recv()).await; + let result = timeout(Duration::from_millis(500), recv_chan.recv()).await; assert!(result.is_err(), "should not have received a packet"); }