本文主要是介绍微服务架构之使用RabbitMQ进行异步处理方式,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《微服务架构之使用RabbitMQ进行异步处理方式》本文介绍了RabbitMQ的基本概念、异步调用处理逻辑、RabbitMQ的基本使用方法以及在SpringBoot项目中使用RabbitMQ解决高并发...
一.什么是RabbitMQ?
RabbitMQ 是一种流行的消息队列(Message Queue)实现,基于 AMQP 协议(Advanced Message Queuing Protocol)。它支持异步通信,使多个系统之间以非阻塞的方式交换数据。
在我们使用微服务的时候,微服务一旦拆分,必然涉及到服务之间的相互调用,目前我们服务之间调用采用的都是基于 OpenFeign 的调用。这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们称这种调用方式为同步调用,也可以叫同步通讯。
如果我们的业务需要实时得到服务提供方的响应,则应该选择同步通讯(同步调用)。而如果我们追求更高的效率,并且不需要实时响应,则应该选择异步通讯(异步调用)。
二.异步调用处理逻辑:
异步调用方式其实就是基于消息通知的方式,一般包含三个角色:
- 消息发送者:投递消息的人,就是原来的调用方
- 消息Broker:管理、暂存、转发消息,你可以把它理解成微信服务器
- 消息接收者:接收和处理消息的人,就是原来的服务提供方
除此之外还有:
- Exchange(交换机):用于路由消息。RabbitMQ 有多种交换机类型(如 direct、topic、fanout、headers),它们决定了消息如何被传递到队列。
- Binding(绑定):连接交换机和队列的规则。
在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker。然后接收者根据自己的需求从消息Broker那里订阅消息。每当发送方发送消息后,接受者都能获取消息并处理。这样,发送消息的人和接收消息的人就完全解耦了。
异步调用的优势包括:
- 耦合度更低
- 性能更好
- 业务拓展性强
- 故障隔离,避免级联失败
当然,异步通信也并非完美无缺,它存在下列缺点:
- 完全依赖于Broker的可靠性、安全性和性能
- 架构复杂,后期维护和调试麻烦
三.RabbitMQ的基本使用
下面是RabbitMQ的官网:https://www.rabbitmq.com/
1.安装
首先将RabbitMQ的镜像拉取下来,然后运行下面命令:
docker run \ -e RABBITMQ_DEFAULT_USER=hmall \ -e RABBITMQ_DEFAULT_PASS=123 \ -v mq-plugins:/plugins \ --name mq \ --hostname mq \ -p 15672:15672 \ -p 5672:5672 \ --network hm-net\ -d \ rabbitmq:3.8-management
在 docker run
命令中,您将容器的两个端口暴露到主机上:
-p 15672:15672
:将容器内部的15672
端口映射到主机的15672
端口。这个端口通常用于 RabbitMQ 的 Web 管理控制台(Management Console)。-p 5672:5672
:将容器内部的5672
端口映射到主机的5672
端口。这个端口用于 AMQP 协议,即客户端与 RabbitMQ 进行消息传输的端口。
随后我们访问http://虚拟机IP地址:15672来打开RabbitMQ的控制台,默认账号密码是 guest
/ guest。
在控制台上主要可以关注三个信息:Exchanges(交换机),Queues(队列),Admin(用户管理)。
2.架构图
其中包含几个概念:
publisher
:生产者,也就是发送消息的一方consumer
:消费者,也就是消费消息的一方queue
:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理exchange
:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。virtual host
:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
RabbitMQ 使用 AMQP 协议,核心的消息模型包括:
- Producer 将消息发送到 Exchange。
- Exchange 将消息路由到 Queue。
- Consumer 从 Queue 获取消息进行处理。
RabbitMQ 消息生命周期:
- 消息发布:Producer 将消息发送到 RabbitMQ。
- 消息存储:消息通过交换机路由到一个或多个队列,队列暂存这些消息。
- 消息消费:Consumer 从队列中获取并处理消息。
3.RabbitMQ控制台的使用
在 RabbitMQ 中,交换机(Exchange) 和 队列(Queue) 是核心概念。它们之间的关系决定了消息的路由和存储方式。
(1)Exchanges 交换机
- 交换机是 消息的路由器,负责决定消息应该被发送到哪个队列。
- 生产者将消息发送给交换机,而不是直接发送到队列。
- 交换机根据路由规则 决定消息的走向(即发往哪些队列)。
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
RabbitMQ 提供了四种常用的交换机类型,每种类型的路由规则不同:
- Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机。
- Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
- Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符,可以进行模糊匹配。
- Headers:头匹配,基于MQ的 header 信息来路由消息,用的较少。
我们可以再这里创建交换机,Name表示创建的交换机的名字,Type表示可以选择交换机的四种类型。创建成功后就可以在上面看到创建的交换机名字:
比如我们点击amq.fanout查看交换机数据并且可以发送消息给消费者。
注意!!!如果我们不将交换机指定队列的话,由于没有消费者存在,最终消息丢失了,这样说明交换机没有存储消息的能力。
所以下面我们要先创建队列,然后让生产者推送的消息经过交换机的传递后,到达消息队列,然后再给消费者。所以生产者无需知道队列的存在以此来达到解耦的效果。
(2)Queues 队列
- 队列用于 存储消息,直到消费者消费它们。
- 队列与消费者一一对应,即一个消费者从一个队列读取消息。
- 队列按 FIFO(First In, First Out)的顺序存储消息。
交换机与队列的关系:
交换机(Exchange)是消息的路由器,它决定了将消息发送到哪个队列。队列(Queue)是消息的存储和处理地方。交换机本身并不知道具体的队列,它只是通过绑定(Binding)来决定消息应该被路由到哪些队列。
交换机将消息通过路由键(Rouwww.chinasem.cnting Key)发送到绑定的队列,但交换机和队列之间的连接并不是自动的,需要显式地设置绑定。绑定指定了 交换机 和 队列 之间的关系,以及 路由规则(例如,路由键匹配的规则)。
在这里我们填写队列名字即可,其他暂时可以不用填写。
随后我们向交换机进行绑定(bind)队列,随后通过队列传输给消费者。
这里的Routing key的出现是为了让 Direct (交换机的类型)能够选择队列而存在的。
我们在绑定队列完成后会出现下面这样,这样证明我们成功为交换机绑定好两个队列:
随后我们在下面窗口推送消息:
(3)Admin
①Users(用户管理):
管理 RabbitMQ 中的用户账号,在这里 添加、删除用户,并设置每个用户的权限。
每个用户可分配不同的 角色:
- administrator:管理员,具有所有权限。
- monitoring:可以监控和查看信息,但不能管理。
- policymaker:可以设置策略和参数。
- management:可以访问管理界面但没有策略权限。
Name
:itheima
,也就是用户名Tags
:administrator
,说明itheima
用户是超级管理员,拥有所有权限Can Access virtual host
:/
,可以访问的virtual host
,这里的/
是默认的virtual host
②Virtual Hosts(虚拟主机):
将 RabbitMQ 服务器划分为多个 虚拟主机(vhost),类似于一个独立的命名空间。
- 不同的应用可以使用不同的虚拟主机,彼此隔离。
- 每个虚拟主机都有自己的 交换机、队列和用户权限。
四.SpringAMOP的使用
Spring AMQP(Spring for Advanced Message Queuing Protocol)是 Spring 提供的一个消息队列集成模块,主要用于简化与 RabbitMQ 的集成。它通过 AMQP 协议来实现消息的生产和消费。
- Publisher:生产者,不再发送消息到队列中,而是发给交换机
- Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
- Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
- Consumer:消费者,与以前一样,订阅队列,没有变化
1.导入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.添加配置
在publisher以及consumer
服务的application.yml
中添加配置:
spring: rabbitmq: host: 192.168.150.101 # 你的虚拟机IP port: 5672 # 端口 virtual-host: /hmall # 虚拟主机 username: hmall # 用户名 password: 123 # 密码 listener: simple: prefetch: 1 # (能者多劳)每次只能获取一条消息,处理完成才能获取下一个消息
5672
端口用于 AMQP 协议,通常用于客户端与 RabbitMQ 进行消息传输。,所以这里port绑定的是5672。
在 RabbitMQ 中,guest
用户默认只能在 本地 连接到 RabbitMQ 实例。如果你希望使用 guest
用户进行远程连接(即从非本地机器连接 RabbitMQ),RabbitMQ 默认是 不允许 的。
3.在publisher服务中利用RabbitTemplate实现消息发送
RabbitTemplate 解释:
RabbitTemplate
是 Spring AMQP 的核心实现类,它实现了 AmqpTemplate
接口,并且对 AmqpTemplate
提供了一些更高层次的封装,简化了消息发送和接收的操作。
RabbitTemplate
是大多数 Spring AMQP 用户的首选,它提供了很多便捷的方法和默认的行为,使得消息交互变得更加简单。
主要方法:
convertAndSend
: 与AmqpTemplate
相同,提供了消息转换并发送的功能。receiveAndConvert
: 从队列中接收消息并转换成 Java 对象。send
: 发送消息,不进行转换。receive
: 从队列中接收消息。
RabbitTemplate
为发送消息提供了更丰富的功能,如消息转换器、默认交换机支持等,通常适用于大多数使用 Spring 的场景。
(1) 发送消息(convertAndSend)
rabbitTemplate.convertAndSend(String exchange, String routingKey, Object message);
exchange
:指定交换机的名称。routingKey
:消息的路由键。message
:消息体,可以是任何对象,RabbitTemplate
会自动将其转换为消息格式(例如,jsON、文本等)。
此方法的作用是将消息发送到指定交换机,并且根据给定的路由键路由到相关队列。
实例:
rabbitTemplate.convertAndSend("pay.topic", "pay.success", "支付成功的消息");
这里消息 "支付成功的消息"
被发送到 pay.topic
交换机,使用 pay.success
路由键路由到对应的队列。
(2)接收消息(receiveAndConvert)
public <T> T receiveAndConvert(String queueName);
queueName
:消息队列的名称。- 返回类型为
<T>
,表示接收到的消息会被转换为指定的类型。
receiveAndConvert
会从指定的队列接收消息,并自动将消息转换成目标对象类型。如果消息体不是可转换的,方法将抛出异常。
String message = (String) rabbitTemplate.receiveAndConvert("pay.success.queue"); System.out.println("收到的消息: " + message);
这里,pay.success.queue
队列中的消息将被接收,并自动转换为 String
类型。
还有一种就是 receive 方法:
public Message receive(String queueName);
queueName
:队列名称。- 返回的是
Message
类型,而不是直接转换成对象。你可以通过Message
对象获取消息的内容和其他属性。
实例:
Message message = rabbitTemplate.receive("pay.success.queue"); if (message != null) { System.out.println("收到的消息: " + new String(message.getBody())); }
(3)发送消息并获取响应(convertSendAndReceivejs)
public <T> T convertSendAndReceive(String exchange, String routingKey, Object message);
exchange
:交换机名称。routingKey
:路由键。message
:发送的消息对象。
convertSendAndReceive
方法既发送消息到交换机,也等待从队列返回响应。它会根据指定的路由键将消息发送到交换机,并等待响应消息,然后将响应转换为返回类型。
实例:
String response = (String) rabbitTemplate.convertSendAndReceive("pay.topic", "pay.success", "请求处理"); System.out.println("收到响应: " + response);
(4)发送消息到特定队列
public void send(String exchange, String routingKey, Message message);
exchange
:交换机名称。routingKey
:路由键。message
:消息对象(可以是Message
类型)。
send
方法允许你将一个 Message
对象发送到指定的交换机,并根据路由键进行路由。
Message message = new Message("支付成功".getBytes()); rabbitTemplate.send("pay.topic", "pay.success", message);
(5)设置消息监听器
RabbitTemplate
本身并不直接处理消息监听(接收消息),但是可以通过设置 RabbitListener
来监听消息,并将其与 RabbitTemplate
配合使用。一般来说,消息接收和处理是在 @RabbitListener
注解的监听器方法中完成的。
@RabbitListener(queues = "pay.success.queue") public void receiveMessage(String message) { System.out.println("接收到消息: " + message); }
下面看一个代码实例:
@SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue() { // 队列名称 String queueName = "simple.queue"; // 消息 String message = "hello, spring amqp!"; // 发送消息 rabbitTemplate.convertAndSend(queueName, message); } }
4.定义消费者实现异步调用
@Component @RequiredArgsConstructor public class PayStatusListener { private final IOrderService orderService; @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "trade.pay.success.queue", durable = "true"), exchange = @Exchange(name = "pay.topic"), key = "pay.success" )) public void listenPaySuccess(Long orderId){ //调用方法 orderService.markOrderPaySuccess(orderId); } }
①@RequiredArgsConstructor
这是 Lombok 提供的注解,自动为类中所有 final
修饰的字段生成一个包含这些字段的构造函数。
使用这个注解可以免去手动编写构造函数的麻烦,尤其是在使用依赖注入时(例如注入 IOrderService
)。
②@RabbitListener
@RabbitListener
注解用于监听来自 RabbitMQ 队列的消息。
它会自动监听指定的队列,当有消息到达时,会触发 listenPaySuccess
方法进行处理。
③QueueBinding(队列绑定)
通过 @QueueBinding
注解,绑定了 队列 和 交换机,并指定了 路由键。
@Queue
name = "trade.pay.success.queue"
:指定队列的名称为trade.pay.success.queue
。durable = "true"
:表示队列是 持久化 的,即 RabbitMQ 重启后队列依然存在。- 队列的作用:队列是消息的临时存储地,消费者会从队列中拉取消息并处理。
@Exchange
name = "pay.topic"
:指定交换机的名称为pay.topic
,这是一个 Topic Exchange(主题交换机)。- 交换机的作用:交换机决定消息如何路由到队列。Topic Exchange 可以根据路由键的匹配规则将消息路由到合适的队列。
key = "pay.success"
- 路由键:指定了路由键为
pay.success
。这意味着当生产者发送的消息路由键是pay.success
时,消息将被路由到trade.pay.success.queue
队列。
5.总流程处理过程
- 生产者:在支付成功后,生产者会发送一条消息到
pay.topic
交换机,消息的路由键为pay.success
。 - 交换机:
pay.topic
交换机会根据路由键pay.success
将消息路由到trade.pay.success.queue
队列。 - 消费者:
PayStatusListener
作为消费者监听trade.pay.success.queue
,当有消息到达队列时,它会接收到订单 ID 并调用订单服务更新订单状态。
五.使用配置类管理定义交换机,队列及两者关系
在 Spring AMQP 中,交换机(Exchange)、队列(Queue)、以及绑定(Binding)可以通过配置类来定义和管理。配置类可以帮助你灵活地创建和绑定交换机与队列,并且可以根据业务需求自定义各种参数。
创建配置类效果展示:
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { // 创建队列 @Bean public Queue queue() { return new Queue("trade.pay.success.queue", true); // durable=true 表示队列持久化 } // 创建交换机 @Bean public TopicExchange exchange() { return new TopicExchange("pay.topic"); // 创建主题交换机 } // 创建绑定关系(队列与交换机通过 routing key 绑定) @Bean public Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("pay.success"); // 路由键是 pay.success } }
1.创建队列:Queue
@Bean public Queue queue() { return new Queue("trade.pay.success.queue", true); }
作用:通过 @Bean
注解定义了一个队列 Bean。Spring 容器会自动管理这个队列,并在 RabbitMQ 上创建该队列。
参数:
"trade.pay.success.queue"
:这是队列的名称。每个队列在 RabbitMQ 中必须有唯一的名称。true
:表示这个队列是 持久化 的。持久化的队列在 RabbitMQ 服务重启后依然存在。- 队列还可以设置其他属性,例如是否自动删除(
autoDelete
)、是否排他性(exclusive
)等,默认是false
。
2.创建交换机:Exchange
@Bean public TopicExchange exchange() { return new TopicExchange("pay.topic"); }
- 作用:通过
@Bean
注解定义了一个 Topic Exchange 类型的交换机。 - 参数:
"pay.topic"
:这是交换机的名称。同样,交换机在 RabbitMQ 中也必须有唯一的名称。TopicExchange
支持通过通配符(如*
和#
)进行更灵活的消息匹配。
Topic Exchange 是一种交换机类型,它允许使用通配符来进行路由。例如,路由键可以是 "pay.*"
,可以匹配 "pay.success"
或 "pay.failure"
。在这里可以使用四种交换机类型来定义交换机,具体场景具体分析使用。
3.创建绑定关系:Binding
@Bean public Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("pay.success"); }
作用:通过 @Bean
注解定义了队列和交换机的绑定关系。这个绑定决定了消息在何种条件下会从交换机路由到队列。
参数:
queue
:队列trade.pay.success.queue
被传入,表示将该队列与交换机绑定。exchange
:交换机pay.topic
被传入,表示将队列与该交换机绑定。.with("pay.success")
:指定了路由键pay.success
,路由键pay.success
会告诉交换机,如果生产者发送的消息带有这个路由键,那么该消息就会路由到trade.pay.success.queue
队列。
4.多个队列绑定到同一个交换机
我们可以将多个队列绑定到同一个交换机,并使用不同的路由键。这样可以实现根据不同的路由键来发送不同类型的消息到各自的队列。
@Bean public Queue paySuccessQueue() { return new Queue("pay.success.queue", true); } @Bean public Queue payFailureQueue() { return new Queue("pay.failure.queue", true); } @Bean public Binding paySuccessBinding(Queue paySuccessQueue, TopicExchange exchange) { return BindingBuilder.bind(paySuccessQueue).to(exchange).with("pay.success"); } @Bean public Binding payFailureBinding(Queue payFailureQueue, TopicExchange exchange) { return BindingBuilder.bind(payFailureQueue).to(exchange).with("pay.failure"); }
- 在这个例子中,
pay.success.queue
和pay.failure.queue
都绑定到同一个交换机pay.topic
,但使用不同的路由键。 - 消息路由逻辑:
- 当生产者发送路由键为
pay.success
的消息时,消息会路由到pay.success.queue
队列。 - 当生产者发送路由键为
pay.failure
的消息时,消息会路由到pay.failure.queue
队列。
- 当生产者发送路由键为
BindingBuilder.bind(paySuccessQueue)
:paySuccessQueue
参数就是绑定到交换机的队列对象,也就是pay.success.queue
。它会被传递给BindingBuilder
,并且指定此队列和交换机的绑定关系。.to(exchange)
:这里将队列和交换机绑定,指定消息通过交换机路由到队列。.with("pay.success")
:指定路由键为pay.success
,意思是只有路由键为pay.success
的消息才会被路由到paySuccessQueue
队列中。
在 Spring AMQP 中,队列对象(如 paySuccessQueue
)是由 @Bean
注解的 Queue
类型方法返回的。Spring 会自动将返回的队列对象放入到 Spring 容器中,并注入到需要它的地方。所以当我们在 Binding
中使用 paySuccessQueue
时,实际上是在引用之前构造并注册到 Spring 容器中的队列实例。
在 Spring 的上下文中,paySuccessQueue
和 payFailureQueue
是已经被创建并管理的队列对象,我们不需要手动创建队列,只需要在 Binding
中通过引用这些对象来建立队列与交换机之间的关系。
5.配置不同类型的交换机
除了 Topic Exchange,RabbitMQ 还支持其他几种常见的交换机类型。这里分别演示如何创建 Direct Exchange、Fanout Exchange 和 Headers Exchange。
(1)Direct Exchange
@Bean public DirectExchange directExchange() { return new DirectExchange("direct.exchange"); } @Bean public Binding directBinding(Queue queue, DirectExchange directExchange) { return BindingBuilder.bind(queue).to(directExchange).with("direct.routing.key"); }
Direct Exchange:直接交换机会根据 完全匹配的路由键 将消息发送到队列。只有当消息的路由键和绑定的路由键 完全一致 时,消息才会被路由到指定队列。
(2)Fanout Exchange
@Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanout.exchange"); } @Bean public Binding fanoutBinding(Queue queue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue).to(fanoutExchange); // 不需要路由键 }
Fanout Exchange:扇出交换机会将消息发送到所有绑定的队列,不需要考虑路由键。这个交换机通常用于广播消息。
(3)Headers Exchange
@Bean public HeadersExchange headersExchange() { return new HeadersExchange("headers.exchange"); } @Bean public Binding headersBinding(Queue queue, HeadersExchange headersExchange) { return BindingBuilder.bind(queue).to(headersExchange).where("header-key").matches("header-value"); }
Headers Exchange:头交换机根据消息头的内容进行路由,而不是依赖路由键。适用于按消息的元数据进行路由的场景。
总结:
通过 Spring AMQP 的配置类,你可以非常灵活地定义 RabbitMQ 的 交换机、队列 和 绑定关系,并通过不同的路由键和交换机类型实现复杂的消息路由逻辑。
以下是一些关键要点:
- 队列(Queue):消息的临时存储地,可以是持久化的。
- 交换机(Exchange):控制消息如何分发到不同的队列。
- Direct Exchange:严格匹配路由键。
- Topic Exchange:支持通配符匹配路由键。
- Fanout Exchange:广播消息到所有绑定的队列。
- Headers Exchange:根据消息头的内容进行路由。
- 绑定(Binding):将队列与交换机连接起来,使用路由键来决定消息的流向。
通过配置类来定义这些组件,能够简化 RabbitMQ 与 Spring 应用的集成,并且通过灵活的路由规则支持复杂的消息传递需求。
六.在Springboot项目中使用RabbitMQ解决高并发问题
在 Spring Boot 项目中使用 RabbitMQ 来实现消息传输,处理并发问题是非常常见的一种方式。RabbitMQ 可以通过解耦应用程序的不同部分,并将任务分发给多个消费者,从而有效地解决并发和负载均衡问题。
1.引入依赖
首先,需要在 pom.XML
文件中引入 spring-boot-starter-amqp
依赖,这样 Spring Boot 就可以与 RabbitMQ 集成:
<dependencies> http://www.chinasem.cn <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
2.配置RabbitMQ
在 application.yml
或 application.properties
文件中,配置 RabbitMQ 的连接信息:
spring: rabbitmq: host: localhost port: 15672 username: hmall password: 123 virtual-host: / listener: simple: concurrency: 3 # 设置消费者的最小数javascript量 max-concurrency: 10 # 设置消费者的最大数量
3.RabbitMQ配置类
创建一个配置类,用于声明队列、交换机、绑定等。
package com.example.rabbitmq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { // 声明队列 @Bean public Queue taskQueue() { return new Queue("taskQueue", true); // true表示队列持久化 } // 声明交换机 @Bean public TopicExchange topicExchange() { return new TopicExchange("taskExchange"); } // 绑定队列和交换机 @Bean public Binding binding(Queue taskQueue, TopicExchange topicExchange) { return new Binding("taskQueue", Binding.DestinationType.QUEUE, "taskExchange", "task.#", // 使用路由键 null); } }
4.创建消息生产者(Producer)
创建一个消息生产者(Producer),它将消息发送到 RabbitMQ 队列。
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class TaskProducer { @Autowired private RabbitTemplate rabbitTemplate; // 发送消息 public void sendMessage(String message) { System.out.println("Sending message: " + message); rabbitTemplate.convertAndSend("taskExchange", "task.new", message); // 发送到交换机,使用路由键 } }
5.创建消息消费者(Consumer)
消费者从 RabbitMQ 队列中异步接收消息,并进行并发处理。在消费者类中,使用 @RabbitListener
注解监听队列,确保多个消费者可以同时处理消息。
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class TaskConsumer { // 监听队列并异步处理消息 @RabbitListener(queues = "taskQueue") public void receiveMessage(String message) { System.out.println("Received message: " + message); // 模拟处理消息的业务逻辑 try { Thread.sleep(1000); // 模拟耗时操作 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Finished processing message: " + message); } }
6.增加并发消费者(多个消费者)
为了处理高并发,我们可以通过设置 concurrency
和 max-concurrency
来控制消费者的最小和最大并发数。这样,多个消费者可以同时处理来自队列的消息。我们已经在 application.yml
中配置了 concurrency
,现在的配置允许最多启动 20 个消费者来处理消息。
- 最小并发数:设置为 5,意味着在任何时候都会至少启动 5 个消费者。
- 最大并发数:设置为 20,表示 RabbitMQ 可以根据负载增加最多 20 个消费者。
总结
这篇关于微服务架构之使用RabbitMQ进行异步处理方式的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!