资讯专栏INFORMATION COLUMN

RocketMQ源码学习(四)-Consumer

周国辉 / 1453人阅读

摘要:的都是从消息来消费,但是为了能做到实时收消息,使用长轮询方式,可以保证消息实时性同方式一致。这种情况建议应用,再消费下一条消息,这样可以减轻重试消息的压力。逻辑请求按参数返回按照重置消费从而实现回溯消费

这次源码学习的方法是带着问题学习源码实现,问题列表如下

Consumer Group的概念是什么?

Consumer pull过程是怎样的?

Consumer 支持push吗?

Consumer 怎么实现单队列并行消费?

Consumer 怎么过滤消息?

Consumer 怎么保证一条消息只被Group中的一个服务消费?

Consumer 负载均衡怎么实现?

Consumer 消费失败怎么办?

Consumer 可以回溯消费吗?

Consumer

消息消费者,负责消费消息,一般是后台系统负责异步消费。

Consumer Group的概念是什么?

一类 Consumer 的集合名称,这类 Consumer 通常消费一类消息,且消费逻辑一致。一般情况下group中Consumer的数量不能超过订阅的topic中queue的数量,不然会有闲置的Consumer.

Consumer pull过程是怎样的?

分析过Producer,看Consumer有种似曾相识的感觉

主要逻辑

1. 根据mq信息去找broker路由信息
2. 根据相关参数构建请求头
3. 委托netty去broker获取消息

代码走读

MQPullConsumer.pull的参数需指定MessageQueue,和offset(位置偏移)的.

    PullResult pull(final MessageQueue mq, final String subExpression, final long offset,
        final int maxNums) throws MQClientException, RemotingException, MQBrokerException,
        InterruptedException;

再看Pull操作的返回,有本次获取的数据信息MessageExt,即位置信息offset

public class PullResult {
    //pull状态
    private final PullStatus pullStatus;
    //下次pull的偏移量
    private final long nextBeginOffset;
    //最小偏移量
    private final long minOffset;
    //最大偏移量
    private final long maxOffset;
    //获取到的消息
    private List msgFoundList;
}

MQPullConsumer.pull
-> DefaultMQPullConsumer.pull
-> DefaultMQPullConsumerImpl.pull
-> DefaultMQPullConsumerImpl.pullSyncImpl
-> DefaultMQPullConsumerImpl.pullKernelImpl

  public PullResult pullKernelImpl(
        final MessageQueue mq,
        final String subExpression,
        final String expressionType,
        final long subVersion,
        final long offset,
        final int maxNums,
        final int sysFlag,
        final long commitOffset,
        final long brokerSuspendMaxTimeMillis,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        //获取broker信息
        FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);
        if (null == findBrokerResult) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                    this.recalculatePullFromWhichNode(mq), false);
        }

        if (findBrokerResult != null) {
            {
                // check version
                if (!ExpressionType.isTagType(expressionType)
                    && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                        + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
                }
            }
            int sysFlagInner = sysFlag;

            if (findBrokerResult.isSlave()) {
                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
            }
            // 构建pull请求头
            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setConsumerGroup(this.consumerGroup);
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setQueueOffset(offset);
            requestHeader.setMaxMsgNums(maxNums);
            requestHeader.setSysFlag(sysFlagInner);
            requestHeader.setCommitOffset(commitOffset);
            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
            requestHeader.setSubscription(subExpression);
            requestHeader.setSubVersion(subVersion);
            requestHeader.setExpressionType(expressionType);

            String brokerAddr = findBrokerResult.getBrokerAddr();
            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
            }
            //委托Netty去获取信息
            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                brokerAddr,
                requestHeader,
                timeoutMillis,
                communicationMode,
                pullCallback);

            return pullResult;
        }

        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

pull消息比较简单,一次请求返回,由Consumer管理offset.一般来说一个Consumer Group中Consumer的数量不能大于MessageQueue的数量.

Consumer 支持push吗?

Push Consumer

Consumer 的一种,应用通常向 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立

刻回调 Listener 接口方法。JMS标准中为MessageListener类的onMessage方法.

Pull Consumer

Consumer 的一种,应用通常主动调用 Consumer 的拉消息方法从 Broker 拉消息,主动权由应用控制。

RocketMQ的Consumer都是从Broker pull消息来消费,但是为了能做到实时收消息,RocketMQ 使用长轮询方式,可以保证消息实时性同Push方式一致。这种长轮询方式类似于WebQQ收发消息机制。请参考以下信息了解更多Comet:基于 HTTP 长连接的“服务器推”技术

