閱讀 Memcached 最好有 libevent 基礎(chǔ),Memcached 是基于 Libevent 構(gòu)建起來的。通由 Libevent 提供的事件驅(qū)動機(jī)制觸發(fā) Memcached 中的 IO 事件。已經(jīng)有大牛剖析過 Libevent 源碼了,可以在網(wǎng)絡(luò)上搜索相關(guān)的資料。
個(gè)人認(rèn)為,閱讀源碼的起初最忌鉆牛角尖,如頭文件里天花亂墜的結(jié)構(gòu)體到底有什么用。源文件里稀里嘩啦的函數(shù)是做什么的。剛開始并沒必要事無巨細(xì)弄清楚頭文件每個(gè)類型定義的具體用途; 很可能那些是不緊要的工具函數(shù),知道他的功能和用法就沒他事了。
來看 Memcached 內(nèi)部做了什么事情。Memcached 是用 C 語言實(shí)現(xiàn),必須有一個(gè)入口函數(shù) main(),Memcached 的生命從這里開始。
根據(jù) Memcached 配置,開啟以下兩種服務(wù)模式中的一種:
Memcached 有可配置的兩種模式:UNIX 域套接字和 TCP/UDP,允許客戶端以兩種方式向 Memcached 發(fā)起請求??蛻舳撕头?wù)器在同一個(gè)主機(jī)上的情況下可以用 UNIX 域套接字,否則可以采用 TCP/UDP 的模式。兩種模式是不兼容的。特別的,如果是 UNIX 域套接字或者 TCP 模式,需要建立監(jiān)聽套接字,并在事件中心注冊了讀事件,回調(diào)函數(shù)是 event_handler(),我們會看到所有的連接都會被注冊回調(diào)函數(shù)是 event_handler()。
調(diào)用 event_base_loop() 開啟 libevent 的事件循環(huán)。到此,Memcached 服務(wù)器的工作正式進(jìn)入了工作。如果遇到致命錯(cuò)誤或者客戶明令結(jié)束 Memcached,那么才會進(jìn)入接下來的清理工作。
在初始化過程中介紹了這兩種模式,Memcached 這么做為的是讓其能更加可配置。
TCP/UDP 自不用說,UNIX 域套接字有獨(dú)特的優(yōu)勢:
在 thread_init(),setup_thread() 函數(shù)的實(shí)現(xiàn)中,Memcached 的意圖是很清楚的。每個(gè)線程都有自己獨(dú)有的連接隊(duì)列,即 CQ,注意這個(gè)連接隊(duì)列中的對象并不是一個(gè)或者多個(gè) Memcached 命令,它對應(yīng)一個(gè)客戶!一旦一個(gè)客戶交給了一個(gè)線程,它的余生就屬于這個(gè)線程了! 線程只要被喚醒就立即進(jìn)入工作狀態(tài),將自己 CQ 隊(duì)列的任務(wù)所有完完成。當(dāng)然,每一個(gè)工作線程都有自己的 libevent 事件中心。
很關(guān)鍵的線索是 thread_init() 的實(shí)現(xiàn)中,每個(gè)工作線程都創(chuàng)建了讀寫管道,所能給我們的提示是:只要利用 libevent 在工作線程的事件中心注冊讀管道的讀事件,就可以按需喚醒線程,完成工作,很有意思,而setup_thread() 的工作正是讀管道的讀事件被注冊到線程的事件中心,回調(diào)函數(shù)是 thread_libevent_process()。thread_libevent_process() 的工作就是從工作線程自己的 CQ 隊(duì)列中取出任務(wù)執(zhí)行,而往工作線程工作隊(duì)列中添加任務(wù)的是 dispatch_conn_new(),此函數(shù)一般由主線程調(diào)用。下面是主線程和工作線程的工作流程:
http://wiki.jikexueyuan.com/project/redis/images/s.png" alt="" />
多任務(wù)并行處理的兩種方式,一種是將所有的任務(wù)用隊(duì)列存儲起來,每個(gè)工作者依次去拿一個(gè)來處理,直到做完所有的> 任務(wù)為止。另一種是將任務(wù)平均分給工作者,先做完任務(wù)的工作者就去別的工作者那里拿一些任務(wù)來做,同樣直到所有任務(wù)做完為止。兩種方式的結(jié)果如何?根據(jù)自己的場景寫碼驗(yàn)證。
Memcached 所采用的模式就是這里所說的第二種! Memcached 的線程分配模式是:一個(gè)主線程和多個(gè)工作線程。主線程負(fù)責(zé)初始化和將接收的請求分派給工作線程,工作線程負(fù)責(zé)接收客戶的命令請求和回復(fù)客戶。
Memcached 是做緩存用的,內(nèi)部肯定有一個(gè)容器?;氐?main() 中,調(diào)用 assoc_init() 初始化了容器–hashtable,采用頭插法插入新數(shù)據(jù),因?yàn)轭^插法是最快的。Memcached 只做了一級的索引,即 hash;接下來的就靠 memcmp() 在鏈表中找數(shù)據(jù)所在的位置。Memcached 容器管理的接口主要在 item.h .c 中。
http://wiki.jikexueyuan.com/project/redis/images/s1.png" alt="" />
每個(gè)連接都會建立一個(gè)連接結(jié)構(gòu)體與之對應(yīng)。main() 中會調(diào)用 conn_init() 建立連接結(jié)構(gòu)體數(shù)組。連接結(jié)構(gòu)體 struct conn 記錄了連接套接字,讀取的數(shù)據(jù),將要寫入的數(shù)據(jù),libevent event 結(jié)構(gòu)體以及所屬的線程信息。
當(dāng)有新的連接時(shí),主線程會被喚醒,主線程選定一個(gè)工作線程 thread0,在thread0 的寫管道中寫入數(shù)據(jù),特別的如果是接受新的連接而不是接受新的數(shù)據(jù),寫入管道的數(shù)據(jù)是字符’c’。工作線程因管道中有數(shù)據(jù)可讀被喚醒,thread_libevent_process() 被調(diào)用,新連接套接字被注冊了 event_handler() 回調(diào)函數(shù),這些工作在conn_new() 中完成。因此,客戶端有命令請求的時(shí)候(譬如發(fā)起 get key 命令),工作線程都會被觸發(fā)調(diào)用 event_handler()。
當(dāng)出現(xiàn)致命錯(cuò)誤或者客戶命令結(jié)束服務(wù)(quit 命令),關(guān)于此連接的結(jié)構(gòu)體內(nèi)部的數(shù)據(jù)會被釋放(譬如曾經(jīng)讀取的數(shù)據(jù)),但結(jié)構(gòu)體本身不釋放,等待下一次使用。如果有需要,連接結(jié)構(gòu)體數(shù)組會指數(shù)自增。
Memcached 服務(wù)一個(gè)客戶的時(shí)候,是怎么一個(gè)過程,試著去調(diào)試模擬一下。當(dāng)一個(gè)客戶向 Memcached 發(fā)起請求時(shí),主線程會被喚醒,接受請求。接下來的工作在連接管理中有說到。
客戶已經(jīng)與 Memcached 服務(wù)器建立了連接,客戶在終端(黑框框) 敲擊 get key + 回車鍵,一個(gè)請求包就發(fā)出去了。從連接管理中已經(jīng)了解到所有連接套接字都會被注冊回調(diào)函數(shù)為 event_handler(),因此 event_handler() 會被觸發(fā)調(diào)用。
void event_handler(const int fd, const short which, void *arg) {
conn *c;
c = (conn *)arg;
assert(c != NULL);
c->which = which;
/* sanity */
if (fd != c->sfd) {
if (settings.verbose > 0)
fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
conn_close(c);
return;
}
drive_machine(c);
/* wait for next event */
return;
}
event_handler() 調(diào)用了 drive_machine()。drive_machine() 是請求處理的開端,特別的當(dāng)有新的連接時(shí),listen socket 也是有請求的,所以建立新的連接也會調(diào)用 drive_machine(),這在連接管理有提到過。下面是 drive_machine() 函數(shù)的骨架:
// 請求的開端。當(dāng)有新的連接的時(shí)候event_handler() 會調(diào)用此函數(shù)。
static void drive_machine(conn *c) {
bool stop = false;
int sfd, flags = 1;
socklen_t addrlen;
struct sockaddr_storage addr;
int nreqs = settings.reqs_per_event;
int res;
const char *str;
assert(c != NULL);
while (!stop) {
// while 能保證一個(gè)命令被執(zhí)行完成或者異常中斷(譬如IO 操作次數(shù)超出了一定的限制)
switch(c->state) {
// 正在連接,還沒有accept
case conn_listening:
// 等待新的命令請求
case conn_waiting:
// 讀取數(shù)據(jù)
case conn_read:
// 嘗試解析命令
case conn_parse_cmd :
// 新的命令請求,只是負(fù)責(zé)轉(zhuǎn)變conn 的狀態(tài)
case conn_new_cmd:
// 真正執(zhí)行命令的地方
case conn_nread:
// 讀取所有的數(shù)據(jù),拋棄!!! 一般出錯(cuò)的情況下會轉(zhuǎn)換到此狀態(tài)
case conn_swallow:
// 數(shù)據(jù)回復(fù)
case conn_write:
case conn_mwrite:
// 連接結(jié)束。一般出錯(cuò)或者客戶顯示結(jié)束服務(wù)的情況下回轉(zhuǎn)換到此狀態(tài)
case conn_closing:
}
}
return;
}
通過修改連接結(jié)構(gòu)體狀態(tài) struct conn.state 執(zhí)行相應(yīng)的操作,從而完成一個(gè)請求,完成后 stop 會被設(shè)置為 true,一個(gè)命令只有執(zhí)行結(jié)束(無論結(jié)果如何) 才會跳出這個(gè)循環(huán)。我們看到 struct conn 有好多種狀態(tài),一個(gè)正常執(zhí)行的命令狀態(tài)的轉(zhuǎn)換是:
conn_new_cmd->conn_waiting->conn_read->conn_parse_cmd->conn_nread->conn_mwrite->conn_close
這個(gè)過程任何一個(gè)環(huán)節(jié)出了問題都會導(dǎo)致狀態(tài)轉(zhuǎn)變?yōu)?conn_close。帶著剛開始的問題把從客戶連接到一個(gè)命令執(zhí)行結(jié)束的過程是怎么樣的:
event_-handler()->drive_machine()
被調(diào)用,此時(shí)主線程對應(yīng) conn 狀態(tài)為 conn_listining,接
受請求Memcached 的服務(wù)器沒有向其他 Memcached 服務(wù)器收發(fā)數(shù)據(jù)的功能,意即就算部署多個(gè) Memcached 服務(wù)器,他們之間也沒有任何的通信,Memcached 所謂的分布式部署也是并非平時(shí)所說的分布式。所說的「分布式」是通過創(chuàng)建多個(gè) Memcached 服務(wù)器節(jié)點(diǎn),在客戶端添加緩存請求分發(fā)器來實(shí)現(xiàn)的。Memcached 的更多的時(shí)候限制是來自網(wǎng)絡(luò) I/O,所以應(yīng)該盡量減少網(wǎng)絡(luò) I/O。
http://wiki.jikexueyuan.com/project/redis/images/s2.png" alt="" />