鍍金池/ 教程/ 大數(shù)據(jù)/ 消息中間件
Redis 數(shù)據(jù)淘汰機制
積分排行榜
小剖 Memcache
Redis 數(shù)據(jù)結(jié)構(gòu) intset
分布式鎖
從哪里開始讀起,怎么讀
Redis 數(shù)據(jù)結(jié)構(gòu) dict
不在浮沙筑高臺
Redis 集群(上)
Redis 監(jiān)視器
源碼閱讀工具
Redis 日志和斷言
內(nèi)存數(shù)據(jù)管理
Redis 數(shù)據(jù)結(jié)構(gòu)綜述
源碼日志
Web 服務(wù)器存儲 session
消息中間件
Redis 與 Lua 腳本
什么樣的源代碼適合閱讀
Redis 數(shù)據(jù)結(jié)構(gòu) sds
Memcached slab 分配策略
訂閱發(fā)布機制
Redis 是如何提供服務(wù)的
Redis 事務(wù)機制
Redis 集群(下)
主從復(fù)制
Redis 應(yīng)用
RDB 持久化策略
Redis 數(shù)據(jù)遷移
Redis 事件驅(qū)動詳解
初探 Redis
Redis 與 Memcache
AOF 持久化策略
Redis 數(shù)據(jù)結(jié)構(gòu) redisOb
作者簡介
Redis 數(shù)據(jù)結(jié)構(gòu) ziplist
Redis 數(shù)據(jù)結(jié)構(gòu) skiplist
Redis 哨兵機制

消息中間件

消息隊列簡介

接觸 Linux 系統(tǒng)編程的時候,曾經(jīng)學(xué)到消息隊列是 IPC 的一種方式,這種通訊方式通常只用于本地的進(jìn)程,基于共享內(nèi)存的《無鎖消息隊列》即是一個很好的中間件,詳見這里。但這篇提到的消息隊列,也被稱為消息中間件,通常在分布式系統(tǒng)中用到。

提及消息中間件的時候,還會涉及生產(chǎn)者和消費者兩個概念。消息中間件是負(fù)責(zé)接收來自生產(chǎn)者的消息,并存儲并轉(zhuǎn)發(fā)給對應(yīng)的消費者,生產(chǎn)者可以按 topic 發(fā)布各樣消息,消費者也可以按 topic 訂閱各樣消息。生產(chǎn)者只管往消息隊列里推送消息,不用等待消費者的回應(yīng);消費者只管從消息隊列中取出數(shù)據(jù)并處理,可用可靠性等問題都交由消息中間件來負(fù)責(zé)。

http://wiki.jikexueyuan.com/project/redis/images/w.png" alt="" />

說白了,這種分布式的消息中間件即是網(wǎng)絡(luò)上一個服務(wù)器,我們可以往里面扔數(shù)據(jù),里面的數(shù)據(jù)會被消息中間件推送或者被別人拉取,消息中間件取到一個數(shù)據(jù)中轉(zhuǎn)的作用。生產(chǎn)者和消費者通常有兩種對應(yīng)關(guān)系,一個生產(chǎn)者對應(yīng)一個消費者,以及一個生產(chǎn)者對應(yīng)多個消費者。在這篇文章中,介紹了消息中間件的三個特點:解耦,異步和并行。讀者可以自行理解。一些不需要及時可靠響應(yīng)的業(yè)務(wù)場景,消息中間件可以大大提高業(yè)務(wù)上層的 吞吐量。

目前消息中間件一族里邊有一些優(yōu)秀的作品,RabbitMQ, Jafka/Kafka。redis 也可以作為一個入門級的消息隊列。上面提到的一個生產(chǎn)者對應(yīng)一個消費者,Redis 的 blist 可以實現(xiàn);一個生產(chǎn)者對應(yīng)多個消費者,Redis 的pub/sub 模式可以實現(xiàn)。值得注意的是,使用 Redis 作為消息中間件,假如消費者有一段時間斷開了與 Redis 的連接,它將不會收到這段時間內(nèi) Redis 內(nèi)的數(shù)據(jù),這一點從 pub/sub 的實現(xiàn)可以知道。嚴(yán)格意義上的消息中間件,需要保證數(shù)據(jù)的可靠性。

分布式的消息隊列

在平時的開發(fā)當(dāng)中,消息隊列算是最常見的應(yīng)用了。在本機的時候,可以使用系統(tǒng)提供的消息隊列,或者基于共享內(nèi)存的循環(huán)消息隊列,來實現(xiàn)本機進(jìn)程以及進(jìn)程之間的通信。對于異機部署的多個進(jìn)程,就需要用到分布式的消息隊列了,來看看這個場景:

