资讯专栏INFORMATION COLUMN

moquette改造笔记(一):整合到SpringBoot

young.li / 2440人阅读

摘要:整合到本文更加注重代码实践,对于配置相关的知识会一笔带过,不做过多的详解。笔者是上传到私服,然后通过导入。接口是预留给开发者根据不同事件处理业务逻辑的接口。改造笔记二优化逻辑

Moquette简介

Mqtt作为物联网比较流行的协议现在已经被大范围使用,其中也有很多开源的MQTT BROKEN。Moquette是用java基于netty实现的轻量级的MQTT BROKEN. Moquette基于Netty实现,性能问题至少前期可以不用考虑,在使用过程中还算稳定,没有出现过较大的问题。github地址:https://github.com/andsel/moq...。

整合到SpringBoot

本文更加注重代码实践,对于配置相关的知识会一笔带过,不做过多的详解。
假设已经搭建好SpringBoot环境,下载完Moquette。至于怎么引用Moquette,可以在原项目上修改,也可以达成Jar包添加到lib调用,也可以上传到Maven私服后通过配置pom引用。笔者是上传到Maven私服,然后通过maven导入。

自定义包装类,实现io.moquette.server.Server的自动注入

@Slf4j
@Service
public class MoquetteServer {
    @Value("${mqtt-server.config-path}")
    private String configFilePath;
    @Autowired
    private IAuthorizator authorizator;

    /**
     * Safety相关的拦截器,如果有其它业务,可以再去实现一个拦截器处理其它业务
     */
    @Autowired
    @Qualifier("safetyInterceptHandler")
    private InterceptHandler safetyinterceptHandler;

    private Server mqttServer;

    public void startServer() throws IOException {
        IResourceLoader configFileResourceLoader = new ClasspathResourceLoader(configFilePath);
        final IConfig config = new ResourceLoaderConfig(configFileResourceLoader);

        mqttServer = new Server();

        /**添加处理Safety相关的拦截器,如果有其它业务,可以再去实现一个拦截器处理其它业务,然后也添加上即可*/
        List interceptHandlers = Arrays.asList(safetyinterceptHandler);
        /**
         * Authenticator 不显示设置,Server会默认以password_file创建一个ResourceAuthenticator
         * 如果需要更灵活的连接验证方案,可以继承IAuthenticator接口,自定义实现
         */
        mqttServer.startServer(config, interceptHandlers, null, null, authorizator);
    }


    public void stop() {
        mqttServer.stopServer();
    }
    
}

解释:
(1)添加@Service
(2)configFilePath是Moquette需要的moquette.conf的配置文件路径,笔者将配置文件放到了resouces目录下,在application.xml配置的路径,因此通过@Value自动注入路径值。
(3)因为是将配置文件放在了resources目录下,所以用Moquette提供的ClasspathResourceLoader加载的配置文件
(4)IAuthorizator 是权限验证接口
(5)InterceptHandler是Moquette订阅、发布、建立连接等相关事件的拦截回调业务处理逻辑接口

2.实现IAuthorizator接口

@Component
public class PermitAllAuthorizator implements IAuthorizator {
    @Override
    public boolean canWrite(Topic topic, String user, String client) {
        /**可以控制某个用户的client,是否具有发布某个主题的权限,目前默认任何Client可以发布任主题*/
        return true;
    }

    @Override
    public boolean canRead(Topic topic, String user, String client) {
        /**可以控制某个用户的client,是否具有接收某个主题的权限,目前默认任何Client可以接收任何主题*/
        return true;
    }
}

解释:实现另一个很简单的授权接口,允许任何用户所有的读写请求

3.实现InterceptHandler接口

@Slf4j
@Component
public class SafetyInterceptHandler extends AbstractInterceptHandler{

    @Override
    public String getID() {
        return SafetyInterceptHandler.class.getName();
    }

    @Override
    public void onConnect(InterceptConnectMessage msg) {
        
    }

    @Override
    public void onConnectionLost(InterceptConnectionLostMessage msg) {
       
    }


    @Override
    public void onPublish(InterceptPublishMessage msg) {
       
    }


