鍍金池/ 教程/ 大數(shù)據(jù)/ Redis 集群(下)
Redis 數(shù)據(jù)淘汰機(jī)制
積分排行榜
小剖 Memcache
Redis 數(shù)據(jù)結(jié)構(gòu) intset
分布式鎖
從哪里開(kāi)始讀起,怎么讀
Redis 數(shù)據(jù)結(jié)構(gòu) dict
不在浮沙筑高臺(tái)
Redis 集群(上)
Redis 監(jiān)視器
源碼閱讀工具
Redis 日志和斷言
內(nèi)存數(shù)據(jù)管理
Redis 數(shù)據(jù)結(jié)構(gòu)綜述
源碼日志
Web 服務(wù)器存儲(chǔ) session
消息中間件
Redis 與 Lua 腳本
什么樣的源代碼適合閱讀
Redis 數(shù)據(jù)結(jié)構(gòu) sds
Memcached slab 分配策略
訂閱發(fā)布機(jī)制
Redis 是如何提供服務(wù)的
Redis 事務(wù)機(jī)制
Redis 集群(下)
主從復(fù)制
Redis 應(yīng)用
RDB 持久化策略
Redis 數(shù)據(jù)遷移
Redis 事件驅(qū)動(dòng)詳解
初探 Redis
Redis 與 Memcache
AOF 持久化策略
Redis 數(shù)據(jù)結(jié)構(gòu) redisOb
作者簡(jiǎn)介
Redis 數(shù)據(jù)結(jié)構(gòu) ziplist
Redis 數(shù)據(jù)結(jié)構(gòu) skiplist
Redis 哨兵機(jī)制

Redis 集群(下)

剛好 Redis 集群也 release 了,也順帶在這里展開(kāi)一下。redis cluster 就是想要讓一群的節(jié)點(diǎn)實(shí)現(xiàn)自治,有自我修復(fù)的功能,數(shù)據(jù)分片和負(fù)載均衡。

數(shù)據(jù)結(jié)構(gòu)

基本上集群中的每一個(gè)節(jié)點(diǎn)都需要知道其他節(jié)點(diǎn)的情況,從而,如果網(wǎng)絡(luò)中有五個(gè)節(jié)點(diǎn)就下面的圖:

http://wiki.jikexueyuan.com/project/redis/images/d.png" alt="" />

其中每條線都代表雙向聯(lián)通。特別的,如果 redis master 還配備了 replica,圖畫(huà)起來(lái)會(huì)稍微復(fù)雜一點(diǎn)。

redis cluster 中有幾個(gè)比較重要的數(shù)據(jù)結(jié)構(gòu),一個(gè)用以描述節(jié)點(diǎn) struct clusterNode,一個(gè)用以描述集群的狀況 struct clusterState。

節(jié)點(diǎn)的信息包括:本身的一些屬性,還有它的主從節(jié)點(diǎn),心跳和主從復(fù)制信息,和與該節(jié)點(diǎn)的連接上下文。

typedef struct clusterNode {
    mstime_t ctime; /* Node object creation time. */
    char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
    int flags; /* REDIS_NODE_... */
    uint64_t configEpoch; /* Last configEpoch observed for this node */
    // 該節(jié)點(diǎn)會(huì)處理的slot
    unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */
    int numslots; /* Number of slots handled by this node */
    // 從機(jī)信息
    int numslaves; /* Number of slave nodes, if this is a master */
    // 從機(jī)節(jié)點(diǎn)數(shù)組
    struct clusterNode **slaves; /* pointers to slave nodes */
    // 主機(jī)節(jié)點(diǎn)數(shù)組
    struct clusterNode *slaveof; /* pointer to the master node */
    // 一些有用的時(shí)間點(diǎn)
    mstime_t ping_sent; /* Unix time we sent latest ping */
    mstime_t pong_received; /* Unix time we received the pong */
    mstime_t fail_time; /* Unix time when FAIL flag was set */
    mstime_t voted_time; /* Last time we voted for a slave of this master */
    mstime_t repl_offset_time; /* Unix time we received offset for this node */
    long long repl_offset; /* Last known repl offset for this node. */
    // 最近被記錄的地址和端口
    char ip[REDIS_IP_STR_LEN]; /* Latest known IP address of this node */
    int port; /* Latest known port of this node */
    // 與該節(jié)點(diǎn)的連接上下文
    clusterLink *link; /* TCP/IP link with this node */
    list *fail_reports; /* List of nodes signaling this as failing */
    } clusterNode;
    集群的狀態(tài)包括下面的信息:
