本教程中實(shí)現(xiàn)的協(xié)議是TIME協(xié)議。 它與先前的示例不同,時(shí)間服務(wù)器只發(fā)送包含32
位整數(shù)的消息,而不接收任何請(qǐng)求,并在消息發(fā)送后關(guān)閉連接。 在本示例中,您將學(xué)習(xí)如何構(gòu)造和發(fā)送消息,以及在完成時(shí)關(guān)閉連接。
因?yàn)闀r(shí)間服務(wù)器將忽略任何接收到的數(shù)據(jù),但是一旦建立連接就發(fā)送消息,所以我們不能使用channelRead()
方法。而是覆蓋channelActive()
方法。 以下是代碼的實(shí)現(xiàn):
package com.yiibai.netty.time;
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) { // (1)
final ByteBuf time = ctx.alloc().buffer(4); // (2)
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time); // (3)
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
下面我們來(lái)看看上面代碼的一些解釋分析:
channelActive()
方法。現(xiàn)在在這個(gè)方法中編寫(xiě)一個(gè)32
位的整數(shù)來(lái)表示當(dāng)前的時(shí)間。要發(fā)送新消息,需要分配一個(gè)包含消息的新緩沖區(qū)。我們要寫(xiě)入一個(gè)32位整數(shù),因此需要一個(gè)ByteBuf,其容量至少為4
個(gè)字節(jié)。 通過(guò)ChannelHandlerContext.alloc()
獲取當(dāng)前的ByteBufAllocator
并分配一個(gè)新的緩沖區(qū)。
像之前一樣,編寫(xiě)構(gòu)造的消息。
但是,在NIO中發(fā)送消息之前,我們是否曾調(diào)用java.nio.ByteBuffer.flip()
? ByteBuf沒(méi)有這樣的方法,它只有兩個(gè)指針; 一個(gè)用于讀取操作,另一個(gè)用于寫(xiě)入操作。 當(dāng)您向ByteBuf
寫(xiě)入內(nèi)容時(shí),寫(xiě)入索引會(huì)增加,而讀取器索引不會(huì)更改。讀取器索引和寫(xiě)入器索引分別表示消息的開(kāi)始和結(jié)束位置。
相比之下,NIO緩沖區(qū)不提供一個(gè)干凈的方式來(lái)確定消息內(nèi)容開(kāi)始和結(jié)束,而不用調(diào)用flip
方法。當(dāng)您忘記翻轉(zhuǎn)緩沖區(qū)時(shí),就將會(huì)遇到麻煩,因?yàn)椴粫?huì)發(fā)送任何或發(fā)送不正確的數(shù)據(jù)。但是這樣的錯(cuò)誤不會(huì)發(fā)生在Netty中,因?yàn)椴煌牟僮黝愋臀覀冇胁煌闹羔槨?br>另一點(diǎn)要注意的是ChannelHandlerContext.write()
(和writeAndFlush()
)方法返回一個(gè)ChannelFuture
。 ChannelFuture
表示尚未發(fā)生的I/O
操作。這意味著,任何請(qǐng)求的操作可能尚未執(zhí)行,因?yàn)樗胁僮髟贜etty中是異步的。 例如,以下代碼可能會(huì)在發(fā)送消息之前關(guān)閉連接:
Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
因此,需要在ChannelFuture
完成后調(diào)用close()
方法,該方法由write()
方法返回,并在寫(xiě)入操作完成時(shí)通知其監(jiān)聽(tīng)器。 請(qǐng)注意,close()
也可能不會(huì)立即關(guān)閉連接,并返回一個(gè)ChannelFuture。
當(dāng)寫(xiě)請(qǐng)求完成時(shí),我們?nèi)绾蔚玫酵ㄖ?這就像向返回的ChannelFuture
添加ChannelFutureListener
一樣簡(jiǎn)單。 在這里,我們創(chuàng)建了一個(gè)新的匿名ChannelFutureListener
,當(dāng)操作完成時(shí)關(guān)閉Channel
。
或者,可以使用預(yù)定義的偵聽(tīng)器來(lái)簡(jiǎn)化代碼:
f.addListener(ChannelFutureListener.CLOSE);
要測(cè)試我們的時(shí)間服務(wù)器是否按預(yù)期工作,可以使用UNIX rdate
命令:
$ rdate -o <port> -p <host>
其中<port>
是在main()
方法中指定的端口號(hào),<host>
通常是localhost
或服務(wù)器的IP地址。
與DISCARD
和ECHO
服務(wù)器不同,我們需要一個(gè)用于TIME協(xié)議的客戶端,因?yàn)槲覀儫o(wú)法將32
位二進(jìn)制數(shù)據(jù)轉(zhuǎn)換為日歷上的日期。 在本節(jié)中,我們討論如何確保服務(wù)器正常工作并學(xué)習(xí)如何使用Netty編寫(xiě)客戶端。
Netty中服務(wù)器和客戶端之間最大的和唯一的區(qū)別是使用了不同的Bootstrap
和Channel
實(shí)現(xiàn)。 請(qǐng)看看下面的代碼:
package com.yiibai.netty.time;
public class TimeClient {
public static void main(String[] args) throws Exception {
String host = args[0];
int port = Integer.parseInt(args[1]);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
Bootstrap
與ServerBootstrap
類似,只是它用于非服務(wù)器通道,例如客戶端或無(wú)連接通道。
如果只指定一個(gè)EventLoopGroup
,它將同時(shí)用作boss
組和worker
組。boss
組和worker
組不是用于客戶端。
不使用NioServerSocketChannel
,而是使用NioSocketChannel
來(lái)創(chuàng)建客戶端通道。
注意,這里不像我們使用的ServerBootstrap
,所以不使用childOption()
,因?yàn)榭蛻舳?code>SocketChannel沒(méi)有父類。
應(yīng)該調(diào)用connect()
方法,而不是bind()
方法。
如上面所見(jiàn),它與服務(wù)器端代碼沒(méi)有什么不同。 ChannelHandler
實(shí)現(xiàn)又是怎么樣的呢? 它應(yīng)該從服務(wù)器接收一個(gè)32
位整數(shù),將其轉(zhuǎn)換為人類可讀的格式,打印轉(zhuǎn)換為我們熟知的時(shí)間格式 ,并關(guān)閉連接:
package com.yiibai.netty.time;
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
Date currentTime = new Date(currentTimeMillis);
System.out.println("Default Date Format:" + currentTime.toString());
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateString = formatter.format(currentTime);
// 轉(zhuǎn)換一下成中國(guó)人的時(shí)間格式
System.out.println("Date Format:" + dateString);
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
(1). 在TCP/IP
中,Netty讀取從對(duì)端發(fā)送的ByteBuf
數(shù)據(jù)。
客戶端看起來(lái)很簡(jiǎn)單,與服務(wù)器端示例沒(méi)什么區(qū)別。 但是,這個(gè)處理程序有時(shí)會(huì)拒絕拋出IndexOutOfBoundsException
。 我們將在下一節(jié)討論為什么會(huì)發(fā)生這種情況。
先運(yùn)行 TimeServer.java
程序,然后再運(yùn)行 TimeClient.java
, 當(dāng)運(yùn)行 TimeClient.java
時(shí)就可以到有一個(gè)時(shí)間日期輸出,然后程序自動(dòng)退出。輸出結(jié)果如下 -
Default Date Format:Thu Mar 02 20:50:23 CST 2017
Date Format:2017-03-02 20:50:23