资讯专栏INFORMATION COLUMN

Netty学习-Echo服务器客户端

dreamGong / 1148人阅读

摘要:服务器构成至少一个该组件实现了服务器对从客户端接受的数据的处理,即它的业务逻辑引导配置服务器的启动代码。至少,它会将服务器绑定到它要监听连接请求的端口上。需要注意的是,由服务器发送的消息可能会被分块接受。

Netty服务器构成

至少一个ChannelHandler——该组件实现了服务器对从客户端接受的数据的处理,即它的业务逻辑

引导——配置服务器的启动代码。至少,它会将服务器绑定到它要监听连接请求的端口上。

ChannelHandler和业务逻辑

 ChannelHandler是一个接口族的父接口,它的实现负责接受并响应事件通知,在Netty应用程序中,所有的数据处理逻辑都包含在这些核心抽象的实现中

 Echo服务器会响应传入的消息,因此需要实现ChannelInboundHandler接口,用来定义响应入站事件的方法。由于Echo服务器的应用程序只需要用到少量的方法,所以只需要继承ChannelInboundHandlerAdapter类,它提供了ChannelInboundHandler的默认实现。

 在ChannelInboundHandler中,我们感兴趣的方法有:

channelRead()——对于每个传入的消息都要调用

channelReadComplete()——通知ChannelInboundHandler最后一次对channelRead()的调用是当前批量读取中的最后一条消息

execeptionCaught()——在读取操作期间,有异常抛出时会调用。

EchoServerHandler实现
package cn.sh.demo.echo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author sh
 * @ChannelHandler.Sharable 标示一个ChannelHandler可以被多个Channel安全地共享
 */
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        //将接受到的消息输出到客户端
        System.out.println("Server received:" + in.toString(CharsetUtil.UTF_8));
        //将接收到的消息写给发送者,而不冲刷出站消息
        ctx.write(in);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        //将消息冲刷到客户端,并且关闭该Channel
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        //打印异常堆栈跟踪
        cause.printStackTrace();
        //关闭该Channel
        ctx.close();
    }
}

备注

ChannelInbounHandlerAdapter每个方法都可以被重写然后挂钩到事件生命周期的恰当点上。

重写exceptionCaught()方法允许你对Throwable的任何子类型作出反应。

每个Channel都拥有一个与之关联的ChannelPipeline,ChannelPipeline持有一个ChannelHandler的实例链。在默认情况下,ChannelHandler会把对方法的调用转发给链中的下一个ChannelHandler。因此,如果exceptionCaught()方法没有被该链中的某处实现,那么异常将会被传递到ChannelPipeline的末端进行记录

引导服务器

主要涉及的内容

绑定监听并接受传入连接请求的端口

配置Channel,将有关的入站消息通知给EchoServerHandler实例

Echo服务引导示例代码

package cn.sh.demo.echo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

public class EchoServer {

    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void startServer() throws InterruptedException {
        EchoServerHandler serverHandler = new EchoServerHandler();
        //创建EventLoopGroup
        EventLoopGroup group = new NioEventLoopGroup();
        //创建ServerBootstrap
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(group)
                //指定所使用的NIO传输Channel
                .channel(NioServerSocketChannel.class)
                //使用指定的端口套接字
                .localAddress(new InetSocketAddress(port))
                //添加一个EchoServerHandler到子Channel的ChannelPipeline
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        //此处由于EchoServerHandler被注解标注为@Shareble,所以我们总是使用相同的实例
                        channel.pipeline().addLast(serverHandler);
                    }
                });
        try {
            //异步的绑定服务器,调用sync()方法阻塞等待直到绑定完成
            ChannelFuture channelFuture = bootstrap.bind().sync();
            //获取Channel的CloseFuture,并且阻塞当前线程直到它完成
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //关闭EventLoopGroup,释放所有的资源
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        if (args.length != 1) {
            System.err.println("参数类型或者个数不正确");
            return;
        }
        //设置端口值
        int port = Integer.parseInt(args[0]);
        //启动Echo服务器
        new EchoServer(port).startServer();
    }
}

