鍍金池/ 教程/ Java/ Netty 實(shí)現(xiàn)聊天功能
Netty 實(shí)現(xiàn) WebSocket 聊天功能
總結(jié)
寫個(gè)時(shí)間客戶端
寫個(gè)丟棄服務(wù)器
問題
開始之前
關(guān)閉你的應(yīng)用
開始
用POJO代替ByteBuf
總結(jié)
架構(gòu)總覽
豐富的緩沖實(shí)現(xiàn)
解決
寫個(gè)應(yīng)答服務(wù)器
I/O API 統(tǒng)一的異步 I/O API
適用快速開發(fā)的高級(jí)組件
處理一個(gè)基于流的傳輸
Netty 實(shí)現(xiàn)聊天功能
基于攔截鏈模式的事件模型
寫個(gè)時(shí)間服務(wù)器
查看收到的數(shù)據(jù)

Netty 實(shí)現(xiàn)聊天功能

Netty 是一個(gè) Java NIO 客戶端服務(wù)器框架,使用它可以快速簡(jiǎn)單地開發(fā)網(wǎng)絡(luò)應(yīng)用程序,比如服務(wù)器和客戶端的協(xié)議。Netty 大大簡(jiǎn)化了網(wǎng)絡(luò)程序的開發(fā)過程比如 TCP 和 UDP 的 socket 服務(wù)的開發(fā)。更多關(guān)于 Netty 的知識(shí),可以參閱《Netty 4.x 用戶指南》https://github.com/waylau/netty-4-user-guide

下面,就基于 Netty 快速實(shí)現(xiàn)一個(gè)聊天小程序。

準(zhǔn)備

  • JDK 7+
  • Maven 3.2.x
  • Netty 4.x
  • Eclipse 4.x

服務(wù)端

讓我們從 handler (處理器)的實(shí)現(xiàn)開始,handler 是由 Netty 生成用來處理 I/O 事件的。

SimpleChatServerHandler.java

public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String> { // (1)

    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {  // (2)
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入\n");
        }
        channels.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {  // (3)
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 離開\n");
        }
        channels.remove(ctx.channel());
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { // (4)
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            if (channel != incoming){
                channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + s + "\n");
            } else {
                channel.writeAndFlush("[you]" + s + "\n");
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5)
        Channel incoming = ctx.channel();
        System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"在線");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6)
        Channel incoming = ctx.channel();
        System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"掉線");
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (7)
        Channel incoming = ctx.channel();
        System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"異常");
        // 當(dāng)出現(xiàn)異常就關(guān)閉連接
        cause.printStackTrace();
        ctx.close();
    }
}

1.SimpleChatServerHandler 繼承自 SimpleChannelInboundHandler,這個(gè)類實(shí)現(xiàn)了ChannelInboundHandler接口,ChannelInboundHandler 提供了許多事件處理的接口方法,然后你可以覆蓋這些方法?,F(xiàn)在僅僅只需要繼承 SimpleChannelInboundHandler 類而不是你自己去實(shí)現(xiàn)接口方法。

2.覆蓋了 handlerAdded() 事件處理方法。每當(dāng)從服務(wù)端收到新的客戶端連接時(shí),客戶端的 Channel 存入ChannelGroup列表中,并通知列表中的其他客戶端 Channel

3.覆蓋了 handlerRemoved() 事件處理方法。每當(dāng)從服務(wù)端收到客戶端斷開時(shí),客戶端的 Channel 移除 ChannelGroup 列表中,并通知列表中的其他客戶端 Channel

4.覆蓋了 channelRead0() 事件處理方法。每當(dāng)從服務(wù)端讀到客戶端寫入信息時(shí),將信息轉(zhuǎn)發(fā)給其他客戶端的 Channel。其中如果你使用的是 Netty 5.x 版本時(shí),需要把 channelRead0() 重命名為messageReceived()

5.覆蓋了 channelActive() 事件處理方法。服務(wù)端監(jiān)聽到客戶端活動(dòng)

6.覆蓋了 channelInactive() 事件處理方法。服務(wù)端監(jiān)聽到客戶端不活動(dòng)

7.exceptionCaught() 事件處理方法是當(dāng)出現(xiàn) Throwable 對(duì)象才會(huì)被調(diào)用,即當(dāng) Netty 由于 IO 錯(cuò)誤或者處理器在處理事件時(shí)拋出的異常時(shí)。在大部分情況下,捕獲的異常應(yīng)該被記錄下來并且把關(guān)聯(lián)的 channel 給關(guān)閉掉。然而這個(gè)方法的處理方式會(huì)在遇到不同異常的情況下有不同的實(shí)現(xiàn),比如你可能想在關(guān)閉連接之前發(fā)送一個(gè)錯(cuò)誤碼的響應(yīng)消息。

SimpleChatServerInitializer.java

SimpleChatServerInitializer 用來增加多個(gè)的處理類到 ChannelPipeline 上,包括編碼、解碼、SimpleChatServerHandler 等。

public class SimpleChatServerInitializer extends
        ChannelInitializer<SocketChannel> {

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast("handler", new SimpleChatServerHandler());

        System.out.println("SimpleChatClient:"+ch.remoteAddress() +"連接上");
    }
}

SimpleChatServer.java

編寫一個(gè) main() 方法來啟動(dòng)服務(wù)端。

public class SimpleChatServer {

    private int port;

