RabbitMQ消息的可靠传输和防止消息丢失

2024-06-17 17:36

本文主要是介绍RabbitMQ消息的可靠传输和防止消息丢失,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在Spring Cloud项目中,为了确保RabbitMQ消息的可靠传输和防止消息丢失,需要考虑以下几个方面:

  1. 消息持久化:确保消息在RabbitMQ中持久化。
  2. 队列持久化:确保队列是持久化的。
  3. 发布确认:使用发布确认机制确保消息发送到RabbitMQ。
  4. 消费者确认:确保消费者正确地确认消息。
  5. 重试机制:在消息消费失败时,设置重试机制。

下面详细介绍如何实现这些措施:

1. 添加依赖

确保在你的pom.xml中添加了Spring Boot和RabbitMQ的依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2. 配置RabbitMQ

application.ymlapplication.properties文件中配置RabbitMQ:

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestpublisher-confirm-type: correlatedpublisher-returns: true

3. 定义配置类

创建一个配置类来配置队列、交换机和绑定:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {public static final String QUEUE_NAME = "myQueue";public static final String EXCHANGE_NAME = "myExchange";public static final String ROUTING_KEY = "myRoutingKey";@Beanpublic Queue myQueue() {return QueueBuilder.durable(QUEUE_NAME).build();}@Beanpublic DirectExchange myExchange() {return new DirectExchange(EXCHANGE_NAME);}@Beanpublic Binding myBinding(Queue myQueue, DirectExchange myExchange) {return BindingBuilder.bind(myQueue).to(myExchange).with(ROUTING_KEY);}
}

4. 配置消息生产者

确保消息生产者配置了发布确认和消息持久化:

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.util.UUID;@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {// 设置发布确认回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {System.out.println("Message delivered successfully: " + correlationData);} else {System.err.println("Failed to deliver message: " + correlationData + ", cause: " + cause);}}});// 设置消息返回回调rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {System.err.println("Returned Message: " + new String(message.getBody()) +", replyCode: " + replyCode + ", replyText: " + replyText +", exchange: " + exchange + ", routingKey: " + routingKey);});}public void sendMessage(String message) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, message, correlationData);}
}

5. 配置消息消费者

确保消息消费者配置了消息确认机制:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.support.Acknowledgment;
import org.springframework.stereotype.Service;@Service
public class MessageConsumer {@RabbitListener(queues = RabbitConfig.QUEUE_NAME)public void handleMessage(String message, Channel channel, Message message) throws Exception {try {// 处理消息System.out.println("Received Message: " + message);// 消息确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 消费失败,重新放回队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}
}

6. 启用重试机制

在Spring Cloud Stream中启用重试机制:

spring:cloud:stream:bindings:input:destination: myQueueconsumer:retry:max-attempts: 5backOffPolicy:initialInterval: 1000multiplier: 2.0maxInterval: 10000

7. 测试

测试消息生产和消费,确保消息在各种情况下都不会丢失,包括网络故障、RabbitMQ服务器重启等。

总结

通过以上步骤,你可以在Spring Cloud项目中使用RabbitMQ并确保消息不会丢失。关键在于:

