MQ消息丢失和积压问题

2024-02-07 22:04
文章标签 问题 丢失 消息 mq 积压

本文主要是介绍MQ消息丢失和积压问题,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

👽System.out.println(“👋🏼嗨,大家好,我是代码不会敲的小符,双非大四,Java实习中…”);
📚System.out.println(“🎈如果文章中有错误的地方,恳请大家指正!共同进步,共同成长✊”);
🌟System.out.println(“💡如果文章对您有所帮助,希望您可以三连支持一下博主噢🔥”);
🌈System.out.println("🚀正在完成计划中:接下来的三个月里,对梦想的追逐 ");

文章目录

    • 背景
    • 如何解决消息队列的延时以及过期失效问题?
    • 怎么处理消息积压?
      • 消费故障
      • 消费过慢
    • 最后

背景

在上一篇文章 消息队列MQ 中,提到了MQ可以削峰和消息持久化,等待其它系统准备好后进行消费。那么消息积压过大是怎么处理的呢、消息队列的消息会过期失效吗

如何解决消息队列的延时以及过期失效问题?

消息队列中的消息是会过期失效的。

假设使⽤的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在队列中积压超过⼀定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第⼆个坑了。如果数据会⼤量积压在 mq ⾥,⼤量的数据可能会丢失。

如果丢失了大量的消息,只能等到高峰期过去之后,写一个临时程序手动将丢失的数据一点一点排查出来,重新发送给mq,进行批量重导数据补取。

  • 假设 1 w个订单积压在 mq ⾥⾯,没有处理,其中 1k 个订单都丢了,只能⼿动写程序把那 1k 个订单给查出来,⼿动发到 mq ⾥去再补⼀次。

怎么处理消息积压?

高峰期,积压原因:消费者由于⾃身原因消费失败、消费者消费消息过慢

消费故障

  1. 跳过非重要消息
    如果一些消息是允许丢失的,可以把消息全部丢弃

  2. 提高消费能⼒
    换个说法就是消费过慢,请继续往下看

消费过慢

  1. 增加机器的数量
    提⾼消费者的并⾏度,部署更多的 consumer 机器,Topic 的 MessageQueue 也需要有对应的增加

    • 因为如果 consumer 机器有5台,然后 MessageQueue 只有4个,那么意味着有⼀个consumer机器是获取不到消息的。
    • 加消费者机器(MessageQueue⽐消费者多,不改代码),临时申请多台机器多个部署消费者系统的实例,然后消费者系统同时消费,每个⼈消费⼀个MessageQueue的消息。处理完百万积压的消息之后,就可以下线多余的机器了。
    • 加消费者机器(MessageQueue少,需要改代码),这个时候就没办法扩容消费者系统了,因为加再多的消费者系统,还是只有⼏个 MessageQueue,没法并⾏消费。所以此时往往是临时修改那消费者系统的代码,让他们获取到消息不是正常去处理,⽽是直接把消息写⼊⼀个新的Topic,这个速度是很快的,因为仅仅是转发⼀下,不⽤业务处理。然后新的 Topic 有更多个 MessageQueue,然后再部署更多台临时增加的消费者系统,去消费新的 Topic,消费完之后恢复原状。
    • 在这里插入图片描述
  2. 增加消费者 consumer 的线程数量
    可以利用线程池给 consumer 分配更多的线程数量,⼀台 consumer 机器上的消费线程越多,消费的速度就越快。

@Service
public class MyConsumerService {@Autowiredprivate Executor messageExecutor;	// 自定义线程池@KafkaListener(id="test",topics={"topic-test"})public void listen(String message){System.out.println("收到消息:" + message);messageExecutor.submit(new MyWork(message);}
}
  1. 开启消费者的批量消费功能
    某些业务流程如果⽀持批量⽅式消费,则可以很⼤程度上提⾼消费吞吐量。
    优化每条消息消费过程,提升消费者的硬件配置或者改善消息消费的处理逻辑。