备注

此处使用了一个特殊的类——ChannelInitializer。当一个新的连接被接受时,一个新的子Channel会被创建,此时ChannelInitializer就会把一个EchoServerHandler的示例添加到该Channel的ChannelPipeline中,这个ChannelHandler将会收到有关入站消息的通知。

回顾引导服务

创建一个ServerBootStrap实例来引导和绑定服务器

创建并分配一个NioEventLoopGroup实例进行事件的处理,如接受新连接以及读/写数据

指定服务器绑定的本地InetSocketAddress

使用EchoServerHandler的实例初始化每一个新的Channel

调用ServerBootstrap.bind()方法来绑定服务器

Echo客户端

客户端主要包括的操作:

连接到服务器

发送一个或多个消息

对于每个消息,等待并接受从服务器发回相同的消息

关闭连接

编写客户端主要包括业务逻辑和引导

ChannelHandler实现客户端逻辑

在该示例中,我们使用SimpleChannelInboundHandler类来处理所有的事件,主要的方法有:

channelActive()——在和服务器的连接已经建立之后被调用

channelRead0()——当从服务器接收到一条消息时被调用

exceptionCaught()——在处理过程中引发异常时被调用

示例代码如下:

package cn.sh.demo.echo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

/**
 * @author sh
 * @ChannelHandler.Sharable 标记该类的示例可以被多个Channel共享
 */
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        //当一个连接被服务器接受并建立后,发送一条消息
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Netty", CharsetUtil.UTF_8));
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
        //记录客户端接收到服务器的消息
        System.out.println("Client received:" + byteBuf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 在发生异常时,记录错误并关闭Channel
     * @param ctx
     * @param cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

备注

 每次在接受数据时,都会调用channelRead0()方法。需要注意的是,由服务器发送的消息可能会被分块接受。也就是说,如果服务器发送了5字节,那么不能保证这5字节会被一次性接受。即使是对于这么少量的数据,channelRead0()方法也可能会被调用两次,第一次使用一个持有3字节的ByteBuf(Netty的字节容器),第二次使用一个持有2字节的ByteBuf。作为一个面向流的协议,TCP保证了字节数组会按照服务器发送它们的顺序被接受。

为什么客户端使用SimpleChannelInboundHandler而不是ChannelInboundHandlerAdapter?

主要和业务逻辑如何处理消息以及Netty如何管理资源有关

客户端中,当channelRead0()方法完成时,已经接受了消息并且处理完毕,当该方法返回时,SimpleChannelInboundHandler负责释放指向保存该消息的ByteBuf的内存引用

但是在服务器端,你需要将消息返回给客户端,write()操作是异步的,直到channelRead()方法返回后有可能仍然没有完成,ChannelInboundHandlerAdapter在这个时间点上不会释放消息。

服务端的消息是在channelComplete()方法中,通过writeAndFlush()方法调用时被释放

引导客户端

客户端使用主机和端口参数来连接远程地址,也就是Echo服务器的地址,而不是绑定到一个一直被监听的端口。

示例代码如下:

package cn.sh.demo.echo;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

public class EchoClient {

    private final String host;

    private final int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        //创建客户端引导器
        Bootstrap bootstrap = new Bootstrap();
        //指定使用NioEventLoopGroup来处理客户端事件
        bootstrap.group(group)
                //指定使用NIO传输的Channel类型
                .channel(NioSocketChannel.class)
                //设置服务器的InetSocketAddress
                .remoteAddress(new InetSocketAddress(host, port))
                //在创建Channel时,向ChannelPipeline中添加一个EchoHandler实例
                .handler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new EchoClientHandler());
                    }
                });
        try {
            //连接到远程节点,阻塞等待直到连接完成
            ChannelFuture future = bootstrap.connect().sync();
            //阻塞直到Channel关闭
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //关闭线程池并且释放所有的资源
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        if (args.length != 2) {
            System.err.println("参数个数不正确");
            return;
        }
        int port = Integer.parseInt(args[1]);
        new EchoClient(args[0], port).start();
    }
}

