RocketMQ~重复消息、消息堆积、回溯消费、如何防止消息不丢失

2024-08-25 21:20

本文主要是介绍RocketMQ~重复消息、消息堆积、回溯消费、如何防止消息不丢失,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

重复消息

我们需要给我们的消费者实现幂等解决重复消息,也就是对同一个消息的处理结果,执行多少次都不变。

这个还是需要结合具体的业务的。你可以使用写入Redis来保证,因为Redis 的 key 和 value 就是天然支持幕等的。
当然还有使用用数据库插入法,基于数据库的唯一键来保证重复数据不会被插入多条。

不过最主要的还是需要根据特定场景使用特定的解决方案,你要知道你的消息消费是否是完全不可重复消费、还是可以忍受重复消费的,然后再选择强校验和弱校验的方式。

消息堆积

RocketMQ的消息堆积,一般都是因为客户端本地消费过程中,由于消费耗时过长或消费并发度较小等原因,导致客户端消费能力不足,出现消息堆积的问题。当线上出现消息堆积的问题时,一般有以下几种方式来解决:

  1. 增加消费者数量:消息堆积了,消费不过来了,那就把消费者的数量增加一下,让更多人的实例来消费这些消心。
  2. 提升消费者消费速度:消费者消费的慢可能是消息堆积的主要原因,想办法提升消费速度,比如引入线程池、本地消息存储后即返回成功后续再慢慢消费等。
  3. 降低生产者的生产速度:如果生产者可控的话,可以让生产者生成消息的速度慢一点。
  4. 清理过期消息:有一些过期消息、或者一直无法成功的消息,在业务做评估之后,如果无影响或者影响不大,其实是可以清理的。
  5. 调整RocketMQ的配置参数:RocketMQ提供了很多可配配置的参数,例如消息消费模式、消息拉取间隔时间等,可以根据实际情况来调整这些参数,从而优化消息消费的为效率
  6. 增加Topic队列数:如果一个Topic的队列数比较少,那么就容易出现消息堆积的情况。可以通过增加队列数来提高消息的处理并发度,从而减少消息堆积。

回溯消费

回溯消费是指在向Consumer已经消费成功的消息,由于业务上需求需要重新消费,在RocketMQ中,Broker投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ支持按照时间回溯消费,间维度精确到毫秒。

如何防止消息不丢失

RocketMQ的消息想要确保不丢失,需要生产者、消费者以及Broker的共同努力,缺一不可。

首先在生产者端,消息的发送分为同步和异步两种,在同步发送消息的情况下,消息的发送会同步阻塞等待Broker返回结果,在Broker确认收到消息之后,生产者才会拿到SendResult。如果这个过程中发生异常,那么就说明消息发送可能失败了,就需要生产者进行重新发送消息
但是Broker其实并不会立即把消息存储到磁盘上,而是先存储到内存中,内存存储成功之后,就返回给确认结果给生产者了。然后再通过异步刷盘的方式将内存中的数据存储到磁盘上。但是这个过程中,如果机器挂了,那么就可能会导致数据丢失。

如果想要保证消息不丢失,可以将消息保存机制修改为同步刷盘,这样,Broker会在同步请求中把数据保存在磁盘上,确保保存成功后再返回确认结果给生产者。

除了同步发送消息,还有异步发送,异步发送的话就需要生产者重写SendCallback的onSuccess和onException方法,用于给Broker进行回调。在方法中实现消息的确认或者重新发送

为了保证消息不丢失,RocketMQ肯定要通过集群方式进行部署,Broker通常采用一主多从部署方式,并且采用主从同步的方式做数据复制。
当主Broker宕机时,从Broker会接管主Broker的工作,保证消息不丢失。RocketMQ的Broker还可以配置多个实例,消息会在多个Broker之间进行冗余备份,从而保证数据的的可靠性。默认方式下,Broker在接收消息后,写入master成功,就可以返回确认响应给生产者了,接着消息将会异步复制到slave节点。但是如果这个过程中,Master的磁盘损坏了。那就会导致数据丢失了。如果想要解决这个问题,可以配置同步复制的方式,即Master在将数据同步到Slave节点后,再返回给生产者确认结果

在消费者端,需要确保在消息拉取并消费成功之后再给Broker返回ACK,就可以保证消息不丢失了,如果这个过程中Broker一直没收到ACK,那么就可以重试。
所以,在消费者的代码中,一定要在业务逻辑的最后一步 returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;当然,也可以先把数据保存在数据库中,就返回,然后自己再慢慢处理。
但是,需要注意的是RocketMQ和Kafka一样,只能最大限度的为保证消息不丢失,但是没办法做到100%保证不丢失。例如异步发送时,要执行回调时Brocker宕机了,或者生产者宕机了。。。

