diff --git a/benches/throughput.rs b/benches/throughput.rs index 8bf8e0d016..db5ed804a2 100644 --- a/benches/throughput.rs +++ b/benches/throughput.rs @@ -5,6 +5,7 @@ use std::time; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; use once_cell::sync::Lazy; +use quilkin::test_utils::AddressType; const MESSAGE_SIZE: usize = 0xffff; const DEFAULT_MESSAGE: [u8; 0xffff] = [0xff; 0xffff]; @@ -35,7 +36,7 @@ fn run_quilkin(port: u16, endpoint: SocketAddr) { let proxy = quilkin::cli::Proxy { port, qcmp_port: runtime - .block_on(quilkin::test_utils::available_addr()) + .block_on(quilkin::test_utils::available_addr(&AddressType::Random)) .port(), ..<_>::default() }; diff --git a/build/Makefile b/build/Makefile index ac63f1518c..a0793842ab 100644 --- a/build/Makefile +++ b/build/Makefile @@ -85,7 +85,9 @@ test-quilkin: ensure-build-image --entrypoint=cargo $(BUILD_IMAGE_TAG) clippy --tests -- -D warnings docker run --rm $(common_rust_args) \ --entrypoint=cargo $(BUILD_IMAGE_TAG) fmt -- --check - docker run --rm $(common_rust_args) \ + # --network=host because docker containers are not great at ipv6. + docker run --rm $(common_rust_args) \ + --network=host \ -e RUST_BACKTRACE=1 --entrypoint=cargo $(BUILD_IMAGE_TAG) test -- --nocapture # Run tests against the examples @@ -241,8 +243,10 @@ docs: # Start an interactive shell inside the build image # Useful for testing, or adhoc cargo, gcloud, kubectl or terraform commands shell: ensure-gcloud-dirs ensure-kube-dirs ensure-build-image + # we --network=host because docker containers are not great at ipv6. docker run --rm -it $(DOCKER_RUN_ARGS) $(common_rust_args) \ $(gcloud_mount_args) $(kube_mount_args) \ + --network=host \ --entrypoint=bash $(BUILD_IMAGE_TAG) ensure-build-image: ensure-cargo-registry diff --git a/docs/src/deployment/admin.md b/docs/src/deployment/admin.md index 026e6221de..4180f82f78 100644 --- a/docs/src/deployment/admin.md +++ b/docs/src/deployment/admin.md @@ -1,8 +1,8 @@ # Administration -| services | ports | Protocol | -|----------|-------|-----------| -| Administration | 8000 | HTTP (IPv4 OR IPv6) | +| services | ports | Protocol | +|----------------|-------|---------------------| +| Administration | 8000 | HTTP (IPv4 OR IPv6) | ## Logging By default, Quilkin will log `INFO` level events, you can change this by setting diff --git a/docs/src/services/agent.md b/docs/src/services/agent.md index 4e2b610c4f..cc6a266621 100644 --- a/docs/src/services/agent.md +++ b/docs/src/services/agent.md @@ -2,7 +2,7 @@ | services | ports | Protocol | |----------|-------|-----------| -| QCMP | 7600 | UDP(IPv4 && IPv6) | +| QCMP | 7600 | UDP(IPv4 OR IPv6) | > **Note:** This service is currently in active experimentation and development so there may be bugs which cause it to be unusable for production, as always diff --git a/docs/src/services/proxy.md b/docs/src/services/proxy.md index 16f4d0b11d..84a61b6630 100644 --- a/docs/src/services/proxy.md +++ b/docs/src/services/proxy.md @@ -2,8 +2,8 @@ | Services | Ports | Protocol | |----------|-------|--------------------| -| Proxy | 7777 | UDP (IPv4) | -| QCMP | 7600 | UDP (IPv4 && IPv6) | +| Proxy | 7777 | UDP (IPv4 OR IPv6) | +| QCMP | 7600 | UDP (IPv4 OR IPv6) | "Proxy" is the primary Quilkin service, which acts as a non-transparent UDP proxy. diff --git a/docs/src/services/proxy/filters/firewall.md b/docs/src/services/proxy/filters/firewall.md index 996be96ea2..48c7ffe37e 100644 --- a/docs/src/services/proxy/filters/firewall.md +++ b/docs/src/services/proxy/filters/firewall.md @@ -17,15 +17,17 @@ filters: config: on_read: - action: ALLOW - source: 192.168.51.0/24 + sources: + - 192.168.51.0/24 ports: - - 10 - - 1000-7000 + - 10 + - 1000-7000 on_write: - action: DENY - source: 192.168.51.0/24 + sources: + - 192.168.51.0/24 ports: - - 7000 + - 7000 clusters: default: localities: diff --git a/docs/src/services/proxy/qcmp.md b/docs/src/services/proxy/qcmp.md index 44e44a881e..6a39ff7837 100644 --- a/docs/src/services/proxy/qcmp.md +++ b/docs/src/services/proxy/qcmp.md @@ -2,7 +2,7 @@ | services | ports | Protocol | |----------|-------|-----------| -| QCMP | 7600 | UDP (IPv4 && IPv6) | +| QCMP | 7600 | UDP (IPv4 OR IPv6) | In addition to the TCP based administration API, Quilkin provides a meta API over UDP. The purpose of this API is to provide meta operations that can be diff --git a/docs/src/services/xds.md b/docs/src/services/xds.md index 7371e36037..358a1b9ba6 100644 --- a/docs/src/services/xds.md +++ b/docs/src/services/xds.md @@ -1,8 +1,8 @@ # xDS Control Plane -| services | ports | Protocol | -|----------|-------|------------| -| xDS | 7800 | gRPC(IPv4) | +| services | ports | Protocol | +|----------|-------|---------------------| +| xDS | 7800 | gRPC (IPv4 OR IPv6) | For multi-cluster integration, Quilkin provides a `manage` service, that can be used with a number of configuration discovery providers to provide cluster diff --git a/proto/quilkin/filters/firewall/v1alpha1/firewall.proto b/proto/quilkin/filters/firewall/v1alpha1/firewall.proto index 2a9cc81897..32ef6b5aa2 100644 --- a/proto/quilkin/filters/firewall/v1alpha1/firewall.proto +++ b/proto/quilkin/filters/firewall/v1alpha1/firewall.proto @@ -31,7 +31,7 @@ message Firewall { message Rule { Action action = 1; - string source = 2; + repeated string sources = 2; repeated PortRange ports = 3; } diff --git a/src/cli.rs b/src/cli.rs index 556f06f103..20f56ca3c6 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -262,26 +262,21 @@ impl Cli { #[cfg(test)] mod tests { use super::*; - use std::net::Ipv4Addr; + use std::net::{Ipv4Addr, SocketAddr}; - use tokio::{ - net::UdpSocket, - time::{timeout, Duration}, - }; + use tokio::time::{timeout, Duration}; use crate::{ config::{Filter, Providers}, endpoint::{Endpoint, LocalityEndpoints}, filters::{Capture, StaticFilter, TokenRouter}, + test_utils::{create_socket, AddressType, TestHelper}, }; #[tokio::test] async fn relay_routing() { - let server_port = crate::test_utils::available_addr().await.port(); - let server_socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, server_port)) - .await - .map(Arc::new) - .unwrap(); + let mut t = TestHelper::default(); + let (mut rx, server_socket) = t.open_socket_and_recv_multiple_packets().await; let filters_file = tempfile::NamedTempFile::new().unwrap(); let config = Config::default(); @@ -320,7 +315,11 @@ mod tests { .write() .default_cluster_mut() .insert(LocalityEndpoints::from(vec![Endpoint::with_metadata( - (std::net::Ipv4Addr::LOCALHOST, server_port).into(), + ( + std::net::Ipv4Addr::LOCALHOST, + server_socket.local_ipv4_addr().unwrap().port(), + ) + .into(), crate::endpoint::Metadata { tokens: vec!["abc".into()].into_iter().collect(), }, @@ -329,7 +328,9 @@ mod tests { }) .unwrap(); - let relay_admin_port = crate::test_utils::available_addr().await.port(); + let relay_admin_port = crate::test_utils::available_addr(&AddressType::Random) + .await + .port(); let relay = Cli { admin_address: Some((Ipv4Addr::LOCALHOST, relay_admin_port).into()), config: <_>::default(), @@ -344,7 +345,9 @@ mod tests { log_format: LogFormats::default(), }; - let control_plane_admin_port = crate::test_utils::available_addr().await.port(); + let control_plane_admin_port = crate::test_utils::available_addr(&AddressType::Random) + .await + .port(); let control_plane = Cli { no_admin: false, quiet: true, @@ -363,7 +366,9 @@ mod tests { log_format: LogFormats::default(), }; - let proxy_admin_port = crate::test_utils::available_addr().await.port(); + let proxy_admin_port = crate::test_utils::available_addr(&AddressType::Random) + .await + .port(); let proxy = Cli { no_admin: false, quiet: true, @@ -381,12 +386,9 @@ mod tests { tokio::time::sleep(Duration::from_millis(500)).await; tokio::spawn(proxy.drive()); tokio::time::sleep(Duration::from_millis(500)).await; - let local_addr = crate::test_utils::available_addr().await; - let socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, local_addr.port())) - .await - .map(Arc::new) - .unwrap(); + let socket = create_socket().await; let config = Config::default(); + let proxy_address: SocketAddr = (std::net::Ipv4Addr::LOCALHOST, 7777).into(); for _ in 0..5 { let token = random_three_characters(); @@ -398,7 +400,11 @@ mod tests { .write() .default_cluster_mut() .insert(LocalityEndpoints::from(vec![Endpoint::with_metadata( - (std::net::Ipv4Addr::LOCALHOST, server_port).into(), + ( + std::net::Ipv4Addr::LOCALHOST, + server_socket.local_ipv4_addr().unwrap().port(), + ) + .into(), crate::endpoint::Metadata { tokens: vec![token.clone()].into_iter().collect(), }, @@ -410,22 +416,14 @@ mod tests { let mut msg = Vec::from(*b"hello"); msg.extend_from_slice(&token); tracing::info!(?token, "sending packet"); - socket - .send_to(&msg, &(std::net::Ipv4Addr::LOCALHOST, 7777)) - .await - .unwrap(); - - let recv = |socket: Arc| async move { - let mut buf = [0; u16::MAX as usize]; - let length = socket.recv(&mut buf).await.unwrap(); - buf[0..length].to_vec() - }; + socket.send_to(&msg, &proxy_address).await.unwrap(); assert_eq!( - b"hello", - &&*timeout(Duration::from_secs(5), (recv)(server_socket.clone())) + "hello", + timeout(Duration::from_secs(5), rx.recv()) .await .expect("should have received a packet") + .unwrap() ); tracing::info!(?token, "received packet"); @@ -433,9 +431,9 @@ mod tests { tracing::info!(?token, "sending bad packet"); // send an invalid packet let msg = b"hello\xFF\xFF\xFF"; - socket.send_to(msg, &local_addr).await.unwrap(); + socket.send_to(msg, &proxy_address).await.unwrap(); - let result = timeout(Duration::from_secs(3), (recv)(server_socket.clone())).await; + let result = timeout(Duration::from_secs(3), rx.recv()).await; assert!(result.is_err(), "should not have received a packet"); tracing::info!(?token, "didn't receive bad packet"); } diff --git a/src/cli/proxy.rs b/src/cli/proxy.rs index a067e9e659..41b26bc2a4 100644 --- a/src/cli/proxy.rs +++ b/src/cli/proxy.rs @@ -18,10 +18,11 @@ use std::{net::SocketAddr, sync::Arc, time::Duration}; use tonic::transport::Endpoint; -use crate::{proxy::SessionMap, utils::net, xds::ResourceType, Config, Result}; +use crate::{proxy::SessionMap, xds::ResourceType, Config, Result}; #[cfg(doc)] use crate::filters::FilterFactory; +use crate::utils::net::DualStackLocalSocket; define_port!(7777); @@ -153,7 +154,7 @@ impl Proxy { // Contains config for each worker task. let mut workers = Vec::with_capacity(num_workers); for worker_id in 0..num_workers { - let socket = Arc::new(net::socket_with_reuse(self.port)?); + let socket = Arc::new(DualStackLocalSocket::new(self.port)?); workers.push(crate::proxy::DownstreamReceiveWorkerConfig { worker_id, socket: socket.clone(), @@ -181,7 +182,7 @@ mod tests { use crate::{ config, endpoint::Endpoint, - test_utils::{available_addr, create_socket, load_test_filters, TestHelper}, + test_utils::{available_addr, create_socket, load_test_filters, AddressType, TestHelper}, }; #[tokio::test] @@ -191,7 +192,7 @@ mod tests { let endpoint1 = t.open_socket_and_recv_single_packet().await; let endpoint2 = t.open_socket_and_recv_single_packet().await; - let local_addr = available_addr().await; + let local_addr = available_addr(&AddressType::Random).await; let proxy = crate::cli::Proxy { port: local_addr.port(), ..<_>::default() @@ -200,8 +201,8 @@ mod tests { let config = Arc::new(crate::Config::default()); config.clusters.modify(|clusters| { clusters.insert_default(vec![ - Endpoint::new(endpoint1.socket.local_addr().unwrap().into()), - Endpoint::new(endpoint2.socket.local_addr().unwrap().into()), + Endpoint::new(endpoint1.socket.local_ipv4_addr().unwrap().into()), + Endpoint::new(endpoint2.socket.local_ipv6_addr().unwrap().into()), ]) }); @@ -234,8 +235,7 @@ mod tests { let mut t = TestHelper::default(); let endpoint = t.open_socket_and_recv_single_packet().await; - - let local_addr = available_addr().await; + let local_addr = available_addr(&AddressType::Random).await; let proxy = crate::cli::Proxy { port: local_addr.port(), ..<_>::default() @@ -243,7 +243,7 @@ mod tests { let config = Arc::new(Config::default()); config.clusters.modify(|clusters| { clusters.insert_default(vec![Endpoint::new( - endpoint.socket.local_addr().unwrap().into(), + endpoint.socket.local_ipv4_addr().unwrap().into(), )]) }); t.run_server(config, proxy, None); @@ -269,7 +269,7 @@ mod tests { load_test_filters(); let endpoint = t.open_socket_and_recv_single_packet().await; - let local_addr = available_addr().await; + let local_addr = available_addr(&AddressType::Random).await; let config = Arc::new(Config::default()); config.filters.store( crate::filters::FilterChain::try_from(vec![config::Filter { @@ -282,7 +282,7 @@ mod tests { ); config.clusters.modify(|clusters| { clusters.insert_default(vec![Endpoint::new( - endpoint.socket.local_addr().unwrap().into(), + endpoint.socket.local_ipv4_addr().unwrap().into(), )]) }); t.run_server( @@ -315,12 +315,12 @@ mod tests { let t = TestHelper::default(); let socket = Arc::new(create_socket().await); - let addr = socket.local_addr().unwrap(); + let addr = socket.local_ipv6_addr().unwrap(); let endpoint = t.open_socket_and_recv_single_packet().await; let msg = "hello"; let config = Arc::new(Config::default()); config.clusters.modify(|clusters| { - clusters.insert_default(vec![endpoint.socket.local_addr().unwrap()]) + clusters.insert_default(vec![endpoint.socket.local_ipv6_addr().unwrap()]) }); // we'll test a single DownstreamReceiveWorkerConfig @@ -350,7 +350,7 @@ mod tests { let msg = "hello"; let endpoint = t.open_socket_and_recv_single_packet().await; - let local_addr = available_addr().await; + let local_addr = available_addr(&AddressType::Random).await; let proxy = crate::cli::Proxy { port: local_addr.port(), ..<_>::default() @@ -358,7 +358,7 @@ mod tests { let config = Arc::new(crate::Config::default()); config.clusters.modify(|clusters| { - clusters.insert_default(vec![endpoint.socket.local_addr().unwrap()]) + clusters.insert_default(vec![endpoint.socket.local_ipv4_addr().unwrap()]) }); proxy.run_recv_from(&config, <_>::default()).unwrap(); diff --git a/src/config.rs b/src/config.rs index b524703455..48ab1b8654 100644 --- a/src/config.rs +++ b/src/config.rs @@ -23,12 +23,6 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use uuid::Uuid; -mod config_type; -mod error; -pub mod providers; -mod slot; -pub mod watch; - use crate::{ cluster::{Cluster, ClusterMap}, filters::prelude::*, @@ -43,6 +37,12 @@ pub use self::{ config_type::ConfigType, error::ValidationError, providers::Providers, slot::Slot, watch::Watch, }; +mod config_type; +mod error; +pub mod providers; +mod slot; +pub mod watch; + base64_serde_type!(pub Base64Standard, base64::engine::general_purpose::STANDARD); pub(crate) const BACKOFF_INITIAL_DELAY_MILLISECONDS: u64 = 500; @@ -325,11 +325,14 @@ impl From<(String, FilterInstance)> for Filter { #[cfg(test)] mod tests { + use std::net::Ipv6Addr; + use serde_json::json; - use super::*; use crate::endpoint::{Endpoint, Metadata}; + use super::*; + fn parse_config(yaml: &str) -> Config { Config::from_reader(yaml.as_bytes()).unwrap() } @@ -405,6 +408,37 @@ id: server-proxy ) } + #[test] + fn parse_ipv6_endpoint() { + let config: Config = serde_json::from_value(json!({ + "version": "v1alpha1", + "clusters":{ + "default":{ + "localities": [{ + "endpoints": [{ + "address": "[2345:0425:2CA1:0000:0000:0567:5673:24b5]:25999" + }], + }] + } + } + })) + .unwrap(); + + let value = config.clusters.read(); + assert_eq!( + &*value, + &ClusterMap::new_with_default_cluster(vec![Endpoint::new( + ( + "2345:0425:2CA1:0000:0000:0567:5673:24b5" + .parse::() + .unwrap(), + 25999 + ) + .into() + )]) + ) + } + #[test] fn parse_server() { let config: Config = serde_json::from_value(json!({ @@ -422,7 +456,7 @@ id: server-proxy } }, { - "address" : "127.0.0.1:26001", + "address" : "[2345:0425:2CA1:0000:0000:0567:5673:24b5]:25999", "metadata": { "quilkin.dev": { "tokens": ["bmt1eTcweA=="], @@ -450,7 +484,9 @@ id: server-proxy }, ), Endpoint::with_metadata( - "127.0.0.1:26001".parse().unwrap(), + "[2345:0425:2CA1:0000:0000:0567:5673:24b5]:25999" + .parse() + .unwrap(), Metadata { tokens: vec!["nkuy70x"].into_iter().map(From::from).collect(), }, diff --git a/src/endpoint.rs b/src/endpoint.rs index 9bb447725a..145a4a9967 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -16,7 +16,7 @@ //! Types representing where the data is the sent. -mod address; +pub(crate) mod address; mod locality; use serde::{Deserialize, Serialize}; diff --git a/src/endpoint/address.rs b/src/endpoint/address.rs index b23088b672..875aa9ff3d 100644 --- a/src/endpoint/address.rs +++ b/src/endpoint/address.rs @@ -289,7 +289,11 @@ impl TryFrom for EndpointAddress { impl fmt::Display for EndpointAddress { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}:{}", self.host, self.port) + if let AddressKind::Ip(IpAddr::V6(ip)) = self.host { + write!(f, "[{}]:{}", ip, self.port) + } else { + write!(f, "{}:{}", self.host, self.port) + } } } @@ -318,7 +322,15 @@ impl FromStr for AddressKind { type Err = std::convert::Infallible; fn from_str(s: &str) -> Result { - Ok(s.parse() + // check for wrapping "[..]" in an ipv6 host + let mut host = s.to_string(); + let len = host.len(); + if len > 2 && s.starts_with('[') && s.ends_with(']') { + host = host[1..len - 1].to_string(); + } + + Ok(host + .parse() .map(Self::Ip) .unwrap_or_else(|_| Self::Name(s.to_owned()))) } @@ -341,3 +353,70 @@ impl Serialize for EndpointAddress { serializer.serialize_str(&self.to_string()) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn endpoint_from_string() { + // ipv + let endpoint = "127.0.12.1:4567".parse::().unwrap(); + match endpoint.host { + AddressKind::Name(_) => panic!("Shouldn't be a name"), + AddressKind::Ip(ip) => assert_eq!("127.0.12.1", ip.to_string()), + }; + assert_eq!(4567, endpoint.port); + + // ipv6 + let endpoint = "[2345:0425:2ca1:0000:0000:0567:5673:24b5]:25999" + .parse::() + .unwrap(); + match endpoint.host { + AddressKind::Name(_) => panic!("Shouldn't be a name"), + AddressKind::Ip(ip) => { + assert_eq!("2345:425:2ca1::567:5673:24b5", ip.to_string()) + } + }; + assert_eq!(25999, endpoint.port); + } + + #[test] + fn address_kind_from_string() { + let ak = "127.0.12.1".parse::().unwrap(); + + match ak { + AddressKind::Name(_) => panic!("Shouldn't be a name"), + AddressKind::Ip(ip) => assert_eq!("127.0.12.1", ip.to_string()), + } + + // ipv6 + let ak = "[2345:0425:2ca1:0000:0000:0567:5673:24b5]" + .parse::() + .unwrap(); + match ak { + AddressKind::Name(_) => panic!("Shouldn't be a name"), + AddressKind::Ip(ip) => { + assert_eq!("2345:425:2ca1::567:5673:24b5", ip.to_string()) + } + }; + + let ak = "2345:0425:2ca1:0000:0000:0567:5673:24b5" + .parse::() + .unwrap(); + match ak { + AddressKind::Name(_) => panic!("Shouldn't be a name"), + AddressKind::Ip(ip) => { + assert_eq!("2345:425:2ca1::567:5673:24b5", ip.to_string()) + } + }; + + let ak = "my.domain.com".parse::().unwrap(); + match ak { + AddressKind::Name(name) => { + assert_eq!("my.domain.com", name) + } + AddressKind::Ip(_) => panic!("shouldn't be an ip"), + }; + } +} diff --git a/src/filters/firewall.rs b/src/filters/firewall.rs index 535cc52378..996803aa33 100644 --- a/src/filters/firewall.rs +++ b/src/filters/firewall.rs @@ -130,7 +130,7 @@ mod tests { let firewall = Firewall { on_read: vec![Rule { action: Action::Allow, - source: "192.168.75.0/24".parse().unwrap(), + sources: vec!["192.168.75.0/24".parse().unwrap()], ports: vec![PortRange::new(10, 100).unwrap()], }], on_write: vec![], @@ -161,7 +161,7 @@ mod tests { on_read: vec![], on_write: vec![Rule { action: Action::Allow, - source: "192.168.75.0/24".parse().unwrap(), + sources: vec!["192.168.75.0/24".parse().unwrap()], ports: vec![PortRange::new(10, 100).unwrap()], }], }; diff --git a/src/filters/firewall/config.rs b/src/filters/firewall/config.rs index 76357121dd..fd58c465f9 100644 --- a/src/filters/firewall/config.rs +++ b/src/filters/firewall/config.rs @@ -14,9 +14,11 @@ * limitations under the License. */ -use std::{convert::TryFrom, fmt, fmt::Formatter, net::SocketAddr, ops::Range}; +use std::net::IpAddr; +use std::str::FromStr; +use std::{convert::TryFrom, fmt, fmt::Formatter, net::SocketAddr, ops::Range, vec}; -use ipnetwork::IpNetwork; +use ipnetwork::{IpNetwork, IpNetworkError}; use schemars::JsonSchema; use serde::de::{self, Visitor}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; @@ -63,18 +65,49 @@ impl From for Action { } } +/// Cidr notation for an ipv6 or ipv4 netmask +#[derive(Clone, Deserialize, Debug, Eq, PartialEq, Serialize, JsonSchema)] +pub struct Cidr(#[schemars(with = "String")] IpNetwork); + +impl FromStr for Cidr { + type Err = IpNetworkError; + + fn from_str(s: &str) -> Result { + let ip = IpNetwork::from_str(s)?; + Ok(Self(ip)) + } +} + +impl Cidr { + /// Does this Address match the netmask? + /// If the mask is ipv4 and the address is ipv6, this will attempt to see if it's a + /// compatible or mapped ipv4->ipv6 address and match that. + pub fn contains(&self, ip: IpAddr) -> bool { + // if we have a v4 mask, but a v6 address, let's see if it's a compatible or mapped + // ipv4->ipv6 address + if let IpNetwork::V4(v4network) = self.0 { + if let IpAddr::V6(v6) = ip { + if let Some(ipv4) = v6.to_ipv4() { + return v4network.contains(ipv4); + } + } + } + + self.0.contains(ip) + } +} + /// Combination of CIDR range, port range and action to take. #[derive(Clone, Deserialize, Debug, Eq, PartialEq, Serialize, JsonSchema)] pub struct Rule { pub action: Action, /// ipv4 or ipv6 CIDR address. - #[schemars(with = "String")] - pub source: IpNetwork, + pub sources: Vec, pub ports: Vec, } impl Rule { - /// Returns `true` if `address` matches the provided CIDR address as well + /// Returns `true` if any `address` matches the provided CIDR addresses as well /// as at least one of the port ranges in the [Rule]. /// /// # Examples @@ -83,7 +116,7 @@ impl Rule { /// /// let rule = quilkin::filters::firewall::Rule { /// action: Action::Allow, - /// source: "192.168.75.0/24".parse().unwrap(), + /// sources: vec!["192.168.75.0/24".parse().unwrap()], /// ports: vec![PortRange::new(10, 100).unwrap()], /// }; /// @@ -97,13 +130,17 @@ impl Rule { /// assert!(!rule.contains(([192, 168, 76, 10], 40).into())); /// ``` pub fn contains(&self, address: SocketAddr) -> bool { - if !self.source.contains(address.ip()) { - return false; - } - - self.ports + if self + .sources .iter() - .any(|range| range.contains(&address.port())) + .any(|source| source.contains(address.ip())) + { + return self + .ports + .iter() + .any(|range| range.contains(&address.port())); + } + false } } @@ -111,7 +148,11 @@ impl From for proto::firewall::Rule { fn from(rule: Rule) -> Self { Self { action: proto::firewall::Action::from(rule.action) as i32, - source: rule.source.to_string(), + sources: rule + .sources + .into_iter() + .map(|cidr| cidr.0.to_string()) + .collect(), ports: rule.ports.into_iter().map(From::from).collect(), } } @@ -246,14 +287,24 @@ impl TryFrom for Config { .map_err(|err| ConvertProtoConfigError::new(format!("{err}"), Some("ports".into()))) } - fn convert_rule(rule: &proto::firewall::Rule) -> Result { - let action = Action::from(rule.action()); - let source = IpNetwork::try_from(rule.source.as_str()).map_err(|err| { + fn convert_source(s: &str) -> Result { + let i = IpNetwork::try_from(s).map_err(|err| { ConvertProtoConfigError::new( format!("invalid source: {err:?}"), Some("source".into()), ) })?; + Ok(Cidr(i)) + } + + fn convert_rule(rule: &proto::firewall::Rule) -> Result { + let action = Action::from(rule.action()); + + let sources = rule + .sources + .iter() + .map(|s| convert_source(s.as_str())) + .collect::, ConvertProtoConfigError>>()?; let ports = rule .ports @@ -263,7 +314,7 @@ impl TryFrom for Config { Ok(Rule { action, - source, + sources, ports, }) } @@ -286,19 +337,22 @@ impl TryFrom for Config { #[cfg(test)] mod tests { use super::*; + use std::net::IpAddr; #[test] fn deserialize_yaml() { let yaml = " on_read: - action: ALLOW - source: 192.168.51.0/24 + sources: + - 192.168.51.0/24 ports: - 10 - 1000-7000 on_write: - action: DENY - source: 192.168.75.0/24 + sources: + - 192.168.75.0/24 ports: - 7000 "; @@ -307,7 +361,7 @@ on_write: let rule1 = config.on_read[0].clone(); assert_eq!(rule1.action, Action::Allow); - assert_eq!(rule1.source, "192.168.51.0/24".parse().unwrap()); + assert_eq!(rule1.sources[0].0, "192.168.51.0/24".parse().unwrap()); assert_eq!(2, rule1.ports.len()); assert_eq!(10, rule1.ports[0].0.start); assert_eq!(11, rule1.ports[0].0.end); @@ -316,7 +370,7 @@ on_write: let rule2 = config.on_write[0].clone(); assert_eq!(rule2.action, Action::Deny); - assert_eq!(rule2.source, "192.168.75.0/24".parse().unwrap()); + assert_eq!(rule2.sources[0].0, "192.168.75.0/24".parse().unwrap()); assert_eq!(1, rule2.ports.len()); assert_eq!(7000, rule2.ports[0].0.start); assert_eq!(7001, rule2.ports[0].0.end); @@ -342,12 +396,12 @@ on_write: let proto_config = proto::Firewall { on_read: vec![proto::firewall::Rule { action: proto::firewall::Action::Allow as i32, - source: "192.168.75.0/24".into(), + sources: vec!["192.168.75.0/24".into()], ports: vec![proto::firewall::PortRange { min: 10, max: 100 }], }], on_write: vec![proto::firewall::Rule { action: proto::firewall::Action::Deny as i32, - source: "192.168.124.0/24".into(), + sources: vec!["192.168.124.0/24".into()], ports: vec![proto::firewall::PortRange { min: 50, max: 51 }], }], }; @@ -356,14 +410,14 @@ on_write: let rule1 = config.on_read[0].clone(); assert_eq!(rule1.action, Action::Allow); - assert_eq!(rule1.source, "192.168.75.0/24".parse().unwrap()); + assert_eq!(rule1.sources[0].0, "192.168.75.0/24".parse().unwrap()); assert_eq!(1, rule1.ports.len()); assert_eq!(10, rule1.ports[0].0.start); assert_eq!(100, rule1.ports[0].0.end); let rule2 = config.on_write[0].clone(); assert_eq!(rule2.action, Action::Deny); - assert_eq!(rule2.source, "192.168.124.0/24".parse().unwrap()); + assert_eq!(rule2.sources[0].0, "192.168.124.0/24".parse().unwrap()); assert_eq!(1, rule2.ports.len()); assert_eq!(50, rule2.ports[0].0.start); assert_eq!(51, rule2.ports[0].0.end); @@ -371,19 +425,47 @@ on_write: #[test] fn rule_contains() { + fn ipv4_test(rule: &Rule) { + let ip = [192, 168, 75, 10]; + assert!(rule.contains((ip, 50).into())); + assert!(rule.contains((ip, 99).into())); + assert!(rule.contains((ip, 10).into())); + + assert!(!rule.contains((ip, 5).into())); + assert!(!rule.contains((ip, 1000).into())); + assert!(!rule.contains(([192, 168, 76, 10], 40).into())); + } + + // test with a single mask + let rule = Rule { + action: Action::Allow, + sources: vec!["192.168.75.0/24".parse().unwrap()], + ports: vec![PortRange::new(10, 100).unwrap()], + }; + ipv4_test(&rule); + + // test with multiple masks. let rule = Rule { action: Action::Allow, - source: "192.168.75.0/24".parse().unwrap(), + sources: vec![ + "192.168.75.0/24".parse().unwrap(), + "198.168.75.0/24".parse().unwrap(), + ], ports: vec![PortRange::new(10, 100).unwrap()], }; + ipv4_test(&rule); + + // test ipv4 to ipv6 compatible + let ip = "::ffff:192.168.75.10".parse::().unwrap(); + assert!(rule.contains((ip, 50).into())); + let ip = "::ffff:197.168.75.10".parse::().unwrap(); + assert!(!rule.contains((ip, 50).into())); - let ip = [192, 168, 75, 10]; + // test ipv4 to ipv6 mapped + let ip = "::ffff:c0a8:4b0a".parse::().unwrap(); assert!(rule.contains((ip, 50).into())); - assert!(rule.contains((ip, 99).into())); - assert!(rule.contains((ip, 10).into())); - assert!(!rule.contains((ip, 5).into())); - assert!(!rule.contains((ip, 1000).into())); - assert!(!rule.contains(([192, 168, 76, 10], 40).into())); + let ip = "::ffff:c5a8:4b0a".parse::().unwrap(); + assert!(!rule.contains((ip, 50).into())); } } diff --git a/src/protocol.rs b/src/protocol.rs index b0a52f5fa7..95993ed18c 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -17,10 +17,9 @@ //! Logic for parsing and generating Quilkin Control Message Protocol (QCMP) messages. use nom::bytes::complete; -use std::net::SocketAddr; use tracing::Instrument; -use crate::utils::net; +use crate::utils::net::DualStackLocalSocket; // Magic number to distinguish control packets from regular traffic. const MAGIC_NUMBER: &[u8] = b"QLKN"; @@ -34,29 +33,23 @@ const DISCRIMINANT_LEN: usize = 1; type Result = std::result::Result; pub async fn spawn(port: u16) -> crate::Result<()> { - let socket = net::DualStackLocalSocket::new(port)?; + let socket = DualStackLocalSocket::new(port)?; let v4_addr = socket.local_ipv4_addr()?; - let v6_addr = socket.local_ip6_addr()?; + let v6_addr = socket.local_ipv6_addr()?; tokio::spawn( async move { // Initialize a buffer for the UDP packet. We use the maximum size of a UDP // packet, which is the maximum value of 16 a bit integer. - let mut v4_buf = vec![0; 1 << 16]; - let mut v6_buf = vec![0; 1 << 16]; + let mut buf = vec![0; 1 << 16]; let mut output_buf = Vec::new(); loop { tracing::info!(%v4_addr, %v6_addr, "awaiting qcmp packets"); - match socket.recv_from(&mut v4_buf, &mut v6_buf).await { + match socket.recv_from(&mut buf).await { Ok((size, source)) => { let received_at = chrono::Utc::now().timestamp_nanos(); - let contents = match source { - SocketAddr::V4(_) => &v4_buf[..size], - SocketAddr::V6(_) => &v6_buf[..size], - }; - - let command = match Protocol::parse(contents) { + let command = match Protocol::parse(&buf[..size]) { Ok(Some(command)) => command, Ok(None) => { tracing::debug!("rejected non-qcmp packet"); diff --git a/src/proxy.rs b/src/proxy.rs index fcc0574989..5812d45122 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -14,19 +14,18 @@ * limitations under the License. */ -mod sessions; - -use std::sync::Arc; +use std::{net::SocketAddr, sync::Arc}; -use tokio::net::UdpSocket; +pub use sessions::{Session, SessionKey, SessionMap}; use crate::{ endpoint::{Endpoint, EndpointAddress}, filters::{Filter, ReadContext}, + utils::net::DualStackLocalSocket, Config, }; -pub use sessions::{Session, SessionKey, SessionMap}; +mod sessions; /// Packet received from local port #[derive(Debug)] @@ -34,7 +33,7 @@ struct DownstreamPacket { asn_info: Option, contents: Vec, received_at: i64, - source: std::net::SocketAddr, + source: SocketAddr, } /// Represents the required arguments to run a worker task that @@ -43,7 +42,7 @@ pub(crate) struct DownstreamReceiveWorkerConfig { /// ID of the worker. pub worker_id: usize, /// Socket with reused port from which the worker receives packets. - pub socket: Arc, + pub socket: Arc, pub config: Arc, pub sessions: SessionMap, } @@ -65,7 +64,7 @@ impl DownstreamReceiveWorkerConfig { loop { tracing::debug!( id = worker_id, - addr = ?socket.local_addr(), + port = ?socket.local_ipv6_addr().map(|addr| addr.port()), "Awaiting packet" ); @@ -107,7 +106,7 @@ impl DownstreamReceiveWorkerConfig { packet: DownstreamPacket, source: std::net::SocketAddr, worker_id: usize, - socket: &Arc, + socket: &Arc, config: &Arc, sessions: &SessionMap, ) { @@ -158,7 +157,7 @@ impl DownstreamReceiveWorkerConfig { async fn process_downstream_received_packet( packet: DownstreamPacket, config: Arc, - downstream_socket: Arc, + downstream_socket: Arc, sessions: SessionMap, ) -> Result { let endpoints: Vec<_> = config.clusters.read().endpoints().collect(); @@ -193,7 +192,7 @@ impl DownstreamReceiveWorkerConfig { packet: &[u8], recv_addr: &EndpointAddress, endpoint: &Endpoint, - downstream_socket: &Arc, + downstream_socket: &Arc, config: &Arc, sessions: &SessionMap, asn_info: Option, diff --git a/src/proxy/sessions.rs b/src/proxy/sessions.rs index 59db2ce723..a50a9b8833 100644 --- a/src/proxy/sessions.rs +++ b/src/proxy/sessions.rs @@ -14,9 +14,7 @@ * limitations under the License. */ -pub(crate) mod metrics; - -use std::sync::Arc; +use std::{net::SocketAddr, sync::Arc}; use tokio::{ net::UdpSocket, @@ -29,9 +27,11 @@ use crate::{ endpoint::{Endpoint, EndpointAddress}, filters::{Filter, WriteContext}, maxmind_db::IpNetEntry, - utils::Loggable, + utils::{net::DualStackLocalSocket, Loggable}, }; +pub(crate) mod metrics; + pub type SessionMap = crate::ttl_map::TtlMap; /// Session encapsulates a UDP stream session @@ -79,7 +79,7 @@ impl Session { pub fn new( config: Arc, source: EndpointAddress, - downstream_socket: Arc, + downstream_socket: Arc, dest: Endpoint, asn_info: Option, ) -> Result { @@ -110,13 +110,16 @@ impl Session { let address = self.dest.address.clone(); async move { + let connect_addr = address.to_socket_addr().await?; + let bind_addr: SocketAddr = match connect_addr { + SocketAddr::V4(_) => (std::net::Ipv4Addr::UNSPECIFIED, 0).into(), + SocketAddr::V6(_) => (std::net::Ipv6Addr::UNSPECIFIED, 0).into(), + }; + upstream_socket .get_or_try_init(|| async { - let upstream_socket = - UdpSocket::bind((std::net::Ipv4Addr::UNSPECIFIED, 0)).await?; - upstream_socket - .connect(address.to_socket_addr().await?) - .await?; + let upstream_socket = UdpSocket::bind(bind_addr).await?; + upstream_socket.connect(connect_addr).await?; Ok(Arc::new(upstream_socket)) }) .await @@ -126,7 +129,11 @@ impl Session { /// run starts processing receiving upstream udp packets /// and sending them back downstream - fn run(&self, downstream_socket: Arc, mut shutdown_rx: watch::Receiver<()>) { + fn run( + &self, + downstream_socket: Arc, + mut shutdown_rx: watch::Receiver<()>, + ) { let source = self.source.clone(); let config = self.config.clone(); let endpoint = self.dest.clone(); @@ -204,7 +211,7 @@ impl Session { /// process_recv_packet processes a packet that is received by this session. async fn process_recv_packet( - downstream_socket: &Arc, + downstream_socket: &Arc, packet_ctx: ReceivedPacketContext<'_>, ) -> Result { let ReceivedPacketContext { @@ -230,7 +237,7 @@ impl Session { let packet = context.contents.as_ref(); tracing::trace!(%from, dest = %addr, contents = %crate::utils::base64_encode(packet), "sending packet downstream"); downstream_socket - .send_to(packet, addr) + .send_to(packet, &addr) .await .map_err(Error::SendTo) } @@ -290,20 +297,20 @@ impl Loggable for Error { mod tests { use std::{str::from_utf8, sync::Arc, time::Duration}; - use super::*; - use tokio::time::timeout; use crate::{ endpoint::{Endpoint, EndpointAddress}, proxy::sessions::ReceivedPacketContext, - test_utils::{create_socket, new_test_config, TestHelper}, + test_utils::{create_socket, new_test_config, AddressType, TestHelper}, }; + use super::*; + #[tokio::test] async fn session_send_and_receive() { let mut t = TestHelper::default(); - let addr = t.run_echo_server().await; + let addr = t.run_echo_server(&AddressType::Random).await; let endpoint = Endpoint::new(addr.clone()); let socket = Arc::new(create_socket().await); let msg = "hello"; @@ -334,7 +341,7 @@ mod tests { let socket = Arc::new(create_socket().await); let endpoint = Endpoint::new("127.0.1.1:80".parse().unwrap()); - let dest: EndpointAddress = socket.local_addr().unwrap().into(); + let dest: EndpointAddress = socket.local_ipv4_addr().unwrap().into(); // first test with no filtering let msg = "hello"; diff --git a/src/test_utils.rs b/src/test_utils.rs index 21023d2361..fdf83117f3 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -14,13 +14,11 @@ * limitations under the License. */ +use std::net::Ipv4Addr; /// Common utilities for testing use std::{net::SocketAddr, str::from_utf8, sync::Arc, sync::Once}; -use tokio::{ - net::UdpSocket, - sync::{mpsc, oneshot, watch}, -}; +use tokio::sync::{mpsc, oneshot, watch}; use tracing_subscriber::EnvFilter; use crate::{ @@ -29,6 +27,7 @@ use crate::{ endpoint::{Endpoint, EndpointAddress, LocalityEndpoints}, filters::{prelude::*, FilterRegistry}, metadata::Value, + utils::net::DualStackLocalSocket, }; static LOG_ONCE: Once = Once::new(); @@ -44,14 +43,39 @@ pub fn enable_log(filter: impl Into) { }); } +/// Which type of Address do you want? Random may give ipv4 or ipv6 +pub enum AddressType { + Random, + Ipv4, + Ipv6, +} + /// Returns a local address on a port that is not assigned to another test. -pub async fn available_addr() -> SocketAddr { +/// If Random address tye is used, it might be v4, Might be v6. It's random. +pub async fn available_addr(address_type: &AddressType) -> SocketAddr { let socket = create_socket().await; - let addr = socket.local_addr().unwrap(); + let addr = get_address(address_type, &socket); + tracing::debug!(addr = ?addr, "test_util::available_addr"); addr } +fn get_address(address_type: &AddressType, socket: &DualStackLocalSocket) -> SocketAddr { + let addr = match address_type { + AddressType::Random => { + // sometimes give ipv6, sometimes ipv4. + match rand::random() { + true => socket.local_ipv6_addr().unwrap(), + false => socket.local_ipv4_addr().unwrap(), + } + } + AddressType::Ipv4 => socket.local_ipv4_addr().unwrap(), + AddressType::Ipv6 => socket.local_ipv6_addr().unwrap(), + }; + tracing::debug!(addr = ?addr, "test_util::get_address"); + addr +} + // TestFilter is useful for testing that commands are executing filters appropriately. pub struct TestFilter; @@ -102,7 +126,7 @@ pub struct TestHelper { /// Returned from [creating a socket](TestHelper::open_socket_and_recv_single_packet) pub struct OpenSocketRecvPacket { /// The opened socket - pub socket: Arc, + pub socket: Arc, /// A channel on which the received packet will be forwarded. pub packet_rx: oneshot::Receiver, } @@ -136,7 +160,7 @@ impl TestHelper { let socket_recv = socket.clone(); tokio::spawn(async move { let mut buf = vec![0; 1024]; - let size = socket_recv.recv(&mut buf).await.unwrap(); + let (size, _) = socket_recv.recv_from(&mut buf).await.unwrap(); packet_tx .send(from_utf8(&buf[..size]).unwrap().to_string()) .unwrap(); @@ -148,9 +172,28 @@ impl TestHelper { /// returned channel. pub async fn open_socket_and_recv_multiple_packets( &mut self, - ) -> (mpsc::Receiver, Arc) { - let (packet_tx, packet_rx) = mpsc::channel::(10); + ) -> (mpsc::Receiver, Arc) { let socket = Arc::new(create_socket().await); + let packet_rx = self.recv_multiple_packets(&socket).await; + (packet_rx, socket) + } + + // Same as above, but sometimes you just need an ipv4 socket + pub async fn open_ipv4_socket_and_recv_multiple_packets( + &mut self, + ) -> (mpsc::Receiver, Arc) { + let socket = Arc::new( + DualStackLocalSocket::new_with_address((Ipv4Addr::UNSPECIFIED, 0).into()).unwrap(), + ); + let packet_rx = self.recv_multiple_packets(&socket).await; + (packet_rx, socket) + } + + async fn recv_multiple_packets( + &mut self, + socket: &Arc, + ) -> mpsc::Receiver { + let (packet_tx, packet_rx) = mpsc::channel::(10); let mut shutdown_rx = self.get_shutdown_subscriber().await; let socket_recv = socket.clone(); tokio::spawn(async move { @@ -174,24 +217,30 @@ impl TestHelper { } } }); - (packet_rx, socket) + packet_rx } /// Runs a simple UDP server that echos back payloads. /// Returns the server's address. - pub async fn run_echo_server(&mut self) -> EndpointAddress { - self.run_echo_server_with_tap(|_, _, _| {}).await + pub async fn run_echo_server(&mut self, address_type: &AddressType) -> EndpointAddress { + self.run_echo_server_with_tap(address_type, |_, _, _| {}) + .await } /// Runs a simple UDP server that echos back payloads. /// The provided function is invoked for each received payload. /// Returns the server's address. - pub async fn run_echo_server_with_tap(&mut self, tap: F) -> EndpointAddress + pub async fn run_echo_server_with_tap( + &mut self, + address_type: &AddressType, + tap: F, + ) -> EndpointAddress where F: Fn(SocketAddr, &[u8], SocketAddr) + Send + 'static, { let socket = create_socket().await; - let addr = socket.local_addr().unwrap(); + // sometimes give ipv6, sometimes ipv4. + let addr = get_address(address_type, &socket); let mut shutdown = self.get_shutdown_subscriber().await; let local_addr = addr; tokio::spawn(async move { @@ -288,8 +337,8 @@ where } /// Opens a new socket bound to an ephemeral port -pub async fn create_socket() -> UdpSocket { - crate::utils::net::socket_with_reuse(0).unwrap() +pub async fn create_socket() -> DualStackLocalSocket { + DualStackLocalSocket::new(0).unwrap() } pub fn config_with_dummy_endpoint() -> Config { @@ -343,12 +392,12 @@ mod tests { use tokio::time::timeout; - use crate::test_utils::TestHelper; + use crate::test_utils::{AddressType, TestHelper}; #[tokio::test] async fn test_echo_server() { let mut t = TestHelper::default(); - let echo_addr = t.run_echo_server().await; + let echo_addr = t.run_echo_server(&AddressType::Random).await; let endpoint = t.open_socket_and_recv_single_packet().await; let msg = "hello"; endpoint diff --git a/src/utils/net.rs b/src/utils/net.rs index d35c82b2f9..0789b2a71e 100644 --- a/src/utils/net.rs +++ b/src/utils/net.rs @@ -14,16 +14,19 @@ * limitations under the License. */ -use std::{io, net::SocketAddr}; +use std::{ + io, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, +}; use socket2::{Protocol, Socket, Type}; -use tokio::{net::UdpSocket, select}; +use tokio::{net::ToSocketAddrs, net::UdpSocket}; use crate::Result; -/// returns a UdpSocket with address and port reuse. -pub fn socket_with_reuse(port: u16) -> Result { - socket_with_reuse_and_address((std::net::Ipv4Addr::UNSPECIFIED, port).into()) +/// returns a UdpSocket with address and port reuse, on Ipv6Addr::UNSPECIFIED +fn socket_with_reuse(port: u16) -> Result { + socket_with_reuse_and_address((Ipv6Addr::UNSPECIFIED, port).into()) } fn socket_with_reuse_and_address(addr: SocketAddr) -> Result { @@ -35,6 +38,10 @@ fn socket_with_reuse_and_address(addr: SocketAddr) -> Result { let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?; enable_reuse(&sock)?; sock.set_nonblocking(true)?; + if domain == socket2::Domain::IPV6 { + // be explicit so we can have dual stack sockets. + sock.set_only_v6(false)?; + } sock.bind(&addr.into())?; UdpSocket::from_std(sock.into()).map_err(|error| eyre::eyre!(error)) } @@ -51,127 +58,142 @@ fn enable_reuse(sock: &Socket) -> io::Result<()> { Ok(()) } -/// Socket that can accept and send data from either a local ipv4 address or ipv6 address. +/// An ipv6 socket that can accept and send data from either a local ipv4 address or ipv6 address +/// with port reuse enabled and only_v6 set to false. pub struct DualStackLocalSocket { - v4: UdpSocket, - v6: UdpSocket, + socket: UdpSocket, } impl DualStackLocalSocket { pub fn new(port: u16) -> Result { - // if ephemeral port, make sure they are on the same ports. - if port == 0 { - let v4 = socket_with_reuse_and_address((std::net::Ipv4Addr::UNSPECIFIED, port).into())?; - let port = v4.local_addr()?.port(); - - return Ok(Self { - v4, - v6: socket_with_reuse_and_address((std::net::Ipv6Addr::UNSPECIFIED, port).into())?, - }); - } + Ok(Self { + socket: socket_with_reuse(port)?, + }) + } + /// Primarily used for testing of ipv4 vs ipv6 addresses. + pub(crate) fn new_with_address(addr: SocketAddr) -> Result { Ok(Self { - v4: socket_with_reuse_and_address((std::net::Ipv4Addr::UNSPECIFIED, port).into())?, - v6: socket_with_reuse_and_address((std::net::Ipv6Addr::UNSPECIFIED, port).into())?, + socket: socket_with_reuse_and_address(addr)?, }) } - // Receives datagrams from either an ipv4 address or ipv6. Match on the returned [`SocketAddr`] to - // determine if the received data is in the ipv4_buf or ipv6_buf on a successful result. - pub async fn recv_from( - &self, - v4_buf: &mut [u8], - v6_buf: &mut [u8], - ) -> io::Result<(usize, SocketAddr)> { - select! { - v4 = self.v4.recv_from(v4_buf) => { - v4 - } - v6 = self.v6.recv_from(v6_buf) => { - v6 - } - } + pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + self.socket.recv_from(buf).await } pub fn local_ipv4_addr(&self) -> io::Result { - self.v4.local_addr() + let addr = self.socket.local_addr()?; + match addr { + SocketAddr::V4(_) => Ok(addr), + SocketAddr::V6(_) => Ok((Ipv4Addr::UNSPECIFIED, addr.port()).into()), + } } - pub fn local_ip6_addr(&self) -> io::Result { - self.v6.local_addr() + pub fn local_ipv6_addr(&self) -> io::Result { + let addr = self.socket.local_addr()?; + match addr { + SocketAddr::V4(v4addr) => Ok(SocketAddr::new( + IpAddr::V6(v4addr.ip().to_ipv6_mapped()), + addr.port(), + )), + SocketAddr::V6(_) => Ok(addr), + } } - pub async fn send_to(&self, buf: &[u8], target: &SocketAddr) -> io::Result { - match target { - SocketAddr::V4(_) => self.v4.send_to(buf, target).await, - SocketAddr::V6(_) => self.v6.send_to(buf, target).await, - } + pub async fn send_to(&self, buf: &[u8], target: A) -> io::Result { + self.socket.send_to(buf, target).await } } #[cfg(test)] mod tests { - use crate::test_utils::{available_addr, TestHelper}; - use crate::utils::net::DualStackLocalSocket; - use std::net::SocketAddr; - use std::str::from_utf8; - use std::sync::Arc; - use std::time::Duration; - use tokio::sync::oneshot; + use std::{ + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, + time::Duration, + }; + use tokio::time::timeout; + use crate::endpoint::address::AddressKind; + use crate::test_utils::{available_addr, AddressType, TestHelper}; + #[tokio::test] - async fn socket_with_reuse() { - let expected = available_addr().await; - let socket = super::socket_with_reuse(expected.port()).unwrap(); - let addr = socket.local_addr().unwrap(); + async fn dual_stack_socket_reusable() { + let expected = available_addr(&AddressType::Random).await; + let socket = super::DualStackLocalSocket::new(expected.port()).unwrap(); + let addr = socket.local_ipv4_addr().unwrap(); + + match expected { + SocketAddr::V4(_) => assert_eq!(expected, socket.local_ipv4_addr().unwrap()), + SocketAddr::V6(_) => assert_eq!(expected, socket.local_ipv6_addr().unwrap()), + } - assert_eq!(expected, socket.local_addr().unwrap()); + assert_eq!(expected.port(), socket.local_ipv4_addr().unwrap().port()); + assert_eq!(expected.port(), socket.local_ipv6_addr().unwrap().port()); // should be able to do it a second time, since we are reusing the address. - let socket = super::socket_with_reuse(expected.port()).unwrap(); - let addr2 = socket.local_addr().unwrap(); - assert_eq!(addr, addr2); + let socket = super::DualStackLocalSocket::new(expected.port()).unwrap(); + + match expected { + SocketAddr::V4(_) => assert_eq!(expected, socket.local_ipv4_addr().unwrap()), + SocketAddr::V6(_) => assert_eq!(expected, socket.local_ipv6_addr().unwrap()), + } + assert_eq!(addr.port(), socket.local_ipv4_addr().unwrap().port()); + assert_eq!(addr.port(), socket.local_ipv6_addr().unwrap().port()); } #[tokio::test] - async fn dual_domain_socket() { + async fn dual_stack_socket() { + // Since the TestHelper uses the DualStackSocket, we can use it to test ourselves. let mut t = TestHelper::default(); - let expected = available_addr().await; - let socket = Arc::new(DualStackLocalSocket::new(expected.port()).unwrap()); + let echo_addr = t.run_echo_server(&AddressType::Random).await; + let (mut rx, socket) = t.open_socket_and_recv_multiple_packets().await; - // TODO: when DualStackSocket is used everywhere, add a test for Ipv6 as well. - let echo_addr = t.run_echo_server().await; + let msg = "hello"; + let addr = echo_addr.to_socket_addr().await.unwrap(); - let (packet_tx, packet_rx) = oneshot::channel::(); - let socket_recv = socket.clone(); - tokio::spawn(async move { - let mut v4_buf = vec![0; 1024]; - let mut v6_buf = vec![0; 1024]; - let (size, addr) = socket_recv - .recv_from(&mut v4_buf, &mut v6_buf) + socket.send_to(msg.as_bytes(), &addr).await.unwrap(); + assert_eq!( + msg, + timeout(Duration::from_secs(5), rx.recv()) .await - .unwrap(); - - let contents = match addr { - SocketAddr::V4(_) => &v4_buf[..size], - SocketAddr::V6(_) => &v6_buf[..size], - }; + .expect("should not timeout") + .unwrap() + ); - packet_tx - .send(from_utf8(contents).unwrap().to_string()) - .unwrap(); - }); + // try again, but from the opposite type of IP Address + // Proof that a dual stack ipv6 socket can send to both ipv6 and ipv4. + let ipv4_echo_addr = (Ipv4Addr::UNSPECIFIED, echo_addr.port).into(); + let opp_addr: SocketAddr = match echo_addr.host { + AddressKind::Ip(ip) => match ip { + IpAddr::V4(_) => (Ipv6Addr::UNSPECIFIED, echo_addr.port).into(), + IpAddr::V6(_) => ipv4_echo_addr, + }, + // we're not testing this, since DNS resolves to IP. + AddressKind::Name(_) => unreachable!(), + }; + + socket.send_to(msg.as_bytes(), &opp_addr).await.unwrap(); + assert_eq!( + msg, + timeout(Duration::from_secs(5), rx.recv()) + .await + .expect("should not timeout") + .unwrap() + ); - let msg = "hello"; + // Since all other sockets are actual ipv6 sockets, let's force a test with a real ipv4 socket sending to our dual + // stack socket. + let (mut rx, socket) = t.open_ipv4_socket_and_recv_multiple_packets().await; socket - .send_to(msg.as_bytes(), &echo_addr.to_socket_addr().await.unwrap()) + .send_to(msg.as_bytes(), &ipv4_echo_addr) .await .unwrap(); assert_eq!( msg, - timeout(Duration::from_secs(5), packet_rx) + timeout(Duration::from_secs(5), rx.recv()) .await .expect("should not timeout") .unwrap() diff --git a/src/xds.rs b/src/xds.rs index 542cd2c63e..a39b023a52 100644 --- a/src/xds.rs +++ b/src/xds.rs @@ -149,21 +149,27 @@ mod tests { use std::sync::Arc; + use crate::test_utils::AddressType; use crate::{config::Config, endpoint::Endpoint, filters::*}; #[tokio::test] async fn token_routing() { let mut helper = crate::test_utils::TestHelper::default(); - let token = uuid::Uuid::new_v4().into_bytes(); + let token = "mytoken"; let address = { - let mut addr = Endpoint::new(helper.run_echo_server().await); + let mut addr = Endpoint::new(helper.run_echo_server(&AddressType::Random).await); addr.metadata.known.tokens.insert(token.into()); addr }; let localities = crate::endpoint::LocalityEndpoints::from(address.clone()); - let xds_port = crate::test_utils::available_addr().await.port(); - let xds_config: Arc = serde_json::from_value(serde_json::json!({ + tracing::debug!(?address); + tracing::debug!(?localities); + + let xds_port = crate::test_utils::available_addr(&AddressType::Random) + .await + .port(); + let json = serde_json::json!({ "version": "v1alpha1", "id": "test-proxy", "clusters": { @@ -171,11 +177,10 @@ mod tests { "localities": [localities] } }, - })) - .map(Arc::new) - .unwrap(); + }); + let xds_config: Arc = serde_json::from_value(json).map(Arc::new).unwrap(); - let client_addr = crate::test_utils::available_addr().await; + let client_addr = crate::test_utils::available_addr(&AddressType::Random).await; let client_config = serde_json::from_value(serde_json::json!({ "version": "v1alpha1", "id": "test-proxy", @@ -191,7 +196,7 @@ mod tests { tokio::spawn(server::spawn(xds_port, xds_config.clone())); let client_proxy = crate::cli::Proxy { port: client_addr.port(), - management_server: vec![format!("http://0.0.0.0:{}", xds_port).parse().unwrap()], + management_server: vec![format!("http://[::1]:{}", xds_port).parse().unwrap()], ..<_>::default() }; @@ -258,32 +263,27 @@ mod tests { .unwrap(), )); - let client = tokio::net::UdpSocket::bind((std::net::Ipv4Addr::UNSPECIFIED, 0)) - .await - .unwrap(); - - let data = "Hello World!".as_bytes(); + let fixture = "Hello World!"; + let data = fixture.as_bytes(); let mut packet = data.to_vec(); - packet.extend(token); - packet.push(1); + packet.extend(token.as_bytes()); + + let client = helper.open_socket_and_recv_single_packet().await; client + .socket .send_to( &packet, (std::net::Ipv4Addr::UNSPECIFIED, client_addr.port()), ) .await .unwrap(); - let mut buf = vec![0; 12]; - tokio::time::timeout( - std::time::Duration::from_secs(1), - client.recv_from(&mut buf), - ) - .await - .unwrap() - .unwrap(); + let response = tokio::time::timeout(std::time::Duration::from_secs(1), client.packet_rx) + .await + .unwrap() + .unwrap(); - assert_eq!(data, buf); + assert_eq!(format!("{}{}", fixture, token), response); } #[tokio::test] diff --git a/tests/capture.rs b/tests/capture.rs index 46d4e5494f..b58e664565 100644 --- a/tests/capture.rs +++ b/tests/capture.rs @@ -23,7 +23,7 @@ use quilkin::{ endpoint::Endpoint, filters::{Capture, StaticFilter, TokenRouter}, metadata::MetadataView, - test_utils::TestHelper, + test_utils::{AddressType, TestHelper}, }; /// This test covers both token_router and capture filters, @@ -31,7 +31,7 @@ use quilkin::{ #[tokio::test] async fn token_router() { let mut t = TestHelper::default(); - let echo = t.run_echo_server().await; + let echo = t.run_echo_server(&AddressType::Random).await; let server_port = 12348; let server_proxy = quilkin::cli::Proxy { port: server_port, diff --git a/tests/compress.rs b/tests/compress.rs index d4f0e71004..021759ab20 100644 --- a/tests/compress.rs +++ b/tests/compress.rs @@ -16,21 +16,20 @@ use tokio::time::{timeout, Duration}; -use quilkin::test_utils::available_addr; use quilkin::{ config::Filter, endpoint::Endpoint, filters::{Compress, StaticFilter}, - test_utils::TestHelper, + test_utils::{available_addr, AddressType, TestHelper}, }; #[tokio::test] async fn client_and_server() { let mut t = TestHelper::default(); - let echo = t.run_echo_server().await; + let echo = t.run_echo_server(&AddressType::Random).await; // create server configuration as - let server_addr = available_addr().await; + let server_addr = available_addr(&AddressType::Random).await; let yaml = " on_read: DECOMPRESS on_write: COMPRESS @@ -56,7 +55,7 @@ on_write: COMPRESS t.run_server(server_config, server_proxy, None); // create a local client - let client_addr = available_addr().await; + let client_addr = available_addr(&AddressType::Random).await; let yaml = " on_read: COMPRESS on_write: DECOMPRESS diff --git a/tests/concatenate_bytes.rs b/tests/concatenate_bytes.rs index dfa5424774..85b6843c5d 100644 --- a/tests/concatenate_bytes.rs +++ b/tests/concatenate_bytes.rs @@ -22,7 +22,7 @@ use quilkin::{ config::Filter, endpoint::Endpoint, filters::{ConcatenateBytes, StaticFilter}, - test_utils::TestHelper, + test_utils::{AddressType, TestHelper}, }; #[tokio::test] @@ -32,7 +32,7 @@ async fn concatenate_bytes() { on_read: APPEND bytes: YWJj #abc "; - let echo = t.run_echo_server().await; + let echo = t.run_echo_server(&AddressType::Random).await; let server_port = 12346; let server_proxy = quilkin::cli::Proxy { diff --git a/tests/filter_order.rs b/tests/filter_order.rs index 3d4ccbfb78..86f9a01c94 100644 --- a/tests/filter_order.rs +++ b/tests/filter_order.rs @@ -22,7 +22,7 @@ use quilkin::{ config::Filter, endpoint::Endpoint, filters::{Compress, ConcatenateBytes, StaticFilter}, - test_utils::TestHelper, + test_utils::{AddressType, TestHelper}, }; #[tokio::test] @@ -45,7 +45,7 @@ on_write: DECOMPRESS "; let echo = t - .run_echo_server_with_tap(move |_, bytes, _| { + .run_echo_server_with_tap(&AddressType::Random, move |_, bytes, _| { assert!( from_utf8(bytes).is_err(), "Should be compressed, and therefore unable to be turned into a string" diff --git a/tests/filters.rs b/tests/filters.rs index 3a5fcfc45b..f0aa787b7b 100644 --- a/tests/filters.rs +++ b/tests/filters.rs @@ -24,7 +24,7 @@ use quilkin::{ config::Filter, endpoint::Endpoint, filters::{Debug, StaticFilter}, - test_utils::{load_test_filters, TestHelper}, + test_utils::{load_test_filters, AddressType, TestHelper}, }; #[tokio::test] @@ -33,7 +33,7 @@ async fn test_filter() { load_test_filters(); // create an echo server as an endpoint. - let echo = t.run_echo_server().await; + let echo = t.run_echo_server(&AddressType::Random).await; // create server configuration let server_port = 12346; @@ -120,7 +120,7 @@ async fn debug_filter() { let factory = Debug::factory(); // create an echo server as an endpoint. - let echo = t.run_echo_server().await; + let echo = t.run_echo_server(&AddressType::Random).await; // create server configuration let server_port = 12247; diff --git a/tests/firewall.rs b/tests/firewall.rs index 756e73adc9..208a68541b 100644 --- a/tests/firewall.rs +++ b/tests/firewall.rs @@ -14,10 +14,10 @@ * limitations under the License. */ -use std::net::SocketAddr; +use std::net::{Ipv6Addr, SocketAddr}; use tokio::{ - sync::oneshot, + sync::mpsc, time::{timeout, Duration}, }; @@ -25,29 +25,33 @@ use quilkin::{ config::Filter, endpoint::Endpoint, filters::{Firewall, StaticFilter}, - test_utils::TestHelper, + test_utils::{available_addr, AddressType, TestHelper}, }; #[tokio::test] -async fn firewall_allow() { +async fn ipv4_firewall_allow() { let mut t = TestHelper::default(); + let address_type = AddressType::Ipv4; + let port = available_addr(&address_type).await.port(); let yaml = " on_read: - action: ALLOW - source: 127.0.0.1/32 + sources: + - 127.0.0.1/32 ports: - - %1 + - %1 on_write: - action: ALLOW - source: 127.0.0.0/24 + sources: + - 127.0.0.0/24 ports: - - %2 + - %2 "; - let recv = test(&mut t, 12354, yaml).await; + let mut rx = test(&mut t, port, yaml, &address_type).await; assert_eq!( "hello", - timeout(Duration::from_secs(5), recv) + timeout(Duration::from_secs(5), rx.recv()) .await .expect("should have received a packet") .unwrap() @@ -55,52 +59,155 @@ on_write: } #[tokio::test] -async fn firewall_read_deny() { +async fn ipv6_firewall_allow() { + let mut t = TestHelper::default(); + let address_type = AddressType::Ipv6; + let port = available_addr(&address_type).await.port(); + let yaml = " +on_read: + - action: ALLOW + sources: + - ::1/128 + ports: + - %1 +on_write: + - action: ALLOW + sources: + - ::1/64 + ports: + - %2 +"; + let mut rx = test(&mut t, port, yaml, &address_type).await; + + assert_eq!( + "hello", + timeout(Duration::from_secs(5), rx.recv()) + .await + .expect("should have received a packet") + .unwrap() + ); +} + +#[tokio::test] +async fn ipv4_firewall_read_deny() { + let mut t = TestHelper::default(); + let address_type = AddressType::Ipv4; + let port = available_addr(&address_type).await.port(); + let yaml = " +on_read: + - action: DENY + sources: + - 127.0.0.1/32 + ports: + - %1 +on_write: + - action: ALLOW + sources: + - 127.0.0.0/24 + ports: + - %2 +"; + let mut rx = test(&mut t, port, yaml, &address_type).await; + + let result = timeout(Duration::from_secs(3), rx.recv()).await; + assert!(result.is_err(), "should not have received a packet"); +} + +#[tokio::test] +async fn ipv6_firewall_read_deny() { let mut t = TestHelper::default(); + let address_type = AddressType::Ipv6; + let port = available_addr(&address_type).await.port(); let yaml = " on_read: - action: DENY - source: 127.0.0.1/32 + sources: + - ::1/128 ports: - %1 on_write: - action: ALLOW - source: 127.0.0.0/24 + sources: + - ::1/64 ports: - - %2 + - %2 "; - let recv = test(&mut t, 12355, yaml).await; + let mut rx = test(&mut t, port, yaml, &address_type).await; - let result = timeout(Duration::from_secs(3), recv).await; + let result = timeout(Duration::from_secs(3), rx.recv()).await; assert!(result.is_err(), "should not have received a packet"); } #[tokio::test] -async fn firewall_write_deny() { +async fn ipv4_firewall_write_deny() { let mut t = TestHelper::default(); + let address_type = AddressType::Ipv4; + let port = available_addr(&address_type).await.port(); let yaml = " on_read: - action: ALLOW - source: 127.0.0.1/32 + sources: + - 127.0.0.1/32 ports: - - %1 + - %1 on_write: - action: DENY - source: 127.0.0.0/24 + sources: + - 127.0.0.0/24 ports: - - %2 + - %2 "; - let recv = test(&mut t, 12356, yaml).await; + let mut rx = test(&mut t, port, yaml, &address_type).await; - let result = timeout(Duration::from_secs(3), recv).await; + let result = timeout(Duration::from_secs(3), rx.recv()).await; assert!(result.is_err(), "should not have received a packet"); } -async fn test(t: &mut TestHelper, server_port: u16, yaml: &str) -> oneshot::Receiver { - let echo = t.run_echo_server().await; +#[tokio::test] +async fn ipv6_firewall_write_deny() { + let mut t = TestHelper::default(); + let address_type = AddressType::Ipv6; + let port = available_addr(&address_type).await.port(); + let yaml = " +on_read: + - action: ALLOW + sources: + - ::1/128 + ports: + - %1 +on_write: + - action: DENY + sources: + - ::1/64 + ports: + - %2 +"; + let mut rx = test(&mut t, port, yaml, &address_type).await; + + let result = timeout(Duration::from_secs(3), rx.recv()).await; + assert!(result.is_err(), "should not have received a packet"); +} + +async fn test( + t: &mut TestHelper, + server_port: u16, + yaml: &str, + address_type: &AddressType, +) -> mpsc::Receiver { + let echo = t.run_echo_server(address_type).await; + + let (rx, socket) = match address_type { + AddressType::Ipv4 => t.open_ipv4_socket_and_recv_multiple_packets().await, + AddressType::Ipv6 => t.open_socket_and_recv_multiple_packets().await, + AddressType::Random => unreachable!(), + }; + + let client_addr = match address_type { + AddressType::Ipv4 => socket.local_ipv4_addr().unwrap(), + AddressType::Ipv6 => socket.local_ipv6_addr().unwrap(), + AddressType::Random => unreachable!(), + }; - let recv = t.open_socket_and_recv_single_packet().await; - let client_addr = recv.socket.local_addr().unwrap(); let yaml = yaml .replace("%1", client_addr.port().to_string().as_str()) .replace("%2", echo.port().to_string().as_str()); @@ -127,9 +234,12 @@ async fn test(t: &mut TestHelper, server_port: u16, yaml: &str) -> oneshot::Rece t.run_server(server_config, server_proxy, None); - let local_addr: SocketAddr = (std::net::Ipv4Addr::LOCALHOST, server_port).into(); + let local_addr: SocketAddr = match address_type { + AddressType::Ipv4 => (std::net::Ipv4Addr::LOCALHOST, server_port).into(), + AddressType::Ipv6 => (Ipv6Addr::LOCALHOST, server_port).into(), + AddressType::Random => unreachable!(), // don't do this. + }; tracing::info!(source = %client_addr, address = %local_addr, "Sending hello"); - recv.socket.send_to(b"hello", &local_addr).await.unwrap(); - - recv.packet_rx + socket.send_to(b"hello", &local_addr).await.unwrap(); + rx } diff --git a/tests/load_balancer.rs b/tests/load_balancer.rs index b23ac24ee7..5a73a843b7 100644 --- a/tests/load_balancer.rs +++ b/tests/load_balancer.rs @@ -23,7 +23,7 @@ use quilkin::{ config::Filter, endpoint::Endpoint, filters::{LoadBalancer, StaticFilter}, - test_utils::TestHelper, + test_utils::{AddressType, TestHelper}, }; #[tokio::test] @@ -39,7 +39,7 @@ policy: ROUND_ROBIN for _ in 0..2 { let selected_endpoint = selected_endpoint.clone(); echo_addresses.insert( - t.run_echo_server_with_tap(move |_, _, echo_addr| { + t.run_echo_server_with_tap(&AddressType::Random, move |_, _, echo_addr| { let _ = selected_endpoint.lock().unwrap().replace(echo_addr); }) .await, diff --git a/tests/local_rate_limit.rs b/tests/local_rate_limit.rs index a935e02f4d..cfb42682f3 100644 --- a/tests/local_rate_limit.rs +++ b/tests/local_rate_limit.rs @@ -22,7 +22,7 @@ use quilkin::{ config::Filter, endpoint::Endpoint, filters::{LocalRateLimit, StaticFilter}, - test_utils::{available_addr, TestHelper}, + test_utils::{available_addr, AddressType, TestHelper}, }; #[tokio::test] @@ -33,9 +33,9 @@ async fn local_rate_limit_filter() { max_packets: 2 period: 1 "; - let echo = t.run_echo_server().await; + let echo = t.run_echo_server(&AddressType::Random).await; - let server_addr = available_addr().await; + let server_addr = available_addr(&AddressType::Random).await; let server_proxy = quilkin::cli::Proxy { port: server_addr.port(), ..<_>::default() diff --git a/tests/match.rs b/tests/match.rs index 584c4f3cc6..edb121dc28 100644 --- a/tests/match.rs +++ b/tests/match.rs @@ -22,13 +22,13 @@ use quilkin::{ config::Filter, endpoint::Endpoint, filters::{Capture, Match, StaticFilter}, - test_utils::TestHelper, + test_utils::{AddressType, TestHelper}, }; #[tokio::test] async fn r#match() { let mut t = TestHelper::default(); - let echo = t.run_echo_server().await; + let echo = t.run_echo_server(&AddressType::Random).await; let capture_yaml = " suffix: diff --git a/tests/metrics.rs b/tests/metrics.rs index 1ada884baf..83d8d54f6e 100644 --- a/tests/metrics.rs +++ b/tests/metrics.rs @@ -16,18 +16,23 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use quilkin::{endpoint::Endpoint, test_utils::TestHelper}; +use quilkin::{ + endpoint::Endpoint, + test_utils::{AddressType, TestHelper}, +}; #[tokio::test] async fn metrics_server() { let mut t = TestHelper::default(); // create an echo server as an endpoint. - let echo = t.run_echo_server().await; - let metrics_port = quilkin::test_utils::available_addr().await.port(); + let echo = t.run_echo_server(&AddressType::Random).await; + let metrics_port = quilkin::test_utils::available_addr(&AddressType::Random) + .await + .port(); // create server configuration - let server_addr = quilkin::test_utils::available_addr().await; + let server_addr = quilkin::test_utils::available_addr(&AddressType::Random).await; let server_proxy = quilkin::cli::Proxy { port: server_addr.port(), ..<_>::default() diff --git a/tests/no_filter.rs b/tests/no_filter.rs index 74a10f1dcd..120b0acfb7 100644 --- a/tests/no_filter.rs +++ b/tests/no_filter.rs @@ -17,19 +17,21 @@ use tokio::time::timeout; use tokio::time::Duration; -use quilkin::test_utils::available_addr; -use quilkin::{endpoint::Endpoint, test_utils::TestHelper}; +use quilkin::{ + endpoint::Endpoint, + test_utils::{available_addr, AddressType, TestHelper}, +}; #[tokio::test] async fn echo() { let mut t = TestHelper::default(); // create two echo servers as endpoints - let server1 = t.run_echo_server().await; - let server2 = t.run_echo_server().await; + let server1 = t.run_echo_server(&AddressType::Random).await; + let server2 = t.run_echo_server(&AddressType::Random).await; // create server configuration - let local_addr = available_addr().await; + let local_addr = available_addr(&AddressType::Random).await; let server_proxy = quilkin::cli::Proxy { port: local_addr.port(), ..<_>::default() diff --git a/tests/qcmp.rs b/tests/qcmp.rs index 81b0c2977b..ef37aa3b29 100644 --- a/tests/qcmp.rs +++ b/tests/qcmp.rs @@ -18,12 +18,17 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use tokio::time::Duration; -use quilkin::{protocol::Protocol, test_utils::TestHelper}; +use quilkin::{ + protocol::Protocol, + test_utils::{AddressType, TestHelper}, +}; #[tokio::test] async fn proxy_ping() { let mut t = TestHelper::default(); - let server_port = quilkin::test_utils::available_addr().await.port(); + let server_port = quilkin::test_utils::available_addr(&AddressType::Random) + .await + .port(); let server_proxy = quilkin::cli::Proxy { qcmp_port: server_port, to: vec![(Ipv4Addr::UNSPECIFIED, 0).into()], @@ -36,7 +41,9 @@ async fn proxy_ping() { #[tokio::test] async fn agent_ping() { - let qcmp_port = quilkin::test_utils::available_addr().await.port(); + let qcmp_port = quilkin::test_utils::available_addr(&AddressType::Random) + .await + .port(); let agent = quilkin::cli::Agent { qcmp_port, ..<_>::default() diff --git a/tests/token_router.rs b/tests/token_router.rs index d4907dd0e1..259c22ee8d 100644 --- a/tests/token_router.rs +++ b/tests/token_router.rs @@ -23,7 +23,7 @@ use quilkin::{ endpoint::Endpoint, filters::{Capture, StaticFilter, TokenRouter}, metadata::MetadataView, - test_utils::TestHelper, + test_utils::{AddressType, TestHelper}, }; /// This test covers both token_router and capture filters, @@ -31,7 +31,7 @@ use quilkin::{ #[tokio::test] async fn token_router() { let mut t = TestHelper::default(); - let echo = t.run_echo_server().await; + let echo = t.run_echo_server(&AddressType::Random).await; let capture_yaml = " suffix: