From e29bd5e261d6cf808fda2c2dedb900fe198a4b87 Mon Sep 17 00:00:00 2001 From: Lipeng Zhu Date: Sat, 13 Jul 2024 02:45:08 +0000 Subject: [PATCH] Rebase based on unstable branch --------- Signed-off-by: Lipeng Zhu Co-authored-by: Wangyang Guo --- src/Makefile | 13 +++- src/config.c | 1 + src/io_uring.c | 140 +++++++++++++++++++++++++++++++++++ src/io_uring.h | 21 ++++++ src/server.c | 3 + src/server.h | 6 +- tests/unit/introspection.tcl | 1 + valkey.conf | 8 +- 8 files changed, 190 insertions(+), 3 deletions(-) create mode 100644 src/io_uring.c create mode 100644 src/io_uring.h diff --git a/src/Makefile b/src/Makefile index 4e8c34b253..354d569419 100644 --- a/src/Makefile +++ b/src/Makefile @@ -318,6 +318,17 @@ else LIBCRYPTO_LIBS=-lcrypto endif +# only Linux has IO_URING support +ifeq ($(uname_S),Linux) +HAS_LIBURING := $(shell sh -c 'echo "$(NUMBER_SIGN_CHAR)include " > foo.c; \ + $(CC) -E foo.c > /dev/null 2>&1 && echo yes; \ + rm foo.c') +ifeq ($(HAS_LIBURING),yes) + FINAL_CFLAGS+= -DHAVE_LIBURING + FINAL_LIBS+= -luring +endif +endif + BUILD_NO:=0 BUILD_YES:=1 BUILD_MODULE:=2 @@ -401,7 +412,7 @@ endif ENGINE_NAME=valkey SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX) ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX) -ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o +ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o io_uring.o ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX) ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX) diff --git a/src/config.c b/src/config.c index adbfdd43de..b006b8f1a5 100644 --- a/src/config.c +++ b/src/config.c @@ -3073,6 +3073,7 @@ standardConfig static_configs[] = { createBoolConfig("replica-ignore-disk-write-errors", NULL, MODIFIABLE_CONFIG, server.repl_ignore_disk_write_error, 0, NULL, NULL), createBoolConfig("extended-redis-compatibility", NULL, MODIFIABLE_CONFIG, server.extended_redis_compat, 0, NULL, updateExtendedRedisCompat), createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL), + createBoolConfig("io-uring-enabled", NULL, IMMUTABLE_CONFIG, server.io_uring_enabled, 0, NULL, NULL), /* String Configs */ createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL), diff --git a/src/io_uring.c b/src/io_uring.c new file mode 100644 index 0000000000..047a7649f3 --- /dev/null +++ b/src/io_uring.c @@ -0,0 +1,140 @@ +#include "io_uring.h" + +#include + +/* io_uring instance queue depth. */ +#define IO_URING_DEPTH 256 + +static size_t io_uring_queue_len = 0; + +void initIOUring(void) { + if (server.io_uring_enabled) { + struct io_uring_params params; + struct io_uring *ring = zmalloc(sizeof(struct io_uring)); + memset(¶ms, 0, sizeof(params)); + /* On success, io_uring_queue_init_params(3) returns 0 and ring will + * point to the shared memory containing the io_uring queues. + * On failure -errno is returned. */ + int ret = io_uring_queue_init_params(IO_URING_DEPTH, ring, ¶ms); + if (ret != 0) { + /* Warning if user enable the io_uring in config but system doesn't support yet. */ + serverLog(LL_WARNING, "System doesn't support io_uring, disable io_uring."); + zfree(ring); + server.io_uring = NULL; + } else { + serverLog(LL_NOTICE, "System support io_uring, enable io_uring."); + server.io_uring = ring; + } + } +} + +int canWriteUsingIOUring(client *c) { + if (server.io_uring_enabled && server.io_uring) { + /* Currently, we only use io_uring to handle the static buffer write requests. */ + return getClientType(c) != CLIENT_TYPE_REPLICA && listLength(c->reply) == 0 && c->bufpos > 0; + } + return 0; +} + +int writeToClientUsingIOUring(client *c) { + c->flag.pending_io_uring_write = 1; + struct io_uring_sqe *sqe = io_uring_get_sqe(server.io_uring); + if (sqe == NULL) return C_ERR; + io_uring_prep_send(sqe, c->conn->fd, c->buf + c->sentlen, c->bufpos - c->sentlen, MSG_DONTWAIT); + io_uring_sqe_set_data(sqe, c); + io_uring_queue_len++; + return C_OK; +} + +/* Submit requests to the submission queue and wait for completion. */ +static inline void ioUringSubmitAndWaitBarrier(void) { + io_uring_submit(server.io_uring); + /* Wait for all submitted queue entries complete. */ + while (io_uring_queue_len) { + struct io_uring_cqe *cqe; + if (io_uring_wait_cqe(server.io_uring, &cqe) == 0) { + client *c = io_uring_cqe_get_data(cqe); + c->nwritten = cqe->res; + io_uring_cqe_seen(server.io_uring, cqe); + io_uring_queue_len--; + } else { + serverPanic("Error waiting io_uring completion queue."); + } + } +} + +/* Check the completed io_uring event and update the state. */ +int checkPendingIOUringWriteState(client *c) { + /* Note that where synchronous system calls will return -1 on + * failure and set errno to the actual error value, + * io_uring never uses errno. Instead it returns the negated + * errno directly in the CQE res field. */ + if (c->nwritten <= 0) { + if (c->nwritten != -EAGAIN) { + c->conn->last_errno = -(c->nwritten); + /* Don't overwrite the state of a connection that is not already + * connected, not to mess with handler callbacks. */ + if (c->nwritten != -EINTR && c->conn->state == CONN_STATE_CONNECTED) c->conn->state = CONN_STATE_ERROR; + } + if (connGetState(c->conn) != CONN_STATE_CONNECTED) { + serverLog(LL_VERBOSE, "Error writing to client: %s", connGetLastError(c->conn)); + freeClientAsync(c); + } + return C_ERR; + } + + c->sentlen += c->nwritten; + /* If the buffer was sent, set bufpos to zero to continue with + * the remainder of the reply. */ + if ((int)c->sentlen == c->bufpos) { + c->bufpos = 0; + c->sentlen = 0; + } + atomic_fetch_add_explicit(&server.stat_net_output_bytes, c->nwritten, memory_order_relaxed); + c->net_output_bytes += c->nwritten; + + /* For clients representing masters we don't count sending data + * as an interaction, since we always send REPLCONF ACK commands + * that take some time to just fill the socket output buffer. + * We just rely on data / pings received for timeout detection. */ + if (!c->flag.primary) c->last_interaction = server.unixtime; + + return C_OK; +} + +void submitAndWaitIOUringComplete() { + if (server.io_uring_enabled && server.io_uring && listLength(server.clients_pending_write) > 0) { + ioUringSubmitAndWaitBarrier(); + listIter li; + listNode *ln; + /* An optimization for connWrite: batch submit the write(3). */ + listRewind(server.clients_pending_write, &li); + while ((ln = listNext(&li))) { + client *c = listNodeValue(ln); + c->flag.pending_io_uring_write = 0; + listUnlinkNode(server.clients_pending_write, ln); + + if (checkPendingIOUringWriteState(c) == C_ERR) continue; + if (!clientHasPendingReplies(c)) { + c->sentlen = 0; + /* Close connection after entire reply has been sent. */ + if (c->flag.close_after_reply) { + freeClientAsync(c); + continue; + } + } + /* Update client's memory usage after writing. + * Since this isn't thread safe we do this conditionally. In case of threaded writes this is done in + * handleClientsWithPendingWritesUsingThreads(). */ + updateClientMemUsageAndBucket(c); + } + } +} + +void freeIOUring(void) { + if (server.io_uring_enabled && server.io_uring) { + io_uring_queue_exit(server.io_uring); + zfree(server.io_uring); + server.io_uring = NULL; + } +} diff --git a/src/io_uring.h b/src/io_uring.h new file mode 100644 index 0000000000..d0b152be77 --- /dev/null +++ b/src/io_uring.h @@ -0,0 +1,21 @@ +#ifndef IO_URING_H +#define IO_URING_H + +#include "server.h" + +/* Initialize io_uring at server startup if have io_uring configured, setup io_uring submission and completion. */ +void initIOUring(void); + +/* If the client is suitable to use io_uring handle the write request. */ +int canWriteUsingIOUring(client *c); + +/* Use io_uring to handle the client request, it is always used together with canWriteUsingIOUring(). */ +int writeToClientUsingIOUring(client *c); + +/* Submit requests to the submission queue and wait for completion. */ +void submitAndWaitIOUringComplete(void); + +/* Free io_uring. */ +void freeIOUring(void); + +#endif /* IO_URING_H */ diff --git a/src/server.c b/src/server.c index 465aa29391..3e1e5f4b02 100644 --- a/src/server.c +++ b/src/server.c @@ -40,6 +40,7 @@ #include "threads_mngr.h" #include "fmtargs.h" #include "io_threads.h" +#include "io_uring.h" #include #include @@ -2810,6 +2811,7 @@ void initListeners(void) { void InitServerLast(void) { bioInit(); initIOThreads(); + initIOUring(); set_jemalloc_bg_thread(server.jemalloc_bg_thread); server.initial_memory_usage = zmalloc_used_memory(); } @@ -6984,6 +6986,7 @@ int main(int argc, char **argv) { aeMain(server.el); aeDeleteEventLoop(server.el); + freeIOUring(); return 0; } diff --git a/src/server.h b/src/server.h index 36a4b641e7..fef38e112e 100644 --- a/src/server.h +++ b/src/server.h @@ -1180,7 +1180,8 @@ typedef struct ClientFlags { uint64_t reprocessing_command : 1; /* The client is re-processing the command. */ uint64_t replication_done : 1; /* Indicate that replication has been done on the client */ uint64_t authenticated : 1; /* Indicate a client has successfully authenticated */ - uint64_t reserved : 9; /* Reserved for future use */ + uint64_t pending_io_uring_write : 1; /* Client has output to send using io_uring. */ + uint64_t reserved : 8; /* Reserved for future use */ } ClientFlags; typedef struct client { @@ -2123,6 +2124,9 @@ struct valkeyServer { sds availability_zone; /* When run in a cloud environment we can configure the availability zone it is running in */ /* Local environment */ char *locale_collate; + /* io_uring */ + int io_uring_enabled; /* Enable io_uring */ + struct io_uring *io_uring; }; #define MAX_KEYS_BUFFER 256 diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index a12a3ba23d..90e5ceacb0 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -558,6 +558,7 @@ start_server {tags {"introspection"}} { socket-mark-id req-res-logfile client-default-resp + io-uring-enabled } if {!$::tls} { diff --git a/valkey.conf b/valkey.conf index 8badf1487a..ec21cd933e 100644 --- a/valkey.conf +++ b/valkey.conf @@ -2313,4 +2313,10 @@ jemalloc-bg-thread yes # this is only exposed via the info command for clients to use, but in the future we # we may also use this when making decisions for replication. # -# availability-zone "zone-name" \ No newline at end of file +# availability-zone "zone-name" + +# If Valkey is compiled with io_uring support and liburing is installed in the +# system, then io_uring can be enabled with this config. The io_uring kernel +# interface was adopted in Linux kernel version 5.1. +# +# io-uring-enabled no