在講述 Redis 如何提供服務(wù)之前,有必要介紹 Redis 的事件驅(qū)動模型。
我們知道,進程能夠進行網(wǎng)絡(luò)的讀寫操作,但有些時候這些讀寫操作是不可行的,譬如因為內(nèi)核的網(wǎng)絡(luò)發(fā)送緩沖區(qū)滿了導致不可寫;網(wǎng)絡(luò)收取緩存中無數(shù)據(jù)可讀,導致不可讀。那如果有一種機制,可以在一個事件(可讀或者可寫)發(fā)生的時候,才告知到進程,這樣就避免了進程在一個事件出現(xiàn)等待阻塞的情況,提高了進程的吞吐能力。 Redis 內(nèi)部有一個小型的事件驅(qū)動,它和 Libevent 網(wǎng)絡(luò)庫的事件驅(qū)動一樣,都是依托操作系統(tǒng)的 I/O 多路復用技術(shù)支撐起來的,這種 IO 驅(qū)動模型有個經(jīng)典的名字:Reactor 模型,反應(yīng)爐。
利用 I/O 多路復用技術(shù),監(jiān)聽感興趣的 I/O 事件,例如讀事件,寫事件等,同時也要維護一個以文件描述符為主鍵,數(shù)據(jù)為某個預(yù)設(shè)函數(shù)的事件表,這里其實就是一個數(shù)組或者鏈表。當事件觸發(fā)時,比如某個文件描述符可讀,系統(tǒng)會返回文件描述符值,用這個值在事件表中找到相應(yīng)的數(shù)據(jù)項(包括回調(diào)函數(shù)等),從而實現(xiàn)回調(diào)。同樣的,定時事件也是可以實現(xiàn)的,因為系統(tǒng)提供的 I/O 多路復用技術(shù)中的函數(shù)允許我們設(shè)置等待超時的時間,預(yù)設(shè)定時間內(nèi)沒有事件發(fā)生時,會返回。
上面一段話比較綜合,可能需要一些 Linux 系統(tǒng)編程和網(wǎng)絡(luò)編程的基礎(chǔ),但你會看到多數(shù) Reactor 事件驅(qū)動程序都是這么實現(xiàn)的。
Redis 事件驅(qū)動內(nèi)部有四個主要的數(shù)據(jù)結(jié)構(gòu),分別是:事件循環(huán)結(jié)構(gòu)體,文件事件結(jié)構(gòu)體,時間事件結(jié)構(gòu)體和觸發(fā)事件結(jié)構(gòu)體。
// 文件事件結(jié)構(gòu)體
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) */
// 回調(diào)函數(shù)指針
aeFileProc *rfileProc;
aeFileProc *wfileProc;
// clientData 參數(shù)一般是指向redisClient 的指針
void *clientData;
} aeFileEvent;
// 時間事件結(jié)構(gòu)體
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
// 定時回調(diào)函數(shù)指針
aeTimeProc *timeProc;
// 定時事件清理函數(shù),當刪除定時事件的時候會被調(diào)用
aeEventFinalizerProc *finalizerProc;
// clientData 參數(shù)一般是指向redisClient 的指針
void *clientData;
// 定時事件表采用鏈表來維護
struct aeTimeEvent *next;
} aeTimeEvent;
// 觸發(fā)事件
/* A fired event */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;
// 事件循環(huán)結(jié)構(gòu)體
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
// 記錄最大的定時事件id + 1
long long timeEventNextId;
// 用于系統(tǒng)時間的矯正
time_t lastTime; /* Used to detect system clock skew */
// I/O 事件表
aeFileEvent *events; /* Registered events */
// 被觸發(fā)的事件
aeFiredEvent *fired; /* Fired events */
// 定時事件表
aeTimeEvent *timeEventHead;
// 事件循環(huán)結(jié)束標識
int stop;
// 對于不同的I/O 多路復用技術(shù),有不同的數(shù)據(jù),詳見各自實現(xiàn)
void *apidata; /* This is used for polling API specific data */
// 新的循環(huán)前需要執(zhí)行的操作
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
上面的數(shù)據(jù)結(jié)構(gòu)能給我們很好的提示:事件循環(huán)結(jié)構(gòu)體維護 I/O 事件表,定時事件表和觸發(fā)事件表。
Redis 的主函數(shù)中調(diào)用 initServer() 函數(shù)從而初始化事件循環(huán)中心(EventLoop),它的主要工作是在 aeCreateEventLoop() 中完成的。
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
// 分配空間
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
// 分配文件事件結(jié)構(gòu)體空間
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
// 分配已觸發(fā)事件結(jié)構(gòu)體空間
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
// 時間事件鏈表頭
eventLoop->timeEventHead = NULL;
// 后續(xù)提到
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
// 進入事件循環(huán)前需要執(zhí)行的操作,此項會在redis main() 函數(shù)中設(shè)置
eventLoop->beforesleep = NULL;
// 在這里,aeApiCreate() 函數(shù)對于每個IO 多路復用模型的實現(xiàn)都有不同,
// 具體參見源代碼,因為每種IO 多路復用模型的初始化都不同
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
// 初始化事件類型掩碼為無事件狀態(tài)
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}
有上面初始化工作只是完成了一個空的事件中心而已,并沒有注冊一些感興趣的事件。要想驅(qū)動事件循環(huán),還需要下面的工作。
文件 I/O 事件注冊主要操作在 aeCreateFileEvent() 中完成。aeCreateFileEvent() 會根據(jù)文件描述符的數(shù)值大小在事件循環(huán)結(jié)構(gòu)體的 I/O 事件表中取一個數(shù)據(jù)空間,利用系統(tǒng)提供的 I/O 多路復用技術(shù)監(jiān)聽感興趣的 I/O 事件,并設(shè)置回調(diào)函數(shù)。
http://wiki.jikexueyuan.com/project/redis/images/redis3.png" alt="" />
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
// 在I/O 事件表中選擇一個空間
aeFileEvent *fe = &eventLoop->events[fd];
// aeApiAddEvent() 只在此函數(shù)中調(diào)用,對于不同IO 多路復用實現(xiàn),會有所不同
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
// 設(shè)置回調(diào)函數(shù)
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
對于不同版本的 I/O 多路復用,比如 epoll,select,kqueue 等,Redis 有各自的版本,但接口統(tǒng)一,譬如 aeApiAddEvent(),會有多個版本的實現(xiàn)。
http://wiki.jikexueyuan.com/project/redis/images/redis4.png" alt="" />
initServer() 中調(diào)用了 aeCreateEventLoop() 完成了事件中心的初始化,initServer() 還做了監(jiān)聽的準備。
/* Open the TCP listening socket for the user commands. */
// listenToPort() 中有調(diào)用listen()
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
exit(1);
// UNIX 域套接字
/* Open the listening Unix domain socket. */
if (server.unixsocket != NULL) {
unlink(server.unixsocket); /* don't care if this fails */
server.sofd = anetUnixServer(server.neterr,server.unixsocket,
server.unixsocketperm);
if (server.sofd == ANET_ERR) {
redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr);
exit(1);
}
}
從上面可以看出,Redis 提供了 TCP 和 UNIX 域套接字兩種工作方式。以 TCP 工作方式為例,listenPort() 創(chuàng)建綁定了套接字并啟動了監(jiān)聽,這是網(wǎng)絡(luò)編程的基礎(chǔ)部分了。
在進入事件循環(huán)前還需要做一些準備工作。緊接著,initServer() 為所有的監(jiān)聽套接字注冊了讀事件(讀事件表示有新的連接到來),響應(yīng)函數(shù)為 acceptTcpHandler() 或者 acceptUnixHandler()。
// 創(chuàng)建接收TCP 或者UNIX 域套接字的事件處理
// TCP
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
// acceptTcpHandler() tcp 連接接受處理函數(shù)
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
redisPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
// UNIX 域套接字
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR)
redisPanic("Unrecoverable error creating server.sofd file event.");
來看看 acceptTcpHandler() 做了什么:
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd;
char cip[REDIS_IP_STR_LEN];
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
// 接收客戶端請求
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
// 出錯
if (cfd == AE_ERR) {
redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
return;
}
// 記錄
redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
// 真正有意思的地方
acceptCommonHandler(cfd,0);
}
接收套接字與客戶端建立連接后,調(diào)用 acceptCommonHandler()。acceptCommonHandler()主要工作就是:
簡而言之,就是接收一個 TCP 請求。
以上做好了準備工作,可以進入事件循環(huán)。跳出 initServer() 回到 main() 中,main() 會調(diào)用 aeMain()。進入事件循環(huán)發(fā)生在 aeProcessEvents() 中:
來看看 aeMain() 的具體實現(xiàn):
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
// 進入事件循環(huán)可能會進入睡眠狀態(tài)。在睡眠之前,執(zhí)行預(yù)設(shè)置的函數(shù)
// aeSetBeforeSleepProc()。
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
// AE_ALL_EVENTS 表示處理所有的事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
這里以 select 版本的 redis api 實現(xiàn)作為講解,aeApiPoll() 調(diào)用了 select() 進入了監(jiān)聽輪 詢。aeApiPoll() 的 tvp 參數(shù)是最小等待時間,它會被預(yù)先計算出來,它主要完成:
接下來的操作便是執(zhí)行相應(yīng)的回調(diào)函數(shù),代碼在上一段中已經(jīng)貼出:先處理 I/O 事件,再 處理定時事件。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, j, numevents = 0;
/*
真有意思,在aeApiState 結(jié)構(gòu)中:
typedef struct aeApiState {
fd_set rfds, wfds;
fd_set _rfds, _wfds;
} aeApiState;
在調(diào)用select() 的時候傳入的是_rfds 和_wfds,所有監(jiān)聽的數(shù)據(jù)
在rfds 和wfds 中。
在下次需要調(diào)用selec() 的時候,會將rfds 和wfds 中的數(shù)據(jù)拷貝
進_rfds 和_wfds 中。*/
memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));
retval = select(eventLoop->maxfd+1,
&state->_rfds,&state->_wfds,NULL,tvp);
if (retval > 0) {
// 輪詢
for (j = 0; j <= eventLoop->maxfd; j++) {
int mask = 0;
aeFileEvent *fe = &eventLoop->events[j];
if (fe->mask == AE_NONE) continue;
if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
mask |= AE_READABLE;
if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
mask |= AE_WRITABLE;
// 添加到觸發(fā)事件表中
eventLoop->fired[numevents].fd = j;
eventLoop->fired[numevents].mask = mask;
numevents++;
}
}
return numevents;
}
http://wiki.jikexueyuan.com/project/redis/images/redis5.png" alt="" />
Redis 的事件驅(qū)動總結(jié)如下: