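Redis supports a master-slave mode: one Redis server can be configured as the master (or the slave) of another Redis server, and a slave receives its data from its master. Notably, a slave can itself act as the master of another Redis server, so the master-slave topology looks like a directed acyclic graph (DAG). This is how a group of Redis servers is formed; master or slave, every node is a Redis server and can serve requests.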
[Figure: http://wiki.jikexueyuan.com/project/redis/images/redis22.png]
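Once configured, the master serves both reads and writes, while slaves serve reads only. Redis provides this configuration to support weak consistency, that is, eventual consistency. Whether to choose strong or weak consistency depends on the business requirements: a Weibo timeline can use a weak consistency model, whereas Alipay payment bills need a strong consistency model.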
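The binlog is a log type in MySQL. It records every update, or potential update, made to the database since the last backup, and so describes how the data changed. Because the binlog records only updates, it is well suited to real-time backup and master-slave replication. Redis likewise uses a binlog-like log for master-slave replication. The chapter "AOF Persistence Strategy" introduced the concept of an update buffer. As an example, when a client sends the command set name Jhon, the update is recorded as *3\r\n$3\r\nSET\r\n$4\r\nname\r\n$4\r\nJhon\r\n and stored in the update buffer.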
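The wire format of such a record can be reproduced with a few lines of C. The following is only a sketch: the function name resp_encode is hypothetical and not part of Redis, and it relies on strlen(), so unlike Redis it is not binary-safe.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical helper (not a Redis function): encode argc string arguments
 * as a multi-bulk request into out. Returns the number of bytes written,
 * excluding the terminating NUL, or -1 if out is too small. */
int resp_encode(char *out, size_t outlen, int argc, const char **argv) {
    size_t used = 0;
    /* Header: "*<number of arguments>\r\n" */
    int n = snprintf(out, outlen, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= outlen) return -1;
    used += (size_t)n;
    for (int i = 0; i < argc; i++) {
        /* Each argument: "$<length>\r\n<payload>\r\n" */
        n = snprintf(out + used, outlen - used, "$%zu\r\n%s\r\n",
                     strlen(argv[i]), argv[i]);
        if (n < 0 || (size_t)n >= outlen - used) return -1;
        used += (size_t)n;
    }
    return (int)used;
}
```

Called with the three arguments SET, name and Jhon, this fills the buffer with a 33-byte record in the same shape as the one the update buffer stores.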
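The master-slave link has an update buffer of its own. The two differ only in purpose: the AOF buffer is written to local disk, while this one is sent to slaves. Here we call it the backlog. It is stored in server.repl_backlog, and Redis treats it as a circular buffer, which saves space and avoids memory reallocation.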
struct redisServer {
    ......
    /* Replication (master) */
    // DB most recently selected in the replication output
    int slaveseldb; /* Last SELECTed DB in replication output */
    // Global replication offset
    long long master_repl_offset; /* Global replication offset */
    // Heartbeat interval on the master-slave link
    int repl_ping_slave_period; /* Master pings the slave every N seconds */
    // Pointer to the backlog
    char *repl_backlog; /* Replication backlog for partial syncs */
    // Size of the backlog
    long long repl_backlog_size; /* Backlog circular buffer size */
    // Amount of valid data currently held in the backlog
    long long repl_backlog_histlen; /* Backlog actual data length */
    // Position at which the next write to the backlog starts
    long long repl_backlog_idx; /* Backlog circular buffer current offset */
    // Global replication offset of the first byte held in the backlog
    long long repl_backlog_off; /* Replication offset of first byte in the
                                   backlog buffer. */
    // How long the backlog is kept once no slaves are attached
    time_t repl_backlog_time_limit; /* Time without slaves after the backlog
                                       gets released. */
    ......
};
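To make the roles of these fields concrete, here is a minimal sketch of appending bytes to a circular backlog. It is loosely modeled on how Redis feeds its backlog but is not copied from the source: backlog_t and backlog_feed are hypothetical names, and the struct only mirrors the fields listed above.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-alone mirror of the backlog fields in struct redisServer. */
typedef struct {
    char *buf;            /* plays the role of repl_backlog */
    long long size;       /* repl_backlog_size */
    long long histlen;    /* repl_backlog_histlen */
    long long idx;        /* repl_backlog_idx */
    long long off;        /* repl_backlog_off */
    long long master_off; /* master_repl_offset */
} backlog_t;

/* Append len bytes, wrapping around at the end of the buffer and
 * overwriting the oldest data once the buffer is full. */
void backlog_feed(backlog_t *b, const char *p, size_t len) {
    b->master_off += (long long)len;
    while (len) {
        size_t room = (size_t)(b->size - b->idx);
        size_t n = len < room ? len : room;
        memcpy(b->buf + b->idx, p, n);
        b->idx += (long long)n;
        if (b->idx == b->size) b->idx = 0;   /* wrap around */
        p += n;
        len -= n;
        b->histlen += (long long)n;
    }
    /* The buffer never holds more than size bytes of history. */
    if (b->histlen > b->size) b->histlen = b->size;
    /* Global offset of the oldest byte still present in the buffer. */
    b->off = b->master_off - b->histlen + 1;
}
```

Because the buffer has a fixed size, no reallocation is ever needed; the cost is that old records are silently overwritten, which is why a slave that falls too far behind can no longer be served from the backlog.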
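When are change records written to the backlog? Whenever a Redis command executes and modifies data (a write), the change record is propagated. In the Redis source this is implemented as call()->propagate()->replicationFeedSlaves().
Note that commands are actually executed inside call(). If call() finds that data was modified (dirty), it calls propagate(), and replicationFeedSlaves() writes the change record to the backlog and to every connected slave.
Likewise, as mentioned in "AOF Persistence Strategy", propagate() also writes the change record to the AOF update buffer.
A question may arise here: why add the data to the backlog and also send it to every slave? Why not simply send it to the slaves?
Because some slaves get disconnected from the master under exceptional conditions. A slave caches the master's state information before it disconnects, but it misses the updates issued while it is offline. To let a disconnected slave fetch those updates after it reconnects, Redis also appends them to the backlog. As the implementation of replicationFeedSlaves() shows, online slaves receive change records immediately, whereas slaves that are temporarily disconnected must recover the records they missed from the backlog. If the disconnection lasts long enough, the master rejects the slave's partial synchronization request and the slave has to perform a full synchronization.
Below is the annotated core code for updating the backlog. First, in the command execution function, a command that involves a write has its change propagated by calling propagate().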
// call() is the core function for command execution; this is where commands actually run
/* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {
    ......
    /* Call the command. */
    c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
    redisOpArrayInit(&server.also_propagate);
    // Snapshot the dirty counter to detect whether the command modifies data
    dirty = server.dirty;
    // Run the function that implements the command
    c->cmd->proc(c);
    dirty = server.dirty-dirty;
    duration = ustime()-start;
    ......
    // Propagate the client's data modification to the AOF and to the slaves
    /* Propagate the command into the AOF and replication link */
    if (flags & REDIS_CALL_PROPAGATE) {
        int flags = REDIS_PROPAGATE_NONE;
        // Replication is forced
        if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;
        // AOF persistence is forced
        if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;
        // Data was modified
        if (dirty)
            flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
        // Propagate the change record
        if (flags != REDIS_PROPAGATE_NONE)
            propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
    }
    ......
}
The change record is propagated in two directions: AOF persistence and master-slave replication.
// Publish the data update to the AOF and to the slaves
/* Propagate the specified command (in the context of the specified database id)
 * to AOF and Slaves.
 *
 * flags are an xor between:
 * + REDIS_PROPAGATE_NONE (no propagation of command at all)
 * + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
 * + REDIS_PROPAGATE_REPL (propagate into the replication link)
 */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    // AOF must be enabled and the AOF flag set: publish the update to the local file
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    // The replication flag is set: publish the update to the slaves
    if (flags & REDIS_PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
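When propagating an update, the master sends the change record to every slave and also writes it to the backlog, so that a slave that has been disconnected can copy the data the next time it reconnects.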
// Feed the command to the backlog and to the slaves
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int j, len;
char llstr[REDIS_LONGSTR_SIZE];
// No backlog and no slaves: return immediately
/* If there aren't slaves, and there is no backlog buffer to populate,
* we can return ASAP. */
if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
/* We can't have slaves attached and no backlog. */
redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
/* Send SELECT command to every slave if needed. */
if (server.slaveseldb != dictid) {
robj *selectcmd;
// DB ids below REDIS_SHARED_SELECT_CMDS can use a shared object
/* For a few DBs we have pre-computed SELECT command. */
if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
selectcmd = shared.select[dictid];
} else {
// No shared object available: build the Redis object for the SELECT command
int dictid_len;
dictid_len = ll2string(llstr,sizeof(llstr),dictid);
selectcmd = createObject(REDIS_STRING,
sdscatprintf(sdsempty(),
"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
dictid_len, llstr));
}
// The SELECT command is added to the backlog as well as sent to the slaves,
// so that a slave that was disconnected can recover it after reconnecting.
/* Add the SELECT command into the backlog. */
if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
// Send it to every slave
/* Send it to slaves. */
listRewind(slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
addReply(slave,selectcmd);
}
// Release the object if it is not a shared one
if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)
decrRefCount(selectcmd);
}
// Remember the DB most recently selected on the replication stream
server.slaveseldb = dictid;
// Write the command to the backlog
/* Write the command to the replication backlog if any. */
if (server.repl_backlog) {
char aux[REDIS_LONGSTR_SIZE+3];
// Number of arguments
/* Add the multi bulk reply length. */
aux[0] = '*';
len = ll2string(aux+1,sizeof(aux)-1,argc);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
// Write the arguments one by one
for (j = 0; j < argc; j++) {
long objlen = stringObjectLen(argv[j]);
/* We need to feed the buffer with the object as a bulk reply
* not just as a plain string, so create the $..CRLF payload len
* and add the final CRLF */
aux[0] = '$';
len = ll2string(aux+1,sizeof(aux)-1,objlen);
aux[len+1] = '\r';
aux[len+2] = '\n';
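/* For SET name Jhon the backlog receives:
*3
$3
SET
$4
name
$4
Jhon
(the *3 header was written before this loop) */
// Argument length header: $<len>\r\n
feedReplicationBacklog(aux,len+3);
// Argument payload
feedReplicationBacklogWithObject(argv[j]);
// Trailing \r\n
feedReplicationBacklog(aux+len+1,2);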
feedReplicationBacklog(aux+len+1,2);
}
}
// Send the command to every slave right away
/* Write the command to every slave. */
listRewind(slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
// Skip slaves that requested a full synchronization and are still waiting for a BGSAVE to start
/* Don't feed slaves that are still waiting for BGSAVE to start */
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
/* Feed slaves that are waiting for the initial SYNC (so these commands
* are queued in the output buffer until the initial SYNC completes),
* or are already in sync with the master. */
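// Send the number of arguments to the slave
/* Add the multi bulk length. */
addReplyMultiBulkLen(slave,argc);
// Then send each argument
/* Finally any additional argument that was not stored inside the
* static buffer if any (from j to argc). */
for (j = 0; j < argc; j++)
addReplyBulk(slave,argv[j]);
}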
}
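Redis master-slave synchronization has two modes (or two phases): full synchronization and partial synchronization.
When a master and a slave first connect, a full synchronization is performed; once it completes, updates continue through partial synchronization. A slave can initiate a full synchronization at any time if it needs one. Redis's strategy is to always attempt a partial synchronization first. If that fails, the master requires the slave to perform a full synchronization and starts a BGSAVE; when the BGSAVE finishes, it transfers the RDB file. If it succeeds, the master allows the partial synchronization and sends the data held in the backlog.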
[Figure: http://wiki.jikexueyuan.com/project/redis/images/r1.png]
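To set up a slave, the command SLAVEOF hostname port is sent to the server that is to become the slave. On receiving it, the slave connects to the master automatically and registers the corresponding read/write event handler (syncWithMaster()).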
// Change the master
void slaveofCommand(redisClient *c) {
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        // SLAVEOF NO ONE: drop the link to the master
        if (server.masterhost) {
            replicationUnsetMaster();
            redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
        }
    } else {
        long port;
        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
            return;
        // We may already be connected to the requested master
        /* Check if we are already attached to the specified slave */
        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
            && server.masterport == port) {
            redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
            addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
            return;
        }
        // Drop the link to the previous master and switch to the new one.
        // replicationSetMaster() does not actually connect to the master; it only
        // updates the replication state, and replicationCron() connects later.
        /* There was no previous master or the user specified a different one,
         * we can continue. */
        replicationSetMaster(c->argv[1]->ptr, port);
        redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
            server.masterhost, server.masterport);
    }
    addReply(c,shared.ok);
}
// Set the new master
/* Set replication to the specified master address and port. */
void replicationSetMaster(char *ip, int port) {
    sdsfree(server.masterhost);
    server.masterhost = sdsdup(ip);
    server.masterport = port;
    // Drop the link to the previous master
    if (server.master) freeClient(server.master);
    disconnectSlaves(); /* Force our slaves to resync with us as well. */
    // Discard the cached master
    replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
    // Release the backlog
    freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
    // cancelReplicationHandshake() aborts any transfer in progress and the link to the master
    cancelReplicationHandshake();
    server.repl_state = REDIS_REPL_CONNECT;
    server.master_repl_offset = 0;
}
// Periodic routine that manages the master-slave link; runs once per second
// Called from serverCron()
/* --------------------------- REPLICATION CRON ----------------------------- */
/* Replication cron function, called 1 time per second. */
void replicationCron(void) {
    ......
    // If needed (REDIS_REPL_CONNECT), try to connect to the master; the actual
    // connection is made here
    /* Check if we should connect to a MASTER */
    if (server.repl_state == REDIS_REPL_CONNECT) {
        redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d",
            server.masterhost, server.masterport);
        if (connectWithMaster() == REDIS_OK) {
            redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
        }
    }
    ......
}
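In every case Redis first attempts a partial synchronization and falls back to a full synchronization only if that fails. A newly established master-slave link requires a full synchronization.
After connecting to the master, the slave issues a PSYNC command carrying master_runid and offset, and the master checks whether both are valid. master_runid acts as the master's identity token and identifies the master the slave was last connected to; offset is the global replication offset of the backlog data.
If validation fails, a full synchronization is performed: the master replies +FULLRESYNC master_runid offset (the slave records master_runid and offset and prepares to receive the RDB file) and then starts a BGSAVE to generate the RDB file. When the BGSAVE finishes, the master transfers the file to the slave, which completes the full synchronization.
The interaction between the master and the slave is shown below: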
[Figure: http://wiki.jikexueyuan.com/project/redis/images/r2.png]
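The slave's handling of the +FULLRESYNC reply comes down to splitting the line on spaces and checking the length of the run id. The sketch below illustrates that step in isolation. parse_fullresync and RUN_ID_SIZE are hypothetical names, and the value 40 is an assumption meant to match REDIS_RUN_ID_SIZE.

```c
#include <stdlib.h>
#include <string.h>

#define RUN_ID_SIZE 40 /* assumption: same length as REDIS_RUN_ID_SIZE */

/* Hypothetical helper: parse "+FULLRESYNC <runid> <offset>".
 * On success stores the run id and offset and returns 0; returns -1 if the
 * reply is not a well-formed +FULLRESYNC line. */
int parse_fullresync(const char *reply, char runid[RUN_ID_SIZE + 1],
                     long long *offset) {
    if (strncmp(reply, "+FULLRESYNC", 11) != 0) return -1;
    const char *id = strchr(reply, ' ');
    if (!id) return -1;
    id++;
    const char *off = strchr(id, ' ');
    /* The run id must be exactly RUN_ID_SIZE characters long. */
    if (!off || off - id != RUN_ID_SIZE) return -1;
    off++;
    memcpy(runid, id, RUN_ID_SIZE);
    runid[RUN_ID_SIZE] = '\0';
    *offset = strtoll(off, NULL, 10);
    return 0;
}
```

The slave keeps both values so that, if the link drops later, it can present them in its next PSYNC and ask for a partial synchronization.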
// Registered as the callback when connectWithMaster() connects to the master
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
char tmpfile[256], *err;
int dfd, maxtries = 5;
int sockerr = 0, psync_result;
socklen_t errlen = sizeof(sockerr);
......
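// Ask the master for a partial synchronization; the master replies to accept or
// reject the request. If it rejects, it returns +FULLRESYNC master_runid offset
// and the slave prepares for a full synchronization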
psync_result = slaveTryPartialResynchronization(fd);
if (psync_result == PSYNC_CONTINUE) {
redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a "
"Partial Resynchronization.");
return;
}
// Perform a full synchronization
/* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
* and the server.repl_master_runid and repl_master_initial_offset are
* already populated. */
// The master does not understand PSYNC: retry with the older SYNC command
if (psync_result == PSYNC_NOT_SUPPORTED) {
redisLog(REDIS_NOTICE,"Retrying with SYNC...");
if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
strerror(errno));
goto error;
}
}
// Open question: why retry up to 5 times?
/* Prepare a suitable temp file for bulk transfer */
while(maxtries--) {
snprintf(tmpfile,256,
"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
if (dfd != -1) break;
sleep(1);
}
if (dfd == -1) {
redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> "
"SLAVE synchronization: %s",strerror(errno));
goto error;
}
// Register a read event with callback readSyncBulkPayload() to receive the RDB file
/* Setup the non blocking download of the bulk file. */
if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
== AE_ERR)
{
redisLog(REDIS_WARNING,
"Can't create readable event for SYNC: %s (fd=%d)",
strerror(errno),fd);
goto error;
}
// Initialize the state used while transferring the RDB file
// Replication state
server.repl_state = REDIS_REPL_TRANSFER;
// RDB file size (not yet known)
server.repl_transfer_size = -1;
// Bytes received so far
server.repl_transfer_read = 0;
// Offset at the last fsync, used to flush to disk periodically
server.repl_transfer_last_fsync_off = 0;
// File descriptor of the local temporary RDB file
server.repl_transfer_fd = dfd;
// Time of the last I/O on the transfer
server.repl_transfer_lastio = server.unixtime;
// Temporary file name
server.repl_transfer_tmpfile = zstrdup(tmpfile);
return;
error:
close(fd);
server.repl_transfer_s = -1;
server.repl_state = REDIS_REPL_CONNECT;
return;
}
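A full synchronization transfers the RDB data file plus the data in the backlog. For the RDB file, see "RDB Persistence Strategy". If no background BGSAVE process is running, a BGSAVE is triggered; otherwise every slave requesting a full synchronization is marked as waiting for a BGSAVE to finish. As soon as the BGSAVE ends, the master sends the RDB file to all waiting slaves.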
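The choice the master makes for a newly arrived slave can be written as a small pure function. This is a sketch of the decision only, under simplifying assumptions: the type and function names (repl_state_t, fullsync_plan_t, plan_full_sync) are hypothetical, and the real code works on client structures rather than on a plain array of states.

```c
/* Hypothetical, simplified slave states. */
typedef enum {
    SLAVE_ONLINE,
    SLAVE_WAIT_BGSAVE_START,
    SLAVE_WAIT_BGSAVE_END
} repl_state_t;

typedef struct {
    repl_state_t state; /* state assigned to the new slave */
    int start_bgsave;   /* 1 if the master must start a BGSAVE now */
    int copy_from;      /* index of the slave whose output buffer is copied, or -1 */
} fullsync_plan_t;

/* Decide how a slave that needs a full synchronization is handled. */
fullsync_plan_t plan_full_sync(int bgsave_in_progress,
                               const repl_state_t *slaves, int nslaves) {
    fullsync_plan_t plan = { SLAVE_WAIT_BGSAVE_END, 0, -1 };
    if (!bgsave_in_progress) {
        /* No BGSAVE running: start one and wait for it to end. */
        plan.start_bgsave = 1;
        return plan;
    }
    for (int i = 0; i < nslaves; i++) {
        if (slaves[i] == SLAVE_WAIT_BGSAVE_END) {
            /* Another slave is already waiting on this BGSAVE:
             * share it and copy that slave's pending output. */
            plan.copy_from = i;
            return plan;
        }
    }
    /* The running BGSAVE cannot be reused: wait for the next one. */
    plan.state = SLAVE_WAIT_BGSAVE_START;
    return plan;
}
```

The running BGSAVE is reused only when another slave is already accumulating the writes made since the fork; without that buffer, the new slave would miss those writes and has to wait for a fresh BGSAVE.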
Below is the full-synchronization part of syncCommand():
// Master-side handler for SYNC and PSYNC; tries a partial, then a full synchronization
/* SYNC and PSYNC command implementation. */
void syncCommand(redisClient *c) {
......
// The master tries a partial synchronization first; if that fails it sends
// +FULLRESYNC master_runid offset to the slave and then starts a BGSAVE
// Perform the full synchronization:
/* Full resynchronization. */
server.stat_sync_full++;
/* Here we need to check if there is a background saving operation
* in progress, or if it is required to start one */
if (server.rdb_child_pid != -1) {
/* A BGSAVE child process already exists.
1. If one of the slaves currently attached to the master is in state
REDIS_REPL_WAIT_BGSAVE_END, set slave c to
REDIS_REPL_WAIT_BGSAVE_END;
2. otherwise, set it to REDIS_REPL_WAIT_BGSAVE_START. */
/* Ok a background save is in progress. Let's check if it is a good
* one for replication, i.e. if there is another slave that is
* registering differences since the server forked to save */
redisClient *slave;
listNode *ln;
listIter li;
// Check whether another slave has already requested a full synchronization
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
}
if (ln) {
// A slave in state REDIS_REPL_WAIT_BGSAVE_END exists, so set slave c to
// REDIS_REPL_WAIT_BGSAVE_END as well: the RDB file can be sent to it once
// the BGSAVE finishes, and the updates queued for that other slave are
// copied to slave c.
/* Perfect, the server is already registering differences for
* another slave. Set the right state, and copy the buffer. */
// Copy the other slave's pending output buffer to slave c
copyClientOutputBuffer(c,slave);
// Set slave c to "waiting for the BGSAVE to end"
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
} else {
// No slave is in state REDIS_REPL_WAIT_BGSAVE_END, so set slave c to
// REDIS_REPL_WAIT_BGSAVE_START, i.e. wait for a new BGSAVE to begin.
// Set the state to "waiting for a BGSAVE to start"
/* No way, we need to wait for the next BGSAVE in order to
* register differences */
c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
}
} else {
// No BGSAVE child process exists: start a new one
/* Ok we don't have a BGSAVE in progress, let's start one */
redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
addReplyError(c,"Unable to perform background save");
return;
}
// Set slave c to REDIS_REPL_WAIT_BGSAVE_END so that the RDB file can be
// sent to it as soon as the BGSAVE finishes.
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
// Flush the script cache
/* Flush the script cache for the new slave. */
replicationScriptCacheFlush();
}
if (server.repl_disable_tcp_nodelay)
anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
c->repldbfd = -1;
c->flags |= REDIS_SLAVE;
server.slaveseldb = -1; /* Force to re-emit the SELECT command. */
listAddNodeTail(server.slaves,c);
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
createReplicationBacklog();
return;
}
Once the master has finished the BGSAVE, it sends the RDB file to the slave.
// Called when a BGSAVE finishes
/* A background saving child (BGSAVE) terminated its work. Handle this. */
void backgroundSaveDoneHandler(int exitcode, int bysignal) {
    // Other work
    ......
    // Slaves may be waiting for the BGSAVE process to terminate
    /* Possibly there are slaves waiting for a BGSAVE in order to be served
     * (the first stage of SYNC is a bulk transfer of dump.rdb) */
    updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
}
// Called once an RDB save (backgroundSaveDoneHandler()) has finished
// The RDB file is ready: send it to every waiting slave
/* This function is called at the end of every background saving.
* The argument bgsaveerr is REDIS_OK if the background saving succeeded
* otherwise REDIS_ERR is passed to the function.
*
* The goal of this function is to handle slaves waiting for a successful
* background saving in order to perform non-blocking synchronization. */
void updateSlavesWaitingBgsave(int bgsaveerr) {
listNode *ln;
int startbgsave = 0;
listIter li;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
// Waiting for a BGSAVE to start: switch to waiting for the next BGSAVE to end
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
startbgsave = 1;
slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
// Waiting for the BGSAVE to end: prepare to send the RDB file to the slave
} else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
struct redis_stat buf;
// If the RDB save failed, bgsaveerr is REDIS_ERR
if (bgsaveerr != REDIS_OK) {
freeClient(slave);
redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned "
"an error");
continue;
}
// Open the RDB file
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
redis_fstat(slave->repldbfd,&buf) == -1) {
freeClient(slave);
redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after"
" BGSAVE: %s", strerror(errno));
continue;
}
slave->repldboff = 0;
slave->repldbsize = buf.st_size;
slave->replstate = REDIS_REPL_SEND_BULK;
// Remove any previously registered write event
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
// Register a new write event; sendBulkToSlave() transfers the RDB file
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
sendBulkToSlave, slave) == AE_ERR) {
freeClient(slave);
continue;
}
}
}
// startbgsave is set when some slave was waiting for a BGSAVE to start: launch a new BGSAVE for those slaves
if (startbgsave) {
/* Since we are starting a new background save for one or more slaves,
* we flush the Replication Script Cache to use EVAL to propagate every
* new EVALSHA for the first time, since all the new slaves don't know
* about previous scripts. */
replicationScriptCacheFlush();
if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
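/* The BGSAVE may fail to fork, in which case every slave waiting for it is
disconnected. This is a self-protection measure: a failed fork most likely means memory is tight. */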
listIter li;
listRewind(server.slaves,&li);
redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
freeClient(slave);
}
}
}
}
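As stated above, Redis always attempts a partial synchronization first. A partial synchronization sends the data cached in the backlog, that is, the update records, to the slave.
After connecting to the master, the slave issues a PSYNC command carrying master_runid and offset, and the master checks whether both are valid. If validation succeeds, a partial synchronization is performed: the master replies +CONTINUE (on receiving it, the slave registers the event handler that receives backlog data) and then sends the backlog data.
The interaction between the master and the slave is shown below: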
[Figure: http://wiki.jikexueyuan.com/project/redis/images/r3.png]
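Sending "the backlog data" means reading a circular buffer starting at the global offset the slave asked for. The sketch below shows one way to do that lookup. It is an illustration rather than the Redis implementation: backlog_view_t and backlog_copy_from are hypothetical names, and the data is copied into a flat output buffer instead of being queued on a client connection.

```c
#include <string.h>

/* Hypothetical read-only view of a circular backlog. */
typedef struct {
    const char *buf;   /* backlog storage */
    long long size;    /* capacity in bytes */
    long long histlen; /* bytes of valid history */
    long long idx;     /* where the next write would land */
    long long off;     /* global offset of the oldest valid byte */
} backlog_view_t;

/* Copy every byte from global offset `offset` to the end of the backlog
 * into out. Returns the number of bytes copied, or -1 if the offset is
 * outside the backlog or out is too small. */
long long backlog_copy_from(const backlog_view_t *b, long long offset,
                            char *out, long long outlen) {
    if (offset < b->off || offset > b->off + b->histlen) return -1;
    long long skip = offset - b->off;  /* history the slave already has */
    long long len = b->histlen - skip; /* bytes still to send */
    if (len > outlen) return -1;
    /* Index of the oldest valid byte, then advance by skip. */
    long long j = (b->idx + (b->size - b->histlen)) % b->size;
    j = (j + skip) % b->size;
    long long copied = 0;
    while (copied < len) {
        long long chunk = b->size - j; /* bytes until the physical end */
        if (chunk > len - copied) chunk = len - copied;
        memcpy(out + copied, b->buf + j, (size_t)chunk);
        copied += chunk;
        j = 0;                         /* continue from the start */
    }
    return copied;
}
```

An offset equal to off + histlen is valid and copies nothing: the slave is already up to date and only needs the commands that arrive from now on.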
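syncWithMaster() is registered as a callback and is invoked once the connection to the master has been established; see connectWithMaster(). If the slave has never been connected to the master, it performs a full synchronization and copies all of the master's data; otherwise it attempts a partial synchronization.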
// Registered as the callback when connectWithMaster() connects to the master
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    ......
    // Try a partial synchronization; if the master allows it, it replies +CONTINUE
    // and the slave registers the corresponding event
    /* Try a partial resynchronization. If we don't have a cached master
     * slaveTryPartialResynchronization() will at least try to use PSYNC
     * to start a full resynchronization so that we get the master run id
     * and the global offset, to try a partial resync at the next
     * reconnection attempt. */
    // The function returns one of three states:
    // PSYNC_CONTINUE: partial synchronization will proceed; the callback
    // readQueryFromClient() was already set in slaveTryPartialResynchronization()
    // PSYNC_FULLRESYNC: full synchronization; the RDB file will be downloaded
    // PSYNC_NOT_SUPPORTED: the master does not support PSYNC or replied unexpectedly
    psync_result = slaveTryPartialResynchronization(fd);
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a "
            "Partial Resynchronization.");
        return;
    }
    // Perform a full synchronization
    ......
}
The main job of slaveTryPartialResynchronization() is to determine whether a full or a partial synchronization will be performed.
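// The function returns one of three states:
// PSYNC_CONTINUE: partial synchronization will proceed; the callback is already set
// PSYNC_FULLRESYNC: full synchronization; the RDB file will be downloaded
// PSYNC_NOT_SUPPORTED: the master does not support PSYNC or replied unexpectedly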
#define PSYNC_CONTINUE 0
#define PSYNC_FULLRESYNC 1
#define PSYNC_NOT_SUPPORTED 2
int slaveTryPartialResynchronization(int fd) {
char *psync_runid;
char psync_offset[32];
sds reply;
/* Initially set repl_master_initial_offset to -1 to mark the current
* master run_id and offset as not valid. Later if we'll be able to do
* a FULL resync using the PSYNC command we'll set the offset at the
* right value, so that this information will be propagated to the
* client structure representing the master into server.master. */
server.repl_master_initial_offset = -1;
if (server.cached_master) {
// State from the previous master connection is cached: try a partial synchronization to reduce data transfer
psync_runid = server.cached_master->replrunid;
snprintf(psync_offset,sizeof(psync_offset),"%lld",
server.cached_master->reploff+1);
redisLog(REDIS_NOTICE,"Trying a partial resynchronization "
"(request %s:%s).", psync_runid, psync_offset);
} else {
// No cached state from a previous master connection: perform a full synchronization
// PSYNC ? -1 also retrieves the master's master_runid
redisLog(REDIS_NOTICE,"Partial resynchronization not possible "
"(no cached master)");
psync_runid = "?";
memcpy(psync_offset,"-1",3);
}
// Send the command to the master and read the reply
/* Issue the PSYNC command */
reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);
// Full synchronization
if (!strncmp(reply,"+FULLRESYNC",11)) {
char *runid = NULL, *offset = NULL;
/* FULL RESYNC, parse the reply in order to extract the run id
* and the replication offset. */
runid = strchr(reply,' ');
if (runid) {
runid++;
offset = strchr(runid,' ');
if (offset) offset++;
}
if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
redisLog(REDIS_WARNING,
"Master replied with wrong +FULLRESYNC syntax.");
/* This is an unexpected condition, actually the +FULLRESYNC
* reply means that the master supports PSYNC, but the reply
* format seems wrong. To stay safe we blank the master
* runid to make sure next PSYNCs will fail. */
memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
} else {
// Copy the runid
memcpy(server.repl_master_runid, runid, offset-runid-1);
server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
server.repl_master_initial_offset = strtoll(offset,NULL,10);
redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
server.repl_master_runid,
server.repl_master_initial_offset);
}
/* We are going to full resync, discard the cached master structure. */
replicationDiscardCachedMaster();
sdsfree(reply);
return PSYNC_FULLRESYNC;
}
// Partial synchronization
if (!strncmp(reply,"+CONTINUE",9)) {
/* Partial resync was accepted, set the replication state accordingly */
redisLog(REDIS_NOTICE,
"Successful partial resynchronization with master.");
sdsfree(reply);
// Promote the cached master to be the current master, ready for the partial synchronization (PSYNC)
replicationResurrectCachedMaster(fd);
return PSYNC_CONTINUE;
}
/* If we reach this point we received either an error since the master does
* not understand PSYNC, or an unexpected reply from the master.
* Reply with PSYNC_NOT_SUPPORTED in both cases. */
// Log according to whether the master replied with an error
if (strncmp(reply,"-ERR",4)) {
/* If it's not an error, log the unexpected event. */
redisLog(REDIS_WARNING,
"Unexpected reply to PSYNC from master: %s", reply);
} else {
redisLog(REDIS_NOTICE,
"Master does not support PSYNC or is in "
"error state (reply: %s)", reply);
}
sdsfree(reply);
replicationDiscardCachedMaster();
return PSYNC_NOT_SUPPORTED;
}
Below is the partial-synchronization part of syncCommand():
// Master-side handler for SYNC and PSYNC; tries a partial, then a full synchronization
/* SYNC and PSYNC command implementation. */
void syncCommand(redisClient *c) {
    ......
    // The master tries a partial synchronization; if allowed, it replies +CONTINUE
    // and then sends the backlog
    /* Try a partial resynchronization if this is a PSYNC command.
     * If it fails, we continue with usual full resynchronization, however
     * when this happens masterTryPartialResynchronization() already
     * replied with:
     *
     * +FULLRESYNC <runid> <offset>
     *
     * So the slave knows the new runid and offset to try a PSYNC later
     * if the connection with the master is lost. */
    if (!strcasecmp(c->argv[0]->ptr,"psync")) {
        // Partial synchronization
        if (masterTryPartialResynchronization(c) == REDIS_OK) {
            server.stat_sync_partial_ok++;
            return; /* No full resync needed, return. */
        } else {
            // Partial synchronization failed and a full one follows; read the
            // runid supplied by the client (the slave)
            char *master_runid = c->argv[1]->ptr;
            /* Increment stats for failed PSYNCs, but only if the
             * runid is not "?", as this is used by slaves to force a full
             * resync on purpose when they are not able to partially
             * resync. */
            if (master_runid[0] != '?') server.stat_sync_partial_err++;
        }
    } else {
        /* If a slave uses SYNC, we are dealing with an old implementation
         * of the replication protocol (like redis-cli --slave). Flag the client
         * so that we don't expect to receive REPLCONF ACK feedbacks. */
        c->flags |= REDIS_PRE_PSYNC_SLAVE;
    }
    // Perform the full synchronization:
    ......
}
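Although the master has received a partial synchronization request from a slave, it will not necessarily grant it. On the master side, a partial synchronization request must still be validated to decide whether the slave is eligible.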
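The validation reduces to three checks: the run id must match, a backlog must exist, and the requested offset must fall inside the window the backlog still covers. A minimal sketch of those checks as a pure function follows. can_partial_resync is a hypothetical name, and the sketch compares run ids with strcmp() for portability, whereas the Redis source uses a case-insensitive comparison.

```c
#include <string.h>

/* Hypothetical helper: returns 1 if a partial synchronization can be
 * served, 0 if a full synchronization is required. */
int can_partial_resync(const char *req_runid, const char *my_runid,
                       int has_backlog, long long backlog_off,
                       long long backlog_histlen, long long psync_offset) {
    /* The slave must have been replicating from this very instance.
     * A run id of "?" never matches, which forces a full resync. */
    if (strcmp(req_runid, my_runid) != 0) return 0;
    /* Without a backlog there is nothing to replay. */
    if (!has_backlog) return 0;
    /* Too old: the requested bytes were already overwritten. */
    if (psync_offset < backlog_off) return 0;
    /* Too new: beyond anything this master has produced. */
    if (psync_offset > backlog_off + backlog_histlen) return 0;
    return 1;
}
```

With a backlog that starts at offset 3 and holds 8 bytes, requests for offsets 3 through 11 can be served, while 2 and 12 both lead to a full synchronization.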
// The master checks whether a partial synchronization is possible
/* This function handles the PSYNC command from the point of view of a
* master receiving a request for partial resynchronization.
*
* On success return REDIS_OK, otherwise REDIS_ERR is returned and we proceed
* with the usual full resync. */
int masterTryPartialResynchronization(redisClient *c) {
long long psync_offset, psync_len;
char *master_runid = c->argv[1]->ptr;
char buf[128];
int buflen;
/* Is the runid of this master the same advertised by the wannabe slave
* via PSYNC? If runid changed this master is a different instance and
* there is no way to continue. */
if (strcasecmp(master_runid, server.runid)) {
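// When a slave has to drop its link to the master because of an error, it caches
// the master's state for the next partial synchronization.
// 1) master_runid is the runid of the master that the slave has cached;
// 2) server.runid is the runid of this server (the master).
// A mismatch means this server is not the master the slave cached, so a partial
// synchronization is impossible and a full synchronization is required.
// "?" means the slave is asking for a full synchronization, which it does when
// it has no cached master (see slaveTryPartialResynchronization()).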
/* Run id "?" is used by slaves that want to force a full resync. */
if (master_runid[0] != '?') {
redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
"Runid mismatch (Client asked for '%s', I'm '%s')",
master_runid, server.runid);
} else {
redisLog(REDIS_NOTICE,"Full resync requested by slave.");
}
goto need_full_resync;
}
// Parse the integer argument: the offset requested by the slave
/* We still have the data our slave is asking for? */
if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
REDIS_OK) goto need_full_resync;
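// A partial synchronization is refused when:
// 1. there is no backlog;
// 2. psync_offset is too small: the backlog has a limited size, and a slave that
// missed too many updates can no longer find them there;
// 3. psync_offset lies beyond the end of the backlog.
if (!server.repl_backlog ||
psync_offset < server.repl_backlog_off ||
psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
// The conditions for a partial synchronization are not met: fall back to a full one
{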
redisLog(REDIS_NOTICE,
"Unable to partial resync with the slave for lack of backlog "
"(Slave request was: %lld).", psync_offset);
if (psync_offset > server. ) {
redisLog(REDIS_WARNING,
"Warning: slave tried to PSYNC with an offset that is "
"greater than the master replication offset.");
}
goto need_full_resync;
}
// Perform the partial synchronization:
// 1) mark the client as a slave
// 2) tell the slave to get ready; it prepares on receiving +CONTINUE
// 3) start sending the data
/* If we reached this point, we are able to perform a partial resync:
* 1) Set client state to make it a slave.
* 2) Inform the client we can continue with +CONTINUE
* 3) Send the backlog data (from the offset to the end) to the slave. */
// Mark the connected client as a slave
c->flags |= REDIS_SLAVE;
// The slave is online and receives updates only (partial synchronization)
// #define REDIS_REPL_ONLINE 9 /* RDB file transmitted, sending just
// updates. */
c->replstate = REDIS_REPL_ONLINE;
// Refresh the ACK time
c->repl_ack_time = server.unixtime;
// Append the client to the list of slaves
listAddNodeTail(server.slaves,c);
// Tell the slave that a partial synchronization can proceed; on receiving this
// the slave gets ready (registers its callback)
/* We can't use the connection buffers since they are used to accumulate
* new commands at this stage. But we are sure the socket send buffer is
* empty so this write will never fail actually. */
buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
if (write(c->fd,buf,buflen) != buflen) {
freeClientAsync(c);
return REDIS_OK;
}
// Write the backlog data, which holds the buffered updates, to the slave
psync_len = addReplyReplicationBacklog(c,psync_offset);
redisLog(REDIS_NOTICE,
"Partial resynchronization request accepted. Sending %lld bytes of "
"backlog starting from offset %lld.", psync_len, psync_offset);
/* Note that we don't need to set the selected DB at server.slaveseldb
* to -1 to force the master to emit SELECT, since the slave already
* has this state from the previous connection with the master. */
refreshGoodSlavesCount();
return REDIS_OK; /* The caller can return, no full resync needed. */
need_full_resync:
......
// Send +FULLRESYNC runid repl_offset to the slave
}
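A slave may lose its connection to the master for reasons such as network latency (PING timeout, ACK timeout, and so on). When that happens, the slave tries to save information about its master connection, such as the global replication offset, for use in the next partial synchronization, and then attempts to reconnect to the master. Note that if the disconnection lasts long enough, the partial synchronization is bound to fail.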
void freeClient(redisClient *c) {
listNode *ln;
/* If this is mar