资讯专栏INFORMATION COLUMN

Netty-ChannelHandler-ChannelPipeline

warkiz / 484人阅读

摘要:只有在详尽的测试之后才应设置为这值使用的默认采样率检测并报告任何发现的泄漏。这是默认级别,适合绝大部分情况使用默认的采样率,报告所发现的任何的泄漏以及对应的消息被访问的位置类似于但是其将会对每次对消息的访问都进行采样。

ChannelHandler Channel生命周期
状态 描述
ChannelUnregistered Channel已经被创建,但未注册到EventLoop
ChannelRegistered Channel已经被注册到了EventLoop
ChannelActive Channel处于活动状态(已经连接到它的远程节点)。现在Channel可以接受和发送数据
ChannelInActive Channel没有连接到远程节点

一般Channel的生命周期顺序ChannelRegistered -> ChannelActive -> ChannelInactive -> ChannelUnregistered。

当Channel的状态发生变化时,将会生成对应的事件。与此同时,这些事件会被转发给ChannelPipeline中的ChannelHandler。

ChannelHandler生命周期

ChannelHandler定义的生命周期操作,在ChannelHandler被添加到ChannelPipeline中或者被从ChannelPipeline中移除时会调用这些方法。这些方法中都可以接受一个ChannelHandlerContext参数

类型 描述
handlerAdded 当把ChannelHandler添加到ChannelPipeline中时被调用
handlerRemoved 当从ChannelHandler在ChannelPipeline移除时调用
exceptionCaught 当处理过程中在ChannelPipeline中有错误产生时被调用

Netty中定义了下面两个重要的ChannelHandler接口:

ChannelInboundHandler——处理入站数据以及各种状态变化

CHannelOutboundHandler——处理出站数据并且允许拦截所有的操作

ChanneInboundHandler接口
类型 描述
channelRegistered 当Channel已经注册到它的EventLoop并且能够处理I/O时被调用
channelUnregistered 当Channel从它的EventLoop注销并且无法处理任何I/O时被调用
channelActive 当Channel处于活动状态时被调用;Channel已经连接/绑定并且已经就绪
channelInactive 当Channel离开活动状态并且不再连接它的远程节点时被调用
channelReadComplete 当Channel的一个读操作完成时被调用
channelRead 当从Channel读取数据时被调用
channelWritabilityChanged 当Channel的可写状态发生改变时被调用。用户可以确保写操作不会完成的太快(以避免发生OutOfMemoryError)或者可以在Channel变为再次可写时恢复写入。Channel的isWriteable()方法可以来检测Channel的可写性。与可写性相关的阀值可以通过Channel.config().setWriteHighWaterMark()和Channel.config().setWriteLowWaterMark()方法来设置
userEventTriggered 当ChannelInboundHandler.fireUserEventTriggered()方法被调用时被调用。

当某个ChannelInboundHandler的实现重写channelRead()方法时,它将负责显示地释放与池化的ByteBuf实例相关的内存。ReferenceCountUtil.release()

Netty会使用WARN级别的日志消息记录未释放的资源,但是以这种方式管理资源可能非常繁琐,Netty采用SimpleChannelInboundHandler来简化这种操作。

ChannelOutboundHandler接口

出站操作和数据将由ChannelOutboundHandler处理。它的方法将被Channel、ChannelPipeline以及ChannelHandlerContext调用。

ChannelOutboundHandler可以按需推迟操作或者事件。

类型 描述
bind(ChannelHandlerContext, SockertAddress, ChannelPromise) 当请求将Channel绑定到本地地址时被调用
connect(ChannelHandlerContext, SocketAddress, SockertAddress, ChannelPromise) 当请求将Channel连接到远程节点时被调用
disconnect(ChannelHandlerContext, ChannelPromise) 当请求将Channel从远程节点断开时被调用
close(ChannelHandlerContext, ChannelPromise) 当请求关闭Channel时被调用
deregister(ChannelHandlerContext, ChannelPromise) 当请求将Channel丛它的EventLoop注销时被调用
read(ChannelHandlerContext) 当请求从Channel读取更多的数据时被调用
flush(ChannelHandlerContext) 当请求通过Channel将入队数据冲刷到远程节点时被调用
write(ChannelHandlerContext, Object, ChannelPromise) 当请求通过Channel将数据写到远程节点时被调用

