资讯专栏INFORMATION COLUMN

深度解析RocketMQ Topic的创建机制

gself / 1352人阅读

摘要:当接收到消息后,会在方法中调用方法,将的信息塞进缓存中,并且会定时发送心跳将发送给进行注册。这也说明了当用集群模式去创建时,集群里面每个的的数量相同,当用单个模式去创建时,每个的数量可以不一致。

</>复制代码

  1. 微信公众号「后端进阶」,专注后端技术分享:Java、Golang、WEB框架、分布式中间件、服务治理等等。
    老司机倾囊相授,带你一路进阶,来不及解释了快上车!

我还记得第一次使用rocketmq的时候,需要去控制台预先创建topic,我当时就想为什么要这么设计,于是我决定撸一波源码,带大家从根源上吃透rocketmq topic的创建机制。

topic在rocketmq的设计思想里,是作为同一个业务逻辑消息的组织形式,它仅仅是一个逻辑上的概念,而在一个topic下又包含若干个逻辑队列,即消息队列,消息内容实际是存放在队列中,而队列又存储在broker中,下面我用一张图来说明topic的存储模型:

其实rocketmq中存在两种不同的topic创建方式,一种是我刚刚说的预先创建,另一种是自动创建,下面我开车带大家从源码的角度来详细地解读这两种创建机制。

自动创建

默认情况下,topic不用手动创建,当producer进行消息发送时,会从nameserver拉取topic的路由信息,如果topic的路由信息不存在,那么会默认拉取broker启动时默认创建好名为“TBW102”的Topic:

org.apache.rocketmq.common.MixAll:

</>复制代码

  1. // Will be created at broker when isAutoCreateTopicEnable
  2. public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";

自动创建的开关配置在BrokerConfig中,通过autoCreateTopicEnable字段进行控制,

org.apache.rocketmq.common.BrokerConfig:

</>复制代码

  1. @ImportantField
  2. private boolean autoCreateTopicEnable = true;

在broker启动时,会调用TopicConfigManager的构造方法,autoCreateTopicEnable打开后,会将“TBW102”保存到topicConfigTable中:

org.apache.rocketmq.broker.topic.TopicConfigManager#TopicConfigManager:

</>复制代码

  1. // MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
  2. if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
  3. String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
  4. TopicConfig topicConfig = new TopicConfig(topic);
  5. this.systemTopicList.add(topic);
  6. topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
  7. .getDefaultTopicQueueNums());
  8. topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
  9. .getDefaultTopicQueueNums());
  10. int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
  11. topicConfig.setPerm(perm);
  12. this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
  13. }

broker会通过发送心跳包将topicConfigTable的topic信息发送给nameserver,nameserver将topic信息注册到RouteInfoManager中。

继续看消息发送时是如何从nameserver获取topic的路由信息:

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo:

</>复制代码

  1. private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
  2. TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
  3. if (null == topicPublishInfo || !topicPublishInfo.ok()) {
  4. this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
  5. // 生产者第一次发送消息,topic在nameserver中并不存在
  6. this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
  7. topicPublishInfo = this.topicPublishInfoTable.get(topic);
  8. }
  9. if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
  10. return topicPublishInfo;
  11. } else {
  12. // 第二次请求会将isDefault=true,开启默认“TBW102”从namerserver获取路由信息
  13. this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
  14. topicPublishInfo = this.topicPublishInfoTable.get(topic);
  15. return topicPublishInfo;
  16. }
  17. }

如上方法,topic首次发送消息,此时并不能从namserver获取topic的路由信息,那么接下来会进行第二次请求namserver,这时会将isDefault=true,开启默认“TBW102”从namerserver获取路由信息,此时的“TBW102”topic已经被broker默认注册到nameserver了:

org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:

</>复制代码

  1. if (isDefault && defaultMQProducer != null) {
  2. // 使用默认的“TBW102”topic获取路由信息
  3. topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3);
  4. if (topicRouteData != null) {
  5. for (QueueData data : topicRouteData.getQueueDatas()) {
  6. int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
  7. data.setReadQueueNums(queueNums);
  8. data.setWriteQueueNums(queueNums);
  9. }
  10. }
  11. }

