本文主要是介绍RocketMQ~重复消息、消息堆积、回溯消费、如何防止消息不丢失,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
重复消息
我们需要给我们的消费者实现幂等解决重复消息,也就是对同一个消息的处理结果,执行多少次都不变。
这个还是需要结合具体的业务的。你可以使用写入Redis来保证,因为Redis 的 key 和 value 就是天然支持幕等的。
当然还有使用用数据库插入法,基于数据库的唯一键来保证重复数据不会被插入多条。
不过最主要的还是需要根据特定场景使用特定的解决方案,你要知道你的消息消费是否是完全不可重复消费、还是可以忍受重复消费的,然后再选择强校验和弱校验的方式。
消息堆积
RocketMQ的消息堆积,一般都是因为客户端本地消费过程中,由于消费耗时过长或消费并发度较小等原因,导致客户端消费能力不足,出现消息堆积的问题。当线上出现消息堆积的问题时,一般有以下几种方式来解决:
- 增加消费者数量:消息堆积了,消费不过来了,那就把消费者的数量增加一下,让更多人的实例来消费这些消心。
- 提升消费者消费速度:消费者消费的慢可能是消息堆积的主要原因,想办法提升消费速度,比如引入线程池、本地消息存储后即返回成功后续再慢慢消费等。
- 降低生产者的生产速度:如果生产者可控的话,可以让生产者生成消息的速度慢一点。
- 清理过期消息:有一些过期消息、或者一直无法成功的消息,在业务做评估之后,如果无影响或者影响不大,其实是可以清理的。
- 调整RocketMQ的配置参数:RocketMQ提供了很多可配配置的参数,例如消息消费模式、消息拉取间隔时间等,可以根据实际情况来调整这些参数,从而优化消息消费的为效率
- 增加Topic队列数:如果一个Topic的队列数比较少,那么就容易出现消息堆积的情况。可以通过增加队列数来提高消息的处理并发度,从而减少消息堆积。
回溯消费
回溯消费是指在向Consumer已经消费成功的消息,由于业务上需求需要重新消费,在RocketMQ中,Broker投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ支持按照时间回溯消费,间维度精确到毫秒。
如何防止消息不丢失
RocketMQ的消息想要确保不丢失,需要生产者、消费者以及Broker的共同努力,缺一不可。
首先在生产者端,消息的发送分为同步和异步两种,在同步发送消息的情况下,消息的发送会同步阻塞等待Broker返回结果,在Broker确认收到消息之后,生产者才会拿到SendResult。如果这个过程中发生异常,那么就说明消息发送可能失败了,就需要生产者进行重新发送消息。
但是Broker其实并不会立即把消息存储到磁盘上,而是先存储到内存中,内存存储成功之后,就返回给确认结果给生产者了。然后再通过异步刷盘的方式将内存中的数据存储到磁盘上。但是这个过程中,如果机器挂了,那么就可能会导致数据丢失。
如果想要保证消息不丢失,可以将消息保存机制修改为同步刷盘,这样,Broker会在同步请求中把数据保存在磁盘上,确保保存成功后再返回确认结果给生产者。
除了同步发送消息,还有异步发送,异步发送的话就需要生产者重写SendCallback的onSuccess和onException方法,用于给Broker进行回调。在方法中实现消息的确认或者重新发送。
为了保证消息不丢失,RocketMQ肯定要通过集群方式进行部署,Broker通常采用一主多从部署方式,并且采用主从同步的方式做数据复制。
当主Broker宕机时,从Broker会接管主Broker的工作,保证消息不丢失。RocketMQ的Broker还可以配置多个实例,消息会在多个Broker之间进行冗余备份,从而保证数据的的可靠性。默认方式下,Broker在接收消息后,写入master成功,就可以返回确认响应给生产者了,接着消息将会异步复制到slave节点。但是如果这个过程中,Master的磁盘损坏了。那就会导致数据丢失了。如果想要解决这个问题,可以配置同步复制的方式,即Master在将数据同步到Slave节点后,再返回给生产者确认结果。
在消费者端,需要确保在消息拉取并消费成功之后再给Broker返回ACK,就可以保证消息不丢失了,如果这个过程中Broker一直没收到ACK,那么就可以重试。
所以,在消费者的代码中,一定要在业务逻辑的最后一步 returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;当然,也可以先把数据保存在数据库中,就返回,然后自己再慢慢处理。
但是,需要注意的是RocketMQ和Kafka一样,只能最大限度的为保证消息不丢失,但是没办法做到100%保证不丢失。例如异步发送时,要执行回调时Brocker宕机了,或者生产者宕机了。。。
这篇关于RocketMQ~重复消息、消息堆积、回溯消费、如何防止消息不丢失的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!