资讯专栏INFORMATION COLUMN

NIO之Reactor模式,Netty序章

gougoujiang / 2909人阅读

摘要:单线程模式流程服务器端的是一个线程对象,该线程会启动事件循环,并使用选择器来实现的多路复用。线程池分配一个线程给这个,即,将关注的事件以及对应的事件处理器注册到线程中。多线程模式将接受客户端的连接请求和与该客户端的通信分在了两个线程来完成。

Reactor模式

反应堆模式:“反应”器名字中”反应“的由来:

“反应”即“倒置”,“控制逆转”,具体事件处理程序不调用反应器,而向反应器注册一个事件处理器,表示自己对某些事件感兴趣,有时间来了,具体事件处理程序通过事件处理器对某个指定的事件发生做出反应。

单线程Reactor模式流程:

①服务器端的Reactor是一个线程对象,该线程会启动事件循环,并使用Selector(选择器)来实现IO的多路复用。channel注册一个Acceptor事件处理器到Reactor中,Acceptor事件处理器所关注的事件是ACCEPT事件,这样Reactor会监听客户端向服务器端发起的连接请求事件(ACCEPT事件)。

②客户端向服务器端发起一个连接请求,Reactor监听到了该ACCEPT事件的发生并将该ACCEPT事件派发给相应的Acceptor处理器来进行处理。Acceptor处理器通过accept()方法得到与这个客户端对应的连接(SocketChannel),然后将该连接所关注的READ事件以及对应的READ事件处理器注册到Reactor中,这样一来Reactor就会监听该连接的READ事件了。

③当Reactor监听到有读或者写事件发生时,将相关的事件派发给对应的处理器进行处理。比如,读处理器会通过SocketChannel的read()方法读取数据,此时read()操作可以直接读取到数据,而不会堵塞与等待可读的数据到来。

④每当处理完所有就绪的感兴趣的I/O事件后,Reactor线程会再次执行select()阻塞等待新的事件就绪并将其分派给对应处理器进行处理。

注意,Reactor的单线程模式的单线程主要是针对于I/O操作而言,也就是所有的I/O的accept()、read()、write()以及connect()操作都在一个线程上完成的。

基于单线程反应器模式手写一个NIO通信

先简单介绍NIO中几个重要对象:
Selector

Selector的英文含义是“选择器”,也可以称为为“轮询代理器”、“事件订阅器”、“channel容器管理机”都行。

事件订阅和Channel管理: 应用程序将向Selector对象注册需要它关注的Channel,以及具体的某一个Channel会对哪些IO事件感兴趣。Selector中也会维护一个“已经注册的Channel”的容器。

Channels
通道,被建立的一个应用程序和操作系统交互事件、传递内容的渠道(注意是连接到操作系统)。那么既然是和操作系统进行内容的传递,那么说明应用程序可以通过通道读取数据,也可以通过通道向操作系统写数据。

所有被Selector(选择器)注册的通道,只能是继承了SelectableChannel类的子类。

ServerSocketChannel:应用服务器程序的监听通道。只有通过这个通道,应用程序才能向操作系统注册支持“多路复用IO”的端口监听。同时支持UDP协议和TCP协议。

ScoketChannel:TCP Socket套接字的监听通道,一个Socket套接字对应了一个客户端IP:端口 到
服务器IP:端口的通信连接。

DatagramChannel:UDP 数据报文的监听通道。

通道中的数据总是要先读到一个Buffer,或者总是要从一个Buffer中写入。

服务端处理器:

