鍍金池/ 教程/ 大數(shù)據(jù)/ 訂閱發(fā)布機制
Redis 數(shù)據(jù)淘汰機制
積分排行榜
小剖 Memcache
Redis 數(shù)據(jù)結(jié)構(gòu) intset
分布式鎖
從哪里開始讀起,怎么讀
Redis 數(shù)據(jù)結(jié)構(gòu) dict
不在浮沙筑高臺
Redis 集群(上)
Redis 監(jiān)視器
源碼閱讀工具
Redis 日志和斷言
內(nèi)存數(shù)據(jù)管理
Redis 數(shù)據(jù)結(jié)構(gòu)綜述
源碼日志
Web 服務(wù)器存儲 session
消息中間件
Redis 與 Lua 腳本
什么樣的源代碼適合閱讀
Redis 數(shù)據(jù)結(jié)構(gòu) sds
Memcached slab 分配策略
訂閱發(fā)布機制
Redis 是如何提供服務(wù)的
Redis 事務(wù)機制
Redis 集群(下)
主從復(fù)制
Redis 應(yīng)用
RDB 持久化策略
Redis 數(shù)據(jù)遷移
Redis 事件驅(qū)動詳解
初探 Redis
Redis 與 Memcache
AOF 持久化策略
Redis 數(shù)據(jù)結(jié)構(gòu) redisOb
作者簡介
Redis 數(shù)據(jù)結(jié)構(gòu) ziplist
Redis 數(shù)據(jù)結(jié)構(gòu) skiplist
Redis 哨兵機制

訂閱發(fā)布機制

兩種訂閱

Redis 提供兩個訂閱模式:頻道(channel)訂閱和 glob-style 模式(pattern)頻道訂閱。頻道訂閱容易理解,即CA(client A)向服務(wù)器訂閱了頻道 news,當 CB 向 news 發(fā)布消息的時候,CA 便能收到。

glob-style 模式(pattern)頻道訂閱,需要先解釋什么是 glob-style?舉一個簡單的例子,rm *.jpg linux 下這條命令刪除當前目錄下所有 jpg 圖片,所用到的是 glob-style 模式匹配,你可以將他理解為某種 style 的正則表達式;)

舉例,CA(client A)向服務(wù)器訂閱了頻道*.news

  • 當 CB 向 China.news 發(fā)布消息的時候,CA 能收到,
  • 當 CB 向 America.news 發(fā)布消息的時候,CA 能收到,
  • 當 CB 向 AV.news 發(fā)布消息的時候,CA 便能收到。

訂閱相關(guān)數(shù)據(jù)結(jié)構(gòu)

struct redisServer 和 struct redisClient 都維護了頻道和模式頻道,前者維護了所有頻道和訂閱頻道的客戶端,后者維護了客戶端自己訂閱的頻道。

struct redisServer {
    ......
    /* Pubsub */
    dict *pubsub_channels; /* Map channels to list of subscribed clients */
    list *pubsub_patterns; /* A list of pubsub_patterns */
    ......
}
typedef struct redisClient {
    ......
    // 用戶感興趣的頻道
    dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
    // 用戶感興趣的模式
    list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
    ......
} redisClient;
    // 模式頻道數(shù)據(jù)結(jié)構(gòu),list *pubsub_patterns 里的每個節(jié)點數(shù)據(jù)都是struct
    // pubsubPattern。
typedef struct pubsubPattern {
    redisClient *client;
    robj *pattern;
} pubsubPattern;

頻道訂閱是一個 dict,每個 channel 被哈希進相應(yīng)的桶,每個 channel 對應(yīng)一個 clients,clients 都訂閱了此 channel。當有消息發(fā)布的時候,檢索 channel,遍歷 clients,發(fā)布消息。

http://wiki.jikexueyuan.com/project/redis/images/redis20.png" alt="" />

模式頻道訂閱是一個 list。當有消息發(fā)布的時候,channel 與 glob-style pattern 匹配,發(fā)布消息。

http://wiki.jikexueyuan.com/project/redis/images/redis21.png" alt="" />

訂閱過程

兩種訂閱模式是維護上述兩種數(shù)據(jù)結(jié)構(gòu)的過程,

