鍍金池/ 教程/ 大數(shù)據(jù)/ Redis 是如何提供服務(wù)的
Redis 數(shù)據(jù)淘汰機(jī)制
積分排行榜
小剖 Memcache
Redis 數(shù)據(jù)結(jié)構(gòu) intset
分布式鎖
從哪里開(kāi)始讀起,怎么讀
Redis 數(shù)據(jù)結(jié)構(gòu) dict
不在浮沙筑高臺(tái)
Redis 集群(上)
Redis 監(jiān)視器
源碼閱讀工具
Redis 日志和斷言
內(nèi)存數(shù)據(jù)管理
Redis 數(shù)據(jù)結(jié)構(gòu)綜述
源碼日志
Web 服務(wù)器存儲(chǔ) session
消息中間件
Redis 與 Lua 腳本
什么樣的源代碼適合閱讀
Redis 數(shù)據(jù)結(jié)構(gòu) sds
Memcached slab 分配策略
訂閱發(fā)布機(jī)制
Redis 是如何提供服務(wù)的
Redis 事務(wù)機(jī)制
Redis 集群(下)
主從復(fù)制
Redis 應(yīng)用
RDB 持久化策略
Redis 數(shù)據(jù)遷移
Redis 事件驅(qū)動(dòng)詳解
初探 Redis
Redis 與 Memcache
AOF 持久化策略
Redis 數(shù)據(jù)結(jié)構(gòu) redisOb
作者簡(jiǎn)介
Redis 數(shù)據(jù)結(jié)構(gòu) ziplist
Redis 數(shù)據(jù)結(jié)構(gòu) skiplist
Redis 哨兵機(jī)制

Redis 是如何提供服務(wù)的

在剛剛接觸 Redis 的時(shí)候,最想要知道的是一個(gè)’set name Jhon’ 命令到達(dá) Redis 服務(wù)器的時(shí)候,它是如何返回’OK’ 的?里面命令處理的流程如何,具體細(xì)節(jié)怎么樣?你一定有問(wèn)過(guò)自己。閱讀別人的代碼是很枯燥的,但帶著好奇心閱讀代碼,是一件很興奮的事情,接著翻到了 Redis 源碼的 main() 函數(shù)。

http://wiki.jikexueyuan.com/project/redis/images/redis6.png" alt="" />

Redis 在啟動(dòng)做了一些初始化邏輯,比如配置文件讀取,數(shù)據(jù)中心初始化,網(wǎng)絡(luò)通信模塊初始化等,待所有初始化任務(wù)完畢后,便開(kāi)始等待請(qǐng)求。

當(dāng)請(qǐng)求到來(lái)時(shí),Redis 進(jìn)程會(huì)被喚醒,原理是 epoll. select, kqueue 等一些 I/O 多路復(fù)用的系統(tǒng)調(diào)用。如果有閱讀上一章節(jié),應(yīng)該理解這一句話。接著讀取來(lái)來(lái)自客戶端的數(shù)據(jù),解析命令,查找命令,并執(zhí)行命令。

執(zhí)行命令’set name Jhon’ 的時(shí)候,Redis 會(huì)在預(yù)先初始化好的哈希表里頭, 查找 key=’name’ 對(duì)應(yīng)的位置,并存入。

最后,把回復(fù)的內(nèi)容準(zhǔn)備好回送給客戶端,客戶端于是收到了’OK’. 接下來(lái),我們看看詳細(xì)的過(guò)程是怎么樣的。

詳細(xì)的過(guò)程

了解了 Redis 的事件驅(qū)動(dòng)模型后,帶著命令是如何被處理的這個(gè)問(wèn)題去讀代碼。剛開(kāi)始的時(shí)候,會(huì)有一堆的變量和函數(shù)等著讀者,但只要抓住主干就好了,下面就是 Redis 的主干部分。

int main(int argc, char **argv) {
   ......
   // 初始化服務(wù)器配置,主要是填充 redisServer 結(jié)構(gòu)體中的各種參數(shù)
   initServerConfig();
   ......
   // 初始化服務(wù)器
   initServer();
   ......
   // 進(jìn)入事件循環(huán)
   aeMain(server.el);
}

分別來(lái)看看它們主要做了什么?

initServerConfig()

initServerConfig() 主要是填充 struct redisServer 這個(gè)結(jié)構(gòu)體,Redis 所有相關(guān)的配置都 在里面。