    public SimpleChatServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new SimpleChatServerInitializer())  //(4)
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            System.out.println("SimpleChatServer 啟動(dòng)了");

            // 綁定端口,開始接收進(jìn)來的連接
            ChannelFuture f = b.bind(port).sync(); // (7)

            // 等待服務(wù)器  socket 關(guān)閉 。
            // 在這個(gè)例子中,這不會(huì)發(fā)生,但你可以優(yōu)雅地關(guān)閉你的服務(wù)器。
            f.channel().closeFuture().sync();

        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();

            System.out.println("SimpleChatServer 關(guān)閉了");
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new SimpleChatServer(port).run();

    }
}

1.NioEventLoopGroup是用來處理I/O操作的多線程事件循環(huán)器,Netty 提供了許多不同的EventLoopGroup的實(shí)現(xiàn)用來處理不同的傳輸。在這個(gè)例子中我們實(shí)現(xiàn)了一個(gè)服務(wù)端的應(yīng)用,因此會(huì)有2個(gè) NioEventLoopGroup 會(huì)被使用。第一個(gè)經(jīng)常被叫做‘boss’,用來接收進(jìn)來的連接。第二個(gè)經(jīng)常被叫做‘worker’,用來處理已經(jīng)被接收的連接,一旦‘boss’接收到連接,就會(huì)把連接信息注冊(cè)到‘worker’上。如何知道多少個(gè)線程已經(jīng)被使用,如何映射到已經(jīng)創(chuàng)建的 Channel上都需要依賴于 EventLoopGroup 的實(shí)現(xiàn),并且可以通過構(gòu)造函數(shù)來配置他們的關(guān)系。

2.ServerBootstrap是一個(gè)啟動(dòng) NIO 服務(wù)的輔助啟動(dòng)類。你可以在這個(gè)服務(wù)中直接使用 Channel,但是這會(huì)是一個(gè)復(fù)雜的處理過程,在很多情況下你并不需要這樣做。

3.這里我們指定使用NioServerSocketChannel類來舉例說明一個(gè)新的 Channel 如何接收進(jìn)來的連接。

4.這里的事件處理類經(jīng)常會(huì)被用來處理一個(gè)最近的已經(jīng)接收的 Channel。SimpleChatServerInitializer 繼承自ChannelInitializer是一個(gè)特殊的處理類,他的目的是幫助使用者配置一個(gè)新的 Channel。也許你想通過增加一些處理類比如 SimpleChatServerHandler 來配置一個(gè)新的 Channel 或者其對(duì)應(yīng)的ChannelPipeline來實(shí)現(xiàn)你的網(wǎng)絡(luò)程序。當(dāng)你的程序變的復(fù)雜時(shí),可能你會(huì)增加更多的處理類到 pipline 上,然后提取這些匿名類到最頂層的類上。

5.你可以設(shè)置這里指定的 Channel 實(shí)現(xiàn)的配置參數(shù)。我們正在寫一個(gè)TCP/IP 的服務(wù)端,因此我們被允許設(shè)置 socket 的參數(shù)選項(xiàng)比如tcpNoDelay 和 keepAlive。請(qǐng)參考ChannelOption和詳細(xì)的ChannelConfig實(shí)現(xiàn)的接口文檔以此可以對(duì)ChannelOption 的有一個(gè)大概的認(rèn)識(shí)。

6.option() 是提供給NioServerSocketChannel用來接收進(jìn)來的連接。childOption() 是提供給由父管道ServerChannel接收到的連接,在這個(gè)例子中也是 NioServerSocketChannel。

7.我們繼續(xù),剩下的就是綁定端口然后啟動(dòng)服務(wù)。這里我們?cè)跈C(jī)器上綁定了機(jī)器所有網(wǎng)卡上的 8080 端口。當(dāng)然現(xiàn)在你可以多次調(diào)用 bind() 方法(基于不同綁定地址)。

恭喜!你已經(jīng)完成了基于 Netty 聊天服務(wù)端程序。

客戶端

SimpleChatClientHandler.java

客戶端的處理類比較簡(jiǎn)單,只需要將讀到的信息打印出來即可

public class SimpleChatClientHandler extends  SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        System.out.println(s);
    }
}

SimpleChatClientInitializer.java

與服務(wù)端類似

public class SimpleChatClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast("handler", new SimpleChatClientHandler());
    }
}

SimpleChatClient.java

編寫一個(gè) main() 方法來啟動(dòng)客戶端。

public class SimpleChatClient {
    public static void main(String[] args) throws Exception{
            new SimpleChatClient("localhost", 8080).run();
        }

        private final String host;
        private final int port;

        public SimpleChatClient(String host, int port){
            this.host = host;
            this.port = port;
        }

        public void run() throws Exception{
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap  = new Bootstrap()
                        .group(group)
                        .channel(NioSocketChannel.class)
                        .handler(new SimpleChatClientInitializer());
                Channel channel = bootstrap.connect(host, port).sync().channel();
                BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
                while(true){
                    channel.writeAndFlush(in.readLine() + "\r\n");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }

        }
    }

}

運(yùn)行效果

先運(yùn)行 SimpleChatServer,再可以運(yùn)行多個(gè) SimpleChatClient,控制臺(tái)輸入文本繼續(xù)測(cè)試

http://wiki.jikexueyuan.com/project/netty-4-user-guide/images/5.1.jpg" alt="" />

http://wiki.jikexueyuan.com/project/netty-4-user-guide/images/5.2.jpg" alt="" />

http://wiki.jikexueyuan.com/project/netty-4-user-guide/images/5.3.jpg" alt="" />

源碼

https://github.com/waylau/netty-4-user-guide-demos中 simplechat

參考

Netty 4.x 用戶指南https://github.com/waylau/netty-4-user-guide