资讯专栏INFORMATION COLUMN

RocketMQ源码学习(六)-Name Server

Joyven / 2498人阅读

摘要:完全无状态,可集群部署与集群中的其中一个节点随机选择建立长连接,定期从取路由信息,并向提供服务的建立长连接,且定时向发送心跳。既可以从订阅消息,也可以从订阅消息,订阅规则由配置决定。

问题列表:

Name Server 的作用是什么?

Name Server 存储了Broker的什么信息?

Name Server 为Producer的提供些什么信息?

Name Server 为Consuner的提供些什么信息?

Name Server 作用

Name Server在RocketMQ中犹如如它名字一样,是提供Broker发现服务的.

Producer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署

Consumer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。

Name Server 存储了Broker的什么信息?

RouteInfoManager

</>复制代码

  1. //主题信息
  2. private final HashMap> topicQueueTable;
  3. //broker信息
  4. private final HashMap brokerAddrTable;
  5. //集群信息
  6. private final HashMap> clusterAddrTable;
  7. //活跃broker信息
  8. private final HashMap brokerLiveTable;
  9. //过滤器信息
  10. private final HashMap/* Filter Server */> filterServerTable;

我们注意到保存broker的Map有两个,即brokerAddrTable用来保存所有的broker列表和brokerLiveTable用来保存当前活跃的broker列表,而BrokerData用来保存broker的主要新增,而BrokerLiveInfo只用来保存上次更新(心跳)时间,我们可以直接看看RouteInfoManager中扫描非活跃broker的方法:

</>复制代码

  1. public void scanNotActiveBroker() {
  2. Iterator> it = this.brokerLiveTable.entrySet().iterator();
  3. while (it.hasNext()) {
  4. Entry next = it.next();
  5. long last = next.getValue().getLastUpdateTimestamp();
  6. if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
  7. RemotingUtil.closeChannel(next.getValue().getChannel());
  8. it.remove();
  9. log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
  10. this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
  11. }
  12. }
  13. }

这个方法由在initialize的定时线程池加载,每十秒执行一次.可以看出,如果两分钟内都没收到一个broker的心跳数据,则直接将其从brokerLiveTable中移除,注意,这还会导致该broker从brokerAddrTable被删除,当然,如果该broker是Master,则它的所有Slave的broker都将被删除。具体细节可以参看RouteInfoManager的onChannelDestroy方法.

Name Server 为Producer的提供些什么信息?

</>复制代码

  1. HashMap> topicQueueTable;

</>复制代码

  1. private String brokerName; // broker的名称
  2. private int readQueueNums; // 读队列数量
  3. private int writeQueueNums; // 写队列数量
  4. private int perm; // 读写权限
  5. private int topicSynFlag; // 同步复制还是异步复制标记

NameServer 维护了key为topic,List的数据为Producer提供服务.返回TopicRouteData信息
RouteInfoManager.pickupTopicRouteData

</>复制代码

  1. public TopicRouteData pickupTopicRouteData(final String topic) {
  2. TopicRouteData topicRouteData = new TopicRouteData();
  3. boolean foundQueueData = false;
  4. boolean foundBrokerData = false;
  5. Set brokerNameSet = new HashSet();
  6. List brokerDataList = new LinkedList();
  7. topicRouteData.setBrokerDatas(brokerDataList);
  8. HashMap> filterServerMap = new HashMap>();
  9. topicRouteData.setFilterServerTable(filterServerMap);
  10. try {
  11. try {
  12. this.lock.readLock().lockInterruptibly();
  13. //根据topic获取QueueData信息
  14. List queueDataList = this.topicQueueTable.get(topic);
  15. if (queueDataList != null) {
  16. topicRouteData.setQueueDatas(queueDataList);
  17. foundQueueData = true;
  18. Iterator it = queueDataList.iterator();
  19. while (it.hasNext()) {
  20. QueueData qd = it.next();
  21. brokerNameSet.add(qd.getBrokerName());
  22. }
  23. for (String brokerName : brokerNameSet) {
  24. BrokerData brokerData = this.brokerAddrTable.get(brokerName);
  25. if (null != brokerData) {
  26. //根据broker名称获取其地址信息
  27. BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap) brokerData
  28. .getBrokerAddrs().clone());
  29. brokerDataList.add(brokerDataClone);
  30. foundBrokerData = true;
  31. for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
  32. List filterServerList = this.filterServerTable.get(brokerAddr);
  33. filterServerMap.put(brokerAddr, filterServerList);
  34. }
  35. }
  36. }
  37. }
  38. } finally {
  39. this.lock.readLock().unlock();
  40. }
  41. } catch (Exception e) {
  42. log.error("pickupTopicRouteData Exception", e);
  43. }
  44. if (log.isDebugEnabled()) {
  45. log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
  46. }
  47. if (foundBrokerData && foundQueueData) {
  48. return topicRouteData;
  49. }
  50. return null;
  51. }
