diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c index 7e4dcb4a4..b7547f9f9 100644 --- a/c/src/proactor/epoll_raw_connection.c +++ b/c/src/proactor/epoll_raw_connection.c @@ -350,8 +350,10 @@ task_t *pni_raw_connection_task(praw_connection_t *rc) { return &rc->task; } -static long snd(int fd, const void* b, size_t s) { - return send(fd, b, s, MSG_NOSIGNAL | MSG_DONTWAIT); +static long snd(int fd, const void* b, size_t s, bool more) { + int flags = MSG_NOSIGNAL | MSG_DONTWAIT; + if (more) flags |= MSG_MORE; + return send(fd, b, s, flags); } static long rcv(int fd, void* b, size_t s) { diff --git a/c/src/proactor/raw_connection-internal.h b/c/src/proactor/raw_connection-internal.h index fe0e29fe0..eb38e81d3 100644 --- a/c/src/proactor/raw_connection-internal.h +++ b/c/src/proactor/raw_connection-internal.h @@ -132,7 +132,7 @@ void pni_raw_close(pn_raw_connection_t *conn); void pni_raw_read_close(pn_raw_connection_t *conn); void pni_raw_write_close(pn_raw_connection_t *conn); void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int, void*, size_t), void (*set_error)(pn_raw_connection_t *, const char *, int)); -void pni_raw_write(pn_raw_connection_t *conn, int sock, long (*send)(int, const void*, size_t), void (*set_error)(pn_raw_connection_t *, const char *, int)); +void pni_raw_write(pn_raw_connection_t *conn, int sock, long (*send)(int, const void*, size_t, bool), void (*set_error)(pn_raw_connection_t *, const char *, int)); void pni_raw_process_shutdown(pn_raw_connection_t *conn, int sock, int (*shutdown_rd)(int), int (*shutdown_wr)(int)); void pni_raw_async_disconnect(pn_raw_connection_t *conn); bool pni_raw_can_read(pn_raw_connection_t *conn); diff --git a/c/src/proactor/raw_connection.c b/c/src/proactor/raw_connection.c index 0c2118e01..27a699617 100644 --- a/c/src/proactor/raw_connection.c +++ b/c/src/proactor/raw_connection.c @@ -566,7 +566,7 @@ void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int, void*, return; } -void pni_raw_write(pn_raw_connection_t *conn, int sock, long (*send)(int, const void*, size_t), void(*set_error)(pn_raw_connection_t *, const char *, int)) { +void pni_raw_write(pn_raw_connection_t *conn, int sock, long (*send)(int, const void*, size_t, bool), void(*set_error)(pn_raw_connection_t *, const char *, int)) { assert(conn); if (pni_raw_wdrained(conn)) return; @@ -578,7 +578,8 @@ void pni_raw_write(pn_raw_connection_t *conn, int sock, long (*send)(int, const assert(conn->wbuffers[p-1].type == buff_unwritten); char *bytes = conn->wbuffers[p-1].bytes+conn->wbuffers[p-1].offset+conn->unwritten_offset; size_t s = conn->wbuffers[p-1].size-conn->unwritten_offset; - int r = send(sock, bytes, s); + bool more = conn->wbuffers[p-1].next != 0; + int r = send(sock, bytes, s, more); if (r < 0) { // Interrupted system call try again switch (errno) { diff --git a/c/tests/raw_connection_test.cpp b/c/tests/raw_connection_test.cpp index 7378d7541..92cc2ad0f 100644 --- a/c/tests/raw_connection_test.cpp +++ b/c/tests/raw_connection_test.cpp @@ -87,10 +87,12 @@ namespace { ::shutdown(fd, SHUT_WR); } - long snd(int fd, const void* b, size_t s) { + long snd(int fd, const void* b, size_t s, bool more) { write_err = 0; + int flags = MSG_NOSIGNAL | MSG_DONTWAIT; + if (more) flags |= MSG_MORE; if (max_send_size && max_send_size < s) s = max_send_size; - return ::send(fd, b, s, MSG_NOSIGNAL | MSG_DONTWAIT); + return ::send(fd, b, s, flags); } int makepair(int fds[2]) { @@ -164,7 +166,7 @@ namespace { return s; } - long snd(int fd, const void* b, size_t s){ + long snd(int fd, const void* b, size_t s, bool /* more: unused */ ){ CHECK(fd < buffers.size()); write_err = 0; if (max_send_size && max_send_size < s) s = max_send_size;