资讯专栏INFORMATION COLUMN

RocketMQ源码学习(二)-Producer

chengtao1633 / 2270人阅读

摘要:每个优先级可以用不同的表示,发消息时,指定不同的来表示优先级,这种方式可以解决绝大部分的优先级问题,但是对业务的优先级精确性做了妥协。支持定时消息,但是不支持任意时间精度,支持特定的,例如定时,,等。

Producer 生产者

这次源码学习的方法是带着问题学习源码实现,问题列表如下

Producer 同步消息怎么发送?

Producer 是与NameServer什么交互?

Producer 异步消息怎么发送?

P2P,Pub/Sub 都支持吗?

Producer 怎么保证顺序消息?

Producer 负载均衡?

Producer Group是什么概念?

Producer 怎么制定消息优先级?

Producer 的事务消息怎么实现?

Producer 定时消息怎么实现?

Producer 同步消息怎么发送? 消息体Message
    //主题
    private String topic;
    //状态
    private int flag;
    //属性
    private Map properties;
    //消息体
    private byte[] body;
执行逻辑
逻辑:
1. 查找topic中的发布信息
2. 选择topic中的某个MessageQueue
3. 使用Netty发送消息至MessageQueue
代码走读,重点逻辑在代码中注释(省略与主逻辑无关的代码)
DefaultMQProducer.send
->DefaultMQProducerImple.send
->DefaultMQProducerImple.sendDefaultImpl
private SendResult sendDefaultImpl(//
        Message msg, //
        final CommunicationMode communicationMode, //
        final SendCallback sendCallback, //
        final long timeout//
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);

        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        //获取topic信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                //选择topic中的中的某个MessageQueue(相当于kafka的partition)
                MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (tmpmq != null) {
                    mq = tmpmq;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        //真正的发送数据方法
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                    } 
            }
    }

本次重点关注消息体的发送
DefaultMQProducerImple.sendDefaultImpl->DefaultMQProducerImpl.sendKernelImpl

private SendResult sendKernelImpl(final Message msg, //
        final MessageQueue mq, //
        final CommunicationMode communicationMode, //
        final SendCallback sendCallback, //
        final TopicPublishInfo topicPublishInfo, //
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        SendMessageContext context = null;
        if (brokerAddr != null) {
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            byte[] prevBody = msg.getBody();
            try {
                //for MessageBatch,ID has been set in the generating process
                if (!(msg instanceof MessageBatch)) {
                    MessageClientIDSetter.setUniqID(msg);
                }

                int sysFlag = 0;
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                }

                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }

                if (hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }

                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }

                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }
                //构建请求头部
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }

                SendResult sendResult = null;
                switch (communicationMode) {
                //异步模式发送
                    case ASYNC:
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
                            brokerAddr, // 1
                            mq.getBrokerName(), // 2
                            msg, // 3
                            requestHeader, // 4
                            timeout, // 5
                            communicationMode, // 6
                            sendCallback, // 7
                            topicPublishInfo, // 8
                            this.mQClientFactory, // 9
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
                            context, //
                            this);
                        break;
                    case ONEWAY:
                    //同步模式发送
                    case SYNC:
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }

                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                return sendResult;
            } catch (RemotingException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (MQBrokerException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (InterruptedException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } finally {
                msg.setBody(prevBody);
            }
        }

        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

后面就是底层的消息发送工具,整个RocketMQ共用
DefaultMQProducerImpl.sendKernelImpl->MQClientAPIImpl.sendMessage

public SendResult sendMessage(//
        final String addr, // 1
        final String brokerName, // 2
        final Message msg, // 3
        final SendMessageRequestHeader requestHeader, // 4
        final long timeoutMillis, // 5
        final CommunicationMode communicationMode, // 6
        final SendCallback sendCallback, // 7
        final TopicPublishInfo topicPublishInfo, // 8
        final MQClientInstance instance, // 9
        final int retryTimesWhenSendFailed, // 10
        final SendMessageContext context, // 11
        final DefaultMQProducerImpl producer // 12
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = null;
        if (sendSmartMsg || msg instanceof MessageBatch) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }

        request.setBody(msg.getBody());

        switch (communicationMode) {
            case ONEWAY:
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC:
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
            default:
                assert false;
                break;
        }

        return null;
    }

