鍍金池/ 教程/ Java/ 處理一個基于流的傳輸
Netty 實現(xiàn) WebSocket 聊天功能
總結(jié)
寫個時間客戶端
寫個丟棄服務器
問題
開始之前
關(guān)閉你的應用
開始
用POJO代替ByteBuf
總結(jié)
架構(gòu)總覽
豐富的緩沖實現(xiàn)
解決
寫個應答服務器
I/O API 統(tǒng)一的異步 I/O API
適用快速開發(fā)的高級組件
處理一個基于流的傳輸
Netty 實現(xiàn)聊天功能
基于攔截鏈模式的事件模型
寫個時間服務器
查看收到的數(shù)據(jù)

處理一個基于流的傳輸

關(guān)于 Socket Buffer的一個小警告

基于流的傳輸比如 TCP/IP, 接收到數(shù)據(jù)是存在 socket 接收的 buffer 中。不幸的是,基于流的傳輸并不是一個數(shù)據(jù)包隊列,而是一個字節(jié)隊列。意味著,即使你發(fā)送了2個獨立的數(shù)據(jù)包,操作系統(tǒng)也不會作為2個消息處理而僅僅是作為一連串的字節(jié)而言。因此這是不能保證你遠程寫入的數(shù)據(jù)就會準確地讀取。舉個例子,讓我們假設(shè)操作系統(tǒng)的 TCP/TP 協(xié)議棧已經(jīng)接收了3個數(shù)據(jù)包:

http://wiki.jikexueyuan.com/project/netty-4-user-guide/images/3.1.png" alt="" />

由于基于流傳輸?shù)膮f(xié)議的這種普通的性質(zhì),在你的應用程序里讀取數(shù)據(jù)的時候會有很高的可能性被分成下面的片段

http://wiki.jikexueyuan.com/project/netty-4-user-guide/images/3.2.png" alt="" />

因此,一個接收方不管他是客戶端還是服務端,都應該把接收到的數(shù)據(jù)整理成一個或者多個更有意思并且能夠讓程序的業(yè)務邏輯更好理解的數(shù)據(jù)。在上面的例子中,接收到的數(shù)據(jù)應該被構(gòu)造成下面的格式:

http://wiki.jikexueyuan.com/project/netty-4-user-guide/images/3.3.png" alt="" />

The First Solution 辦法一

回到 TIME 客戶端例子。同樣也有類似的問題。一個32位整型是非常小的數(shù)據(jù),他并不見得會被經(jīng)常拆分到到不同的數(shù)據(jù)段內(nèi)。然而,問題是他確實可能會被拆分到不同的數(shù)據(jù)段內(nèi),并且拆分的可能性會隨著通信量的增加而增加。

最簡單的方案是構(gòu)造一個內(nèi)部的可積累的緩沖,直到4個字節(jié)全部接收到了內(nèi)部緩沖。下面的代碼修改了 TimeClientHandler 的實現(xiàn)類修復了這個問題

    public class TimeClientHandler extends ChannelInboundHandlerAdapter {
        private ByteBuf buf;

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            buf = ctx.alloc().buffer(4); // (1)
        }

        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) {
            buf.release(); // (1)
            buf = null;
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ByteBuf m = (ByteBuf) msg;
            buf.writeBytes(m); // (2)
            m.release();

            if (buf.readableBytes() >= 4) { // (3)
                long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
                System.out.println(new Date(currentTimeMillis));
                ctx.close();
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }

1.ChannelHandler 有2個生命周期的監(jiān)聽方法:handlerAdded()和 handlerRemoved()。你可以完成任意初始化任務只要他不會被阻塞很長的時間。

2.首先,所有接收的數(shù)據(jù)都應該被累積在 buf 變量里。

3.然后,處理器必須檢查 buf 變量是否有足夠的數(shù)據(jù),在這個例子中是4個字節(jié),然后處理實際的業(yè)務邏輯。否則,Netty 會重復調(diào)用channelRead() 當有更多數(shù)據(jù)到達直到4個字節(jié)的數(shù)據(jù)被積累。

The Second Solution 方法二

盡管第一個解決方案已經(jīng)解決了 TIME 客戶端的問題了,但是修改后的處理器看起來不那么的簡潔,想象一下如果由多個字段比如可變長度的字段組成的更為復雜的協(xié)議時,你的 ChannelInboundHandler 的實現(xiàn)將很快地變得難以維護。

正如你所知的,你可以增加多個 ChannelHandlerChannelPipeline ,因此你可以把一整個ChannelHandler 拆分成多個模塊以減少應用的復雜程度,比如你可以把TimeClientHandler 拆分成2個處理器:

  • TimeDecoder 處理數(shù)據(jù)拆分的問題
  • TimeClientHandler 原始版本的實現(xiàn)

幸運地是,Netty 提供了一個可擴展的類,幫你完成 TimeDecoder 的開發(fā)。

    public class TimeDecoder extends ByteToMessageDecoder { // (1)
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
            if (in.readableBytes() < 4) {
                return; // (3)
            }

            out.add(in.readBytes(4)); // (4)
        }
    }

1.ByteToMessageDecoderChannelInboundHandler 的一個實現(xiàn)類,他可以在處理數(shù)據(jù)拆分的問題上變得很簡單。

2.每當有新數(shù)據(jù)接收的時候,ByteToMessageDecoder 都會調(diào)用 decode() 方法來處理內(nèi)部的那個累積緩沖。

3.Decode() 方法可以決定當累積緩沖里沒有足夠數(shù)據(jù)時可以往 out 對象里放任意數(shù)據(jù)。當有更多的數(shù)據(jù)被接收了 ByteToMessageDecoder 會再一次調(diào)用 decode() 方法。

4.如果在 decode() 方法里增加了一個對象到 out 對象里,這意味著解碼器解碼消息成功。ByteToMessageDecoder 將會丟棄在累積緩沖里已經(jīng)被讀過的數(shù)據(jù)。請記得你不需要對多條消息調(diào)用 decode(),ByteToMessageDecoder 會持續(xù)調(diào)用 decode() 直到不放任何數(shù)據(jù)到 out 里。

現(xiàn)在我們有另外一個處理器插入到 ChannelPipeline 里,我們應該在 TimeClient 里修改 ChannelInitializer 的實現(xiàn):

    b.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
        }
    });

如果你是一個大膽的人,你可能會嘗試使用更簡單的解碼類ReplayingDecoder。不過你還是需要參考一下 API 文檔來獲取更多的信息。

    public class TimeDecoder extends ReplayingDecoder<Void> {
        @Override
        protected void decode(
                ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
            out.add(in.readBytes(4));
        }
    }

此外,Netty還提供了更多開箱即用的解碼器使你可以更簡單地實現(xiàn)更多的協(xié)議,幫助你避免開發(fā)一個難以維護的處理器實現(xiàn)。請參考下面的包以獲取更多更詳細的例子:

譯者注:翻譯版本的項目源碼見 https://github.com/waylau/netty-4-user-guide-demos 中的com.waylau.netty.demo.factorialcom.waylau.netty.demo.telnet 包下

上一篇:總結(jié)下一篇:開始之前