前提介绍

在RocketMQ中一般有两种获取消息的方式,一个是拉(pull,消费者主动去broker拉取),一个是推(push,主动推送给消费者),在上一章节中已经介绍到了相关的Push操作,接下来的章节会介绍Pull操作方式的消费机制体系。

DefaultMQPullConsumer

DefaultMQPullConsumer与DefaultMQPushConsumer相比最大的区别是,消费哪些队列的消息,从哪个位移开始消费,以及何时提交消费位移都是由程序自己的控制的。下面来介绍一下DefaultMQPullConsumer的内部原理。

总体流程执行

DefaultMQPullConsumer使用例子

public class MQPullConsumer {    private static final Map OFFSE_TABLE = new HashMap();    public static void main(String[] args) throws MQClientException {        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupName");        consumer.setNamesrvAddr("name-serverl-ip:9876;name-server2-ip:9876");        consumer.start();        // 从指定topic中拉取所有消息队列        Set mqs = consumer.fetchSubscribeMessageQueues("order-topic");        for(MessageQueue mq:mqs){            try {                // 获取消息的offset,指定从store中获取               long offset = consumer.fetchConsumeOffset(mq,true);                while(true){                 PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);                                 putMessageQueueOffset(mq,pullResult.getNextBeginOffset());                    switch(pullResult.getPullStatus()){                    case FOUND:                        List messageExtList = pullResult.getMsgFoundList();                        for (MessageExt m : messageExtList) {                            System.out.println(new String(m.getBody()));                        }                        break;                    case NO_MATCHED_MSG:                        break;                    case NO_NEW_MSG:                        break;                    case OFFSET_ILLEGAL:                        break;                    }                }            } catch (Exception e) {                e.printStackTrace();            }        }        consumer.shutdown();    }    // 保存上次消费的消息下标    private static void putMessageQueueOffset(MessageQueue mq,            long nextBeginOffset) {        OFFSE_TABLE.put(mq, nextBeginOffset);    }    // 获取上次消费的消息的下标    private static Long getMessageQueueOffset(MessageQueue mq) {        Long offset = OFFSE_TABLE.get(mq);        if(offset != null){            return offset;        }        return 0l;    }}
  • 消费者启动:consumer.start();
  • 获取主题下所有的消息队列:这里是根据topic从nameserver获取的这里我们可以修改为从其他位置获取队列信息
    Set mqs = consumer.fetchSubscribeMessageQueues("topicTest");//遍历队列for(MessageQueue mq:mqs){    try {    //获取当前队列的消费位移,第二个参数表示位移是从本地内存获取,还是从broker获取,true表示从broker获取        long offset = consumer.fetchConsumeOffset(mq,true);        while(true){            //第二个参数表示可以消费哪些tag的消息            //第三个参数表示从哪个位移开始消费消息            //第四个参数表示一次最大拉多少个消息            PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);}

DefaultMQPullConsumer的总体流程

启动DefaultMQPullConsumer是通过调用start()方法完成的

DefaultMQPullConsumer拉取源码分析

分析下DefaultMQPullConsumer拉取消息的流程

consumer.fetchSubscribeMessageQueues("order-topic")

从指定topic中拉取所有消息队列

Set mqs = consumer.fetchSubscribeMessageQueues("order-topic");

核心源码分析

fetchSubscribeMessageQueues()
  • 通过调用fetchSubscribeMessageQueues()方法可以获取指定topic(GET_ROUTEINTO_BY_TOPIC)的读队列信息。它通过向nameserver发送GetRouteInfoRequest请求,请求内容为GET_ROUTEINTO_BY_TOPIC,nameserver将主题下的读队列个数发送给消费者,然后消费者使用如下代码创建出与读队列个数相同的MessageQueue对象。

  • 每个MessageQueue对象里面记录了topic、broker名和读队列号。最后fetchSubscribeMessageQueues()将MessageQueue对象集合返回给调用者。

  • 向NameServer发送请求获取topic参数对应的Broker信息和topic配置信息,即TopicRouteData对象。
 public Set fetchSubscribeMessageQueues(String topic) throws MQClientException {        try {            TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);            if (topicRouteData != null) {                // 2、遍历topicRouteData                Set mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);                if (!mqList.isEmpty()) {                    return mqList;                } else {                    throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null);                }            }        } catch (Exception e) {            throw new MQClientException(                "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),                e);        }        throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);    }

遍历过程TopicRouteData

遍历TopicRouteData对象的QueueData列表中每个QueueData对象,首先判断该QueueData对象是否具有读权限,
若有则根据该QueueData对象的readQueueNums值,创建readQueueNums个MessageQueue对象,并构成MessageQueue集合;
最后返回给MessageQueue集合

public static Set topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {        Set mqList = new HashSet();        List qds = route.getQueueDatas();        for (QueueData qd : qds) {            if (PermName.isReadable(qd.getPerm())) {                for (int i = 0; i < qd.getReadQueueNums(); i++) {                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);                    mqList.add(mq);                }            }        }        return mqList;    }
consumer.fetchConsumeOffset

通过该方法获取该MessageQueue队列下面从offset位置开始的消息内容,其中maxNums=32即表示获取的最大消息个数,offset为该MessageQueue对象的开始消费位置。

DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)

fetchConsumeOffset()有两个入参,第一个参数表示队列,第二个参数表示是否从broker获取该队列的消费位移,true表示从broker获取,false表示从本地记录获取,如果本地获取不到再从broker获取。
这里说的从本地获取是指从RemoteBrokerOffsetStore.offsetTable属性中获取,该属性记录了每个队列的消费位移。当从broker获取位移后会更新offsetTable。

pullBlockIfNotFound拉取信息

rocketmq提供了多个拉取方法,可以使用pullBlockIfNotFound()方法也可以使用pull()方法。两者的区别是如果队列中没有消息,两个方法的超时时间是不同的,pullBlockIfNotFound会等待30s返回一个空结果,pull是等待10s返回空结果。

不过pull方法的入参可以调整超时时间,而pullBlockIfNotFound则需要修改DefaultMQPullConsumer.consumerPullTimeoutMillis参数。不过两个方法调用的底层逻辑都是一样的,都是调用DefaultMQPullConsumerImpl.pullSyncImpl()方法获取消息。下面分析一下pullSyncImpl()方法。

public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {        return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());    }

获取该MessageQueue队列的消费进度来设定参数offset值该方法最终调用pullSyncImpl,可以获取相关的结果数据。