MQClientAPIImpl.sendMessage->MQClientAPIImpl.sendMessageSync
委托给Netty

 private SendResult sendMessageSync(//
                                       final String addr, //
                                       final String brokerName, //
                                       final Message msg, //
                                       final long timeoutMillis, //
                                       final RemotingCommand request//
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert response != null;
        return this.processSendResponse(brokerName, msg, response);
    }

MQClientAPIImpl.sendMessageSync->NettyRomotingClient.invokeSync
在调用前后触发hook,真正的还在后面

  @Override
    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                if (this.rpcHook != null) {
                    this.rpcHook.doBeforeRequest(addr, request);
                }
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis);
                if (this.rpcHook != null) {
                    this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                }
                return response;
            } catch (RemotingSendRequestException e) {
                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            } catch (RemotingTimeoutException e) {
                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr, channel);
                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                }
                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }

NettyRomotingClient.invokeSync->NettyRomotingClient.invokeSyncImpl
终于到了Netty真正发数据的时候了

    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        final int opaque = request.getOpaque();

        try {
            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }

                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);
                    PLOG.warn("send a request command to channel <" + addr + "> failed.");
                }
            });

            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }

            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }
Producer是与NameServer什么交互? TopicPublishInfo
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    //topic中MessgeQueuee信息
    private List messageQueueList = new ArrayList();
    //负责均衡策略的索引
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    //topic包含的路由信息
    private TopicRouteData topicRouteData;
执行逻辑
逻辑:
1. 发送消息时查找topic中的发布信息
2. 如果没找到,通过NameServer去寻找
3. 通过Netty请求NameServer
代码走读,重点逻辑在代码中注释(省略与主逻辑无关的代码)

DefaultMQProducerImpl

 private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            //从NameServer更新
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

MQClientInstance

 public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        return updateTopicRouteInfoFromNameServer(topic, false, null);
    }
    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
                        if (topicRouteData != null) {
                            for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            }
                        }
                    } else {
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
    }
                   
 public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
        throws RemotingException, MQClientException, InterruptedException {
        GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
        requestHeader.setTopic(topic);
        //构建向NameServer的消息体
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
        //Netty 请求
        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.TOPIC_NOT_EXIST: {
                // TODO LOG
                break;
            }
            case ResponseCode.SUCCESS: {
                byte[] body = response.getBody();
                if (body != null) {
                    return TopicRouteData.decode(body, TopicRouteData.class);
                }
            }
            default:
                break;
        }

        throw new MQClientException(response.getCode(), response.getRemark());
    }
Producer 异步消息怎么发送?

异步:

异步与同步的区别,是无需等待,后续逻辑透过回调.

代码走读,重点逻辑在代码中注释(省略与主逻辑无关的代码)

异步发送方法参数多了SendCallback参数,且没有返回值

   public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, sendCallback);
    }

SendCallback是发送成功和失败后的回调函数

public interface SendCallback {
    public void onSuccess(final SendResult sendResult);

    public void onException(final Throwable e);
}