typedef struct clusterState {
    clusterNode *myself; /* This node */
    // 配置版本
    uint64_t currentEpoch;
    // 集群的狀態(tài)
    int state; /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */
    // 存儲(chǔ)所有節(jié)點(diǎn)的哈希表
    int size; /* Num of master nodes with at least one slot */
    dict *nodes; /* Hash table of name -> clusterNode structures */
    // 黑名單節(jié)點(diǎn),一段時(shí)間內(nèi)不會(huì)再加入到集群中
    dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */
    // slot 數(shù)據(jù)正在遷移到migrating_slots_to[slot] 節(jié)點(diǎn)
    clusterNode *migrating_slots_to[REDIS_CLUSTER_SLOTS];
    // slot 數(shù)據(jù)正在從importing_slots_from[slot] 遷移到本機(jī)
    clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS];
    // slot 數(shù)據(jù)由slots[slot] 節(jié)點(diǎn)來(lái)處理
    clusterNode *slots[REDIS_CLUSTER_SLOTS];
    // slot 到key 的一個(gè)映射
    zskiplist *slots_to_keys;
    // 記錄了故障修復(fù)的信息
    /* The following fields are used to take the slave state on elections. */
    mstime_t failover_auth_time; /* Time of previous or next election. */
    int failover_auth_count; /* Number of votes received so far. */
    int failover_auth_sent; /* True if we already asked for votes. */
    int failover_auth_rank; /* This slave rank for current auth request. */
    uint64_t failover_auth_epoch; /* Epoch of the current election. */
    int cant_failover_reason; /* Why a slave is currently not able to
    failover. See the CANT_FAILOVER_* macros. */
    // 人工故障修復(fù)的一些信息
    ......
} clusterState;

如上,在正常的 Redis 集群中的任何一個(gè)節(jié)點(diǎn)都能感知到其他節(jié)點(diǎn)。里面的細(xì)節(jié)有很多,就不一一解釋了,當(dāng)遇到的時(shí)候再有需要解釋一下。

上面頻繁出現(xiàn) slot 單詞。之前我們學(xué)哈希表的時(shí)候,可以把 slot 理解為哈希表中的桶(bucket)。為什么需要slot?這和 redis cluster 的數(shù)據(jù)分區(qū)和訪問(wèn)有關(guān)。建議大概看完 Redis 對(duì)數(shù)據(jù)結(jié)構(gòu)后,接著看 clusterCommand() 這個(gè)函數(shù),由此知道 redis cluster 能提供哪些服務(wù)和功能。接著往下看。

數(shù)據(jù)訪問(wèn)

在 http 有 301 狀態(tài)碼:301 Moved Permanently,它表示用戶所要訪問(wèn)的內(nèi)容已經(jīng)遷移到一個(gè)地址了,需要向新的地址發(fā)出請(qǐng)求。redis cluster 很明顯也是這么做的。在前面講到,redis cluster 中的每一個(gè)節(jié)點(diǎn)都需要知道其他節(jié)點(diǎn)的情況,這里就包括其他節(jié)點(diǎn)負(fù)責(zé)處理哪些鍵值對(duì)。

在主函數(shù)中,Redis 會(huì)檢測(cè)在啟用集群模式的情況下,會(huì)檢測(cè)命令中指定的 key 是否該由自己來(lái)處理,如果不是的話,會(huì)返回一個(gè)類似于重定向的錯(cuò)誤返回到客戶端。而“是否由自己來(lái)處理”就是看 hash(key) 值是否落在自己所負(fù)責(zé)的 slot 中。

http://wiki.jikexueyuan.com/project/redis/images/d1.png" alt="" />

typedef struct clusterNode {
    ......
    // 該節(jié)點(diǎn)會(huì)處理的slot
    unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */
    int numslots; /* Number of slots handled by this node */
    ......
} clusterNode;