备注

服务器和客户端均使用了NIO传输,但是,客户端和服务端可以使用不同的传输,例如,在服务器使用NIO传输,客户端可以使用OIO传输

回顾引导服务器

创建一个Bootstrap实例,引导并创建客户端

创建一个NioEventLoopGroup实例来进行事件处理,其中事件处理包括创建新的连接以及处理入站和出站数据

为服务器连接创建了一个InetSocketAddress实例

当连接建立时,一个EchoClientHandler实例会被添加到(该Channel)的ChannelPipeline中

设置完成后,调用Bootstrap.connetc()连接到远程节点

运行程序

启动服务端

再启动客户端

服务端的输出如下:

客户端的输出如下:

代码地址

该文章的示例代码位于cn.sh.demo.echo包下。

示例代码,点击查看

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/69234.html

相关文章

  • Netty 4.1 源代码学习:线程模型

    摘要:前言本文以自带的示例工程为例,简要介绍线程模型示例工程的代码位于很简单,仅包含一个方法用于初始化以及,我们来看看其中和线程模型相关的一些代码在的初始化代码中实例化了两个对象和,它们有着公共基类,这个是线程模型的核心类名让人联想到组合模式, 前言 本文以 netty 4.1 自带的示例工程 netty-example 为例,简要介绍 netty 线程模型 EchoServer echo ...

    monw3c 评论0 收藏0
  • Netty 4.x 官方入门指南 [译]

    摘要:目前为止,我们已经完成了一半的工作,剩下的就是在方法中启动服务器。第一个通常被称为,负责接收已到达的。这两个指针恰好标记着数据的起始终止位置。 前言 本篇翻译自netty官方Get Start教程,一方面能把好的文章分享给各位,另一方面能巩固所学的知识。若有错误和遗漏,欢迎各位指出。 https://netty.io/wiki/user-gu... 面临的问题 我们一般使用专用软件或者...

    刘玉平 评论0 收藏0
  • Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (户端)

    摘要:目录源码分析之番外篇的前生今世的前生今世之一简介的前生今世之二小结的前生今世之三详解的前生今世之四详解源码分析之零磨刀不误砍柴工源码分析环境搭建源码分析之一揭开神秘的红盖头源码分析之一揭开神秘的红盖头客户端源码分析之一揭开神秘的红盖头服务器 目录 Netty 源码分析之 番外篇 Java NIO 的前生今世 Java NIO 的前生今世 之一 简介 Java NIO 的前生今世 ...

    zhaot 评论0 收藏0
  • netty 基于 protobuf 协议 实现 websocket 版本的简易客服系统

    摘要:结构作为服务端作为序列化数据的协议前端通讯演示地址服务端实现启动类长连接示例主线程组从线程组请求的解码和编码把多个消息转换为一个单一的或是,原因是解码器会在每个消息中生成多个消息对象主要用于处理大数据流,比如一个大小的文件如果你直接传输肯定 结构 netty 作为服务端 protobuf 作为序列化数据的协议 websocket 前端通讯 演示 GitHub 地址 showImg(...

    wua_wua2012 评论0 收藏0
  • netty 基于 protobuf 协议 实现 websocket 版本的简易客服系统

    摘要:结构作为服务端作为序列化数据的协议前端通讯演示地址服务端实现启动类长连接示例主线程组从线程组请求的解码和编码把多个消息转换为一个单一的或是,原因是解码器会在每个消息中生成多个消息对象主要用于处理大数据流,比如一个大小的文件如果你直接传输肯定 结构 netty 作为服务端 protobuf 作为序列化数据的协议 websocket 前端通讯 演示 GitHub 地址 showImg(...

    Shihira 评论0 收藏0

发表评论

0条评论

dreamGong

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<