  • 参数1:消息队列(通过调用消费者的fetchSubscibeMessageQueue(topic)可以得到相应topic的所需要消息队列) ;

  • 参数2:需要过滤用的表达式 ;

  • 参数3:偏移量即消费队列的进度 ;

  • 参数4:一次取消息的最大值 ;
DefaultMQPullConsumerImpl.pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block)
DefaultMQPullConsumerImpl.pullSyncImpl的实现过程
      private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,        long timeout)        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {        this.isRunning();        //检查入参是否合法        if (null == mq) {            throw new MQClientException("mq is null", null);        }        if (offset < 0) {            throw new MQClientException("offset < 0", null);        }        if (maxNums <= 0) {            throw new MQClientException("maxNums <= 0", null);        }        //更新再平衡服务的数据,因为再平衡服务不起作用,所以更新数据没有效果        this.subscriptionAutomatically(mq.getTopic());        int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);        //计算超时时间,如果调用的是pullBlockIfNotFound方法,block参数就是true,否则就是false        long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;        boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());        //调用PullAPIWrapper从broker拉取消息,        //pullKernelImpl方法里面构建PullMessageRequest请求对象        PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(            mq,//队列            subscriptionData.getSubString(),//消息的过滤规则            subscriptionData.getExpressionType(),            isTagType ? 0L : subscriptionData.getSubVersion(),            offset,//拉取消息的位移            maxNums,//建议broker一次性返回最大消息个数,默认是32个            sysFlag,            0,//设置的提交位移,可以看到永远都是0,所以broker无法记录有效位移,需要程序自己记录控制提交位移            this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),            timeoutMillis,//超时时间            CommunicationMode.SYNC,            null//回调逻辑为null        );        this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);        //If namespace is not null , reset Topic without namespace.        this.resetTopic(pullResult.getMsgFoundList());        if (!this.consumeMessageHookList.isEmpty()) {            ConsumeMessageContext consumeMessageContext = null;            consumeMessageContext = new ConsumeMessageContext();            consumeMessageContext.setNamespace(defaultMQPullConsumer.getNamespace());            consumeMessageContext.setConsumerGroup(this.groupName());            consumeMessageContext.setMq(mq);            consumeMessageContext.setMsgList(pullResult.getMsgFoundList());            consumeMessageContext.setSuccess(false);            this.executeHookBefore(consumeMessageContext);            consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());            consumeMessageContext.setSuccess(true);            this.executeHookAfter(consumeMessageContext);        }        return pullResult;    }

检查MessageQueue对象的topic是否在RebalanceImpl.subscriptionInner:ConcurrentHashMap变量中,若不在则以consumerGroup、topic、subExpression为参数调用FilterAPI.buildSubscriptionData(String consumerGroup, String topic, String subExpression)方法构造SubscriptionData对象保存到RebalanceImpl.subscriptionInner变量中,其中 subExpression="*"
this.subscriptionAutomatically(mq.getTopic());
// 构建标志位,逻辑或运算|=
int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);

    SubscriptionData subscriptionData;    try {        //以请求参数subExpression以及consumerGroup、topic为参数调用FilterAPI.buildSubscriptionData(String consumerGroup,Stringtopic, String subExpression)方法构造SubscriptionData对象并返回        subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),            mq.getTopic(), subExpression);    } catch (Exception e) {        throw new MQClientException("parse subscription error", e);    }    long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;    // 从broker中拉取消息    PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(        mq,        subscriptionData.getSubString(),        0L,        offset,        maxNums,        sysFlag,        0,        this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),        timeoutMillis,        CommunicationMode.SYNC,        null    );    // 对拉取到的消息进行解码,过滤并执行回调,并把解析的message列表放到MsgFoundList中    this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);    if (!this.consumeMessageHookList.isEmpty()) {        ConsumeMessageContext consumeMessageContext = null;        consumeMessageContext = new ConsumeMessageContext();        consumeMessageContext.setConsumerGroup(this.groupName());        consumeMessageContext.setMq(mq);        consumeMessageContext.setMsgList(pullResult.getMsgFoundList());        consumeMessageContext.setSuccess(false);        this.executeHookBefore(consumeMessageContext);        consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());        consumeMessageContext.setSuccess(true);        this.executeHookAfter(consumeMessageContext);    }    return pullResult;}

Push和Pull的操作对比

  • push-优点:及时性、服务端统一处理实现方便
  • push-缺点:容易造成堆积、负载性能不可控
  • pull-优点:获得消息状态方便、负载均衡性能可控
  • pull-缺点:及时性差

使用DefaultMQPullConsumer拉取消息,发送到broker的提交位移永远都是0,所以broker无法记录有效位移,需要程序自己记录和控制提交位移。

资料参考