Name Server 为Consuner的提供些什么信息?

Consumer需要哪些信息?

</>复制代码

  1. 1. Consumer需要的topic的broker信息
  2. 2. 每一个consumer group都有哪些consumer,对应的topic是谁

</>复制代码

  1. 1.如上节所述
  2. 2.此信息保存在Broker中
总结

Name Server比较简单,如同一个简单的web服务,提供配置信息,只不过CRUD的不是数据库而是json文件.

此次RocketMQ学习就告一段落了,只描述了我比较关心的流程,很多细节没能涉及到,有时间再写吧,如有疑问和错误请在评论中指出,thx!

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/70362.html

相关文章

  • RocketMQ源码学习(一)-概述

    摘要:每个与集群中的所有节点建立长连接,定时注册信息到所有。完全无状态,可集群部署。本系列源码解析主要参照原理简介来追寻其代码实现虽然版本不太一致但这也是能找到的最详细的资料了接下来根据其模块来源码阅读目录如下 为什么选择读RocketMQ? 对MQ的理解一直不深,上周看了,还是觉得不够深入,找个成熟的产品来学习吧,RabbitMQ是erLang写的,Kafka是Scala写的,非Java写...

    godlong_X 评论0 收藏0
  • 写这么多系列博客,怪不得找不到女朋友

    摘要:前提好几周没更新博客了,对不断支持我博客的童鞋们说声抱歉了。熟悉我的人都知道我写博客的时间比较早,而且坚持的时间也比较久,一直到现在也是一直保持着更新状态。 showImg(https://segmentfault.com/img/remote/1460000014076586?w=1920&h=1080); 前提 好几周没更新博客了,对不断支持我博客的童鞋们说声:抱歉了!。自己这段时...

    JerryWangSAP 评论0 收藏0
  • 让你看懂的RocketMQ事务消息源码分析(干货)

    摘要:但是服务器又确实是收到了这条消息的,只是给客户端的响应丢失了,所以导致的结果就是扣款失败,成功发货。所有的半消息都会写在为的半消息队列里,并且每条半消息,在整个链路里会被写多次,如果并发很大且大部分消息都是事务消息的话,可靠性会存在问题。 前言 得益于MQ削峰填谷,系统解耦,操作异步等功能特性,在互联网行业,可以说有分布式服务的地方,MQ都往往不会缺席。由阿里自研的RocketMQ更是...

    zsirfs 评论0 收藏0
  • 一定能看懂的RocketMQ事务消息源码分析(干货)

    摘要:但是服务器又确实是收到了这条消息的,只是给客户端的响应丢失了,所以导致的结果就是扣款失败,成功发货。既然消息的发送不能和本地事务写在一起,那如何来保证其整体具有原子性的需求呢答案就是今天我们介绍的主角事务消息。 前言 得益于MQ削峰填谷,系统解耦,操作异步等功能特性,在互联网行业,可以说有分布式服务的地方,MQ都往往不会缺席。由阿里自研的RocketMQ更是经历了多年的双十一高并发挑战...

    myshell 评论0 收藏0

发表评论

0条评论

Joyven

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<