鍍金池/ 教程/ 數(shù)據(jù)庫(kù)/ Spouts
基礎(chǔ)知識(shí)
Bolts
事務(wù)性拓?fù)?/span>
附錄 A
使用非 JVM 語(yǔ)言開發(fā)
一個(gè)實(shí)際的例子
拓?fù)?/span>
準(zhǔn)備開始
附錄 C
Spouts
附錄 B

Spouts

你將在本章了解到 spout 作為拓?fù)淙肟诤退娜蒎e(cuò)機(jī)制相關(guān)的最常見的設(shè)計(jì)策略。

可靠的消息 VS 不可靠的消息

在設(shè)計(jì)拓?fù)浣Y(jié)構(gòu)時(shí),始終在頭腦中記著的一件重要事情就是消息的可靠性。當(dāng)有無法處理的消息時(shí),你就要決定該怎么辦,以及作為一個(gè)整體的拓?fù)浣Y(jié)構(gòu)該做些什么。舉個(gè)例子,在處理銀行存款時(shí),不要丟失任何事務(wù)報(bào)文就是很重要的事情。但是如果你要統(tǒng)計(jì)分析數(shù)以百萬的 tweeter 消息,即使有一條丟失了,仍然可以認(rèn)為你的結(jié)果是準(zhǔn)確的。

對(duì)于 Storm 來說,根據(jù)每個(gè)拓?fù)涞男枰獡?dān)保消息的可靠性是開發(fā)者的責(zé)任。這就涉及到消息可靠性和資源消耗之間的權(quán)衡。高可靠性的拓?fù)浔仨毠芾韥G失的消息,必然消耗更多資源;可靠性較低的拓?fù)淇赡軙?huì)丟失一些消息,占用的資源也相應(yīng)更少。不論選擇什么樣的可靠性策略,Storm 都提供了不同的工具來實(shí)現(xiàn)它。

要在 spout 中管理可靠性,你可以在分發(fā)時(shí)包含一個(gè)元組的消息 ID(collector.emit(new Values(…),tupleId))。在一個(gè)元組被正確的處理時(shí)調(diào)用 ack** 方法,而在失敗時(shí)調(diào)用 fail** 方法。當(dāng)一個(gè)元組被所有的靶 bolt 和錨 bolt 處理過,即可判定元組處理成功(你將在第5章學(xué)到更多錨 bolt 知識(shí))。

發(fā)生下列情況之一時(shí)為元組處理失敗:

  • 提供數(shù)據(jù)的 spout 調(diào)用 collector.fail(tuple)
  • 處理時(shí)間超過配置的超時(shí)時(shí)間

讓我們來看一個(gè)例子。想象你正在處理銀行事務(wù),需求如下:

  • 如果事務(wù)失敗了,重新發(fā)送消息
  • 如果失敗了太多次,終結(jié)拓?fù)溥\(yùn)行

創(chuàng)建一個(gè) spout 和一個(gè) bolt,spout 隨機(jī)發(fā)送100個(gè)事務(wù) ID,有80%的元組不會(huì)被 bolt 收到(你可以在例子 ch04-spout 查看完整代碼)。實(shí)現(xiàn) spout 時(shí)利用 Map 分發(fā)事務(wù)消息元組,這樣就比較容易實(shí)現(xiàn)重發(fā)消息。

public void nextTuple() {
    if(!toSend.isEmpty()){
        for(Map.Entry<Integer, String> transactionEntry : toSend.entrySet()){
            Integer transactionId = transactionEntry.getKey();
            String transactionMessage = transactionEntry.getValue();
            collector.emit(new Values(transactionMessage),transactionId);
        }
        toSend.clear();
    }
}  

如果有未發(fā)送的消息,得到每條事務(wù)消息和它的關(guān)聯(lián) ID,把它們作為一個(gè)元組發(fā)送出去,最后清空消息隊(duì)列。值得一提的是,調(diào)用 map 的 clear 是安全的,因?yàn)?nextTuple 失敗時(shí),只有 ack 方法會(huì)修改 map,而它們都運(yùn)行在一個(gè)線程內(nèi)。

維護(hù)兩個(gè) map 用來跟蹤待發(fā)送的事務(wù)消息和每個(gè)事務(wù)的失敗次數(shù)。ack 方法只是簡(jiǎn)單的把事務(wù)從每個(gè)列表中刪除。

public void ack(Object msgId) {
    messages.remove(msgId);
    failCounterMessages.remove(msgId);
}  