虽然RocketMQ的consumer都是通过pull来实现的但是其封装了push接口,我们先来看其使用方法

 public static void main(String[] args) throws InterruptedException, MQClientException {  
        /** 
         * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例 
         * 注意:ConsumerGroupName需要由应用来保证唯一 
         */  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testmerchantLeagueConsumerGroup");  
        consumer.setNamesrvAddr("ip:port");  
          
        /** 
         * 订阅指定topic下tags分别等于TagA或TagB 
         */  
        consumer.subscribe("broker-a", "TagB || TagA");  
  
        /** 
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 
         * 如果非第一次启动,那么按照上次消费的位置继续消费 
         */  
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  
        //真正的处理消息逻辑在这里
        consumer.registerMessageListener(new MessageListenerConcurrently() {  
  
            /** 
             * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息 
             */  
            @Override  
            public ConsumeConcurrentlyStatus consumeMessage(List msgs,  
                    ConsumeConcurrentlyContext context) {  
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);  
                MessageExt msg = msgs.get(0);  
                if (msg.getTopic().equals("broker-a")) {  
                    // 执行TopicTest1的消费逻辑  
                    if (msg.getTags() != null && msg.getTags().equals("TagA")) {  
                        // 执行TagA的消费  
                        String message = new String(msg.getBody());  
                        System.out.println(message);  
                    }  
                    else if (msg.getTags() != null && msg.getTags().equals("TagB")) {  
                        // 执行TagB的消费  
                        String message = new String(msg.getBody());  
                        System.out.println(message);  
                    }  
                    
                }  
                //消费者向mq服务器返回消费成功的消息  
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
            }  
        });  
  
       //Consumer对象在使用之前必须要调用start初始化,初始化一次即可
        consumer.start();  
  
    }  

RocketMQ push的实现 :

消息的拉取逻辑

维护一个pullRequestQueue,先放入一个pullRequest,当pullResult为成功时,再构建新的pullRequest放入pullRequestQueue,另起一个线程监测pullRequestQueue,当起不为空时,轮询pull消息

DefaultMQPushConsumer.start
-> DefaultMQPushConsumerImpl.start
-> MQClientInstance.start
-> PullMessageService.start
我们来看PullMessageService的run方法,

    //请求消息阻塞链表
    private final LinkedBlockingQueue pullRequestQueue = new LinkedBlockingQueue();
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
        //只要有请求就去pull消息
        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                if (pullRequest != null) {
                    this.pullMessage(pullRequest);
                }
            } catch (InterruptedException e) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

pullRequestQueue在在哪里put呢?
在class里找到在executePullRequestLater方法内会put

   public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
        this.scheduledExecutorService.schedule(new Runnable() {

            @Override
            public void run() {
                PullMessageService.this.executePullRequestImmediately(pullRequest);
            }
        }, timeDelay, TimeUnit.MILLISECONDS);
    }

  public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put", e);
        }
    }

查看此方法的调用关系,发现在run中的pullMessage方法中onSuccess回调中会构建下一次的pullRequestQueue待下次请求

 PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);

                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            long prevRequestOffset = pullRequest.getNextOffset();
              //请求成功就构建新的pullRequest              pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);

                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                                boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
                                    pullResult.getMsgFoundList(), //
                                    processQueue, //
                                    pullRequest.getMessageQueue(), //
                                    dispathToConsume);
//放到pullRequestQueue
                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }

                            if (pullResult.getNextBeginOffset() < prevRequestOffset//
                                || firstMsgOffset < prevRequestOffset) {
                                log.warn(
                                    "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", //
                                    pullResult.getNextBeginOffset(), //
                                    firstMsgOffset, //
                                    prevRequestOffset);
                            }

                            break;
                        case NO_NEW_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case NO_MATCHED_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("the pull request offset illegal, {} {}", //
                                pullRequest.toString(), pullResult.toString());
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                            pullRequest.getProcessQueue().setDropped(true);
                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                                @Override
                                public void run() {
                                    try {
                                        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                            pullRequest.getNextOffset(), false);

                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                        log.warn("fix the pull request offset, {}", pullRequest);
                                    } catch (Throwable e) {
                                        log.error("executeTaskLater Exception", e);
                                    }
                                }
                            }, 10000);
                            break;
                        default:
                            break;
                    }
                }
            }

至此获取消息已经搞定,再看怎么触发MessageListener的消费方法.
还是在DefaultMQPushConsumerImpl.pullMessage方法内的回调,有下列代码,把消息提供给consumeMessageService处理.

  DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
                                    pullResult.getMsgFoundList(), //
                                    processQueue, //
                                    pullRequest.getMessageQueue(), //
                                    dispathToConsume);

