Redis 提供兩個訂閱模式:頻道(channel)訂閱和 glob-style 模式(pattern)頻道訂閱。頻道訂閱容易理解,即CA(client A)向服務(wù)器訂閱了頻道 news,當 CB 向 news 發(fā)布消息的時候,CA 便能收到。
glob-style 模式(pattern)頻道訂閱,需要先解釋什么是 glob-style?舉一個簡單的例子,rm *.jpg linux
下這條命令刪除當前目錄下所有 jpg 圖片,所用到的是 glob-style 模式匹配,你可以將他理解為某種 style 的正則表達式;)
舉例,CA(client A)向服務(wù)器訂閱了頻道*.news
:
struct redisServer 和 struct redisClient 都維護了頻道和模式頻道,前者維護了所有頻道和訂閱頻道的客戶端,后者維護了客戶端自己訂閱的頻道。
struct redisServer {
......
/* Pubsub */
dict *pubsub_channels; /* Map channels to list of subscribed clients */
list *pubsub_patterns; /* A list of pubsub_patterns */
......
}
typedef struct redisClient {
......
// 用戶感興趣的頻道
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
// 用戶感興趣的模式
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
......
} redisClient;
// 模式頻道數(shù)據(jù)結(jié)構(gòu),list *pubsub_patterns 里的每個節(jié)點數(shù)據(jù)都是struct
// pubsubPattern。
typedef struct pubsubPattern {
redisClient *client;
robj *pattern;
} pubsubPattern;
頻道訂閱是一個 dict,每個 channel 被哈希進相應(yīng)的桶,每個 channel 對應(yīng)一個 clients,clients 都訂閱了此 channel。當有消息發(fā)布的時候,檢索 channel,遍歷 clients,發(fā)布消息。
http://wiki.jikexueyuan.com/project/redis/images/redis20.png" alt="" />
模式頻道訂閱是一個 list。當有消息發(fā)布的時候,channel 與 glob-style pattern 匹配,發(fā)布消息。
http://wiki.jikexueyuan.com/project/redis/images/redis21.png" alt="" />
兩種訂閱模式是維護上述兩種數(shù)據(jù)結(jié)構(gòu)的過程,
// 訂閱頻道
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
* 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
struct dictEntry *de;
list *clients = NULL;
int retval = 0;
// redisClient.pubsub_channels 中保存客戶端訂閱的所有頻道,可以查看客戶端
// 訂閱了多少頻道以及客戶端是否訂閱某個頻道
// server.pubsub_channels 中保存所有的頻道和每個頻道的訂閱客戶端,可以將
// 消息發(fā)布到訂閱客戶端
// 將頻道加入redisClient.pubsub_channels
/* Add the channel to the client -> channels hash table */
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
incrRefCount(channel);
// 在服務(wù)器負責(zé)維護的channel->clients 哈希表中尋找指定的頻道
/* Add the client to the channel -> list of clients hash table */
de = dictFind(server.pubsub_channels,channel);
// 未找到客戶端指定的頻道,需要創(chuàng)建
if (de == NULL) {
clients = listCreate();
// 將頻道加入server.pubsub_channels
dictAdd(server.pubsub_channels,channel,clients);
incrRefCount(channel);
// 找到客戶端指定的頻道,直接獲取這個頻道
} else {
clients = dictGetVal(de);
}
// 將客戶端添加到鏈表的尾部
listAddNodeTail(clients,c);
}
// 通知客戶端
/* Notify the client */
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.subscribebulk);
addReplyBulk(c,channel);
addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(
c->pubsub_patterns));
return retval;
}
// 訂閱模式頻道
/* Subscribe a client to a pattern. Returns 1 if the operation succeeded,
or 0 if the client was already subscribed to that pattern. */
int pubsubSubscribePattern(redisClient *c, robj *pattern) {
int retval = 0;
// redisClient.pubsub_patterns 中保存客戶端訂閱的所有模式頻道,可以查看
// 客戶端訂閱了多少頻道以及客戶端是否訂閱某個頻道
// server.pubsub_patterns 中保存所有的模式頻道和每個模式頻道的訂閱客戶端
// ,可以將消息發(fā)布到訂閱客戶端
// 未訂閱模式頻道,插入
if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
retval = 1;
pubsubPattern *pat;
// 將模式頻道加入redisClient.pubsub_patterns
listAddNodeTail(c->pubsub_patterns,pattern);
incrRefCount(pattern);
// 將模式頻道加入server.pubsub_patterns
pat = zmalloc(sizeof(*pat));
pat->pattern = getDecodedObject(pattern);
pat->client = c;
listAddNodeTail(server.pubsub_patterns,pat);
}
// 通知客戶端
/* Notify the client */
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.psubscribebulk);
addReplyBulk(c,pattern);
addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(
c->pubsub_patterns));
return retval;
}
取消訂閱的過程則相反。
發(fā)布消息的過程則遍歷上述兩個數(shù)據(jù)結(jié)構(gòu)(dict 和list),并將消息發(fā)布到匹配頻道的所有客戶端。
// 發(fā)布消息
/* Publish a message */
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
struct dictEntry *de;
listNode *ln;
listIter li;
// 發(fā)布消息有兩個步驟,
// 指定頻道的所有訂閱者發(fā)布消息
// 指定模式頻道的所有訂閱者發(fā)布消息
//
// 尋找頻道
/* Send to clients listening for that channel */
de = dictFind(server.pubsub_channels,channel);
// 向頻道所有訂閱者發(fā)布信息
if (de) {
list *list = dictGetVal(de);
listNode *ln;
listIter li;
listRewind(list,&li);
while ((ln = listNext(&li)) != NULL) {
redisClient *c = ln->value;
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.messagebulk);
addReplyBulk(c,channel);
addReplyBulk(c,message);
receivers++;
}
}
//
// 進行g(shù)lob-style 模式匹配
/* Send to clients listening to matching channels */
if (listLength(server.pubsub_patterns)) {
listRewind(server.pubsub_patterns,&li);
channel = getDecodedObject(channel);
while ((ln = listNext(&li)) != NULL) {
pubsubPattern *pat = ln->value;
// 匹配成功,向訂閱者發(fā)布消息
if (stringmatchlen((char*)pat->pattern->ptr,
sdslen(pat->pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) {
addReply(pat->client,shared.mbulkhdr[4]);
addReply(pat->client,shared.pmessagebulk);
addReplyBulk(pat->client,pat->pattern);
addReplyBulk(pat->client,channel);
addReplyBulk(pat->client,message);
receivers++;
}
}
decrRefCount(channel);
}
return receivers;
}
注意, 只要客戶端訂閱了頻道, 除了SUBCRIBE,UNSUBCRIBE,PSUBCRIBE,PSUBCRIBE,就不能執(zhí)行其他命令。
int processCommand(redisClient *c) {
......
// 在訂閱發(fā)布模式下,只允許處理SUBSCRIBE 或者UNSUBSCRIBE 命令
// 從下面的檢測條件可以看出:只要存在redisClient.pubsub_channels 或者
// redisClient.pubsub_patterns,就代表處于訂閱發(fā)布模式下
/* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0)
&&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed
"in this context");
return REDIS_OK;
}
......
}