fail 方法決定應(yīng)該重新發(fā)送一條消息,還是已經(jīng)失敗太多次而放棄它。

NOTE:如果你使用全部數(shù)據(jù)流組,而拓?fù)淅锏乃?bolt 都失敗了,spout 的 fail 方法才會(huì)被調(diào)用。

public void fail(Object msgId) {
    Integer transactionId = (Integer) msgId;
    //檢查事務(wù)失敗次數(shù)
    Integer failures = transactionFailureCount.get(transactionId) + 1;

    if(failes >= MAX_FAILS){
        //失敗數(shù)太高了,終止拓?fù)?        throw new RuntimeException("錯(cuò)誤, transaction id 【"+

         transactionId+"】 已失敗太多次了 【"+failures+"】");
    }

    //失敗次數(shù)沒有達(dá)到最大數(shù),保存這個(gè)數(shù)字并重發(fā)此消息
    transactionFailureCount.put(transactionId, failures);
    toSend.put(transactionId, messages.get(transactionId));
    LOG.info("重發(fā)消息【"+msgId+"】");
}  

首先,檢查事務(wù)失敗次數(shù)。如果一個(gè)事務(wù)失敗次數(shù)太多,通過拋出 RuntimeException 終止發(fā)送此條消息的工人。否則,保存失敗次數(shù),并把消息放入待發(fā)送隊(duì)列(toSend),它就會(huì)再次調(diào)用 nextTuple 時(shí)得以重新發(fā)送。

NOTE:Storm 節(jié)點(diǎn)不維護(hù)狀態(tài),因此如果你在內(nèi)存保存信息(就像本例做的那樣),而節(jié)點(diǎn)又不幸掛了,你就會(huì)丟失所有緩存的消息。 Storm 是一個(gè)快速失敗的系統(tǒng)。拓?fù)鋾?huì)在拋出異常時(shí)掛掉,然后再由 Storm 重啟,恢復(fù)到拋出異常前的狀態(tài)。

獲取數(shù)據(jù)

接下來你會(huì)了解到一些設(shè)計(jì) spout 的技巧,幫助你從多數(shù)據(jù)源獲取數(shù)據(jù)。

直接連接

在一個(gè)直接連接的架構(gòu)中,spout 直接與一個(gè)消息分發(fā)器連接。

http://wiki.jikexueyuan.com/project/storm/images/05.png" alt="" />

圖 直接連接的 spout

這個(gè)架構(gòu)很容易實(shí)現(xiàn),尤其是在消息分發(fā)器是已知設(shè)備或已知設(shè)備組時(shí)。已知設(shè)備滿足:拓?fù)鋸膯?dòng)時(shí)就已知道該設(shè)備,并貫穿拓?fù)涞恼麄€(gè)生命周期保持不變。未知設(shè)備就是在拓?fù)溥\(yùn)行期添加進(jìn)來的。已知設(shè)備組就是從拓?fù)鋯?dòng)時(shí)組內(nèi)所有設(shè)備都是已知的。

下面舉個(gè)例子說明這一點(diǎn)。創(chuàng)建一個(gè) spout 使用 Twitter 流 API 讀取 twitter 數(shù)據(jù)流。spout 把 API 當(dāng)作消息分發(fā)器直接連接。從數(shù)據(jù)流中得到符合 track 參數(shù)的公共 tweets(參考 twitter 開發(fā)頁(yè)面)。完整的例子可以在鏈接 https://github.com/storm-book/examples-ch04-spouts/找到。

spout 從配置對(duì)象得到連接參數(shù)(track,user,password),并連接到 API(在這個(gè)例子中使用 ApacheDefaultHttpClient)。它一次讀一行數(shù)據(jù),并把數(shù)據(jù)從 JSON 轉(zhuǎn)化成 Java 對(duì)象,然后發(fā)布它。

public void nextTuple() {
    //創(chuàng)建http客戶端
    client = new DefaultHttpClient();
    client.setCredentialsProvider(credentialProvider);
    HttpGet get = new HttpGet(STREAMING_API_URL+track);
    HttpResponse response;
    try {
        //執(zhí)行http訪問
        response = client.execute(get);
        StatusLine status = response.getStatusLine();
        if(status.getStatusCode() == 200){
            InputStream inputStream = response.getEntity().getContent();
            BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
            String in;
            //逐行讀取數(shù)據(jù)
            while((in = reader.readLine())!=null){
                try{
                    //轉(zhuǎn)化并發(fā)布消息
                    Object json = jsonParser.parse(in);
                    collector.emit(new Values(track,json));
                }catch (ParseException e) {
                    LOG.error("Error parsing message from twitter",e);
                }
            }
        }
    } catch (IOException e) {
        LOG.error("Error in communication with twitter api ["+get.getURI().toString()+"], 
           sleeping 10s");
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e1) {}
    }
}  