void initServer() {
   // 創(chuàng)建事件循環(huán)結(jié)構(gòu)體
   server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
   // 分配數(shù)據(jù)集空間
   server.db = zmalloc(sizeof(redisDb)*server.dbnum);
   /* Open the TCP listening socket for the user commands. */
   // listenToPort() 中有調(diào)用listen()
    if (server.port != 0 &&
        listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
        exit(1);
......
// 初始化redis 數(shù)據(jù)集
/* Create the Redis databases, and initialize other internal state. */
for (j = 0; j < server.REDIS_DEFAULT_DBNUM; j++) { // 初始化多個(gè)數(shù)據(jù)庫(kù)
    // 哈希表,用于存儲(chǔ)鍵值對(duì)
    server.db[j].dict = dictCreate(&dbDictType,NULL);
    // 哈希表,用于存儲(chǔ)每個(gè)鍵的過(guò)期時(shí)間
    server.db[j].expires = dictCreate(&keyptrDictType,NULL);
    server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
    server.db[j].ready_keys = dictCreate(&setDictType,NULL);
    server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
    server.db[j].id = j;
    server.db[j].avg_ttl = 0;
  }
......
// 創(chuàng)建接收TCP 或者UNIX 域套接字的事件處理
// TCP
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
    // acceptTcpHandler() tcp 連接接受處理函數(shù)
    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
        acceptTcpHandler,NULL) == AE_ERR)
        {
            redisPanic(
                "Unrecoverable error creating server.ipfd file event.");
        }
   }
......
}

在這里,創(chuàng)建了事件中心,是 Redis 的網(wǎng)絡(luò)模塊,如果你有學(xué)過(guò) linux 下的網(wǎng)絡(luò)編程,那么知道這里一定和 select/epoll/kqueue 相關(guān)。

接著,是初始化數(shù)據(jù)中心,我們平時(shí)使用 Redis 設(shè)置的鍵值對(duì),就是存儲(chǔ)在里面。這里不急著深入它是怎么做到存儲(chǔ)我們的鍵值對(duì)的,接著往下看好了,因?yàn)槲覀冎饕窍氚汛笾碌拿}絡(luò)弄清楚。

在最后一段的代碼中,Redis 給 listen fd 注冊(cè)了回調(diào)函數(shù) acceptTcpHandler,也就是說(shuō)當(dāng)新的客戶端連接的時(shí)候,這個(gè)函數(shù)會(huì)被調(diào)用,詳情接下來(lái)再展開(kāi)。

aeMain()

接著就開(kāi)始等待請(qǐng)求的到來(lái)。

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
    // 進(jìn)入事件循環(huán)可能會(huì)進(jìn)入睡眠狀態(tài)。在睡眠之前,執(zhí)行預(yù)設(shè)置
    // 的函數(shù)aeSetBeforeSleepProc()。
    if (eventLoop->beforesleep != NULL)
        eventLoop->beforesleep(eventLoop);
    // AE_ALL_EVENTS 表示處理所有的事件
    aeProcessEvents(eventLoop, AE_ALL_EVENTS);
  }
}

前面的兩個(gè)函數(shù)都屬于是初始化的工作,到這里的時(shí)候,Redis 正式進(jìn)入等待接收請(qǐng)求的狀態(tài)。具體的實(shí)現(xiàn),和 select/epoll/kqueue 這些 IO 多路復(fù)用的系統(tǒng)調(diào)用相關(guān),而這也是網(wǎng)絡(luò)編程的基礎(chǔ)部分了。繼續(xù)跟蹤調(diào)用鏈:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    ......
    // 調(diào)用IO 多路復(fù)用函數(shù)阻塞監(jiān)聽(tīng)
    numevents = aeApiPoll(eventLoop, tvp);
    // 處理已經(jīng)觸發(fā)的事件
for (j = 0; j < numevents; j++) {
    // 找到I/O 事件表中存儲(chǔ)的數(shù)據(jù)
    aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
    int mask = eventLoop->fired[j].mask;
    int fd = eventLoop->fired[j].fd;
    int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
    // 讀事件
    if (fe->mask & mask & AE_READABLE) {
        rfired = 1;
        fe->rfileProc(eventLoop,fd,fe->clientData,mask);
    }
    // 寫(xiě)事件
    if (fe->mask & mask & AE_WRITABLE) {
    if (!rfired || fe->wfileProc != fe->rfileProc)
        fe->wfileProc(eventLoop,fd,fe->clientData,mask);
    }
    processed++;
  }
}
// 處理定時(shí)事件
/* Check time events */
if (flags & AE_TIME_EVENTS)
    processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}