</>复制代码

  1. /**
  2. * 类说明:nio通信服务端处理器
  3. */
  4. public class NioServerHandle implements Runnable{
  5. private Selector selector;
  6. private ServerSocketChannel serverChannel;
  7. private volatile boolean started;
  8. /**
  9. * 构造方法
  10. * @param port 指定要监听的端口号
  11. */
  12. public NioServerHandle(int port) {
  13. try {
  14. selector = Selector.open();
  15. serverChannel = ServerSocketChannel.open();
  16. serverChannel.configureBlocking(false);
  17. serverChannel.socket().bind(new InetSocketAddress(port));
  18. serverChannel.register(selector,SelectionKey.OP_ACCEPT);
  19. started = true;
  20. System.out.println("服务器已启动,端口号:"+port);
  21. } catch (IOException e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. public void stop(){
  26. started = false;
  27. }
  28. @Override
  29. public void run() {
  30. //循环遍历selector
  31. while(started){
  32. try{
  33. //阻塞,只有当至少一个注册的事件发生的时候才会继续.
  34. selector.select();
  35. Set keys = selector.selectedKeys();
  36. Iterator it = keys.iterator();
  37. SelectionKey key = null;
  38. while(it.hasNext()){
  39. key = it.next();
  40. it.remove();
  41. try{
  42. handleInput(key);
  43. }catch(Exception e){
  44. if(key != null){
  45. key.cancel();
  46. if(key.channel() != null){
  47. key.channel().close();
  48. }
  49. }
  50. }
  51. }
  52. }catch(Throwable t){
  53. t.printStackTrace();
  54. }
  55. }
  56. //selector关闭后会自动释放里面管理的资源
  57. if(selector != null)
  58. try{
  59. selector.close();
  60. }catch (Exception e) {
  61. e.printStackTrace();
  62. }
  63. }
  64. private void handleInput(SelectionKey key) throws IOException{
  65. if(key.isValid()){
  66. //处理新接入的请求消息
  67. if(key.isAcceptable()){
  68. //获得关心当前事件的channel
  69. ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
  70. //通过ServerSocketChannel的accept创建SocketChannel实例
  71. //完成该操作意味着完成TCP三次握手,TCP物理链路正式建立
  72. SocketChannel sc = ssc.accept();
  73. System.out.println("======socket channel 建立连接" );
  74. //设置为非阻塞的
  75. sc.configureBlocking(false);
  76. //连接已经完成了,可以开始关心读事件了
  77. sc.register(selector,SelectionKey.OP_READ);
  78. }
  79. //读消息
  80. if(key.isReadable()){
  81. System.out.println("======socket channel 数据准备完成," +
  82. "可以去读==读取=======");
  83. SocketChannel sc = (SocketChannel) key.channel();
  84. //创建ByteBuffer,并开辟一个1M的缓冲区
  85. ByteBuffer buffer = ByteBuffer.allocate(1024);
  86. //读取请求码流,返回读取到的字节数
  87. int readBytes = sc.read(buffer);
  88. //读取到字节,对字节进行编解码
  89. if(readBytes>0){
  90. //将缓冲区当前的limit设置为position=0,
  91. // 用于后续对缓冲区的读取操作
  92. buffer.flip();
  93. //根据缓冲区可读字节数创建字节数组
  94. byte[] bytes = new byte[buffer.remaining()];
  95. //将缓冲区可读字节数组复制到新建的数组中
  96. buffer.get(bytes);
  97. String message = new String(bytes,"UTF-8");
  98. System.out.println("服务器收到消息:" + message);
  99. //处理数据
  100. String result = response(message) ;
  101. //发送应答消息
  102. doWrite(sc,result);
  103. }
  104. //链路已经关闭,释放资源
  105. else if(readBytes<0){
  106. key.cancel();
  107. sc.close();
  108. }
  109. }
  110. }
  111. }
  112. //发送应答消息
  113. private void doWrite(SocketChannel channel,String response)
  114. throws IOException {
  115. //将消息编码为字节数组
  116. byte[] bytes = response.getBytes();
  117. //根据数组容量创建ByteBuffer
  118. ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
  119. //将字节数组复制到缓冲区
  120. writeBuffer.put(bytes);
  121. //flip操作
  122. writeBuffer.flip();
  123. //发送缓冲区的字节数组
  124. channel.write(writeBuffer);
  125. }
  126. }
  127. public class NioServer {
  128. private static NioServerHandle nioServerHandle;
  129. public static void start(){
  130. if(nioServerHandle !=null)
  131. nioServerHandle.stop();
  132. nioServerHandle = new NioServerHandle(DEFAULT_PORT);
  133. new Thread(nioServerHandle,"Server").start();
  134. }
  135. public static void main(String[] args){
  136. start();
  137. }
  138. }

客户端处理器

</>复制代码

  1. public class NioClientHandle implements Runnable{
  2. private String host;
  3. private int port;
  4. private Selector selector;
  5. private SocketChannel socketChannel;
  6. private volatile boolean started;
  7. public NioClientHandle(String ip, int port) {
  8. this.host = ip;
  9. this.port = port;
  10. try {
  11. //创建选择器
  12. selector = Selector.open();
  13. //打开通道
  14. socketChannel = SocketChannel.open();
  15. //如果为 true,则此通道将被置于阻塞模式;
  16. // 如果为 false,则此通道将被置于非阻塞模式
  17. socketChannel.configureBlocking(false);
  18. started = true;
  19. } catch (IOException e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. public void stop(){
  24. started = false;
  25. }
  26. @Override
  27. public void run() {
  28. try {
  29. doConnect();
  30. } catch (IOException e) {
  31. e.printStackTrace();
  32. System.exit(1);
  33. }
  34. //循环遍历selector
  35. while(started){
  36. try {
  37. //阻塞,只有当至少一个注册的事件发生的时候才会继续
  38. selector.select();
  39. //获取当前有哪些事件可以使用
  40. Set keys = selector.selectedKeys();
  41. //转换为迭代器
  42. Iterator it = keys.iterator();
  43. SelectionKey key = null;
  44. while(it.hasNext()){
  45. key = it.next();
  46. it.remove();
  47. try {
  48. handleInput(key);
  49. } catch (IOException e) {
  50. e.printStackTrace();
  51. if(key!=null){
  52. key.cancel();
  53. if(key.channel()!=null){
  54. key.channel().close();
  55. }
  56. }
  57. }
  58. }
  59. } catch (IOException e) {
  60. e.printStackTrace();
  61. }
  62. }
  63. //selector关闭后会自动释放里面管理的资源
  64. if(selector!=null){
  65. try {
  66. selector.close();
  67. } catch (IOException e) {
  68. e.printStackTrace();
  69. }
  70. }
  71. }
  72. //具体的事件处理方法
  73. private void handleInput(SelectionKey key) throws IOException{
  74. if(key.isValid()){
  75. //获得关心当前事件的channel
  76. SocketChannel sc = (SocketChannel)key.channel();
  77. if(key.isConnectable()){//连接事件
  78. if(sc.finishConnect()){}
  79. else{System.exit(1);}
  80. }
  81. //有数据可读事件
  82. if(key.isReadable()){
  83. //创建ByteBuffer,并开辟一个1M的缓冲区
  84. ByteBuffer buffer = ByteBuffer.allocate(1024);
  85. //读取请求码流,返回读取到的字节数
  86. int readBytes = sc.read(buffer);
  87. //读取到字节,对字节进行编解码
  88. if(readBytes>0){
  89. //将缓冲区当前的limit设置为position,position=0,
  90. // 用于后续对缓冲区的读取操作
  91. buffer.flip();
  92. //根据缓冲区可读字节数创建字节数组
  93. byte[] bytes = new byte[buffer.remaining()];
  94. //将缓冲区可读字节数组复制到新建的数组中
  95. buffer.get(bytes);
  96. String result = new String(bytes,"UTF-8");
  97. System.out.println("accept message:"+result);
  98. }else if(readBytes<0){
  99. key.cancel();
  100. sc.close();
  101. }
  102. }
  103. }
  104. }
  105. //发送消息
  106. private void doWrite(SocketChannel channel,String request)
  107. throws IOException {
  108. //将消息编码为字节数组
  109. byte[] bytes = request.getBytes();
  110. //根据数组容量创建ByteBuffer
  111. ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
  112. //将字节数组复制到缓冲区
  113. writeBuffer.put(bytes);
  114. //flip操作
  115. writeBuffer.flip();
  116. //发送缓冲区的字节数组
  117. channel.write(writeBuffer);
  118. }
  119. private void doConnect() throws IOException {
  120. /*如果此通道处于非阻塞模式,
  121. 则调用此方法将启动非阻塞连接操作。
  122. 如果立即建立连接,就像本地连接可能发生的那样,则此方法返回true
  123. 否则,此方法返回false
  124. 稍后必须通过调用finishConnect方法完成连接操作。*/
  125. if(socketChannel.connect(new InetSocketAddress(host,port))){}
  126. else{
  127. //连接还未完成,所以注册连接就绪事件,向selector表示关注这个事件
  128. socketChannel.register(selector,SelectionKey.OP_CONNECT);
  129. }
  130. }
  131. //写数据对外暴露的API
  132. public void sendMsg(String msg) throws Exception{
  133. socketChannel.register(selector,SelectionKey.OP_READ);
  134. doWrite(socketChannel,msg);
  135. }
  136. }
  137. public class NioClient {
  138. private static NioClientHandle nioClientHandle;
  139. public static void start(){
  140. if(nioClientHandle !=null)
  141. nioClientHandle.stop();
  142. nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP,DEFAULT_PORT);
  143. new Thread(nioClientHandle,"Client").start();
  144. }
  145. //向服务器发送消息
  146. public static boolean sendMsg(String msg) throws Exception{
  147. nioClientHandle.sendMsg(msg);
  148. return true;
  149. }
  150. public static void main(String[] args) throws Exception {
  151. start();
  152. System.out.println("请输入请求信息:");
  153. Scanner scanner = new Scanner(System.in);
  154. while(NioClient.sendMsg(scanner.next()));
  155. }
  156. }

服务端过程:

启动服务端,完成一些初始化工作,ServerSocketChannel绑定端口并且注册接受连接事件.

循环里selector.select()阻塞,只有当至少一个注册的事件发生的时候才会继续,循环里面处理发生的注册事件

注册事件发生时交给处理器,若为接受连接则accept取出socketChannel并完成连接,然后就是关注read读取事件即注册,有数据读取了则处理器读取请求数据并返回.

客户端过程:

启动客户端,完成一些初始化工作.

根据服务端ip及端口发起连接.

往服务端发送数据,并注册read读取事件

循环里selector.select()阻塞,只有当至少一个注册的事件发生的时候才会继续,循环里面处理发生的注册事件.

注册事件发生时交给处理器,若为连接事件并且连接成功则跳过即不予处理等待读取事件发送.

初始化工作如打开selector,channel,设置通道模式是否阻塞.

单线程Reactor,工作者线程池

但在单线程Reactor模式中,不仅I/O操作在该Reactor线程上,连非I/O的业务操作也在该线程上进行处理了,这可能会大大延迟I/O请求的响应。所以我们应该将非I/O的业务逻辑操作从Reactor线程上卸载,以此来加速Reactor线程对I/O请求的响应.

添加了一个工作者线程池,并将非I/O操作从Reactor线程中移出转交给工作者线程池来执行。这样能够提高Reactor线程的I/O响应,不至于因为一些耗时的业务逻辑而延迟对后面I/O请求的处理。

改进的版本中,所以的I/O操作依旧由一个Reactor来完成,包括I/O的accept()、read()、write()以及connect()操作。
对于一些小容量应用场景,可以使用单线程模型。但是对于高负载、大并发或大数据量的应用场景却不合适,主要原因如下:

① 一个NIO线程同时处理成百上千的链路,性能上无法支撑,即便NIO线程的CPU负荷达到100%,也无法满足海量消息的读取和发送;

②当NIO线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了NIO线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈;

多Reactor线程模式

Reactor线程池中的每一Reactor线程都会有自己的Selector、线程和分发的事件循环逻辑。
mainReactor可以只有一个,但subReactor一般会有多个。mainReactor线程主要负责接收客户端的连接请求,然后将接收到的SocketChannel传递给subReactor,由subReactor来完成和客户端的通信。

流程:

①注册一个Acceptor事件处理器到mainReactor中,Acceptor事件处理器所关注的事件是ACCEPT事件,这样mainReactor会监听客户端向服务器端发起的连接请求事件(ACCEPT事件)。启动mainReactor的事件循环。

②客户端向服务器端发起一个连接请求,mainReactor监听到了该ACCEPT事件并将该ACCEPT事件派发给Acceptor处理器来进行处理。Acceptor处理器通过accept()方法得到与这个客户端对应的连接(SocketChannel),然后将这个SocketChannel传递给subReactor线程池。

③subReactor线程池分配一个subReactor线程给这个SocketChannel,即,将SocketChannel关注的READ事件以及对应的READ事件处理器注册到subReactor线程中。当然你也注册WRITE事件以及WRITE事件处理器到subReactor线程中以完成I/O写操作。Reactor线程池中的每一Reactor线程都会有自己的Selector、线程和分发的循环逻辑。

④当有I/O事件就绪时,相关的subReactor就将事件派发给响应的处理器处理。注意,这里subReactor线程只负责完成I/O的read()操作,在读取到数据后将业务逻辑的处理放入到线程池中完成,若完成业务逻辑后需要返回数据给客户端,则相关的I/O的write操作还是会被提交回subReactor线程来完成。

注意,所以的I/O操作(包括,I/O的accept()、read()、write()以及connect()操作)依旧还是在Reactor线程(mainReactor线程 或 subReactor线程)中完成的。Thread Pool(线程池)仅用来处理非I/O操作的逻辑。
多Reactor线程模式将“接受客户端的连接请求”和“与该客户端的通信”分在了两个Reactor线程来完成。mainReactor完成接收客户端连接请求的操作,它不负责与客户端的通信,而是将建立好的连接转交给subReactor线程来完成与客户端的通信,这样一来就不会因为read()数据量太大而导致后面的客户端连接请求得不到即时处理的情况。并且多Reactor线程模式在海量的客户端并发请求的情况下,还可以通过实现subReactor线程池来将海量的连接分发给多个subReactor线程,在多核的操作系统中这能大大提升应用的负载和吞吐量。

Netty服务端使用了“多Reactor线程模式”

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/77728.html

相关文章

  • Netty 源码分析 三 我就是大名鼎鼎的 EventLoop(一)

    摘要:目录源码之下无秘密做最好的源码分析教程源码分析之番外篇的前生今世的前生今世之一简介的前生今世之二小结的前生今世之三详解的前生今世之四详解源码分析之零磨刀不误砍柴工源码分析环境搭建源码分析之一揭开神秘的红盖头源码分析之一揭开神秘的红盖头客户端 目录 源码之下无秘密 ── 做最好的 Netty 源码分析教程 Netty 源码分析之 番外篇 Java NIO 的前生今世 Java NI...

    livem 评论0 收藏0
  • netty学习总结(一)

    摘要:是什么是一个异步的,事件驱动的网络编程框架。责任链模式通过将组装起来,通过向里添加来监听处理发生的事件。相比于的的不仅易用,而且还支持自动扩容。入站入站事件一般是由外部触发的,如收到数据。 netty是什么? netty是一个异步的,事件驱动的网络编程框架。 netty的技术基础 netty是对Java NIO和Java线程池技术的封装 netty解决了什么问题 使用Java IO进行...

    CntChen 评论0 收藏0
  • 彻底理解Netty,这一篇文章就够了

    摘要:如果什么事都没得做,它也不会死循环,它会将线程休眠起来,直到下一个事件来了再继续干活,这样的一个线程称之为线程。而请求处理逻辑既可以使用单独的线程池进行处理,也可以跟放在读写线程一块处理。 Netty到底是什么 从HTTP说起 有了Netty,你可以实现自己的HTTP服务器,FTP服务器,UDP服务器,RPC服务器,WebSocket服务器,Redis的Proxy服务器,MySQL的P...

    yy13818512006 评论0 收藏0
  • Netty序章BIO NIO AIO演变

    摘要:后改良为用线程池的方式代替新增线程,被称为伪异步。最大的问题是阻塞,同步。每次请求都由程序执行并返回,这是同步的缺陷。这些都会被注册在多路复用器上。多路复用器提供选择已经就绪状态任务的能力。并没有采用的多路复用器,而是使用异步通道的概念。 Netty是一个提供异步事件驱动的网络应用框架,用以快速开发高性能、高可靠的网络服务器和客户端程序。Netty简化了网络程序的开发,是很多框架和公司...

    VincentFF 评论0 收藏0
  • Netty序章BIO NIO AIO演变

    摘要:后改良为用线程池的方式代替新增线程,被称为伪异步。最大的问题是阻塞,同步。每次请求都由程序执行并返回,这是同步的缺陷。这些都会被注册在多路复用器上。多路复用器提供选择已经就绪状态任务的能力。并没有采用的多路复用器,而是使用异步通道的概念。 Netty是一个提供异步事件驱动的网络应用框架,用以快速开发高性能、高可靠的网络服务器和客户端程序。Netty简化了网络程序的开发,是很多框架和公司...

    CntChen 评论0 收藏0

发表评论

0条评论

gougoujiang

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<