如果isDefault=true并且defaultMQProducer不为空,从nameserver中获取默认路由信息,此时会获取所有已开启自动创建开关的broker的默认“TBW102”topic路由信息,并保存默认的topic消息队列数量。

org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:

</>复制代码

  1. TopicRouteData old = this.topicRouteTable.get(topic);
  2. boolean changed = topicRouteDataIsChange(old, topicRouteData);
  3. if (!changed) {
  4. changed = this.isNeedUpdateTopicRouteInfo(topic);
  5. } else {
  6. log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
  7. }

从本地缓存中取出topic的路由信息,由于topic是第一次发送消息,这时本地并没有该topic的路由信息,所以对比该topic路由信息对比“TBW102”时changed为true,即有变化,进入以下逻辑:

org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:

</>复制代码

  1. // Update sub info
  2. {
  3. Set subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
  4. Iterator> it = this.consumerTable.entrySet().iterator();
  5. while (it.hasNext()) {
  6. Entry entry = it.next();
  7. MQConsumerInner impl = entry.getValue();
  8. if (impl != null) {
  9. impl.updateTopicSubscribeInfo(topic, subscribeInfo);
  10. }
  11. }
  12. }

将“TBW102”topic路由信息构建TopicPublishInfo,并将用topic为key,TopicPublishInfo为value更新本地缓存,到这里就明白了,原来broker们千辛万苦创建“TBW102”topic并将其路由信息注册到nameserver,被新来的topic获取后立即用“TBW102”topic的路由信息构建出一个TopicPublishInfo并且据为己有,由于TopicPublishInfo的路由信息时默认“TBW102”topic,因此真正要发送消息的topic也会被负载发送到“TBW102”topic所在的broker中,这里我们可以将其称之为偷梁换柱的做法。

当broker接收到消息后,会在msgCheck方法中调用createTopicInSendMessageMethod方法,将topic的信息塞进topicConfigTable缓存中,并且broker会定时发送心跳将topicConfigTable发送给nameserver进行注册。

自动创建与消息发送时获取topic信息的时序图:

预先创建

其实这个叫预先创建似乎更加适合,即预先在broker中创建好topic的相关信息并注册到nameserver中,然后client端发送消息时直接从nameserver中获取topic的路由信息,但是手动创建从动作上来将更加形象通俗易懂,直接告诉你,你的topic信息需要在控制台上自己手动创建。

预先创建需要通过mqadmin提供的topic相关命令进行创建,执行:

</>复制代码

  1. ./mqadmin updateTopic

官方给出的各项参数如下:

