RocketMQ广播模式消费失败是否会重试?

2024-08-23 00:28

本文主要是介绍RocketMQ广播模式消费失败是否会重试?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 前言
  • 继续
  • 广播和集群模式的消费流程
    • 集群模式(默认的)
    • 广播模式
    • 小结
  • push和pull介绍
    • 源码展示
  • 偏移量保存失败情况
      • 1. 网络问题
      • 2. Consumer本地问题
      • 3. 消费进度记录器问题
      • 4. 程序设计问题
      • 5. 异常终止
      • 6. 持久化策略问题
      • 7. 同步问题
  • 源码解析
    • `OffsetStore` 接口
    • LocalFileOffsetStore 类
  • 总结

前言

前两天有个同事问了一个问题:“在广播模式下消息消费是否会重试?”,而我的答案是会重试,因为在我的印象中RocketMQ有个最少消费一次机制,自然就会想不管怎么样都有可能出现重复消费的情况。但他立马百度查了一下:

image-20240808153752930

啪啪打脸!!!!!!,当时哥们哑口无言,哈哈哈!

但还是带着疑问的,那RocketMQ是怎么保证最少消费一次的?

继续

其实这个问题应该换种问法,按一般的思考逻辑,我们说的重试是是否会进入RocketMQ的重试队列走它的退避算法,所以应该问:“RocketMQ消息消费失败是否会进入重试队列?”,那这个结果是:不会,如果是是否会重试,是会的,就算不考虑生产者重复推送和Rebalance再均衡机制,消息还是有可能造成重试的。

本文不介绍什么是广播模式和集群模式以及生产者重复推送和Rebalance机制,大家可自行了解。本文只介绍消费端

广播和集群模式的消费流程

  • 广播消费模式下,相同Consumer Group的每个Consumer实例都接收同一个Topic的全量消息。即每条消息都会被发送到Consumer Group中的每个Consumer。
  • 集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊同一个Topic的消息。即每条消息只会被发送到Consumer Group中的某个Consumer。

集群模式(默认的)

  1. 消息生产者(Producer)将消息发送到RocketMQ的一个主题(Topic),Broker 接收到消息后,将其存储在相应的队列(Message Queue)中。
  2. 消费者实例Consumer会订阅一个或多个主题。当一个消费者订阅某个主题时,RocketMQ会根据负载均衡策略将该主题的消息队列分配给消费者实例。
  3. 消费者实例会主动向Broker发送拉取请求(Pull Request),从被分配的消息队列中获取消息。每个消费者只会拉取自己被分配的队列中的消息。
  4. 消息处理,也就是我们自己的代码逻辑
  5. 消费者每次处理完消息后,会提交当前消费进度(Offset)。集群模式下,这一进度信息会被存储在Broker中。
  6. 如果消息处理失败,可以配置RocketMQ的重试机制,消费者会重新拉取并处理失败的消息,直到处理成功或达到最大重试次数。

第6步就是我同事理解的重试机制,会进入重试队列!多一嘴:如果捕获了异常是不会重试的

                 +------------------+|    Producer      |+------------------+|V+------------------+|     Broker       |+------------------+|    |    |    MQ1    MQ2   MQ3  (多个消息队列)|    |    |+--------+--------+|  Consumer Group  |  (集群模式)|                  |+--------+--------+--------+--------+|   Consumer A    |   Consumer B    |  (多个消费者实例)+-----------------+-----------------+(Queue 1, Queue 3)   (Queue 2)

