消费者详解-消费消息(1)

article/2025/8/20 22:47:34

文章目录

    • 消费者消费消息流程
      • Pull消费流程
        • 1. 初始化消费者
        • 2. 拉取topic的消息队列
        • 3. 拉取消费位点
        • 4. 根据消费位点消费消息
        • 5. 保存消费进度
        • 拉取消息-pullKernelImpl
      • Broker处理拉取消息请求
        • 1、权限、参数校验并且获取初始化变量:
        • 2、获取拉取消息的topic配置
        • 3、解析订阅信息
        • 4、创建消息过滤器
        • 5、获取消息
          • (1)校验消息存储服务状态
          • (2)校验消费队列
          • (3)获取消息
        • 6、处理消息结果
        • 7、更新消费位点
        • 8、返回结果

消费者消费消息流程

RocketMQ的消费方式包含Pull和Push两种。

Pull方式:用户主动Pull消息,自主管理位点,可以灵活地掌控消费进度和消费速度,适合流计算、消费特别耗时等特殊的消费场景。在RocketMQ中org.apache.rocketmq.client.consumer.DefaultMQPullConsumer是默认的Pull消费者实现类。

Push方式:代码接入非常简单,适合大部分业务场景。在RocketMQ中org.apache.rocketmq.client.consumer.DefaultMQPushConsumer是默认的Push消费者实现类。

Pull消费流程

在这里插入图片描述

在Pull消费信息下,用户需要自己去管理消费位点offset、拉取消息等。所以根据以上Pull消费流程分析:

1. 初始化消费者

初始化好Pull消费者,比如设置消费者组、nameServ地址、订阅的topic等,并启动Pull消费者

2. 拉取topic的消息队列

