RocketMQ快速入门:如何保证消息不丢失|保证消息可靠性(九)

2024-06-20 20:12

本文主要是介绍RocketMQ快速入门:如何保证消息不丢失|保证消息可靠性(九),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

0. 引言

在金融、电商等对数据完整性要求极高的行业,消息的丢失可能会导致数据不一致,严重影响业务逻辑和数据统计,也影响客户体验,所以在很多业务场景下,我们都要求数据不能丢失。而rocketmq中,如何对消息防丢失进行处理的呢?

1. 原理

1.2 产生消息丢失的场景

首先我们要理解消息的传递过程,在哪些阶段会导致消息丢失,才能知道如何进行防控。

我们之前分析过rabbitmq如何保证消息不丢失, rabbitmq内部有交换机这一转发步骤,所以相对比rocketmq更加复杂,但是两者的分析方法是一致的。
在这里插入图片描述

rocketmq的消息传递分为3个阶段:
(1)生产者发送消息到broker的队列中
(2)broker存储消息
(3)消费者到队列获取消息进行消费
在这里插入图片描述
而这三个阶段可能会导致消息丢失的场景是什么呢?其实由rabbitmq的分析我们可以得到启发。
(1)生产者发送消息到broker的队列中

生产者在发出消息后,可能因为网络异常、broker宕机,导致发出的消息实际并没有到达broker

(2)broker存储消息

broker的存储机制是将消息先存储到内存,存储完成后再发送回执给生产者,然后再异步将数据刷到磁盘,但如果在这个刷盘这个过程中broker宕机了,也会导致消息丢失

(3)消费者到队列获取消息进行消费

broker在将消息发出后,同样可能因为网络异常、消费者宕机或者消息者消费到一半产生错误等因素,导致消息实际并没有被消费者消费,但broker又扣除了这条消息,就会导致消息丢失

1.2 防丢失措施

阶段一:生产者发送消息到broker的队列中

1、因为发送到broker期间网络因素我们很难干预,也很难百分比保证。第一点我们能做的,如果其中一个broker宕机,那能有备用节点顶上,保证可用性。于是第一项就是多节点部署broker

2、但万一节点都挂了呢,或者整个机房网络瘫痪了,如何保证消息不丢失,我们只要从上游控制,如果下游不通时,就不要发了,待会再发。于是也衍生出消息发送失败时的重试机制

3、但如果一直重发不成功怎么办呢,那就需要下游告知上游,这次发送没成功,你记录好状态,这就是broker要有返回状态告知,否则生产者也不知道到底发送成功没有。broker中提供了3种发送方式:同步、异步、单向(详见之前的文章: RocketMQ快速入门:集成java客户端实现各类消息发送|异步、同步、顺序、单向、延迟、事务(五)附带源码)。

这三种方式中单向肯定不行,他是不管返回结果的,最容易丢失消息。而异步需要设置回调函数,在回调函数中处理发送失败时的逻辑,如果对于一些场景回调里很能补救,最常见的就是回调里进行重发,所以最优先保证消息可靠的就是同步发送的方式,一旦获取到发送失败,就进行补救处理,或者不再继续后续的业务逻辑,整个流程直接报错打回

总结一下,生产阶段保证消息可靠的手段包括多节点部署broker, 消息重发、同步发送。这几种方式实际上是可以配合使用的, 比如多节点部署,通过同步发送,发送失败时进行3次重发,都重发失败则记录状态。
在这里插入图片描述

阶段二:broker存储消息

存储阶段导致丢失的原因就是因为broker默认的是异步刷盘机制,如果改成同步刷盘呢,先存储到内存,然后刷新到磁盘,刷新成功后才给生产者返回成功收到的回执,以此保证消息可靠。rocketmq中也是支持同步刷盘的。

但如果只有一个节点的话,即使同步刷盘,当broker宕机后,没有备用的,还是会导致服务不可用,相对可靠性就没有保障了。所以同步刷盘,也可以配合着多节点部署使用

当然如果你的场景对可用性要求不高,即使宕机,只要报错会生产者就行,那同步刷盘也足够了

总结一下,存储阶段主要依赖同步刷盘和多节点部署来保障可靠性,当然多节点部署可以根据业务情况和成本预算选择。
在这里插入图片描述

阶段三:消费者到队列获取消息进行消费