构建ConsumeRequest,然后提交至线程池消费

    @Override
    public void submitConsumeRequest(//
        final List msgs, //
        final ProcessQueue processQueue, //
        final MessageQueue messageQueue, //
        final boolean dispatchToConsume) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (msgs.size() <= consumeBatchSize) {
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                this.submitConsumeRequestLater(consumeRequest);
            }
        } else {
            for (int total = 0; total < msgs.size(); ) {
                List msgThis = new ArrayList(consumeBatchSize);
                for (int i = 0; i < consumeBatchSize; i++, total++) {
                    if (total < msgs.size()) {
                        msgThis.add(msgs.get(total));
                    } else {
                        break;
                    }
                }

                ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                try {
                    this.consumeExecutor.submit(consumeRequest);
                } catch (RejectedExecutionException e) {
                    for (; total < msgs.size(); total++) {
                        msgThis.add(msgs.get(total));
                    }

                    this.submitConsumeRequestLater(consumeRequest);
                }
            }
        }
    }

终于在ConsumeRequest的run方法中找到了listner的consumeMessage

 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);

这下整个pull逻辑就完成了.

Consumer 怎么实现单队列并行消费

上节代码就是取得并行的例子,简单来说就是把消息提交给线程池,而不阻塞,就单队列并行消费了

Consumer 怎么过滤消息?

入口还是在DefaultMQPushConsumerImpl.pullMessage

pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);

往里面看,发现有过滤消息的逻辑

   public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
        final SubscriptionData subscriptionData) {
        PullResultExt pullResultExt = (PullResultExt) pullResult;

        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
        if (PullStatus.FOUND == pullResult.getPullStatus()) {
            ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
            List msgList = MessageDecoder.decodes(byteBuffer);

            List msgListFilterAgain = msgList;
            if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
                msgListFilterAgain = new ArrayList(msgList.size());
                for (MessageExt msg : msgList) {
                    if (msg.getTags() != null) {
                        if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                            msgListFilterAgain.add(msg);
                        }
                    }
                }
            }
            //消息过滤
            if (this.hasHook()) {
                FilterMessageContext filterMessageContext = new FilterMessageContext();
                filterMessageContext.setUnitMode(unitMode);
                filterMessageContext.setMsgList(msgListFilterAgain);
                this.executeHook(filterMessageContext);
            }

            for (MessageExt msg : msgListFilterAgain) {
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
                    Long.toString(pullResult.getMinOffset()));
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
                    Long.toString(pullResult.getMaxOffset()));
            }

            pullResultExt.setMsgFoundList(msgListFilterAgain);
        }

        pullResultExt.setMessageBinary(null);

        return pullResult;
    }
Consumer 怎么保证一条消息只被Group中的一个服务消费?

因为topic的MessageQueue只能对应Group中的一个Consumer,所以一条消息只被Group中的一个服务消费

Consumer 负载均衡怎么实现?

概念:

consumer同时消费多个MessageQueue,当topic中的MessageQueue变更时,动态调整消费MessageQueue的数量

 //RebalanceImpl
 public void doRebalance(final boolean isOrder) {
        Map subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }

        this.truncateMessageQueueNotMyTopic();
    }

我们只关心集群模式
主要逻辑:

1. 获取topic所有MessageQueue
2. 获取同ConsumerGroup组所有Consumer信息
3. 根据制定策略分配给此Consumer
 private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                Set mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, mqSet);
                        log.info("messageQueueChanged {} {} {} {}", //
                            consumerGroup, //
                            topic, //
                            mqSet, //
                            mqSet);
                    }
                } else {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
                break;
            }
            case CLUSTERING: {
            //获取该topic所有MessageQueue
                Set mqSet = this.topicSubscribeInfoTable.get(topic);
            //获取同consumerGroup信息
                List cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (null == mqSet) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                }

                if (null == cidAll) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                }

                if (mqSet != null && cidAll != null) {
                    List mqAll = new ArrayList();
                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List allocateResult = null;
                    try {
                    //根据分配策略分配MessageQueue给当前Consumer
                        allocateResult = strategy.allocate(//
                            this.consumerGroup, //
                            this.mQClientFactory.getClientId(), //
                            mqAll, //
                            cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }

                    Set allocateResultSet = new HashSet();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }

                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }

RocketMQ提供了几种策略供使用

实现类 策略名
AllocateMessageQueueAveragelyByCircle 轮询平均分配策略
AllocateMessageQueueByMachineRoom 根据机房分配策略
AllocateMessageQueueConsistentHash 一致Hash分配策略

本节编写参考分布式消息队列RocketMQ源码分析之4 -- Consumer负载均衡与Kafka的Consumer负载均衡之不同点

Consumer 消费失败怎么办

Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer 消费消息失败通常可以认为 有以下几种情况