  1. 消息和队列的持久化:确保消息和队列都是持久化的。
  2. 发布确认:启用发布确认回调机制,确保消息被正确地发送到RabbitMQ。
  3. 消费者确认:确保消费者正确地确认消息。
  4. 重试机制:在消费失败时启用重试机制,以确保消息最终能够被成功处理。

通过这些配置,可以显著提高消息传输的可靠性,防止消息丢失。

这篇关于RabbitMQ消息的可靠传输和防止消息丢失的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1070100

相关文章

RabbitMQ练习(AMQP 0-9-1 Overview)

1、What is AMQP 0-9-1 AMQP 0-9-1(高级消息队列协议)是一种网络协议,它允许遵从该协议的客户端(Publisher或者Consumer)应用程序与遵从该协议的消息中间件代理(Broker,如RabbitMQ)进行通信。 AMQP 0-9-1模型的核心概念包括消息发布者(producers/publisher)、消息(messages)、交换机(exchanges)、

ActiveMQ—消息特性(延迟和定时消息投递)

ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article/details/8443872 有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消息没隔一定时间投递一次,一共投递指定的次数。。。 类似

RabbitMQ使用及与spring boot整合

1.MQ   消息队列(Message Queue,简称MQ)——应用程序和应用程序之间的通信方法   应用:不同进程Process/线程Thread之间通信   比较流行的中间件:     ActiveMQ     RabbitMQ(非常重量级,更适合于企业级的开发)     Kafka(高吞吐量的分布式发布订阅消息系统)     RocketMQ   在高并发、可靠性、成熟度等

C# 防止按钮botton重复“点击”的方法

在使用C#的按钮控件的时候,经常我们想如果出现了多次点击的时候只让其在执行的时候只响应一次。这个时候很多人可能会想到使用Enable=false, 但是实际情况是还是会被多次触发,因为C#采用的是消息队列机制,这个时候我们只需要在Enable = true 之前加一句 Application.DoEvents();就能达到防止重复点击的问题。 private void btnGenerateSh

PHP防止SQL注入详解及防范

SQL 注入是PHP应用中最常见的漏洞之一。事实上令人惊奇的是,开发者要同时犯两个错误才会引发一个SQL注入漏洞。 一个是没有对输入的数据进行过滤(过滤输入),还有一个是没有对发送到数据库的数据进行转义(转义输出)。这两个重要的步骤缺一不可,需要同时加以特别关注以减少程序错误。 对于攻击者来说,进行SQL注入攻击需要思考和试验,对数据库方案进行有根有据的推理非常有必要(当然假设攻击者看不到你的

PHP防止SQL注入的方法(2)

如果用户输入的是直接插入到一个SQL语句中的查询,应用程序会很容易受到SQL注入,例如下面的例子: $unsafe_variable = $_POST['user_input'];mysql_query("INSERT INTO table (column) VALUES ('" . $unsafe_variable . "')"); 这是因为用户可以输入类似VALUE”); DROP TA

PHP防止SQL注入的方法(1)

(1)mysql_real_escape_string – 转义 SQL 语句中使用的字符串中的特殊字符,并考虑到连接的当前字符集 使用方法如下: $sql = "select count(*) as ctr from users where username ='".mysql_real_escape_string($username)."' and password='". mysql_r

防止缓存击穿、缓存穿透和缓存雪崩

使用Redis缓存防止缓存击穿、缓存穿透和缓存雪崩 在高并发系统中,缓存击穿、缓存穿透和缓存雪崩是三种常见的缓存问题。本文将介绍如何使用Redis、分布式锁和布隆过滤器有效解决这些问题,并且会通过Java代码详细说明实现的思路和原因。 1. 背景 缓存穿透:指的是大量请求缓存中不存在且数据库中也不存在的数据,导致大量请求直接打到数据库上,形成数据库压力。 缓存击穿:指的是某个热点数据在

起点中文网防止网页调试的代码展示

起点中文网对爬虫非常敏感。如图,想在页面启用调试后会显示“已在调试程序中暂停”。 选择停用断点并继续运行后会造成cpu占用率升高电脑卡顿。 经简单分析网站使用了js代码用于防止调试并在强制继续运行后造成电脑卡顿,代码如下: function A(A, B) {if (null != B && "undefined" != typeof Symbol && B[Symbol.hasInstan

Java消息队列:RabbitMQ与Kafka的集成与应用

Java消息队列:RabbitMQ与Kafka的集成与应用 大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿! 在现代的分布式系统中,消息队列是实现系统间通信、解耦和提高可扩展性的重要组件。RabbitMQ和Kafka是两个广泛使用的消息队列系统,它们各有特点和优势。本文将介绍如何在Java应用中集成RabbitMQ和Kafka,并展示它们的应用场景。 消息队