鍍金池/ 教程/ 大數(shù)據(jù)/ Redis 事件驅(qū)動詳解
Redis 數(shù)據(jù)淘汰機制
積分排行榜
小剖 Memcache
Redis 數(shù)據(jù)結(jié)構(gòu) intset
分布式鎖
從哪里開始讀起,怎么讀
Redis 數(shù)據(jù)結(jié)構(gòu) dict
不在浮沙筑高臺
Redis 集群(上)
Redis 監(jiān)視器
源碼閱讀工具
Redis 日志和斷言
內(nèi)存數(shù)據(jù)管理
Redis 數(shù)據(jù)結(jié)構(gòu)綜述
源碼日志
Web 服務(wù)器存儲 session
消息中間件
Redis 與 Lua 腳本
什么樣的源代碼適合閱讀
Redis 數(shù)據(jù)結(jié)構(gòu) sds
Memcached slab 分配策略
訂閱發(fā)布機制
Redis 是如何提供服務(wù)的
Redis 事務(wù)機制
Redis 集群(下)
主從復制
Redis 應(yīng)用
RDB 持久化策略
Redis 數(shù)據(jù)遷移
Redis 事件驅(qū)動詳解
初探 Redis
Redis 與 Memcache
AOF 持久化策略
Redis 數(shù)據(jù)結(jié)構(gòu) redisOb
作者簡介
Redis 數(shù)據(jù)結(jié)構(gòu) ziplist
Redis 數(shù)據(jù)結(jié)構(gòu) skiplist
Redis 哨兵機制

Redis 事件驅(qū)動詳解

概述

在講述 Redis 如何提供服務(wù)之前,有必要介紹 Redis 的事件驅(qū)動模型。

我們知道,進程能夠進行網(wǎng)絡(luò)的讀寫操作,但有些時候這些讀寫操作是不可行的,譬如因為內(nèi)核的網(wǎng)絡(luò)發(fā)送緩沖區(qū)滿了導致不可寫;網(wǎng)絡(luò)收取緩存中無數(shù)據(jù)可讀,導致不可讀。那如果有一種機制,可以在一個事件(可讀或者可寫)發(fā)生的時候,才告知到進程,這樣就避免了進程在一個事件出現(xiàn)等待阻塞的情況,提高了進程的吞吐能力。 Redis 內(nèi)部有一個小型的事件驅(qū)動,它和 Libevent 網(wǎng)絡(luò)庫的事件驅(qū)動一樣,都是依托操作系統(tǒng)的 I/O 多路復用技術(shù)支撐起來的,這種 IO 驅(qū)動模型有個經(jīng)典的名字:Reactor 模型,反應(yīng)爐。

利用 I/O 多路復用技術(shù),監(jiān)聽感興趣的 I/O 事件,例如讀事件,寫事件等,同時也要維護一個以文件描述符為主鍵,數(shù)據(jù)為某個預(yù)設(shè)函數(shù)的事件表,這里其實就是一個數(shù)組或者鏈表。當事件觸發(fā)時,比如某個文件描述符可讀,系統(tǒng)會返回文件描述符值,用這個值在事件表中找到相應(yīng)的數(shù)據(jù)項(包括回調(diào)函數(shù)等),從而實現(xiàn)回調(diào)。同樣的,定時事件也是可以實現(xiàn)的,因為系統(tǒng)提供的 I/O 多路復用技術(shù)中的函數(shù)允許我們設(shè)置等待超時的時間,預(yù)設(shè)定時間內(nèi)沒有事件發(fā)生時,會返回。

上面一段話比較綜合,可能需要一些 Linux 系統(tǒng)編程和網(wǎng)絡(luò)編程的基礎(chǔ),但你會看到多數(shù) Reactor 事件驅(qū)動程序都是這么實現(xiàn)的。

事件驅(qū)動數(shù)據(jù)結(jié)構(gòu)

