本文主要是介绍Day10_08_消息队列之RabbitMQ消息可靠性传输 消息确认机制 死信队列详解及代码实现,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
RabbitMQ消息可靠性传输 消息确认机制 死信队列详解及代码实现
一. 消息可靠传递的重要性
我们在项目中使用RabbitMQ时,可能会遇到这样的问题:假如在一个订单系统中,用户付款成功,此时我们往RabbitMQ消息队列中发送一条消息,然后期望消息消费者修改订单状态,但是最终实际订单状态却并没有被成功修改.遇到这种问题我们排查的思路如下:
-
1️⃣.消息是否被成功的发送到消息队列?
-
2️⃣.消息是否有丢失的情况?
-
3️⃣.消息是否被成功的消费了?
在生产环境中是不允许出现消息发送/消费错误的情况的,因为这可能会给企业带来巨大的损失.本文将介绍RabbitMQ如何保证消息的可靠性(生产者保证消息可靠投递,消费者保证消息可靠消费,RabbitMQ持久化).
二. 保证消息可靠传递的手段
-
1️⃣.设置交换机、队列和消息都为持久化;
-
2️⃣.生产者消息确认机制;
-
3️⃣.消费者消息确认机制;
-
4️⃣.死信队列.
三. 设置交换机、消息队列和消息持久化
在生产过程中,难免会发生服务器宕机的事情,RabbitMQ也不例外.可能由于某种特殊情况下的异常而导致RabbitMQ宕机从而重启,那么这个时候对于消息队列里的数据,包括交换机、队列以及队列中存在的消息的恢复就显得尤为重要了.RabbitMQ本身带有持久化机制,包括交换机、队列以及消息的持久化.持久化的主要机制就是将信息写入磁盘,当RabbtiMQ服务宕机重启后,从磁盘中读取存入的持久化信息,恢复数据(当然凡事都不是100%的,只能尽最大程度的保证消息不会丢失).
持久化: 保证服务器重启的时候消息不丢失,重点解决服务器的异常崩溃而导致的消息丢失问题.但是,将所有的消息都设置为持久化,会严重影响RabbitMQ的性能,写入硬盘的速度比写入内存的速度慢的不只一点点.对于可靠性要求不是那么高的消息可以不持久化以提高整体的吞吐率.在选择是否要将消息进行持久化时,需要在可靠性和吞吐量之间做一个权衡.
对于某些应用场景,如大流量的订单交易系统,为了不影响性能,我们可以不设置持久化.但是我们会定时扫描数据库中的未发送成功的消息,进行重试发送.
1. 交换机的持久化
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel =connection.createChannel();#该交换机默认没有持久化
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
使用这种方法声明的交换机,默认不是持久化的,在服务器重启之后,交换机会消失.我们在管理台的Exchange页签下查看交换机,可以看到使用上述方法声明的交换机,Features一列是空的,即没有任何附加属性.
我们换用另一种方法声明交换机:
try {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection=factory.newConnection();Channel channel = connection.createChannel();#该交换机实现了持久化channel.exchangeDeclare(EXCHANGE_NAME,"fanout",true);
} catch (IOException e) {e.printStackTrace();
} catch (TimeoutException e) {e.printStackTrace();
}
查看一下方法的说明:
/*** Actively declare a non-autodelete exchange with no extra arguments* @see com.rabbitmq.client.AMQP.Exchange.Declare* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk* @param exchange the name of the exchange* @param type the exchange type* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)* @throws java.io.IOException if an error is encountered* @return a declaration-confirm method to indicate the exchange was successfully declared*/Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
我们可以看到第三个参数durable,如果为true时则表示要做持久化,当服务重启时,交换机依然存在.所以使用该方法声明的交换机是下面这个样子的(做测试的时候,需要先在管理台删掉原来的同名交换机).D表示durable,鼠标放在上边会显示为true.
2. 队列的持久化
与交换机的持久化相同,队列的持久化也是通过durable参数实现的,默认生成的随机队列不是持久化的.前面示例中声明的带有我们自定义名字的队列都是持久化的.
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
#第个参数用于进行持久化设置
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
看一下方法的定义:
/*** Declare a queue* @see com.rabbitmq.client.AMQP.Queue.Declare* @see com.rabbitmq.client.AMQP.Queue.DeclareOk* @param queue the name of the queue* @param durable true if we are declaring a durable queue (the queue will survive a server restart)* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)* @param arguments other properties (construction arguments) for the queue* @return a declaration-confirm method to indicate the queue was successfully declared* @throws java.io.IOException if an error is encountered*/Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
第二个参数跟交换机方法的参数一样,true表示做持久化.当RabbitMQ服务重启时,队列依然存在.
这里说一下后边的三个参数:
-
exclusive是排他队列:如果一个队列被声明为排他队列,那么这个队列只能被第一次声明他的连接所见,并在连接断开的时候自动删除.这里有三点需要说明:
1️⃣.同一个连接的不同channel,是可以访问同一连接下创建的排他队列的;
2️⃣.排他队列只能被声明一次,其他连接不允许声明同名的排他队列;
3️⃣.即使排他队列是持久化的,当连接断开或者客户端退出时,排他队列依然会被删除.
-
autoDelete是自动删除:为true时,当没有任何消费者订阅该队列时,队列会被自动删除;
-
arguments:其它参数.
3. 消息的持久化
消息的持久化是指当消息从交换机发送到队列之后,被消费者消费之前,服务器突然宕机重启,消息仍然存在.消息持久化的前提是队列持久化,假如队列不是持久化,那么消息的持久化毫无意义.
通过如下代码设置消息的持久化:
channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
其中MessageProperties.PERSISTENT_TEXT_PLAIN
是设置持久化的参数.
我们查看basicPublish方法的定义.
/*** Publish a message* @see com.rabbitmq.client.AMQP.Basic.Publish* @param exchange the exchange to publish the message to* @param routingKey the routing key* @param props other properties for the message - routing headers etc* @param body the message body* @throws java.io.IOException if an error is encountered*/
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
在看下BasicProperties的类型
public static class BasicProperties {private String contentType;private String contentEncoding;private Map<String,Object> headers;#deliveryMode是设置消息持久化的参数private Integer deliveryMode;private Integer priority;private String correlationId;private String replyTo;private String expiration;private String messageId;private Date timestamp;private String type;private String userId;private String appId;private String clusterId;
其中deliveryMode是设置消息持久化的参数,等于1表示不设置持久化,等于2设置持久化.PERSISTENT_TEXT_PLAIN
是进行实例化的一个deliveryMode=2的常量对象,便于编程.
public static final BasicProperties PERSISTENT_TEXT_PLAIN =new BasicProperties("text/plain",null,null,2,0,null, null,null,null,null,null, null,null,null);
设置了队列的持久化和消息的持久化之后,当服务器宕机重启,存在队列中未发送的消息会依然存在.
注意:
持久化的消息在到达队列时就被写入到磁盘,并且如果可以,持久化的消息也会在内存中保存一份备份,这样可以提高一定的性能,只有在内存吃紧的时候才会从内存中清除.
非持久化的消息一般只保存在内存中,在内存吃紧的时候会被换入到磁盘中,以节省内存空间.
以上就是关于RabbitMQ中持久化的一些内容,但是并不会严格的100%保证信息不会丢失.
四. 消息确认机制
1. 消息确认种类
RabbitMQ的消息确认有两种:
消息发送确认: 这种是用来确认生产者将消息发送到交换机,交换机传递到队列的过程中,消息是否成功投递.发送确认分为两步,一是确认是否到达交换机,二是确认是否到达队列.
消费接收确认: 这种是确认消费者是否成功消费了队列中的消息.
2. 消息发送确认
2.1 ConfirmCallback
通过实现ConfirmCallBack
接口,消息发送到交换机Exchange后触发该回调.
使用该功能需要开启确认,spring-boot中配置如下:
spring.rabbitmq.publisher-confirms = true
2.2 ReturnCallback
通过实现ReturnCallback
接口,如果消息从交换机发送到对应的队列失败时触发(比如根据发送消息时指定的routingKey找不到队列时会触发该回调).
使用该功能需要开启确认,spring-boot中配置如下:
spring.rabbitmq.publisher-returns = true
3. 消息接收确认
3.1 消息接收的确认模式
-
AcknowledgeMode.NONE: 不确认
-
AcknowledgeMode.AUTO: 自动确认
-
AcknowledgeMode.MANUAL: 手动确认
spring-boot中配置方法:
spring.rabbitmq.listener.simple.acknowledge-mode = manual
3.2 手动确认
3.2.1 成功确认
void basicAck(long deliveryTag, boolean multiple) throws IOException;#deliveryTag:该消息的index;#multiple: 是否批量. true: 将一次性ack所有小于deliveryTag的消息.#消费者成功处理后,调用
#channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)
#方法对消息进行确认.
3.2.2 失败确认
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;#deliveryTag:该消息的index.#multiple:是否批量. true:将一次性拒绝所有小于deliveryTag的消息.#requeue:被拒绝的消息是否重新进入队列.
void basicReject(long deliveryTag, boolean requeue) throws IOException;#deliveryTag:该消息的index.
#requeue: 被拒绝的是否重新入队列.
channel.basicNack
与 channel.basicReject
的区别在于basicNack可以批量拒绝多条消息,而basicReject一次只能拒绝一条消息.
五. 生产者的消息发送确认机制
1. 消息发送确认机制概述
当消息发送出去之后,我们如何知道消息有没有正确到达exchange呢?如果在这个过程中,消息丢失了,我们根本不知道发生了什么,也不知道是什么原因导致消息发送失败了.
为解决这个问题,主要有如下两种方案:
-
通过事务机制实现;
-
通过生产者消息确认机制(publisher confirm)实现.
但是使用事务机制实现会严重降低RabbitMQ的消息吞吐量,我们采用一种轻量级的方案---生产者消息确认机制.
2. 什么是消息发送确认机制?
简而言之,就是 生产者发送的消息一旦被投递到所有匹配的队列之后,就会发送一个确认消息给生产者,这就使得生产者知晓消息已经正确到达了目的地.
如果消息和队列是持久化存储的,那么确认消息会在消息写入磁盘之后发出.
Mandatory参数:当Mandatory参数设为true时,如果目的不可达,会发送消息给生产者,生产者通过一个回调函数来获取该信息.
六. 消费者的消息接收确认机制
为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消费者的消息确认机制(message acknowledgement).采用消息确认机制之后,消费者就有足够的时间来处理消息,不用担心处理消息过程中消费者进程挂掉后消息丢失的问题.因为RabbitMQ会一直等待并持有消息,直到消费者确认了该消息.
七. 死信队列
1. 死信队列与私信概述
DLX,Dead Letter Exchange
的缩写,又被称为死信邮箱、死信交换机.DLX就是一个普通的交换机,和一般的交换机没有任何区别.
当一个消息在一个队列中变成死信(dead message)时,通过这个交换机将死信发送到死信队列中(指定好相关参数,RabbitMQ会自动发送).
2. 什么样的消息会变成死信?
-
1️⃣.消息被拒绝(basic.reject或basic.nack)并且requeue=false;
-
2️⃣.消息TTL过期;
-
3️⃣.队列达到最大长度(队列满了,无法再添加数据到mq中).
3. 死信交换机应用场景
在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便我们查看消息失败的原因.
4. 如何使用死信交换机?
定义业务(普通)队列的时候指定参数:
- x-dead-letter-exchange: 用来设置死信后发送的交换机
- x-dead-letter-routing-key: 用来设置死信的routingKey
@Bean
public Queue helloQueue() {//将普通队列绑定到私信交换机上Map<String, Object> args = new HashMap<>(2);args.put(DEAD_LETTER_EXCHANGE_KEY, deadExchangeName);args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);Queue queue = new Queue(queueName, true, false, false, args);return queue;
}
八. 消息确认的实现
创建一个SpringBoot项目,在该项目中创建一个新的模块,项目结构如图:
注意:本案例是把消息发送和消息接收的实现写在了同一个项目模块中了,没有用两个模块实现.
2. 添加相关依赖
在模块的pom.xml中添加如下依赖包.
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>
3. 开启生产者消息发送确认机制
# 开启发送确认
spring.rabbitmq.publisher-confirms=true# 开启发送失败退回
spring.rabbitmq.publisher-returns=true
4. 开启消费者消息接收确认机制
# 开启ACK
spring.rabbitmq.listener.simple.acknowledge-mode=manual
5. 完整配置文件
spring:application:name: rabbitmq-reliabilityrabbitmq:host: localhostport: 5672username: guestpassword: guest# 开启发送确认publisher-confirms: true# 开启发送失败退回publisher-returns: true# 开启ACKlistener:simple:acknowledge-mode: manual
6. 创建RabbitMQ配置类
@Configuration
public class RabbitConfig {
public final static String queueName = "hello_queue";
/*** 死信队列:*/public final static String deadQueueName = "dead_queue";public final static String deadRoutingKey = "dead_routing_key";public final static String deadExchangeName = "dead_exchange";
/*** 死信队列 交换机标识符*/public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";/*** 死信队列交换机绑定键标识符*/public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";@Beanpublic Queue helloQueue() {//将普通队列绑定到私信交换机上Map<String, Object> args = new HashMap<>(2);args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);Queue queue = new Queue(queueName, true, false, false, args);return queue;}/*** 死信队列:*/@Beanpublic Queue deadQueue() {Queue queue = new Queue(deadQueueName, true);return queue;}@Beanpublic DirectExchange deadExchange() {return new DirectExchange(deadExchangeName);}@Beanpublic Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) {return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey);}
}

注释: hello_queue就配置了死信交换机、死信队列.
7. 生产者发送消息的核心代码
@Component
public class HelloSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{
@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(String exchange, String routingKey) {String context = "你好现在是 " + new Date();System.out.println("send content = " + context);this.rabbitTemplate.setMandatory(true);this.rabbitTemplate.setConfirmCallback(this);this.rabbitTemplate.setReturnCallback(this);this.rabbitTemplate.convertAndSend(exchange, routingKey, context);}/*** 确认生产者是否把消息成功的发送到了交换机;* 确认后回调:* @param correlationData* @param ack* @param cause*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (!ack) {System.out.println("send ack fail, cause = " + cause);} else {System.out.println("send ack success");}}/*** 交换机中的消息是否被成功的发送到队列.* 失败后return回调:** @param message* @param replyCode* @param replyText* @param exchange* @param routingKey*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);}
}
8. 消费者接收消息的核心代码
@Component
@RabbitListener(queues = RabbitConfig.queueName)
public class HelloReceiver {
@RabbitHandlerpublic void process(String hello, Channel channel, Message message) throws IOException {try {Thread.sleep(2000);System.out.println("睡眠2s");} catch (InterruptedException e) {e.printStackTrace();}try {//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);System.out.println("receiver success = " + hello);} catch (Exception e) {e.printStackTrace();//丢弃这条消息channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);System.out.println("receiver fail");}}
}
9. 测试生产者消息确认功能
分为4种场景来测试.创建一个Controller,创建几个接口方法进行测试.
9.1 exchange,queue都正确, confirm被回调,ack=true
@RequestMapping("/send1")
@ResponseBody
public String send1() {helloSender.send(null, RabbitConfig.queueName);return "success";
}
9.2 exchange 错误,queue 正确,confirm被回调,ack=false
@RequestMapping("/send2")
@ResponseBody
public String send2() {helloSender.send("fail-exchange", RabbitConfig.queueName);return "success";
}
9.3 exchange 正确,queue错误,confirm被回调,ack=true; return被回调 replyText:NO_ROUTE
@RequestMapping("/send3")
@ResponseBody
public String send3() {helloSender.send(null, "fail-queue");return "success";
}
9.4 exchange 错误,queue 错误, confirm被回调,ack=false
@RequestMapping("/send4")
@ResponseBody
public String send4() {helloSender.send("fail-exchange", "fail-queue");return "success";
}
10. 测试消费者消息确认功能
10.1 当添加这行代码的时候:
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
测试结果: 消息被正常消费,消息从队列中删除.
10.2 当注释掉这行代码的时候:
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
测试结果: 消息会被重复消费,一直保留在队列当中.
11. 测试死信队列
当执行这行代码的时候:
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
消息会被加入到死信队列中:
九. 拓展
除了我们上面讲的基本可靠性保证外,其实还有很多性能优化方案、可靠性保证方案:集群监控、流控、镜像队列、HAProxy+Keeplived高可靠负载均衡.
这篇关于Day10_08_消息队列之RabbitMQ消息可靠性传输 消息确认机制 死信队列详解及代码实现的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!