NOTE:在這里你鎖定了 nextTuple 方法,所以你永遠(yuǎn)也不會(huì)執(zhí)行 ack** 和 fail** 方法。在真實(shí)的應(yīng)用中,我們推薦你在一個(gè)單獨(dú)的線程中執(zhí)行鎖定,并維持一個(gè)內(nèi)部隊(duì)列用來交換數(shù)據(jù)(你會(huì)在下一個(gè)例子中學(xué)到如何實(shí)現(xiàn)這一點(diǎn):消息隊(duì)列)。

棒極了! 現(xiàn)在你用一個(gè) spout 讀取 Twitter 數(shù)據(jù)。一個(gè)明智的做法是,采用拓?fù)洳⑿谢?,多個(gè) spout 從同一個(gè)流讀取數(shù)據(jù)的不同部分。那么如果你有多個(gè)流要讀取,你該怎么做呢?Storm 的第二個(gè)有趣的特性(譯者注:第一個(gè)有趣的特性已經(jīng)出現(xiàn)過,這句話原文都是一樣的,不過按照中文的行文習(xí)慣還是不重復(fù)使用措詞了)是,你可以在任意組件內(nèi)(spouts/bolts)訪問TopologyContext。利用這一特性,你能夠把流劃分到多個(gè) spouts 讀取。

public void open(Map conf, TopologyContext context,
          SpoutOutputCollector collector) {
    //從context對(duì)象獲取spout大小
    int spoutsSize = 
context.getComponentTasks(context.getThisComponentId()).size();
    //從這個(gè)spout得到任務(wù)id
    int myIdx = context.getThisTaskIndex();
    String[] tracks = ((String) conf.get("track")).split(",");
    StringBuffer tracksBuffer = new StringBuffer();
    for(int i=0; i< tracks.length;i++){
        //Check if this spout must read the track word
        if( i % spoutsSize == myIdx){
            tracksBuffer.append(",");
            tracksBuffer.append(tracks[i]);
        }
    }
    if(tracksBuffer.length() == 0) {
        throw new RuntimeException("沒有為spout得到track配置" +
 " [spouts大小:"+spoutsSize+", tracks:"+tracks.length+"] tracks的數(shù)量必須高于spout的數(shù)量");
 this.track =tracksBuffer.substring(1).toString();
    }
 ...
 }  

利用這一技巧,你可以把 collector 對(duì)象均勻的分配給多個(gè)數(shù)據(jù)源,當(dāng)然也可以應(yīng)用到其它的情形。比如說,從web服務(wù)器收集日志文件

http://wiki.jikexueyuan.com/project/storm/images/05.png" alt="" />

圖 直連 hash

通過上一個(gè)例子,你學(xué)會(huì)了從一個(gè) spout 連接到已知設(shè)備。你也可以使用相同的方法連接未知設(shè)備,不過這時(shí)你需要借助于一個(gè)協(xié)同系統(tǒng)維護(hù)的設(shè)備列表。協(xié)同系統(tǒng)負(fù)責(zé)探察列表的變化,并根據(jù)變化創(chuàng)建或銷毀連接。比如,從 web 服務(wù)器收集日志文件時(shí),web 服務(wù)器列表可能隨著時(shí)間變化。當(dāng)添加一臺(tái) web 服務(wù)器時(shí),協(xié)同系統(tǒng)探查到變化并為它創(chuàng)建一個(gè)新的 spout。

http://wiki.jikexueyuan.com/project/storm/images/06.png" alt="" />

圖 直連協(xié)同

消息隊(duì)列

第二種方法是,通過一個(gè)隊(duì)列系統(tǒng)接收來自消息分發(fā)器的消息,并把消息轉(zhuǎn)發(fā)給 spout。更進(jìn)一步的做法是,把隊(duì)列系統(tǒng)作為 spout 和數(shù)據(jù)源之間的中間件,在許多情況下,你可以利用多隊(duì)列系統(tǒng)的重播能力增強(qiáng)隊(duì)列可靠性。這意味著你不需要知道有關(guān)消息分發(fā)器的任何事情,而且添加或移除分發(fā)器的操作比直接連接簡(jiǎn)單的多。這個(gè)架構(gòu)的問題在于隊(duì)列是一個(gè)故障點(diǎn),另外你還要為處理流程引入新的環(huán)節(jié)。