Redis 事件驅(qū)動內(nèi)部有四個主要的數(shù)據(jù)結(jié)構(gòu),分別是:事件循環(huán)結(jié)構(gòu)體,文件事件結(jié)構(gòu)體,時間事件結(jié)構(gòu)體和觸發(fā)事件結(jié)構(gòu)體。

// 文件事件結(jié)構(gòu)體
/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE) */
  // 回調(diào)函數(shù)指針
   aeFileProc *rfileProc;
   aeFileProc *wfileProc;
  // clientData 參數(shù)一般是指向redisClient 的指針
   void *clientData;
} aeFileEvent;
// 時間事件結(jié)構(gòu)體
/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    // 定時回調(diào)函數(shù)指針
    aeTimeProc *timeProc;
    // 定時事件清理函數(shù),當刪除定時事件的時候會被調(diào)用
    aeEventFinalizerProc *finalizerProc;
    // clientData 參數(shù)一般是指向redisClient 的指針
    void *clientData;
    // 定時事件表采用鏈表來維護
    struct aeTimeEvent *next;
} aeTimeEvent;
// 觸發(fā)事件
/* A fired event */
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;
// 事件循環(huán)結(jié)構(gòu)體
/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd; /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    // 記錄最大的定時事件id + 1
    long long timeEventNextId;
    // 用于系統(tǒng)時間的矯正
    time_t lastTime; /* Used to detect system clock skew */
    // I/O 事件表
    aeFileEvent *events; /* Registered events */
    // 被觸發(fā)的事件
    aeFiredEvent *fired; /* Fired events */
    // 定時事件表
    aeTimeEvent *timeEventHead;
    // 事件循環(huán)結(jié)束標識
    int stop;
    // 對于不同的I/O 多路復用技術(shù),有不同的數(shù)據(jù),詳見各自實現(xiàn)
    void *apidata; /* This is used for polling API specific data */
    // 新的循環(huán)前需要執(zhí)行的操作
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;

上面的數(shù)據(jù)結(jié)構(gòu)能給我們很好的提示:事件循環(huán)結(jié)構(gòu)體維護 I/O 事件表,定時事件表和觸發(fā)事件表。

事件循環(huán)中心

Redis 的主函數(shù)中調(diào)用 initServer() 函數(shù)從而初始化事件循環(huán)中心(EventLoop),它的主要工作是在 aeCreateEventLoop() 中完成的。

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;
    // 分配空間
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    // 分配文件事件結(jié)構(gòu)體空間
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    // 分配已觸發(fā)事件結(jié)構(gòu)體空間
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    // 時間事件鏈表頭
    eventLoop->timeEventHead = NULL;
    // 后續(xù)提到
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    // 進入事件循環(huán)前需要執(zhí)行的操作,此項會在redis main() 函數(shù)中設(shè)置
    eventLoop->beforesleep = NULL;
    // 在這里,aeApiCreate() 函數(shù)對于每個IO 多路復用模型的實現(xiàn)都有不同,
    // 具體參見源代碼,因為每種IO 多路復用模型的初始化都不同
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
    // 初始化事件類型掩碼為無事件狀態(tài)
for (i = 0; i < setsize; i++)
    eventLoop->events[i].mask = AE_NONE;
return eventLoop;
    err:
if (eventLoop) {
     zfree(eventLoop->events);
     zfree(eventLoop->fired);
     zfree(eventLoop);
   }
   return NULL;
}

有上面初始化工作只是完成了一個空的事件中心而已,并沒有注冊一些感興趣的事件。要想驅(qū)動事件循環(huán),還需要下面的工作。

Redis 事件驅(qū)動原理

事件注冊詳解

文件 I/O 事件注冊主要操作在 aeCreateFileEvent() 中完成。aeCreateFileEvent() 會根據(jù)文件描述符的數(shù)值大小在事件循環(huán)結(jié)構(gòu)體的 I/O 事件表中取一個數(shù)據(jù)空間,利用系統(tǒng)提供的 I/O 多路復用技術(shù)監(jiān)聽感興趣的 I/O 事件,并設(shè)置回調(diào)函數(shù)。