由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其他消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过 10s 秒后再重试。

由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用 sleep 30s,再 消费下一条消息,这样可以减轻 Broker 重试消息的压力。

具体到代码实现,会根据消费状态进行处理,当无返回时会重试.

   if (null == status) {
                log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    msgs,
                    messageQueue);
                //设置状态为重试
                status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
public void processConsumeResult(//
        final ConsumeConcurrentlyStatus status, //
        final ConsumeConcurrentlyContext context, //
        final ConsumeRequest consumeRequest//
    ) {
        int ackIndex = context.getAckIndex();

        if (consumeRequest.getMsgs().isEmpty())
            return;

        switch (status) {
            case CONSUME_SUCCESS:
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            case RECONSUME_LATER:
                ackIndex = -1;
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                    consumeRequest.getMsgs().size());
                break;
            default:
                break;
        }

        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            case CLUSTERING:
                List msgBackFailed = new ArrayList(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }

                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);
                //请求重试消费
                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }

        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }
Consumer 可以回溯消费吗?

回溯消费是指Consumer已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于Consumer系统故障,恢复后需要重新消费 1 小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯,也可以向后回溯。

逻辑: 请求broker按参数返回offset,按照offset重置消费offset,从而实现回溯消费

    public Map invokeBrokerToResetOffset(final String addr, final String topic, final String group,
        final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
        throws RemotingException, MQClientException, InterruptedException {
        ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
        requestHeader.setTopic(topic);
        requestHeader.setGroup(group);
        requestHeader.setTimestamp(timestamp);
        requestHeader.setForce(isForce);

        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader);
        if (isC) {
            request.setLanguage(LanguageCode.CPP);
        }

        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                if (response.getBody() != null) {
                    ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class);
                    return body.getOffsetTable();
                }
            }
            default:
                break;
        }

        throw new MQClientException(response.getCode(), response.getRemark());
    }

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/70344.html

相关文章

  • 关于 RocketMQ ClientID 相同引发的消息堆积的问题

    摘要:本篇着重聊聊为什么会消息堆积。默认的策略很好理解,将平均的分配给。那么最后时,本来不同的,会取到相同的举个例子,和都取到了前个,从而造成有些如果有的话没有对其消费,而没有被消费,消息也在不停的投递进来,就会造成消息的大量堆积。 首先,造成这个问题的 BUG RocketMQ 官方已经在 3月16号 的这个提交中...

    psychola 评论0 收藏0
  • RocketMQ源码学习(一)-概述

    摘要:每个与集群中的所有节点建立长连接,定时注册信息到所有。完全无状态,可集群部署。本系列源码解析主要参照原理简介来追寻其代码实现虽然版本不太一致但这也是能找到的最详细的资料了接下来根据其模块来源码阅读目录如下 为什么选择读RocketMQ? 对MQ的理解一直不深,上周看了,还是觉得不够深入,找个成熟的产品来学习吧,RabbitMQ是erLang写的,Kafka是Scala写的,非Java写...

    godlong_X 评论0 收藏0
  • RocketMQ源码学习(六)-Name Server

    摘要:完全无状态,可集群部署与集群中的其中一个节点随机选择建立长连接,定期从取路由信息,并向提供服务的建立长连接,且定时向发送心跳。既可以从订阅消息,也可以从订阅消息,订阅规则由配置决定。 问题列表: Name Server 的作用是什么? Name Server 存储了Broker的什么信息? Name Server 为Producer的提供些什么信息? Name Server 为Co...

    Joyven 评论0 收藏0
  • RocketMQ源码学习(五)-Broker(与Consumer交互部分)

    摘要:发送消息阶段,不允许发送重复的消息。虽然不能严格保证不重复,但是正常情况下很少会出现重复发送消费情况,只有网络异常,启停等异常情况下会出现消息重复。 问题列表 Broker 怎么响应Consumer请求? Broker 怎么维护ConsumeQueue? Broker 怎么处理事务消息的 ConsumeQueue ? Broker 怎么处理定时消息的 ConsumeQueue? B...

    paulli3 评论0 收藏0
  • SpringBoot RocketMQ 整合使用和监控

    摘要:前提通过前面两篇文章可以简单的了解和安装,今天就将和整合起来使用。然后我运行之前的整合项目,查看监控信息如下总结整篇文章讲述了与整合和监控平台的搭建。 showImg(https://segmentfault.com/img/remote/1460000013232432?w=1920&h=1277); 前提 通过前面两篇文章可以简单的了解 RocketMQ 和 安装 RocketMQ...

    Jacendfeng 评论0 收藏0

发表评论

0条评论

周国辉

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<