这篇关于RocketMQ~重复消息、消息堆积、回溯消费、如何防止消息不丢失的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1106683

相关文章

Oracle数据库使用 listagg去重删除重复数据的方法汇总

《Oracle数据库使用listagg去重删除重复数据的方法汇总》文章介绍了在Oracle数据库中使用LISTAGG和XMLAGG函数进行字符串聚合并去重的方法,包括去重聚合、使用XML解析和CLO... 目录案例表第一种:使用wm_concat() + distinct去重聚合第二种:使用listagg,

MySQL中删除重复数据SQL的三种写法

《MySQL中删除重复数据SQL的三种写法》:本文主要介绍MySQL中删除重复数据SQL的三种写法,文中通过代码示例讲解的非常详细,对大家的学习或工作有一定的帮助,需要的朋友可以参考下... 目录方法一:使用 left join + 子查询删除重复数据(推荐)方法二:创建临时表(需分多步执行,逻辑清晰,但会

SpringBoot 自定义消息转换器使用详解

《SpringBoot自定义消息转换器使用详解》本文详细介绍了SpringBoot消息转换器的知识,并通过案例操作演示了如何进行自定义消息转换器的定制开发和使用,感兴趣的朋友一起看看吧... 目录一、前言二、SpringBoot 内容协商介绍2.1 什么是内容协商2.2 内容协商机制深入理解2.2.1 内容

字节面试 | 如何测试RocketMQ、RocketMQ?

字节面试:RocketMQ是怎么测试的呢? 答: 首先保证消息的消费正确、设计逆向用例,在验证消息内容为空等情况时的消费正确性; 推送大批量MQ,通过Admin控制台查看MQ消费的情况,是否出现消费假死、TPS是否正常等等问题。(上述都是临场发挥,但是RocketMQ真正的测试点,还真的需要探讨) 01 先了解RocketMQ 作为测试也是要简单了解RocketMQ。简单来说,就是一个分

poj2406(连续重复子串)

题意:判断串s是不是str^n,求str的最大长度。 解题思路:kmp可解,后缀数组的倍增算法超时。next[i]表示在第i位匹配失败后,自动跳转到next[i],所以1到next[n]这个串 等于 n-next[n]+1到n这个串。 代码如下; #include<iostream>#include<algorithm>#include<stdio.h>#include<math.

poj3261(可重复k次的最长子串)

题意:可重复k次的最长子串 解题思路:求所有区间[x,x+k-1]中的最小值的最大值。求sa时间复杂度Nlog(N),求最值时间复杂度N*N,但实际复杂度很低。题目数据也比较水,不然估计过不了。 代码入下: #include<iostream>#include<algorithm>#include<stdio.h>#include<math.h>#include<cstring

科研绘图系列:R语言扩展物种堆积图(Extended Stacked Barplot)

介绍 R语言的扩展物种堆积图是一种数据可视化工具,它不仅展示了物种的堆积结果,还整合了不同样本分组之间的差异性分析结果。这种图形表示方法能够直观地比较不同物种在各个分组中的显著性差异,为研究者提供了一种有效的数据解读方式。 加载R包 knitr::opts_chunk$set(warning = F, message = F)library(tidyverse)library(phyl

pip-tools:打造可重复、可控的 Python 开发环境,解决依赖关系,让代码更稳定

在 Python 开发中,管理依赖关系是一项繁琐且容易出错的任务。手动更新依赖版本、处理冲突、确保一致性等等,都可能让开发者感到头疼。而 pip-tools 为开发者提供了一套稳定可靠的解决方案。 什么是 pip-tools? pip-tools 是一组命令行工具,旨在简化 Python 依赖关系的管理,确保项目环境的稳定性和可重复性。它主要包含两个核心工具:pip-compile 和 pip

ActiveMQ—消息特性(延迟和定时消息投递)

ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article/details/8443872 有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消息没隔一定时间投递一次,一共投递指定的次数。。。 类似

学习记录:js算法(二十八):删除排序链表中的重复元素、删除排序链表中的重复元素II

文章目录 删除排序链表中的重复元素我的思路解法一:循环解法二:递归 网上思路 删除排序链表中的重复元素 II我的思路网上思路 总结 删除排序链表中的重复元素 给定一个已排序的链表的头 head , 删除所有重复的元素,使每个元素只出现一次 。返回 已排序的链表 。 图一 图二 示例 1:(图一)输入:head = [1,1,2]输出:[1,2]示例 2:(图