http://wiki.jikexueyuan.com/project/redis/images/redis3.png" alt="" />

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
       aeFileProc *proc, void *clientData)
{
   if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
   }
   // 在I/O 事件表中選擇一個空間
   aeFileEvent *fe = &eventLoop->events[fd];
   // aeApiAddEvent() 只在此函數(shù)中調(diào)用,對于不同IO 多路復用實現(xiàn),會有所不同
  if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
        fe->mask |= mask;
   // 設(shè)置回調(diào)函數(shù)
   if (mask & AE_READABLE) fe->rfileProc = proc;
   if (mask & AE_WRITABLE) fe->wfileProc = proc;
        fe->clientData = clientData;
   if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
        return AE_OK;
}

對于不同版本的 I/O 多路復用,比如 epoll,select,kqueue 等,Redis 有各自的版本,但接口統(tǒng)一,譬如 aeApiAddEvent(),會有多個版本的實現(xiàn)。

http://wiki.jikexueyuan.com/project/redis/images/redis4.png" alt="" />

準備監(jiān)聽工作

initServer() 中調(diào)用了 aeCreateEventLoop() 完成了事件中心的初始化,initServer() 還做了監(jiān)聽的準備。

/* Open the TCP listening socket for the user commands. */
// listenToPort() 中有調(diào)用listen()
if (server.port != 0 &&
    listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
     exit(1);
// UNIX 域套接字
/* Open the listening Unix domain socket. */
if (server.unixsocket != NULL) {
    unlink(server.unixsocket); /* don't care if this fails */
    server.sofd = anetUnixServer(server.neterr,server.unixsocket,
    server.unixsocketperm);
if (server.sofd == ANET_ERR) {
    redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr);
    exit(1);
   }
}

從上面可以看出,Redis 提供了 TCP 和 UNIX 域套接字兩種工作方式。以 TCP 工作方式為例,listenPort() 創(chuàng)建綁定了套接字并啟動了監(jiān)聽,這是網(wǎng)絡(luò)編程的基礎(chǔ)部分了。

為監(jiān)聽套接字注冊事件

在進入事件循環(huán)前還需要做一些準備工作。緊接著,initServer() 為所有的監(jiān)聽套接字注冊了讀事件(讀事件表示有新的連接到來),響應(yīng)函數(shù)為 acceptTcpHandler() 或者 acceptUnixHandler()。

// 創(chuàng)建接收TCP 或者UNIX 域套接字的事件處理
// TCP
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
    // acceptTcpHandler() tcp 連接接受處理函數(shù)
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
    acceptTcpHandler,NULL) == AE_ERR)
   {
       redisPanic(
         "Unrecoverable error creating server.ipfd file event.");
       }
   }
// UNIX 域套接字
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
    acceptUnixHandler,NULL) == AE_ERR)
    redisPanic("Unrecoverable error creating server.sofd file event.");

來看看 acceptTcpHandler() 做了什么:

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    char cip[REDIS_IP_STR_LEN];
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);
    // 接收客戶端請求
    cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
    // 出錯
    if (cfd == AE_ERR) {
        redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
    return;
    }
    // 記錄
    redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
    // 真正有意思的地方
    acceptCommonHandler(cfd,0);
}

接收套接字與客戶端建立連接后,調(diào)用 acceptCommonHandler()。acceptCommonHandler()主要工作就是:

  1. 建立并保存服務(wù)端與客戶端的連接信息,這些信息保存在一個 struct redisClient 結(jié)構(gòu) 體中;
  2. 為與客戶端連接的套接字注冊讀事件,相應(yīng)的回調(diào)函數(shù)為 readQueryFromClient(),readQueryFromClient() 作用是從套接字讀取數(shù)據(jù),執(zhí)行相應(yīng)操作并回復客戶端。

簡而言之,就是接收一個 TCP 請求。

事件循環(huán)

