摘要:完全无状态,可集群部署与集群中的其中一个节点随机选择建立长连接,定期从取路由信息,并向提供服务的建立长连接,且定时向发送心跳。既可以从订阅消息,也可以从订阅消息,订阅规则由配置决定。
问题列表:
Name Server 的作用是什么?
Name Server 存储了Broker的什么信息?
Name Server 为Producer的提供些什么信息?
Name Server 为Consuner的提供些什么信息?
Name Server 作用Name Server在RocketMQ中犹如如它名字一样,是提供Broker发现服务的.
Producer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署
Consumer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。
Name Server 存储了Broker的什么信息?RouteInfoManager
</>复制代码
//主题信息
private final HashMap> topicQueueTable;
//broker信息
private final HashMap brokerAddrTable;
//集群信息
private final HashMap> clusterAddrTable;
//活跃broker信息
private final HashMap brokerLiveTable;
//过滤器信息
private final HashMap/* Filter Server */> filterServerTable;
我们注意到保存broker的Map有两个,即brokerAddrTable用来保存所有的broker列表和brokerLiveTable用来保存当前活跃的broker列表,而BrokerData用来保存broker的主要新增,而BrokerLiveInfo只用来保存上次更新(心跳)时间,我们可以直接看看RouteInfoManager中扫描非活跃broker的方法:
</>复制代码
public void scanNotActiveBroker() {
Iterator> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
这个方法由在initialize的定时线程池加载,每十秒执行一次.可以看出,如果两分钟内都没收到一个broker的心跳数据,则直接将其从brokerLiveTable中移除,注意,这还会导致该broker从brokerAddrTable被删除,当然,如果该broker是Master,则它的所有Slave的broker都将被删除。具体细节可以参看RouteInfoManager的onChannelDestroy方法.
Name Server 为Producer的提供些什么信息?</>复制代码
HashMap
> topicQueueTable;
</>复制代码
private String brokerName; // broker的名称
private int readQueueNums; // 读队列数量
private int writeQueueNums; // 写队列数量
private int perm; // 读写权限
private int topicSynFlag; // 同步复制还是异步复制标记
NameServer 维护了key为topic,List
RouteInfoManager.pickupTopicRouteData
</>复制代码
public TopicRouteData pickupTopicRouteData(final String topic) {
TopicRouteData topicRouteData = new TopicRouteData();
boolean foundQueueData = false;
boolean foundBrokerData = false;
Set brokerNameSet = new HashSet();
List brokerDataList = new LinkedList();
topicRouteData.setBrokerDatas(brokerDataList);
HashMap> filterServerMap = new HashMap>();
topicRouteData.setFilterServerTable(filterServerMap);
try {
try {
this.lock.readLock().lockInterruptibly();
//根据topic获取QueueData信息
List queueDataList = this.topicQueueTable.get(topic);
if (queueDataList != null) {
topicRouteData.setQueueDatas(queueDataList);
foundQueueData = true;
Iterator it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
brokerNameSet.add(qd.getBrokerName());
}
for (String brokerName : brokerNameSet) {
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null != brokerData) {
//根据broker名称获取其地址信息
BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap) brokerData
.getBrokerAddrs().clone());
brokerDataList.add(brokerDataClone);
foundBrokerData = true;
for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
List filterServerList = this.filterServerTable.get(brokerAddr);
filterServerMap.put(brokerAddr, filterServerList);
}
}
}
}
} finally {
this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("pickupTopicRouteData Exception", e);
}
if (log.isDebugEnabled()) {
log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
}
if (foundBrokerData && foundQueueData) {
return topicRouteData;
}
return null;
}
Name Server 为Consuner的提供些什么信息?
Consumer需要哪些信息?
</>复制代码
1. Consumer需要的topic的broker信息
2. 每一个consumer group都有哪些consumer,对应的topic是谁
答
总结</>复制代码
1.如上节所述
2.此信息保存在Broker中
Name Server比较简单,如同一个简单的web服务,提供配置信息,只不过CRUD的不是数据库而是json文件.
此次RocketMQ学习就告一段落了,只描述了我比较关心的流程,很多细节没能涉及到,有时间再写吧,如有疑问和错误请在评论中指出,thx!
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/70362.html
摘要:每个与集群中的所有节点建立长连接,定时注册信息到所有。完全无状态,可集群部署。本系列源码解析主要参照原理简介来追寻其代码实现虽然版本不太一致但这也是能找到的最详细的资料了接下来根据其模块来源码阅读目录如下 为什么选择读RocketMQ? 对MQ的理解一直不深,上周看了,还是觉得不够深入,找个成熟的产品来学习吧,RabbitMQ是erLang写的,Kafka是Scala写的,非Java写...
摘要:前提好几周没更新博客了,对不断支持我博客的童鞋们说声抱歉了。熟悉我的人都知道我写博客的时间比较早,而且坚持的时间也比较久,一直到现在也是一直保持着更新状态。 showImg(https://segmentfault.com/img/remote/1460000014076586?w=1920&h=1080); 前提 好几周没更新博客了,对不断支持我博客的童鞋们说声:抱歉了!。自己这段时...
摘要:但是服务器又确实是收到了这条消息的,只是给客户端的响应丢失了,所以导致的结果就是扣款失败,成功发货。所有的半消息都会写在为的半消息队列里,并且每条半消息,在整个链路里会被写多次,如果并发很大且大部分消息都是事务消息的话,可靠性会存在问题。 前言 得益于MQ削峰填谷,系统解耦,操作异步等功能特性,在互联网行业,可以说有分布式服务的地方,MQ都往往不会缺席。由阿里自研的RocketMQ更是...
摘要:但是服务器又确实是收到了这条消息的,只是给客户端的响应丢失了,所以导致的结果就是扣款失败,成功发货。既然消息的发送不能和本地事务写在一起,那如何来保证其整体具有原子性的需求呢答案就是今天我们介绍的主角事务消息。 前言 得益于MQ削峰填谷,系统解耦,操作异步等功能特性,在互联网行业,可以说有分布式服务的地方,MQ都往往不会缺席。由阿里自研的RocketMQ更是经历了多年的双十一高并发挑战...
阅读 741·2021-11-24 09:39
阅读 2447·2021-11-22 13:54
阅读 2282·2021-09-23 11:46
阅读 3334·2019-08-30 15:55
阅读 2756·2019-08-30 15:54
阅读 2493·2019-08-30 14:18
阅读 1614·2019-08-29 14:15
阅读 2819·2019-08-29 13:49
极致性价比!云服务器续费无忧!
Tesla A100/A800、Tesla V100S等多种GPU云主机特惠2折起,不限台数,续费同价。
NVIDIA RTX 40系,高性价比推理显卡,满足AI应用场景需要。
乌兰察布+上海青浦,满足东推西训AI场景需要