可以看到,aeApiPoll 即是 IO 多路復(fù)用調(diào)用的地方,當(dāng)有請(qǐng)求到來(lái)的時(shí)候,進(jìn)程會(huì)覺(jué)醒以處理到來(lái)的請(qǐng)求。如果你有留意到上面的定時(shí)事件處理,也就明白相應(yīng)的定時(shí)處理函數(shù)是在哪里觸發(fā)的了。

新連接的處理流程

在 initServer() 的講解中,Redis 注冊(cè)了回調(diào)函數(shù) acceptTcpHandler(),當(dāng)有新的連接到來(lái)時(shí),這個(gè)函數(shù)會(huì)被回調(diào),上面的函數(shù)指針 rfileProc() 實(shí)際上就是指向了 acceptTcpHandler()。

下面是 acceptTcpHandler() 的核心代碼:

// 用于TCP 接收請(qǐng)求的處理函數(shù)
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    char cip[REDIS_IP_STR_LEN];
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);
    // 接收客戶端請(qǐng)求
    cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
    // 出錯(cuò)
    if (cfd == AE_ERR) {
        redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
        return;
  }
    // 記錄
    redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
    // 真正有意思的地方
    acceptCommonHandler(cfd,0);
}

anetTcpAccept 是接收一個(gè)請(qǐng)求cfd,真正有意思的地方是acceptCommonHandler,而 acceptCommonHandler 最核心的調(diào)用是 createClient。Redis 對(duì)于每一個(gè)客戶端的連接,都會(huì)對(duì)應(yīng)一個(gè)結(jié)構(gòu)體 struct redisClient。下面是 createClient 的核心代碼:

redisClient *createClient(int fd) {
    redisClient *c = zmalloc(sizeof(redisClient));
    /* passing -1 as fd it is possible to create a non connected client.
    * This is useful since all the Redis commands needs to be executed
    * in the context of a client. When commands are executed in other
    * contexts (for instance a Lua script) we need a non connected client. */
    if (fd != -1) {
        anetNonBlock(NULL,fd);
        anetEnableTcpNoDelay(NULL,fd);
    if (server.tcpkeepalive)
        anetKeepAlive(NULL,fd,server.tcpkeepalive);
    // 為接收到的套接字注冊(cè)監(jiān)聽(tīng)事件
    // readQueryFromClient() 應(yīng)該為處理客戶端請(qǐng)求的函數(shù)
    if (aeCreateFileEvent(server.el,fd,AE_READABLE,
        readQueryFromClient, c) == AE_ERR)
    {
        close(fd);
        zfree(c);
        return NULL;
    }
  }
  ......
  return c;
}

可以看到,createClient 在事件中心為與客戶端連接的套接字注冊(cè)了 readQueryFrom-Client() 回調(diào)函數(shù),而這也就是說(shuō)當(dāng)客戶端有請(qǐng)求數(shù)據(jù)過(guò)來(lái)的時(shí)候,acceptTcpHandler() 會(huì)被調(diào)用。于是,我們找到了’set name Jhon’ 開(kāi)始處理的地方。

請(qǐng)求的處理流程

readQueryFromClient() 則是獲取來(lái)自客戶端的數(shù)據(jù),接下來(lái)它會(huì)調(diào)用processInput-Buffer() 解析命令和執(zhí)行命令,對(duì)于命令的執(zhí)行,調(diào)用的是函數(shù) processCommand()。下面是 processCommand() 核心代碼:

int processCommand(redisClient *c) {
    ......
    // 查找命令,redisClient.cmd 在此時(shí)賦值
    /* Now lookup the command and check ASAP about trivial error conditions
    * such as wrong arity, bad command name and so forth. */
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    // 沒(méi)有找到命令
    if (!c->cmd) {
        flagTransaction(c);
        addReplyErrorFormat(c,"unknown command '%s'",
            (char*)c->argv[0]->ptr);
        return REDIS_OK;
        // 參數(shù)個(gè)數(shù)不符合
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < c->cmd->arity)) {
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return REDIS_OK;
}
.....
    // 加入命令隊(duì)列的情況
    /* Exec the command */
    if (c->flags & REDIS_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
    // 命令入隊(duì)
    queueMultiCommand(c);
    addReply(c,shared.queued);
    // 真正執(zhí)行命令。
    // 注意,如果是設(shè)置了多命令模式,那么不是直接執(zhí)行命令,而是讓命令入隊(duì)
    } else {
        call(c,REDIS_CALL_FULL);
    if (listLength(server.ready_keys))
        handleClientsBlockedOnLists();
    }