下圖展示了這一架構(gòu)模型

http://wiki.jikexueyuan.com/project/storm/images/07.png" alt="" />

圖 使用隊(duì)列系統(tǒng)

NOTE:你可以通過輪詢隊(duì)列或哈希隊(duì)列(把隊(duì)列消息通過哈希發(fā)送給 spouts 或創(chuàng)建多個(gè)隊(duì)列使隊(duì)列 spouts 一一對(duì)應(yīng))在多個(gè) spouts 之間實(shí)現(xiàn)并行性。

接下來我們利用 Redishttp://redis.io/ 和它的 java 庫(kù) Jedis 創(chuàng)建一個(gè)隊(duì)列系統(tǒng)。在這個(gè)例子中,我們創(chuàng)建一個(gè)日志處理器從一個(gè)未知的來源收集日志,利用 lpush 命令把消息插入隊(duì)列,利用 blpop 命令等待消息。如果你有很多處理過程,blpop 命令采用了輪詢方式獲取消息。

我們?cè)?spout 的 open** 方法創(chuàng)建一個(gè)線程,用來獲取消息(使用線程是為了避免鎖定nextTuple** 在主循環(huán)的調(diào)用):

new Thread(new Runnable() {
    @Override
    public void run() {
        try{
           Jedis client= new Jedis(redisHost, redisPort);
           List res = client.blpop(Integer.MAX_VALUE, queues);
           messages.offer(res.get(1));
        }catch(Exception e){
            LOG.error("從redis讀取隊(duì)列出錯(cuò)",e);
            try {
                Thread.sleep(100);
            }catch(InterruptedException e1){}
        }
    }
}).start();  

這個(gè)線程的惟一目的就是,創(chuàng)建 redis 連接,然后執(zhí)行 blpop 命令。每當(dāng)收到了一個(gè)消息,它就被添加到一個(gè)內(nèi)部消息隊(duì)列,然后會(huì)被 nextTuple**** 消費(fèi)。對(duì)于 spout 來說數(shù)據(jù)源就是 redis 隊(duì)列,它不知道消息分發(fā)者在哪里也不知道消息的數(shù)量。

NOTE:我們不推薦你在 spout 創(chuàng)建太多線程,因?yàn)槊總€(gè) spout 都運(yùn)行在不同的線程。一個(gè)更好的替代方案是增加拓?fù)洳⑿行?,也就是通過 Storm 集群在分布式環(huán)境創(chuàng)建更多線程。

nextTuple 方法中,要做的惟一的事情就是從內(nèi)部消息隊(duì)列獲取消息并再次分發(fā)它們。

public void nextTuple(){
    while(!messages.isEmpty()){
        collector.emit(new Values(messages.poll()));
    }
}  

NOTE:你還可以借助 redis 在 spout 實(shí)現(xiàn)消息重發(fā),從而實(shí)現(xiàn)可靠的拓?fù)?。(譯者注:這里是相對(duì)于開頭的可靠的消息VS不可靠的消息講的)

DRPC

DRPCSpout從DRPC 服務(wù)器接收一個(gè)函數(shù)調(diào)用,并執(zhí)行它(見第三章的例子)。對(duì)于最常見的情況,使用 backtype.storm.drpc.DRPCSpout 就足夠了,不過仍然有可能利用 Storm 包內(nèi)的DRPC類創(chuàng)建自己的實(shí)現(xiàn)。

小結(jié)

現(xiàn)在你已經(jīng)學(xué)習(xí)了常見的spout實(shí)現(xiàn)模式,它們的優(yōu)勢(shì),以及如何確保消息可靠性。不存在適用于所有拓?fù)涞募軜?gòu)模式。如果你知道數(shù)據(jù)源,并且能夠控制它們,你就可以使用直接連接;然而如果你需要添加未知數(shù)據(jù)源或從多種數(shù)據(jù)源接收數(shù)據(jù),就最好使用消息隊(duì)列。如果你要執(zhí)行在線過程,你可以使用 DRPCSpout 或類似的實(shí)現(xiàn)。

你已經(jīng)學(xué)習(xí)了三種常見連接方式,不過依賴于你的需求仍然有無限的可能。