在剛剛接觸 Redis 的時(shí)候,最想要知道的是一個(gè)’set name Jhon’ 命令到達(dá) Redis 服務(wù)器的時(shí)候,它是如何返回’OK’ 的?里面命令處理的流程如何,具體細(xì)節(jié)怎么樣?你一定有問(wèn)過(guò)自己。閱讀別人的代碼是很枯燥的,但帶著好奇心閱讀代碼,是一件很興奮的事情,接著翻到了 Redis 源碼的 main() 函數(shù)。
http://wiki.jikexueyuan.com/project/redis/images/redis6.png" alt="" />
Redis 在啟動(dòng)做了一些初始化邏輯,比如配置文件讀取,數(shù)據(jù)中心初始化,網(wǎng)絡(luò)通信模塊初始化等,待所有初始化任務(wù)完畢后,便開(kāi)始等待請(qǐng)求。
當(dāng)請(qǐng)求到來(lái)時(shí),Redis 進(jìn)程會(huì)被喚醒,原理是 epoll. select, kqueue 等一些 I/O 多路復(fù)用的系統(tǒng)調(diào)用。如果有閱讀上一章節(jié),應(yīng)該理解這一句話。接著讀取來(lái)來(lái)自客戶端的數(shù)據(jù),解析命令,查找命令,并執(zhí)行命令。
執(zhí)行命令’set name Jhon’ 的時(shí)候,Redis 會(huì)在預(yù)先初始化好的哈希表里頭, 查找 key=’name’ 對(duì)應(yīng)的位置,并存入。
最后,把回復(fù)的內(nèi)容準(zhǔn)備好回送給客戶端,客戶端于是收到了’OK’. 接下來(lái),我們看看詳細(xì)的過(guò)程是怎么樣的。
了解了 Redis 的事件驅(qū)動(dòng)模型后,帶著命令是如何被處理的這個(gè)問(wèn)題去讀代碼。剛開(kāi)始的時(shí)候,會(huì)有一堆的變量和函數(shù)等著讀者,但只要抓住主干就好了,下面就是 Redis 的主干部分。
int main(int argc, char **argv) {
......
// 初始化服務(wù)器配置,主要是填充 redisServer 結(jié)構(gòu)體中的各種參數(shù)
initServerConfig();
......
// 初始化服務(wù)器
initServer();
......
// 進(jìn)入事件循環(huán)
aeMain(server.el);
}
分別來(lái)看看它們主要做了什么?
initServerConfig() 主要是填充 struct redisServer 這個(gè)結(jié)構(gòu)體,Redis 所有相關(guān)的配置都 在里面。
void initServer() {
// 創(chuàng)建事件循環(huán)結(jié)構(gòu)體
server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
// 分配數(shù)據(jù)集空間
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
/* Open the TCP listening socket for the user commands. */
// listenToPort() 中有調(diào)用listen()
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
exit(1);
......
// 初始化redis 數(shù)據(jù)集
/* Create the Redis databases, and initialize other internal state. */
for (j = 0; j < server.REDIS_DEFAULT_DBNUM; j++) { // 初始化多個(gè)數(shù)據(jù)庫(kù)
// 哈希表,用于存儲(chǔ)鍵值對(duì)
server.db[j].dict = dictCreate(&dbDictType,NULL);
// 哈希表,用于存儲(chǔ)每個(gè)鍵的過(guò)期時(shí)間
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
server.db[j].ready_keys = dictCreate(&setDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
server.db[j].id = j;
server.db[j].avg_ttl = 0;
}
......
// 創(chuàng)建接收TCP 或者UNIX 域套接字的事件處理
// TCP
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
// acceptTcpHandler() tcp 連接接受處理函數(shù)
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
redisPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
......
}
在這里,創(chuàng)建了事件中心,是 Redis 的網(wǎng)絡(luò)模塊,如果你有學(xué)過(guò) linux 下的網(wǎng)絡(luò)編程,那么知道這里一定和 select/epoll/kqueue 相關(guān)。
接著,是初始化數(shù)據(jù)中心,我們平時(shí)使用 Redis 設(shè)置的鍵值對(duì),就是存儲(chǔ)在里面。這里不急著深入它是怎么做到存儲(chǔ)我們的鍵值對(duì)的,接著往下看好了,因?yàn)槲覀冎饕窍氚汛笾碌拿}絡(luò)弄清楚。
在最后一段的代碼中,Redis 給 listen fd 注冊(cè)了回調(diào)函數(shù) acceptTcpHandler,也就是說(shuō)當(dāng)新的客戶端連接的時(shí)候,這個(gè)函數(shù)會(huì)被調(diào)用,詳情接下來(lái)再展開(kāi)。
接著就開(kāi)始等待請(qǐng)求的到來(lái)。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
// 進(jìn)入事件循環(huán)可能會(huì)進(jìn)入睡眠狀態(tài)。在睡眠之前,執(zhí)行預(yù)設(shè)置
// 的函數(shù)aeSetBeforeSleepProc()。
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
// AE_ALL_EVENTS 表示處理所有的事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
前面的兩個(gè)函數(shù)都屬于是初始化的工作,到這里的時(shí)候,Redis 正式進(jìn)入等待接收請(qǐng)求的狀態(tài)。具體的實(shí)現(xiàn),和 select/epoll/kqueue 這些 IO 多路復(fù)用的系統(tǒng)調(diào)用相關(guān),而這也是網(wǎng)絡(luò)編程的基礎(chǔ)部分了。繼續(xù)跟蹤調(diào)用鏈:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
......
// 調(diào)用IO 多路復(fù)用函數(shù)阻塞監(jiān)聽(tīng)
numevents = aeApiPoll(eventLoop, tvp);
// 處理已經(jīng)觸發(fā)的事件
for (j = 0; j < numevents; j++) {
// 找到I/O 事件表中存儲(chǔ)的數(shù)據(jù)
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
// 讀事件
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
// 寫(xiě)事件
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
// 處理定時(shí)事件
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
可以看到,aeApiPoll 即是 IO 多路復(fù)用調(diào)用的地方,當(dāng)有請(qǐng)求到來(lái)的時(shí)候,進(jìn)程會(huì)覺(jué)醒以處理到來(lái)的請(qǐng)求。如果你有留意到上面的定時(shí)事件處理,也就明白相應(yīng)的定時(shí)處理函數(shù)是在哪里觸發(fā)的了。
在 initServer() 的講解中,Redis 注冊(cè)了回調(diào)函數(shù) acceptTcpHandler(),當(dāng)有新的連接到來(lái)時(shí),這個(gè)函數(shù)會(huì)被回調(diào),上面的函數(shù)指針 rfileProc() 實(shí)際上就是指向了 acceptTcpHandler()。
下面是 acceptTcpHandler() 的核心代碼:
// 用于TCP 接收請(qǐng)求的處理函數(shù)
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd;
char cip[REDIS_IP_STR_LEN];
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
// 接收客戶端請(qǐng)求
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
// 出錯(cuò)
if (cfd == AE_ERR) {
redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
return;
}
// 記錄
redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
// 真正有意思的地方
acceptCommonHandler(cfd,0);
}
anetTcpAccept 是接收一個(gè)請(qǐng)求cfd,真正有意思的地方是acceptCommonHandler,而 acceptCommonHandler 最核心的調(diào)用是 createClient。Redis 對(duì)于每一個(gè)客戶端的連接,都會(huì)對(duì)應(yīng)一個(gè)結(jié)構(gòu)體 struct redisClient。下面是 createClient 的核心代碼:
redisClient *createClient(int fd) {
redisClient *c = zmalloc(sizeof(redisClient));
/* passing -1 as fd it is possible to create a non connected client.
* This is useful since all the Redis commands needs to be executed
* in the context of a client. When commands are executed in other
* contexts (for instance a Lua script) we need a non connected client. */
if (fd != -1) {
anetNonBlock(NULL,fd);
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
// 為接收到的套接字注冊(cè)監(jiān)聽(tīng)事件
// readQueryFromClient() 應(yīng)該為處理客戶端請(qǐng)求的函數(shù)
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}
......
return c;
}
可以看到,createClient 在事件中心為與客戶端連接的套接字注冊(cè)了 readQueryFrom-Client() 回調(diào)函數(shù),而這也就是說(shuō)當(dāng)客戶端有請(qǐng)求數(shù)據(jù)過(guò)來(lái)的時(shí)候,acceptTcpHandler() 會(huì)被調(diào)用。于是,我們找到了’set name Jhon’ 開(kāi)始處理的地方。
readQueryFromClient() 則是獲取來(lái)自客戶端的數(shù)據(jù),接下來(lái)它會(huì)調(diào)用processInput-Buffer() 解析命令和執(zhí)行命令,對(duì)于命令的執(zhí)行,調(diào)用的是函數(shù) processCommand()。下面是 processCommand() 核心代碼:
int processCommand(redisClient *c) {
......
// 查找命令,redisClient.cmd 在此時(shí)賦值
/* Now lookup the command and check ASAP about trivial error conditions
* such as wrong arity, bad command name and so forth. */
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
// 沒(méi)有找到命令
if (!c->cmd) {
flagTransaction(c);
addReplyErrorFormat(c,"unknown command '%s'",
(char*)c->argv[0]->ptr);
return REDIS_OK;
// 參數(shù)個(gè)數(shù)不符合
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < c->cmd->arity)) {
flagTransaction(c);
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return REDIS_OK;
}
.....
// 加入命令隊(duì)列的情況
/* Exec the command */
if (c->flags & REDIS_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
// 命令入隊(duì)
queueMultiCommand(c);
addReply(c,shared.queued);
// 真正執(zhí)行命令。
// 注意,如果是設(shè)置了多命令模式,那么不是直接執(zhí)行命令,而是讓命令入隊(duì)
} else {
call(c,REDIS_CALL_FULL);
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}
return REDIS_OK;
}
如上可以看到,Redis 首先根據(jù)客戶端給出的命令字在命令表中查找對(duì)應(yīng)的 c->cmd, 即 struct redisCommand()。
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
Redis 在初始化的時(shí)候準(zhǔn)備了一個(gè)大數(shù)組,初始化了所有的命令,即初始化多個(gè) structredisCommand,在 struct redisCommand 中就有該命令對(duì)應(yīng)的回調(diào)函數(shù)指針。找到命令結(jié)構(gòu)體后,則開(kāi)始執(zhí)行命令,核心調(diào)用是call()。
call() 做的事情有很多,但這里只關(guān)注這一句話:call() 調(diào)用了命令的回調(diào)函數(shù)。
// call() 函數(shù)是執(zhí)行命令的核心函數(shù),真正執(zhí)行命令的地方
/* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {
......
// 執(zhí)行命令對(duì)應(yīng)的處理函數(shù)
c->cmd->proc(c);
......
}
http://wiki.jikexueyuan.com/project/redis/images/redis7.png" alt="" />
對(duì)于’set name Jhon’ 命令,對(duì)應(yīng)的回調(diào)函數(shù)是 setCommand() 函數(shù)。setCommand 對(duì) set 命令的參數(shù)做了檢測(cè),因?yàn)檫€提供設(shè)置一個(gè)鍵值對(duì)的過(guò)期時(shí)間等功能,這里只關(guān)注最簡(jiǎn)單的情況。
void setCommand(redisClient *c) {
......
setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}
void setGenericCommand(redisClient *c, int flags, robj *key,
robj *val, robj *expire, int unit, robj *ok_reply,
robj *abort_reply) {
......
setKey(c->db,key,val);
......
addReply(c, ok_reply ? ok_reply : shared.ok);
}
void setKey(redisDb *db, robj *key, robj *val) {
if (lookupKeyWrite(db,key) == NULL) {
dbAdd(db,key,val);
} else {
dbOverwrite(db,key,val);
}
......
}
setKey() 首先查看 key 是否存在于數(shù)據(jù)集中,如果存在則覆蓋寫(xiě);如果不存在則添加到數(shù)據(jù)集中。這里關(guān)注 key 不存在的情況:
void dbAdd(redisDb *db, robj *key, robj *val) {
sds copy = sdsdup(key->ptr);
int retval = dictAdd(db->dict, copy, val);
redisAssertWithInfo(NULL,key,retval == REDIS_OK);
}
dictAdd() 就是把 key 存到字典中,實(shí)際上即是存到一個(gè)哈希表。
最后,回到 setGenericCommand(), 會(huì)調(diào)用 addReply()。addReply() 會(huì)為與客戶端連接的套接字注冊(cè)可寫(xiě)事件,把’ok’ 添加到客戶端的回復(fù)緩存中。待再一次回到事件循環(huán)的時(shí)候,如果這個(gè)套接字可寫(xiě),相應(yīng)的回調(diào)函數(shù)就可以被回調(diào)了?;貜?fù)緩存中的數(shù)據(jù)會(huì)被發(fā)送到客戶端。
由此’set name Jhon’ 命令執(zhí)行完畢。在把這個(gè)流程捋順的過(guò)程,我省去了很多的細(xì)節(jié),只關(guān)注場(chǎng)景最簡(jiǎn)單情況最單一的時(shí)候,其他的代碼都沒(méi)有去看,譬如主從復(fù)制的,持久化的相關(guān)邏輯。這對(duì)我們快速了解一個(gè)系統(tǒng)的原理是很關(guān)鍵的。同樣,在面對(duì)其他系統(tǒng)代碼的時(shí)候,也可以帶著這三個(gè)最簡(jiǎn)單的問(wèn)題去閱讀:它是誰(shuí),它從哪里來(lái),又到哪里去。