// 訂閱頻道
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
* 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
    struct dictEntry *de;
    list *clients = NULL;
    int retval = 0;
    // redisClient.pubsub_channels 中保存客戶端訂閱的所有頻道,可以查看客戶端
    // 訂閱了多少頻道以及客戶端是否訂閱某個頻道
    // server.pubsub_channels 中保存所有的頻道和每個頻道的訂閱客戶端,可以將
    // 消息發(fā)布到訂閱客戶端
    // 將頻道加入redisClient.pubsub_channels
    /* Add the channel to the client -> channels hash table */
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        // 在服務(wù)器負責(zé)維護的channel->clients 哈希表中尋找指定的頻道
        /* Add the client to the channel -> list of clients hash table */
        de = dictFind(server.pubsub_channels,channel);
        // 未找到客戶端指定的頻道,需要創(chuàng)建
    if (de == NULL) {
        clients = listCreate();
        // 將頻道加入server.pubsub_channels
        dictAdd(server.pubsub_channels,channel,clients);
        incrRefCount(channel);
        // 找到客戶端指定的頻道,直接獲取這個頻道
    } else {
        clients = dictGetVal(de);
    }
        // 將客戶端添加到鏈表的尾部
        listAddNodeTail(clients,c);
    }
    // 通知客戶端
    /* Notify the client */
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.subscribebulk);
    addReplyBulk(c,channel);
    addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(
    c->pubsub_patterns));
    return retval;
}
// 訂閱模式頻道
/* Subscribe a client to a pattern. Returns 1 if the operation succeeded,
or 0 if the client was already subscribed to that pattern. */
int pubsubSubscribePattern(redisClient *c, robj *pattern) {
    int retval = 0;
    // redisClient.pubsub_patterns 中保存客戶端訂閱的所有模式頻道,可以查看
    // 客戶端訂閱了多少頻道以及客戶端是否訂閱某個頻道
    // server.pubsub_patterns 中保存所有的模式頻道和每個模式頻道的訂閱客戶端
    // ,可以將消息發(fā)布到訂閱客戶端
    // 未訂閱模式頻道,插入
    if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
        retval = 1;
        pubsubPattern *pat;
        // 將模式頻道加入redisClient.pubsub_patterns
        listAddNodeTail(c->pubsub_patterns,pattern);
        incrRefCount(pattern);
        // 將模式頻道加入server.pubsub_patterns
        pat = zmalloc(sizeof(*pat));
        pat->pattern = getDecodedObject(pattern);
        pat->client = c;
        listAddNodeTail(server.pubsub_patterns,pat);
    }
    // 通知客戶端
    /* Notify the client */
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.psubscribebulk);
    addReplyBulk(c,pattern);
    addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(
    c->pubsub_patterns));
    return retval;
}

取消訂閱的過程則相反。

消息發(fā)布

發(fā)布消息的過程則遍歷上述兩個數(shù)據(jù)結(jié)構(gòu)(dict 和list),并將消息發(fā)布到匹配頻道的所有客戶端。

// 發(fā)布消息
/* Publish a message */
int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    struct dictEntry *de;
    listNode *ln;
    listIter li;
    // 發(fā)布消息有兩個步驟,
    // 指定頻道的所有訂閱者發(fā)布消息
    // 指定模式頻道的所有訂閱者發(fā)布消息
    // 
    // 尋找頻道
    /* Send to clients listening for that channel */
    de = dictFind(server.pubsub_channels,channel);
    // 向頻道所有訂閱者發(fā)布信息
    if (de) {
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;
        listRewind(list,&li);
    while ((ln = listNext(&li)) != NULL) {
        redisClient *c = ln->value;
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.messagebulk);
        addReplyBulk(c,channel);
        addReplyBulk(c,message);
        receivers++;
    }
}
// 
// 進行g(shù)lob-style 模式匹配
/* Send to clients listening to matching channels */
if (listLength(server.pubsub_patterns)) {
    listRewind(server.pubsub_patterns,&li);
    channel = getDecodedObject(channel);
    while ((ln = listNext(&li)) != NULL) {
        pubsubPattern *pat = ln->value;
        // 匹配成功,向訂閱者發(fā)布消息
        if (stringmatchlen((char*)pat->pattern->ptr,
                sdslen(pat->pattern->ptr),
                (char*)channel->ptr,
                sdslen(channel->ptr),0)) {
        addReply(pat->client,shared.mbulkhdr[4]);
        addReply(pat->client,shared.pmessagebulk);
        addReplyBulk(pat->client,pat->pattern);
        addReplyBulk(pat->client,channel);
        addReplyBulk(pat->client,message);
        receivers++;
        }
      }
   decrRefCount(channel);
  }
return receivers;
}

注意, 只要客戶端訂閱了頻道, 除了SUBCRIBE,UNSUBCRIBE,PSUBCRIBE,PSUBCRIBE,就不能執(zhí)行其他命令。

int processCommand(redisClient *c) {
    ......
    // 在訂閱發(fā)布模式下,只允許處理SUBSCRIBE 或者UNSUBSCRIBE 命令
    // 從下面的檢測條件可以看出:只要存在redisClient.pubsub_channels 或者
    // redisClient.pubsub_patterns,就代表處于訂閱發(fā)布模式下
    /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
    if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0)
        &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) {
        addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed
                "in this context");
        return REDIS_OK;
        }
    ......
}