RocketMQ广播模式消费失败是否会重试?

2024-08-23 00:28

本文主要是介绍RocketMQ广播模式消费失败是否会重试?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 前言
  • 继续
  • 广播和集群模式的消费流程
    • 集群模式(默认的)
    • 广播模式
    • 小结
  • push和pull介绍
    • 源码展示
  • 偏移量保存失败情况
      • 1. 网络问题
      • 2. Consumer本地问题
      • 3. 消费进度记录器问题
      • 4. 程序设计问题
      • 5. 异常终止
      • 6. 持久化策略问题
      • 7. 同步问题
  • 源码解析
    • `OffsetStore` 接口
    • LocalFileOffsetStore 类
  • 总结

前言

前两天有个同事问了一个问题:“在广播模式下消息消费是否会重试?”,而我的答案是会重试,因为在我的印象中RocketMQ有个最少消费一次机制,自然就会想不管怎么样都有可能出现重复消费的情况。但他立马百度查了一下:

image-20240808153752930

啪啪打脸!!!!!!,当时哥们哑口无言,哈哈哈!

但还是带着疑问的,那RocketMQ是怎么保证最少消费一次的?

继续

其实这个问题应该换种问法,按一般的思考逻辑,我们说的重试是是否会进入RocketMQ的重试队列走它的退避算法,所以应该问:“RocketMQ消息消费失败是否会进入重试队列?”,那这个结果是:不会,如果是是否会重试,是会的,就算不考虑生产者重复推送和Rebalance再均衡机制,消息还是有可能造成重试的。

本文不介绍什么是广播模式和集群模式以及生产者重复推送和Rebalance机制,大家可自行了解。本文只介绍消费端

广播和集群模式的消费流程

  • 广播消费模式下,相同Consumer Group的每个Consumer实例都接收同一个Topic的全量消息。即每条消息都会被发送到Consumer Group中的每个Consumer。
  • 集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊同一个Topic的消息。即每条消息只会被发送到Consumer Group中的某个Consumer。

集群模式(默认的)

  1. 消息生产者(Producer)将消息发送到RocketMQ的一个主题(Topic),Broker 接收到消息后,将其存储在相应的队列(Message Queue)中。
  2. 消费者实例Consumer会订阅一个或多个主题。当一个消费者订阅某个主题时,RocketMQ会根据负载均衡策略将该主题的消息队列分配给消费者实例。
  3. 消费者实例会主动向Broker发送拉取请求(Pull Request),从被分配的消息队列中获取消息。每个消费者只会拉取自己被分配的队列中的消息。
  4. 消息处理,也就是我们自己的代码逻辑
  5. 消费者每次处理完消息后,会提交当前消费进度(Offset)。集群模式下,这一进度信息会被存储在Broker中。
  6. 如果消息处理失败,可以配置RocketMQ的重试机制,消费者会重新拉取并处理失败的消息,直到处理成功或达到最大重试次数。

第6步就是我同事理解的重试机制,会进入重试队列!多一嘴:如果捕获了异常是不会重试的

                 +------------------+|    Producer      |+------------------+|V+------------------+|     Broker       |+------------------+|    |    |    MQ1    MQ2   MQ3  (多个消息队列)|    |    |+--------+--------+|  Consumer Group  |  (集群模式)|                  |+--------+--------+--------+--------+|   Consumer A    |   Consumer B    |  (多个消费者实例)+-----------------+-----------------+(Queue 1, Queue 3)   (Queue 2)

