From 36932445a46539cf71b51cc4609d0173f88b7664 Mon Sep 17 00:00:00 2001 From: James Thomas Date: Wed, 19 Apr 2023 17:08:03 -0400 Subject: [PATCH 1/6] copied perf test from sendmmsg branch --- test/CMakeLists.txt | 1 + test/stream-write-read-perf.c | 139 ++++++++++++++++++++++++++++++++++ 2 files changed, 140 insertions(+) create mode 100644 test/stream-write-read-perf.c diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 12f7e429..216da4d8 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -14,6 +14,7 @@ list(APPEND tests stream-send-recv-ipv6 stream-write-read stream-write-read-ipv6 + stream-write-read-perf ) foreach(test IN LISTS tests) diff --git a/test/stream-write-read-perf.c b/test/stream-write-read-perf.c new file mode 100644 index 00000000..93655087 --- /dev/null +++ b/test/stream-write-read-perf.c @@ -0,0 +1,139 @@ +#include +#include +#include +#include + +#include "../include/udx.h" + +uv_loop_t loop; +udx_t udx; + +udx_socket_t asock; +udx_stream_t astream; + +udx_socket_t bsock; +udx_stream_t bstream; + +udx_stream_write_t req; + +struct { + uint64_t size_bytes; +} options; + +struct { + uint64_t bytes_read; + uint64_t last_bytes_read; + + uint64_t last_print_ms; + uint64_t time_zero_ms; + + uint64_t last_read_ms; + int finished; +} stats; + +void +on_ack (udx_stream_write_t *r, int status, int unordered) { + printf("write acked\n"); +} + +void +on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) { + stats.bytes_read += read_len; + stats.last_read_ms = uv_hrtime() / 1000000; + + if (stats.bytes_read == options.size_bytes) { + udx_stream_destroy(handle); + } +} + +static void +on_b_sock_close() { + printf("sending socket closing\n"); + uv_stop(&loop); +} + +static void +on_b_stream_close() { + printf("sending stream closing\n"); + int e = udx_socket_close(&bsock, on_b_sock_close); + assert(e == 0 && "udx_socket_close (sender, 'b')"); +} + +static void +on_a_sock_close() { + printf("receiving socket closing\n"); +} + +static void +on_a_stream_close() { + printf("receiving stream closing\n"); + int e = udx_socket_close(&asock, on_a_sock_close); + assert(e == 0 && "udx_socket_close (receiver, 'a')"); +} +int +main () { + int e; + + uv_loop_init(&loop); + + e = udx_init(&loop, &udx); + assert(e == 0); + + e = udx_socket_init(&udx, &asock); + assert(e == 0); + + e = udx_socket_init(&udx, &bsock); + assert(e == 0); + + struct sockaddr_in baddr; + uv_ip4_addr("127.0.0.1", 8082, &baddr); + e = udx_socket_bind(&bsock, (struct sockaddr *) &baddr); + assert(e == 0); + + struct sockaddr_in aaddr; + uv_ip4_addr("127.0.0.1", 8081, &aaddr); + e = udx_socket_bind(&asock, (struct sockaddr *) &aaddr); + assert(e == 0); + + e = udx_stream_init(&udx, &astream, 1, on_a_stream_close); + assert(e == 0); + + e = udx_stream_init(&udx, &bstream, 2, on_b_stream_close); + assert(e == 0); + + e = udx_stream_connect(&astream, &asock, 2, (struct sockaddr *) &baddr); + assert(e == 0); + + e = udx_stream_connect(&bstream, &bsock, 1, (struct sockaddr *) &aaddr); + assert(e == 0); + + e = udx_stream_read_start(&astream, on_read); + assert(e == 0); + + printf("generating data ...\n"); + + options.size_bytes = 2 * 1024 * 1024 * 1024L; + // options.size_bytes = 2 * 1024L; + + uint8_t *data = calloc(options.size_bytes, 1); + + //for (int64_t i = 0; i < options.size_bytes; i++) { + // data[i] = 32 + (i%96); + //} + + assert(data != NULL && "malloc"); + + printf("writing data\n"); + + uv_buf_t buf = uv_buf_init(data, options.size_bytes); + udx_stream_write(&req, &bstream, &buf, 1, on_ack); + + e = uv_run(&loop, UV_RUN_DEFAULT); + assert(e == 0 && "UV_RUN"); + + uv_loop_close(&loop); + + // just for valgrind + free(data); + return 0; +} From 7c6c578ba3696b9df2f5402030203c715b0a9919 Mon Sep 17 00:00:00 2001 From: James Thomas Date: Fri, 21 Apr 2023 16:56:58 -0400 Subject: [PATCH 2/6] trailing whitespace --- test/stream-write-read-perf.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/stream-write-read-perf.c b/test/stream-write-read-perf.c index 93655087..19dac9ee 100644 --- a/test/stream-write-read-perf.c +++ b/test/stream-write-read-perf.c @@ -23,7 +23,7 @@ struct { struct { uint64_t bytes_read; uint64_t last_bytes_read; - + uint64_t last_print_ms; uint64_t time_zero_ms; @@ -46,13 +46,13 @@ on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) { } } -static void +static void on_b_sock_close() { printf("sending socket closing\n"); uv_stop(&loop); } -static void +static void on_b_stream_close() { printf("sending stream closing\n"); int e = udx_socket_close(&bsock, on_b_sock_close); @@ -114,7 +114,7 @@ main () { options.size_bytes = 2 * 1024 * 1024 * 1024L; // options.size_bytes = 2 * 1024L; - + uint8_t *data = calloc(options.size_bytes, 1); //for (int64_t i = 0; i < options.size_bytes; i++) { @@ -122,7 +122,7 @@ main () { //} assert(data != NULL && "malloc"); - + printf("writing data\n"); uv_buf_t buf = uv_buf_init(data, options.size_bytes); @@ -134,6 +134,6 @@ main () { uv_loop_close(&loop); // just for valgrind - free(data); + free(data); return 0; } From df3e9a386f951a8c25e3f160e99f70d337cc0362 Mon Sep 17 00:00:00 2001 From: James Thomas Date: Wed, 27 Sep 2023 03:45:14 -0400 Subject: [PATCH 3/6] fix some warnings, add warnings to build --- CMakeLists.txt | 8 ++++++++ src/debug.h | 2 +- src/io_posix.c | 2 +- src/udx.c | 5 +++-- 4 files changed, 13 insertions(+), 4 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index dbbca4de..375db9c1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -15,6 +15,14 @@ set_target_properties( POSITION_INDEPENDENT_CODE ON ) +if(WIN32) + target_compile_options(udx PRIVATE /W4) +endif() + +if(UNIX) + target_compile_options(udx PRIVATE -Wall -Wextra) +endif() + target_sources( udx INTERFACE diff --git a/src/debug.h b/src/debug.h index 2f43e59a..f48efefd 100644 --- a/src/debug.h +++ b/src/debug.h @@ -30,7 +30,7 @@ debug_print_outgoing (udx_stream_t *stream) { printf("%-*s%-*s%s\n", i, "RA", j, "SF", "Seq"); - for (int s = stream->remote_acked; s < stream->seq; s++) { + for (uint32_t s = stream->remote_acked; s < stream->seq; s++) { udx_packet_t *pkt = (udx_packet_t *) udx__cirbuf_get(&stream->outgoing, s); if (pkt == NULL) { printf("-"); diff --git a/src/io_posix.c b/src/io_posix.c index 458a6733..6c8c9946 100644 --- a/src/io_posix.c +++ b/src/io_posix.c @@ -41,7 +41,7 @@ udx__get_link_mtu (const struct sockaddr *addr) { } int mtu; - int mtu_opt_size = sizeof mtu; + socklen_t mtu_opt_size = sizeof mtu; if (addr->sa_family == AF_INET) { rc = getsockopt(s, IPPROTO_IP, IP_MTU, &mtu, &mtu_opt_size); } else { diff --git a/src/udx.c b/src/udx.c index b7a7c434..d7e69b53 100644 --- a/src/udx.c +++ b/src/udx.c @@ -596,8 +596,9 @@ send_state_packet (udx_stream_t *stream) { return update_poll(stream->socket); } -static inline int +static inline uint32_t max_payload (udx_stream_t *stream) { + assert(stream->mtu > (AF_INET ? UDX_IPV4_HEADER_SIZE : UDX_IPV6_HEADER_SIZE)); return stream->mtu - (stream->remote_addr.ss_family == AF_INET ? UDX_IPV4_HEADER_SIZE : UDX_IPV6_HEADER_SIZE); } @@ -698,7 +699,7 @@ fill_window (udx_stream_t *stream) { int header_flag = w->is_write_end ? UDX_HEADER_END : UDX_HEADER_DATA; - int mss = max_payload(stream); + uint32_t mss = max_payload(stream); uint32_t len = get_window_bytes(stream); From 4b9b26740139efbed7ebe29c4355ca92b781d32c Mon Sep 17 00:00:00 2001 From: James Thomas Date: Thu, 28 Sep 2023 05:12:03 -0400 Subject: [PATCH 4/6] cleanup warnings --- src/debug.h | 58 +++++++++++++++++++++++++++----------------------- src/internal.h | 2 ++ src/udx.c | 5 +++-- 3 files changed, 36 insertions(+), 29 deletions(-) diff --git a/src/debug.h b/src/debug.h index f48efefd..be63ab73 100644 --- a/src/debug.h +++ b/src/debug.h @@ -20,42 +20,46 @@ debug_print_cwnd_stats (udx_stream_t *stream) { } #else static void -debug_print_cwnd_stats (udx_stream_t *stream) {} +debug_print_cwnd_stats (udx_stream_t *stream) { + (void) stream; // silence 'unused-parameter' warning +} #endif +#define debug_printf(...) \ + do { \ + if (DEBUG) fprintf(stderr, __VA_ARGS__); \ + } while (0) + static void debug_print_outgoing (udx_stream_t *stream) { - uint32_t i = stream->seq_flushed - stream->remote_acked; - uint32_t j = stream->seq - stream->seq_flushed; + if (DEBUG) { + uint32_t i = stream->seq_flushed - stream->remote_acked; + uint32_t j = stream->seq - stream->seq_flushed; - printf("%-*s%-*s%s\n", i, "RA", j, "SF", "Seq"); + debug_printf("%-*s%-*s%s\n", i, "RA", j, "SF", "Seq"); - for (uint32_t s = stream->remote_acked; s < stream->seq; s++) { - udx_packet_t *pkt = (udx_packet_t *) udx__cirbuf_get(&stream->outgoing, s); - if (pkt == NULL) { - printf("-"); - continue; - } + for (uint32_t s = stream->remote_acked; s < stream->seq; s++) { + udx_packet_t *pkt = (udx_packet_t *) udx__cirbuf_get(&stream->outgoing, s); + if (pkt == NULL) { + debug_printf("-"); + continue; + } - if (pkt->type == UDX_PACKET_INFLIGHT) { - printf("I"); - continue; - } - if (pkt->type == UDX_PACKET_SENDING) { - printf("S"); - continue; - } - if (pkt->type == UDX_PACKET_WAITING) { - printf("W"); - continue; + if (pkt->type == UDX_PACKET_INFLIGHT) { + debug_printf("I"); + continue; + } + if (pkt->type == UDX_PACKET_SENDING) { + debug_printf("S"); + continue; + } + if (pkt->type == UDX_PACKET_WAITING) { + debug_printf("W"); + continue; + } } + debug_printf("\n"); } - printf("\n"); } -#define debug_printf(...) \ - do { \ - if (DEBUG) fprintf(stderr, __VA_ARGS__); \ - } while (0) - #endif // UDX_DEBUG_H diff --git a/src/internal.h b/src/internal.h index 4de78936..355bb3c4 100644 --- a/src/internal.h +++ b/src/internal.h @@ -6,6 +6,8 @@ #define UDX_PACKET_CALLBACK (UDX_PACKET_STREAM_SEND | UDX_PACKET_STREAM_DESTROY | UDX_PACKET_SEND) #define UDX_PACKET_FREE_ON_SEND (UDX_PACKET_STREAM_STATE | UDX_PACKET_STREAM_DESTROY) +#define UDX_UNUSED(x) ((void) (x)) + static inline void addr_to_v6 (struct sockaddr_in *addr) { struct sockaddr_in6 in; diff --git a/src/udx.c b/src/udx.c index eb8249dd..a77aed70 100644 --- a/src/udx.c +++ b/src/udx.c @@ -904,8 +904,8 @@ rack_detect_loss (udx_stream_t *stream) { if (resending > mtu_probes_lost) { debug_printf("resending=%d mtu_probe_lost=%d\n", resending, mtu_probes_lost); - if (stream->recovery == 0 /* && resending > mtu_probe_lost */) { - // debug_print_outgoing(stream); + if (stream->recovery == 0) { + debug_print_outgoing(stream); // easy win is to clear packets that are in the queue - they def wont help if sent. unqueue_first_transmits(stream); @@ -1381,6 +1381,7 @@ udx__trigger_send_callback (udx_packet_t *pkt) { static void on_uv_poll (uv_poll_t *handle, int status, int events) { + UDX_UNUSED(status); udx_socket_t *socket = handle->data; ssize_t size; From 5b7e62620194099740a78d1e2d43a9a663591031 Mon Sep 17 00:00:00 2001 From: James Thomas Date: Tue, 7 Nov 2023 14:19:01 -0500 Subject: [PATCH 5/6] re-enable mtu probing --- src/udx.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/udx.c b/src/udx.c index c808feb6..85931273 100644 --- a/src/udx.c +++ b/src/udx.c @@ -735,13 +735,11 @@ fill_window (udx_stream_t *stream) { stream->pkts_inflight++; stream->inflight += pkt->size; - /* There is an issue somewhere when probes run it seems, disabling for now if (stream->mtu_probe_wanted && mtu_probeify_packet(pkt, stream->mtu_probe_size)) { stream->mtu_probe_seq[stream->mtu_probe_count] = pkt->seq; stream->mtu_probe_count++; stream->mtu_probe_wanted = false; } - */ assert(seq_compare(stream->seq_flushed, pkt->seq) <= 0); stream->seq_flushed = pkt->seq + 1; From 18275b24258cf39a1fbcc898d0d459d17d2b5107 Mon Sep 17 00:00:00 2001 From: James Thomas Date: Wed, 8 Nov 2023 16:48:48 -0500 Subject: [PATCH 6/6] re-enable ack probing. simplifies code some --- docs/wireshark/udx.lua | 4 ++- include/udx.h | 2 +- src/debug.h | 11 +++----- src/udx.c | 61 +++++++++++++++++------------------------- 4 files changed, 31 insertions(+), 47 deletions(-) diff --git a/docs/wireshark/udx.lua b/docs/wireshark/udx.lua index 222cc2e9..abead0fe 100644 --- a/docs/wireshark/udx.lua +++ b/docs/wireshark/udx.lua @@ -63,9 +63,10 @@ function udx.dissector(tvb, pinfo, tree) local data_offset = tvb(3,1):uint() local pos = 20 + local sacks = "" if bit.band(type, TYPE_SACK) > 0 then - local sacks = " " + sacks = " " local header_end = data_offset > 0 and 20 + data_offset or len while pos + 8 <= header_end do local from = tvb(pos, 4):le_uint() @@ -89,6 +90,7 @@ function udx.dissector(tvb, pinfo, tree) " Id=" .. id .. " Seq=" .. seq .. " Ack=" .. ack .. + sacks .. " " .. type_names pinfo.cols.info:set(info) end diff --git a/include/udx.h b/include/udx.h index e7c03918..af0e1425 100644 --- a/include/udx.h +++ b/include/udx.h @@ -212,7 +212,6 @@ struct udx_stream_s { int mtu_probe_count; int mtu_probe_size; // size of the outstanding probe int mtu_max; // min(UDX_MTU_MAX, get_link_mtu(remote_addr)) - uint32_t mtu_probe_seq[UDX_MTU_MAX_PROBES]; uint16_t mtu; uint32_t seq; @@ -272,6 +271,7 @@ struct udx_packet_s { udx_stream_t *stream; // pointer to the stream if stream packet uint8_t transmits; + bool is_mtu_probe; uint16_t size; uint64_t time_sent; diff --git a/src/debug.h b/src/debug.h index be63ab73..70bdcca1 100644 --- a/src/debug.h +++ b/src/debug.h @@ -33,11 +33,6 @@ debug_print_cwnd_stats (udx_stream_t *stream) { static void debug_print_outgoing (udx_stream_t *stream) { if (DEBUG) { - uint32_t i = stream->seq_flushed - stream->remote_acked; - uint32_t j = stream->seq - stream->seq_flushed; - - debug_printf("%-*s%-*s%s\n", i, "RA", j, "SF", "Seq"); - for (uint32_t s = stream->remote_acked; s < stream->seq; s++) { udx_packet_t *pkt = (udx_packet_t *) udx__cirbuf_get(&stream->outgoing, s); if (pkt == NULL) { @@ -45,15 +40,15 @@ debug_print_outgoing (udx_stream_t *stream) { continue; } - if (pkt->type == UDX_PACKET_INFLIGHT) { + if (pkt->status == UDX_PACKET_INFLIGHT) { debug_printf("I"); continue; } - if (pkt->type == UDX_PACKET_SENDING) { + if (pkt->status == UDX_PACKET_SENDING) { debug_printf("S"); continue; } - if (pkt->type == UDX_PACKET_WAITING) { + if (pkt->status == UDX_PACKET_WAITING) { debug_printf("W"); continue; } diff --git a/src/udx.c b/src/udx.c index 85931273..fbe0f849 100644 --- a/src/udx.c +++ b/src/udx.c @@ -526,6 +526,7 @@ init_stream_packet (udx_packet_t *pkt, int type, udx_stream_t *stream, const uv_ pkt->dest_len = stream->remote_addr_len; pkt->send_queue = NULL; pkt->stream = stream; + pkt->is_mtu_probe = false; pkt->bufs_len = 2; @@ -553,15 +554,28 @@ mtu_probeify_packet (udx_packet_t *pkt, int wanted_size) { pkt->bufs[1].base = probe_data; pkt->header[3] = padding_size; pkt->bufs_len = 3; + pkt->is_mtu_probe = true; return 1; } static void -mtu_unprobeify_packet (udx_packet_t *pkt) { - assert(pkt->bufs_len == 3); +mtu_unprobeify_packet (udx_packet_t *pkt, udx_stream_t *stream) { + assert(pkt->bufs_len == 3 && pkt->is_mtu_probe); pkt->header[3] = 0; pkt->bufs[1] = pkt->bufs[2]; pkt->bufs_len = 2; + pkt->is_mtu_probe = false; + + debug_printf("mtu: probe %d/%d", stream->mtu_probe_count, UDX_MTU_MAX_PROBES); + if (stream->mtu_state == UDX_MTU_STATE_SEARCH) { + if (stream->mtu_probe_count >= UDX_MTU_MAX_PROBES) { + debug_printf(" established mtu=%d via timeout", stream->mtu); + stream->mtu_state = UDX_MTU_STATE_SEARCH_COMPLETE; + } else { + stream->mtu_probe_wanted = true; + } + } + debug_printf("\n"); } static int @@ -736,7 +750,6 @@ fill_window (udx_stream_t *stream) { stream->inflight += pkt->size; if (stream->mtu_probe_wanted && mtu_probeify_packet(pkt, stream->mtu_probe_size)) { - stream->mtu_probe_seq[stream->mtu_probe_count] = pkt->seq; stream->mtu_probe_count++; stream->mtu_probe_wanted = false; } @@ -838,16 +851,6 @@ rack_update_reo_wnd (udx_stream_t *stream) { return r < stream->srtt ? r : stream->srtt; } -static inline bool -seq_was_probe (udx_stream_t *s, uint32_t seq) { - for (int i = 0; i < s->mtu_probe_count; i++) { - if (s->mtu_probe_seq[i] == seq) { - return true; - } - } - return false; -} - static void rack_detect_loss (udx_stream_t *stream) { uint64_t timeout = 0; @@ -881,20 +884,9 @@ rack_detect_loss (udx_stream_t *stream) { stream->pkts_inflight--; stream->retransmits_waiting++; - debug_printf("rack to on seq=%d\n", seq); - - if (seq_was_probe(stream, seq)) { + if (pkt->is_mtu_probe) { + mtu_unprobeify_packet(pkt, stream); mtu_probes_lost++; - if (seq == stream->mtu_probe_seq[stream->mtu_probe_count - 1] && stream->mtu_state == UDX_MTU_STATE_SEARCH) { - mtu_unprobeify_packet(pkt); - debug_printf("mtu: rack to on last probe, seq=%d count=%d/%d\n", seq, stream->mtu_probe_count, UDX_MTU_MAX_PROBES); - if (stream->mtu_probe_count >= UDX_MTU_MAX_PROBES) { - stream->mtu_state = UDX_MTU_STATE_SEARCH_COMPLETE; - debug_printf("mtu: established via probe timeout, mtu=%d\n", stream->mtu); - } else { - stream->mtu_probe_wanted = true; - } - } } // todo: state check unnecessary? @@ -969,8 +961,8 @@ ack_packet (udx_stream_t *stream, uint32_t seq, int sack) { return 0; } - if (stream->mtu_state == UDX_MTU_STATE_SEARCH && stream->mtu_probe_count > 0 && seq == stream->mtu_probe_seq[stream->mtu_probe_count - 1]) { - debug_printf("mtu: probe acked seq=%d mtu=%d->%d\n", seq, stream->mtu, stream->mtu_probe_size); + if (stream->mtu_state == UDX_MTU_STATE_SEARCH && stream->mtu_probe_count > 0 && pkt->is_mtu_probe) { + debug_printf("mtu: probe acked seq=%d mtu=%d->%d sack=%d\n", seq, stream->mtu, stream->mtu_probe_size, sack); stream->mtu_probe_count = 0; stream->mtu = stream->mtu_probe_size; @@ -1913,16 +1905,11 @@ udx_stream_check_timeouts (udx_stream_t *handle) { handle->pkts_waiting++; handle->pkts_inflight--; handle->retransmits_waiting++; - } - - // todo: handle possibility of downward MTU change - // this would require re-sending in-flight packets that were too big to send. - // resizing is easier if sequence numbers are based on bytes - // handle->mtu = UDX_MTU_BASE; - // handle->mtu_state = UDX_MTU_STATE_ERROR; - // handle->mtu_probe_count = 0; - // handle->mtu_probe_size = UDX_MTU_BASE; + if (pkt->is_mtu_probe) { + mtu_unprobeify_packet(pkt, handle); + } + } debug_printf("timeout! pkt loss detected - inflight=%zu ssthresh=%u cwnd=%u acked=%u seq=%u rtt=%u\n", handle->inflight, handle->ssthresh, handle->cwnd, handle->remote_acked, handle->seq_flushed, handle->srtt); }