前面的和同步类似都跳过,直接看

  private void sendMessageAsync(//
        final String addr, //
        final String brokerName, //
        final Message msg, //
        final long timeoutMillis, //
        final RemotingCommand request, //
        final SendCallback sendCallback, //
        final TopicPublishInfo topicPublishInfo, //
        final MQClientInstance instance, //
        final int retryTimesWhenSendFailed, //
        final AtomicInteger times, //
        final SendMessageContext context, //
        final DefaultMQProducerImpl producer //
    ) throws InterruptedException, RemotingException {
        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
            @Override
            public void operationComplete(ResponseFuture responseFuture) {
                RemotingCommand response = responseFuture.getResponseCommand();
                if (null == sendCallback && response != null) {

                    try {
                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                        if (context != null && sendResult != null) {
                            context.setSendResult(sendResult);
                            context.getProducer().executeSendMessageHookAfter(context);
                        }
                    } catch (Throwable e) {
                        //
                    }

                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                    return;
                }

                if (response != null) {
                    try {
                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                        assert sendResult != null;
                        if (context != null) {
                            context.setSendResult(sendResult);
                            context.getProducer().executeSendMessageHookAfter(context);
                        }
        // 上面重复了,sendCallBack的判断应该放在这
                        try {
        // 异步成功回调点
                            sendCallback.onSuccess(sendResult);
                        } catch (Throwable e) {
                        }

                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                    } catch (Exception e) {
                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                        //异常回调点
                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, e, context, false, producer);
                    }
                } else {
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                    if (!responseFuture.isSendRequestOK()) {
                        MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, ex, context, true, producer);
                    } else if (responseFuture.isTimeout()) {
                        MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
                            responseFuture.getCause());
                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, ex, context, true, producer);
                    } else {
                        MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, ex, context, true, producer);
                    }
                }
            }
        });
    }
    @Override
    public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException {
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                if (this.rpcHook != null) {
                    this.rpcHook.doBeforeRequest(addr, request);
                }
                this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
            } catch (RemotingSendRequestException e) {
                log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }
   public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        final int opaque = request.getOpaque();
        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);

            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
            this.responseTable.put(opaque, responseFuture);
            try {
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        if (f.isSuccess()) {
                            responseFuture.setSendRequestOK(true);
                            return;
                        } else {
                            responseFuture.setSendRequestOK(false);
                        }

                        responseFuture.putResponse(null);
                        responseTable.remove(opaque);
                        try {
                        //执行回调
                            executeInvokeCallback(responseFuture);
                        } catch (Throwable e) {
                            PLOG.warn("excute callback in writeAndFlush addListener, and callback throw", e);
                        } finally {
                            responseFuture.release();
                        }

                        PLOG.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                    }
                });
            } catch (Exception e) {
                responseFuture.release();
                PLOG.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
            } else {
                String info =
                    String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //
                        timeoutMillis, //
                        this.semaphoreAsync.getQueueLength(), //
                        this.semaphoreAsync.availablePermits()//
                    );
                PLOG.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }
P2P,Pub/Sub都支持吗?

JMS系统有P2P和Pub/Sub两种模式,一条消息只能被消费一次时即为P2P,RocketMQ的广播模式即为了Pub/Sub模式.

Producer怎么保证顺序消息?

顺序消息:

消费消息的顺序要同发送消息的顺序一致,在 RocketMQ 中,主要指的是局部顺序,即一类消息为满足顺序性,必须 Producer 单线程顺序发送,且发送到同一个队列,这样 Consumer 就可以按照 Producer 发送的顺序去消费消息。

普通顺序消息:

顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一旦发生通信异常,Broker 重启,由于队列总数发生变化,哈希取模后定位的队列会变化,产生短暂的消息顺序不一致。 如果业务能容忍在集群异常情况(如某个 Broker 宕机或者重启)下,消息短暂的乱序,使用普通顺序方式比较合适。

严格顺序消息:

顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式 Failover 特性,即 Broker 集群中只 要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。 如果服务器部署为同步双写模式,此缺陷可通过备机自动切换为主避免,不过仍然会存在几分钟的服务不可用。(依赖同步双写,主备自动切换,自动切换功能目前还未实现)目前已知的应用只有数据库binlog同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推荐使用普通的顺序消息。

要实现严格的顺序消息,简单且可行的办法就是:

保证生产者 - MQServer - 消费者是一对一对一的关系

关于顺序消息建议阅读分布式开放消息系统(RocketMQ)的原理与实践
放在Producer就是要保证需要严格顺序消息就是把同类型的消息发送到同一MessageQueue,通过实现特定的MessageQueueSelector,可以定制跟消息体相关的MessageQueue

    public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return send(msg, selector, arg, this.defaultMQProducer.getSendMsgTimeout());
    }
public interface MessageQueueSelector {
    MessageQueue select(final List mqs, final Message msg, final Object arg);
}
Producer 负载均衡?

同步发送消息中,有这样一个方法来选定MessageQueue

MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
   public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
    }