ChannelOutboundHandler中的大部分方法都需要一个ChannelPromise参数,方便在操作完成时获取通知。ChannelPromise时ChannelFuture的一个子类,定义了一些可写方法,如setSuccess()和setFailure()方法

ChannelHandler适配器

ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter两个适配器分别提供了ChannelInboundHandler和ChannelOutboundHandler的基本实现。通过扩展抽象类ChannelHandlerAdapter,它们获得了它们共同的超接口ChannelHandler的方法。

ChannelHandlerAdapter提供了isSharable(),如果其对应的实现被注解标注为Sharable,这方法将返回true,表示它可以被添加到多个ChannelPipeline。

ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter中的方法体调用了其相关联的ChannelHandlerContext上的等效方法,从而将事件转发到了ChannelPipeline中的下一个ChannelHandler中。

资源管理

Netty目前定义了4种泄露检测级别:

级别 描述
DISABLED 禁用泄漏检测。只有在详尽的测试之后才应设置为这值
SIMPLE 使用1%的默认采样率检测并报告任何发现的泄漏。这是默认级别,适合绝大部分情况
ADVANCED 使用默认的采样率,报告所发现的任何的泄漏以及对应的消息被访问的位置
PARANOID 类似于ADVANCED,但是其将会对每次(对消息的)访问都进行采样。会对性能有很大影响,只能在调试阶段使用

java -Dio.netty.leakDetectionLevel=ADVANCED

如果一个消息被消费或者丢弃了,并且没有传递给ChannelPipeline中的下一个ChannelOutboundHandler,那么用户就有责任调用ReferenceCountUtil.release()。如果消息到达了实际的传输层,那么当它被写入时或者Channel关闭时,都将被自动释放。

ChannelPipeline接口

每一个新创建的Channel都将会被分配一个新的ChannelPipeline。这项关联是永久性的;Channel既不能附加另外一个ChannelPipeline,也不能分离当前的。

根据事件的起源,事件将会被ChannelInboundHandler或者ChannelOutboundHandler处理。随后,会调用ChannelHandlerContext实现,它将被转发给同一超类型的下一个ChannelHandler。

ChannelHandlerContext使ChannelHandler能够和它的ChannelPipeline以及其他的ChannelHandler交互。ChannelHandler可以通知其所属的ChannelPipeline中的下一个ChannelHandler,甚至可以动态修改它所属的ChannelPipeline。

