资讯专栏INFORMATION COLUMN

moquette改造笔记(五):设备连接频繁上下线或者相互顶替出现的设备上下线状态错乱问题

betacat / 2918人阅读

摘要:发现问题在使用中发现在设备频繁上下线和两个设备一样相互顶替连接的情况下,的和的方法调用没有先后顺序,如果在这两个方法里面来记录设备上下线状态,会造成状态不对。因为相互顶替的情况并不多见,因此两个也可以接受,在性能上并不会造成多大影响。

发现问题

在moquette使用中发现在设备频繁上下线和两个设备ClientId一样相互顶替连接的情况下,InterceptHandler的onConnect和onConnectionLost的方法调用没有先后顺序,如果在这两个方法里面来记录设备上下线状态,会造成状态不对。

io.moquette.spi.impl.ProtocolProcessor中的processConnect(Channel channel, MqttConnectMessage msg)部分代码如下

ConnectionDescriptor descriptor = new ConnectionDescriptor(clientId, channel, cleanSession);
        final ConnectionDescriptor existing = this.connectionDescriptors.addConnection(descriptor);
        if (existing != null) {
            LOG.info("Client ID is being used in an existing connection, force to be closed. CId={}", clientId);
            existing.abort();
            this.connectionDescriptors.removeConnection(existing);
            this.connectionDescriptors.addConnection(descriptor);
        }

        initializeKeepAliveTimeout(channel, msg, clientId);
        storeWillMessage(msg, clientId);
        if (!sendAck(descriptor, msg, clientId)) {
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }

        m_interceptor.notifyClientConnected(msg);

可以看到existing.abort();后会m_interceptor.notifyClientConnected(msg); 先断开原来的连接,然后接着通知上线。由于Netty本身就是异步的,再加上InterceptHandler相关方法的调用都是在线程池中进行的,因此nterceptHandler的onConnect和onConnectionLost的方法调用先后顺序是无法保证的

解决方法

在ChannelHandler链中添加一个handler,专门处理设备上线事件,对于相同ClientId的连接已经存在时,连接断开和连接事件强制加上时序。

@Sharable
public class AbrotExistConnectionMqttHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(AbrotExistConnectionMqttHandler.class);
    private final ProtocolProcessor m_processor;

    private static final ReentrantLock[] locks = new ReentrantLock[8];

    static {
        for (int i = 0; i < locks.length; i++) {
            locks[i] = new ReentrantLock();
        }
    }

    public AbrotExistConnectionMqttHandler(ProtocolProcessor m_processor) {
        this.m_processor = m_processor;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
        MqttMessage msg = (MqttMessage) message;
        MqttMessageType messageType = msg.fixedHeader().messageType();
        LOG.debug("Processing MQTT message, type: {}", messageType);

        if (messageType != MqttMessageType.CONNECT) {
            super.channelRead(ctx, message);
            return;
        }

        MqttConnectMessage connectMessage = (MqttConnectMessage) msg;
        String clientId = connectMessage.payload().clientIdentifier();

        /**
         * 通过锁和sleep来解决设备互顶出现的设备上线和下线回调时序错乱的问题
         * 目前解决的方式通过sleep不是太好
         * 解决了多个连接互相顶替出现的问题(有一个连接先连接的情况)
         * */
        ReentrantLock lock = locks[Math.abs(clientId.hashCode()) % locks.length];
        lock.lock();
        try {
            if (!m_processor.isConnected(clientId)) {
                super.channelRead(ctx, message);
                return;
            }
            m_processor.abortIfExist(clientId);
            Thread.sleep(50);
            super.channelRead(ctx, message);
            Thread.sleep(30);
        } catch (Exception ex) {
            ex.printStackTrace();
            super.channelRead(ctx, message);
        } finally {
            lock.unlock();
        }

    }
}

解释:
1.通过ReentrantLock lock = locks[Math.abs(clientId.hashCode()) % locks.length];来保证相同的ClientId的连接都会获得同一个锁
2.通过两次Thread.sleep(50);将断开连接和处理设备上线变成先后顺序关系。
3.因为相互顶替的情况并不多见,因此两个Thread.sleep()也可以接受,在性能上并不会造成多大影响。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/77220.html

相关文章

  • moquette改造笔记(一):整合到SpringBoot

    摘要:整合到本文更加注重代码实践,对于配置相关的知识会一笔带过,不做过多的详解。笔者是上传到私服,然后通过导入。接口是预留给开发者根据不同事件处理业务逻辑的接口。改造笔记二优化逻辑 Moquette简介 Mqtt作为物联网比较流行的协议现在已经被大范围使用,其中也有很多开源的MQTT BROKEN。Moquette是用java基于netty实现的轻量级的MQTT BROKEN. Moquet...

    young.li 评论0 收藏0
  • 程序员笔记|详解Eureka 缓存机制

    摘要:和二级缓存影响状态更新,缩短这两个定时任务周期可减少滞后时间,例如配置更新周期更新周期服务提供者保证服务正常下线。服务提供者延迟下线。 引言 Eureka是Netflix开源的、用于实现服务注册和发现的服务。Spring Cloud Eureka基于Eureka进行二次封装,增加了更人性化的UI,使用更为方便。但是由于Eureka本身存在较多缓存,服务状态更新滞后,最常见的状况是:服务...

    mgckid 评论0 收藏0
  • 网关实现灰度发布

    摘要:就是一种灰度发布方式,让一部分用户继续用,一部分用户开始用,如果用户对没有什么反对意见,那么逐步扩大范围,把所有用户都迁移到上面来。灰度发布可以保证整体系统的稳定,在初始灰度的时候就可以发现调整问题,以保证其影响度。 一、背景互联网产品开发有个非常特别的地方,就是不停的升级,升级,再升级。采用敏捷开发的方式,基本上保持每周或者每两周一次的发布频率,系统升级总是伴随着各种风险,新旧版本兼...

    stormjun 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<