Skip to content

Commit

Permalink
remove empty placeholder sendmsg buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
jthomas43 committed Jan 18, 2024
1 parent 4d9d443 commit 0ab07be
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 26 deletions.
9 changes: 6 additions & 3 deletions include/udx.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,12 +285,15 @@ struct udx_packet_s {
// just alloc it in place here, easier to manage
char header[UDX_HEADER_SIZE];
unsigned short nbufs;
// buf_t[] starts here

// inefficient - only relevant for stream_t packets
unsigned short nwrites;
udx_stream_write_t **writes;
};

struct udx_socket_send_s {
udx_packet_t pkt;
uv_buf_t bufs[1]; // buf_t[] must be after packet
uv_buf_t bufs[1]; // buf_t[] must be after packet_t
udx_socket_t *socket;

udx_socket_send_cb on_send;
Expand All @@ -315,7 +318,7 @@ struct udx_stream_write_s {

struct udx_stream_send_s {
udx_packet_t pkt;
uv_buf_t bufs[3]; // buf_t[] must be after packet
uv_buf_t bufs[3]; // buf_t[] must be after packet_t
udx_stream_t *stream;

udx_stream_send_cb on_send;
Expand Down
69 changes: 46 additions & 23 deletions src/udx.c
Original file line number Diff line number Diff line change
Expand Up @@ -454,12 +454,14 @@ clear_outgoing_packets (udx_stream_t *stream) {

assert(pkt->nbufs >= 2);

int diff = pkt->nbufs - pkt->nwrites;
assert(diff == 1 || diff == 2); // either header buf, or header + padding buff

uv_buf_t *bufs = (uv_buf_t *) (pkt + 1);
udx_stream_write_t **writes = (udx_stream_write_t **) (bufs + pkt->nbufs);

for (int i = 2; i < pkt->nbufs; i++) {
udx_stream_write_t *w = writes[i - 2];
size_t pkt_len = bufs[i].len;
for (int i = 0; i < pkt->nwrites; i++) {
size_t pkt_len = bufs[i + diff].len;
udx_stream_write_t *w = pkt->writes[i];
on_bytes_acked(stream, w, pkt_len, true);

if (w->bytes_acked == w->buf.len && w->on_ack != NULL) {
Expand Down Expand Up @@ -505,7 +507,7 @@ clear_outgoing_packets (udx_stream_t *stream) {
}

static void
init_stream_packet (udx_packet_t *pkt, int type, udx_stream_t *stream, const uv_buf_t *userbufs, int nbufs_minus_two) {
init_stream_packet (udx_packet_t *pkt, int type, udx_stream_t *stream, const uv_buf_t *userbufs, int nuserbufs) {
uint8_t *b = (uint8_t *) &(pkt->header);

// 8 bit magic byte + 8 bit version + 8 bit type + 8 bit extensions
Expand Down Expand Up @@ -535,14 +537,17 @@ init_stream_packet (udx_packet_t *pkt, int type, udx_stream_t *stream, const uv_
pkt->ctx = stream;
pkt->is_mtu_probe = false;

pkt->nbufs = 2 + nbufs_minus_two;
uv_buf_t *bufs = (uv_buf_t *) (pkt + 1);

pkt->nbufs = 1 + nuserbufs;
bufs[0] = uv_buf_init((char *) &(pkt->header), UDX_HEADER_SIZE);
bufs[1] = uv_buf_init("", 0);

for (int i = 0; i < nbufs_minus_two; i++) {
bufs[i + 2] = userbufs[i];
// for now, set when stream writes data
pkt->writes = NULL;
pkt->nwrites = 0;

for (int i = 0; i < nuserbufs; i++) {
bufs[i + 1] = userbufs[i];
pkt->size += userbufs[i].len;
}
}
Expand All @@ -552,9 +557,7 @@ static int
mtu_probeify_packet (udx_packet_t *pkt, int wanted_size) {
assert(wanted_size > pkt->size);

uv_buf_t *bufs = (uv_buf_t *) (pkt + 1);

if (pkt->nbufs < 2 || pkt->header[3] != 0 || bufs[1].len > 0) {
if (pkt->nbufs < 2 || pkt->header[3] != 0) {
return 0;
}
int header_size = (pkt->dest.ss_family == AF_INET ? UDX_IPV4_HEADER_SIZE : UDX_IPV6_HEADER_SIZE) - 20;
Expand All @@ -564,8 +567,16 @@ mtu_probeify_packet (udx_packet_t *pkt, int wanted_size) {
}
debug_printf("mtu: probeify seq=%d size=%u wanted=%d padding=%d\n", pkt->seq, pkt->size + header_size, wanted_size, padding_size);
static char probe_data[256] = {0};

uv_buf_t *bufs = (uv_buf_t *) (pkt + 1);
for (int i = pkt->nbufs; i > 1; i--) {
bufs[i] = bufs[i - 1];
}
pkt->nbufs++;

bufs[1].len = padding_size;
bufs[1].base = probe_data;

pkt->header[3] = padding_size;
pkt->is_mtu_probe = true;
return 1;
Expand All @@ -576,11 +587,18 @@ static void
mtu_unprobeify_packet (udx_packet_t *pkt, udx_stream_t *stream) {
assert(pkt->is_mtu_probe);

pkt->header[3] = 0;

uv_buf_t *bufs = (uv_buf_t *) (pkt + 1);

pkt->header[3] = 0;
bufs[1].base = "";
bufs[1].len = 0;
// [header][padding][2][3] 4 = nbufs

for (int i = 2; i < pkt->nbufs; i++) {
bufs[i - 1] = bufs[i];
}

pkt->nbufs--;

pkt->is_mtu_probe = false;

debug_printf("mtu: probe %d/%d", stream->mtu_probe_count, UDX_MTU_MAX_PROBES);
Expand Down Expand Up @@ -782,16 +800,16 @@ udx__shift_packet (udx_socket_t *socket) {
debug_printf("combined write, nwrites=%d\n", nwrites);
}

/* buf[0] = header, buf[1] = padding */
int nbufs = 2 + nwrites; // extra for 1.header 2.padding

udx_packet_t *pkt = malloc(sizeof(udx_packet_t) + sizeof(uv_buf_t) * (2 + nwrites) + sizeof(void *) * nwrites);
udx_packet_t *pkt = malloc(sizeof(udx_packet_t) + sizeof(uv_buf_t) * nbufs + sizeof(void *) * nwrites);

init_stream_packet(pkt, header_flag, stream, bufs, nwrites);

udx_stream_write_t **pwrite = (udx_stream_write_t **) (((uv_buf_t *) (pkt + 1)) + pkt->nbufs);
pkt->writes = (udx_stream_write_t **) (((uv_buf_t *) (pkt + 1)) + nbufs);
pkt->nwrites = nwrites;

for (int i = 0; i < nwrites; i++) {
pwrite[i] = writes[i];
pkt->writes[i] = writes[i];
}

pkt->ctx = stream;
Expand Down Expand Up @@ -1189,11 +1207,16 @@ ack_packet (udx_stream_t *stream, uint32_t seq, int sack) {
stream->rack_next_seq = next;
}
}

int diff = pkt->nbufs - pkt->nwrites;

uv_buf_t *bufs = (uv_buf_t *) (pkt + 1);

for (int i = 2; i < pkt->nbufs; i++) {
udx_stream_write_t *w = ((udx_stream_write_t **) (bufs + pkt->nbufs))[i - 2];
size_t pkt_len = bufs[i].len;
for (int i = 0; i < pkt->nwrites; i++) {

size_t pkt_len = bufs[i + diff].len;
udx_stream_write_t *w = pkt->writes[i];

on_bytes_acked(stream, w, pkt_len, false);

if (w->bytes_acked == w->buf.len && w->on_ack) {
Expand Down

0 comments on commit 0ab07be

Please sign in to comment.