    @Override
    public void onMessageAcknowledged(InterceptAcknowledgedMessage msg) {
        
    }

}

解释:
1.简单实现InterceptHandler,继承自AbstractInterceptHandler,并重写了部分方法。可以根据业务需要实现不同的方法。InterceptHandler接口是Moquette预留给开发者根据不同事件处理业务逻辑的接口。

4.通过SpringBoot启动Moquette

@SpringBootApplication
public class MqttServiceApplication {

    public static void main(String[] args) throws IOException {
        SpringApplication application = new SpringApplication(MqttServiceApplication.class);
        final ApplicationContext context = application.run(args);
        MoquetteServer server = context.getBean(MoquetteServer.class);
        server.startServer();
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                server.stop();
                log.info("Moquette Server stopped");
            }
        });
    }



}

如果发现MoquetteServer无法启动,是否是SpringBoot默认的包扫描机制的问题,可以通过@ComponentScan解决。

通过以上操作,就可以在任何想要使用MoquetteServer的地方,通过@Autowired自动注入。

当然在MoquetteServer中笔者只是简单实现了封装,并没有实现其它方法,读者完全可以根据自己的需要在MoquetteServer中实现自己需要的功能,但前提是你要对Moquette的源码熟悉。

moquette改造笔记(二):优化BrokerInterceptor notifyTopicPublished()逻辑

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/77205.html

相关文章

  • moquette改造笔记(三):优化BrokerInterceptor 中的线程池

    摘要:修改的实现,实现接口在改造笔记一整合到中修改实现,添加对的实现如下负载过大,处理不过来时,会回调该方法例如可以发生邮件通知相关人员改造笔记四解决中的调用两次 发现问题 在io.moquette.spi.impl.BrokerInterceptor的构造函数中,新建了一个线程池,代码如下: private BrokerInterceptor(int poolSize, List hand...

    cfanr 评论0 收藏0
  • moquette改造笔记(四):解决InterceptHandler中的onConnectionLo

    摘要:发现问题在使用中设备异常断开中的。在中事件都是在链中依次传递的。事件最后传递到。解决方法添加会导致调用两次解释会在该从链中移除掉时被调用,一般的话没有手动从链中删除时,会在连接断开后回调该方法。 发现问题 在使用中设备异常断开,InterceptHandler中的onConnectionLost()。经过调试发现是MoquetteIdleTimeoutHandler中的代码导致的,代码...

    joyqi 评论0 收藏0
  • moquette改造笔记(二):优化BrokerInterceptor notifyTopicPu

    摘要:优化逻辑优化方向向启动方法一样,每次调用的方法都是在线程池中新建一个任务具体代码解释新建一个用来实现调用方法。改造笔记三优化中的线程池 发现问题 下面部分是io.moquette.spi.impl.BrokerInterceptor.java部分源码 @Override public void notifyClientConnected(final MqttConnectMes...

    liangzai_cool 评论0 收藏0
  • moquette改造笔记(五):设备连接频繁上下线或者相互顶替出现的设备上下线状态错乱问题

    摘要:发现问题在使用中发现在设备频繁上下线和两个设备一样相互顶替连接的情况下,的和的方法调用没有先后顺序,如果在这两个方法里面来记录设备上下线状态,会造成状态不对。因为相互顶替的情况并不多见,因此两个也可以接受,在性能上并不会造成多大影响。 发现问题 在moquette使用中发现在设备频繁上下线和两个设备ClientId一样相互顶替连接的情况下,InterceptHandler的onConn...

    betacat 评论0 收藏0
  • 传统Http服务与SpringCloud微服务的整合

    摘要:没有显式的通常调用需要指定域名才能定位到某个服务器上的某个具体应用,而通过的注册中心,在调用时只需指定服务名称,注册中心会自动发现对应的具体服务。 起因 公司要做系统间的互通,所以需要程序之间互相调用接口,这块一直是其他同事在做,但是今天一个新项目需要调用到其他系统的接口,所以看了下他们的调用方法,发现都是传统的httpclient调用,外面做了一层封装,类似这样: HttpGet h...

    jsummer 评论0 收藏0

发表评论

0条评论

young.li

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<