资讯专栏INFORMATION COLUMN

一定能看懂的RocketMQ事务消息源码分析(干货)

myshell / 1068人阅读

摘要:但是服务器又确实是收到了这条消息的,只是给客户端的响应丢失了,所以导致的结果就是扣款失败,成功发货。既然消息的发送不能和本地事务写在一起,那如何来保证其整体具有原子性的需求呢答案就是今天我们介绍的主角事务消息。

前言

得益于MQ削峰填谷,系统解耦,操作异步等功能特性,在互联网行业,可以说有分布式服务的地方,MQ都往往不会缺席。由阿里自研的RocketMQ更是经历了多年的双十一高并发挑战,其中4.3.0版本推出了事务消息的新特性,本文对RocketMQ 4.5.0版本事务消息相关的源码跟踪介绍,通过阅读读者可以知道:

事务消息解决什么样的问题

事务消息的实现原理及其设计亮点

解决什么问题

假设我所在的系统现在有这样一个场景:

本地开启数据库事务进行扣款操作,成功后发送MQ消息给库存中心进行发货。

有人会想到开启mybatis事务实现,把本地事务和MQ消息放在一起不就行了吗?如果MQ发送成功,就提交事务,发送失败就回滚事务,整套操作一气呵成。

</>复制代码

  1. transaction{
  2. 扣款();
  3. boolean success = 发送MQ();
  4. if(success){
  5. commit();
  6. }else{
  7. rollBack();
  8. }
  9. }

看似没什么问题,但是网络是不可靠的。

假设MQ返回过来的响应因为网络原因迟迟没有收到,所以在面对不确定的MQ返回结果只好进行回滚。但是MQ 服务器又确实是收到了这条消息的,只是给客户端的响应丢失了,所以导致的结果就是扣款失败,成功发货。

既然MQ消息的发送不能和本地事务写在一起,那如何来保证其整体具有原子性的需求呢?答案就是今天我们介绍的主角:事务消息

概览

总体而言RocketMQ事务消息分为两条主线

定时任务发送流程:发送half message(半消息),执行本地事务,发送事务执行结果

定时任务回查流程:MQ服务器回查本地事务,发送事务执行结果

因此本文也通过这两条主线对源码进行分析

源码分析 半消息发送流程 本地应用(client)

在本地应用发送事务消息的核心类是TransactionMQProducer,该类通过继承DefaultMQProducer来复用大部分发送消息相关的逻辑,这个类的代码量非常少只有100来行,下面是这个类的sendMessageTransaction方法

</>复制代码

  1. @Override
  2. public TransactionSendResult sendMessageInTransaction(final Message msg,
  3. final Object arg) throws MQClientException {
  4. if (null == this.transactionListener) {
  5. throw new MQClientException("TransactionListener is null", null);
  6. }
  7. return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
  8. }

这个方法做了两件事,

检查transactionListener是否存在

调用父类执行事务消息发送

TransactionListener在事务消息流程中起到至关重要的作用,一起看看这个接口

</>复制代码

  1. public interface TransactionListener {
  2. /**
  3. * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
  4. *
  5. * @param msg Half(prepare) message
  6. * @param arg Custom business parameter
  7. * @return Transaction state
  8. */
  9. LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
  10. /**
  11. * When no response to prepare(half) message. broker will send check message to check the transaction status, and this
  12. * method will be invoked to get local transaction status.
  13. *
  14. * @param msg Check message
  15. * @return Transaction state
  16. */
  17. LocalTransactionState checkLocalTransaction(final MessageExt msg);
  18. }

接口注释说的很明白,配合上面的概览图来看就是,executeLocalTransaction方法对应的就是执行本地事务操作,checkLocalTransaction对应的就是回查本地事务操作。

下面是DefaultMQProducer类的sendMessageInTransaction方法源码

