在TCP/IP的基于流的傳輸中,接收的數(shù)據(jù)被存儲到套接字接收緩沖器中。不幸的是,基于流的傳輸?shù)木彌_器不是分組的隊列,而是字節(jié)的隊列。 這意味著,即使將兩個消息作為兩個獨立的數(shù)據(jù)包發(fā)送,操作系統(tǒng)也不會將它們視為兩個消息,而只是一組字節(jié)(有點悲劇)。 因此,不能保證讀的是您在遠程定入的行數(shù)據(jù)。 例如,假設操作系統(tǒng)的TCP/IP
堆棧已收到三個數(shù)據(jù)包:
由于基于流的協(xié)議的這種通用屬性,在應用程序中以下面的碎片形式(只是其中的一種)讀取它們的機會很高:
因此,接收部分,無論是服務器側(cè)還是客戶端側(cè),都應該將接收到的數(shù)據(jù)碎片整理成邏輯可由應用容易地理解的一個或多個有意義的幀。 在上述示例的情況下,接收的數(shù)據(jù)應該如下成幀:
針對上面的問題,下面列出了兩個解決方案。
現(xiàn)在我們回到TIME客戶端示例。在這里有同樣的問題。 32
位整數(shù)可以算是非常少量的數(shù)據(jù)量了,并且不可能經(jīng)常被分段。 然而,問題是它可以分割,并且碎片的可能性將隨著流量增加而增加。
簡單的解決方案是創(chuàng)建一個內(nèi)部累積緩沖區(qū),并等待所有4
個字節(jié)被接收到內(nèi)部緩沖區(qū)。 以下是修正的TimeClientHandler
實現(xiàn),它修復了問題:
package com.yiibai.netty.time;
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private ByteBuf buf;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
buf = ctx.alloc().buffer(4); // (1)
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
buf.release(); // (1)
buf = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
buf.writeBytes(m); // (2)
m.release();
if (buf.readableBytes() >= 4) { // (3)
long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
ChannelHandler
有兩個生命周期偵聽器方法:handlerAdded()
和handlerRemoved()
。 只要不會阻塞很長時間,您就可以執(zhí)行任意初始化任務。
首先,所有接收到的數(shù)據(jù)應累加到buf
中。
然后,處理程序必須檢查buf
是否有足夠的數(shù)據(jù)(在此示例中為4
個字節(jié)),當足夠時就繼續(xù)進行實際的業(yè)務邏輯。否則,在有更多數(shù)據(jù)到達時Netty將再次調(diào)用channelRead()
方法,最終累積到達4
個字節(jié)再執(zhí)行實際的業(yè)務。
雖然第一個解決方案已經(jīng)解決了TIME客戶端的問題,但修改的處理程序看起來不那么干凈。想象如果一個更復雜的協(xié)議,它由多個字段組成,例如:可變長度字段等。上面的ChannelInboundHandler
實現(xiàn)很快就無法維護了。
可能已經(jīng)注意到,可以向ChannelPipeline
添加多個ChannelHandler
,因此,可將一個單片的ChannelHandler
拆分為多個模塊,以降低應用程序的復雜性。 例如,可將TimeClientHandler
拆分為兩個處理程序:
幸運的是,Netty提供了一個可擴展類,可以幫助我們方便地編寫:
package com.yiibai.netty.time;
public class TimeDecoder extends ByteToMessageDecoder { // (1)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
if (in.readableBytes() < 4) {
return; // (3)
}
out.add(in.readBytes(4)); // (4)
}
}
ByteToMessageDecoder
是ChannelInboundHandler
的一個實現(xiàn),它使得處理碎片問題變得容易。
ByteToMessageDecoder
在接收到新數(shù)據(jù)時,使用內(nèi)部維護的累積緩沖區(qū)調(diào)用decode()
方法。
decode()
可以決定在累積緩沖區(qū)中沒有足夠數(shù)據(jù)的情況下不添加任何東西。 當接收到更多數(shù)據(jù)時,ByteToMessageDecoder
將再次調(diào)用decode()
。
如果decode()
將對象添加到out
,則意味著解碼器成功地解碼了消息。 ByteToMessageDecoder
將丟棄累積緩沖區(qū)的讀取部分。要記住,不需要解碼多個消息。 ByteToMessageDecoder
將繼續(xù)調(diào)用decode()
方法,直到它沒有再有任何東西添加。
現(xiàn)在我們有另一個處理程序插入ChannelPipeline
,應該在TimeClient
中修改ChannelInitializer
實現(xiàn):
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
}
});
如果您喜歡折騰,也可以想嘗試使用ReplayDecoder
,這簡化了解碼器更多的工作。但需要參考API參考以獲得更多信息。
public class TimeDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(
ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
out.add(in.readBytes(4));
}
}
此外,Netty提供了現(xiàn)成的解碼器,使我們能夠非常容易地實現(xiàn)大多數(shù)的協(xié)議,并幫助您避免使用一個單一的不可維護的處理程序?qū)崿F(xiàn)。有關(guān)更多詳細示例,請參閱以下示例:
二進制協(xié)議實現(xiàn): Netty實踐-factorial服務器
基于文本行的協(xié)議實現(xiàn): Netty實踐-telnet服務器