以上做好了準備工作,可以進入事件循環(huán)。跳出 initServer() 回到 main() 中,main() 會調(diào)用 aeMain()。進入事件循環(huán)發(fā)生在 aeProcessEvents() 中:

  1. 根據(jù)定時事件表計算需要等待的最短時間;
  2. 調(diào)用 redis api aeApiPoll() 進入監(jiān)聽輪詢,如果沒有事件發(fā)生就會進入睡眠狀態(tài),其實 就是 I/O 多路復用 select() epoll() 等的調(diào)用;
  3. 有事件發(fā)生會被喚醒,處理已觸發(fā)的 I/O 事件和定時事件。

來看看 aeMain() 的具體實現(xiàn):

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
    // 進入事件循環(huán)可能會進入睡眠狀態(tài)。在睡眠之前,執(zhí)行預(yù)設(shè)置的函數(shù)
    // aeSetBeforeSleepProc()。
    if (eventLoop->beforesleep != NULL)
        eventLoop->beforesleep(eventLoop);
     // AE_ALL_EVENTS 表示處理所有的事件
     aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

事件觸發(fā)

這里以 select 版本的 redis api 實現(xiàn)作為講解,aeApiPoll() 調(diào)用了 select() 進入了監(jiān)聽輪 詢。aeApiPoll() 的 tvp 參數(shù)是最小等待時間,它會被預(yù)先計算出來,它主要完成:

  1. 拷貝讀寫的 fdset。select() 的調(diào)用會破壞傳入的fdset,實際上有兩份 fdset,一份作為 備份,另一份用作調(diào)用。每次調(diào)用 select() 之前都從備份中直接拷貝一份;
  2. 調(diào)用 select();
  3. 被喚醒后,檢查 fdset 中的每一個文件描述符,并將可讀或者可寫的描述符記錄到觸 發(fā)表當中。

接下來的操作便是執(zhí)行相應(yīng)的回調(diào)函數(shù),代碼在上一段中已經(jīng)貼出:先處理 I/O 事件,再 處理定時事件。

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
   aeApiState *state = eventLoop->apidata;
   int retval, j, numevents = 0;
   /*
   真有意思,在aeApiState 結(jié)構(gòu)中:
   typedef struct aeApiState {
   fd_set rfds, wfds;
   fd_set _rfds, _wfds;
   } aeApiState;
   在調(diào)用select() 的時候傳入的是_rfds 和_wfds,所有監(jiān)聽的數(shù)據(jù)
   在rfds 和wfds 中。
   在下次需要調(diào)用selec() 的時候,會將rfds 和wfds 中的數(shù)據(jù)拷貝
   進_rfds 和_wfds 中。*/
   memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
   memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));
   retval = select(eventLoop->maxfd+1,
   &state->_rfds,&state->_wfds,NULL,tvp);
   if (retval > 0) {
   // 輪詢
   for (j = 0; j <= eventLoop->maxfd; j++) {
        int mask = 0;
        aeFileEvent *fe = &eventLoop->events[j];
   if (fe->mask == AE_NONE) continue;
   if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
        mask |= AE_READABLE;
   if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
        mask |= AE_WRITABLE;
    // 添加到觸發(fā)事件表中
       eventLoop->fired[numevents].fd = j;
       eventLoop->fired[numevents].mask = mask;
       numevents++;
    }
  }
  return numevents;
}

總結(jié)

http://wiki.jikexueyuan.com/project/redis/images/redis5.png" alt="" />

Redis 的事件驅(qū)動總結(jié)如下:

  • 初始化事件循環(huán)結(jié)構(gòu)體
  • 注冊監(jiān)聽套接字的讀事件
  • 注冊定時事件
  • 進入事件循環(huán)
  • 如果監(jiān)聽套接字變?yōu)榭勺x,會接收客戶端請求,并為對應(yīng)的套接字注冊讀事件
  • 如果與客戶端連接的套接字變?yōu)榭勺x,執(zhí)行相應(yīng)的操作
上一篇:初探 Redis下一篇:主從復制