</>复制代码

  1. public TransactionSendResult sendMessageInTransaction(final Message msg,
  2. final LocalTransactionExecuter localTransactionExecuter, final Object arg)
  3. throws MQClientException {
  4. ...
  5. SendResult sendResult = null;
  6. MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
  7. MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
  8. ...
  9. sendResult = this.send(msg);
  10. ...
  11. switch (sendResult.getSendStatus()) {
  12. case SEND_OK: {
  13. ...
  14. localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
  15. ...
  16. break;
  17. case FLUSH_DISK_TIMEOUT:
  18. case FLUSH_SLAVE_TIMEOUT:
  19. case SLAVE_NOT_AVAILABLE:
  20. localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
  21. break;
  22. default:
  23. break;
  24. }
  25. ...
  26. this.endTransaction(sendResult, localTransactionState, localException);
  27. ...
  28. }

为了使源码的逻辑更加直观,笔者精简了核心代码。sendMessageInTransaction方法主要做了以下事情

给消息打上事务消息相关的标记,用于MQ服务端区分普通消息和事务消息

发送半消息(half message)

发送成功则由transactionListener执行本地事务

执行endTransaction方法,如果半消息发送失败本地事务执行失败告诉服务端是删除半消息,半消息发送成功本地事务执行成功则告诉服务端生效半消息。

发送半消息流程,Client端代码到这里差不多就结束了,接下来看看RocketMQ Server端是如何处理的

RocketMQ Server

Server在接收到消息过后会进行一些领域对象的转化和是否支持事务消息的权限校验,对理解事务消息用处不大,此处就省略对旁枝末节的介绍了。下面是TransactionalMessageBridge类处理half message的源码

</>复制代码

  1. public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
  2. return store.putMessage(parseHalfMessageInner(messageInner));
  3. }
  4. private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
  5. MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
  6. MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
  7. String.valueOf(msgInner.getQueueId()));
  8. msgInner.setSysFlag(
  9. MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
  10. msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
  11. msgInner.setQueueId(0);
  12. msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
  13. return msgInner;
  14. }

这两个方法主要做了以下事情:

</>复制代码

  1. public class Message implements Serializable {
  2. private static final long serialVersionUID = 8445773977080406428L;
  3. private String topic;
  4. private int flag;
  5. private Map properties;
  6. private byte[] body;
  7. private String transactionId;
  8. }

将消息的topic,queueId放进消息体自身的map里进行缓存

将消息的topic 设置为“RMQ_SYS_TRANS_OP_HALF_TOPIC”,queueId设置为0

将消息写入磁盘持久化

可以看到所有的事务半消息都会被放进同一个topic的同一个queue里面,通过对topic的区分,从而避免了半消息被consumer给消费到

Server将半消息持久化后然后会发送结果给我们本地的应用程序。到了这里Server端对半消息的处理就结束了,紧接着的是定时任务的登场。

定时任务回查流程 RocketMQ Server

定时任务是一个叫TransactionalMessageService类的线程,下面是该类的check方法

</>复制代码

  1. @Override
  2. public void check(long transactionTimeout, int transactionCheckMax,
  3. AbstractTransactionalMessageCheckListener listener) {
  4. ...
  5. if (!putBackHalfMsgQueue(msgExt, i)) {
  6. continue;
  7. }
  8. listener.resolveHalfMsg(msgExt);
  9. }
  10. ...
  11. }

check方法非常长,省略的代码大致都是对半消息进行过滤(如超过72小时的事务消息,就被算作过期),只保留符合条件的半消息对其进行回查。

其中很有意思的是putBackHalfMsgQueue方法,因为每次把半消息从磁盘拉到内存里进行处理都会对其属性进行改变(例如TRANSACTION_CHECK_TIMES,这是是否丢弃事务消息的关键信息),所以在发送回查消息之前需要对半消息再次放进磁盘。RocketMQ采取的方法是基于最新的物理偏移量重新写入,而不是对原有的半消息进行修改,其中的目的就是RocketMQ的存储设计采用顺序写,如果去修改消息 ,无法做到高性能。

下面是resolveHalfMsg方法,主要就是开启一个线程然后发送check消息。

</>复制代码

  1. public void resolveHalfMsg(final MessageExt msgExt) {
  2. executorService.execute(new Runnable() {
  3. @Override
  4. public void run() {
  5. try {
  6. sendCheckMessage(msgExt);
  7. } catch (Exception e) {
  8. LOGGER.error("Send check message error!", e);
  9. }
  10. }
  11. });
  12. }
本地应用(client)

下面是DefaultMQProducerImpl的checkTransactionState方法,是本地应用对回查消息的处理逻辑

</>复制代码

  1. @Override
  2. public void checkTransactionState(final String addr, final MessageExt msg,
  3. final CheckTransactionStateRequestHeader header) {
  4. Runnable request = new Runnable() {
  5. ...
  6. @Override
  7. public void run() {
  8. ...
  9. TransactionListener transactionListener = getCheckListener();
  10. ...
  11. localTransactionState = transactionListener.checkLocalTransaction(message);
  12. ...
  13. this.processTransactionState(
  14. localTransactionState,
  15. group,
  16. exception);
  17. }
  18. private void processTransactionState(
  19. ...
  20. DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
  21. 3000);
  22. ...
  23. }
  24. };
  25. this.checkExecutor.submit(request);
  26. }

精简代码逻辑后可以清晰的看到

开启一个线程来执行回查的逻辑

执行transactionListener的checkLocalTransaction方法来获取本地事务执行的结果

RocketMQ Server

RocketMQ 服务器在收到Client发过来的Commit消息后会