可能會(huì)有疑問(wèn):這樣的數(shù)據(jù)訪問(wèn)機(jī)制在不是會(huì)浪費(fèi)一個(gè)請(qǐng)求嗎?確實(shí),如果直接向集群中的節(jié)點(diǎn)盲目訪問(wèn)一個(gè) key 的話,確實(shí)需要發(fā)起兩個(gè)請(qǐng)求。為此,redis cluster 配備了 slot 表,用戶通過(guò) slots 命令先向集群請(qǐng)求這個(gè) slot 表,得到這個(gè)表可以獲取哪些節(jié)點(diǎn)負(fù)責(zé)哪些 slot,繼而客戶端可以訪問(wèn)再訪問(wèn)集群中的數(shù)據(jù)。這樣,就可以在大多數(shù)的場(chǎng)景下節(jié)省一個(gè)請(qǐng)求,直達(dá)目標(biāo)節(jié)點(diǎn)。當(dāng)然,這個(gè) slot 表是隨時(shí)出現(xiàn)變更的,所以客戶端不能夠一 本萬(wàn)利一直使用這個(gè) slot 表,可以實(shí)現(xiàn)一個(gè)定時(shí)器,超時(shí)后再向集群節(jié)點(diǎn)獲取 slot 表。

你可以閱讀 getNodeByQuery(),流程不難。

新的節(jié)點(diǎn)

Redis 剛剛啟動(dòng)時(shí)候會(huì)檢測(cè)集群配置文件中是否有預(yù)配置好的節(jié)點(diǎn),如果有的話,會(huì)添加到節(jié)點(diǎn)哈希表中,在適當(dāng)?shù)臅r(shí)候連接這個(gè)節(jié)點(diǎn),并和它打招呼–握手。

// 加載集群配置文件
int clusterLoadConfig(char *filename) {
    ......
    // 如果該節(jié)點(diǎn)不在哈希表中,會(huì)添加
    /* Create this node if it does not exist */
    n = clusterLookupNode(argv[0]);
    if (!n) {
        n = createClusterNode(argv[0],0);
        clusterAddNode(n);
    }
    /* Address and port */
    if ((p = strrchr(argv[1],':')) == NULL) goto fmterr;
    *p = '\0';
    memcpy(n->ip,argv[1],strlen(argv[1])+1);
    n->port = atoi(p+1);
    ......
}

注意,加載配置文件后,不會(huì)立即進(jìn)入握手階段。 另外兩個(gè)新增節(jié)點(diǎn)的時(shí)機(jī),當(dāng)其他節(jié)點(diǎn)向該節(jié)點(diǎn)打招呼時(shí)候,該節(jié)點(diǎn)會(huì)記錄下對(duì)端節(jié)點(diǎn),以及對(duì)端所知悉的節(jié)點(diǎn);Redis 管理人員告知,Redis 管理人員可以通過(guò)普通的 redis meet 命令,相當(dāng)于是人工將某個(gè)節(jié)點(diǎn)加入到集群中。

http://wiki.jikexueyuan.com/project/redis/images/d2.png" alt="" />

http://wiki.jikexueyuan.com/project/redis/images/d3.png" alt="" />

當(dāng)和其他節(jié)點(diǎn)開(kāi)始握手時(shí),會(huì)調(diào)用 clusterStartHandshake(),它只會(huì)初始化握手的初始信息,并不會(huì)立刻向其他節(jié)點(diǎn)發(fā)起握手,:按照 Redis 的習(xí)慣是在集群定時(shí)處理函數(shù) clusterCron() 中。

