资讯专栏INFORMATION COLUMN

moquette改造笔记(四):解决InterceptHandler中的onConnectionLo

joyqi / 1412人阅读

摘要:发现问题在使用中设备异常断开中的。在中事件都是在链中依次传递的。事件最后传递到。解决方法添加会导致调用两次解释会在该从链中移除掉时被调用,一般的话没有手动从链中删除时,会在连接断开后回调该方法。

发现问题

在使用中设备异常断开,InterceptHandler中的onConnectionLost()。经过调试发现是MoquetteIdleTimeoutHandler中的代码导致的,代码如下:

@Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState e = ((IdleStateEvent) evt).state();
            if (e == IdleState.READER_IDLE) {
                LOG.info("Firing channel inactive event. MqttClientId = {}.", NettyUtils.clientID(ctx.channel()));
                // fire a channelInactive to trigger publish of Will
                ctx.fireChannelInactive();
                ctx.close().addListener(CLOSE_ON_FAILURE);
            }
        } 
        ......
    }

这部分代码的大致含义是:当在一段时间内没有收到任何数据后,就会调用触发ChannelInactive事件然后关掉连接。
在netty中事件都是在handler链中依次传递的。ChannelInactive事件最后传递到NettyMQTTHandler。处理逻辑如下:

  public void channelInactive(ChannelHandlerContext ctx) {
        String clientID = NettyUtils.clientID(ctx.channel());
        if (clientID != null && !clientID.isEmpty()) {
            LOG.info("N otifying connection lost event. MqttClientId = {}", clientID);
            m_processor.processConnectionLost(clientID, ctx.channel());
        }
        ctx.close().addListener(CLOSE_ON_FAILURE);
    }

如果条件成立,会调用一次m_processor.processConnectionLost(clientID, ctx.channel());这会导致InterceptHandler中的onConnectionLost()调用一次。因为连接紧接着又被关闭了,连接关闭同样会导致ChannelInactive事件,因此以上方法又会被触发一次,因此这样就会造成异常断开会调用两次onConnectionLost()。

解决方法

添加handlerRemove

 @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        /** modify by ljq 2018.6.11 会导致processConnectionLost调用两次*/
//        String clientID = NettyUtils.clientID(ctx.channel());
//        if (clientID != null && !clientID.isEmpty()) {
//            LOG.info("N otifying connection lost event. MqttClientId = {}", clientID);
//            m_processor.processConnectionLost(clientID, ctx.channel());
//        }
        ctx.close().addListener(CLOSE_ON_FAILURE);
    }
    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        String clientID = NettyUtils.clientID(ctx.channel());
        if (clientID != null && !clientID.isEmpty()) {
            LOG.info("Notifying connection lost event. MqttClientId = {}", clientID);
            m_processor.processConnectionLost(clientID, ctx.channel());
        }
    }

解释:
handler remove会在该handler从链中移除掉时被调用,一般的话没有手动从链中删除时,会在连接断开后回调该方法。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/77222.html

相关文章

  • moquette改造笔记(三):优化BrokerInterceptor 中的线程池

    摘要:修改的实现,实现接口在改造笔记一整合到中修改实现,添加对的实现如下负载过大,处理不过来时,会回调该方法例如可以发生邮件通知相关人员改造笔记四解决中的调用两次 发现问题 在io.moquette.spi.impl.BrokerInterceptor的构造函数中,新建了一个线程池,代码如下: private BrokerInterceptor(int poolSize, List hand...

    cfanr 评论0 收藏0
  • moquette改造笔记(二):优化BrokerInterceptor notifyTopicPu

    摘要:优化逻辑优化方向向启动方法一样,每次调用的方法都是在线程池中新建一个任务具体代码解释新建一个用来实现调用方法。改造笔记三优化中的线程池 发现问题 下面部分是io.moquette.spi.impl.BrokerInterceptor.java部分源码 @Override public void notifyClientConnected(final MqttConnectMes...

    liangzai_cool 评论0 收藏0
  • moquette改造笔记(一):整合到SpringBoot

    摘要:整合到本文更加注重代码实践,对于配置相关的知识会一笔带过,不做过多的详解。笔者是上传到私服,然后通过导入。接口是预留给开发者根据不同事件处理业务逻辑的接口。改造笔记二优化逻辑 Moquette简介 Mqtt作为物联网比较流行的协议现在已经被大范围使用,其中也有很多开源的MQTT BROKEN。Moquette是用java基于netty实现的轻量级的MQTT BROKEN. Moquet...

    young.li 评论0 收藏0
  • moquette改造笔记(五):设备连接频繁上下线或者相互顶替出现的设备上下线状态错乱问题

    摘要:发现问题在使用中发现在设备频繁上下线和两个设备一样相互顶替连接的情况下,的和的方法调用没有先后顺序,如果在这两个方法里面来记录设备上下线状态,会造成状态不对。因为相互顶替的情况并不多见,因此两个也可以接受,在性能上并不会造成多大影响。 发现问题 在moquette使用中发现在设备频繁上下线和两个设备ClientId一样相互顶替连接的情况下,InterceptHandler的onConn...

    betacat 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<