</>复制代码

  1. usage: mqadmin updateTopic [-b ] [-c ] [-h] [-n ] [-o ] [-p ] [-r ] [-s ]
  2. -t [-u ] [-w ]
  3. -b,--brokerAddr create topic to which broker
  4. -c,--clusterName create topic to which cluster
  5. -h,--help Print help
  6. -n,--namesrvAddr Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
  7. -o,--order set topic"s order(true|false
  8. -p,--perm set topic"s permission(2|4|6), intro[2:W 4:R; 6:RW]
  9. -r,--readQueueNums set read queue nums
  10. -s,--hasUnitSub has unit sub (true|false
  11. -t,--topic topic name
  12. -u,--unit is unit topic (true|false
  13. -w,--writeQueueNums set write queue nums

我们直接定位到其实现类执行命令的方法:

通过broker模式创建:

org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand#execute:

</>复制代码

  1. // -b,--brokerAddr create topic to which broker
  2. if (commandLine.hasOption("b")) {
  3. String addr = commandLine.getOptionValue("b").trim();
  4. defaultMQAdminExt.start();
  5. defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
  6. return;
  7. }

从commandLine命令行工具获取运行时-b参数重的broker的地址,defaultMQAdminExt是默认的rocketmq控制台执行的API,此时调用start方法,该方法创建了一个mqClientInstance,它封装了netty通信的细节,接着就是最重要的一步,调用createAndUpdateTopicConfig将topic配置信息发送到指定的broker上,完成topic的创建。

通过集群模式创建:

org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand#execute:

</>复制代码

  1. // -c,--clusterName create topic to which cluster
  2. else if (commandLine.hasOption("c")) {
  3. String clusterName = commandLine.getOptionValue("c").trim();
  4. defaultMQAdminExt.start();
  5. Set masterSet =
  6. CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
  7. for (String addr : masterSet) {
  8. defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
  9. System.out.printf("create topic to %s success.%n", addr);
  10. }
  11. return;
  12. }

通过集群模式创建与通过broker模式创建的逻辑大致相同,多了根据集群从nameserver获取集群下所有broker的master地址这个步骤,然后在循环发送topic信息到集群中的每个broker中,这个逻辑跟指定单个broker是一致的。

这也说明了当用集群模式去创建topic时,集群里面每个broker的queue的数量相同,当用单个broker模式去创建topic时,每个broker的queue数量可以不一致。

预先创建时序图:

何时需要预先创建Topic?

建议线下开启,线上关闭,不是我说的,是官方给出的建议:

rocketmq为什么要这么设计呢?经过一波源码深度解析后,我得到了我想要的答案:

根据上面的源码分析,我们得出,rocketmq在发送消息时,会先去获取topic的路由信息,如果topic是第一次发送消息,由于nameserver没有topic的路由信息,所以会再次以“TBW102”这个默认topic获取路由信息,假设broker都开启了自动创建开关,那么此时会获取所有broker的路由信息,消息的发送会根据负载算法选择其中一台Broker发送消息,消息到达broker后,发现本地没有该topic,会在创建该topic的信息塞进本地缓存中,同时会将topic路由信息注册到nameserver中,那么这样就会造成一个后果:以后所有该topic的消息,都将发送到这台broker上,如果该topic消息量非常大,会造成某个broker上负载过大,这样消息的存储就达不到负载均衡的目的了。

扫面下方二维码,关注「Java科代表」,开车带你临摹各种源码,来不及解释了快上车! 

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/74017.html

相关文章

  • RocketMQ为什么要保证订阅关系一致性?

    摘要:微信公众号后端进阶,专注后端技术分享框架分布式中间件服务治理等等。 微信公众号「后端进阶」,专注后端技术分享:Java、Golang、WEB框架、分布式中间件、服务治理等等。 前段时间有个朋友向我提了一个问题,他说在搭建 RocketMQ 集群过程中遇到了关于消费订阅的问题,具体问题如下: showImg(https://segmentfault.com/img/remote/1460...

    gekylin 评论0 收藏0
  • 高并发异步解耦利器:RocketMQ究竟强在哪里?

    摘要:它是阿里巴巴于年开源的第三代分布式消息中间件。是一个分布式消息中间件,具有低延迟高性能和可靠性万亿级别的容量和灵活的可扩展性,它是阿里巴巴于年开源的第三代分布式消息中间件。上篇文章消息队列那么多,为什么建议深入了解下RabbitMQ?我们讲到了消息队列的发展史:并且详细介绍了RabbitMQ,其功能也是挺强大的,那么,为啥又要搞一个RocketMQ出来呢?是重复造轮子吗?本文我们就带大家来详...

    tainzhi 评论0 收藏0
  • rocketmq之producer解析

    摘要:所以基于目前的设计,建议关闭自动创建的功能,然后根据消息量的大小,手动创建。如果发送消息,返回结果超时,这种超时不会进行重试了如果是方法本身耗时超过,还未来得及调用发送消息,此时的超时也不会重试。 先来看下producer核心的类设计,如下图: showImg(http://pbdqyl9hh.bkt.clouddn.com/rocketmq/producer%E7%B1%BB%E5%...

    luodongseu 评论0 收藏0
  • RocketMq消息中间件介绍

    摘要:消息生产者,负责发消息到。消息消费者,负责从上拉取消息进行消费,消费完进行。集群部署端完全消费正常后在进行手动确认。消息发送成功后,服务器返回确认消息给生产者。根据本地事务执行的结果向发送提交或回滚消息。 RabbitMQerlang开发,对消息堆积的支持并不好,当大量消息积压的时候,会导致RabbitMQ的性能急剧下降。...

    goji 评论0 收藏0

发表评论

0条评论

gself

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<