调用fetchSubscribeMessageQueues(String topic),拉取topic的所有MessageQueue。首先尝试获取在topicSubscribeInfoTable内存中topic的消息队列,如果找不到,则调用fetchSubscribeMessageQueues(String topic)方法从nameServ找topic的路由信息解析为topic的消息队列,并将topic去除namespace处理。源码在org.apache.rocketmq.client.impl.consumer.fetchSubscribeMessageQueues

    public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {this.isRunning();// check if has info in memory, otherwise invoke api.// 从内存topicSubscribeInfoTable中尝试获取topic的消息队列,如果没有则请求nameServ获取Set<MessageQueue> result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);if (null == result) {result = this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic);}// 对topic去除namespace处理return parseSubscribeMessageQueues(result);}public Set<MessageQueue> parseSubscribeMessageQueues(Set<MessageQueue> queueSet) {Set<MessageQueue> resultQueues = new HashSet<MessageQueue>();for (MessageQueue messageQueue : queueSet) {String userTopic = NamespaceUtil.withoutNamespace(messageQueue.getTopic(),this.defaultMQPullConsumer.getNamespace());resultQueues.add(new MessageQueue(userTopic, messageQueue.getBrokerName(), messageQueue.getQueueId()));}return resultQueues;}public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {try {// 从nameServ拉取topic路由信息TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);if (topicRouteData != null) {// 将路由信息转换为消息队列Set<MessageQueue> mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);if (!mqList.isEmpty()) {return mqList;} else {throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null);}}} catch (Exception e) {throw new MQClientException("Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),e);}throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);}public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {Set<MessageQueue> mqList = new HashSet<MessageQueue>();List<QueueData> qds = route.getQueueDatas();for (QueueData qd : qds) {if (PermName.isReadable(qd.getPerm())) {for (int i = 0; i < qd.getReadQueueNums(); i++) {MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);mqList.add(mq);}}}return mqList;}

3. 拉取消费位点

根据topic获取到的消息队列列表,遍历topic全部的消息队列,获取每个队列的消费位点(消费位点读取方式由用户决定,下面的consumeFromOffset(MessageQueue messageQueue)方法是RocketMQ的示例)

    public long consumeFromOffset(MessageQueue messageQueue) throws MQClientException {//-1 when startedlong offset = consumer.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY);if (offset < 0) {//query from brokeroffset = consumer.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE);}if (offset < 0) {//first time start from last offsetoffset = consumer.maxOffset(messageQueue);}//make sureif (offset < 0) {offset = 0;}return offset;}

4. 根据消费位点消费消息

根据查询到的消费位点offset调用org.apache.rocketmq.client.consumer.DefaultMQPullConsumer#pull方法获取该topic的消息,调用DefaultMQPullConsumer#pull方法实际调用的是org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#pull方法:

    public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// consumerPullTimeOutMills:消费者拉取消息超时时间,默认10sreturn pull(mq, subExpression, offset, maxNums, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());}public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 构建 topic订阅信息SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, timeout);}private SubscriptionData getSubscriptionData(MessageQueue mq, String subExpression)throws MQClientException {if (null == mq) {throw new MQClientException("mq is null", null);}try {return FilterAPI.buildSubscriptionData(mq.getTopic(), subExpression);} catch (Exception e) {throw new MQClientException("parse subscription error", e);}}public static SubscriptionData buildSubscriptionData(String topic, String subString) throws Exception {SubscriptionData subscriptionData = new SubscriptionData();subscriptionData.setTopic(topic);subscriptionData.setSubString(subString);if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {subscriptionData.setSubString(SubscriptionData.SUB_ALL);} else {String[] tags = subString.split("\\|\\|");if (tags.length > 0) {for (String tag : tags) {if (tag.length() > 0) {String trimString = tag.trim();if (trimString.length() > 0) {subscriptionData.getTagsSet().add(trimString);subscriptionData.getCodeSet().add(trimString.hashCode());}}}} else {throw new Exception("subString split error");}}return subscriptionData;}

首先构建拉取的topic的订阅信息subscriptionData,调用pullSyncImpl方法同步拉取topic消息。

当然,有同步拉取就有异步拉取,异步拉取的方法是pullAsyncImpl,流程大致一致,只是多了个用户自定义实现的PullCallback用于拉取到消息后回调通知用户进行处理。

private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,long timeout)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 1、判断当前消费者是否处于Running状态this.isRunning();if (null == mq) {throw new MQClientException("mq is null", null);}if (offset < 0) {throw new MQClientException("offset < 0", null);}if (maxNums <= 0) {throw new MQClientException("maxNums <= 0", null);}// 如果topic在原本的订阅信息中不存在,自动添加topic的订阅信息this.subscriptionAutomatically(mq.getTopic());// 标志位/*** commitOffset:有该标志位表示Broker会根据拉取的请求更新保存的消费位点* block/suspend:有该标志位表示改拉取请求可以被暂时挂起,等待 timeoutMillis 时间内会检查是否有新消息可拉取* subscription:有该标志位则表示Broker会根据拉取请求的信息创建topic订阅信息,没有则会从消费者注册信息中获取* classFilter:有该标志位则表示需要走类过滤模式,在发送拉取消息请求前会选择有ClassFilter的Broker*/int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);// block = true consumerTimeoutMillisWhenSuspend 超时时间默认 30000ms,// 否则就是timeout,一般为consumerPullTimeoutMillis,默认为 10000mslong timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;// 判断tag表达式类型是否是Tag类型(默认为Tag类型)boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());// 拉取消息PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(mq,subscriptionData.getSubString(),subscriptionData.getExpressionType(),isTagType ? 0L : subscriptionData.getSubVersion(),offset,maxNums,sysFlag,0,this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),timeoutMillis,CommunicationMode.SYNC,null);// 处理拉取消息结果this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);//If namespace is not null , reset Topic without namespace.this.resetTopic(pullResult.getMsgFoundList());if (!this.consumeMessageHookList.isEmpty()) {ConsumeMessageContext consumeMessageContext = null;consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setNamespace(defaultMQPullConsumer.getNamespace());consumeMessageContext.setConsumerGroup(this.groupName());consumeMessageContext.setMq(mq);consumeMessageContext.setMsgList(pullResult.getMsgFoundList());consumeMessageContext.setSuccess(false);this.executeHookBefore(consumeMessageContext);consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());consumeMessageContext.setSuccess(true);this.executeHookAfter(consumeMessageContext);}return pullResult;
}public void subscriptionAutomatically(final String topic) {// 已有订阅信息中不包含topicif (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {try {// 创建订阅信息添加到subscriptionInner中SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL);this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData);} catch (Exception ignore) {}}}
  1. 校验Cosnumer是否处于Running状态,校验入参

  2. 如果topic在原本的订阅信息subscriptionInner中不存在,自动添加topic的订阅信息

  3. 设置系统标志sysFlag和传参timeoutMillisisTagTypesysFlag标志含义如下:

  • commitOffset:有该标志位表示Broker会根据拉取的请求更新保存的消费位点
  • block/suspend:有该标志位表示改拉取请求可以被暂时挂起,等待 timeoutMillis 时间内会检查是否有新消息可拉取
  • subscription:有该标志位则表示Broker会根据拉取请求的信息创建topic订阅信息,没有则会从消费者注册信息中获取
  • classFilter:有该标志位则表示需要走类过滤模式,在发送拉取消息请求前会选择有ClassFilter的Broker
  1. 调用this.pullAPIWrapper.pullKernelImpl方法拉取消息

  2. 调用this.pullAPIWrapper.processPullResult处理拉取消息结果。根据Broker返回的消息结果中suggestWhichBrokerId字段更新pullFromWhichNodeTable中该mq对应的下次请求拉取消息时应该向那个Broker拉取请求的BrokerId。

    如果拉取到消息(pullStatus == PullStatus.FOUND),则再次根据topic的订阅信息中记录的tags进行过滤,因为Broker只进行tag hashcode过滤,可能会有hash冲突,需要进一步过滤。

    public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,final SubscriptionData subscriptionData) {PullResultExt pullResultExt = (PullResultExt) pullResult;// 更新pullFromWhichNodeTablethis.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());// 如果拉取结果是拉取到消息if (PullStatus.FOUND == pullResult.getPullStatus()) {ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);// 再次根据tag名进行过滤,因为Broker只进行tag hashcode过滤,可能会有hash冲突,需要进一步过滤List<MessageExt> msgListFilterAgain = msgList;if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());for (MessageExt msg : msgList) {if (msg.getTags() != null) {if (subscriptionData.getTagsSet().contains(msg.getTags())) {msgListFilterAgain.add(msg);}}}}if (this.hasHook()) {FilterMessageContext filterMessageContext = new FilterMessageContext();filterMessageContext.setUnitMode(unitMode);filterMessageContext.setMsgList(msgListFilterAgain);this.executeHook(filterMessageContext);}for (MessageExt msg : msgListFilterAgain) {String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (Boolean.parseBoolean(traFlag)) {msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));}MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,Long.toString(pullResult.getMinOffset()));MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,Long.toString(pullResult.getMaxOffset()));msg.setBrokerName(mq.getBrokerName());}pullResultExt.setMsgFoundList(msgListFilterAgain);}pullResultExt.setMessageBinary(null);return pullResult;}public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);if (null == suggest) {this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));} else {suggest.set(brokerId);}}
  1. 如果namespace不为空,重新设置拉取到的消息的topic为去除namespace后的topic
    public void resetTopic(List<MessageExt> msgList) {if (null == msgList || msgList.size() == 0) {return;}//If namespace not null , reset Topic without namespace.for (MessageExt messageExt : msgList) {if (null != this.getDefaultMQPullConsumer().getNamespace()) {messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(), this.defaultMQPullConsumer.getNamespace()));}}}

5. 保存消费进度

保存消费进度,消费进度可以执行*updateConsumeOffset()*方法,将消费位点上报给Broker,也可以自行保存消费位点

    // 更新某一个Queue的消费位点@Overridepublic void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException {// 如果namespace不为空,则设置传入的mq的topic加上namespacethis.defaultMQPullConsumerImpl.updateConsumeOffset(queueWithNamespace(mq), offset);}public MessageQueue queueWithNamespace(MessageQueue queue) {if (StringUtils.isEmpty(this.getNamespace())) {return queue;}return new MessageQueue(withNamespace(queue.getTopic()), queue.getBrokerName(), queue.getQueueId());}public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException {// 当前consumer是否处于runningthis.isRunning();// 更新offsetTable mq对应的offset,false表示不必一定要递增才可以更新this.offsetStore.updateOffset(mq, offset, false);}// RemoteBrokerOffsetStore和LocalFileOffsetStore的更新offset逻辑一致public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {if (mq != null) {// 更新offsetTable mq对应的offsetAtomicLong offsetOld = this.offsetTable.get(mq);if (null == offsetOld) {offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));}if (null != offsetOld) {// increaseOnly=true表示只允许值递增,比较offset大于offsetOld才能更新offsetTableif (increaseOnly) {MixAll.compareAndIncreaseOnly(offsetOld, offset);} else {offsetOld.set(offset);}}}}public static boolean compareAndIncreaseOnly(final AtomicLong target, final long value) {long prev = target.get();while (value > prev) {boolean updated = target.compareAndSet(prev, value);if (updated)return true;prev = target.get();}return false;}

拉取消息-pullKernelImpl

org.apache.rocketmq.client.impl.consumerPullAPIWrapper.pullKernelImpl方法是真正的消费者拉取消息实现:

    public PullResult pullKernelImpl(final MessageQueue mq,final String subExpression,final String expressionType,final long subVersion,final long offset,final int maxNums,final int sysFlag,final long commitOffset,final long brokerSuspendMaxTimeMillis,final long timeoutMillis,final CommunicationMode communicationMode,final PullCallback pullCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 2.获取mq对应的Broker地址信息FindBrokerResult findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),// 1.计算应该从那个BrokerId拉取消息this.recalculatePullFromWhichNode(mq), false);// 如果获取不到,则尝试更新topic路由信息,然后再次获取if (null == findBrokerResult) {this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);}if (findBrokerResult != null) {{// check version// 3.如果tag表达式不是TAG类型,且查询到拉取消息的Broker节点的版本小于V4_1_0_SNAPSHOT,则报错不支持除TAG外的类型if (!ExpressionType.isTagType(expressionType)&& findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {throw new MQClientException("The broker[" + mq.getBrokerName() + ", "+ findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);}}int sysFlagInner = sysFlag;// 如果拉取消息的Broker是Slave,去除CommitOffset标志位if (findBrokerResult.isSlave()) {sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);}PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();requestHeader.setConsumerGroup(this.consumerGroup);requestHeader.setTopic(mq.getTopic());requestHeader.setQueueId(mq.getQueueId());requestHeader.setQueueOffset(offset);requestHeader.setMaxMsgNums(maxNums);requestHeader.setSysFlag(sysFlagInner);requestHeader.setCommitOffset(commitOffset);requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);requestHeader.setSubscription(subExpression);// isTagType ? 0 : subscriptionData.getSubVersion()requestHeader.setSubVersion(subVersion);requestHeader.setExpressionType(expressionType);String brokerAddr = findBrokerResult.getBrokerAddr();// 如果存在ClassFilter标志位,则尝试随机获取一个topic对应存在FilterServer的BrokerAddrif (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);}// 4.拉取消息PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);return pullResult;}throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);}
  1. 首先调用this.recalculatePullFromWhichNode(mq)方法计算从pullFromWhichNodeTable获取当前mq应该从哪个BrokerId拉取消息,默认是MixAll.MASTER_ID

        public long recalculatePullFromWhichNode(final MessageQueue mq) {// 默认falseif (this.isConnectBrokerByUser()) {// defaultBrokerId 默认等于 MixAll.MASTER_IDreturn this.defaultBrokerId;}// 建议从哪个Broker拉取消息AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);if (suggest != null) {return suggest.get();}// 没有记录则默认返回MASTER_IDreturn MixAll.MASTER_ID;}
    
  2. 调用this.mQClientFactory.findBrokerAddressInSubscribe方法根据brokerAddrTable获取指定的BrokerName、BrokerId的地址信息,如果找不到,则立即请求nameServ更新topic路由信息以及Broker的地址信息然后再次尝试获取。

  3. 校验Broker是否支持SQL92类型tag,Broker节点的版本小于V4_1_0_SNAPSHOT则不支持;

    如果请求的Broker是Slave,则去除sysFlag中的FLAG_COMMIT_OFFSET标志位,表示Slave不允许拉取消息时更新消费位点。(Slave通过定时任务定时从Broker中拉取配置信息,包含消费位点信息进行更新);

    如果sysFlagFLAG_CLASS_FILTER标志位,则重新获取Broker的地址,从topic对应的路由信息中随机获取一个有Filter Server的Broker的地址

        private String computePullFromWhichFilterServer(final String topic, final String brokerAddr)throws MQClientException {// 获取所有topic路由信息ConcurrentMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable();if (topicRouteTable != null) {// 获取指定topic的路由信息TopicRouteData topicRouteData = topicRouteTable.get(topic);// 获取指定Broker的filterServer列表List<String> list = topicRouteData.getFilterServerTable().get(brokerAddr);// 如果不为空则随机取一个的BrokerAddrif (list != null && !list.isEmpty()) {return list.get(randomNum() % list.size());}}throw new MQClientException("Find Filter Server Failed, Broker Addr: " + brokerAddr + " topic: "+ topic, null);}
    
  4. 拉取消息,调用this.mQClientFactory.getMQClientAPIImpl().pullMessage方法,创建请求编码为RequestCode.PULL_MESSAGERemotingCommand,实现分为异步ASYNC和同步SYNC实现,不管异步和同步,Broker端都是同一个方法进行接收请求处理

        public PullResult pullMessage(final String addr,final PullMessageRequestHeader requestHeader,final long timeoutMillis,final CommunicationMode communicationMode,final PullCallback pullCallback) throws RemotingException, MQBrokerException, InterruptedException {RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);switch (communicationMode) {case ONEWAY:assert false;return null;case ASYNC:this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);return null;case SYNC:return this.pullMessageSync(addr, request, timeoutMillis);default:assert false;break;}return null;}private PullResult pullMessageSync(final String addr,final RemotingCommand request,final long timeoutMillis) throws RemotingException, InterruptedException, MQBrokerException {// 拉取消息RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);assert response != null;// 解析并设置拉取结果状态return this.processPullResponse(response, addr);}private PullResult processPullResponse(final RemotingCommand response,final String addr) throws MQBrokerException, RemotingCommandException {PullStatus pullStatus = PullStatus.NO_NEW_MSG;switch (response.getCode()) {case ResponseCode.SUCCESS:pullStatus = PullStatus.FOUND;break;case ResponseCode.PULL_NOT_FOUND:pullStatus = PullStatus.NO_NEW_MSG;break;case ResponseCode.PULL_RETRY_IMMEDIATELY:pullStatus = PullStatus.NO_MATCHED_MSG;break;case ResponseCode.PULL_OFFSET_MOVED:pullStatus = PullStatus.OFFSET_ILLEGAL;break;default:throw new MQBrokerException(response.getCode(), response.getRemark(), addr);}PullMessageResponseHeader responseHeader =(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());}
    

Broker处理拉取消息请求

BrokerController初始化时会调用this.registerProcessor();方法向NettyRemotingServer注册RequestCode对应的处理器NettyRequestProcessor,存储在HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable内存中,当有RequestCommand请求进来,会根据RequestCommandRequestCode调用对应的处理器。

而消费者向Broker请求拉取消息的RequestCodePULL_MESSAGE,对应的处理器是:org.apache.rocketmq.broker.processor.PullMessageProcessor

   /*** PullMessageProcessor*/this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);

当有拉取消息的请求进来时,会调用PullMessageProcessor#processRequest方法,是调用的是this.processRequest(ctx.channel(), request, true);方法,这里brokerAllowSuspend传参传入了true,表示Broker允许挂起拉取请求,在一定时间内等待消息。

    public RemotingCommand processRequest(final ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {return this.processRequest(ctx.channel(), request, true);}

this.processRequest(ctx.channel(), request, true);方法逻辑流程大致如下:

1、权限、参数校验并且获取初始化变量:

  • 校验当前Broker是否可读,没有读权限则返回NO_PERMISSION

  • 调用findSubscriptionGroupConfig方法根据消费者组从subscriptionGroupTable内存中获取消费者的消费者组配置信息,如果消费者组配置不在内存中,如果允许自动创建消费者组配置(autoCreateSubscriptionGroup默认为true)或者是系统内置的消费者组名则创建该消费者组的配置信息,然后递增一个版本信息dataVersion,持久化信息;这是唯一创建消费者组配置SubscriptionGroupConfig的地方,在消费者发送心跳给Broker会创建对应的消费者的配置信息。

    如果返回的SubscriptionGroupConfig仍然为空,即不存在内存中,又无法自动创建且不是系统消费者组名,则会返回SUBSCRIPTION_GROUP_NOT_EXIST

        public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {// 根据消费者组获取消费者组配置信息SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);if (null == subscriptionGroupConfig) {// 允许自动创建消费者组配置(如果不允许则需要手动命令行创建),或者是系统消费者组,是则创建消费者组group订阅组信息,autoCreateSubscriptionGroup默认为trueif (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(group);SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);if (null == preConfig) {log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());}// 递增一个版本this.dataVersion.nextVersion();// 固化消费者组配置信息this.persist();}}return subscriptionGroupConfig;}
    
  • 校验消费者组配置信息中的consumeEnable是否为true,如果是false,表示该消费者组不允许消费,返回NO_PERMISSIONconsumeEnable通过Broker的定时任务调用BrokerController.this.protectBroker()方法:

        public void protectBroker() {// disableConsumeIfConsumerReadSlowly 表示如果消费者消费消息过慢是否失效该消费者,默认falseif (this.brokerConfig.isDisableConsumeIfConsumerReadSlowly()) {// 遍历momentStatsItemSetFallSize统计信息中的statsItemTablefinal Iterator<Map.Entry<String, MomentStatsItem>> it = this.brokerStatsManager.getMomentStatsItemSetFallSize().getStatsItemTable().entrySet().iterator();while (it.hasNext()) {final Map.Entry<String, MomentStatsItem> next = it.next();// 获取统计的消费者拉取信息的剩余字节final long fallBehindBytes = next.getValue().getValue().get();// 如果大于consumerFallbehindThresholdif (fallBehindBytes > this.brokerConfig.getConsumerFallbehindThreshold()) {// 解析 statsKey :queueId + @ + topic + @ + groupfinal String[] split = next.getValue().getStatsKey().split("@");final String group = split[2];LOG_PROTECTION.info("[PROTECT_BROKER] the consumer[{}] consume slowly, {} bytes, disable it", group, fallBehindBytes);// 按照group获取subscriptionGroupTable中消费者组的消费者,将consumeEnable置为falsethis.subscriptionGroupManager.disableConsume(group);}}}}
    

    其中momentStatsItemSetFallSize统计剩余字节信息通过消费者在拉取完消息后统计还有多少字节消息没有拉取进行统计,调用org.apache.rocketmq.store.stats.BrokerStatsManager#recordDiskFallBehindSize方法:

        public void recordDiskFallBehindSize(final String group, final String topic, final int queueId,final long fallBehind) {final String statsKey = String.format("%d@%s@%s", queueId, topic, group);this.momentStatsItemSetFallSize.getAndCreateStatsItem(statsKey).getValue().set(fallBehind);}public MomentStatsItem getAndCreateStatsItem(final String statsKey) {MomentStatsItem statsItem = this.statsItemTable.get(statsKey);if (null == statsItem) {statsItem =new MomentStatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);MomentStatsItem prev = this.statsItemTable.putIfAbsent(statsKey, statsItem);if (null != prev) {statsItem = prev;// statsItem.init();}}return statsItem;}
    
  • 初始化标志变量:

    • hasSuspendFlag:为true时表示改拉取请求可以被暂时挂起,等待 suspendTimeoutMillisLong时间内会检查是否有新消息可拉取

    • hasCommitOffsetFlag:为true时表示Broker会根据拉取的请求更新保存的消费位点

    • hasSubscriptionFlag:为true时表示Broker会根据拉取请求的信息创建topic订阅信息,没有则会从消费者注册信息中获取

    • suspendTimeoutMillisLong:有暂停挂起标志hasSuspendFlag为true,则suspendTimeoutMillisLong = suspendTimeoutMillis(默认 20000ms),没有则是0

2、获取拉取消息的topic配置

  • 根据请求的topic获取TopicConfigManager内存topicConfigTable中保存的TopicConfigtopicConfigTable信息会在生产者发送topic消息,Broker接收到消息后创建该topic的TopicConfig。如果没有在topicConfigTable中找到topic的TopicConfig,则报TOPIC_NOT_EXIST错误。
  • 如果该topic的TopicConfig中没有读权限,则返回NO_PERMISSION错误
  • 如果请求的消费队列id小于0或者大于等于该topic的TopicConfig的读队列数(下标从0开始),返回SYSTEM_ERROR错误

3、解析订阅信息

  • 如果hasSubscriptionFlag = true,则根据请求Header中获取topicsubscriptionexpressionType创建订阅信息subscriptionData。如果表达式类型expressionType不是Tag类型,则创建consumerFilterData

  • hasSubscriptionFlag = false,则从消费者注册信息中获取信息创建订阅信息

    • 根据消费者组consumerGroup从消费者管理服务ConsumerManager内存consumerTable中获取消费者组信息ConsumerGroupInfoConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable是通过消费者启动时,发送心跳给Broker,Broker注册消费者时添加消费者组的信息。如果ConsumerGroupInfo为空,则返回SUBSCRIPTION_NOT_EXIST错误
    • 判断上面获取到的消费者订阅组配置是否允许消费者广播模式消费,如果不允许,消费者又是广播消费模式,则返回NO_PERMISSION
    • 从查询到的消费者组配置ConsumerGroupInfo中获取topic的订阅信息subscriptionData,如果subscriptionData == null,返回SUBSCRIPTION_NOT_EXIST错误
    • 判断Broker记录的topic的订阅信息的版本是否符合(只要大于等于请求Header记录的订阅信息版本就没有问题),不是则返回SUBSCRIPTION_NOT_LATEST错误
    • 如果订阅信息中的表达式类型不是Tag类型,则从consumerFilterManager消费者过滤管理器内存ConcurrentMap<String/*Topic*/, FilterDataMapByTopic> filterDataByTopic中获取该消费者组订阅的topic的消费者过滤信息consumerFilterData(这里的filterDataByTopic也是消费者启动时通过心跳发送信息到Broker中,Broker将信息保存在consumerFilterManager的内存中)。如果查询到的consumerFilterData为空,则返回FILTER_DATA_NOT_EXIST错误;如果consumerFilterData的版本小于请求Header中记录的订阅信息版本,则返回FILTER_DATA_NOT_LATEST错误
  • 如果不是Tag类型,且Broker配置enablePropertyFilter(是否允许消息内容过滤)为false(默认false),返回SYSTEM_ERROR

4、创建消息过滤器

  • filterSupportRetry = true表示重试消息允许进行消息过滤,则创建ExpressionForRetryMessageFilter对象;否则创建ExpressionMessageFilter对象

5、获取消息

Broker通过调用消息存储服务MessageStoregetMessage方法获取请求的消息,源码在org.apache.rocketmq.store.DefaultMessageStore#getMessage

    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,final int maxMsgNums,final MessageFilter messageFilter) {// 校验消息存储服务状态if (this.shutdown) {log.warn("message store has shutdown, so getMessage is forbidden");return null;}if (!this.runningFlags.isReadable()) {log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());return null;}long beginTime = this.getSystemClock().now();GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;long nextBeginOffset = offset;long minOffset = 0;long maxOffset = 0;// lazy init when find msg.GetMessageResult getResult = null;// 当前Master Broker存储的所有消息的最大物理位点final long maxOffsetPy = this.commitLog.getMaxOffset();// 根据topic、queueId查询ConsumeQueue索引ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);// ConsumeQueue索引存在if (consumeQueue != null) {// consumeQueue索引中最小的位点minOffset = consumeQueue.getMinOffsetInQueue();// consumeQueue索引中最大的位点maxOffset = consumeQueue.getMaxOffsetInQueue();// 如果 minOffset 为空、查询的消息的位点小于minOffset、查询的消息的位点等于大于maxOffset,返回错误信息if (maxOffset == 0) {status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;nextBeginOffset = nextOffsetCorrection(offset, 0);// 查询offset 太小} else if (offset < minOffset) {status = GetMessageStatus.OFFSET_TOO_SMALL;nextBeginOffset = nextOffsetCorrection(offset, minOffset);// 查询offset 超过 消费队列 一个位置} else if (offset == maxOffset) {status = GetMessageStatus.OFFSET_OVERFLOW_ONE;nextBeginOffset = nextOffsetCorrection(offset, offset);// 查询offset 超过 消费队列 太多(大于一个位置)} else if (offset > maxOffset) {status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;if (0 == minOffset) {nextBeginOffset = nextOffsetCorrection(offset, minOffset);} else {nextBeginOffset = nextOffsetCorrection(offset, maxOffset);}} else {// 根据offset查找ConsumeQueue信息SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);if (bufferConsumeQueue != null) {try {status = GetMessageStatus.NO_MATCHED_MESSAGE;// commitLog下一个文件(MappedFile)对应的开始offset。long nextPhyFileStartOffset = Long.MIN_VALUE;// 拉取到的消息最大物理位置offsetlong maxPhyOffsetPulling = 0;int i = 0;final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();getResult = new GetMessageResult(maxMsgNums);ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();// 根据bufferConsumeQueue的size和maxFilterMessageCount遍历获取消息在commitLog的偏移量、消息大小,消息的tags的hashcodefor (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {// 消息物理位置offsetlong offsetPy = bufferConsumeQueue.getByteBuffer().getLong();// 消息大小int sizePy = bufferConsumeQueue.getByteBuffer().getInt();// 消息的tags的hashcodelong tagsCode = bufferConsumeQueue.getByteBuffer().getLong();// 设置消息物理位置拉取到的最大offsetmaxPhyOffsetPulling = offsetPy;// 当 offsetPy 小于 nextPhyFileStartOffset 时,意味着对应的 Message 已经移除,所以直接continue,直到可读取的Message。if (nextPhyFileStartOffset != Long.MIN_VALUE) {if (offsetPy < nextPhyFileStartOffset)continue;}// 校验当前Master Broker存储的所有消息的最大物理位点与offsetPy的差值是否大于内存40%的最大存储容量// 校验查询的消息是否在硬盘中boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);/*** sizePy:消息大小* maxMsgNums:查询的消息数量* getResult.getBufferTotalSize(): 0 获取的消息的buffer大小* getResult.getMessageCount(): maxMsgNums 获取的消息数量* isInDisk:消息是否是在硬盘中*/// 判断是否已经获得足够消息或者传输使用的内存或硬盘大小已满if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),isInDisk)) {break;}boolean extRet = false, isTagsCodeLegal = true;if (consumeQueue.isExtAddr(tagsCode)) {extRet = consumeQueue.getExt(tagsCode, cqExtUnit);if (extRet) {tagsCode = cqExtUnit.getTagsCode();} else {// can't find ext content.Client will filter messages by tag also.log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",tagsCode, offsetPy, sizePy, topic, group);isTagsCodeLegal = false;}}// 过滤消息if (messageFilter != null&& !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {if (getResult.getBufferTotalSize() == 0) {status = GetMessageStatus.NO_MATCHED_MESSAGE;}continue;}// 根据消息在commitLog的偏移量和消息大小获取CommitLog中对应的消息内容SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);if (null == selectResult) {// 从commitLog无法读取到消息,说明该消息对应的文件(MappedFile)已经删除,计算下一个MappedFile的起始位置if (getResult.getBufferTotalSize() == 0) {status = GetMessageStatus.MESSAGE_WAS_REMOVING;}nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);continue;}if (messageFilter != null&& !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {if (getResult.getBufferTotalSize() == 0) {status = GetMessageStatus.NO_MATCHED_MESSAGE;}// release...selectResult.release();continue;}// 返回消息结果添加查询到的消息,nextPhyFileStartOffset重置为Long.MIN_VALUEthis.storeStatsService.getGetMessageTransferedMsgCount().add(1);getResult.addMessage(selectResult);status = GetMessageStatus.FOUND;nextPhyFileStartOffset = Long.MIN_VALUE;}// 统计剩余可拉取消息字节数if (diskFallRecorded) {long fallBehind = maxOffsetPy - maxPhyOffsetPulling;brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);}// 计算下次拉取消息的消息队列位点nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);// maxOffsetPy和maxPhyOffsetPulling差值,表示还有多少消息没有拉取long diff = maxOffsetPy - maxPhyOffsetPulling;// Broker认为可使用的最大内存,为总物理内存的40%(默认),Slave是30%long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));// diff > memory 为true表示没有拉取的消息比分配的内存大,说明此时Master Broker内存繁忙,应该选择从Slave拉取消息。getResult.setSuggestPullingFromSlave(diff > memory);} finally {// 释放ConsumeQueue mappedFile的引用bufferConsumeQueue.release();}} else {status = GetMessageStatus.OFFSET_FOUND_NULL;nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "+ maxOffset + ", but access logic queue failed.");}}} else {// ConsumeQueue索引不存在,返回NO_MATCHED_LOGIC_QUEUE,并且返回下次查询开始的Offsetstatus = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;nextBeginOffset = nextOffsetCorrection(offset, 0);}// 统计if (GetMessageStatus.FOUND == status) {this.storeStatsService.getGetMessageTimesTotalFound().add(1);} else {this.storeStatsService.getGetMessageTimesTotalMiss().add(1);}long elapsedTime = this.getSystemClock().now() - beginTime;this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);// lazy init no data found.if (getResult == null) {getResult = new GetMessageResult(0);}getResult.setStatus(status);getResult.setNextBeginOffset(nextBeginOffset);getResult.setMaxOffset(maxOffset);getResult.setMinOffset(minOffset);return getResult;}

根据上述源码,逻辑流程解析如下:

(1)校验消息存储服务状态

校验消息存储服务是否处于shutdown状态,是则返回null;如果处于不可读状态,也返回null

(2)校验消费队列
  • 根据请求的topicqueueId调用findConsumeQueue方法获取消息存储内存ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable中的消费队列CosumeQueue,如果不存在则根据topicqueueId创建一个。

        public ConsumeQueue findConsumeQueue(String topic, int queueId) {// 根据topic查找consumeQueueTable中存在的queueId-ConsumeQueue集合ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);// 不存在则创建一个if (null == map) {ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);if (oldMap != null) {map = oldMap;} else {map = newMap;}}// 根据queueId查找ConsumeQueueConsumeQueue logic = map.get(queueId);// 不存在则创建一个新的ConsumeQueue,ConsumeQueue存储也是利用MappedFileQueueif (null == logic) {ConsumeQueue newLogic = new ConsumeQueue(topic,queueId,StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),this);ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);if (oldLogic != null) {logic = oldLogic;} else {logic = newLogic;}}return logic;}
    
  • 如果查询到的consumeQueue为空,表示找不到对应的消费队列索引,返回NO_MATCHED_LOGIC_QUEUE,并且设置下次请求的offset位点为0,表示从头获取消息;如果不为空,则获取consumeQueue中最小位点minOffsetconsumeQueue中最大位点maxOffset进行如下校验:

    • 最大位点maxOffset等于0,表示consumeQueue中没有消息,返回NO_MESSAGE_IN_QUEUE,设置下次请求正确的offset位点为0
    • 查询的offset小于最小位点minOffset,表示offset所在消息已经过期被删除,返回OFFSET_TOO_SMALL,设置下次请求的正确offset位点为minOffset
    • 查询的offset等于最大位点maxOffset,表示查询offset 超过消费队列最大位点一个位置(从0开始),返回OFFSET_OVERFLOW_ONE,下次请求的位点仍然是offset
    • 查询的offset大于最大位点maxOffset,表示查询offset超过消费队列太多(大于一个位置),返回OFFSET_OVERFLOW_BADLY,如果最小位点minOffset等于0,则设置下次请求的位点为minOffset,否则设置下次请求的位点为maxOffset。因为minOffset不等于0表示开头是事务消息或者过期被删,所以选择maxOffset
        public long getMinOffsetInQueue() {return this.minLogicOffset / CQ_STORE_UNIT_SIZE;}public long getMaxOffsetInQueue() {return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE;}public long getMaxOffset() {MappedFile mappedFile = getLastMappedFile();if (mappedFile != null) {// 获取最后一个MappedFile的起始位置加上已写入的消息的位置return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();}return 0;}private long nextOffsetCorrection(long oldOffset, long newOffset) {long nextOffset = oldOffset;// Broker不是Slave或者Slave允许offset检查if (this.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE || this.getMessageStoreConfig().isOffsetCheckInSlave()) {nextOffset = newOffset;}return nextOffset;}
    
(3)获取消息
  • 如果请求的offset通过上述校验,则根据offset获取消费队列索引中记录的字节内容bufferConsumeQueue,如果bufferConsumeQueue为空,表示找不到offset对应的索引信息,返回NO_MATCHED_LOGIC_QUEUE,设置下次请求的位点为ConsumeQueue下一个MappedFile的起始位置;

  • bufferConsumeQueue不为空就遍历消费队列索引中每一个索引信息(每个索引信息大小单位为CQ_STORE_UNIT_SIZE即20byte),每个索引信息按顺序可以读取到:

    • offsetPy:消息在CommitLog物理位置offset
    • sizePy:消息大小
    • tagsCode:消息的tags的hashcode

    每次循环则设置maxPhyOffsetPulling = offsetPymaxPhyOffsetPulling表示拉取到的消息最大物理位置offset。

    isInDisk校验当前Broker存储的所有消息的最大物理位点与offsetPy的差值是否大于内存最大存储容量的指定百分比(Master是40%,Slave是30%),用来校验查询的消息是否在硬盘中。

        private boolean checkInDiskByCommitOffset(long offsetPy, long maxOffsetPy) {long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));return (maxOffsetPy - offsetPy) > memory;}
    

    然后调用isTheBatchFull方法,判断是否已经获得足够消息或者传输使用的内存或硬盘大小已满,该方法入参含义如下:

    • sizePy:消息大小
    • maxMsgNums:查询的消息数量
    • bufferTotal:getResult.getBufferTotalSize(),初始值为0,表示已经获取的消息的buffer大小
    • messageTotal:getResult.getMessageCount(),表示已经获取的消息数量
    • isInDisk:消息是否是在硬盘中

    处理逻辑如下:

    • 如果已经获取的消息的buffer大小等于0,或者已经获取的消息数量为0,表示没有满,返回false
    • 如果查询的消息数量已经小于等于已经获取的消息数量,表示获取到足够的消息,返回true
    • 如果查询的消息存在硬盘中即isInDisk等于true
      • 如果已经获取的消息的buffer大小加上消息大小大于硬盘最大传输字节数maxTransferBytesOnMessageInDisk,返回true,maxTransferBytesOnMessageInDisk默认值为1024*64byte
      • 如果已经获取的消息数量大于硬盘最大传输数量maxTransferCountOnMessageInDisk-1返回true,maxTransferCountOnMessageInDisk默认值为8
    • 如果查询的消息存在内存中
      • 如果已经获取的消息的buffer大小加上消息大小大于内存最大传输字节数maxTransferBytesOnMessageInMemory,返回true,maxTransferBytesOnMessageInMemory默认值1024 * 256byte
      • 如果已经获取的消息数量大于内存最大传输数量maxTransferCountOnMessageInMemory-1,返回true,maxTransferCountOnMessageInMemory默认值为32
        private boolean isTheBatchFull(int sizePy, int maxMsgNums, int bufferTotal, int messageTotal, boolean isInDisk) {// 如果已经获取的消息的buffer大小等于0,或者已经获取的消息数量为0,表示没有满,返回falseif (0 == bufferTotal || 0 == messageTotal) {return false;}// 如果查询的消息数量已经小于等于已经获取的消息数量,表示获取到足够的消息,返回trueif (maxMsgNums <= messageTotal) {return true;}// 如果查询的消息存在硬盘中if (isInDisk) {// 如果已经获取的消息的buffer大小加上消息大小 大于 硬盘最大传输字节数,返回trueif ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) {return true;}// 如果已经获取的消息数量 大于 硬盘最大传输数量-1 返回trueif (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk() - 1) {return true;}} else {// 消息存在内存中// 如果已经获取的消息的buffer大小加上消息大小 大于 内存最大传输字节数,返回trueif ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {return true;}// 如果已经获取的消息数量 大于 内存最大传输数量-1if (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory() - 1) {return true;}}return false;}
    

    然后根据消息在commitLog的偏移量offsetPy和消息大小sizePy获取CommitLog中对应的消息内容selectResult

        public SelectMappedBufferResult getMessage(final long offset, final int size) {// CommitLog MappedFile大小,默认1Gint mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();// 根据offset找对应的MappedFile,如果查询offset等于0,则在根据offset找不到MappedFile时,会返回第一个MappedFileMappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);if (mappedFile != null) {// 计算offset相对MappedFile的位置 posint pos = (int) (offset % mappedFileSize);// 获取pos到size的内存大小return mappedFile.selectMappedBuffer(pos, size);}return null;}
    

    如果获取的消息字节内容selectResult为空,说明该消息对应的文件(MappedFile)已经删除,计算下一个有消息的MappedFile的起始位置nextPhyFileStartOffset,继续下一个循环。

    如果下一个循环从索引获取到的消息的CommitLog物理位置offset小于nextPhyFileStartOffset,则直接下一个循环知道获取到可读取的消息的offsetPy为止。

    然后将经过消息过滤的消息字节内容添加到返回结果中,直到isTheBatchFull方法返回true,退出循环

  • 如果diskFallRecorded,默认true,则统计剩余可拉取的字节数

  • 计算下次拉取消息的消息队列位点nextBeginOffset

  • 计算还有多少消息没有拉取diff = maxOffsetPy - maxPhyOffsetPulling,计算Broker认为可使用的最大内存(Master是40%,Slave是30%),如果diff > memory 为true表示没有拉取的消息比分配的内存大,说明此时当前Broker内存繁忙,应该选择从Slave拉取消息。

  • 释放ConsumeQueue mappedFile的引用,返回消息结果

6、处理消息结果

  • 设置responseresponseHeader,如果suggestPullingFromSlave=true,则设置下次请求建议拉取的Broker为whichBrokerWhenConsumeSlowly,默认为1,否则就是Master,值为0。

  • 判断当前Broker的角色,如果当前是Slave,且Slave配置不允许读取消息,则返回PULL_RETRY_IMMEDIATELY,suggestWhichBrokerId为Master,slaveReadEnable默认false

  • 如果Slave配置允许读取消息且繁忙,则下次请求向BrokerId为订阅组配置信息中whichBrokerWhenConsumeSlowly(默认为1)的Broker拉取消息,如果不繁忙,则下次请求向BrokerId为订阅组配置信息中brokerId(默认是Master)的Broker发送请求;否则直接设置suggestWhichBrokerId为Master

  • 根据获取消息结果的状态,设置response的编码code

  • 根据response的返回编码分别处理:

    • SUCCESS:

      主要是进行数据统计,将返回的消息结果中的消息字节缓存列表messageBufferList转换为字节放进response的body中

    • PULL_NOT_FOUND:

      如果brokerAllowSuspendhasSuspendFlag都为true,brokerAllowSuspend为true表示Broker允许挂起拉取请求,hasSuspendFlag为true时表示改拉取请求可以被暂时挂起,则创建PullRequest对象,调用this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);方法挂起该拉取请求,将pullRequest对象放进拉取请求挂起服务PullRequestHoldService的内存ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable中:

          public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {// 构建key,创建ManyPullRequest对象,并设置进pullRequestTable中String key = this.buildKey(topic, queueId);ManyPullRequest mpr = this.pullRequestTable.get(key);if (null == mpr) {mpr = new ManyPullRequest();ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);if (prev != null) {mpr = prev;}}mpr.addPullRequest(pullRequest);}
      

      拉取请求挂起服务PullRequestHoldService则会不断调用this.checkHoldRequest()方法检测内存pullRequestTable中的挂起的拉取请求是否有新消息到来,通过遍历ManyPullRequest拉取请求对象中PullRequest对象列表,比较PullRequest对象中记录的请求offset是否大于对应的最新最大的ConsumeQueue的offset,是则表示有新消息到来,则用pullMessageExecutor线程池另起一个线程调用PullMessageProcessor.this.processRequest方法再进行一次拉取消息逻辑。如果拉取请求的挂起时间超时,也会立即用pullMessageExecutor线程池另起一个线程调用PullMessageProcessor.this.processRequest方法再进行一次拉取消息逻辑。当然没有新消息进来,又没有超时的拉取请求会重新添加回ManyPullRequest中:

          protected void checkHoldRequest() {// 遍历pullRequestTablefor (String key : this.pullRequestTable.keySet()) {// 解析key为 topic、queueIdString[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);if (2 == kArray.length) {String topic = kArray[0];int queueId = Integer.parseInt(kArray[1]);// 获取ConsumQueue的最大的offsetfinal long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);try {this.notifyMessageArriving(topic, queueId, offset);} catch (Throwable e) {log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);}}}}public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null);}public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {String key = this.buildKey(topic, queueId);// 获取对应的ManyPullRequest拉取请求对象ManyPullRequest mpr = this.pullRequestTable.get(key);if (mpr != null) {List<PullRequest> requestList = mpr.cloneListAndClear();if (requestList != null) {List<PullRequest> replayList = new ArrayList<PullRequest>();for (PullRequest request : requestList) {long newestOffset = maxOffset;// 如果最新的ConsumeQueue的最大offset小于等于拉取请求的offset,表明没有新消息if (newestOffset <= request.getPullFromThisOffset()) {// 则再次尝试拉取最新的最大的ConsumeQueue的offsetnewestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);}// 如果最新的ConsumeQueue的最大offset大于拉取请求的offset,表明有新消息进来if (newestOffset > request.getPullFromThisOffset()) {// 过滤消息boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));// match by bit map, need eval again when properties is not null.if (match && properties != null) {match = request.getMessageFilter().isMatchedByCommitLog(null, properties);}// 新消息匹配过滤规则if (match) {try {// 用pullMessageExecutor线程池另起一个线程调用PullMessageProcessor.this.processRequest方法再进行一次拉取消息逻辑// 但是不允许Broker挂起拉取请求this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());} catch (Throwable e) {log.error("execute request when wakeup failed.", e);}continue;}}// 请求挂起超市// 则立即用pullMessageExecutor线程池另起一个线程调用PullMessageProcessor.this.processRequest方法再进行一次拉取消息逻辑 // 但是不允许Broker挂起拉取请求if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {try {this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());} catch (Throwable e) {log.error("execute request when wakeup failed.", e);}continue;}replayList.add(request);}// 没超时的,又没有新消息到的请求重新添加回去if (!replayList.isEmpty()) {mpr.addPullRequest(replayList);}}}}
      

      调用完this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);方法,还会将response置为null,原因后面会讲到。

    • PULL_RETRY_IMMEDIATELY:

      不做额外处理

    • PULL_OFFSET_MOVED:

      如果当前Broker不是Slave或者配置offsetCheckInSlave为true,允许Slave对offset进行检查相关操作,则将该拉取请求创建一个topic为TopicValidator.RMQ_SYS_OFFSET_MOVED_EVENT(值OFFSET_MOVED_EVENT)的消息异步存储下来。而MonitorService监控服务会消费这个消息,用于上报监控信息。

7、更新消费位点

是否可以更新消费位点需要如下条件:

  • brokerAllowSuspend == true:Broker允许挂起拉取请求
  • hasCommitOffsetFlag == true:拉取请求中有CommitOffset标志,允许Broker根据拉取的请求更新保存的消费位点
  • this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE:当前Broker不是Slave

然后通过调用this.brokerController.getConsumerOffsetManager().commitOffset方法更新Broker的消费位点管理服务ConsumerOffsetManager内存ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable中对应的消费位点记录:

    public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,final long offset) {// topic@groupString key = topic + TOPIC_GROUP_SEPARATOR + group;this.commitOffset(clientHost, key, queueId, offset);}private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);if (null == map) {map = new ConcurrentHashMap<Integer, Long>(32);map.put(queueId, offset);this.offsetTable.put(key, map);} else {Long storeOffset = map.put(queueId, offset);if (storeOffset != null && offset < storeOffset) {log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);}}}

8、返回结果

至此,我们拿到了拉取消息的结果,回到NettyRemotingAbstract#processRequestCommand方法,RocketMQ使用Netty框架对请求、响应进行处理:

    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {final RemotingCommand cmd = msg;if (cmd != null) {switch (cmd.getType()) {case REQUEST_COMMAND:processRequestCommand(ctx, cmd);break;case RESPONSE_COMMAND:processResponseCommand(ctx, cmd);break;default:break;}}}

而在processRequestCommand方法中,通过对注册在NettyRemotingAbstract服务内存HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable中的请求编码与处理器映射,获取请求编码对应的处理器,调用processor.processRequest(ctx, cmd);方法进入处理逻辑。

这里创建了个RemotingResponseCallback回调实例,用于处理处理器返回的responsecallback.callback(response);,需要注意的是,当response返回null时,是不会将response发送回给请求端的,这里就解释了为什么上面要特意设置response = null,因为这样就实现了Broker挂起拉取请求。

    try {String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());doBeforeRpcHooks(remoteAddr, cmd);final RemotingResponseCallback callback = new RemotingResponseCallback() {@Overridepublic void callback(RemotingCommand response) {doAfterRpcHooks(remoteAddr, cmd, response);if (!cmd.isOnewayRPC()) {if (response != null) {response.setOpaque(opaque);response.markResponseType();try {ctx.writeAndFlush(response);} catch (Throwable e) {log.error("process request over, but response failed", e);log.error(cmd.toString());log.error(response.toString());}} else {}}}};if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();processor.asyncProcessRequest(ctx, cmd, callback);} else {NettyRequestProcessor processor = pair.getObject1();RemotingCommand response = processor.processRequest(ctx, cmd);callback.callback(response);}}

http://chatgpt.dhexx.cn/article/JjQqeNbj.shtml

相关文章

直播报名 | 海外社交媒体趋势如何?出海品牌如何掌握消费者洞察?

近年来&#xff0c;中国品牌出海势头强劲&#xff0c;智能硬件、互联网应用、时尚服饰等正加速风靡海外市场&#xff0c;涌现出像安克创新、SHEIN这样的全球化品牌。有人提问&#xff0c;安克创新&#xff0c;凭借什么成为全球化品牌&#xff1f;出海企业可以从中借鉴什么&…

Kafka消费者不消费数据

背景&#xff1a; 工作往往是千篇一律&#xff0c;真正能学到点知识都是在上线后。使用SkywalkingKafkaES进行应用监控。 现象&#xff1a; 公司使用Skywalking在开发测试环境中Kafka顺利消费数据&#xff0c;到了UAT环境一开始还正常&#xff0c;后面接入了更多的应用后出现…

食品品牌如何做好消费需求洞察直抵消费者心智

做生意的都明白这样一个道理&#xff1a;“先找到买主&#xff0c;再依照需求出售”。之所以这一点很重要&#xff0c;因为这揭示了一条经营企业过程中必须遵守的金科玉律&#xff1a;先有需求&#xff0c;而后才有你的品牌、服务或功能。 需求洞察是品牌、产品、服务等一切的…

消费者洞察:数据影响消费,消费营造数据

本文根据Stratifyd资深解决方案经理段鑫龙(Bruce Duan)在9月24日的直播演讲内容整理,演讲围绕“如何洞察消费者”从四个层面展开:首先是(疫情期间以及后疫情时代)消费品行业的发展现状和未来趋势;然后是当前现状下如何通过数据化闭环洞察消费者;有了前面的理论支撑和方…

助力品牌洞察——消费者情绪行为分析

什么是情绪分析&#xff1f; 随着社交网络和数字营销的出现&#xff0c;消费者对产品和品牌的评价受到越来越多的关注。在线用户反馈&#xff08;例如产品评价、社交媒体评论和调查问卷等&#xff09;包含了大量具有价值的数据。通过这些数据&#xff0c;可以了解消费者对您产…

数据分析:消费者数据分析

数据分析&#xff1a;消费者数据分析 作者&#xff1a;i阿极 作者简介&#xff1a;Python领域新星作者、多项比赛获奖者&#xff1a;博主个人首页 &#x1f60a;&#x1f60a;&#x1f60a;如果觉得文章不错或能帮助到你学习&#xff0c;可以点赞&#x1f44d;收藏&#x1f4c1…

消费者洞察:案例透视消费者洞察实践与收益

本文根据Stratifyd资深解决方案经理段鑫龙(Bruce Duan)在“如何洞察您的消费者”直播课主题演讲整理。点击链接(https://live.vhall.com/534333188)查看完整演讲视频,关注Stratifyd微信公众号并在后台回复“粉丝群”,还可进群申领演讲课件,及时关注Stratifyd最新培训资讯…

消费者洞察:一文看懂消费者是如何做选择的

作者&#xff1a;付永承 全文共 4214 字&#xff0c;阅读需要 9 分钟 ———— / BEGIN / ———— “为什么有时候明明比对手更有优势&#xff0c;产品就是无人问津&#xff1f;” “为什么花了那么多广告费&#xff0c;销量怎么就是上不来&#xff1f;” “为什么之前的营销…

消费者洞察:数据化闭环洞察消费者

本文根据Stratifyd资深解决方案经理段鑫龙(Bruce Duan)在“如何洞察您的消费者”直播课主题演讲整理。上周我们通过“数据影响消费,消费营造数据”这篇文章了解了消费品行业的发展现状,新生代互联网消费者更乐于分享表达,然而消费者触点分散,对企业洞察消费者、了解消费者…

系统资源不足,无法完成请求服务。

使用visual studio 编译时&#xff1a; 清理了磁盘…重装了VS2019 没用… 后来一个学长说试试卸载McAfee well done!

win10突然提示系统资源不足,无法完成请求服务的解决方法(VS)win10跑代码变慢

大概率是McAfee搞得鬼 解决方法&#xff1a;点击PC安全-实时扫描-关闭 电脑也快多了&#xff0c;跑代码也快了很多 直接卸载McAfee也可

c语言无纸化软件系统资源不足,系统资源不足无法完成请求的服务怎么办_系统资源不足卸载迈克菲解决教程 - 系统家园...

最近很多用户在问小编系统资源不足无法完成请求的服务怎么解决&#xff0c;小编电脑没有遇到这个问题&#xff0c;但是看到了贴吧很多小伙伴都有这个问题&#xff0c;发现有个共性就是装了迈克菲McAfee杀毒防护软件&#xff0c;下面快点来看看怎么解决问题吧。 系统资源不足卸载…

windows远程拷贝和解压大文件异常:系统资源不足无法完成请求的服务

windows服务器异常&#xff1a;系统资源不足无法完成请求的服务 Insufficient system resources exist to complete the requested service. by qunying.liu(刘群英) 问题描述&#xff1a; 开发人员需要将线上环境windows的某些数据文件复制到开发环境windows用于测试&#xff…

c语言无纸化软件系统资源不足,win10系统安装软件显示“系统资源不足,无法完成请求的服务”的解决方法...

相信不少网友在win10系统安装软件时都遇到过“系统资源不足&#xff0c;无法完成请求的服务”的情况&#xff0c;到底是怎么回事&#xff1f;检查内存空间都足够大的&#xff0c;其实往往导致这一问题的不是存储空间不足问题&#xff0c;而是病毒问题。知道原因后&#xff0c;问…

c语言无纸化软件系统资源不足,win10系统安装软件显示“系统资源不足,无法完成请求的服务”的解决方法-系统城...

相信不少网友在win10系统安装软件时都遇到过“系统资源不足&#xff0c;无法完成请求的服务”的情况&#xff0c;到底是怎么回事&#xff1f;检查内存空间都足够大的&#xff0c;其实往往导致这一问题的不是存储空间不足问题&#xff0c;而是病毒问题。知道原因后&#xff0c;问…

打开U盘 提示 服务器无法运行,win10打开u盘提示“系统资源不足 无法完成请求的服务”怎么办...

一位用户在windows10系统电脑上连接U盘后&#xff0c;发现打开时总会弹出“位置不可用”的提示框&#xff0c;提示&#xff1a;无法访问X:\ 系统资源不足 无法完成请求的服务&#xff0c;这是怎么回事呢&#xff1f;其实&#xff0c;该问题是u盘损坏或内存配置过小所导致的。下…

瑞友服务器系统资源不足,win10/win7打开软件提示系统资源不足,无法完成请求服务的解决方法...

作者&#xff1a;陈俊 日期&#xff1a;2017-11-12 12:32 近期有些win7、win10用户莫名的遇到了一个问题&#xff0c;就是打开疯狂的美工软件的时候提示系统资源不足,无法完成请求服务&#xff0c;刚开始以为是系统缺少了某些组件&#xff0c;结果发现是国外杀毒 迈克菲(McA…

系统使用一段时间就出现“系统资源不足,无法完成请求的服务”

2019独角兽企业重金招聘Python工程师标准>>> 系统使用一段时间后&#xff0c;就会出现这个问题&#xff0c;导致电脑无法在打开软件&#xff0c;网络连接禁止使用&#xff0c;无法进行关机。只能强制关机。 经查证&#xff0c;发现System的句柄数在一直增加。增加到…

win10计算机资源不足无法登陆,win10电脑安装软件提示“系统资源不足,无法完成请求的服务”如何解决...

我们经常会在电脑中安装软件&#xff0c;不过有时候并没有那么顺利&#xff0c;比如有win10系统用户反映说在电脑中安装软件的时候&#xff0c;提示“系统资源不足&#xff0c;无法完成请求的服务”&#xff0c;导致无法安装软件&#xff0c;经过检查发现是病毒问题&#xff0c…

吃鸡显示服务器资源不足,win7系统玩吃鸡提示系统资源不足无法完成请求的服务如何解决...

有不少win7系统玩家反映说在电脑中玩吃鸡游戏的时候&#xff0c;却遇到提示系统资源不足无法完成请求的服务的情况&#xff0c;该怎么解决这样的问题呢&#xff0c;本文就给大家讲解一下win7系统玩吃鸡提示系统资源不足无法完成请求的服务的具体解决步骤吧。 情况一、 启动项加…