From 0ab07be681517af9c66200eea50a1abc2542087f Mon Sep 17 00:00:00 2001 From: James Thomas Date: Thu, 18 Jan 2024 06:33:24 -0500 Subject: [PATCH] remove empty placeholder sendmsg buffer --- include/udx.h | 9 ++++--- src/udx.c | 69 ++++++++++++++++++++++++++++++++++----------------- 2 files changed, 52 insertions(+), 26 deletions(-) diff --git a/include/udx.h b/include/udx.h index 02998de7..be89660f 100644 --- a/include/udx.h +++ b/include/udx.h @@ -285,12 +285,15 @@ struct udx_packet_s { // just alloc it in place here, easier to manage char header[UDX_HEADER_SIZE]; unsigned short nbufs; - // buf_t[] starts here + + // inefficient - only relevant for stream_t packets + unsigned short nwrites; + udx_stream_write_t **writes; }; struct udx_socket_send_s { udx_packet_t pkt; - uv_buf_t bufs[1]; // buf_t[] must be after packet + uv_buf_t bufs[1]; // buf_t[] must be after packet_t udx_socket_t *socket; udx_socket_send_cb on_send; @@ -315,7 +318,7 @@ struct udx_stream_write_s { struct udx_stream_send_s { udx_packet_t pkt; - uv_buf_t bufs[3]; // buf_t[] must be after packet + uv_buf_t bufs[3]; // buf_t[] must be after packet_t udx_stream_t *stream; udx_stream_send_cb on_send; diff --git a/src/udx.c b/src/udx.c index 87abced2..5f776ed3 100644 --- a/src/udx.c +++ b/src/udx.c @@ -454,12 +454,14 @@ clear_outgoing_packets (udx_stream_t *stream) { assert(pkt->nbufs >= 2); + int diff = pkt->nbufs - pkt->nwrites; + assert(diff == 1 || diff == 2); // either header buf, or header + padding buff + uv_buf_t *bufs = (uv_buf_t *) (pkt + 1); - udx_stream_write_t **writes = (udx_stream_write_t **) (bufs + pkt->nbufs); - for (int i = 2; i < pkt->nbufs; i++) { - udx_stream_write_t *w = writes[i - 2]; - size_t pkt_len = bufs[i].len; + for (int i = 0; i < pkt->nwrites; i++) { + size_t pkt_len = bufs[i + diff].len; + udx_stream_write_t *w = pkt->writes[i]; on_bytes_acked(stream, w, pkt_len, true); if (w->bytes_acked == w->buf.len && w->on_ack != NULL) { @@ -505,7 +507,7 @@ clear_outgoing_packets (udx_stream_t *stream) { } static void -init_stream_packet (udx_packet_t *pkt, int type, udx_stream_t *stream, const uv_buf_t *userbufs, int nbufs_minus_two) { +init_stream_packet (udx_packet_t *pkt, int type, udx_stream_t *stream, const uv_buf_t *userbufs, int nuserbufs) { uint8_t *b = (uint8_t *) &(pkt->header); // 8 bit magic byte + 8 bit version + 8 bit type + 8 bit extensions @@ -535,14 +537,17 @@ init_stream_packet (udx_packet_t *pkt, int type, udx_stream_t *stream, const uv_ pkt->ctx = stream; pkt->is_mtu_probe = false; - pkt->nbufs = 2 + nbufs_minus_two; uv_buf_t *bufs = (uv_buf_t *) (pkt + 1); + pkt->nbufs = 1 + nuserbufs; bufs[0] = uv_buf_init((char *) &(pkt->header), UDX_HEADER_SIZE); - bufs[1] = uv_buf_init("", 0); - for (int i = 0; i < nbufs_minus_two; i++) { - bufs[i + 2] = userbufs[i]; + // for now, set when stream writes data + pkt->writes = NULL; + pkt->nwrites = 0; + + for (int i = 0; i < nuserbufs; i++) { + bufs[i + 1] = userbufs[i]; pkt->size += userbufs[i].len; } } @@ -552,9 +557,7 @@ static int mtu_probeify_packet (udx_packet_t *pkt, int wanted_size) { assert(wanted_size > pkt->size); - uv_buf_t *bufs = (uv_buf_t *) (pkt + 1); - - if (pkt->nbufs < 2 || pkt->header[3] != 0 || bufs[1].len > 0) { + if (pkt->nbufs < 2 || pkt->header[3] != 0) { return 0; } int header_size = (pkt->dest.ss_family == AF_INET ? UDX_IPV4_HEADER_SIZE : UDX_IPV6_HEADER_SIZE) - 20; @@ -564,8 +567,16 @@ mtu_probeify_packet (udx_packet_t *pkt, int wanted_size) { } debug_printf("mtu: probeify seq=%d size=%u wanted=%d padding=%d\n", pkt->seq, pkt->size + header_size, wanted_size, padding_size); static char probe_data[256] = {0}; + + uv_buf_t *bufs = (uv_buf_t *) (pkt + 1); + for (int i = pkt->nbufs; i > 1; i--) { + bufs[i] = bufs[i - 1]; + } + pkt->nbufs++; + bufs[1].len = padding_size; bufs[1].base = probe_data; + pkt->header[3] = padding_size; pkt->is_mtu_probe = true; return 1; @@ -576,11 +587,18 @@ static void mtu_unprobeify_packet (udx_packet_t *pkt, udx_stream_t *stream) { assert(pkt->is_mtu_probe); + pkt->header[3] = 0; + uv_buf_t *bufs = (uv_buf_t *) (pkt + 1); - pkt->header[3] = 0; - bufs[1].base = ""; - bufs[1].len = 0; + // [header][padding][2][3] 4 = nbufs + + for (int i = 2; i < pkt->nbufs; i++) { + bufs[i - 1] = bufs[i]; + } + + pkt->nbufs--; + pkt->is_mtu_probe = false; debug_printf("mtu: probe %d/%d", stream->mtu_probe_count, UDX_MTU_MAX_PROBES); @@ -782,16 +800,16 @@ udx__shift_packet (udx_socket_t *socket) { debug_printf("combined write, nwrites=%d\n", nwrites); } - /* buf[0] = header, buf[1] = padding */ + int nbufs = 2 + nwrites; // extra for 1.header 2.padding - udx_packet_t *pkt = malloc(sizeof(udx_packet_t) + sizeof(uv_buf_t) * (2 + nwrites) + sizeof(void *) * nwrites); + udx_packet_t *pkt = malloc(sizeof(udx_packet_t) + sizeof(uv_buf_t) * nbufs + sizeof(void *) * nwrites); init_stream_packet(pkt, header_flag, stream, bufs, nwrites); - - udx_stream_write_t **pwrite = (udx_stream_write_t **) (((uv_buf_t *) (pkt + 1)) + pkt->nbufs); + pkt->writes = (udx_stream_write_t **) (((uv_buf_t *) (pkt + 1)) + nbufs); + pkt->nwrites = nwrites; for (int i = 0; i < nwrites; i++) { - pwrite[i] = writes[i]; + pkt->writes[i] = writes[i]; } pkt->ctx = stream; @@ -1189,11 +1207,16 @@ ack_packet (udx_stream_t *stream, uint32_t seq, int sack) { stream->rack_next_seq = next; } } + + int diff = pkt->nbufs - pkt->nwrites; + uv_buf_t *bufs = (uv_buf_t *) (pkt + 1); - for (int i = 2; i < pkt->nbufs; i++) { - udx_stream_write_t *w = ((udx_stream_write_t **) (bufs + pkt->nbufs))[i - 2]; - size_t pkt_len = bufs[i].len; + for (int i = 0; i < pkt->nwrites; i++) { + + size_t pkt_len = bufs[i + diff].len; + udx_stream_write_t *w = pkt->writes[i]; + on_bytes_acked(stream, w, pkt_len, false); if (w->bytes_acked == w->buf.len && w->on_ack) {