int clusterStartHandshake(char *ip, int port) {
    clusterNode *n;
    char norm_ip[REDIS_IP_STR_LEN];
    struct sockaddr_storage sa;
    // 分IPV4 和IPV6 兩種情況
    /* IP sanity check */
    if (inet_pton(AF_INET,ip,
        &(((struct sockaddr_in *)&sa)->sin_addr)))
    {
        sa.ss_family = AF_INET;
    } else if (inet_pton(AF_INET6,ip,
        &(((struct sockaddr_in6 *)&sa)->sin6_addr)))
    {
        sa.ss_family = AF_INET6;
    } else {
        errno = EINVAL;
        return 0;
    }
    // 端口有效性檢測(cè)
    /* Port sanity check */
    if (port <= 0 || port > (65535-REDIS_CLUSTER_PORT_INCR)) {
        errno = EINVAL;
        return 0;
    }
    // 標(biāo)準(zhǔn)化ip
    /* Set norm_ip as the normalized string representation of the node
    * IP address. */
    memset(norm_ip,0,REDIS_IP_STR_LEN);
    if (sa.ss_family == AF_INET)
        inet_ntop(AF_INET,
            (void*)&(((struct sockaddr_in *)&sa)->sin_addr),
            norm_ip,REDIS_IP_STR_LEN);
    else
        inet_ntop(AF_INET6,
            (void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr),
            norm_ip,REDIS_IP_STR_LEN);
    // 如果這個(gè)節(jié)點(diǎn)正在握手狀態(tài),則不需要重復(fù)進(jìn)入,直接退出
    if (clusterHandshakeInProgress(norm_ip,port)) {
        errno = EAGAIN;
        return 0;
    }
    // 創(chuàng)建一個(gè)新的節(jié)點(diǎn),并加入到節(jié)點(diǎn)哈希表中
    /* Add the node with a random address (NULL as first argument to
    * createClusterNode()). Everything will be fixed during the
    * handshake. */
    n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
    memcpy(n->ip,norm_ip,sizeof(n->ip));
    n->port = port;
    clusterAddNode(n);
    return 1;
}

其他節(jié)點(diǎn)收到后在 clusterProcessGossipSection() 中將新的節(jié)點(diǎn)添加到哈希表中。

心跳機(jī)制

還是那句話,redis cluster 中的每一個(gè)節(jié)點(diǎn)都需要知道其他節(jié)點(diǎn)的情況。要達(dá)到這個(gè)目標(biāo),必須有一個(gè)心跳機(jī)制來(lái)保持每個(gè)節(jié)點(diǎn)是可達(dá)的,監(jiān)控的,并且節(jié)點(diǎn)的信息變更,也可以通過(guò)心跳中的數(shù)據(jù)包來(lái)傳遞。這樣就很容易理解Redis 的心跳機(jī)制是怎么實(shí)現(xiàn)的。這有點(diǎn)類似于主從復(fù)制中的實(shí)現(xiàn)方法,總之就是一個(gè)心跳。在 redis cluster 只,這種心跳又叫 gossip 機(jī)制。

集群之間交互信息是用其內(nèi)部專用連接的。redis cluster 中的每一個(gè)節(jié)點(diǎn)都監(jiān)聽(tīng)了一個(gè)集群專用的端口,專門(mén)用作集群節(jié)點(diǎn)之間的信息交換。在 redis cluster 初始化函數(shù) clusterInit() 中監(jiān)聽(tīng)了該端口,并在事件中心注冊(cè)了 clusterAcceptHandler()。 從 clusterAcceptHandler()的邏輯來(lái)看,當(dāng)有新的連接到來(lái)時(shí),會(huì)為新的連接注冊(cè) clusterReadHandler()回調(diào)函數(shù)。這一點(diǎn)和 redis 本身初始化的行為是一致的。

clusterSendPing() 中發(fā)送心跳數(shù)據(jù)。發(fā)送的包括:所知節(jié)點(diǎn)的名稱,ip 地址等。這樣,改節(jié)點(diǎn)就將主機(jī)所知的信息傳播到了其他的節(jié)點(diǎn)。注意,從 clusterSendPing() 的實(shí)現(xiàn)來(lái)看,redis cluster 并不是一開(kāi)始就向所有的節(jié)點(diǎn)發(fā)送心跳數(shù)據(jù),而選取幾個(gè)節(jié)點(diǎn)發(fā)送,因?yàn)?redis 考慮到集群網(wǎng)的形成并不需要每個(gè)節(jié)點(diǎn)向像集群中的所有其他節(jié)點(diǎn)發(fā)送 ping。

故障修復(fù)

故障修復(fù)曾經(jīng)在主從復(fù)制中提到過(guò)。redis cluster 的故障修復(fù)分兩種途徑,一種是集群自治實(shí)現(xiàn)的故障修復(fù),一種是人工觸發(fā)的故障修復(fù)。

