Skip to content

Commit

Permalink
a
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky committed Oct 15, 2023
1 parent e38356e commit 71afddf
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 6 deletions.
17 changes: 16 additions & 1 deletion benches/throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ static THROUGHPUT_SERVER_INIT: Lazy<()> = Lazy::new(|| {
static FEEDBACK_LOOP: Lazy<()> = Lazy::new(|| {
std::thread::spawn(|| {
let socket = UdpSocket::bind(FEEDBACK_LOOP_ADDR).unwrap();
socket
.set_read_timeout(Some(std::time::Duration::from_millis(500)))
.unwrap();

loop {
let mut packet = [0; MESSAGE_SIZE];
Expand All @@ -74,6 +77,9 @@ fn throughput_benchmark(c: &mut Criterion) {
// Sleep to give the servers some time to warm-up.
std::thread::sleep(std::time::Duration::from_millis(500));
let socket = UdpSocket::bind(BENCH_LOOP_ADDR).unwrap();
socket
.set_read_timeout(Some(std::time::Duration::from_millis(500)))
.unwrap();
let mut packet = [0; MESSAGE_SIZE];

let mut group = c.benchmark_group("throughput");
Expand Down Expand Up @@ -125,6 +131,9 @@ fn write_feedback(addr: SocketAddr) -> mpsc::Sender<Vec<u8>> {
let (write_tx, write_rx) = mpsc::channel::<Vec<u8>>();
std::thread::spawn(move || {
let socket = UdpSocket::bind(addr).unwrap();
socket
.set_read_timeout(Some(std::time::Duration::from_millis(500)))
.unwrap();
let mut packet = [0; MESSAGE_SIZE];
let (_, source) = socket.recv_from(&mut packet).unwrap();
while let Ok(packet) = write_rx.recv() {
Expand All @@ -142,6 +151,9 @@ fn readwrite_benchmark(c: &mut Criterion) {
let (read_tx, read_rx) = mpsc::channel::<Vec<u8>>();
std::thread::spawn(move || {
let socket = UdpSocket::bind(READ_LOOP_ADDR).unwrap();
socket
.set_read_timeout(Some(std::time::Duration::from_millis(500)))
.unwrap();
let mut packet = [0; MESSAGE_SIZE];
loop {
let (length, _) = socket.recv_from(&mut packet).unwrap();
Expand All @@ -164,9 +176,12 @@ fn readwrite_benchmark(c: &mut Criterion) {
Lazy::force(&WRITE_SERVER_INIT);

// Sleep to give the servers some time to warm-up.
std::thread::sleep(std::time::Duration::from_millis(500));
std::thread::sleep(std::time::Duration::from_millis(150));

let socket = UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)).unwrap();
socket
.set_read_timeout(Some(std::time::Duration::from_millis(500)))
.unwrap();

// prime the direct write connection
socket.send_to(PACKETS[0], direct_write_addr).unwrap();
Expand Down
1 change: 0 additions & 1 deletion src/utils/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,3 @@ pub fn to_canonical(addr: &mut SocketAddr) {

addr.set_ip(ip);
}

12 changes: 8 additions & 4 deletions tests/local_rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ period: 1
";
let echo = t.run_echo_server(&AddressType::Random).await;

let server_addr = available_addr(&AddressType::Random).await;
let mut server_addr = available_addr(&AddressType::Random).await;
quilkin::test_utils::map_addr_to_localhost(&mut server_addr);
let server_proxy = quilkin::cli::Proxy {
port: server_addr.port(),
..<_>::default()
Expand All @@ -53,20 +54,21 @@ period: 1
.map(std::sync::Arc::new)
.unwrap(),
);
tracing::trace!("spawning server");
t.run_server(server_config, server_proxy, None);
tokio::time::sleep(Duration::from_millis(50)).await;

let msg = "hello";
let (mut rx, socket) = t.open_socket_and_recv_multiple_packets().await;

for _ in 0..3 {
tracing::trace!(%server_addr, %msg, "sending");
socket.send_to(msg.as_bytes(), &server_addr).await.unwrap();
}

for _ in 0..2 {
assert_eq!(
msg,
timeout(Duration::from_secs(5), rx.recv())
timeout(Duration::from_millis(500), rx.recv())
.await
.unwrap()
.unwrap()
Expand All @@ -76,5 +78,7 @@ period: 1
// Allow enough time to have received any response.
tokio::time::sleep(Duration::from_millis(100)).await;
// Check that we do not get any response.
assert!(timeout(Duration::from_secs(1), rx.recv()).await.is_err());
assert!(timeout(Duration::from_millis(500), rx.recv())
.await
.is_err());
}

0 comments on commit 71afddf

Please sign in to comment.