广播模式

  1. 消息生产者Producer将消息发送到RocketMQ的一个主题(Topic)
  2. 消费组(Consumer Group在广播模式下依然存在,但其意义有所不同。每个消费组中的所有消费者实例都会消费该组内订阅的所有消息队列中的消息。
  3. 消费者实例(Consumer订阅一个或多个主题。在广播模式下,消费者不会根据负载均衡策略分配队列,而是每个消费者都会接收并消费该主题的所有队列中的消息。
  4. 每个消费者实例会主动向Broker发送拉取请求(Pull Request),从该主题的所有消息队列中获取消息。
  5. 消费者收到消息后,对消息进行处理。
  6. 广播模式下,消费进度由消费者本地管理,而不是由Broker统一管理。消费者在本地记录其消费的消息偏移量(Offset)。
  7. 在广播模式下,由于每个消费者都会接收并处理相同的消息,因此消息不会丢失。
                 +------------------+|    Producer      |+------------------+|V+------------------+|     Broker       |+------------------+|    |    |    MQ1    MQ2   MQ3  (多个消息队列)|    |    |+--------+--------+|  Consumer Group  |  (广播模式)|                  |+--------+--------+--------+--------+|   Consumer A    |   Consumer B    |  (多个消费者实例)+-----------------+-----------------+(MQ1, MQ2, MQ3)    (MQ1, MQ2, MQ3)  (每个消费者接收所有队列中的消息)

小结

首先我们要知道一个点,集群模式下,消费指针是保存在broker中的,而广播模式中的消费指针则保存在各自的消费者本地

所以在集群模式下,消费者消费完消息之后是会告诉Broker当前的偏移量的,从而,如果Broker没有收到消费者偏移量的响应,就会造成下次消费仍然从之前的偏移量开始消费,造成重复消费。

而在广播模式中。偏移量既然是保存在各自的消费者本地,那只要没有保存成功,下次还是会从上一次的偏移量拉取。同样也衍生出另一个问题,当我们选择push模式消费时,偏移量既然是保存在本地的,那broker是怎么知道当前消费者知道消费到哪了,从而不重复push呢

这就要详细了解一下RocketMQ的pushpull这两种消费模式了;

push和pull介绍

RocketMQ的消费模型中,严格来说,没有真正的“push”模式,消费者始终是通过主动拉取(pull)消息的方式工作。无论是集群模式还是广播模式,消费者都会周期性地向Broker发送拉取请求,以获取新消息。这种方式可以被看作是“长轮询”或“循环拉取”的一种变体。

所以本质也就是:消费的主动拉取机制分为两种循环拉取长轮询

  • 循坏拉取:消费者会在一个循环中不断地向Broker发送拉取请求,即使没有新消息,消费者也会周期性地发送请求。这种方式确保了当有新消息到达时,消费者可以尽快获取到消息。(这种方式也就是我们常规理解的Pull模式)
  • 长轮询:RocketMQ的拉取请求支持长轮询机制。消费者向Broker发送拉取请求时,可以指定一个最长等待时间(brokerSuspendMaxTimeMillis)。如果在这段时间内Broker没有新消息可供拉取,Broker会在超时之前持有这个请求,一旦有新消息到达就立即返回给消费者。这样可以减少无效的拉取请求,降低系统资源消耗。(这种方式也就是我们理解的Push)

源码展示

这里我们只展示push 的代码,因为只有push是跟我们常规理解不一样的,大家可以自行去看看pull的

DefaultMQPushConsumerImpl 类是消费者的核心实现类,其中负责拉取消息的方法如下:

public class DefaultMQPushConsumerImpl implements MQConsumerInner {.........................................................................................................public void pullMessage(final PullRequest pullRequest) {// ProcessQueue: 表示消息处理队列,消费者从Broker拉取的消息会存入ProcessQueue中。final ProcessQueue processQueue = pullRequest.getProcessQueue();// isDropped: 检查ProcessQueue是否已经被丢弃。如果ProcessQueue被丢弃(可能是因为Rebalance操作),则停止拉取消息。if (processQueue.isDropped()) {log.info("the pull request[{}] is dropped.", pullRequest.toString());return;}//  更新最后的拉取时间戳,用于判断消费者的活跃状态。pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());try {//  检查消费者的状态是否正常,如果异常,则延迟执行拉取请求this.makeSureStateOK();} catch (MQClientException e) {log.warn("pullMessage exception, consumer state not ok", e);this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);return;}// 检查消费者是否处于暂停状态,如果是,则延迟拉取请求。if (this.isPause()) {log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);return;}// 流量控制: 如果本地缓存的消息数量或消息大小超过了阈值,则延迟拉取消息,以避免消费者处理不过来long cachedMessageCount = processQueue.getMsgCount().get();long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);......................................................................................// 顺序消费: 如果是顺序消费模式,消费者必须确保消息队列被锁定,如果没有锁定则延迟拉取消息// 偏移量修正: 在首次拉取消息时,可能需要修正消费偏移量,以确保从正确的位置开始消费。if (!this.consumeOrderly) {if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {......................................................................................return;}} else {if (processQueue.isLocked()) {if (!pullRequest.isPreviouslyLocked()) {......................................................................................}} else {this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);log.info("pull message later because not locked in broker, {}", pullRequest);return;}}// 检查是否有订阅数据,如果没有,延迟拉取请求。final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());if (null == subscriptionData) {this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);log.warn("find the consumer's subscription failed, {}", pullRequest);return;}// 这里是拉取消息最重要的逻辑final long beginTimestamp = System.currentTimeMillis();// 异步拉取消息的回调接口,在onSuccess中处理不同的拉取结果。PullCallback pullCallback = new PullCallback() {@Override// 包含拉取到的消息以及下一次拉取的起始偏移量。public void onSuccess(PullResult pullResult) {if (pullResult != null) {pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,subscriptionData);// 根据拉取结果状态进行相应处理,如FOUND表示成功拉取到消息,OFFSET_ILLEGAL表示偏移量非法,需特殊处理。switch (pullResult.getPullStatus()) {case FOUND:............................................................break;case NO_NEW_MSG:case NO_MATCHED_MSG:......................................................................................break;case OFFSET_ILLEGAL:......................................................................................break;default:break;}}}};// 构建系统标志(sysFlag): 根据不同的条件构建拉取请求的系统标志,标识是否提交偏移量、订阅信息等。boolean commitOffsetEnable = false;long commitOffsetValue = 0L;if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);if (commitOffsetValue > 0) {commitOffsetEnable = true;}}String subExpression = null;boolean classFilter = false;SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());if (sd != null) {if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {subExpression = sd.getSubString();}classFilter = sd.isClassFilterMode();}int sysFlag = PullSysFlag.buildSysFlag(commitOffsetEnable, // commitOffsettrue, // suspendsubExpression != null, // subscriptionclassFilter // class filter);try {// 调用拉取API: 最终调用pullKernelImpl提交拉取请求,并指定PullCallback进行异步回调处理。this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(),subExpression,subscriptionData.getExpressionType(),subscriptionData.getSubVersion(),pullRequest.getNextOffset(),this.defaultMQPushConsumer.getPullBatchSize(),sysFlag,commitOffsetValue,BROKER_SUSPEND_MAX_TIME_MILLIS,CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,CommunicationMode.ASYNC,pullCallback);} catch (Exception e) {log.error("pullKernelImpl exception", e);this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);}}
}

我们可以看到DefaultMQPushConsumerImpl依然使用的pullMessage,同时涉及检查消费者状态、流量控制、顺序消费的处理、订阅信息的获取等关键步骤。

那既然了解了pull和push的区别,那么回到问题,在广播模式下,什么情况下会造成偏移量保存的失败:

偏移量保存失败情况

1. 网络问题

  • 网络中断或不稳定: 广播模式下的消费者保存偏移量时仍需要与Broker通信。如果网络出现问题,偏移量保存请求可能会失败。
  • 网络延迟过高: 高延迟的网络可能导致偏移量保存操作超时,进而失败。

2. Consumer本地问题

  • 消费者本地磁盘故障: 在广播模式下,偏移量通常是存储在消费者本地的(例如磁盘或本地文件系统)。如果消费者的磁盘出现问题,偏移量可能无法成功保存。
  • 消费者崩溃: 如果消费者在保存偏移量之前崩溃,偏移量将不会被更新,导致后续重启时重新消费这些消息。

3. 消费进度记录器问题

  • 进度记录器异常: 在广播模式下,消费者自己负责管理消费进度。如果进度记录器出现异常(例如文件系统读写错误、配置错误等),会导致偏移量无法正确保存。

4. 程序设计问题

  • 代码逻辑错误: 如果在实现消费者的代码中存在逻辑错误,例如在偏移量保存前就返回或未正确捕获异常,可能会导致偏移量未被成功保存。
  • 不合理的异常处理: 如果在偏移量保存失败后未能及时重试或补救,偏移量可能会丢失,影响消费进度的准确性。

5. 异常终止

  • 应用强制退出: 如果消费者进程被强制终止或系统突然关机,当前的偏移量可能未能保存到本地,导致重启后从上一个已保存的偏移量开始消费。

6. 持久化策略问题

  • 异步持久化策略: 如果消费者采用异步持久化策略,在保存偏移量时未能及时持久化,进程退出或出现故障时可能导致偏移量丢失。

7. 同步问题

  • 并发更新冲突: 如果同一消费者实例内存在多线程或异步处理逻辑,并发更新偏移量时未进行妥善的同步处理,可能导致偏移量更新失败或记录错误的偏移量。

源码解析

上面我们一直说广播模式的偏移量会保存在本地,那具体是哪呢?

OffsetStore 接口

OffsetStore 是管理消费进度的接口,其具体实现类包括 LocalFileOffsetStoreRemoteBrokerOffsetStore

广播模式下使用 LocalFileOffsetStore 进行本地存储。

public interface OffsetStore {void load() throws MQClientException;void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);long readOffset(final MessageQueue mq, final ReadOffsetType type);void persistAll(final Set<MessageQueue> mqs);void persist(final MessageQueue mq);void removeOffset(MessageQueue mq);void cloneOffset(final MessageQueue srcMQ, final MessageQueue destMQ);
}

LocalFileOffsetStore 类

LocalFileOffsetStore 类在本地文件中存储消费进度。

image-20240822153149537

public class LocalFileOffsetStore implements OffsetStore {// 本地文件路径private final String storePath;// 用于存储消费进度的内存结构private ConcurrentMap<MessageQueue, AtomicLong> offsetTable;// 构造函数public LocalFileOffsetStore(MQClientInstance mQClientFactory, String consumerGroup) {this.storePath = mQClientFactory.getClientConfig().getClientLocalOffsetStoreDir()+ File.separator + consumerGroup;this.offsetTable = new ConcurrentHashMap<MessageQueue, AtomicLong>();}@Overridepublic void persistAll(final Set<MessageQueue> mqs) {if (null == mqs || mqs.isEmpty())return;String encodeFileName = this.storePath + File.separator + "offsets.json";// 持久化消费进度到本地文件// 文件写入逻辑省略}@Overridepublic void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly) {AtomicLong offsetOld = this.offsetTable.get(mq);if (null == offsetOld) {this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));offsetOld = this.offsetTable.get(mq);}if (increaseOnly) {MixAll.compareAndIncreaseOnly(offsetOld, offset);} else {offsetOld.set(offset);}}@Overridepublic long readOffset(final MessageQueue mq, final ReadOffsetType type) {// 从内存或本地文件中读取消费进度// 读取逻辑省略return 0;}// 其他方法省略
}

在广播模式下,消费者使用 LocalFileOffsetStore 在本地存储消费进度。消费者不会将消费进度汇报给Broker,而是通过 persistAllupdateOffset 方法将消费进度存储在本地文件中。这确保了消费者在重启时可以从上次的消费进度继续消费,以保证至少消费一次的语义。

总结

所以在广播模式中,消费失败是不会进入重试队列的,这也是我同事想描述的,而我理解的是重复消费;所以在广播模式中只要本地偏移量没有持久化,就会造成重复消费

很多人看不到未来,其实看到了未来 ———————《弱智吧》

这篇关于RocketMQ广播模式消费失败是否会重试?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1097825

相关文章

Spring使用@Retryable实现自动重试机制

《Spring使用@Retryable实现自动重试机制》在微服务架构中,服务之间的调用可能会因为一些暂时性的错误而失败,例如网络波动、数据库连接超时或第三方服务不可用等,在本文中,我们将介绍如何在Sp... 目录引言1. 什么是 @Retryable?2. 如何在 Spring 中使用 @Retryable

如何测试计算机的内存是否存在问题? 判断电脑内存故障的多种方法

《如何测试计算机的内存是否存在问题?判断电脑内存故障的多种方法》内存是电脑中非常重要的组件之一,如果内存出现故障,可能会导致电脑出现各种问题,如蓝屏、死机、程序崩溃等,如何判断内存是否出现故障呢?下... 如果你的电脑是崩溃、冻结还是不稳定,那么它的内存可能有问题。要进行检查,你可以使用Windows 11

字节面试 | 如何测试RocketMQ、RocketMQ?

字节面试:RocketMQ是怎么测试的呢? 答: 首先保证消息的消费正确、设计逆向用例,在验证消息内容为空等情况时的消费正确性; 推送大批量MQ,通过Admin控制台查看MQ消费的情况,是否出现消费假死、TPS是否正常等等问题。(上述都是临场发挥,但是RocketMQ真正的测试点,还真的需要探讨) 01 先了解RocketMQ 作为测试也是要简单了解RocketMQ。简单来说,就是一个分

在JS中的设计模式的单例模式、策略模式、代理模式、原型模式浅讲

1. 单例模式(Singleton Pattern) 确保一个类只有一个实例,并提供一个全局访问点。 示例代码: class Singleton {constructor() {if (Singleton.instance) {return Singleton.instance;}Singleton.instance = this;this.data = [];}addData(value)

Codeforces Round #113 (Div. 2) B 判断多边形是否在凸包内

题目点击打开链接 凸多边形A, 多边形B, 判断B是否严格在A内。  注意AB有重点 。  将A,B上的点合在一起求凸包,如果凸包上的点是B的某个点,则B肯定不在A内。 或者说B上的某点在凸包的边上则也说明B不严格在A里面。 这个处理有个巧妙的方法,只需在求凸包的时候, <=  改成< 也就是说凸包一条边上的所有点都重复点都记录在凸包里面了。 另外不能去重点。 int

模版方法模式template method

学习笔记,原文链接 https://refactoringguru.cn/design-patterns/template-method 超类中定义了一个算法的框架, 允许子类在不修改结构的情况下重写算法的特定步骤。 上层接口有默认实现的方法和子类需要自己实现的方法

easyui同时验证账户格式和ajax是否存在

accountName: {validator: function (value, param) {if (!/^[a-zA-Z][a-zA-Z0-9_]{3,15}$/i.test(value)) {$.fn.validatebox.defaults.rules.accountName.message = '账户名称不合法(字母开头,允许4-16字节,允许字母数字下划线)';return fal

【iOS】MVC模式

MVC模式 MVC模式MVC模式demo MVC模式 MVC模式全称为model(模型)view(视图)controller(控制器),他分为三个不同的层分别负责不同的职责。 View:该层用于存放视图,该层中我们可以对页面及控件进行布局。Model:模型一般都拥有很好的可复用性,在该层中,我们可以统一管理一些数据。Controlller:该层充当一个CPU的功能,即该应用程序

迭代器模式iterator

学习笔记,原文链接 https://refactoringguru.cn/design-patterns/iterator 不暴露集合底层表现形式 (列表、 栈和树等) 的情况下遍历集合中所有的元素

【408DS算法题】039进阶-判断图中路径是否存在

Index 题目分析实现总结 题目 对于给定的图G,设计函数实现判断G中是否含有从start结点到stop结点的路径。 分析实现 对于图的路径的存在性判断,有两种做法:(本文的实现均基于邻接矩阵存储方式的图) 1.图的BFS BFS的思路相对比较直观——从起始结点出发进行层次遍历,遍历过程中遇到结点i就表示存在路径start->i,故只需判断每个结点i是否就是stop