Skip to content

Latest commit

 

History

History
255 lines (214 loc) · 8.17 KB

2025-01-15-Redis事件循环.md

File metadata and controls

255 lines (214 loc) · 8.17 KB
layout title date categories tags series series_index comments copyrights mathjax draft
post
Redis 事件循环
2025-01-15 00:00:00 +0800
中间件
redis
深入 Redis 源码
6
true
原创
true
true

本文将会从源码层面解读 Redis 的各项机制。我们会从最基础的内容开始,尽量只看 Redis 最核心的部分,而剥离掉一些不太重要的内容,所有的源码都给出了 GitHub 的链接。

写作本文时,Redis CE 还在 7.4 版本;Redis 8.0 仍然在 Pre-Release。因此,本文主要基于 Redis 7.4,其余版本可能有所不同。本文使用的环境为 Debian 12。

事件循环

与 Node.js 类似,Redis 是事件驱动的。

从事件处理角度来说,Redis 是单线程的,它使用了一个事件循环来处理事件。事件循环是一个无限循环,不断地从事件队列中取出事件,然后执行事件对应的回调函数。

Redis 事件主要包含两类:

  • 文件事件:Redis 通过 socket 与客户端通信,通过文件事件来处理客户端的请求;
  • 时间事件:Redis 通过时间事件来执行定时任务。

事件循环通过 aeMain 作为入口启动:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

它调用了 aeProcessEvents 函数:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    /* flags 包含了处理哪些、如何处理事件 */
    // processed 为已经处理的事件数
    // numevents 为需要处理的事件数
    int processed = 0, numevents;

    // 事件事件和文件事件都为空,则直接返回
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    // 有文件事件或者时间时间
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) &&
        !(flags & AE_DONT_WAIT))) {
        int j;
        struct timeval tv, *tvp = NULL;
        int64_t usUntilTimer;

        if (eventLoop->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP))
            eventLoop->beforesleep(eventLoop);

        // beforesleep 可能会修改 flags,再次检查
        if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) {
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
        } else if (flags & AE_TIME_EVENTS) {
            usUntilTimer = usUntilEarliestTimer(eventLoop);
            if (usUntilTimer >= 0) {
                tv.tv_sec = usUntilTimer / 1000000;
                tv.tv_usec = usUntilTimer % 1000000;
                tvp = &tv;
            }
        }
        
        // 多路复用处理 socket
        numevents = aeApiPoll(eventLoop, tvp);

        // 如果不需要处理文件事件,则不处理文件事件
        if (!(flags & AE_FILE_EVENTS)) {
            numevents = 0;
        }

        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        for (j = 0; j < numevents; j++) {
            int fd = eventLoop->fired[j].fd;
            aeFileEvent *fe = &eventLoop->events[fd];
            int mask = eventLoop->fired[j].mask;
            int fired = 0;

            // 先处理读事件,再处理写事件;如果设置了 AE_BARRIER 则反转顺序
            int invert = fe->mask & AE_BARRIER;

            // 读事件
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd];
            }
            // 写事件
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            // 反转顺序的情况
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    // 检查时间事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed;
}

文件事件

I/O 多路复用

对于 Redis 的使用场景来讲,网络带宽和内存带宽才是真正的瓶颈。如果使用 readwrite 系统调用,那么可能会导致阻塞,从而浪费 CPU 时间。因此,Redis 使用了非阻塞的 I/O 多路复用技术,如 epollselect 等,来监听多个文件描述符的可读、可写和异常事件。

epoll 是 Linux 下的一种 I/O 多路复用机制。它的使用方法如下:

  • 使用 epoll_create 创建一个 epoll 实例:

    int epoll_create(int size);

    epoll_create 的返回值是一个文件描述符,用于后续的操作。

    epoll_create1epoll_create 的一个变种,可以传入一个 flags 参数。假如 flagsEPOLL_CLOEXEC,则在 exec 时关闭文件描述符。

  • 使用 epoll_ctl 添加/删除/修改监听的文件描述符:

    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
  • 使用 epoll_wait 等待事件发生:

    int epoll_wait(int epfd, struct epoll_event*events, int maxevents, int timeout);

Redis 支持多种 I/O 多路复用程序,其实现在 ae_epoll.cae_evport.cae_kqueue.cae_select.c 中。Redis 为它们包装了统一的接口,并会自动选择性能最好的使用:

#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

我们以 ae_epoll 为例,来看其实现:

typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;

static int aeApiCreate(aeEventLoop *eventLoop) {
    /* ... */
    // 分配 epoll_event
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    /* ... */
    // 创建 epoll 文件描述符
    state->epfd = epoll_create(1024);
    /* ... */
}

static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
    /* ... */
    // 重新分配 epoll_event
    state->events = zrealloc(state->events, sizeof(struct epoll_event)*setsize);
    /* ... */
}

static void aeApiFree(aeEventLoop *eventLoop) {
    /* ... */
    // 关闭 epoll 文件描述符
    close(state->epfd);
    /* ... */
}

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    /* ... */
    // 根据文件描述符是否已经被监控,确定是 EPOLL_CTL_ADD 还是 EPOLL_CTL_MOD
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    /* ... */
    // 添加监听的文件描述符
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    /* ... */
}

static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
    /* ... */
    // 删除监听的文件描述符
    epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
    /* ... */
}

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    /* ... */
    // 等待事件发生
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
    if (retval > 0) {
        int j;
        numevents = retval;
        // 遍历返回的事件
        for (j = 0; j < numevents; j++) {
            /* ... */
        }
    }
    /* ... */
}

/* ... */

可以看到,它将开启、关闭、增加、删除、处理这五个事件进行了封装处理。

// TODO: 事件循环