Skip to content

Commit

Permalink
Use io_uring to batch handle clients pending writes to reduce SYSCALL…
Browse files Browse the repository at this point in the history
… count.

Signed-off-by: Lipeng Zhu <[email protected]>
  • Loading branch information
lipzhu committed May 30, 2024
1 parent 6bab2d7 commit 3abd93c
Show file tree
Hide file tree
Showing 9 changed files with 249 additions and 35 deletions.
13 changes: 12 additions & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,17 @@ ifeq ($(MALLOC),jemalloc)
FINAL_LIBS := ../deps/jemalloc/lib/libjemalloc.a $(FINAL_LIBS)
endif

# only Linux has IO_URING support
ifeq ($(uname_S),Linux)
HAS_LIBURING := $(shell sh -c 'echo "$(NUMBER_SIGN_CHAR)include <liburing.h>" > foo.c; \
$(CC) -E foo.c > /dev/null 2>&1 && echo yes; \
rm foo.c')
ifeq ($(HAS_LIBURING),yes)
FINAL_CFLAGS+= -DHAVE_LIBURING
FINAL_LIBS+= -luring
endif
endif

# LIBSSL & LIBCRYPTO
LIBSSL_LIBS=
LIBSSL_PKGCONFIG := $(shell $(PKG_CONFIG) --exists libssl && echo $$?)
Expand Down Expand Up @@ -396,7 +407,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o io_uring.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3054,6 +3054,7 @@ standardConfig static_configs[] = {
createBoolConfig("aof-disable-auto-gc", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, server.aof_disable_auto_gc, 0, NULL, updateAofAutoGCEnabled),
createBoolConfig("replica-ignore-disk-write-errors", NULL, MODIFIABLE_CONFIG, server.repl_ignore_disk_write_error, 0, NULL, NULL),
createBoolConfig("extended-redis-compatibility", NULL, MODIFIABLE_CONFIG, server.extended_redis_compat, 0, NULL, updateExtendedRedisCompat),
createBoolConfig("io_uring", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, server.io_uring_enabled, 1, NULL, NULL),

/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
Expand Down
155 changes: 155 additions & 0 deletions src/io_uring.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
#include "io_uring.h"

#ifdef HAVE_LIBURING
#include <liburing.h>

/* io_uring instance queue depth */
#define IO_URING_DEPTH 256

static size_t io_uring_queue_len = 0;

void initIOUring(void) {
if (server.io_uring_enabled) {
struct io_uring_params params;
struct io_uring *ring = zmalloc(sizeof(struct io_uring));
memset(&params, 0, sizeof(params));
/* On success, io_uring_queue_init_params(3) returns 0 and ring will
* point to the shared memory containing the io_uring queues.
* On failure -errno is returned. */
int ret = io_uring_queue_init_params(IO_URING_DEPTH, ring, &params);
if (ret != 0) {
/* Warning if user enable the io_uring in config but system doesn't support yet. */
serverLog(LL_WARNING, "System doesn't support io_uring, disable io_uring.");
zfree(ring);
server.io_uring = NULL;
} else {
serverLog(LL_NOTICE, "System support io_uring, enable io_uring.");
server.io_uring = ring;
}
}
}

int useIOUring(client *c) {
if (server.io_uring_enabled && server.io_uring) {
/* Currently, we only use io_uring to handle the static buffer write requests. */
return getClientType(c) != CLIENT_TYPE_SLAVE && listLength(c->reply) == 0 && c->bufpos > 0;
}
return 0;
}

int writeToClientUsingIOUring(client *c) {
c->flags |= CLIENT_PENDING_IO_URING_WRITE;
struct io_uring_sqe *sqe = io_uring_get_sqe(server.io_uring);
if (sqe == NULL) return C_ERR;
io_uring_prep_send(sqe, c->conn->fd, c->buf + c->sentlen, c->bufpos - c->sentlen, MSG_DONTWAIT);
io_uring_sqe_set_data(sqe, c);
io_uring_queue_len++;
return C_OK;
}

/* Submit requests to the submission queue and wait for completion. */
static inline void ioUringSubmitAndWait(void) {
/* wait for all submitted queue entries complete. */
io_uring_submit(server.io_uring);
while (io_uring_queue_len) {
struct io_uring_cqe *cqe;
if (io_uring_wait_cqe(server.io_uring, &cqe) == 0) {
client *c = io_uring_cqe_get_data(cqe);
c->nwritten = cqe->res;
io_uring_cqe_seen(server.io_uring, cqe);
io_uring_queue_len--;
}
}
}

/* Check the compeleted io_uring event and update the state. */
int checkPendingIOUringWriteState(client *c) {
/* Note that where synchronous system calls will return -1 on
* failure and set errno to the actual error value,
* io_uring never uses errno. Instead it returns the negated
* errno directly in the CQE res field. */
if (c->nwritten <= 0) {
if (c->nwritten != -EAGAIN) {
c->conn->last_errno = -(c->nwritten);
/* Don't overwrite the state of a connection that is not already
* connected, not to mess with handler callbacks. */
if (c->nwritten != -EINTR && c->conn->state == CONN_STATE_CONNECTED) c->conn->state = CONN_STATE_ERROR;
}
if (connGetState(c->conn) != CONN_STATE_CONNECTED) {
serverLog(LL_VERBOSE, "Error writing to client: %s", connGetLastError(c->conn));
freeClientAsync(c);
}
return C_ERR;
}

c->sentlen += c->nwritten;
/* If the buffer was sent, set bufpos to zero to continue with
* the remainder of the reply. */
if ((int)c->sentlen == c->bufpos) {
c->bufpos = 0;
c->sentlen = 0;
}
atomic_fetch_add_explicit(&server.stat_net_output_bytes, c->nwritten, memory_order_relaxed);
/* For clients representing masters we don't count sending data
* as an interaction, since we always send REPLCONF ACK commands
* that take some time to just fill the socket output buffer.
* We just rely on data / pings received for timeout detection. */
if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;

return C_OK;
}

void submitAndWaitIOUringComplete() {
if (server.io_uring_enabled && server.io_uring && listLength(server.clients_pending_write) > 0) {
ioUringSubmitAndWait();
listIter li;
listNode *ln;
/* An optimization for connWrite: batch submit the write(3). */
listRewind(server.clients_pending_write, &li);
while ((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_IO_URING_WRITE;
listUnlinkNode(server.clients_pending_write, ln);

if (checkPendingIOUringWriteState(c) == C_ERR) continue;
if (!clientHasPendingReplies(c)) {
c->sentlen = 0;
/* Close connection after entire reply has been sent. */
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
freeClientAsync(c);
continue;
}
}
/* Update client's memory usage after writing.
* Since this isn't thread safe we do this conditionally. In case of threaded writes this is done in
* handleClientsWithPendingWritesUsingThreads(). */
if (io_threads_op == IO_THREADS_OP_IDLE) updateClientMemUsageAndBucket(c);
}
}
}

void freeIOUring(void) {
if (server.io_uring_enabled && server.io_uring) {
io_uring_queue_exit(server.io_uring);
zfree(server.io_uring);
server.io_uring = NULL;
}
}
#else
void initIOUring(void) {
}

int useIOUring(client *c) {
return 0;
}

int writeToClientUsingIOUring(client *c) {
return 0;
}

void submitAndWaitIOUringComplete(void) {
}

void freeIOUring(void) {
}
#endif /* IO_URING_H */
21 changes: 21 additions & 0 deletions src/io_uring.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#ifndef IO_URING_H
#define IO_URING_H

#include "server.h"

/* Initialize io_uring at server startup if have io_uring configured, setup io_uring submission and completion. */
void initIOUring(void);

/* If the client is suitable to use io_uring handle the write request. */
int useIOUring(client *c);

/* Use io_uring to handle the client request, it is always used together with useIOUring(). */
int writeToClientUsingIOUring(client *c);

/* Submit requests to the submission queue and wait for completion. */
void submitAndWaitIOUringComplete(void);

/* Free io_uring. */
void freeIOUring(void);

#endif /* IO_URING_H */
78 changes: 44 additions & 34 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "script.h"
#include "fpconv_dtoa.h"
#include "fmtargs.h"
#include "io_uring.h"
#include <sys/socket.h>
#include <sys/uio.h>
#include <math.h>
Expand Down Expand Up @@ -1811,10 +1812,8 @@ client *lookupClientByID(uint64_t id) {

/* This function should be called from _writeToClient when the reply list is not empty,
* it gathers the scattered buffers from reply list and sends them away with connWritev.
* If we write successfully, it returns C_OK, otherwise, C_ERR is returned,
* and 'nwritten' is an output parameter, it means how many bytes server write
* to client. */
static int _writevToClient(client *c, ssize_t *nwritten) {
* If we write successfully, it returns C_OK, otherwise, C_ERR is returned. */
static int _writevToClient(client *c) {
int iovcnt = 0;
int iovmax = min(IOV_MAX, c->conn->iovcnt);
struct iovec iov[iovmax];
Expand Down Expand Up @@ -1848,12 +1847,12 @@ static int _writevToClient(client *c, ssize_t *nwritten) {
offset = 0;
}
if (iovcnt == 0) return C_OK;
*nwritten = connWritev(c->conn, iov, iovcnt);
if (*nwritten <= 0) return C_ERR;
c->nwritten = connWritev(c->conn, iov, iovcnt);
if (c->nwritten <= 0) return C_ERR;

/* Locate the new node which has leftover data and
* release all nodes in front of it. */
ssize_t remaining = *nwritten;
ssize_t remaining = c->nwritten;
if (c->bufpos > 0) { /* deal with static reply buffer first. */
int buf_len = c->bufpos - c->sentlen;
c->sentlen += remaining;
Expand Down Expand Up @@ -1884,21 +1883,18 @@ static int _writevToClient(client *c, ssize_t *nwritten) {

/* This function does actual writing output buffers to different types of
* clients, it is called by writeToClient.
* If we write successfully, it returns C_OK, otherwise, C_ERR is returned,
* and 'nwritten' is an output parameter, it means how many bytes server write
* to client. */
int _writeToClient(client *c, ssize_t *nwritten) {
*nwritten = 0;
* If we write successfully, it returns C_OK, otherwise, C_ERR is returned. */
int _writeToClient(client *c) {
if (getClientType(c) == CLIENT_TYPE_SLAVE) {
serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);

replBufBlock *o = listNodeValue(c->ref_repl_buf_node);
serverAssert(o->used >= c->ref_block_pos);
/* Send current block if it is not fully sent. */
if (o->used > c->ref_block_pos) {
*nwritten = connWrite(c->conn, o->buf + c->ref_block_pos, o->used - c->ref_block_pos);
if (*nwritten <= 0) return C_ERR;
c->ref_block_pos += *nwritten;
c->nwritten = connWrite(c->conn, o->buf + c->ref_block_pos, o->used - c->ref_block_pos);
if (c->nwritten <= 0) return C_ERR;
c->ref_block_pos += c->nwritten;
}

/* If we fully sent the object on head, go to the next one. */
Expand All @@ -1916,16 +1912,16 @@ int _writeToClient(client *c, ssize_t *nwritten) {
/* When the reply list is not empty, it's better to use writev to save us some
* system calls and TCP packets. */
if (listLength(c->reply) > 0) {
int ret = _writevToClient(c, nwritten);
int ret = _writevToClient(c);
if (ret != C_OK) return ret;

/* If there are no longer objects in the list, we expect
* the count of reply bytes to be exactly zero. */
if (listLength(c->reply) == 0) serverAssert(c->reply_bytes == 0);
} else if (c->bufpos > 0) {
*nwritten = connWrite(c->conn, c->buf + c->sentlen, c->bufpos - c->sentlen);
if (*nwritten <= 0) return C_ERR;
c->sentlen += *nwritten;
c->nwritten = connWrite(c->conn, c->buf + c->sentlen, c->bufpos - c->sentlen);
if (c->nwritten <= 0) return C_ERR;
c->sentlen += c->nwritten;

/* If the buffer was sent, set bufpos to zero to continue with
* the remainder of the reply. */
Expand All @@ -1950,12 +1946,12 @@ int writeToClient(client *c, int handler_installed) {
/* Update total number of writes on server */
atomic_fetch_add_explicit(&server.stat_total_writes_processed, 1, memory_order_relaxed);

ssize_t nwritten = 0, totwritten = 0;

ssize_t totwritten = 0;
c->nwritten = 0;
while (clientHasPendingReplies(c)) {
int ret = _writeToClient(c, &nwritten);
int ret = _writeToClient(c);
if (ret == C_ERR) break;
totwritten += nwritten;
totwritten += c->nwritten;
/* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
* bytes, in a single threaded server it's a good idea to serve
* other clients as well, even if a very large request comes from
Expand All @@ -1980,7 +1976,7 @@ int writeToClient(client *c, int handler_installed) {
}
c->net_output_bytes += totwritten;

if (nwritten == -1) {
if (c->nwritten == -1) {
if (connGetState(c->conn) != CONN_STATE_CONNECTED) {
serverLog(LL_VERBOSE, "Error writing to client: %s", connGetLastError(c->conn));
freeClientAsync(c);
Expand Down Expand Up @@ -2037,24 +2033,38 @@ int handleClientsWithPendingWrites(void) {
while ((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
listUnlinkNode(server.clients_pending_write, ln);

/* If a client is protected, don't do anything,
* that may trigger write error or recreate handler. */
if (c->flags & CLIENT_PROTECTED) continue;
if (c->flags & CLIENT_PROTECTED) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}

/* Don't write to clients that are going to be closed anyway. */
if (c->flags & CLIENT_CLOSE_ASAP) continue;

/* Try to write buffers to the client socket. */
if (writeToClient(c, 0) == C_ERR) continue;
if (c->flags & CLIENT_CLOSE_ASAP) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}

/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
installClientWriteHandler(c);
/* We can use io_uring to write to client to reduce the count of syscall. */
if (useIOUring(c)) {
if (writeToClientUsingIOUring(c) == C_ERR) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}
} else {
listUnlinkNode(server.clients_pending_write, ln);
/* Try to write buffers to the client socket. */
if (writeToClient(c, 0) == C_ERR) continue;
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
installClientWriteHandler(c);
}
}
}
submitAndWaitIOUringComplete();
return processed;
}

Expand Down
3 changes: 3 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "syscheck.h"
#include "threads_mngr.h"
#include "fmtargs.h"
#include "io_uring.h"

#include <time.h>
#include <signal.h>
Expand Down Expand Up @@ -2813,6 +2814,7 @@ void initListeners(void) {
void InitServerLast(void) {
bioInit();
initThreadedIO();
initIOUring();
set_jemalloc_bg_thread(server.jemalloc_bg_thread);
server.initial_memory_usage = zmalloc_used_memory();
}
Expand Down Expand Up @@ -6984,6 +6986,7 @@ int main(int argc, char **argv) {

aeMain(server.el);
aeDeleteEventLoop(server.el);
freeIOUring();
return 0;
}

Expand Down
Loading

0 comments on commit 3abd93c

Please sign in to comment.