读出半消息——>恢复topic等原消息体的信息——>和普通消息一样再次写入磁盘——>删除之前的半消息

如果是Rollback消息则直接删除之前的半消息

到此,整条RocketMQ 事务消息的调用链就结束了

思考

1. 分布式事务等于事务消息吗?

两者并没有关系,事务消息仅仅保证本地事务和MQ消息发送形成整体的原子性,而投递到MQ服务器后,消费者是否能一定消费成功是无法保证的。

2. 源码设计上有什么亮点吗?

通过对整条链路源码的学习理解发现还是有不少亮点的

server端回查消息的发送,client端回查消息逻辑的处理,client端commit/rollback消息的提交都是用了异步进行,可以说能异步的地方都用了异步,通过异步+重试的方式保证了在分布式环境中即使短暂的网络状况不良好,也不会影响整体逻辑。

引入TransactionListener,真正做到了开闭原则以及依赖倒置原则,面向接口编程。整体扩展性做得非常好,使用者只需要编写自己的Listener就可以做到事务消息的发送,非常方便

TransactionMQProducer通过继承DefaultMQProducer极大地复用了关于发送消息相关的逻辑

3. 源码设计上有什么不足吗?

RocketMQ作为一款极其成功的消息中间件,要发现不足不是那么容易了,笔者谈几点看法

sendMessageIntransaction等事务相关的方法被划分在了DefaultMQProducer里面,从内聚的角度来说这是跟事务相关的发送消息方法应该被划分在TransactionMQProducer。

所有topic的半消息都会写在topic为RMQ_SYS_TRANS_OP_HALF_TOPIC的半消息队列里,并且每条半消息,在整个链路里会被写多次,如果并发很大且大部分消息都是事务消息的话,可靠性会存在问题。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/75328.html

相关文章

  • 让你看懂的RocketMQ事务消息源码分析(干货)

    摘要:但是服务器又确实是收到了这条消息的,只是给客户端的响应丢失了,所以导致的结果就是扣款失败,成功发货。所有的半消息都会写在为的半消息队列里,并且每条半消息,在整个链路里会被写多次,如果并发很大且大部分消息都是事务消息的话,可靠性会存在问题。 前言 得益于MQ削峰填谷,系统解耦,操作异步等功能特性,在互联网行业,可以说有分布式服务的地方,MQ都往往不会缺席。由阿里自研的RocketMQ更是...

    zsirfs 评论0 收藏0
  • 新手也看懂消息队列其实很简单

    摘要:通过以上分析我们可以得出消息队列具有很好的削峰作用的功能即通过异步处理,将短时间高并发产生的事务消息存储在消息队列中,从而削平高峰期的并发事务。 该文已加入开源项目:JavaGuide(一份涵盖大部分Java程序员所需要掌握的核心知识的文档类项目,Star 数接近 16k)。地址:https://github.com/Snailclimb... 本文内容思维导图:showImg(ht...

    Clect 评论0 收藏0
  • 后端经验

    摘要:在结构上引入了头结点和尾节点,他们分别指向队列的头和尾,尝试获取锁入队服务教程在它提出十多年后的今天,已经成为最重要的应用技术之一。随着编程经验的日积月累,越来越感觉到了解虚拟机相关要领的重要性。 JVM 源码分析之 Jstat 工具原理完全解读 http://click.aliyun.com/m/8315/ JVM 源码分析之 Jstat 工具原理完全解读 http:...

    i_garfileo 评论0 收藏0
  • 高并发异步解耦利器:RocketMQ究竟强在哪里?

    摘要:它是阿里巴巴于年开源的第三代分布式消息中间件。是一个分布式消息中间件,具有低延迟高性能和可靠性万亿级别的容量和灵活的可扩展性,它是阿里巴巴于年开源的第三代分布式消息中间件。上篇文章消息队列那么多,为什么建议深入了解下RabbitMQ?我们讲到了消息队列的发展史:并且详细介绍了RabbitMQ,其功能也是挺强大的,那么,为啥又要搞一个RocketMQ出来呢?是重复造轮子吗?本文我们就带大家来详...

    tainzhi 评论0 收藏0
  • 【备战春招/秋招系列】美团Java面经总结进阶篇 (附详解答案)

    摘要:我在前面的文章中也提到了应该怎么做自我介绍与项目介绍,详情可以查看这篇文章备战春招秋招系列初出茅庐的程序员该如何准备面试。因此基于事件消息对象驱动的业务架构可以是一系列流程。 showImg(https://user-gold-cdn.xitu.io/2018/11/14/16711ac29c2ae52c?w=928&h=531&f=png&s=798562); 一 消息队列MQ的...

    chengjianhua 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<