保证低延迟

   public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
   //如果要保证最低发送延迟走此逻辑
   //建议:这里改成策略模式
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                //轮询看能不能有低延迟的MessageQueue
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }
        //如果没找到就随机找一个
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }
        
        //不然走此逻辑,轮询模式
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

选最低延迟的broker,根据FaultItem的可用性和开始时间选择

    @Override
    public String pickOneAtLeast() {
        final Enumeration elements = this.faultItemTable.elements();
        List tmpList = new LinkedList();
        while (elements.hasMoreElements()) {
            final FaultItem faultItem = elements.nextElement();
            tmpList.add(faultItem);
        }

        if (!tmpList.isEmpty()) {
        
                @Override
    public String pickOneAtLeast() {
        final Enumeration elements = this.faultItemTable.elements();
        List tmpList = new LinkedList();
        while (elements.hasMoreElements()) {
            final FaultItem faultItem = elements.nextElement();
            tmpList.add(faultItem);
        }

        if (!tmpList.isEmpty()) {
            Collections.shuffle(tmpList);
            //按延迟排序
            Collections.sort(tmpList);

            final int half = tmpList.size() / 2;
            if (half <= 0) {
                return tmpList.get(0).getName();
            } else {
            //从最烂的后一个开始选
                final int i = this.whichItemWorst.getAndIncrement() % half;
                return tmpList.get(i).getName();
            }
        }

        return null;
    }

FaultItem排序策略

        @Override
        public int compareTo(final FaultItem other) {
            if (this.isAvailable() != other.isAvailable()) {
                if (this.isAvailable())
                    return -1;

                if (other.isAvailable())
                    return 1;
            }

            if (this.currentLatency < other.currentLatency)
                return -1;
            else if (this.currentLatency > other.currentLatency) {
                return 1;
            }

            if (this.startTimestamp < other.startTimestamp)
                return -1;
            else if (this.startTimestamp > other.startTimestamp) {
                return 1;
            }

            return 0;
        }

轮询模式:

轮询取不等于lastBrokerName的一个MessageQueue
  public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }

没有lastBrokerName,就轮询取吧

   public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }
Producer Group是什么概念?

一类 Producer 的集合名称,这类Producer通常发送一类消息,且发送逻辑一致。

Producer 怎么制定消息优先级?

优先级:

每条消息都有不同的优先级,一般用整数来描述,优先级高的消 息先投递,如果消息完全在一个内存队列中,那么在投递前可以按照优先级排序,令优先级高的先投递。由于RocketMQ所有消息都是持久化的,所以如果按照优先级来排序,开销会非常大,因此RocketMQ没有特意支持消息优先级,但是可以通过变通的方式实现类似功能,即多带带配置一个优先级高的队列,和一个普通优先级 的队列, 将不同优先级发送到不同队列即可。

对于优先级问题,可以归纳为 2 类

只要达到优先级目的即可,不是严格意义上的优先级,通常将优先级划分为高、中、低,或者再多几个级别。每个优先级可以用不同的topic表示,发消息时,指定不同的topic来表示优先级,这种方式可以解决绝大部分的优先级问题,但是对业务的优先级精确性做了妥协。

严格的优先级,优先级用整数表示,例如0~65535,这种优先级问题一般使用不同 topic 解决就非常不合适。如果要让 MQ 解决此问题,会对 MQ 的性能造成非常大的影响。这里要确保一点,业务上是否确实需要这种严格的优先级,如果将优先级压缩成几个,对业务的影响有多大?

Producer 的事务消息怎么实现?

逻辑:

1. 发送prepare消息
2. 执行生产者业务逻辑(业务逻辑代码需实现LocalTransactionExecuter)
3. 结束事务消息(commit,rollback,notType)