集群自治實(shí)現(xiàn)的故障修復(fù)中,是由從機(jī)發(fā)起的。上面所說(shuō),集群中的每個(gè)節(jié)點(diǎn)都需要和其他節(jié)點(diǎn)保持連接。從機(jī)如果檢測(cè)到主機(jī)節(jié)點(diǎn)出錯(cuò)了(標(biāo)記為 REDIS_NODE_FAIL),會(huì)嘗試進(jìn)行主從切換。在 cluster 定時(shí)處理函數(shù)中,有一段只有從機(jī)才會(huì)執(zhí)行的代碼段:

// 集群定時(shí)處理函數(shù)
/* This is executed 10 times every second */
void clusterCron(void) {
    ......
    // 從機(jī)才需要執(zhí)行下面的邏輯
    if (nodeIsSlave(myself)) {
        ......
        // 從機(jī)-> 主機(jī)替換
        clusterHandleSlaveFailover();
        ......
    }
    ......
}

從機(jī)的 clusterCron() 會(huì)調(diào)用 clusterHandleSlaveFailover() 已決定是否需要執(zhí)行故障修復(fù)。通常,故障修復(fù)的觸發(fā)點(diǎn)就是在其主機(jī)被標(biāo)記為出錯(cuò)節(jié)點(diǎn)的時(shí)候。

故障修復(fù)的協(xié)議

在決定故障修復(fù)后,會(huì)開(kāi)始進(jìn)行協(xié)商是否可以將自己升級(jí)為主機(jī)。

// 主機(jī)已經(jīng)是一個(gè)出錯(cuò)節(jié)點(diǎn)了,自己作為從機(jī)可以升級(jí)為主機(jī)
void clusterHandleSlaveFailover(void) {
    ......
    // 故障修復(fù)超時(shí),重新啟動(dòng)故障修復(fù)
    if (auth_age > auth_retry_time) { // 兩次故障修復(fù)間隔不能過(guò)短
        // 更新一些時(shí)間
        ......
        redisLog(REDIS_WARNING,
        "Start of election delayed for %lld milliseconds "
        "(rank #%d, offset %lld).",
        server.cluster->failover_auth_time - mstime(),
        server.cluster->failover_auth_rank,
        replicationGetSlaveOffset());
        // 告知其他從機(jī)
        /* Now that we have a scheduled election, broadcast our offset
        * to all the other slaves so that they'll updated their offsets
        * if our offset is better. */
        clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
        return;
    }
    ......
    // 開(kāi)頭投票
    /* Ask for votes if needed. */
    if (server.cluster->failover_auth_sent == 0) {
        server.cluster->currentEpoch++;
        server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
        redisLog(REDIS_WARNING,"Starting a failover election for epoch %llu.",
        (unsigned long long) server.cluster->currentEpoch);
        clusterRequestFailoverAuth();
        server.cluster->failover_auth_sent = 1;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
        CLUSTER_TODO_UPDATE_STATE|
        CLUSTER_TODO_FSYNC_CONFIG);
        return; /* Wait for replies. */
    }
}

上面有兩個(gè)關(guān)鍵的函數(shù):clusterBroadcastPong() 和clusterRequestFailoverAuth()。

在 clusterBroadcastPong()中,會(huì)向其他屬于同一主從關(guān)系的其他從機(jī)發(fā)送 pong,以傳遞主機(jī)已經(jīng)出錯(cuò)的信息。

clusterRequestFailoverAuth() 中,會(huì)向集群中的所有其他節(jié)點(diǎn)發(fā)送 CLUSTERMSG_-TYPE_FAILOVER_AUTH_REQUEST 信令,意即詢問(wèn)是否投票。

那收到這個(gè)信令的節(jié)點(diǎn),是否會(huì)向源節(jié)點(diǎn)投票呢?先來(lái)看看 FAILOVERAUTH-REQUEST 信令中帶有什么數(shù)據(jù),順著 clusterRequestFailoverAuth() 下去,會(huì)找到 cluster-BuildMessageHdr() 函數(shù),它打包了一些數(shù)據(jù)。這里主要包括:

  1. RCmb 四個(gè)字符,相當(dāng)于是 redis cluster 信令的頭部校驗(yàn)值,
  2. type,命令號(hào),這是屬于什么信令
  3. sender info,發(fā)送信令節(jié)點(diǎn)的信息
  4. 發(fā)送信令節(jié)點(diǎn)的配置版本號(hào)
  5. 主從復(fù)制偏移量