最后

慢慢的来,别着急!学会有质量的走过每一步


我是代码不会敲的小符,希望认识更多有经验的大佬,也在努力摸索出自己的道路
欢迎添加小符微信:A13781678921,一起加油

这篇关于MQ消息丢失和积压问题的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/689057

相关文章

springboot3.4和mybatis plus的版本问题的解决

《springboot3.4和mybatisplus的版本问题的解决》本文主要介绍了springboot3.4和mybatisplus的版本问题的解决,主要由于SpringBoot3.4与MyBat... 报错1:spring-boot-starter/3.4.0/spring-boot-starter-

Spring Boot整合消息队列RabbitMQ的实现示例

《SpringBoot整合消息队列RabbitMQ的实现示例》本文主要介绍了SpringBoot整合消息队列RabbitMQ的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的... 目录RabbitMQ 简介与安装1. RabbitMQ 简介2. RabbitMQ 安装Spring

springboot rocketmq配置生产者和消息者的步骤

《springbootrocketmq配置生产者和消息者的步骤》本文介绍了如何在SpringBoot中集成RocketMQ,包括添加依赖、配置application.yml、创建生产者和消费者,并展... 目录1. 添加依赖2. 配置application.yml3. 创建生产者4. 创建消费者5. 使用在

在 Spring Boot 中使用异步线程时的 HttpServletRequest 复用问题记录

《在SpringBoot中使用异步线程时的HttpServletRequest复用问题记录》文章讨论了在SpringBoot中使用异步线程时,由于HttpServletRequest复用导致... 目录一、问题描述:异步线程操作导致请求复用时 Cookie 解析失败1. 场景背景2. 问题根源二、问题详细分

解读为什么@Autowired在属性上被警告,在setter方法上不被警告问题

《解读为什么@Autowired在属性上被警告,在setter方法上不被警告问题》在Spring开发中,@Autowired注解常用于实现依赖注入,它可以应用于类的属性、构造器或setter方法上,然... 目录1. 为什么 @Autowired 在属性上被警告?1.1 隐式依赖注入1.2 IDE 的警告:

解决java.lang.NullPointerException问题(空指针异常)

《解决java.lang.NullPointerException问题(空指针异常)》本文详细介绍了Java中的NullPointerException异常及其常见原因,包括对象引用为null、数组元... 目录Java.lang.NullPointerException(空指针异常)NullPointer

Android开发中gradle下载缓慢的问题级解决方法

《Android开发中gradle下载缓慢的问题级解决方法》本文介绍了解决Android开发中Gradle下载缓慢问题的几种方法,本文给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧... 目录一、网络环境优化二、Gradle版本与配置优化三、其他优化措施针对android开发中Gradle下载缓慢的问

关于Nginx跨域问题及解决方案(CORS)

《关于Nginx跨域问题及解决方案(CORS)》文章主要介绍了跨域资源共享(CORS)机制及其在现代Web开发中的重要性,通过Nginx,可以简单地解决跨域问题,适合新手学习和应用,文章详细讲解了CO... 目录一、概述二、什么是 CORS?三、常见的跨域场景四、Nginx 如何解决 CORS 问题?五、基

MySQL安装时initializing database失败的问题解决

《MySQL安装时initializingdatabase失败的问题解决》本文主要介绍了MySQL安装时initializingdatabase失败的问题解决,文中通过图文介绍的非常详细,对大家的学... 目录问题页面:解决方法:问题页面:解决方法:1.勾选红框中的选项:2.将下图红框中全部改为英

Nginx启动失败:端口80被占用问题的解决方案

《Nginx启动失败:端口80被占用问题的解决方案》在Linux服务器上部署Nginx时,可能会遇到Nginx启动失败的情况,尤其是错误提示bind()to0.0.0.0:80failed,这种问题通... 目录引言问题描述问题分析解决方案1. 检查占用端口 80 的进程使用 netstat 命令使用 ss