TransactionMQProducer.sendMessageInTransaction->DefaultMQProducerImpl.sendMessageInTransaction

    public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg)
        throws MQClientException {
        if (null == tranExecuter) {
            throw new MQClientException("tranExecutor is null", null);
        }
        Validators.checkMessage(msg, this.defaultMQProducer);

        SendResult sendResult = null;
        // 设置事务消息属性
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            1. 发送消息,同普通消息一样
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }

        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    //执行生产者业务逻辑
                    localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            //如果1就发送失败,就回滚
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }

        try {
        //结束事务状态,by本地业务逻辑执行情况
            this.endTransaction(sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }

        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    }
    public void endTransaction(//
        final SendResult sendResult, //
        final LocalTransactionState localTransactionState, //
        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
        final MessageId id;
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }
        String transactionId = sendResult.getTransactionId();
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        switch (localTransactionState) {
        //如果本地执行成功就提交
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
        //如果本地失败,就回滚
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
        //如果本地未知,那就打死未知
            case UNKNOW:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }

        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
            this.defaultMQProducer.getSendMsgTimeout());
    }

特殊情况:如果事务结束消息发送失败,该怎么办?
broker会检查如果是prepared状态,则会向Producer发起CheckTransaction请求.

Producer 延时消息怎么实现?

定时消息:

指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能 被消费。
如果要支持任意的时间精度,在Broker层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不 可避免的产生巨大性能开销。

RocketMQ 支持定时消息,但是不支持任意时间精度,支持特定的 level,例如定时 5s,10s,1m 等。

实现:
在消息体中设置Message延迟级别,剩下的就交给Broker实现吧

    public void setDelayTimeLevel(int level) {
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/70317.html

相关文章

  • RocketMQ源码学习(一)-概述

    摘要:每个与集群中的所有节点建立长连接,定时注册信息到所有。完全无状态,可集群部署。本系列源码解析主要参照原理简介来追寻其代码实现虽然版本不太一致但这也是能找到的最详细的资料了接下来根据其模块来源码阅读目录如下 为什么选择读RocketMQ? 对MQ的理解一直不深,上周看了,还是觉得不够深入,找个成熟的产品来学习吧,RabbitMQ是erLang写的,Kafka是Scala写的,非Java写...

    godlong_X 评论0 收藏0
  • RocketMQ源码学习(六)-Name Server

    摘要:完全无状态,可集群部署与集群中的其中一个节点随机选择建立长连接,定期从取路由信息,并向提供服务的建立长连接,且定时向发送心跳。既可以从订阅消息,也可以从订阅消息,订阅规则由配置决定。 问题列表: Name Server 的作用是什么? Name Server 存储了Broker的什么信息? Name Server 为Producer的提供些什么信息? Name Server 为Co...

    Joyven 评论0 收藏0
  • SpringBoot RocketMQ 整合使用和监控

    摘要:前提通过前面两篇文章可以简单的了解和安装,今天就将和整合起来使用。然后我运行之前的整合项目,查看监控信息如下总结整篇文章讲述了与整合和监控平台的搭建。 showImg(https://segmentfault.com/img/remote/1460000013232432?w=1920&h=1277); 前提 通过前面两篇文章可以简单的了解 RocketMQ 和 安装 RocketMQ...

    Jacendfeng 评论0 收藏0
  • 高并发异步解耦利器:RocketMQ究竟强在哪里?

    摘要:它是阿里巴巴于年开源的第三代分布式消息中间件。是一个分布式消息中间件,具有低延迟高性能和可靠性万亿级别的容量和灵活的可扩展性,它是阿里巴巴于年开源的第三代分布式消息中间件。上篇文章消息队列那么多,为什么建议深入了解下RabbitMQ?我们讲到了消息队列的发展史:并且详细介绍了RabbitMQ,其功能也是挺强大的,那么,为啥又要搞一个RocketMQ出来呢?是重复造轮子吗?本文我们就带大家来详...

    tainzhi 评论0 收藏0
  • 本地RocketMQ的安装与调试

    摘要:本地的安装与调试标签启动进入的源码项目。消息发送的高性能与低延迟。强大的消息堆积能力与消息处理能力。严格的顺序消息存储。保证消息至少被消费一次,但不承诺消息不会被消费者多次消费。其消息的幂等由消费者自己实现。 本地RocketMQ的安装与调试 标签:【RocketMQ】 1. 启动 进入RocketMQ-ALL的源码项目。 执行maven打包: mvn -Prelease-all ...

    icattlecoder 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<