Master-Slave Replication

Overview

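Redis supports a master-slave mode. One Redis server can be configured as the master of another, which then acts as its slave and obtains its data from the master. A slave can in turn be the master of other servers, so the master-slave topology forms a directed acyclic graph (DAG). Together these servers form a group of Redis servers in which every node, master or slave, is an ordinary Redis server that can serve requests.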

[Figure: http://wiki.jikexueyuan.com/project/redis/images/redis22.png]

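Once configured, the master serves both reads and writes, while slaves serve reads only. Redis offers this setup to support weak consistency, that is, eventual consistency: a slave may briefly lag behind the master. The choice between strong and weak consistency should follow the business requirements. A Weibo timeline, for example, can use a weak consistency model, whereas payment bills such as Alipay's require a strong consistency model.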

Replication backlog

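The binlog is a MySQL log type. It records every update, or potential update, made to the database since the last backup; in other words, it describes how the data changed. Because the binlog records only updates, it is well suited to real-time backup and master-slave replication. Redis replication likewise relies on a binlog-like log. "AOF Persistence Strategy" introduced the update buffer. For example, suppose a client sends "set name Jhon". The update is recorded as *3\r\n$3\r\nSET\r\n$4\r\nname\r\n$4\r\nJhon\r\n and stored in the update buffer.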
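The minimal C sketch below makes this encoding concrete. It serializes a command's arguments into the same multi-bulk form:

- a header `*<argc>`;
- for each argument, `$<len>` followed by its bytes;
- every line ending in \r\n.

The function name encode_multibulk is hypothetical and not part of Redis. It relies on strlen(), so unlike Redis it is not binary-safe.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Encode args[0..nargs-1] as a RESP multi-bulk request.
 * Returns the number of bytes written (excluding the final NUL),
 * or -1 if cap is too small. Hypothetical helper, not Redis code. */
int encode_multibulk(char *buf, size_t cap, int nargs, const char **args) {
    size_t used;
    int n;

    assert(buf != NULL && nargs >= 0);

    /* Header line: *<nargs>\r\n */
    n = snprintf(buf, cap, "*%d\r\n", nargs);
    if (n < 0 || (size_t)n >= cap) return -1;
    used = (size_t)n;

    for (int j = 0; j < nargs; j++) {
        /* One bulk string per argument: $<len>\r\n<bytes>\r\n */
        n = snprintf(buf + used, cap - used, "$%zu\r\n%s\r\n",
                     strlen(args[j]), args[j]);
        if (n < 0 || (size_t)n >= cap - used) return -1;
        used += (size_t)n;
    }
    return (int)used;
}
```

For { "SET", "name", "Jhon" } this produces the 33-byte string shown above. Each length prefix counts the bytes of its argument, so "Jhon" is prefixed by $4.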

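Master-slave links have an equivalent update buffer, but the two serve different purposes. The AOF update buffer is written to a local file, whereas this one is written to the slaves; we call it the replication backlog. It lives in server.repl_backlog, and Redis treats it as a circular buffer. This bounds the memory it uses and avoids memory reallocation.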

struct redisServer {
    ......
    /* Replication (master) */
    // DB most recently selected (via SELECT) in the replication stream
    int slaveseldb; /* Last SELECTed DB in replication output */
    // Global replication offset
    long long master_repl_offset; /* Global replication offset */
    // Interval between the PINGs the master sends to its slaves
    int repl_ping_slave_period; /* Master pings the slave every N seconds */
    // Pointer to the backlog buffer
    char *repl_backlog; /* Replication backlog for partial syncs */
    // Backlog capacity
    long long repl_backlog_size; /* Backlog circular buffer size */
    // Amount of valid data currently held in the backlog
    long long repl_backlog_histlen; /* Backlog actual data length */
    // Position where the next write into the backlog starts
    long long repl_backlog_idx; /* Backlog circular buffer current offset */
    // Global replication offset of the first byte held in the backlog
    long long repl_backlog_off; /* Replication offset of first byte in the
                                   backlog buffer. */
    // How long the backlog is kept once there are no slaves
    time_t repl_backlog_time_limit; /* Time without slaves after the backlog
                                       gets released. */
    ......
};
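The stand-alone sketch below shows how these fields might move when bytes are appended to the circular backlog. It models what the text describes: a ring buffer with a write index, a valid-data length and the offset of the oldest byte.

- The backlog_t type and the backlog_init()/backlog_feed() names are hypothetical stand-ins for the redisServer fields, not Redis code.
- It assumes offsets count bytes starting at 1. An empty backlog therefore reports master_off + 1 as its first offset, and the oldest byte held is master_off - histlen + 1.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical copy of the backlog fields of redisServer. */
typedef struct {
    char *buf;             /* repl_backlog */
    long long size;        /* repl_backlog_size */
    long long histlen;     /* repl_backlog_histlen: valid bytes held */
    long long idx;         /* repl_backlog_idx: next write position */
    long long off;         /* repl_backlog_off: offset of oldest byte held */
    long long master_off;  /* master_repl_offset */
} backlog_t;

int backlog_init(backlog_t *b, long long size) {
    assert(size > 0);
    b->buf = malloc((size_t)size);
    if (b->buf == NULL) return -1;
    b->size = size;
    b->histlen = 0;
    b->idx = 0;
    b->master_off = 0;
    b->off = b->master_off + 1;   /* empty: next byte would be offset 1 */
    return 0;
}

/* Append len bytes, overwriting the oldest data once the ring is full. */
void backlog_feed(backlog_t *b, const void *ptr, size_t len) {
    const char *p = ptr;

    b->master_off += (long long)len;
    while (len) {
        size_t thislen = (size_t)(b->size - b->idx);  /* room before wrap */
        if (thislen > len) thislen = len;
        memcpy(b->buf + b->idx, p, thislen);
        b->idx += (long long)thislen;
        if (b->idx == b->size) b->idx = 0;            /* wrap around */
        len -= thislen;
        p += thislen;
        b->histlen += (long long)thislen;
    }
    if (b->histlen > b->size) b->histlen = b->size;
    /* Global offset of the oldest byte still stored. */
    b->off = b->master_off - b->histlen + 1;
}
```

Consider feeding 10 bytes into an 8-byte ring. histlen stays at 8 and off moves forward to 3, because the two oldest bytes were overwritten. This is exactly the situation in which a lagging slave can no longer be served from the backlog.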

When are change records written to the backlog? Whenever a Redis command modifies (writes) data, its change record is propagated. In the Redis source this happens along the path call() -> propagate() -> replicationFeedSlaves().

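Note that commands are actually executed inside call(). If call() finds that the data was modified (dirty), it calls propagate(). replicationFeedSlaves() then writes the change record into the backlog and to every connected slave.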

As mentioned in "AOF Persistence Strategy", propagate() also writes the change record into the AOF update buffer.

A natural question: why is the data both appended to the backlog and sent to every slave? Why not simply send it to the slaves?

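Because a slave may lose its connection to the master for some reason. Before disconnecting, the slave caches the master's state, but while it is disconnected it receives no updates. Redis therefore also appends every update to the backlog, so that a disconnected slave can catch up after it reconnects. As replicationFeedSlaves() shows, online slaves receive each change record immediately, while a temporarily disconnected slave must recover the changes it missed from the backlog. Suppose the slave stays disconnected so long that the data it missed is no longer in the backlog. The master then rejects its partial resynchronization request, and the slave has to perform a full resynchronization.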

Below is the annotated core code that updates the backlog. First, when the command execution function sees a write command, it propagates the change by calling propagate():

// call() is the core function, where a command is actually executed
/* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {
    ......
    /* Call the command. */
    c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
    redisOpArrayInit(&server.also_propagate);
    // Snapshot the dirty counter to detect whether the data gets modified
    dirty = server.dirty;
    // Run the function implementing the command
    c->cmd->proc(c);
    dirty = server.dirty-dirty;
    duration = ustime()-start;
    ......
    // Propagate the client's data modification to the AOF and the slaves
    /* Propagate the command into the AOF and replication link */
    if (flags & REDIS_CALL_PROPAGATE) {
        int flags = REDIS_PROPAGATE_NONE;
        // Forced replication
        if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;
        // Forced AOF persistence
        if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;
        // The data was modified
        if (dirty)
            flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
        // Propagate the change record
        if (flags != REDIS_PROPAGATE_NONE)
            propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
    }
    ......
}

The change record is propagated in two directions: to AOF persistence and to master-slave replication.

// Publish a data update to the AOF and to the slaves
/* Propagate the specified command (in the context of the specified database id)
 * to AOF and Slaves.
 *
 * flags are an xor between:
 * + REDIS_PROPAGATE_NONE (no propagation of command at all)
 * + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
 * + REDIS_PROPAGATE_REPL (propagate into the replication link)
 */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    // AOF must be enabled and the AOF flag set: feed the update to the local file
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    // Replication flag set: feed the update to the slaves
    if (flags & REDIS_PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

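When propagating to slaves, the Redis master sends the change record to every slave and also writes it into the backlog. Slaves that have disconnected can then copy the data they missed when they next reconnect.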

// Send data to the backlog and to the slaves
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
    listNode *ln;
    listIter li;
    int j, len;
    char llstr[REDIS_LONGSTR_SIZE];

    // No backlog to fill and no slaves: return right away
    /* If there aren't slaves, and there is no backlog buffer to populate,
     * we can return ASAP. */
    if (server.repl_backlog == NULL && listLength(slaves) == 0) return;

    /* We can't have slaves attached and no backlog. */
    redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));

    /* Send SELECT command to every slave if needed. */
    if (server.slaveseldb != dictid) {
        robj *selectcmd;

        // DB ids below REDIS_SHARED_SELECT_CMDS use pre-built shared objects
        /* For a few DBs we have pre-computed SELECT command. */
        if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
            selectcmd = shared.select[dictid];
        } else {
            // No shared object available: build the SELECT command object
            int dictid_len;

            dictid_len = ll2string(llstr,sizeof(llstr),dictid);
            selectcmd = createObject(REDIS_STRING,
                sdscatprintf(sdsempty(),
                "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
                dictid_len, llstr));
        }

        // The SELECT goes both into the backlog, for slaves that are
        // currently disconnected, and to every online slave
        /* Add the SELECT command into the backlog. */
        if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);

        // Send it to all slaves
        /* Send it to slaves. */
        listRewind(slaves,&li);
        while((ln = listNext(&li))) {
            redisClient *slave = ln->value;
            addReply(slave,selectcmd);
        }

        // Release the object unless it is a shared one
        if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)
            decrRefCount(selectcmd);
    }
    // Remember the DB most recently selected in the replication stream
    server.slaveseldb = dictid;

    // Write the command into the backlog
    /* Write the command to the replication backlog if any. */
    if (server.repl_backlog) {
        char aux[REDIS_LONGSTR_SIZE+3];

        // Number of arguments: *<argc>\r\n
        /* Add the multi bulk reply length. */
        aux[0] = '*';
        len = ll2string(aux+1,sizeof(aux)-1,argc);
        aux[len+1] = '\r';
        aux[len+2] = '\n';
        feedReplicationBacklog(aux,len+3);

        // Write the arguments one by one
        for (j = 0; j < argc; j++) {
            long objlen = stringObjectLen(argv[j]);

            /* We need to feed the buffer with the object as a bulk reply
             * not just as a plain string, so create the $..CRLF payload len
             * and add the final CRLF */
            aux[0] = '$';
            len = ll2string(aux+1,sizeof(aux)-1,objlen);
            aux[len+1] = '\r';
            aux[len+2] = '\n';
            /* For "SET name Jhon" the backlog receives:
             * *3\r\n $3\r\n SET\r\n $4\r\n name\r\n $4\r\n Jhon\r\n */
            // Argument length: $<len>\r\n
            feedReplicationBacklog(aux,len+3);
            // Argument bytes
            feedReplicationBacklogWithObject(argv[j]);
            // Trailing CRLF (reuses the one stored at aux[len+1])
            feedReplicationBacklog(aux+len+1,2);
        }
    }

    // Send the command to every slave right away
    /* Write the command to every slave. */
    listRewind(slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;

        // Skip slaves still waiting for a BGSAVE to start: the snapshot
        // produced by that future BGSAVE will already contain this write
        /* Don't feed slaves that are still waiting for BGSAVE to start */
        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;

        /* Feed slaves that are waiting for the initial SYNC (so these commands
         * are queued in the output buffer until the initial SYNC completes),
         * or are already in sync with the master. */
        // Number of arguments
        /* Add the multi bulk length. */
        addReplyMultiBulkLen(slave,argc);

        // Then every argument as a bulk string
        for (j = 0; j < argc; j++)
            addReplyBulk(slave,argv[j]);
    }
}
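replicationFeedSlaves() emits SELECT only when the target DB differs from server.slaveseldb, and setting slaveseldb to -1 forces the next command to carry a SELECT. The minimal sketch below shows that bookkeeping.

- It writes the stream into one fixed buffer instead of per-slave output buffers and the backlog.
- repl_stream_t, stream_init() and feed() are hypothetical names.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical replication stream: one buffer stands in for the slaves. */
typedef struct {
    char out[512];   /* bytes that would be sent to the slaves */
    size_t used;
    int seldb;       /* like server.slaveseldb; -1 forces a SELECT */
} repl_stream_t;

void stream_init(repl_stream_t *s) {
    s->out[0] = '\0';
    s->used = 0;
    s->seldb = -1;
}

static void stream_append(repl_stream_t *s, const char *str) {
    size_t len = strlen(str);
    assert(s->used + len < sizeof s->out);   /* sketch: fixed-size buffer */
    memcpy(s->out + s->used, str, len + 1);  /* copy including the NUL */
    s->used += len;
}

/* Append one command, preceded by SELECT when the DB changed. */
void feed(repl_stream_t *s, int dbid, int nargs, const char **args) {
    char num[32], line[64];

    if (s->seldb != dbid) {
        snprintf(num, sizeof num, "%d", dbid);
        snprintf(line, sizeof line, "*2\r\n$6\r\nSELECT\r\n$%zu\r\n%s\r\n",
                 strlen(num), num);
        stream_append(s, line);
        s->seldb = dbid;
    }
    snprintf(line, sizeof line, "*%d\r\n", nargs);
    stream_append(s, line);
    for (int j = 0; j < nargs; j++) {
        snprintf(line, sizeof line, "$%zu\r\n", strlen(args[j]));
        stream_append(s, line);
        stream_append(s, args[j]);
        stream_append(s, "\r\n");
    }
}
```

Two writes to DB 0 produce a single SELECT 0 followed by both commands. A later write to another DB is preceded by a new SELECT.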

Overview of master-slave synchronization

Redis master-slave synchronization comes in two forms, or two phases: full resynchronization and partial resynchronization.

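When master and slave first connect, they perform a full resynchronization; once it completes, they continue with partial resynchronization. If necessary, a slave can request a full resynchronization at any time. Redis always tries a partial resynchronization first. If that fails, the slave is told to perform a full resynchronization, and the master starts a BGSAVE and transfers the RDB file when the BGSAVE finishes. If it succeeds, the slave is allowed to resynchronize partially and the master sends it the data in the backlog.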

[Figure: http://wiki.jikexueyuan.com/project/redis/images/r1.png]

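To turn a server into a slave, send it SLAVEOF hostname port. The command itself only records the new master. The connection is established afterwards by the replication cron job, which registers syncWithMaster() as the handler for the socket's read/write events.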

// Change the master
void slaveofCommand(redisClient *c) {
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        // SLAVEOF NO ONE: stop replicating and become a master
        if (server.masterhost) {
            replicationUnsetMaster();
            redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
        }
    } else {
        long port;

        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
            return;

        // Already replicating from the requested master: nothing to do
        /* Check if we are already attached to the specified slave */
        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
            && server.masterport == port) {
            redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
            addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
            return;
        }
        // Drop the previous master and switch to the new one. Note that
        // replicationSetMaster() does not connect to the master itself; it
        // only records the new address and resets the replication state.
        /* There was no previous master or the user specified a different one,
         * we can continue. */
        replicationSetMaster(c->argv[1]->ptr, port);
        redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
            server.masterhost, server.masterport);
    }
    addReply(c,shared.ok);
}

// Set the new master
/* Set replication to the specified master address and port. */
void replicationSetMaster(char *ip, int port) {
    sdsfree(server.masterhost);
    server.masterhost = sdsdup(ip);
    server.masterport = port;
    // Close the connection to the previous master
    if (server.master) freeClient(server.master);
    disconnectSlaves(); /* Force our slaves to resync with us as well. */
    // Drop the cached master
    replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
    // Release the backlog
    freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
    // cancelReplicationHandshake() aborts any transfer or handshake
    // in progress with the master
    cancelReplicationHandshake();
    server.repl_state = REDIS_REPL_CONNECT;
    server.master_repl_offset = 0;
}

// Cron job managing the master-slave link, run once per second
// from serverCron()
/* --------------------------- REPLICATION CRON ----------------------------- */
/* Replication cron function, called 1 time per second. */
void replicationCron(void) {
    ......
    // If needed (REDIS_REPL_CONNECT), try to connect to the master:
    // this is where the connection is actually made
    /* Check if we should connect to a MASTER */
    if (server.repl_state == REDIS_REPL_CONNECT) {
        redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d",
            server.masterhost, server.masterport);
        if (connectWithMaster() == REDIS_OK) {
            redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
        }
    }
    ......
}
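The split between SLAVEOF, which only records the master, and the once-per-second cron, which actually connects, can be modeled as a tiny state machine. The sketch below is a simplification built on several assumptions:

- It keeps only three states, loosely named after REDIS_REPL_NONE, REDIS_REPL_CONNECT and REDIS_REPL_CONNECTING.
- The connect_ok parameter stands in for the result of connectWithMaster().
- Host names are compared with strcmp(), whereas Redis uses strcasecmp().
- All type and function names are hypothetical.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical, reduced slave-side states (Redis has more). */
typedef enum { REPL_NONE, REPL_CONNECT, REPL_CONNECTING } repl_state_t;

typedef struct {
    char masterhost[64];          /* empty string: not a slave */
    int masterport;
    repl_state_t state;
    long long master_repl_offset;
} slave_t;

/* SLAVEOF host port. Returns 0 when already attached to that master
 * (no-op), 1 otherwise. Like replicationSetMaster(), it only records
 * the target and asks the cron to connect. */
int slave_set_master(slave_t *s, const char *host, int port) {
    assert(s != NULL && host != NULL);
    if (s->masterhost[0] && strcmp(s->masterhost, host) == 0 &&
        s->masterport == port)
        return 0;
    snprintf(s->masterhost, sizeof s->masterhost, "%s", host);
    s->masterport = port;
    s->state = REPL_CONNECT;
    s->master_repl_offset = 0;
    return 1;
}

/* SLAVEOF NO ONE */
void slave_unset_master(slave_t *s) {
    s->masterhost[0] = '\0';
    s->state = REPL_NONE;
}

/* Once-per-second cron: the connection is only attempted here. */
void slave_cron(slave_t *s, int connect_ok) {
    if (s->state == REPL_CONNECT && connect_ok)
        s->state = REPL_CONNECTING;
}
```

A failed connection attempt simply leaves the state at REPL_CONNECT, so the next cron tick retries.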

Full resynchronization

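In every case Redis first attempts a partial resynchronization and falls back to a full one only if that fails. A master and slave that have just connected need a full resynchronization.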

從機連接主機后,會主動發(fā)起 PSYNC 命令,從機會提供 master_runid 和offset,主機驗證 master_runid 和 offset 是否有效?master_runid 相當于主機身份驗證碼,用來驗證從機上一次連接的主機,offset 是全局積壓空間數(shù)據的偏移量。

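If the check fails, a full resynchronization takes place. The master replies +FULLRESYNC master_runid offset, and the slave records master_runid and offset and gets ready to receive an RDB file. The master then starts a BGSAVE to produce the RDB file. When the BGSAVE finishes, it sends the file to the slave, which completes the full resynchronization.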
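Parsing the +FULLRESYNC reply comes down to locating two spaces and checking the run-id length, which is what slaveTryPartialResynchronization() does. The minimal sketch below assumes a 40-character run id, the value of REDIS_RUN_ID_SIZE in Redis. RUN_ID_SIZE and parse_fullresync() are hypothetical names.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define RUN_ID_SIZE 40   /* assumed to match REDIS_RUN_ID_SIZE */

/* Parse "+FULLRESYNC <runid> <offset>". Returns 0 and fills runid and
 * offset on success, -1 if the reply is malformed. */
int parse_fullresync(const char *reply, char runid[RUN_ID_SIZE + 1],
                     long long *offset) {
    const char *id, *off;

    assert(reply != NULL && runid != NULL && offset != NULL);
    if (strncmp(reply, "+FULLRESYNC", 11) != 0) return -1;
    id = strchr(reply, ' ');          /* space before the run id */
    if (id == NULL) return -1;
    id++;
    off = strchr(id, ' ');            /* space before the offset */
    if (off == NULL) return -1;
    off++;
    if (off - id - 1 != RUN_ID_SIZE) return -1;   /* wrong run-id length */
    memcpy(runid, id, RUN_ID_SIZE);
    runid[RUN_ID_SIZE] = '\0';
    *offset = strtoll(off, NULL, 10);
    return 0;
}
```

On a malformed reply, Redis additionally blanks its stored master run id so that the next PSYNC cannot succeed by accident.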

The interaction between master and slave is shown below:

[Figure: http://wiki.jikexueyuan.com/project/redis/images/r2.png]

// Registered as a callback when connectWithMaster() connects to the master
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    ......
    // Ask the master for a partial resynchronization; the master accepts or
    // refuses. If it refuses, it replies +FULLRESYNC master_runid offset
    // and the slave prepares for a full resynchronization.
    psync_result = slaveTryPartialResynchronization(fd);
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a "
            "Partial Resynchronization.");
        return;
    }

    // Full resynchronization
    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
     * and the server.repl_master_runid and repl_master_initial_offset are
     * already populated. */
    // The master does not support PSYNC: fall back to the old SYNC command
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        redisLog(REDIS_NOTICE,"Retrying with SYNC...");
        if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }

    // The temp file is opened with O_EXCL, so open() fails if the file
    // cannot be created (e.g. one with that name already exists); in that
    // case sleep one second and retry, at most 5 times
    /* Prepare a suitable temp file for bulk transfer */
    while(maxtries--) {
        snprintf(tmpfile,256,
            "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        if (dfd != -1) break;
        sleep(1);
    }
    if (dfd == -1) {
        redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> "
            "SLAVE synchronization: %s",strerror(errno));
        goto error;
    }

    // Register a read event with readSyncBulkPayload() as the handler,
    // ready to receive the RDB file
    /* Setup the non blocking download of the bulk file. */
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        redisLog(REDIS_WARNING,
            "Can't create readable event for SYNC: %s (fd=%d)",
            strerror(errno),fd);
        goto error;
    }

    // Initialize the RDB transfer state
    // Replication state
    server.repl_state = REDIS_REPL_TRANSFER;
    // RDB file size (not known yet)
    server.repl_transfer_size = -1;
    // Bytes received so far
    server.repl_transfer_read = 0;
    // Offset of the last fsync, used to flush to disk periodically
    server.repl_transfer_last_fsync_off = 0;
    // File descriptor of the local RDB temp file
    server.repl_transfer_fd = dfd;
    // Time of the last transfer I/O
    server.repl_transfer_lastio = server.unixtime;
    // Temp file name
    server.repl_transfer_tmpfile = zstrdup(tmpfile);
    return;

error:
    close(fd);
    server.repl_transfer_s = -1;
    server.repl_state = REDIS_REPL_CONNECT;
    return;
}

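A full resynchronization transfers the RDB data file plus the write commands accumulated while the snapshot is being produced; these are queued in the slave's output buffer. For RDB files, see "RDB Persistence Strategy". If no background BGSAVE process is running, a BGSAVE is started. Otherwise each slave requesting a full resynchronization is marked as waiting, either for the running BGSAVE to end or for the next one to start. When the BGSAVE finishes, the master immediately sends the RDB file to the waiting slaves.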

Here is the full-resynchronization part of syncCommand():

// Master-side handler for SYNC and PSYNC: tries a partial resync, then a full one
/* SYNC and PSYNC command implementation. */
void syncCommand(redisClient *c) {
    ......
    // The partial resync attempt failed and +FULLRESYNC master_runid offset
    // has already been sent to the slave; now start (or join) a BGSAVE
    // Full resynchronization:
    /* Full resynchronization. */
    server.stat_sync_full++;

    /* Here we need to check if there is a background saving operation
     * in progress, or if it is required to start one */
    if (server.rdb_child_pid != -1) {
        /* A BGSAVE child is running.
         * 1. If one of the slaves already attached is in state
         *    REDIS_REPL_WAIT_BGSAVE_END, set slave c to
         *    REDIS_REPL_WAIT_BGSAVE_END as well;
         * 2. otherwise set it to REDIS_REPL_WAIT_BGSAVE_START. */
        /* Ok a background save is in progress. Let's check if it is a good
         * one for replication, i.e. if there is another slave that is
         * registering differences since the server forked to save */
        redisClient *slave;
        listNode *ln;
        listIter li;

        // Look for a slave already waiting for this BGSAVE
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            slave = ln->value;
            if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
        }
        if (ln) {
            // Found one: c can share the RDB file of the running BGSAVE.
            // Copy that slave's pending output (the updates accumulated
            // since the fork) into c's output buffer.
            /* Perfect, the server is already registering differences for
             * another slave. Set the right state, and copy the buffer. */
            copyClientOutputBuffer(c,slave);
            // c now waits for the running BGSAVE to end
            c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
            // None found: nobody has recorded the changes since the fork, so
            // c has to wait for the next BGSAVE to start
            /* No way, we need to wait for the next BGSAVE in order to
             * register differences */
            c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
            redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
        }
    } else {
        // No BGSAVE running: start a new one
        /* Ok we don't have a BGSAVE in progress, let's start one */
        redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
            addReplyError(c,"Unable to perform background save");
            return;
        }
        // c waits for this BGSAVE to end; the RDB file is sent afterwards
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        // Flush the replication script cache: the new slave does not know
        // the scripts loaded so far, so EVALSHA must again be sent as EVAL
        /* Flush the script cache for the new slave. */
        replicationScriptCacheFlush();
    }

    if (server.repl_disable_tcp_nodelay)
        anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
    c->repldbfd = -1;
    c->flags |= REDIS_SLAVE;
    server.slaveseldb = -1; /* Force to re-emit the SELECT command. */
    listAddNodeTail(server.slaves,c);
    if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
        createReplicationBacklog();
    return;
}
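The state chosen for a slave that needs a full resynchronization depends on only two things: whether a BGSAVE is running, and the states of the slaves already attached. A running BGSAVE can be shared only if some slave has been recording the changes since the fork. The sketch below isolates that decision.

- The enum and full_sync_state() are hypothetical.
- The donor index marks the slave whose output buffer syncCommand() would copy with copyClientOutputBuffer().

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical subset of the master-side per-slave states. */
typedef enum { WAIT_BGSAVE_START, WAIT_BGSAVE_END, SEND_BULK, ONLINE } slave_state_t;

/* Decide the state of a new slave needing a full resync.
 * bgsave_running: an RDB child exists (rdb_child_pid != -1).
 * others/n: states of the slaves already attached.
 * *start_bgsave: set when the caller must fork a new BGSAVE.
 * *donor: index of a slave whose buffered updates can be copied, or -1. */
slave_state_t full_sync_state(int bgsave_running,
                              const slave_state_t *others, int n,
                              int *start_bgsave, int *donor) {
    assert(n == 0 || others != NULL);
    *start_bgsave = 0;
    *donor = -1;
    if (!bgsave_running) {
        *start_bgsave = 1;            /* fork a new BGSAVE for this slave */
        return WAIT_BGSAVE_END;
    }
    for (int i = 0; i < n; i++) {
        if (others[i] == WAIT_BGSAVE_END) {
            *donor = i;               /* share its snapshot and buffered diff */
            return WAIT_BGSAVE_END;
        }
    }
    return WAIT_BGSAVE_START;         /* wait for the next BGSAVE */
}
```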

Once the master has finished the BGSAVE, it sends the RDB file to the slaves.

// Called when BGSAVE finishes
/* A background saving child (BGSAVE) terminated its work. Handle this. */
void backgroundSaveDoneHandler(int exitcode, int bysignal) {
    // Other work
    ......
    // Slaves may be waiting for the BGSAVE child to finish
    /* Possibly there are slaves waiting for a BGSAVE in order to be served
     * (the first stage of SYNC is a bulk transfer of dump.rdb) */
    updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
}

// Called at the end of every RDB save (from backgroundSaveDoneHandler()):
// the RDB file is ready, so send it to the slaves waiting for it
/* This function is called at the end of every background saving.
 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
 * otherwise REDIS_ERR is passed to the function.
 *
 * The goal of this function is to handle slaves waiting for a successful
 * background saving in order to perform non-blocking synchronization. */
void updateSlavesWaitingBgsave(int bgsaveerr) {
    listNode *ln;
    int startbgsave = 0;
    listIter li;

    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;

        // Waiting for a BGSAVE to start: it will be served by the next
        // BGSAVE, so it now waits for that one to end
        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
            startbgsave = 1;
            slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        // Waiting for this BGSAVE to end: prepare to send the RDB file
        } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
            struct redis_stat buf;

            // bgsaveerr is REDIS_ERR if the RDB save failed
            if (bgsaveerr != REDIS_OK) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned "
                    "an error");
                continue;
            }
            // Open the RDB file
            if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
                redis_fstat(slave->repldbfd,&buf) == -1) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after"
                    " BGSAVE: %s", strerror(errno));
                continue;
            }
            slave->repldboff = 0;
            slave->repldbsize = buf.st_size;
            slave->replstate = REDIS_REPL_SEND_BULK;
            // Remove any previously registered write event
            aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
            // Register a new write event: sendBulkToSlave() streams the RDB file
            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
                    sendBulkToSlave, slave) == AE_ERR) {
                freeClient(slave);
                continue;
            }
        }
    }

    // startbgsave is set when some slaves were waiting for a BGSAVE to
    // start: launch a new BGSAVE for them
    if (startbgsave) {
        /* Since we are starting a new background save for one or more slaves,
         * we flush the Replication Script Cache to use EVAL to propagate every
         * new EVALSHA for the first time, since all the new slaves don't know
         * about previous scripts. */
        replicationScriptCacheFlush();
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            /* The BGSAVE fork may fail; all slaves waiting for it are then
             * disconnected. This is a self-protection measure: a failed fork
             * most likely means memory is tight. */
            listIter li;

            listRewind(server.slaves,&li);
            redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
            while((ln = listNext(&li))) {
                redisClient *slave = ln->value;

                if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
                    freeClient(slave);
            }
        }
    }
}
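The state transitions in updateSlavesWaitingBgsave() fit in a few lines. The sketch below uses hypothetical names and simplifies the real function:

- The DROPPED state stands in for freeClient().
- Opening the RDB file and registering sendBulkToSlave() are left out.
- The return value tells the caller whether a new BGSAVE must be started.

```c
#include <assert.h>

/* Hypothetical master-side slave states; DROPPED models freeClient(). */
typedef enum { WAIT_BGSAVE_START, WAIT_BGSAVE_END, SEND_BULK, ONLINE, DROPPED } slave_state_t;

/* Apply the end-of-BGSAVE transitions to n slaves.
 * Returns 1 if some slaves need a new BGSAVE to be started. */
int on_bgsave_done(slave_state_t *states, int n, int bgsave_ok) {
    int startbgsave = 0;

    assert(n >= 0);
    for (int i = 0; i < n; i++) {
        if (states[i] == WAIT_BGSAVE_START) {
            startbgsave = 1;
            states[i] = WAIT_BGSAVE_END;   /* served by the next BGSAVE */
        } else if (states[i] == WAIT_BGSAVE_END) {
            /* This snapshot was meant for it: send it, or give up. */
            states[i] = bgsave_ok ? SEND_BULK : DROPPED;
        }
    }
    return startbgsave;
}
```

A slave that was waiting for a BGSAVE to start does not receive the snapshot that just finished. That snapshot predates the point from which its changes would have been recorded, so the slave waits for the next one instead.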

Partial resynchronization

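As noted above, Redis always tries a partial resynchronization first. A partial resynchronization sends the slave the data buffered in the backlog, that is, the update records it missed.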

從機連接主機后,會主動發(fā)起 PSYNC 命令,從機會提供 master_runid 和offset,主機驗證 master_runid 和 offset 是否有效?驗證通過則,進行部分同步:主機返回 +CONTINUE(從機接收后會注冊積壓數(shù)據接收事件),接著發(fā)送積壓空間數(shù)據。

The interaction between master and slave is shown below:

[Figure: http://wiki.jikexueyuan.com/project/redis/images/r3.png]

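syncWithMaster() is registered as a callback and is invoked once the connection to the master is established; see connectWithMaster(). If the slave has never been connected to the master before, it performs a full resynchronization and copies all the data from the master. Otherwise it tries a partial resynchronization.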

// Registered as a callback when connectWithMaster() connects to the master
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    ......
    // Try a partial resync: if the master allows it, it replies +CONTINUE
    // and the slave registers the matching event handler
    /* Try a partial resynchronization. If we don't have a cached master
     * slaveTryPartialResynchronization() will at least try to use PSYNC
     * to start a full resynchronization so that we get the master run id
     * and the global offset, to try a partial resync at the next
     * reconnection attempt. */
    // The function returns one of three values:
    // PSYNC_CONTINUE: partial resync; slaveTryPartialResynchronization() has
    //   already installed the readQueryFromClient() handler
    // PSYNC_FULLRESYNC: full resync; the RDB file will be downloaded
    // PSYNC_NOT_SUPPORTED: the master does not understand PSYNC (or sent
    //   an unexpected reply)
    psync_result = slaveTryPartialResynchronization(fd);
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a "
            "Partial Resynchronization.");
        return;
    }
    // Full resynchronization
    ......
}

The main job of slaveTryPartialResynchronization() is to decide whether a full or a partial resynchronization takes place.

// The function returns one of three values:
// PSYNC_CONTINUE: partial resync; the callback is already set up
// PSYNC_FULLRESYNC: full resync; the RDB file will be downloaded
// PSYNC_NOT_SUPPORTED: the master does not support PSYNC (or replied unexpectedly)
#define PSYNC_CONTINUE 0
#define PSYNC_FULLRESYNC 1
#define PSYNC_NOT_SUPPORTED 2
int slaveTryPartialResynchronization(int fd) {
    char *psync_runid;
    char psync_offset[32];
    sds reply;

    /* Initially set repl_master_initial_offset to -1 to mark the current
     * master run_id and offset as not valid. Later if we'll be able to do
     * a FULL resync using the PSYNC command we'll set the offset at the
     * right value, so that this information will be propagated to the
     * client structure representing the master into server.master. */
    server.repl_master_initial_offset = -1;

    if (server.cached_master) {
        // The previous master's details are cached: try a partial resync
        // to reduce the amount of data transferred
        psync_runid = server.cached_master->replrunid;
        snprintf(psync_offset,sizeof(psync_offset),"%lld",
            server.cached_master->reploff+1);
        redisLog(REDIS_NOTICE,"Trying a partial resynchronization "
            "(request %s:%s).", psync_runid, psync_offset);
    } else {
        // Nothing cached: ask for a full resync. "PSYNC ? -1" also makes
        // the master reply with its run id
        redisLog(REDIS_NOTICE,"Partial resynchronization not possible "
            "(no cached master)");
        psync_runid = "?";
        memcpy(psync_offset,"-1",3);
    }

    // Send the command to the master and read the reply
    /* Issue the PSYNC command */
    reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);

    // Full resynchronization
    if (!strncmp(reply,"+FULLRESYNC",11)) {
        char *runid = NULL, *offset = NULL;

        /* FULL RESYNC, parse the reply in order to extract the run id
         * and the replication offset. */
        runid = strchr(reply,' ');
        if (runid) {
            runid++;
            offset = strchr(runid,' ');
            if (offset) offset++;
        }
        if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
            redisLog(REDIS_WARNING,
                "Master replied with wrong +FULLRESYNC syntax.");
            /* This is an unexpected condition, actually the +FULLRESYNC
             * reply means that the master supports PSYNC, but the reply
             * format seems wrong. To stay safe we blank the master
             * runid to make sure next PSYNCs will fail. */
            memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
        } else {
            // Copy the run id
            memcpy(server.repl_master_runid, runid, offset-runid-1);
            server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
            server.repl_master_initial_offset = strtoll(offset,NULL,10);
            redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
                server.repl_master_runid,
                server.repl_master_initial_offset);
        }
        /* We are going to full resync, discard the cached master structure. */
        replicationDiscardCachedMaster();
        sdsfree(reply);
        return PSYNC_FULLRESYNC;
    }

    // Partial resynchronization
    if (!strncmp(reply,"+CONTINUE",9)) {
        /* Partial resync was accepted, set the replication state accordingly */
        redisLog(REDIS_NOTICE,
            "Successful partial resynchronization with master.");
        sdsfree(reply);
        // Turn the cached master back into the current master and get
        // ready for PSYNC (partial resync)
        replicationResurrectCachedMaster(fd);
        return PSYNC_CONTINUE;
    }

    /* If we reach this point we received either an error since the master does
     * not understand PSYNC, or an unexpected reply from the master.
     * Reply with PSYNC_NOT_SUPPORTED in both cases. */
    // A reply other than -ERR is unexpected; -ERR means the master does not
    // support PSYNC or is in an error state
    if (strncmp(reply,"-ERR",4)) {
        /* If it's not an error, log the unexpected event. */
        redisLog(REDIS_WARNING,
            "Unexpected reply to PSYNC from master: %s", reply);
    } else {
        redisLog(REDIS_NOTICE,
            "Master does not support PSYNC or is in "
            "error state (reply: %s)", reply);
    }
    sdsfree(reply);
    replicationDiscardCachedMaster();
    return PSYNC_NOT_SUPPORTED;
}
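The slave side of the handshake has two steps: choosing the PSYNC arguments, then classifying the master's reply. The sketch below shows both.

- slave_cache_t stands in for server.cached_master and is hypothetical, as are the helper names.
- The three enum values mirror the PSYNC_* constants above.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for server.cached_master. */
typedef struct {
    int has_cached_master;
    char cached_runid[41];
    long long cached_reploff;   /* offset of the last byte already processed */
} slave_cache_t;

/* Choose the two PSYNC arguments the way slaveTryPartialResynchronization()
 * does: the cached run id and the next offset, or "?" and "-1". */
void build_psync_args(const slave_cache_t *s, char *runid, size_t runid_cap,
                      char *offset, size_t offset_cap) {
    assert(runid_cap > 1 && offset_cap > 2);
    if (s->has_cached_master) {
        snprintf(runid, runid_cap, "%s", s->cached_runid);
        snprintf(offset, offset_cap, "%lld", s->cached_reploff + 1);
    } else {
        snprintf(runid, runid_cap, "?");    /* force a full resync */
        snprintf(offset, offset_cap, "-1");
    }
}

typedef enum { PSYNC_CONTINUE, PSYNC_FULLRESYNC, PSYNC_NOT_SUPPORTED } psync_result_t;

/* Classify the master's reply by its prefix. */
psync_result_t classify_psync_reply(const char *reply) {
    if (strncmp(reply, "+FULLRESYNC", 11) == 0) return PSYNC_FULLRESYNC;
    if (strncmp(reply, "+CONTINUE", 9) == 0) return PSYNC_CONTINUE;
    return PSYNC_NOT_SUPPORTED;   /* -ERR or anything unexpected */
}
```

The offset sent is cached_reploff + 1, the first byte the slave still needs, rather than the last byte it has.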

Here is the partial-resynchronization part of syncCommand():

// Master-side handler for SYNC and PSYNC: tries a partial resync, then a full one
/* SYNC and PSYNC command implementation. */
void syncCommand(redisClient *c) {
    ......
    // Try a partial resync; if accepted, the master replies +CONTINUE and
    // then sends the backlog
    /* Try a partial resynchronization if this is a PSYNC command.
     * If it fails, we continue with usual full resynchronization, however
     * when this happens masterTryPartialResynchronization() already
     * replied with:
     *
     * +FULLRESYNC <runid> <offset>
     *
     * So the slave knows the new runid and offset to try a PSYNC later
     * if the connection with the master is lost. */
    if (!strcasecmp(c->argv[0]->ptr,"psync")) {
        // Partial resynchronization
        if (masterTryPartialResynchronization(c) == REDIS_OK) {
            server.stat_sync_partial_ok++;
            return; /* No full resync needed, return. */
        } else {
            // Partial resync refused: fall through to a full resync.
            // argv[1] is the run id sent by the slave
            char *master_runid = c->argv[1]->ptr;

            /* Increment stats for failed PSYNCs, but only if the
             * runid is not "?", as this is used by slaves to force a full
             * resync on purpose when they are not able to partially
             * resync. */
            if (master_runid[0] != '?') server.stat_sync_partial_err++;
        }
    } else {
        /* If a slave uses SYNC, we are dealing with an old implementation
         * of the replication protocol (like redis-cli --slave). Flag the client
         * so that we don't expect to receive REPLCONF ACK feedbacks. */
        c->flags |= REDIS_PRE_PSYNC_SLAVE;
    }
    // Full resynchronization:
    ......
}

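A master that receives a partial resynchronization request does not necessarily grant it. On the master side, the request must still be validated to decide whether this slave can be partially resynchronized.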

// Master side: check whether a partial resync is possible
/* This function handles the PSYNC command from the point of view of a
 * master receiving a request for partial resynchronization.
 *
 * On success return REDIS_OK, otherwise REDIS_ERR is returned and we proceed
 * with the usual full resync. */
int masterTryPartialResynchronization(redisClient *c) {
    long long psync_offset, psync_len;
    char *master_runid = c->argv[1]->ptr;
    char buf[128];
    int buflen;

    /* Is the runid of this master the same advertised by the wannabe slave
     * via PSYNC? If runid changed this master is a different instance and
     * there is no way to continue. */
    if (strcasecmp(master_runid, server.runid)) {
        // When a slave loses its master, it caches the master's state for
        // the next partial resync.
        // 1) master_runid is the run id of the master cached by the slave;
        // 2) server.runid is the run id of this master.
        // A mismatch means this server is not the master the slave cached,
        // so a partial resync is impossible and a full resync is required.
        // "?" means the slave explicitly asks for a full resync; it sends
        // it when it has no cached master (see
        // slaveTryPartialResynchronization()).
        /* Run id "?" is used by slaves that want to force a full resync. */
        if (master_runid[0] != '?') {
            redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
                "Runid mismatch (Client asked for '%s', I'm '%s')",
                master_runid, server.runid);
        } else {
            redisLog(REDIS_NOTICE,"Full resync requested by slave.");
        }
        goto need_full_resync;
    }

    // Parse the offset requested by the slave
    /* We still have the data our slave is asking for? */
    if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
       REDIS_OK) goto need_full_resync;
    // A partial resync is refused when:
    // 1) there is no backlog, or
    // 2) psync_offset is out of range. The backlog has a limited size, so
    //    if the slave missed too many updates they are gone (offset too
    //    small); an offset that is too large asks for data the backlog never
    //    received. To stay safe, a full resync is performed instead.
    if (!server.repl_backlog ||
        psync_offset < server.repl_backlog_off ||
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
    {
        redisLog(REDIS_NOTICE,
            "Unable to partial resync with the slave for lack of backlog "
            "(Slave request was: %lld).", psync_offset);
        if (psync_offset > server.master_repl_offset) {
            redisLog(REDIS_WARNING,
                "Warning: slave tried to PSYNC with an offset that is "
                "greater than the master replication offset.");
        }
        goto need_full_resync;
    }

    // Perform the partial resync:
    // 1) mark the client as a slave
    // 2) tell the slave to get ready with +CONTINUE
    // 3) start sending data
    /* If we reached this point, we are able to perform a partial resync:
     * 1) Set client state to make it a slave.
     * 2) Inform the client we can continue with +CONTINUE
     * 3) Send the backlog data (from the offset to the end) to the slave. */
    // Mark the connected client as a slave
    c->flags |= REDIS_SLAVE;
    // Partial resync: the slave is online right away
    // #define REDIS_REPL_ONLINE 9 /* RDB file transmitted, sending just
    // updates. */
    c->replstate = REDIS_REPL_ONLINE;
    // Update the ACK time
    c->repl_ack_time = server.unixtime;
    // Add it to the list of slaves
    listAddNodeTail(server.slaves,c);
    // Tell the slave a partial resync is possible; on receiving it the
    // slave prepares (registers its callbacks)
    /* We can't use the connection buffers since they are used to accumulate
     * new commands at this stage. But we are sure the socket send buffer is
     * empty so this write will never fail actually. */
    buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    // Send the slave the backlog data, which holds the update records
    psync_len = addReplyReplicationBacklog(c,psync_offset);
    redisLog(REDIS_NOTICE,
        "Partial resynchronization request accepted. Sending %lld bytes of "
        "backlog starting from offset %lld.", psync_len, psync_offset);
    /* Note that we don't need to set the selected DB at server.slaveseldb
     * to -1 to force the master to emit SELECT, since the slave already
     * has this state from the previous connection with the master. */

    refreshGoodSlavesCount();
    return REDIS_OK; /* The caller can return, no full resync needed. */

need_full_resync:
    ......
    // Send +FULLRESYNC runid repl_offset to the slave
}
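Put together, the master-side checks and the backlog read done by addReplyReplicationBacklog() might look like the sketch below. It assumes the same field semantics as redisServer: repl_backlog_off is the offset of the oldest byte held, and repl_backlog_idx is the next write position.

- master_t, can_partial_resync() and copy_backlog_from() are hypothetical names.
- Copying into a flat buffer replaces Redis's reply buffers.
- Run ids are compared with strcmp() here, whereas Redis uses strcasecmp().

```c
#include <assert.h>
#include <string.h>

/* Hypothetical stand-alone view of the master's backlog fields. */
typedef struct {
    const char *runid;   /* server.runid */
    char *buf;           /* repl_backlog (NULL if none) */
    long long size;      /* repl_backlog_size */
    long long histlen;   /* repl_backlog_histlen */
    long long idx;       /* repl_backlog_idx: next write position */
    long long off;       /* repl_backlog_off: offset of the oldest byte */
} master_t;

/* The checks of masterTryPartialResynchronization(): 1 = partial OK. */
int can_partial_resync(const master_t *m, const char *req_runid,
                       long long psync_offset) {
    if (strcmp(req_runid, m->runid) != 0) return 0;   /* "?" never matches */
    if (m->buf == NULL) return 0;                     /* no backlog */
    if (psync_offset < m->off ||                      /* data overwritten */
        psync_offset > m->off + m->histlen)           /* data never produced */
        return 0;
    return 1;
}

/* Copy the backlog from psync_offset to the end into out, splitting the
 * copy when it crosses the end of the ring. Returns the bytes copied. */
long long copy_backlog_from(const master_t *m, long long psync_offset, char *out) {
    long long skip = psync_offset - m->off;
    long long j = (m->idx + (m->size - m->histlen)) % m->size;  /* oldest byte */
    long long len = m->histlen - skip;
    long long copied = 0;

    assert(skip >= 0 && skip <= m->histlen);
    j = (j + skip) % m->size;                 /* seek to psync_offset */
    while (len) {
        long long thislen = (m->size - j) < len ? (m->size - j) : len;
        memcpy(out + copied, m->buf + j, (size_t)thislen);
        copied += thislen;
        len -= thislen;
        j = 0;                                /* continue at ring start */
    }
    return copied;
}
```

Consider an 8-byte ring holding "ijcdefgh" with idx 2 and off 3. A slave asking for offset 5 receives "efghij". A slave asking for off + histlen (11) is accepted and receives nothing, because it has missed no data.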

Cached master

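A slave may lose its connection to the master for various reasons, for example network latency (PING timeouts, ACK timeouts and so on). When that happens, the slave tries to save the connection information, such as the global replication offset, for the next partial resynchronization, and it tries to reconnect to the master. Note that if the disconnection lasts long enough, the partial resynchronization is bound to fail. By then the data the slave missed has been overwritten in the master's backlog, or the backlog has been released.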

void freeClient(redisClient *c) {
    listNode *ln;
    /* If this is mar