http://wiki.jikexueyuan.com/project/redis/images/cd.png" alt="" />

在心跳機(jī)制那一節(jié)講過(guò),集群節(jié)點(diǎn)會(huì)為與其他節(jié)點(diǎn)的連接注冊(cè)clusterReadHandler() 回調(diào)函數(shù),F(xiàn)AILOVER_AUTH_REQUEST 信令的處理也在里面,對(duì)應(yīng)的是 clusterSendFailover-AuthIfNeeded() 處理函數(shù),在這里決定是否投對(duì)端節(jié)點(diǎn)一票。這里的決定因素有幾個(gè):配置版本號(hào),節(jié)點(diǎn)本身和投票時(shí)間。

1.如果需要投票,索取投票的節(jié)點(diǎn)當(dāng)前版本號(hào)必須比當(dāng)前記錄的版本一樣,這樣才有權(quán)索取投票;新的版本號(hào)必須是最新的。第二點(diǎn),可能比較繞,譬如下面的場(chǎng)景,slave 是無(wú)法獲得其他主機(jī)的投票的,other slave 才可以。這里的意思是,如果一個(gè)從機(jī)想要升級(jí)為主機(jī),它與它的主機(jī)必須保持狀態(tài)一致。

http://wiki.jikexueyuan.com/project/redis/images/cd1.png" alt="" />

2.索取投票的節(jié)點(diǎn)必須是從機(jī)節(jié)點(diǎn)。這是當(dāng)然,因?yàn)楣收闲迯?fù)是由從機(jī)發(fā)起的

3.最后一個(gè)是投票的時(shí)間,因?yàn)楫?dāng)一個(gè)主機(jī)有多個(gè)從機(jī)的時(shí)候,多個(gè)從機(jī)都會(huì)發(fā)起故障修復(fù),一段時(shí)間內(nèi)只有一個(gè)從機(jī)會(huì)進(jìn)行故障修復(fù),其他的會(huì)被推遲。

這三點(diǎn)都在 clusterSendFailoverAuthIfNeeded() 中都有所體現(xiàn)。

當(dāng)都滿足了上述要求過(guò)后,即可開(kāi)始投票:

// 決定是否投票,redis cluster 將根據(jù)配置的版本號(hào)決定是否投票
/* Vote for the node asking for our vote if there are the conditions. */
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
    ......
    // 投票走起
    /* We can vote for this slave. */
    clusterSendFailoverAuth(node);
    server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
    node->slaveof->voted_time = mstime(); // 更新投票的時(shí)間
    redisLog(REDIS_WARNING, "Failover auth granted to %.40s for epoch %llu",
    node->name, (unsigned long long) server.cluster->currentEpoch);
}

投票函數(shù) lusterSendFailoverAuth() 只是放一個(gè) CLUSTERMSG_TYPEFAILOVER-AUTH_ACK 信令到達(dá)索取投票的從機(jī)節(jié)點(diǎn),從而該從機(jī)獲取了一票。讓我們?cè)倩氐剿魅⊥镀钡膹臋C(jī)節(jié)點(diǎn)接下來(lái)會(huì)怎么做。

// 主機(jī)已經(jīng)是一個(gè)出錯(cuò)節(jié)點(diǎn)了,自己作為從機(jī)可以升級(jí)為主機(jī)
void clusterHandleSlaveFailover(void) {
    ......
    // 獲得的選票必須是集群節(jié)點(diǎn)數(shù)的一般以上
    /* Check if we reached the quorum. */
    if (server.cluster->failover_auth_count >= needed_quorum) {
        /* We have the quorum, we can finally failover the master. */
        redisLog(REDIS_WARNING,
        "Failover election won: I'm the new master.");
        /* Update my configEpoch to the epoch of the election. */
    if (myself->configEpoch < server.cluster->failover_auth_epoch) {
        myself->configEpoch = server.cluster->failover_auth_epoch;
        redisLog(REDIS_WARNING,
        "configEpoch set to %llu after successful failover",
        (unsigned long long) myself->configEpoch);
    }
    // 正式轉(zhuǎn)換為主機(jī),代替主機(jī)的功能
    /* Take responsability for the cluster slots. */
    clusterFailoverReplaceYourMaster();
    } else {
        clusterLogCantFailover(REDIS_CLUSTER_CANT_FAILOVER_WAITING_VOTES);
    }
}