在ChannelPipeline传播事件时,它会测试ChannelPipeline中的下一个ChannelHandler的类型是否和事件的运动方向相匹配。如果不匹配,ChannelPipeline将跳过该ChannelHandler并前进到下一个,直到它找到和该事件所期望的方向相匹配的为止。(ChannelHandler可以同时实现ChannelInboundHandler和ChannelOutboundHandler接口

修改ChannelPipeline

ChannelHandler可以通过添加、删除或者替换其他的ChannelHandler来实时地修改ChannelPipeline的布局。

名称 描述
addFirst(),addBefore(),addAfter(),addLast() 将一个ChannelHandler添加到ChannelPipeline
remove() 将一个ChannelHandler从ChannelPipeline中移除
replace() 将ChannelPipeline中的一个ChannelHandler替换为另一个ChannelHandler
get() 通过类型或者名称返回ChannelHandler
context() 返回和ChannelHandler绑定的ChannelHandlerContext
names() 返回ChannelPipeline中所有的ChannelHandle的名称
触发事件

ChannelPipeline的API公开了用于调用入站和出站操作的附加方法,用于通知ChannelInboundHandler在ChannelPipeline中所发生的事件。

名称 描述
fireChannelRegistered 调用ChannelPipeline中下一个ChannelInboundHandler的channelRegistered(ChannelHandlerContext)方法
fireChannelUnregistered 调用ChannelPipeline中下一个ChannelInboundHandler的channelUnregistered(ChannelHandlerContext)方法
fireChannelActive 调用ChannelPipeline中下一个ChannelInboundHandler的channelActive(ChannelHandlerContext)
fireChannelInActive 调用ChannelPipeline中下一个ChannelInboundHandler的channelInactive(ChannelHandlerContext)方法
fireExceptionCaught 调用ChannelPipeline中下一个ChannelInboundHandler的exceptionCaught(ChannelHandlertext, Throwable)方法
fireUserEventTriggerd 调用ChannelPipeline中下一个ChannelInboundHandler的userEventTriggered(ChannelHandlertext, Object)方法
fireChannelRead 调用ChannelPipeline中下一个ChannelInboundHandler的channelRead(ChannelHandlertext, Object msg)方法
fireChannelReadComplete 调用ChannelPipeline中下一个ChannelInboundHandler的channelReadComplete(ChannelHandlertext)方法
fireChannelWritabilityChanged 调用ChannelPipeline中下一个ChannelInboundHandler的channelWritabilityChanged(ChannelHandlertext)方法

ChannelPipelin的出站操作

名称 描述
bind 将Channel绑定到一个本地地址,将调用ChannelPipeline中的下一个ChannelOutboundHandler的bind(ChannelHandlerContext,Socket,ChannelPromise)方法
connect 将Channel连接到一个远程地址,这将调用ChannelPipeline中的下一个ChannelOutboundHandler的connect(ChannelHandlerContext,Socket,ChannelPromise)方法
disconnect 将Channel断开连接。这将调用ChannelPipeline中的下一个ChannelOutboundHandler的disconnect(ChannelHandlerContext,Socket,ChannelPromise)方法
close 将Channel关闭。这将调用ChannelPipeline中的下一个ChannelOutboundHandler的close(ChannelHandlerContext,ChannelPromise)方法
deregister 将Channel从它先前分配的EventExecutor(即EventLoop)中注销,这将调用ChannelPipeline中的下一个ChannelOutboundHandler的deregister(ChannelHandlerContext,ChannelPromise)方法
flush 冲刷Channel所有挂起的写入。这将调用ChannelPipeline中的下一个ChannelOutboundHandler的flush(ChannelHandlerContext)方法
write 将消息写入Channel。这将调用ChannelPipeline中的下一个ChannelOutboundHandler的write(ChannelContext,Object msg,ChannelPromise)方法。这并不会将消息写入底层的Socket,而只会将它放入到队列中。要将它写入到Socket,需要调用flush或者writeAndFlush方法
writeAndFlush 先调用write再调用flush的便利方法
read 请求从Channel中读取更多的数据。这将调用ChannelPipeline中的下一个ChannelOutboundHandler的read(ChannelHandlerContext)方法

ChannelPipeline保存了与Channel相关联的ChannelHandler

ChannelPipeline可以根据需要,通过添加或者删除ChannelHandler来动态修改

ChannelPipeline有着丰富的API调用,以响应入站和出站事件

ChanneHandlerContext接口

ChannelHandlerContext代表了ChannelHandler和ChannelPipeline之间的关联。每当有ChannelHandler添加到ChannelPipeline中时,都会创建ChannelHandlerContext。ChannelHandlerContext的主要功能是管理它所关联的ChannelHandler和在同一个ChannelPipeline中的其他ChannelHandler之间的交互。

ChannelHandlerContext有很多方法,其中一些方法也存在于Channel和ChannelPipeline本身上。如果调用Channel或者ChannelPipeline上的方法,它们将沿着整个ChannelPipeline进行传播,而调用ChannelHandlerContext上的相同方法,则从当前所关联的ChannelHandler开始,并且只会传播给位于该ChannelHandler的下一个能够处理该事件的ChannelHandler。

方法名称 描述
alloc 返回和这个实例相关的Channel所配置的ByteBufAllocator
bind 绑定到给定的SocketAddress,并返回ChannelFuture
channel 返回绑定到这个实例的Channel
close 关闭Channel,并返回ChannelFuture
connect 连接给定的SocketAddress,并返回ChannelFuture
deregister 从之前分配的EventExecutor注销,并返回ChannelFuture
disconnect 从远程节点断开,并返回ChannelFuture
executor 返回调度事件的EventExecutor
fireChannelActive 触发对下一个ChannelInboundHandler上的channelActive()方法的调用
fireChannelInActive 触发下一个ChannelInboundHandler上的channelInActive()方法
fireChannelRead 触发对下一个ChannelInboundHandler上的channelRead()方法
fireChannelReadComplete 触发对下一个ChannelInboundHandler上的channelReadComplete()方法的调用
fireChannelRegistered 触发对下一个ChanneInboundHandler上的fireChannelRegistered方法的调用
fireChannelUnregistered 触发对下一个ChannelInboundHandler上的fireChannelUnregistered方法的调用
fireChannelWritabilityChanged 触发对下一个ChannelInboundHandler上的fireChannelWritabilityChanged方法的调用
fireExceptionCaught 触发对下一个ChannelInboundHandler上的fireExceptionCaught方法的调用
fireUserEventTriggered 触发对下一个ChannelInboundHandler上的fireUserEventTriggered(Object evt)方法的调用
handler 返回绑定到这个实例的ChannelHandler
isRemoved 如果从关联的ChannelHandler已经被从ChannelPipeline中移除则返回true
name 返回这个实例的唯一名称
pipeline 返回这个实例相关联的ChannelPipeline
read 将数据从Channel读取到第一个入站缓冲区;如果读取成功则触发一个channelRead事件,并(在最后一个消息被被读取完成后)通知ChannelInboundHandler的channelReadComplete(ChannelHandlerContext)方法
write 通过这个实例写入消息并经过ChannelPipeline
writeAndFlush 通过这个实例写入并冲刷消息并经过ChannelPipeline

ChannelHanlderContext和ChannelHandler之间的关联是永远不会改变的,所以缓存对它的引用是安全的

ChannelHandlerContext上的方法产生的事件流更短,应该利用这个特性尽可能的获得最大的性能

使用ChannelHandlerContext

让ChannePipeline从某个特定的点开始传播的原因:

为了减少将事件传经对它不感兴趣的ChannelHandler所带来的开销

为了减少将事件传经那些可能会对它感兴趣的ChannelHandler

要想调用从某个特定的ChannelHandler开始处理的过程,必须获取到在(ChannelPipeline)该ChannelHandler之前的hannelHandler所关联的ChannelHandlerContext。这个ChannelHandlerContext将调用和它所关联的ChannelHandler之后的ChannelHandler。

一个ChannelHandler可以从属于多个ChannelPipeline,所以它(同一个ChannelHandler)也可以绑定到多个ChannelHandlerContext实例。用于这种用法的ChannelHandler必须使用@Shareable注解标注;否则将它添加到多个ChannelPipeline时将会触发异常。显而易见,为了安全地被用于多个并发的Channel(即连接),这样的ChannelHandler必须是线程安全的。

为何要共享一个ChannelHandler?
在多个ChannelPipeline中安装同一个ChannelHandler的一个常见原因是用于收集跨越多个Channel的统计信息。

异常处理 处理入站异常

如果在处理入站事件的过程中有异常被抛出,那么它将从它在ChannelInboundHandler里被触发的那一点开始流经ChannelPipeline。如果想要处理这种类型的入站异常,必须在你的ChannelInboundHandler中重写exceptionCaught(ChannelHandlerContext,Throwable)

异常会按照入站方向流动,所以一般处理异常的逻辑通常都在最后一个ChannelInboundHandler中实现

ChannelHandler.exceptionCaught()的默认实现是简单的将当前异常转发给ChannelPipeline中的下一个ChannelHandler

如果异常到达了ChannelPipeline的尾端,它将会被记录为未被处理

要想定义自定义的处理逻辑,你需要重写exceptionCaught()方法,然后决定是否将该异常传播出去。

处理出站异常

用于处理出站操作中的正常完成以及异常的选项,都基于以下的通知机制:

每个出站操作都返回一个ChannelFuture。注册到ChannelFuture的ChannelFutureListener将在操作完成时被通知该操作是成功了还是出错了。

几乎所有的ChannelOutboundHandler上的方法都会传入一个ChannelPromise的实例。作为ChannelFuture的子类,ChannelPromise也可以被分配用于异步通知的监听器。

添加ChannelFuturListener只需要调用ChannelFuture实例上的addListener(ChannelFutureListener)方法,并且有两种不同的方式去实现。第一种是调用出站操作(如write方法)所返回的ChanneFuture上的addListener方法;第二种方式是将ChannelFutureListener添加到即将作为参数传递给ChannelOutboundHandler的方法的ChannelPromise

通过调用ChannelPromise上的setSuccess和setFailure方法,可以使一个操作的状态在ChannelHandler的方法返回给其调用者时即刻被感知到。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/69705.html

相关文章

  • 使用PHPWord合并Word文档,在文档指定页插入另一个文档的内容

    摘要:提示不支持文件的读取有一个客户有这样的需求,需要在里使用组件,把一个文档的内容,插入另一个文档的指定页内。由于两个文档的内容都不是固定的,所以不能使用的功能。当读取到指定的分页符之后,再读取的内容,跟着前面的内容插入,最后保存新的文档。 提示:不支持.doc文件的读取有一个客户有这样的需求,需要在ThinkPHP里使用PHPWord组件,把一个文档(DOC1)的内容,插入另一个文档(D...

    RobinTang 评论0 收藏0
  • Spring Cloud 参考文档(Hystrix超时和Ribbon客户端)

    摘要:要运行仪表板,请使用注解主类,然后访问并将仪表板指向客户端应用程序中的单个实例的端点。连接到使用的端点时,必须信任服务器使用的证书,如果证书不受信任,则必须将证书导入,以便仪表板成功连接到流端点。 Hystrix超时和Ribbon客户端 使用包装Ribbon客户端的Hystrix命令时,要确保将Hystrix超时配置为长于配置的Ribbon超时,包括可能进行的任何可能的重试,例如,如果...

    pf_miles 评论0 收藏0
  • CSS学习部分知识点记录

    摘要:整理一些最近几天学习的一些知识点,好记性不如烂笔头,写下来敲一遍代码为自己写哈。这点就不献丑了,也是才学习。脱离文档流的元素,其高度不再计算到高度内。 整理一些最近几天学习CSS的一些知识点,好记性不如烂笔头,写下来敲一遍代码为自己写哈。 左右两栏或三栏布局1、常用方法是给div加float浮动方式实现,加了浮动后div不再独占一行。 2、还有就是position属性实现,通过posi...

    maxmin 评论0 收藏0
  • CSS学习部分知识点记录

    摘要:整理一些最近几天学习的一些知识点,好记性不如烂笔头,写下来敲一遍代码为自己写哈。这点就不献丑了,也是才学习。脱离文档流的元素,其高度不再计算到高度内。 整理一些最近几天学习CSS的一些知识点,好记性不如烂笔头,写下来敲一遍代码为自己写哈。 左右两栏或三栏布局1、常用方法是给div加float浮动方式实现,加了浮动后div不再独占一行。 2、还有就是position属性实现,通过posi...

    lookSomeone 评论0 收藏0

发表评论

0条评论

warkiz

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<