return REDIS_OK;
}

如上可以看到,Redis 首先根據(jù)客戶端給出的命令字在命令表中查找對(duì)應(yīng)的 c->cmd, 即 struct redisCommand()。

c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);

Redis 在初始化的時(shí)候準(zhǔn)備了一個(gè)大數(shù)組,初始化了所有的命令,即初始化多個(gè) structredisCommand,在 struct redisCommand 中就有該命令對(duì)應(yīng)的回調(diào)函數(shù)指針。找到命令結(jié)構(gòu)體后,則開(kāi)始執(zhí)行命令,核心調(diào)用是call()。

執(zhí)行命令

call() 做的事情有很多,但這里只關(guān)注這一句話:call() 調(diào)用了命令的回調(diào)函數(shù)。

    // call() 函數(shù)是執(zhí)行命令的核心函數(shù),真正執(zhí)行命令的地方
    /* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {
    ......
    // 執(zhí)行命令對(duì)應(yīng)的處理函數(shù)
    c->cmd->proc(c);
    ......
}

http://wiki.jikexueyuan.com/project/redis/images/redis7.png" alt="" />

對(duì)于’set name Jhon’ 命令,對(duì)應(yīng)的回調(diào)函數(shù)是 setCommand() 函數(shù)。setCommand 對(duì) set 命令的參數(shù)做了檢測(cè),因?yàn)檫€提供設(shè)置一個(gè)鍵值對(duì)的過(guò)期時(shí)間等功能,這里只關(guān)注最簡(jiǎn)單的情況。

void setCommand(redisClient *c) {
    ......
    setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}
void setGenericCommand(redisClient *c, int flags, robj *key,
        robj *val, robj *expire, int unit, robj *ok_reply,
        robj *abort_reply) {
    ......
    setKey(c->db,key,val);
    ......
    addReply(c, ok_reply ? ok_reply : shared.ok);
}
void setKey(redisDb *db, robj *key, robj *val) {
    if (lookupKeyWrite(db,key) == NULL) {
        dbAdd(db,key,val);
    } else {
    dbOverwrite(db,key,val);
  }
  ......
}

setKey() 首先查看 key 是否存在于數(shù)據(jù)集中,如果存在則覆蓋寫(xiě);如果不存在則添加到數(shù)據(jù)集中。這里關(guān)注 key 不存在的情況:

void dbAdd(redisDb *db, robj *key, robj *val) {
    sds copy = sdsdup(key->ptr);
    int retval = dictAdd(db->dict, copy, val);
    redisAssertWithInfo(NULL,key,retval == REDIS_OK);
}

dictAdd() 就是把 key 存到字典中,實(shí)際上即是存到一個(gè)哈希表。

在哪里回復(fù)客戶端

最后,回到 setGenericCommand(), 會(huì)調(diào)用 addReply()。addReply() 會(huì)為與客戶端連接的套接字注冊(cè)可寫(xiě)事件,把’ok’ 添加到客戶端的回復(fù)緩存中。待再一次回到事件循環(huán)的時(shí)候,如果這個(gè)套接字可寫(xiě),相應(yīng)的回調(diào)函數(shù)就可以被回調(diào)了?;貜?fù)緩存中的數(shù)據(jù)會(huì)被發(fā)送到客戶端。

由此’set name Jhon’ 命令執(zhí)行完畢。在把這個(gè)流程捋順的過(guò)程,我省去了很多的細(xì)節(jié),只關(guān)注場(chǎng)景最簡(jiǎn)單情況最單一的時(shí)候,其他的代碼都沒(méi)有去看,譬如主從復(fù)制的,持久化的相關(guān)邏輯。這對(duì)我們快速了解一個(gè)系統(tǒng)的原理是很關(guān)鍵的。同樣,在面對(duì)其他系統(tǒng)代碼的時(shí)候,也可以帶著這三個(gè)最簡(jiǎn)單的問(wèn)題去閱讀:它是誰(shuí),它從哪里來(lái),又到哪里去。