消费阶段的丢失可能性主要来源于消费者没处理好之前就宕机或则异常了,首先一点能想到的那就多个消费者呢,但实际上多点部署在消费阶段并不能解决问题,因为rocketmq消费模式有广播模式和集群模式,广播模式下每个节点都会收到消息,这个模式下的天然就是多节点部署。而集群模式本身也是基于多消费者的情况,但两者都无法保障当消息发送给某一节点后,这个节点拉去了消息,但没实际处理完就异常的场景。

所以就考虑当消费者消费完成后,再给broker发送成功消费的回执,这时broker才更新消息偏移量,将消息标识为被消费。如此才能保障消息的可靠性。

在这里插入图片描述

2. 实现

2.1 生产阶段

1、多节点部署,可以部署主从或集群模式,这里不讲解详细的搭建流程,后续单独讲解

2、同步发送,主要通过producer.send来实现同步发送

public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {// 声明groupDefaultMQProducer producer = new DefaultMQProducer("group_test");// 声明namesrv地址producer.setNamesrvAddr("localhost:9876");// 设置重试次数producer.setRetryTimesWhenSendFailed(2);// 启动实例producer.start();// 设置消息的topic,tag以及消息体Message msg = new Message("topic_test", "tag_test", "消息内容1".getBytes(StandardCharsets.UTF_8));// 发送消息,并设置10s连接超时SendResult send = producer.send(msg, 10000);System.out.println("发送结果:"+send);// 关闭实例producer.shutdown();}

如果是springboot集成的,可以通过SendResult sendResult = rocketMQTemplate.syncSend("topic_test:tag_test", message);来实现。通过发送结果SendResult对象,来判断发送失败后的处理逻辑

3、发送重试
(1)首先手动重发的实现很简单,只需要根据send.getSendStatus()状态来判断,如果需要重发多次的,可以结合guava-retry 等重发组件来更方便的实现

// 发送消息,并设置10s连接超时SendResult send = producer.send(msg, 10000);System.out.println("发送结果:"+send);if(!send.getSendStatus().equals(SendStatus.SEND_OK)){// 发送失败,手动重发send = producer.send(msg, 10000);}

(2)当然rocketmq也封装好了重试机制给我们使用,其重试机制采用衰减的形式,也就是重试间隔时间会逐渐增加
在这里插入图片描述
我们只需要通过producer.setRetryTimesWhenSendFailed(2);方法即可设置,会在发送失败时自动触发重新发送,同时如果超过设置的超时时间还未接收到成功的结果也会触发重发机制,就不需要我们手动书写重发逻辑了,更加推荐这种方式。

        // 声明groupDefaultMQProducer producer = new DefaultMQProducer("group_test");// 声明namesrv地址producer.setNamesrvAddr("localhost:9876");// 设置重试次数producer.setRetryTimesWhenSendFailed(2);// 启动实例producer.start();

2.2 存储阶段

1、主要将broker的刷盘策略设置为同步刷盘,需要修改broker.conf配置文件

# 设置为同步刷盘模式
flushDiskType = SYNC_FLUSH

2、如果配置的是多节点,一般是主从模式,为了防止主节点有数据,从节点没刷到数据的情况,就需要开启从节点刷盘后再返回ACK回执给生产者,需要修改从节点broker配置文件

# 默认为 ASYNC_MASTER
brokerRole=SYNC_MASTER

broker提供了两种主从同步模式:ASYNC_MASTER异步 和 SYNC_MASTER同步

  • ASYNC_MASTER:

消息发送到master节点后,开启一个异步线程更新给从节点,这个过程中有消息同步丢失的风险,优点是性能高

  • SYNC_MASTER:

消息发送到master节点后,同步更新到从节点,当从节点更新完再返回成功的ACK回执给生产者,表示消息发送成功,可靠性高,但性能会有所下降

3、关于多节点部署,我们在后续单独讲解

2.3 消费阶段

1、其实现就是消费后返回成功状态即可

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");consumer.setNamesrvAddr("127.0.0.1:9876");// 集群消费模式consumer.setMessageModel(MessageModel.CLUSTERING);// 设置topicconsumer.subscribe("topic_test", "*");// 设置重试次数consumer.setMaxReconsumeTimes(2);// 注册回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {for (MessageExt msg : list) {String topic = msg.getTopic();try {String messageBody = new String(msg.getBody(), "utf-8");System.out.println(topic+":"+messageBody);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();

2、如果返回的是ConsumeConcurrentlyStatus.RECONSUME_LATER状态,消费者就会触发稍后重试机制进行重新消费,同样的可以通过consumer.setMaxReconsumeTimes设置最大重试次数

// 设置重试次数
consumer.setMaxReconsumeTimes(2);

其重试次数和时延等级与生产重试是一致的
在这里插入图片描述

这篇关于RocketMQ快速入门:如何保证消息不丢失|保证消息可靠性(九)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1079141

相关文章

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

字节面试 | 如何测试RocketMQ、RocketMQ?

字节面试:RocketMQ是怎么测试的呢? 答: 首先保证消息的消费正确、设计逆向用例,在验证消息内容为空等情况时的消费正确性; 推送大批量MQ,通过Admin控制台查看MQ消费的情况,是否出现消费假死、TPS是否正常等等问题。(上述都是临场发挥,但是RocketMQ真正的测试点,还真的需要探讨) 01 先了解RocketMQ 作为测试也是要简单了解RocketMQ。简单来说,就是一个分

电脑桌面文件删除了怎么找回来?别急,快速恢复攻略在此

在日常使用电脑的过程中,我们经常会遇到这样的情况:一不小心,桌面上的某个重要文件被删除了。这时,大多数人可能会感到惊慌失措,不知所措。 其实,不必过于担心,因为有很多方法可以帮助我们找回被删除的桌面文件。下面,就让我们一起来了解一下这些恢复桌面文件的方法吧。 一、使用撤销操作 如果我们刚刚删除了桌面上的文件,并且还没有进行其他操作,那么可以尝试使用撤销操作来恢复文件。在键盘上同时按下“C

数论入门整理(updating)

一、gcd lcm 基础中的基础,一般用来处理计算第一步什么的,分数化简之类。 LL gcd(LL a, LL b) { return b ? gcd(b, a % b) : a; } <pre name="code" class="cpp">LL lcm(LL a, LL b){LL c = gcd(a, b);return a / c * b;} 例题:

Java 创建图形用户界面(GUI)入门指南(Swing库 JFrame 类)概述

概述 基本概念 Java Swing 的架构 Java Swing 是一个为 Java 设计的 GUI 工具包,是 JAVA 基础类的一部分,基于 Java AWT 构建,提供了一系列轻量级、可定制的图形用户界面(GUI)组件。 与 AWT 相比,Swing 提供了许多比 AWT 更好的屏幕显示元素,更加灵活和可定制,具有更好的跨平台性能。 组件和容器 Java Swing 提供了许多

【IPV6从入门到起飞】5-1 IPV6+Home Assistant(搭建基本环境)

【IPV6从入门到起飞】5-1 IPV6+Home Assistant #搭建基本环境 1 背景2 docker下载 hass3 创建容器4 浏览器访问 hass5 手机APP远程访问hass6 更多玩法 1 背景 既然电脑可以IPV6入站,手机流量可以访问IPV6网络的服务,为什么不在电脑搭建Home Assistant(hass),来控制你的设备呢?@智能家居 @万物互联

poj 2104 and hdu 2665 划分树模板入门题

题意: 给一个数组n(1e5)个数,给一个范围(fr, to, k),求这个范围中第k大的数。 解析: 划分树入门。 bing神的模板。 坑爹的地方是把-l 看成了-1........ 一直re。 代码: poj 2104: #include <iostream>#include <cstdio>#include <cstdlib>#include <al

hdu 4565 推倒公式+矩阵快速幂

题意 求下式的值: Sn=⌈ (a+b√)n⌉%m S_n = \lceil\ (a + \sqrt{b}) ^ n \rceil\% m 其中: 0<a,m<215 0< a, m < 2^{15} 0<b,n<231 0 < b, n < 2^{31} (a−1)2<b<a2 (a-1)^2< b < a^2 解析 令: An=(a+b√)n A_n = (a +

MySQL-CRUD入门1

文章目录 认识配置文件client节点mysql节点mysqld节点 数据的添加(Create)添加一行数据添加多行数据两种添加数据的效率对比 数据的查询(Retrieve)全列查询指定列查询查询中带有表达式关于字面量关于as重命名 临时表引入distinct去重order by 排序关于NULL 认识配置文件 在我们的MySQL服务安装好了之后, 会有一个配置文件, 也就

v0.dev快速开发

探索v0.dev:次世代开发者之利器 今之技艺日新月异,开发者之工具亦随之进步不辍。v0.dev者,新兴之开发者利器也,迅速引起众多开发者之瞩目。本文将引汝探究v0.dev之基本功能与优势,助汝速速上手,提升开发之效率。 何谓v0.dev? v0.dev者,现代化之开发者工具也,旨在简化并加速软件开发之过程。其集多种功能于一体,助开发者高效编写、测试及部署代码。无论汝为前端开发者、后端开发者