Skip to content

Commit

Permalink
fixes bug on relay slow path, and adds FORCE_RELAY_SLOW_PATH flag to …
Browse files Browse the repository at this point in the history
…enable testing (#234)
  • Loading branch information
jthomas43 authored Dec 17, 2024
1 parent 27b6cc7 commit c0b01eb
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 18 deletions.
4 changes: 4 additions & 0 deletions include/udx.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ extern "C" {
#define UDX_STREAM_WRITE_WANT_DESTROY 0b0100
#define UDX_STREAM_WRITE_WANT_ZWP 0b1000

#define UDX_DEBUG_FORCE_RELAY_SLOW_PATH 0x01

typedef struct {
uint32_t seq;
} udx_cirbuf_val_t;
Expand Down Expand Up @@ -107,6 +109,8 @@ typedef void (*udx_interface_event_close_cb)(udx_interface_event_t *handle);
struct udx_s {
uv_loop_t *loop;

uint32_t debug_flags;

int refs;
bool teardown;
bool has_streams;
Expand Down
32 changes: 14 additions & 18 deletions src/udx.c
Original file line number Diff line number Diff line change
Expand Up @@ -1125,7 +1125,8 @@ process_data_packet (udx_stream_t *stream, int type, uint32_t seq, char *data, s
}

static int
relay_packet (udx_stream_t *stream, char *buf, ssize_t buf_len, int type, uint8_t data_offset, uint32_t seq, uint32_t ack, uint32_t rwnd) {
relay_packet (udx_stream_t *stream, char *buf, ssize_t buf_len, int type, uint32_t seq) {

stream->seq = seq_max(stream->seq, seq);

udx_stream_t *relay = stream->relay_to;
Expand All @@ -1136,9 +1137,15 @@ relay_packet (udx_stream_t *stream, char *buf, ssize_t buf_len, int type, uint8_

uv_buf_t b = uv_buf_init(buf, buf_len);

int err = udx__sendmsg(relay->socket, &b, 1, (struct sockaddr *) &relay->remote_addr, relay->remote_addr_len);
int err;

if (stream->udx->debug_flags & UDX_DEBUG_FORCE_RELAY_SLOW_PATH) {
err = UV_EAGAIN;
} else {
err = udx__sendmsg(relay->socket, &b, 1, (struct sockaddr *) &relay->remote_addr, relay->remote_addr_len);
}

if (err == EAGAIN) {
if (err == UV_EAGAIN) {
// create a socket_send_t with no callback to send this packet on the relay's send_queue

udx_socket_send_t *req = malloc(sizeof(udx_socket_send_t) + b.len);
Expand All @@ -1151,20 +1158,7 @@ relay_packet (udx_stream_t *stream, char *buf, ssize_t buf_len, int type, uint8_
udx_packet_t *pkt = &req->pkt;

memcpy(&pkt->dest, &relay->remote_addr, relay->remote_addr_len);

uint8_t *p = (uint8_t *) data;

// 8 bit magic byte + 8 bit version + 8 bit type + 8 bit extensions
*(p++) = UDX_MAGIC_BYTE;
*(p++) = UDX_VERSION;
*(p++) = (uint8_t) type;
*(p++) = data_offset; // data offset

uint32_t *i = (uint32_t *) p;
*(i++) = udx__swap_uint32_if_be(stream->remote_id);
*(i++) = udx__swap_uint32_if_be(rwnd);
*(i++) = udx__swap_uint32_if_be(seq);
*(i++) = udx__swap_uint32_if_be(ack);
pkt->dest_len = relay->remote_addr_len;

pkt->nbufs = 1;
pkt->size = b.len;
Expand Down Expand Up @@ -1244,7 +1238,7 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd
if (stream->on_firewall(stream, socket, addr)) return 1;
}

if (stream->relay_to) return relay_packet(stream, buf, buf_len, type, data_offset, seq, ack, rwnd);
if (stream->relay_to) return relay_packet(stream, buf, buf_len, type, seq);

buf += UDX_HEADER_SIZE;
buf_len -= UDX_HEADER_SIZE;
Expand Down Expand Up @@ -1962,6 +1956,8 @@ udx_init (uv_loop_t *loop, udx_t *udx, udx_idle_cb on_idle) {
udx->packets_dropped_by_kernel = -1;
udx->loop = loop;

udx->debug_flags = 0;

return 0;
}

Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ list(APPEND tests
stream-preconnect
stream-preconnect-same-socket
stream-relay
stream-relay-force-slow-path
stream-send-recv
stream-send-recv-ipv6
stream-write-read
Expand Down
153 changes: 153 additions & 0 deletions test/stream-relay-force-slow-path.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#include "../include/udx.h"
#include "helpers.h"

#define NBYTES_TO_SEND 1000000

uv_loop_t loop;
udx_t udx;

struct sockaddr_in aaddr;
udx_socket_t asock;
udx_stream_t astream;

struct sockaddr_in baddr;
udx_socket_t bsock;
udx_stream_t bstream;

struct sockaddr_in caddr;
udx_socket_t csock;
udx_stream_t cstream;

struct sockaddr_in daddr;
udx_socket_t dsock;
udx_stream_t dstream;

bool ack_called = false;
bool read_called = false;

size_t write_hash = HASH_INIT;
size_t read_hash = HASH_INIT;

size_t nbytes_read;

void
on_ack (udx_stream_write_t *req, int status, int unordered) {
assert(status == 0);
// assert(unordered == 0);

uv_stop(&loop);

ack_called = true;
}

void
on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) {
assert(buf->len == read_len);

if (nbytes_read == 0) {
printf("read_len=%ld\n", read_len);
assert(memcmp(buf->base, "hello", 5) == 0);
}
read_hash = hash(read_hash, (uint8_t *) buf->base, read_len);

nbytes_read += read_len;
read_called = true;
}

int
main () {

udx_stream_write_t *req = malloc(udx_stream_write_sizeof(1));
int e;

uv_loop_init(&loop);

e = udx_init(&loop, &udx, NULL);
assert(e == 0);

udx.debug_flags |= UDX_DEBUG_FORCE_RELAY_SLOW_PATH;

e = udx_socket_init(&udx, &asock, NULL);
assert(e == 0);

e = udx_socket_init(&udx, &bsock, NULL);
assert(e == 0);

e = udx_socket_init(&udx, &csock, NULL);
assert(e == 0);

e = udx_socket_init(&udx, &dsock, NULL);
assert(e == 0);

uv_ip4_addr("127.0.0.1", 8081, &aaddr);
e = udx_socket_bind(&asock, (struct sockaddr *) &aaddr, 0);
assert(e == 0);

uv_ip4_addr("127.0.0.1", 8082, &baddr);
e = udx_socket_bind(&bsock, (struct sockaddr *) &baddr, 0);
assert(e == 0);

uv_ip4_addr("127.0.0.1", 8083, &caddr);
e = udx_socket_bind(&csock, (struct sockaddr *) &caddr, 0);
assert(e == 0);

uv_ip4_addr("127.0.0.1", 8084, &daddr);
e = udx_socket_bind(&dsock, (struct sockaddr *) &daddr, 0);
assert(e == 0);

e = udx_stream_init(&udx, &astream, 1, NULL, NULL);
assert(e == 0);

e = udx_stream_init(&udx, &bstream, 2, NULL, NULL);
assert(e == 0);

e = udx_stream_init(&udx, &cstream, 3, NULL, NULL);
assert(e == 0);

e = udx_stream_init(&udx, &dstream, 4, NULL, NULL);
assert(e == 0);

e = udx_stream_relay_to(&cstream, &bstream);
assert(e == 0);

e = udx_stream_relay_to(&bstream, &cstream);
assert(e == 0);

e = udx_stream_read_start(&astream, on_read);
assert(e == 0);

e = udx_stream_connect(&astream, &asock, 2, (struct sockaddr *) &baddr);
assert(e == 0);

e = udx_stream_connect(&bstream, &bsock, 1, (struct sockaddr *) &aaddr);
assert(e == 0);

e = udx_stream_connect(&cstream, &csock, 4, (struct sockaddr *) &daddr);
assert(e == 0);

e = udx_stream_connect(&dstream, &dsock, 3, (struct sockaddr *) &caddr);
assert(e == 0);

uv_buf_t buf = uv_buf_init(calloc(NBYTES_TO_SEND, 1), NBYTES_TO_SEND);

memcpy(buf.base, "hello", 5);

write_hash = hash(write_hash, (uint8_t *) buf.base, buf.len);

udx_stream_write(req, &dstream, &buf, 1, on_ack);

uv_run(&loop, UV_RUN_DEFAULT);

assert(ack_called && read_called);

assert(nbytes_read == NBYTES_TO_SEND && read_hash == write_hash);

free(req);

return 0;
}

0 comments on commit c0b01eb

Please sign in to comment.