广播模式

  1. 消息生产者Producer将消息发送到RocketMQ的一个主题(Topic)
  2. 消费组(Consumer Group在广播模式下依然存在,但其意义有所不同。每个消费组中的所有消费者实例都会消费该组内订阅的所有消息队列中的消息。
  3. 消费者实例(Consumer订阅一个或多个主题。在广播模式下,消费者不会根据负载均衡策略分配队列,而是每个消费者都会接收并消费该主题的所有队列中的消息。
  4. 每个消费者实例会主动向Broker发送拉取请求(Pull Request),从该主题的所有消息队列中获取消息。
  5. 消费者收到消息后,对消息进行处理。
  6. 广播模式下,消费进度由消费者本地管理,而不是由Broker统一管理。消费者在本地记录其消费的消息偏移量(Offset)。
  7. 在广播模式下,由于每个消费者都会接收并处理相同的消息,因此消息不会丢失。
                 +------------------+|    Producer      |+------------------+|V+------------------+|     Broker       |+------------------+|    |    |    MQ1    MQ2   MQ3  (多个消息队列)|    |    |+--------+--------+|  Consumer Group  |  (广播模式)|                  |+--------+--------+--------+--------+|   Consumer A    |   Consumer B    |  (多个消费者实例)+-----------------+-----------------+(MQ1, MQ2, MQ3)    (MQ1, MQ2, MQ3)  (每个消费者接收所有队列中的消息)

小结

首先我们要知道一个点,集群模式下,消费指针是保存在broker中的,而广播模式中的消费指针则保存在各自的消费者本地

所以在集群模式下,消费者消费完消息之后是会告诉Broker当前的偏移量的,从而,如果Broker没有收到消费者偏移量的响应,就会造成下次消费仍然从之前的偏移量开始消费,造成重复消费。

而在广播模式中。偏移量既然是保存在各自的消费者本地,那只要没有保存成功,下次还是会从上一次的偏移量拉取。同样也衍生出另一个问题,当我们选择push模式消费时,偏移量既然是保存在本地的,那broker是怎么知道当前消费者知道消费到哪了,从而不重复push呢

这就要详细了解一下RocketMQ的pushpull这两种消费模式了;

push和pull介绍

RocketMQ的消费模型中,严格来说,没有真正的“push”模式,消费者始终是通过主动拉取(pull)消息的方式工作。无论是集群模式还是广播模式,消费者都会周期性地向Broker发送拉取请求,以获取新消息。这种方式可以被看作是“长轮询”或“循环拉取”的一种变体。

所以本质也就是:消费的主动拉取机制分为两种循环拉取长轮询

  • 循坏拉取:消费者会在一个循环中不断地向Broker发送拉取请求,即使没有新消息,消费者也会周期性地发送请求。这种方式确保了当有新消息到达时,消费者可以尽快获取到消息。(这种方式也就是我们常规理解的Pull模式)
  • 长轮询:RocketMQ的拉取请求支持长轮询机制。消费者向Broker发送拉取请求时,可以指定一个最长等待时间(brokerSuspendMaxTimeMillis)。如果在这段时间内Broker没有新消息可供拉取,Broker会在超时之前持有这个请求,一旦有新消息到达就立即返回给消费者。这样可以减少无效的拉取请求,降低系统资源消耗。(这种方式也就是我们理解的Push)

源码展示

这里我们只展示push 的代码,因为只有push是跟我们常规理解不一样的,大家可以自行去看看pull的

DefaultMQPushConsumerImpl 类是消费者的核心实现类,其中负责拉取消息的方法如下:

public class DefaultMQPushConsumerImpl implements MQConsumerInner {.........................................................................................................public void pullMessage(final PullRequest pullRequest) {// ProcessQueue: 表示消息处理队列,消费者从Broker拉取的消息会存入ProcessQueue中。final ProcessQueue processQueue = pullRequest.getProcessQueue();// isDropped: 检查ProcessQueue是否已经被丢弃。如果ProcessQueue被丢弃(可能是因为Rebalance操作),则停止拉取消息。if (processQueue.isDropped()) {log.info("the pull request[{}] is dropped.", pullRequest.toString());return;}//  更新最后的拉取时间戳,用于判断消费者的活跃状态。pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());try {//  检查消费者的状态是否正常,如果异常,则延迟执行拉取请求this.makeSureStateOK();} catch (MQClientException e) {log.warn("pullMessage exception, consumer state not ok", e);this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);return;}// 检查消费者是否处于暂停状态,如果是,则延迟拉取请求。if (this.isPause()) {log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);return;}// 流量控制: 如果本地缓存的消息数量或消息大小超过了阈值,则延迟拉取消息,以避免消费者处理不过来long cachedMessageCount = processQueue.getMsgCount().get();long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);......................................................................................// 顺序消费: 如果是顺序消费模式,消费者必须确保消息队列被锁定,如果没有锁定则延迟拉取消息// 偏移量修正: 在首次拉取消息时,可能需要修正消费偏移量,以确保从正确的位置开始消费。if (!this.consumeOrderly) {if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {......................................................................................return;}} else {if (processQueue.isLocked()) {if (!pullRequest.isPreviouslyLocked()) {......................................................................................}} else {this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);log.info("pull message later because not locked in broker, {}", pullRequest);return;}}// 检查是否有订阅数据,如果没有,延迟拉取请求。final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());if (null == subscriptionData) {this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);log.warn("find the consumer's subscription failed, {}", pullRequest);return;}// 这里是拉取消息最重要的逻辑final long beginTimestamp = System.currentTimeMillis();// 异步拉取消息的回调接口,在onSuccess中处理不同的拉取结果。PullCallback pullCallback = new PullCallback() {@Override// 包含拉取到的消息以及下一次拉取的起始偏移量。public void onSuccess(PullResult pullResult) {if (pullResult != null) {pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,subscriptionData);// 根据拉取结果状态进行相应处理,如FOUND表示成功拉取到消息,OFFSET_ILLEGAL表示偏移量非法,需特殊处理。switch (pullResult.getPullStatus()) {case FOUND:............................................................break;case NO_NEW_MSG:case NO_MATCHED_MSG:......................................................................................break;case OFFSET_ILLEGAL:......................................................................................break;default:break;}}}};// 构建系统标志(sysFlag): 根据不同的条件构建拉取请求的系统标志,标识是否提交偏移量、订阅信息等。boolean commitOffsetEnable = false;long commitOffsetValue = 0L;if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);if (commitOffsetValue > 0) {commitOffsetEnable = true;}}String subExpression = null;boolean classFilter = false;SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());if (sd != null) {if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {subExpression = sd.getSubString();}classFilter = sd.isClassFilterMode();}int sysFlag = PullSysFlag.buildSysFlag(commitOffsetEnable, // commitOffsettrue, // suspendsubExpression != null, // subscriptionclassFilter // class filter);try {// 调用拉取API: 最终调用pullKernelImpl提交拉取请求,并指定PullCallback进行异步回调处理。this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(),subExpression,subscriptionData.getExpressionType(),subscriptionData.getSubVersion(),pullRequest.getNextOffset(),this.defaultMQPushConsumer.getPullBatchSize(),sysFlag,commitOffsetValue,BROKER_SUSPEND_MAX_TIME_MILLIS,CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,CommunicationMode.ASYNC,pullCallback);} catch (Exception e) {log.error("pullKernelImpl exception", e);this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);}}
}

我们可以看到DefaultMQPushConsumerImpl依然使用的pullMessage,同时涉及检查消费者状态、流量控制、顺序消费的处理、订阅信息的获取等关键步骤。

那既然了解了pull和push的区别,那么回到问题,在广播模式下,什么情况下会造成偏移量保存的失败:

偏移量保存失败情况

1. 网络问题

  • 网络中断或不稳定: 广播模式下的消费者保存偏移量时仍需要与Broker通信。如果网络出现问题,偏移量保存请求可能会失败。
  • 网络延迟过高: 高延迟的网络可能导致偏移量保存操作超时,进而失败。

2. Consumer本地问题

  • 消费者本地磁盘故障: 在广播模式下,偏移量通常是存储在消费者本地的(例如磁盘或本地文件系统)。如果消费者的磁盘出现问题,偏移量可能无法成功保存。
  • 消费者崩溃: 如果消费者在保存偏移量之前崩溃,偏移量将不会被更新,导致后续重启时重新消费这些消息。

3. 消费进度记录器问题

  • 进度记录器异常: 在广播模式下,消费者自己负责管理消费进度。如果进度记录器出现异常(例如文件系统读写错误、配置错误等),会导致偏移量无法正确保存。

4. 程序设计问题

  • 代码逻辑错误: 如果在实现消费者的代码中存在逻辑错误,例如在偏移量保存前就返回或未正确捕获异常,可能会导致偏移量未被成功保存。
  • 不合理的异常处理: 如果在偏移量保存失败后未能及时重试或补救,偏移量可能会丢失,影响消费进度的准确性。

5. 异常终止

  • 应用强制退出: 如果消费者进程被强制终止或系统突然关机,当前的偏移量可能未能保存到本地,导致重启后从上一个已保存的偏移量开始消费。

6. 持久化策略问题

  • 异步持久化策略: 如果消费者采用异步持久化策略,在保存偏移量时未能及时持久化,进程退出或出现故障时可能导致偏移量丢失。

7. 同步问题

  • 并发更新冲突: 如果同一消费者实例内存在多线程或异步处理逻辑,并发更新偏移量时未进行妥善的同步处理,可能导致偏移量更新失败或记录错误的偏移量。

源码解析

上面我们一直说广播模式的偏移量会保存在本地,那具体是哪呢?

OffsetStore 接口

OffsetStore 是管理消费进度的接口,其具体实现类包括 LocalFileOffsetStoreRemoteBrokerOffsetStore

广播模式下使用 LocalFileOffsetStore 进行本地存储。

public interface OffsetStore {void load() throws MQClientException;void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);long readOffset(final MessageQueue mq, final ReadOffsetType type);void persistAll(final Set<MessageQueue> mqs);void persist(final MessageQueue mq);void removeOffset(MessageQueue mq);void cloneOffset(final MessageQueue srcMQ, final MessageQueue destMQ);
}

LocalFileOffsetStore 类

LocalFileOffsetStore 类在本地文件中存储消费进度。

image-20240822153149537

public class LocalFileOffsetStore implements OffsetStore {// 本地文件路径private final String storePath;// 用于存储消费进度的内存结构private ConcurrentMap<MessageQueue, AtomicLong> offsetTable;// 构造函数public LocalFileOffsetStore(MQClientInstance mQClientFactory, String consumerGroup) {this.storePath = mQClientFactory.getClientConfig().getClientLocalOffsetStoreDir()+ File.separator + consumerGroup;this.offsetTable = new ConcurrentHashMap<MessageQueue, AtomicLong>();}@Overridepublic void persistAll(final Set<MessageQueue> mqs) {if (null == mqs || mqs.isEmpty())return;String encodeFileName = this.storePath + File.separator + "offsets.json";// 持久化消费进度到本地文件// 文件写入逻辑省略}@Overridepublic void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly) {AtomicLong offsetOld = this.offsetTable.get(mq);if (null == offsetOld) {this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));offsetOld = this.offsetTable.get(mq);}if (increaseOnly) {MixAll.compareAndIncreaseOnly(offsetOld, offset);} else {offsetOld.set(offset);}}@Overridepublic long readOffset(final MessageQueue mq, final ReadOffsetType type) {// 从内存或本地文件中读取消费进度// 读取逻辑省略return 0;}// 其他方法省略
}

在广播模式下,消费者使用 LocalFileOffsetStore 在本地存储消费进度。消费者不会将消费进度汇报给Broker,而是通过 persistAllupdateOffset 方法将消费进度存储在本地文件中。这确保了消费者在重启时可以从上次的消费进度继续消费,以保证至少消费一次的语义。

总结

所以在广播模式中,消费失败是不会进入重试队列的,这也是我同事想描述的,而我理解的是重复消费;所以在广播模式中只要本地偏移量没有持久化,就会造成重复消费

很多人看不到未来,其实看到了未来 ———————《弱智吧》

这篇关于RocketMQ广播模式消费失败是否会重试?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1097825

相关文章

Linux系统配置NAT网络模式的详细步骤(附图文)

《Linux系统配置NAT网络模式的详细步骤(附图文)》本文详细指导如何在VMware环境下配置NAT网络模式,包括设置主机和虚拟机的IP地址、网关,以及针对Linux和Windows系统的具体步骤,... 目录一、配置NAT网络模式二、设置虚拟机交换机网关2.1 打开虚拟机2.2 管理员授权2.3 设置子

Docker镜像pull失败两种解决办法小结

《Docker镜像pull失败两种解决办法小结》有时候我们在拉取Docker镜像的过程中会遇到一些问题,:本文主要介绍Docker镜像pull失败两种解决办法的相关资料,文中通过代码介绍的非常详细... 目录docker 镜像 pull 失败解决办法1DrQwWCocker 镜像 pull 失败解决方法2总

SpringBoot如何通过Map实现策略模式

《SpringBoot如何通过Map实现策略模式》策略模式是一种行为设计模式,它允许在运行时选择算法的行为,在Spring框架中,我们可以利用@Resource注解和Map集合来优雅地实现策略模式,这... 目录前言底层机制解析Spring的集合类型自动装配@Resource注解的行为实现原理使用直接使用M

pip无法安装osgeo失败的问题解决

《pip无法安装osgeo失败的问题解决》本文主要介绍了pip无法安装osgeo失败的问题解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 进入官方提供的扩展包下载网站寻找版本适配的whl文件注意:要选择cp(python版本)和你py

C#原型模式之如何通过克隆对象来优化创建过程

《C#原型模式之如何通过克隆对象来优化创建过程》原型模式是一种创建型设计模式,通过克隆现有对象来创建新对象,避免重复的创建成本和复杂的初始化过程,它适用于对象创建过程复杂、需要大量相似对象或避免重复初... 目录什么是原型模式?原型模式的工作原理C#中如何实现原型模式?1. 定义原型接口2. 实现原型接口3

大数据spark3.5安装部署之local模式详解

《大数据spark3.5安装部署之local模式详解》本文介绍了如何在本地模式下安装和配置Spark,并展示了如何使用SparkShell进行基本的数据处理操作,同时,还介绍了如何通过Spark-su... 目录下载上传解压配置jdk解压配置环境变量启动查看交互操作命令行提交应用spark,一个数据处理框架

Nginx之upstream被动式重试机制的实现

《Nginx之upstream被动式重试机制的实现》本文主要介绍了Nginx之upstream被动式重试机制的实现,可以通过proxy_next_upstream来自定义配置,具有一定的参考价值,感兴... 目录默认错误选择定义错误指令配置proxy_next_upstreamproxy_next_upst

springboot rocketmq配置生产者和消息者的步骤

《springbootrocketmq配置生产者和消息者的步骤》本文介绍了如何在SpringBoot中集成RocketMQ,包括添加依赖、配置application.yml、创建生产者和消费者,并展... 目录1. 添加依赖2. 配置application.yml3. 创建生产者4. 创建消费者5. 使用在

Spring Retry 实现乐观锁重试实践记录

《SpringRetry实现乐观锁重试实践记录》本文介绍了在秒杀商品SKU表中使用乐观锁和MybatisPlus配置乐观锁的方法,并分析了测试环境和生产环境的隔离级别对乐观锁的影响,通过简单验证,... 目录一、场景分析 二、简单验证 2.1、可重复读 2.2、读已提交 三、最佳实践 3.1、配置重试模板

MySQL安装时initializing database失败的问题解决

《MySQL安装时initializingdatabase失败的问题解决》本文主要介绍了MySQL安装时initializingdatabase失败的问题解决,文中通过图文介绍的非常详细,对大家的学... 目录问题页面:解决方法:问题页面:解决方法:1.勾选红框中的选项:2.将下图红框中全部改为英