從機(jī)在獲取集群節(jié)點(diǎn)數(shù)量半數(shù)以上的投票時(shí),就可以正式升級(jí)為主機(jī)了。來(lái)回顧一下投票的全過(guò)程:

http://wiki.jikexueyuan.com/project/redis/images/cd2.png" alt="" />

clusterFailoverReplaceYourMaster() 就是將其自身的配置從從機(jī)更新到主機(jī),最后廣播給所有的節(jié)點(diǎn):我轉(zhuǎn)正了。實(shí)際上,是發(fā)送一個(gè) pong。

redis cluster 還提供了一種人工故障修復(fù)的模式,管理人員可以按需使用這些功能。你可以從 clusterCommand() 下找到人工故障修復(fù)流程開(kāi)始執(zhí)行的地方:

1.cluster failover takeover. 會(huì)強(qiáng)制將從機(jī)升級(jí)為主機(jī),不需要一個(gè)投票的過(guò)程。

2.cluster failover force. 會(huì)強(qiáng)制啟用故障修復(fù),這和上面講的故障修復(fù)過(guò)程一樣。如果你留意 clusterHandleSlaveFailover() 中的處理邏輯的話,實(shí)際 cluster force 也是在其中處理的,同樣需要一個(gè)投票的過(guò)程。

3.cluster failover. 默認(rèn)的模式,會(huì)先告知主機(jī)需要開(kāi)始進(jìn)行故障修復(fù)流程,主機(jī)被告知會(huì)停止服務(wù)。之后再走接下來(lái)的主從修復(fù)的流程。

// cluster 命令處理。
void clusterCommand(redisClient *c) {
    ......
    // 啟動(dòng)故障修復(fù)
    } else if (!strcasecmp(c->argv[1]->ptr,"failover") &&
        (c->argc == 2 || c->argc == 3))
    {
    ......
    if (takeover) {
        ......
        // 產(chǎn)生一個(gè)新的配置版本號(hào)
        clusterBumpConfigEpochWithoutConsensus();
        // 直接將自己升級(jí)為主機(jī),接著通知到所有的節(jié)點(diǎn)
        clusterFailoverReplaceYourMaster();
    } else if (force) {
        ......
        // 直接標(biāo)記為可以開(kāi)始進(jìn)行故障修復(fù)了,并不用告知主機(jī)
        server.cluster->mf_can_start = 1;
    } else {
        // 先通知我的主機(jī)開(kāi)始人工故障修復(fù),再執(zhí)行接下來(lái)的故障修復(fù)流程
        clusterSendMFStart(myself->slaveof);
        }
        addReply(c,shared.ok);
    }
    ......
}

4.發(fā)送信令節(jié)點(diǎn)的配置版本號(hào)

5.主從復(fù)制偏移量

人工故障修復(fù)模式,和自治實(shí)現(xiàn)的故障修復(fù)模式最大的區(qū)別在于對(duì)于從機(jī)來(lái)說(shuō),其主機(jī)是否可達(dá)。人工故障修復(fù)模式,允許主機(jī)可達(dá)的情況下,實(shí)現(xiàn)故障修復(fù)。因此,相比自治的故障修復(fù),人工的還會(huì)多一道工序:主從復(fù)制的偏移量相等過(guò)后,才開(kāi)始進(jìn)行故障修復(fù)的過(guò)程。

從下面兩種模式的處理來(lái)看,有很明顯的區(qū)別:

// cluster 命令處理
void clusterCommand(redisClient *c) {
    ......
    // 啟動(dòng)故障修復(fù)
    } else if (!strcasecmp(c->argv[1]->ptr,"failover") &&
        (c->argc == 2 || c->argc == 3))
    {
        ......
    if (takeover) {
        ......
        // 產(chǎn)生一個(gè)新的配置版本號(hào)
        clusterBumpConfigEpochWithoutConsensus();
        // 直接將自己升級(jí)為主機(jī),接著通知到所有的節(jié)點(diǎn)
        clusterFailoverReplaceYourMaster();
    } else if (force) {
        ......
        // 直接標(biāo)記為可以開(kāi)始進(jìn)行故障修復(fù)了,并不用告知主機(jī)
        server.cluster->mf_can_start = 1;
    } else {
        // 先通知我的主機(jī)開(kāi)始人工故障修復(fù),再執(zhí)行接下來(lái)的故障修復(fù)流程
        clusterSendMFStart(myself->slaveof);
        }
    addReply(c,shared.ok);
    }
}
  1. takeover 模式直接將自己升級(jí)為主機(jī)
  2. force 模式直接進(jìn)入故障修復(fù)模式
  3. 默認(rèn)模式會(huì)先告知(clusterSendMFStart())主機(jī),接著再進(jìn)行故障修復(fù)流程

來(lái)看看人工故障修復(fù)模式的狀態(tài)機(jī) clusterHandleManualFailover(),這個(gè)函數(shù)只會(huì)在 clusterCron() 中調(diào)用:

// 人工恢復(fù)狀態(tài)機(jī), 只在clusterCron() 中調(diào)用
void clusterHandleManualFailover(void) {
    /* Return ASAP if no manual failover is in progress. */
    if (server.cluster->mf_end == 0) return;
        /* If mf_can_start is non-zero, the failover was already triggered so the
        * next steps are performed by clusterHandleSlaveFailover(). */
    if (server.cluster->mf_can_start) return;
    if (server.cluster->mf_master_offset == 0) return; /* Wait for offset... */
    if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
        /* Our replication offset matches the master replication offset
        * announced after clients were paused. We can start the failover. */
        server.cluster->mf_can_start = 1;
        redisLog(REDIS_WARNING,
        "All master replication stream processed, "
        "manual failover can start.");
    }
}

主要的幾個(gè)變量這里解說(shuō)一下:

  1. mf_end:在觸發(fā)人工故障修復(fù)的時(shí)候就會(huì)被設(shè)置
  2. mf_master_offset:從機(jī)需要等待主機(jī)發(fā)送主從復(fù)制偏移量,如上所說(shuō),從機(jī)升級(jí)為 主機(jī),需要和主機(jī)的偏移量相等
  3. mf_can_start:主從機(jī)偏移量相等時(shí),就可以進(jìn)行故障修復(fù)了

自治故障修復(fù)和人工故障修復(fù)流程都是在 clusterHandleSlaveFailover() 中開(kāi)始執(zhí)行的。這里不再?gòu)?fù)述。

這里大概總結(jié)一下人工故障修復(fù)默認(rèn)模式的流程:

http://wiki.jikexueyuan.com/project/redis/images/cd3.png" alt="" />

數(shù)據(jù)遷移

在之前有講過(guò) migrate 系列的命令,即數(shù)據(jù)遷移。在 redis cluster 中,搬遷 slot 的時(shí)候,就會(huì)用到 migrate 系列的命令。

http://wiki.jikexueyuan.com/project/redis/images/cd5.png" alt="" />

為了管理連接,redis cluster 還實(shí)現(xiàn)了長(zhǎng)連接的管理,你可以在 migrateGetSocket() 中查看它的實(shí)現(xiàn)。

在集群狀態(tài)結(jié)構(gòu)體中存儲(chǔ)了兩個(gè)與數(shù)據(jù)遷移的數(shù)據(jù):

typedef struct clusterState {
    ......
    // slot 數(shù)據(jù)正在遷移到migrating_slots_to[slot] 節(jié)點(diǎn)
    clusterNode *migrating_slots_to[REDIS_CLUSTER_SLOTS];
    // slot 數(shù)據(jù)正在從importing_slots_from[slot] 遷移到本機(jī)
    clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS];
    ......
} clusterState;

這些信息在數(shù)據(jù)訪問(wèn)的時(shí)候會(huì)有用。

總結(jié)

這篇文章依據(jù)筆者感興趣的幾個(gè)問(wèn)題分了幾個(gè)大的部分介紹 redis cluster,一些小的細(xì)節(jié)大家可以在源碼中尋找答案。

上一篇:分布式鎖下一篇:RDB 持久化策略