http://wiki.jikexueyuan.com/project/redis/images/w1.png" alt="" />

生產(chǎn)者,基于 Redis 的消息隊列,3 個 worker 組都分別部署在不同的機器上,生產(chǎn)者會快速將產(chǎn)出內(nèi)容(如需要存儲的數(shù)據(jù)或者日志等)推送到消息隊列服務(wù)器上,這是 worker group 就能消費了。

這種實現(xiàn)可以借助 Redis 中的 blist 實現(xiàn)。在這里用 C 實現(xiàn)了一個生產(chǎn)者和 worker group 的示例代碼:

// comm.h
#ifndef COMM_H__
#define COMM_H__
#include <inttypes.h>
typedef struct {
    char ip[32];
    uint16_t port;
    char queue_name[256];
} config_t ;
void Usage(char *program) {
    printf("Usage: %s -h ip -p port -l test\n",program);
    abort();
}
const size_t max_cmd_len = 512;
#endif

生產(chǎn)者的代碼:
// producer.cc
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>

#include "comm.h"

#include "hiredis/hiredis.h"

void test_redis_client()
{
    redisContext *rc = redisConnect("127.0.0.1",6379);
    if(NULL == rc || rc != NULL && rc->err) {
        fprintf(stderr,"error: %s\n",rc->errstr);
        return;
    }
    // set name
    redisReply *reply = (redisReply *)redisCommand(rc,"set name dylan");
    printf("%s\n",reply->str);
    // get name
    reply = (redisReply *)redisCommand(rc,"get name");
    printf("%s\n",reply->str);
    }
    int main(int argc, char *argv[]) {
    if (argc < 7)
        Usage(argv[0]);
    config_t config;
    for (int i = EOF;
            (i = getopt(argc, argv, "h:p:l:")) != EOF;) {
        switch (i) {
            case 'h': snprintf(config.ip,sizeof(config.ip),"%s",optarg); break;
            case 'p': config.port = atoi(optarg); break;
            case 'l': snprintf(config.queue_name,sizeof(config.queue_name),"%s",
            optarg); break;
            default: Usage(argv[0]); break;
        }
    }
    redisContext *rc = redisConnect(config.ip,config.port);
    if (NULL == rc || rc != NULL && rc->err) {
        fprintf(stderr,"error: %s\n",rc->errstr);
        return -1;
    }
    redisReply *reply = NULL;
    char cmd[max_cmd_len];
    snprintf(cmd,sizeof(cmd),"LPUSH %s task",config.queue_name);
    printf("cmd=%s\n",cmd);
    int count = 100;
    while (count--) {
        reply = (redisReply *)redisCommand(rc,cmd);
    if (reply && reply->type == REDIS_REPLY_INTEGER) {
    } else {
        printf("BLPUSH error\n");
        }
    }
return 0;
}

消費者的代碼:
// consumer.cc
#include "comm.h"
#include "hiredis/hiredis.h"
int DoLogic(char *data, size_t len);
int main(int argc, char *argv[]) {
    if (argc < 7)
        Usage(argv[0]);
    config_t config;
        for (int i = EOF;
            (i = getopt(argc, argv, "h:p:l:")) != EOF;) {
        switch (i) {
            case 'h': snprintf(config.ip,sizeof(config.ip),"%s",optarg); break;
            case 'p': config.port = atoi(optarg); break;
            case 'l': snprintf(config.queue_name,sizeof(config.queue_name),"%s",
            optarg); break;
            default: Usage(argv[0]); break;
        }
    }
    redisContext *rc = redisConnect(config.ip,config.port);
    if (NULL == rc || rc != NULL && rc->err) {
        fprintf(stderr,"error: %s\n",rc->errstr);
        return -1;
    }
    redisReply *reply = NULL;
    char cmd[max_cmd_len];
    snprintf(cmd,sizeof(cmd),"BRPOP %s task 30",config.queue_name);
    int seq = 0;
    while (true) {
        reply = (redisReply *)redisCommand(rc,cmd);
    if (reply && reply->type == REDIS_REPLY_STRING) {
        DoLogic(reply->str,reply->len);
    } else if (reply && reply->type == REDIS_REPLY_ARRAY) {
        for (size_t i=0; i<reply->elements; i+=2) {
        printf("%d->%s\n",seq++,reply->element[i]->str);
    }
    } else {
    printf("BRPOP error, reply->type=%d\n",reply->type);
    break;
        }
    }
    return 0;
}
    int DoLogic(char *data, size_t len) {
        printf("reply=%s\n",data);
return 0;
}