本文主要是介绍MQ消息丢失和积压问题,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
👽System.out.println(“👋🏼嗨,大家好,我是代码不会敲的小符,双非大四,Java实习中…”);
📚System.out.println(“🎈如果文章中有错误的地方,恳请大家指正!共同进步,共同成长✊”);
🌟System.out.println(“💡如果文章对您有所帮助,希望您可以三连支持一下博主噢🔥”);
🌈System.out.println("🚀正在完成计划中:接下来的三个月里,对梦想的追逐 ");
文章目录
- 背景
- 如何解决消息队列的延时以及过期失效问题?
- 怎么处理消息积压?
- 消费故障
- 消费过慢
- 最后
背景
在上一篇文章 消息队列MQ 中,提到了MQ可以削峰和消息持久化,等待其它系统准备好后进行消费。那么消息积压过大是怎么处理的呢、消息队列的消息会过期失效吗
如何解决消息队列的延时以及过期失效问题?
消息队列中的消息是会过期失效的。
假设使⽤的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在队列中积压超过⼀定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第⼆个坑了。如果数据会⼤量积压在 mq ⾥,⼤量的数据可能会丢失。
如果丢失了大量的消息,只能等到高峰期过去之后,写一个临时程序手动将丢失的数据一点一点排查出来,重新发送给mq,进行批量重导数据补取。
- 假设 1 w个订单积压在 mq ⾥⾯,没有处理,其中 1k 个订单都丢了,只能⼿动写程序把那 1k 个订单给查出来,⼿动发到 mq ⾥去再补⼀次。
怎么处理消息积压?
高峰期,积压原因:消费者由于⾃身原因消费失败、消费者消费消息过慢
消费故障
-
跳过非重要消息
如果一些消息是允许丢失的,可以把消息全部丢弃 -
提高消费能⼒
换个说法就是消费过慢,请继续往下看
消费过慢
-
增加机器的数量
提⾼消费者的并⾏度,部署更多的 consumer 机器,Topic 的 MessageQueue 也需要有对应的增加- 因为如果 consumer 机器有5台,然后 MessageQueue 只有4个,那么意味着有⼀个consumer机器是获取不到消息的。
- 加消费者机器(MessageQueue⽐消费者多,不改代码),临时申请多台机器多个部署消费者系统的实例,然后消费者系统同时消费,每个⼈消费⼀个MessageQueue的消息。处理完百万积压的消息之后,就可以下线多余的机器了。
- 加消费者机器(MessageQueue少,需要改代码),这个时候就没办法扩容消费者系统了,因为加再多的消费者系统,还是只有⼏个 MessageQueue,没法并⾏消费。所以此时往往是临时修改那消费者系统的代码,让他们获取到消息不是正常去处理,⽽是直接把消息写⼊⼀个新的Topic,这个速度是很快的,因为仅仅是转发⼀下,不⽤业务处理。然后新的 Topic 有更多个 MessageQueue,然后再部署更多台临时增加的消费者系统,去消费新的 Topic,消费完之后恢复原状。
-
增加消费者 consumer 的线程数量
可以利用线程池给 consumer 分配更多的线程数量,⼀台 consumer 机器上的消费线程越多,消费的速度就越快。
@Service
public class MyConsumerService {@Autowiredprivate Executor messageExecutor; // 自定义线程池@KafkaListener(id="test",topics={"topic-test"})public void listen(String message){System.out.println("收到消息:" + message);messageExecutor.submit(new MyWork(message);}
}
- 开启消费者的批量消费功能
某些业务流程如果⽀持批量⽅式消费,则可以很⼤程度上提⾼消费吞吐量。
优化每条消息消费过程,提升消费者的硬件配置或者改善消息消费的处理逻辑。
最后
慢慢的来,别着急!学会有质量的走过每一步
我是代码不会敲的小符,希望认识更多有经验的大佬,也在努力摸索出自己的道路
欢迎添加小符微信:A13781678921,一起加油
这篇关于MQ消息丢失和积压问题的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!