RabbitMQ消息的可靠传输和防止消息丢失

2024-06-17 17:36

本文主要是介绍RabbitMQ消息的可靠传输和防止消息丢失,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在Spring Cloud项目中,为了确保RabbitMQ消息的可靠传输和防止消息丢失,需要考虑以下几个方面:

  1. 消息持久化:确保消息在RabbitMQ中持久化。
  2. 队列持久化:确保队列是持久化的。
  3. 发布确认:使用发布确认机制确保消息发送到RabbitMQ。
  4. 消费者确认:确保消费者正确地确认消息。
  5. 重试机制:在消息消费失败时,设置重试机制。

下面详细介绍如何实现这些措施:

1. 添加依赖

确保在你的pom.xml中添加了Spring Boot和RabbitMQ的依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2. 配置RabbitMQ

application.ymlapplication.properties文件中配置RabbitMQ:

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestpublisher-confirm-type: correlatedpublisher-returns: true

3. 定义配置类

创建一个配置类来配置队列、交换机和绑定:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {public static final String QUEUE_NAME = "myQueue";public static final String EXCHANGE_NAME = "myExchange";public static final String ROUTING_KEY = "myRoutingKey";@Beanpublic Queue myQueue() {return QueueBuilder.durable(QUEUE_NAME).build();}@Beanpublic DirectExchange myExchange() {return new DirectExchange(EXCHANGE_NAME);}@Beanpublic Binding myBinding(Queue myQueue, DirectExchange myExchange) {return BindingBuilder.bind(myQueue).to(myExchange).with(ROUTING_KEY);}
}

4. 配置消息生产者

确保消息生产者配置了发布确认和消息持久化:

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.util.UUID;@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {// 设置发布确认回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {System.out.println("Message delivered successfully: " + correlationData);} else {System.err.println("Failed to deliver message: " + correlationData + ", cause: " + cause);}}});// 设置消息返回回调rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {System.err.println("Returned Message: " + new String(message.getBody()) +", replyCode: " + replyCode + ", replyText: " + replyText +", exchange: " + exchange + ", routingKey: " + routingKey);});}public void sendMessage(String message) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, message, correlationData);}
}

5. 配置消息消费者

确保消息消费者配置了消息确认机制:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.support.Acknowledgment;
import org.springframework.stereotype.Service;@Service
public class MessageConsumer {@RabbitListener(queues = RabbitConfig.QUEUE_NAME)public void handleMessage(String message, Channel channel, Message message) throws Exception {try {// 处理消息System.out.println("Received Message: " + message);// 消息确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 消费失败,重新放回队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}
}

6. 启用重试机制

在Spring Cloud Stream中启用重试机制:

spring:cloud:stream:bindings:input:destination: myQueueconsumer:retry:max-attempts: 5backOffPolicy:initialInterval: 1000multiplier: 2.0maxInterval: 10000

7. 测试

测试消息生产和消费,确保消息在各种情况下都不会丢失,包括网络故障、RabbitMQ服务器重启等。

总结

通过以上步骤,你可以在Spring Cloud项目中使用RabbitMQ并确保消息不会丢失。关键在于:

  1. 消息和队列的持久化:确保消息和队列都是持久化的。
  2. 发布确认:启用发布确认回调机制,确保消息被正确地发送到RabbitMQ。
  3. 消费者确认:确保消费者正确地确认消息。
  4. 重试机制:在消费失败时启用重试机制,以确保消息最终能够被成功处理。

通过这些配置,可以显著提高消息传输的可靠性,防止消息丢失。

这篇关于RabbitMQ消息的可靠传输和防止消息丢失的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1070100

相关文章

探索蓝牙协议的奥秘:用ESP32实现高质量蓝牙音频传输

蓝牙(Bluetooth)是一种短距离无线通信技术,广泛应用于各种电子设备之间的数据传输。自1994年由爱立信公司首次提出以来,蓝牙技术已经经历了多个版本的更新和改进。本文将详细介绍蓝牙协议,并通过一个具体的项目——使用ESP32实现蓝牙音频传输,来展示蓝牙协议的实际应用及其优点。 蓝牙协议概述 蓝牙协议栈 蓝牙协议栈是蓝牙技术的核心,定义了蓝牙设备之间如何进行通信。蓝牙协议

常用MQ消息中间件Kafka、ZeroMQ和RabbitMQ对比及RabbitMQ详解

1、概述   在现代的分布式系统和实时数据处理领域,消息中间件扮演着关键的角色,用于解决应用程序之间的通信和数据传递的挑战。在众多的消息中间件解决方案中,Kafka、ZeroMQ和RabbitMQ 是备受关注和广泛应用的代表性系统。它们各自具有独特的特点和优势,适用于不同的应用场景和需求。   Kafka 是一个高性能、可扩展的分布式消息队列系统,被设计用于处理大规模的数据流和实时数据传输。它

消息认证码解析

1. 什么是消息认证码         消息认证码(Message Authentication Code)是一种确认完整性并进行认证的技术,取三个单词的首字母,简称为MAC。         消息认证码的输入包括任意长度的消息和一个发送者与接收者之间共享的密钥,它可以输出固定长度的数据,这个数据称为MAC值。         根据任意长度的消息输出固定长度的数据,这一点和单向散列函数很类似

RabbitMQ实践——临时队列

临时队列是一种自动删除队列。当这个队列被创建后,如果没有消费者监听,则会一直存在,还可以不断向其发布消息。但是一旦的消费者开始监听,然后断开监听后,它就会被自动删除。 新建自动删除队列 我们创建一个名字叫queue.auto.delete的临时队列 绑定 我们直接使用默认交换器,所以不用创建新的交换器,也不用建立绑定关系。 实验 发布消息 我们在后台管理页面的默认交换器下向这个队列

rocketmq问题汇总-如何将特定消息发送至特定queue,消费者从特定queue消费

业务描述 由于业务需要这样一种场景,将消息按照id(业务id)尾号发送到对应的queue中,并启动10个消费者(单jvm,10个消费者组),从对应的queue中集群消费,如下图1所示(假设有两个broker组成的集群):  producer如何实现 producer只需发送消息时调用如下方法即可 /*** 发送有序消息** @param messageMap 消息数据* @param

Spring 集成 RabbitMQ 与其概念,消息持久化,ACK机制

目录 RabbitMQ 概念exchange交换机机制 什么是交换机binding?Direct Exchange交换机Topic Exchange交换机Fanout Exchange交换机Header Exchange交换机RabbitMQ 的 Hello - Demo(springboot实现)RabbitMQ 的 Hello Demo(spring xml实现)RabbitMQ 在生产环境

Spring boot+RabbitMQ环境

消息队列在目前分布式系统下具备非常重要的地位,如下的场景是比较适合消息队列的: 跨系统的调用,异步性质的调用最佳。高并发问题,利用队列串行特点。订阅模式,数据被未知数量的消费者订阅,比如某种数据的变更会影响多个系统的数据,订单数据就是比较好理解的。 之前有一个场景是商品数据在修改后需要推送到elasticsearch中,由于修改产品的并发量以及数据量均不大,所以对于消息未做持久化,而且为了快速

SpringBoot中如何监听两个不同源的RabbitMQ消息队列

spring-boot如何配置监听两个不同的RabbitMQ 由于前段时间在公司开发过程中碰到了一个问题,需要同时监听两个不同的rabbitMq,但是之前没有同时监听两个RabbitMq的情况,因此在同事的帮助下,成功实现了监听多个MQ。下面我给大家一步一步讲解下,也为自己做个笔记; 详细步骤: 1. application.properties 文件配置: u.rabbitmq.ad

msvcp140.dll丢失的解决方法,msvcp140.dll丢失下载办法

一、msvcp140.dll丢失或损坏的影响 系统更新影响 系统更新是导致msvcp140.dll丢失或损坏的常见原因之一。在自动更新过程中,可能会引入与现有应用程序不兼容的DLL版本,从而引发错误。根据用户反馈和技术支持数据,大约15%的msvcp140.dll问题与系统更新有关。 恶意软件攻击 恶意软件通过删除或损坏系统文件来破坏计算机功能,msvcp140.dll是其攻击目标之一。安

TCP 可靠传输的工作原理

转载地址:https://my.oschina.net/xinxingegeya/blog/485233 感谢原作者 TCP 可靠传输的工作原理 ARQ(Automatic Repeat-reQuest)(自动重传请求) 停止等待ARQ协议 连续ARQ协议   停止等待ARQ协议 全双工通信的双发既是发送方也是接收方。下面为了讨论问题的方便,我们仅考虑A发送数据而B接受数据