Skip to content

Commit

Permalink
fix integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky committed Oct 13, 2023
1 parent 40bcbf8 commit e38356e
Show file tree
Hide file tree
Showing 13 changed files with 52 additions and 43 deletions.
3 changes: 2 additions & 1 deletion src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ impl DownstreamReceiveWorkerConfig {
tokio::select! {
result = socket.recv_from(&mut buf) => {
match result {
Ok((size, source)) => {
Ok((size, mut source)) => {
crate::utils::net::to_canonical(&mut source);
let packet = DownstreamPacket {
received_at: chrono::Utc::now().timestamp_nanos_opt().unwrap(),
asn_info: crate::maxmind_db::MaxmindDb::lookup(source.ip()),
Expand Down
20 changes: 2 additions & 18 deletions src/proxy/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ impl SessionPool {
},
Ok((size, mut recv_addr)) => {
let received_at = chrono::Utc::now().timestamp_nanos_opt().unwrap();
crate::utils::net::to_canonical(&mut recv_addr);
tracing::trace!(%recv_addr, %size, "received packet");
let (downstream_addr, asn_info): (SocketAddr, Option<IpNetEntry>) = {
let storage = pool.storage.read();
to_canonical(&mut recv_addr);
let Some(downstream_addr) = storage.destination_to_sources.get(&(recv_addr, port)) else {
tracing::warn!(address=%recv_addr, "received traffic from a server that has no downstream");
continue;
Expand Down Expand Up @@ -315,11 +315,10 @@ impl SessionPool {
/// Sends packet data to the appropiate session based on its `key`.
pub async fn send(
self: &Arc<Self>,
mut key: SessionKey,
key: SessionKey,
asn_info: Option<IpNetEntry>,
packet: &[u8],
) -> Result<usize, super::PipelineError> {
to_canonical(&mut key.source);
self.get(key, asn_info)
.await?
.send(packet)
Expand Down Expand Up @@ -484,21 +483,6 @@ impl Loggable for Error {
}
}

fn to_canonical(addr: &mut SocketAddr) {
let ip = match addr.ip() {
std::net::IpAddr::V6(ip) => {
if let Some(mapped) = ip.to_ipv4_mapped() {
std::net::IpAddr::V4(mapped)
} else {
std::net::IpAddr::V6(ip)
}
}
addr => addr,
};

addr.set_ip(ip);
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
3 changes: 2 additions & 1 deletion src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,8 @@ impl TestHelper {
{
let socket = create_socket().await;
// sometimes give ipv6, sometimes ipv4.
let addr = get_address(address_type, &socket);
let mut addr = get_address(address_type, &socket);
crate::test_utils::map_addr_to_localhost(&mut addr);
let mut shutdown = self.get_shutdown_subscriber().await;
let local_addr = addr;
tokio::spawn(async move {
Expand Down
19 changes: 19 additions & 0 deletions src/utils/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,22 @@ mod tests {
);
}
}

/// Converts a a socket address to its canonical version.
/// This is just a copy of the method available in std but that is currently
/// nightly only.
pub fn to_canonical(addr: &mut SocketAddr) {
let ip = match addr.ip() {
std::net::IpAddr::V6(ip) => {
if let Some(mapped) = ip.to_ipv4_mapped() {
std::net::IpAddr::V4(mapped)
} else {
std::net::IpAddr::V6(ip)
}
}
addr => addr,
};

addr.set_ip(ip);
}

4 changes: 2 additions & 2 deletions tests/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async fn token_router() {

assert_eq!(
"helloabc",
timeout(Duration::from_secs(5), recv_chan.recv())
timeout(Duration::from_millis(500), recv_chan.recv())
.await
.expect("should have received a packet")
.unwrap()
Expand All @@ -98,6 +98,6 @@ async fn token_router() {
let msg = b"helloxyz";
socket.send_to(msg, &local_addr).await.unwrap();

let result = timeout(Duration::from_secs(3), recv_chan.recv()).await;
let result = timeout(Duration::from_millis(500), recv_chan.recv()).await;
assert!(result.is_err(), "should not have received a packet");
}
5 changes: 3 additions & 2 deletions tests/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ async fn client_and_server() {
let echo = t.run_echo_server(&AddressType::Random).await;

// create server configuration as
let server_addr = available_addr(&AddressType::Random).await;
let mut server_addr = available_addr(&AddressType::Random).await;
quilkin::test_utils::map_addr_to_localhost(&mut server_addr);
let yaml = "
on_read: DECOMPRESS
on_write: COMPRESS
Expand Down Expand Up @@ -84,7 +85,7 @@ on_write: DECOMPRESS
let (mut rx, tx) = t.open_socket_and_recv_multiple_packets().await;

tx.send_to(b"hello", &client_addr).await.unwrap();
let expected = timeout(Duration::from_secs(5), rx.recv())
let expected = timeout(Duration::from_millis(500), rx.recv())
.await
.expect("should have received a packet")
.unwrap();
Expand Down
5 changes: 3 additions & 2 deletions tests/filter_order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ on_read: COMPRESS
on_write: DECOMPRESS
";

let echo = t
let mut echo = t
.run_echo_server_with_tap(&AddressType::Random, move |_, bytes, _| {
assert!(
from_utf8(bytes).is_err(),
Expand All @@ -53,6 +53,7 @@ on_write: DECOMPRESS
})
.await;

quilkin::test_utils::map_to_localhost(&mut echo).await;
let server_port = 12346;
let server_proxy = quilkin::cli::Proxy {
port: server_port,
Expand Down Expand Up @@ -94,7 +95,7 @@ on_write: DECOMPRESS

assert_eq!(
"helloxyzabc",
timeout(Duration::from_secs(5), recv_chan.recv())
timeout(Duration::from_millis(500), recv_chan.recv())
.await
.expect("should have received a packet")
.unwrap()
Expand Down
12 changes: 6 additions & 6 deletions tests/firewall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ on_write:

assert_eq!(
"hello",
timeout(Duration::from_secs(5), rx.recv())
timeout(Duration::from_millis(500), rx.recv())
.await
.expect("should have received a packet")
.unwrap()
Expand Down Expand Up @@ -81,7 +81,7 @@ on_write:

assert_eq!(
"hello",
timeout(Duration::from_secs(5), rx.recv())
timeout(Duration::from_millis(500), rx.recv())
.await
.expect("should have received a packet")
.unwrap()
Expand Down Expand Up @@ -109,7 +109,7 @@ on_write:
";
let mut rx = test(&mut t, port, yaml, &address_type).await;

let result = timeout(Duration::from_secs(3), rx.recv()).await;
let result = timeout(Duration::from_millis(500), rx.recv()).await;
assert!(result.is_err(), "should not have received a packet");
}

Expand All @@ -134,7 +134,7 @@ on_write:
";
let mut rx = test(&mut t, port, yaml, &address_type).await;

let result = timeout(Duration::from_secs(3), rx.recv()).await;
let result = timeout(Duration::from_millis(500), rx.recv()).await;
assert!(result.is_err(), "should not have received a packet");
}

Expand All @@ -159,7 +159,7 @@ on_write:
";
let mut rx = test(&mut t, port, yaml, &address_type).await;

let result = timeout(Duration::from_secs(3), rx.recv()).await;
let result = timeout(Duration::from_millis(500), rx.recv()).await;
assert!(result.is_err(), "should not have received a packet");
}

Expand All @@ -184,7 +184,7 @@ on_write:
";
let mut rx = test(&mut t, port, yaml, &address_type).await;

let result = timeout(Duration::from_secs(3), rx.recv()).await;
let result = timeout(Duration::from_millis(500), rx.recv()).await;
assert!(result.is_err(), "should not have received a packet");
}

Expand Down
1 change: 1 addition & 0 deletions tests/local_rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ period: 1
.unwrap(),
);
t.run_server(server_config, server_proxy, None);
tokio::time::sleep(Duration::from_millis(50)).await;

let msg = "hello";
let (mut rx, socket) = t.open_socket_and_recv_multiple_packets().await;
Expand Down
8 changes: 4 additions & 4 deletions tests/match.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ on_read:

assert_eq!(
"helloxyz",
timeout(Duration::from_secs(5), recv_chan.recv())
timeout(Duration::from_millis(500), recv_chan.recv())
.await
.expect("should have received a packet")
.unwrap()
Expand All @@ -107,7 +107,7 @@ on_read:

assert_eq!(
"helloabc",
timeout(Duration::from_secs(5), recv_chan.recv())
timeout(Duration::from_millis(500), recv_chan.recv())
.await
.expect("should have received a packet")
.unwrap()
Expand All @@ -119,7 +119,7 @@ on_read:

assert_eq!(
"hellodef",
timeout(Duration::from_secs(5), recv_chan.recv())
timeout(Duration::from_millis(500), recv_chan.recv())
.await
.expect("should have received a packet")
.unwrap()
Expand All @@ -131,7 +131,7 @@ on_read:

assert_eq!(
"hellodef",
timeout(Duration::from_secs(5), recv_chan.recv())
timeout(Duration::from_millis(500), recv_chan.recv())
.await
.expect("should have received a packet")
.unwrap()
Expand Down
5 changes: 3 additions & 2 deletions tests/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ async fn metrics_server() {
.port();

// create server configuration
let server_addr = quilkin::test_utils::available_addr(&AddressType::Random).await;
let mut server_addr = quilkin::test_utils::available_addr(&AddressType::Random).await;
quilkin::test_utils::map_addr_to_localhost(&mut server_addr);
let server_proxy = quilkin::cli::Proxy {
port: server_addr.port(),
..<_>::default()
Expand All @@ -44,7 +45,7 @@ async fn metrics_server() {
t.run_server(
server_config,
server_proxy,
Some(Some((std::net::Ipv6Addr::UNSPECIFIED, metrics_port).into())),
Some(Some((std::net::Ipv4Addr::UNSPECIFIED, metrics_port).into())),
);

// create a local client
Expand Down
6 changes: 3 additions & 3 deletions tests/no_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,19 @@ async fn echo() {
let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;

socket.send_to(b"hello", &local_addr).await.unwrap();
let value = timeout(Duration::from_secs(5), recv_chan.recv())
let value = timeout(Duration::from_millis(500), recv_chan.recv())
.await
.unwrap()
.unwrap();
assert_eq!("hello", value);
let value = timeout(Duration::from_secs(5), recv_chan.recv())
let value = timeout(Duration::from_millis(500), recv_chan.recv())
.await
.unwrap()
.unwrap();
assert_eq!("hello", value);

// should only be two returned items
assert!(timeout(Duration::from_secs(2), recv_chan.recv())
assert!(timeout(Duration::from_millis(500), recv_chan.recv())
.await
.is_err());
}
4 changes: 2 additions & 2 deletions tests/token_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ quilkin.dev:

assert_eq!(
"hello",
timeout(Duration::from_secs(5), recv_chan.recv())
timeout(Duration::from_millis(500), recv_chan.recv())
.await
.expect("should have received a packet")
.unwrap()
Expand All @@ -100,6 +100,6 @@ quilkin.dev:
let msg = b"helloxyz";
socket.send_to(msg, &local_addr).await.unwrap();

let result = timeout(Duration::from_secs(3), recv_chan.recv()).await;
let result = timeout(Duration::from_millis(500), recv_chan.recv()).await;
assert!(result.is_err(), "should not have received a packet");
}

0 comments on commit e38356e

Please sign in to comment.