Skip to content

Commit

Permalink
Introduce io_uring config and opt in.
Browse files Browse the repository at this point in the history
Signed-off-by: Lipeng Zhu <[email protected]>
  • Loading branch information
lipzhu committed May 21, 2024
1 parent d1b93a2 commit 144e210
Show file tree
Hide file tree
Showing 10 changed files with 267 additions and 140 deletions.
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ HAS_LIBURING := $(shell sh -c 'echo "$(NUMBER_SIGN_CHAR)include <liburing.h>" >
$(CC) -E foo.c > /dev/null 2>&1 && echo yes; \
rm foo.c')
ifeq ($(HAS_LIBURING),yes)
FINAL_CFLAGS+= -DUSE_IO_URING
FINAL_CFLAGS+= -DHAVE_LIBURING
FINAL_LIBS+= -luring
endif
endif
Expand Down
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3117,6 +3117,7 @@ standardConfig static_configs[] = {
createBoolConfig("aof-disable-auto-gc", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, server.aof_disable_auto_gc, 0, NULL, updateAofAutoGCEnabled),
createBoolConfig("replica-ignore-disk-write-errors", NULL, MODIFIABLE_CONFIG, server.repl_ignore_disk_write_error, 0, NULL, NULL),
createBoolConfig("extended-redis-compatibility", NULL, MODIFIABLE_CONFIG, server.extended_redis_compat, 0, NULL, updateExtendedRedisCompat),
createBoolConfig("io_uring", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, server.io_uring_enabled, 0, NULL, NULL),

/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
Expand Down
5 changes: 5 additions & 0 deletions src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@
#define HAVE_EPOLL 1
#endif

/* Test for liburing API */
#ifndef __linux__
#define HAVE_LIBURING 0
#endif

/* Test for accept4() */
#if defined(__linux__) || \
defined(__FreeBSD__) || \
Expand Down
257 changes: 216 additions & 41 deletions src/io_uring.c
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
#include "server.h"
#include "io_uring.h"

#ifdef USE_IO_URING
#ifdef HAVE_LIBURING
#include <liburing.h>
/* io_uring instance queue depth */
static const unsigned int IOUringDepth = 256;
static unsigned int uringQueueLen = 0;

#define IO_URING_DEPTH 256 /* io_uring instance queue depth. */

static size_t io_uring_queue_len = 0;

void initIOUring(void) {
struct io_uring_params params;
Expand All @@ -13,65 +14,239 @@ void initIOUring(void) {
/* On success, io_uring_queue_init_params(3) returns 0 and ring will
* point to the shared memory containing the io_uring queues.
* On failure -errno is returned. */
int ret = io_uring_queue_init_params(IOUringDepth, ring, &params);
int ret = io_uring_queue_init_params(IO_URING_DEPTH, ring, &params);
if (ret != 0) {
serverLog(LL_WARNING, "System doesn't support io_uring, disable io_uring.");
if (server.io_uring_enabled)
serverLog(LL_WARNING, "System doesn't support io_uring, disable io_uring.");
zfree(ring);
server.io_uring = NULL;
server.io_uring_enabled = 0;
} else {
serverLog(LL_NOTICE, "System support io_uring, enable io_uring.");
server.io_uring = ring;
server.io_uring_enabled = 1;
}
}

void ioUringPrepWrite(client *c) {
static inline int checkConnState(client *c) {
/* Note that where synchronous system calls will return -1 on failure and
* set errno to the actual error value, io_uring never uses errno.
* Instead it returns the negated errno directly in the CQE res field. */
if (c->nwritten <= 0) {
c->conn->last_errno = -(c->nwritten);
/* Don't overwrite the state of a connection that is not already
* connected, not to mess with handler callbacks. */
if (c->conn->last_errno != EINTR &&
c->conn->last_errno != EAGAIN &&
c->conn->state == CONN_STATE_CONNECTED) {
c->conn->state = CONN_STATE_ERROR;
}
if (connGetState(c->conn) != CONN_STATE_CONNECTED) {
serverLog(LL_VERBOSE,
"Error writing to client: %s", connGetLastError(c->conn));
freeClientAsync(c);
}
return C_ERR;
}
return C_OK;
}

/* Submit requests to the submission queue and peek the completions. */
void ioUringSubmitAndPeek(void) {
io_uring_submit(server.io_uring);
struct io_uring_cqe *cqes[io_uring_queue_len];
int cqecnt = io_uring_peek_batch_cqe(server.io_uring, cqes, io_uring_queue_len);
for (int i = 0; i < cqecnt; i++) {
struct io_uring_cqe *cqe = cqes[i];
client *c = io_uring_cqe_get_data(cqe);
c->nwritten = cqe->res;
io_uring_cqe_seen(server.io_uring, cqe);
io_uring_queue_len--;
}
}

/* Write output buffer to client using io_uring. Return C_OK if
* write request to submission queue successfully, C_ERR if it was
* freed because of some error. */
int ioUringPrepareWrite(client *c) {
/* Update total number of writes on server */
atomicIncr(server.stat_total_writes_processed, 1);

/* Reset the metadata before submit write requests to io_uring. */
c->nwritten = 0;

struct io_uring_sqe *sqe = io_uring_get_sqe(server.io_uring);
io_uring_prep_send(sqe, c->conn->fd, c->buf + c->sentlen,
c->bufpos - c->sentlen, MSG_DONTWAIT);
if (sqe == NULL) return C_ERR;

if (clientHasPendingReplies(c)) {
if (getClientType(c) == CLIENT_TYPE_SLAVE) {
replBufBlock *o = listNodeValue(c->ref_repl_buf_node);
if (o->used > c->ref_block_pos) {
c->flags |= SLAVE_CLIENT_PENDING_IO_URING_WRITE;
io_uring_prep_send(sqe, c->conn->fd, o->buf + c->ref_block_pos,
o->used - c->ref_block_pos, MSG_DONTWAIT);
}
} else if (c->bufpos > 0) { /* If the static reply buffer is not empty. */
c->flags |= CLIENT_PENDING_IO_URING_WRITE;
io_uring_prep_send(sqe, c->conn->fd, c->buf + c->sentlen,
c->bufpos - c->sentlen, MSG_DONTWAIT);
} else {
clientReplyBlock *o = listNodeValue(listFirst(c->reply));
/* the first reply node is empty, return. */
if (o->used == 0) {
c->reply_bytes -= o->size;
listDelNode(c->reply, listFirst(c->reply));
return C_OK;
}
c->flags |= CLIENT_PENDING_IO_URING_WRITE_LIST;
io_uring_prep_send(sqe, c->conn->fd, o->buf + c->sentlen,
o->used - c->sentlen, MSG_DONTWAIT);
}
}
io_uring_queue_len++;
io_uring_sqe_set_data(sqe, c);
uringQueueLen++;
return C_OK;
}

void ioUringSubmitAndWait(void) {
/* wait for all submitted queue entries complete. */
while (uringQueueLen) {
io_uring_submit(server.io_uring);
struct io_uring_cqe *cqe;
if (io_uring_wait_cqe(server.io_uring, &cqe) == 0) {
client *c = io_uring_cqe_get_data(cqe);
c->nwritten = cqe->res;
if ((c->bufpos - c->sentlen) > c->nwritten && c->nwritten > 0) {
c->sentlen += c->nwritten;
ioUringPrepWrite(c);
void ioUringCompleteWrite(void) {
listIter li;
listNode *ln;
listRewind(server.clients_pending_write, &li);
while ((ln = listNext(&li))) {
client *c = listNodeValue(ln);

/* Handle the slave client write buffer. */
if (c->flags & SLAVE_CLIENT_PENDING_IO_URING_WRITE) {
c->flags &= ~SLAVE_CLIENT_PENDING_IO_URING_WRITE;
if (checkConnState(c) == C_ERR) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}
c->ref_block_pos += c->nwritten;
replBufBlock *o = listNodeValue(c->ref_repl_buf_node);
listNode *next = listNextNode(c->ref_repl_buf_node);
if (next && c->ref_block_pos == o->used) {
o->refcount--;
((replBufBlock *)(listNodeValue(next)))->refcount++;
c->ref_repl_buf_node = next;
c->ref_block_pos = 0;
incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
}
atomicIncr(server.stat_net_repl_output_bytes, c->nwritten);
c->net_output_bytes += c->nwritten;
}

/* Handle the client write buffer for static reply buffer. */
if (c->flags & CLIENT_PENDING_IO_URING_WRITE) {
c->flags &= ~CLIENT_PENDING_IO_URING_WRITE;
if (checkConnState(c) == C_ERR) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}
c->sentlen += c->nwritten;
/* If the buffer was sent, set bufpos to zero to continue with
* the remainder of the reply. */
if ((int)c->sentlen == c->bufpos) {
c->bufpos = 0;
c->sentlen = 0;
}
io_uring_cqe_seen(server.io_uring, cqe);
uringQueueLen--;
atomicIncr(server.stat_net_output_bytes, c->nwritten);
c->net_output_bytes += c->nwritten;
}

/* Handle the client write buffer for reply list. */
if (c->flags & CLIENT_PENDING_IO_URING_WRITE_LIST) {
c->flags &= ~CLIENT_PENDING_IO_URING_WRITE_LIST;
if (checkConnState(c) == C_ERR) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}
c->sentlen += c->nwritten;
clientReplyBlock *o = listNodeValue(listFirst(c->reply));
/* If we fully sent the object on head go to the next one */
if (c->sentlen == o->used) {
c->reply_bytes -= o->size;
listDelNode(c->reply, listFirst(c->reply));
c->sentlen = 0;
/* If there are no longer objects in the list, we expect
* the count of reply bytes to be exactly zero. */
if (listLength(c->reply) == 0)
serverAssert(c->reply_bytes == 0);
}
atomicIncr(server.stat_net_output_bytes, c->nwritten);
c->net_output_bytes += c->nwritten;
}

/* If the entire client reply buffer has been sent,
* delete it from server.clients_pending_write list. */
if (!clientHasPendingReplies(c)) {
c->sentlen = 0;
listUnlinkNode(server.clients_pending_write, ln);
/* Close connection after entire reply has been sent. */
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
freeClientAsync(c);
continue;
}
}
/* Update client's memory usage after writing.
* Since this isn't thread safe we do this conditionally. */
if (io_threads_op == IO_THREADS_OP_IDLE)
updateClientMemUsageAndBucket(c);
}
}

void freeIOUring(void) {
if(server.io_uring_enabled) {
io_uring_queue_exit(server.io_uring);
zfree(server.io_uring);
server.io_uring = NULL;
server.io_uring_enabled = 0;
/* This function is a variant of handleClientsWithPendingWrites, it use
* io_uring to hanlde the I/O write, try ot benifit the io_uring batching
* feature to reduce the SYSCALL count for IO operations.
* See https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#batching
* for more info. */
int handleClientsWithPendingWritesUsingIoUring(void) {
listIter li;
listNode *ln;
int origin = listLength(server.clients_pending_write);
listRewind(server.clients_pending_write, &li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
/* If a client is protected, don't do anything,
* that may trigger write error or recreate handler. */
if (c->flags & CLIENT_PROTECTED) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}
/* Don't write to clients that are going to be closed anyway. */
if (c->flags & CLIENT_CLOSE_ASAP) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}
/* Try to submit write buffers to the client socket using io_uring.
* This is asycn operation. */
if (!(c->flags & (CLIENT_PENDING_IO_URING_WRITE |
CLIENT_PENDING_IO_URING_WRITE_LIST |
SLAVE_CLIENT_PENDING_IO_URING_WRITE))) {
if (ioUringPrepareWrite(c) == C_ERR) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}
}
}
}
#else
void initIOUring(void) {
serverLog(LL_WARNING, "System doesn't support io_uring, disable io_uring.");
server.io_uring = NULL;
server.io_uring_enabled = 0;
/* Doesn't wait for all completions. For unfinished I/O, peek in next loop. */
ioUringSubmitAndPeek();
/* Revisit the server.clients_pending_write list, handle the I/O completions. */
ioUringCompleteWrite();
return origin - listLength(server.clients_pending_write);
}

void ioUringPrepWrite(client *c) {
UNUSED(c);
void freeIOUring(void) {
io_uring_queue_exit(server.io_uring);
zfree(server.io_uring);
server.io_uring = NULL;
}

void ioUringSubmitAndWait(void) {}
#else
void initIOUring(void) {}

void freeIOUring(void) {}

int handleClientsWithPendingWritesUsingIoUring(void) {
return 0
}
#endif
16 changes: 16 additions & 0 deletions src/io_uring.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#ifndef IO_URING_H
#define IO_URING_H

#include "server.h"

/* Initialize io_uring at server startup if have io_uring configured,
* setup io_uring submission and completion. */
void initIOUring(void);

/* Free io_uring. */
void freeIOUring(void);

/* Hanle clients pending writes using io_uring. */
int handleClientsWithPendingWritesUsingIoUring(void);

#endif /* IO_URING_H */
Loading

0 comments on commit 144e210

Please sign in to comment.