本文主要是介绍RabbitMQ入门 安装 SpringAMQP简单队列、工作队列、发布订阅(扇出模式,广播模式)、Direct模式(Roting模式)、Topic模式,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
一、RabbitMQ介绍
1. MQ介绍
1. 为什么要有MQ
同步调用与异步调用
程序里所有的通信,有两种形式:
同步通信:通信时必须得到结果才会执行下一步
异步通信:通信时不必等待结果,可以直接处理下一步
同步调用
解析:
同步调用的缺点:
业务链长,消耗时间增加,用户体验不好
耦合性强
流量洪峰服务器压力大
同步调用的好处:
时效性强,可以立即得到结果
异步调用
解析:
异步调用的好处:
异步调用,调用链短,用户等待时间短,体验好
降低耦合,服务之间耦合性低了,任何一个服务出现问题,对其它服务的影响都非常小
削峰填谷,中间件Broker具备一定的消息堆积与缓存能力,下游服务可以根据自己的能力慢慢处理
异步调用的坏处:
业务复杂度增加:需要考虑数据一致性问题、消息丢失问题、消息重复消费问题、消息的顺序问题等等
架构复杂度增加:对中间件Broker的依赖性增强了,必须保证Broker的高可用;一旦Broker出错,会对整个系统造成非常大的冲击
2. MQ介绍【面试】
什么是MQ
Message Queue,消息队列
消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。
MQ的作用
异步:实现服务之间异步通信
削峰:实现流量的削峰填谷
解耦:实现服务之间的耦合性降低
3. 常见的MQ
在使用MQ时:
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
2. RabbitMQ介绍
2.1 AMQP协议
AMQP,Advanced Message Queuing Protocol,高级消息队列,是一种网络协议。它是应用层协议的一个开发标准,为面向消息的中间件而设计。
基于此协议的客户端与消息中间件可传递消息,并不受客户端、中间件不同产品、不同编程语言的限制。
Publisher:消息发布者
Exchange:交换机。用于分发消息
Routes:路由。消息被分发到目标队列上的过程
Queue:消息队列。用于存储消息数据
Consumer:消息消费者。从队列里取出消息、处理消息
2.2 RabbitMQ
Rabbit公司基于AMQP协议标准,开发了RabbitMQ1.0。RabbitMQ采用Erlang语言开发,Erlang是专门为开发高并发和分布式系统的一种语言,在电信领域广泛使用。
官网地址:Messaging that just works — RabbitMQ
Broker:消息中间件,指的就是RabbitMQ服务器
Virtual Host:虚拟主机。当多个不同用户使用同一个RabbitMQ服务时,可以划分出多个vhost,每个用户在自己的vhost内创建exchange和queue等。
Connection:消息生产者、消费者 与 RabbitMQ之间建立的TCP连接
Channel:Channel作为轻量级的Connection,可以极大的减少建立Connection的开销。
如果每一次访问RabbitMQ都建立一个TCP Connection,将会造成巨大的性能开销,性能非常低下。
Channel是在Connection内部建立的逻辑连接,Channel之间完全隔离。如果应用程序支持多线程,通常每个线程创建独立的Channel进行通讯, AMQP method包含了channel id,帮助客户端和broker识别channel,所以channel之间是完全隔离的
Exchange:message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct(point-to-point), topic(publish-subscribe),fanout(multicast)
Queue:消息最终被送到这里,等待Consumer取走
Binding:exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange的查询表中,用作message的分发依据
3. 小结
同步调用和异步调用
同步调用:调用后必须得到结果,才会执行下一步
好处:时效性强,每一步操作都能够及时得到结果
缺点:
耦合性太强,任何一个环节出错,都可能导致整个调用链出现问题
业务链太长,调用的时间花费比较长,用户体验差
难以处理流量洪峰,如果要增加并发能力,整个调用链所有环节都要增加
异步调用:调用后不必等待结果,可以直接执行下一步
好处:
业务链短了,异步调用只要发出一条消息给broker,就可以执行下一步了,不需要等等消息的消费
坏处:
业务复杂度增加了:需要考虑数据一致性问题,消息丢失的问题、消息重复消费问题、消息的顺序问题
系统架构更复杂了:必须要保证Broker的高可用,否则一旦Broker出错,对整个系统影响太大
常见的MQ有哪些:
RabbitMQ:追求高性能
RocketMQ:追求高稳定
Kafka:追求高吞吐量
AMQP协议:是一种消息通信协议,规定了消息中间件要有以下结构
Publisher:生产者,发布消息的服务是生产者
Consumer:消费者,接收处理消息的服务是消费者
Exchange:交换机,用于分发路由消息,把一条消息分发到对应的队列上
Queue:队列,真正缓存堆积消息的队列,等待消费者进行处理
RabbitMQ:是根据AMQP协议设计出来的消息中间件
Producer:生产者,发送消息的服务
Consumer:消费者,接收消息的服务
Connection:连接对象,无论是生产者还是消费者,都必须和RabbitMQ建立连接才可以通信
Channel:通道,是一种轻量的连接,使用Channel进行通信,更灵活,消耗更小
VirtualHost:虚拟主机,是对Exchange和Queue的逻辑分组,实现不同环境的隔离。不同VirtualHost之间是互相隔离的
Exchange:交换机
Queue:队列
二、RabbitMQ安装
1. 拉取RabbitMQ镜像
1. 拉取RabbitMQ镜像
方式一:在线拉取镜像
docker pull rabbitmq:3.8-management
方式二:从本地加载镜像【我们采用这种方式】
把资料中的《mq.tar》上传到虚拟机CentOS里
在CentOS里执行命令加载镜像:
#先切换到mq.tar所在的目录#再执行命令加载镜像 docker load -i mq.tar#加载后,查看一下镜像。找一下有没有rabbitmq这个镜像 docker images
2. 安装RabbitMQ
执行下面的命令来运行MQ容器:
docker run \-e RABBITMQ_DEFAULT_USER=itcast \-e RABBITMQ_DEFAULT_PASS=123321 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \-d \--restart=always \rabbitmq:3-management
Java程序连接RabbitMQ:使用端口5672
RabbitMQ控制台: http://ip:15672, 登录帐号:itcast, 密码:123321
3. RabbitMQ控制台
打开浏览器输入地址:http://ip:15672, 登录帐号:itcast, 密码:123321
在控制台里,可以查看、管理 交换机、队列等等
三、RabbitMQ入门
目标
使用RabbitMQ的简单模式,发送消息、接收消息
实现
RabbitMQ的简单模式,是所有消息模式里最简单、最基础的一种。如下图所示:
P:Producer或者Publisher,消息的生产者(或者叫 消息发布者)
C:Consumer,消息的消费者
queue:消息队列,图中红色部分即是
我们使用这种模式,实现入门案例:
生产者把消息投递到消息队列里
消费者从消息队列里获取消息
1. 创建工程
创建project,作为父工程使用
删除src文件夹
修改pom.xml添加依赖坐标
<dependencies><!-- RabbitMQ的客户端依赖包 --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency> </dependencies>
2. 消息生产者
在工程里创建Module:
demo01-producer
编写测试类
public class Demo01ProducerSimple {public static void main(String[] args) throws IOException, TimeoutException {//1. 连接RabbitMQ服务器ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.200.137");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");//2. 获取通道ChannelConnection connection = factory.newConnection();Channel channel = connection.createChannel();//3. 声明队列String queueName = "test_simple.queue";// 参数1:队列名称// 参数2:是否 持久化队列(持久化队列:重启RabbitMQ服务后,队列仍然存在)// 参数3:是否 独占队列(第一个消费者独占队列)// 参数4:是否 自动删除队列(自动删除队列:当所有消费者都断开连接后,队列自动删除)// 参数5:队列的附加参数channel.queueDeclare(queueName, true, false, false, null);//4. 发送消息:简单模式,交换机设置为空,路由key设置为队列名称// 参数1:交换机名称。简单模式时,设置为空字符串// 参数2:路由key。简单模式时,设置为队列名称// 参数3:消息的附加参数// 参数4:消息内容channel.basicPublish("", queueName, null, "这是一条简单消息".getBytes());//5. 释放资源,关闭连接channel.close();connection.close();}
3. 消费消费者
在工程里创建Module:
demo01-consumer
编写测试类
注意:消费者的代码要放到main方法里,不要写到单元测试方法内
public class Demo01ConsumerSimple {public static void main(String[] args) throws IOException, TimeoutException {//1. 连接RabbitMQ服务器ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.200.137");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");//2. 获取连接Connection connection = factory.newConnection();Channel channel = connection.createChannel();//3. 声明消息队列(如果消息队列不存在,则创建消息队列;如果已存在,则不创建)String queueName = "test_simple.queue";channel.queueDeclare(queueName, true, false, false, null);//4. 监听消息// 参数1:队列名称。要从哪个队列里获取消息// 参数2:是否自动ack。当成功消费消息之后,会自动给RabbitMQ返回一个标识。RabbitMQ收到后,就知道消息已经被成功消费了,不会重新推送这条消息// 参数3:处理消息的回调 channel.basicConsume(queueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("收到消息:" + new String(body));}});//等待接收消息。不要关闭Channel通道和Connection连接System.out.println("等待消息中……");}}
4. 测试
运行生产者,发送消息
启动消费者,监听消息
可以看到消费者已经收到消息了
四、SpringAMQP
1. 准备代码环境
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。
SpringAmqp的官方地址:Spring AMQP
AMQP全称:Advanced Message Queuing Protocol
AMQP翻译:高级消息队列协议
是一个:进程间传递异步消息的网络协议。
SpringAMQP提供了三个功能:
自动声明队列、交换机及其绑定关系
基于注解的监听器模式,异步接收消息
封装了RabbitTemplate工具,用于发送消息
为了演示SpringAMQP的功能,我们需要先准备代码环境,步骤如下:
创建project,删除其src目录,然后添加依赖
创建生产者模块
创建消费者模块
1.1 创建project
创建maven类型的project,不选择骨架,直接设置工程坐标然后下一步
创建后,删除src目录
修改pom.xml添加依赖坐标如下:
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.9.RELEASE</version> </parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target> </properties><dependencies><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency> </dependencies>
1.2 创建生产者模块
在project里创建module,起名称为
demo-producer
依赖
不需要添加依赖,父工程里已经导入了依赖,子模块直接继承父工程里的依赖就足够了
配置
修改配置文件application.yaml
spring:application:name: demo-producerrabbitmq:host: 192.168.200.137 #RabbitMQ服务的ipport: 5672 #RabbitMQ服务的端口username: itcast #RabbitMQ的帐号password: 123321 #RabbitMQ的密码
引导类
创建引导类,没有什么特殊的要求
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class DemoProducerApplication {public static void main(String[] args) {SpringApplication.run(DemoProducerApplication.class, args);} }
1.3 创建消费者模块
在project里创建module,起名称为
demo-consumer
依赖
不需要添加依赖,父工程里已经导入了依赖,子模块直接继承父工程里的依赖就足够了
配置
修改配置文件application.yaml
spring:application:name: demo-producerrabbitmq:host: 192.168.200.137 #RabbitMQ服务的ipport: 5672 #RabbitMQ服务的端口username: itcast #RabbitMQ的帐号password: 123321 #RabbitMQ的密码
引导类
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class DemoConsumerApplication {public static void main(String[] args) {SpringApplication.run(DemoConsumerApplication.class, args);} }
2. RabbitMQ工作模式
RabbitMQ提供了6种工作模式,参考:RabbitMQ Tutorials — RabbitMQ
basic queue:简单模式
work queues:工作队列集群消费
Publish/Subscribe:发布订阅模式,也称为Fanout,是一种消息广播模式
Routing:路由模式,也称为Direct模式
Topics:主题模式
RPC远程调用模式:远程调用,其实算不上MQ,这里不做介绍
无论哪种模式,可能都需要用到交换机、队列、交换机与队列的绑定。如何声明这些队列和交换机?
在控制台页面上直接创建。不经常用
使用@Bean的方式,声明交换机、队列、绑定关系
使用注解方式,可以在消费者一方直接声明交换机、队列、绑定关系,适合于复杂的绑定关系声明
3. basic queue简单队列
3.1 模式说明
basic queue:基本 队列
basic queue是RabbitMQ中最简单的一种队列模式:生产者把消息直接发送到队列queue,消费者从queue里直接接收消息。
解析:
3.2 使用示例
生产者
当生产者向某一队列发送消息时,必须先声明队列;否则消息会发送失败。
我们使用@Bean的方式声明队列
声明队列
创建一个配置类
Demo01SimpleConfig
,类上添加@Configuration
注解类里定义一上方法,返回值为
Queue
,在方法里定义队列。方法上加注解@Bean
/*** 生产者-->simple.queue-->消费者* 需要:声明队列simple.queue*/ @Configuration public class Demo02SimpleConfig {/*** 声明 一个队列,队列名称为simple.queue* 注意:Queue类是org.springframework.amqp.core.Queue,不要导错了*/@Beanpublic Queue simpleQueue(){return QueueBuilder.durable("producer.simple.queue").build();} }
发送消息
@SpringBootTest public class Demo02SimpleTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test(){//参数1:队列名称//参数2:消息内容rabbitTemplate.convertAndSend("producer.simple.queue","生产者,制造中...");} }
消费者
创建一个类,用于监听消息。类上需要添加@Component注解
类里定义一个方法,用于处理消息。
方法上需要添加注解
@RabbitListener(queues = "队列名称")
方法上需要添加一个
String
类型的形参:是接收到的消息内容
/***监听器 消费者 接收者 收听广播*/ @Slf4j @Component public class Demo02SimpleListener {@RabbitListener(queues = "producer.simple.queue")public void handleSimpleQueueMsg(String msg){log.info("从{}队列里接收到消息:{}","producer.simple.queue",msg);} }
4. work queues工作队列
假如只有一个消费者处理消息,那么处理消息的速度就有可能赶不上发送消息的速度。该如何同时处理更多的消息呢?
可以在同一个队列上创建多个竞争的消费者,以便消费者可以同时处理更多的消息
4.1 模式说明
多个消费者相互竞争,从同一个队列里获取消息。生产者发送的消息将被所有消费者分摊
消费
注意: 一个队列里的一条消息,只能被消费一次,不可能多个消费者同时消费处理
对于任务过重,或者任务较多的情况,使用工作队列可以提高任务处理的速度
例如:短信通知服务。 订单完成后要发短信通知
4.2 示例代码
生产者
声明队列
创建配置类
Demo02WorkQueueConfig
里使用@Bean
声明队列/*** ↗消费者1* 生产者-->work.queue* ↘消费者2* 需要:声明队列:work.queue*/ @Configuration public class Demo03WorkQueuesConfig {/*** 声明 一个队列,队列名称是 work.queue* 注意:Queue类是org.springframework.amqp.core.Queue,不要导错了 */@Beanpublic Queue workQueue(){return QueueBuilder.durable("producer.work.queue").build();} }
发送消息
创建测试类,添加测试方法,发送消息:
@SpringBootTest public class Demo02WorkQueueTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test() {for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("producer.work.queue", "这是work队列消息" + i);}} }
测试:
消费者
@Slf4j @Component public class Demo03WorkQueueListener {@RabbitListener(queues = "producer.work.queue")public void handleWorkQueueMsg1(String msg){log.info("消费者1从{}队列里接收到消息{}","producer.work.queue",msg);}@RabbitListener(queues = "producer.work.queue")public void handleWorkQueueMsg2(String msg){log.info("消费者2从{}队列里接收到消息{}","producer.work.queue",msg);} }
测试:
4.3 注意事项
在WorkQueues模式的默认情况下,一个队列里的所有消息,将平均分配给每个消费者。
这种情况并没有考虑到消费者的实际处理能力,显然是有问题的。
例如:生产者发送了50条消息,有两个消费者,各接收到了25条消息。假如
消费者1,每秒能处理100条消息。 很快就能处理完消息
消费者2,每秒能处理10条消息。 消息堆积越来越多
要解决这个问题其实非常简单:让每个消费者一次性只拉取1条消息
修改消费者的配置文件application.yaml:
spring:rabbitmq:listener:simple:prefetch: 1 #消费者一次抓取几条消息
5. Publish/Subscribe发布订阅(Fanout)
在上一章节中,我们创建了一个工作队列。工作队列背后的假设是,每个任务只传递给一个消费者。在这一节中,我们将做一些完全不同的事情——我们将向多个消费者传递一条消息。这种模式称为“发布/订阅”。
5.1 模式说明
P:生产者,用于发送消息。但是生产者要把消息发给交换机(X)
X:Exchange交换机
接收消息,接收生产者发送的消息
处理消息,把消息投递给某个或某些队列,或者把消息丢弃。具体会如何操作,由Exchange类型决定:
Fanout:广播,把消息交给绑定的所有队列(交换机绑定了哪些队列,就把消息投递给这些队列)
Direct:定向,把消息交给符合指定routing key的队列
Topic:通配符,把消息交给符合routing pattern的队列
注意:交换机只负责转发消息,不具备存储消息的能力。所以如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,消息将会丢失
Queue:消息队列,接收消息、缓存消息
C:消费者,等待消息、处理消息
5.2 示例代码
为了说明这种模式,我们将构建一个简单的日志系统。它将由两个程序组成:
生产者程序将发出日志消息
消费者程序将接收日志消息
第一组消费者,接收到日志消息并保存到磁盘上
第二组消费者,接收到日志消息并打印到控制台
生产者
声明队列
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/*** ↗ producer.fanout.queue1---消费者1* 生产者-->fanout.exchange* ↘ producer.fanout.queue1---消费者2** 需要:* 1. 声明Fanout类型的交换机:fanout.exchange* 2. 声明队列1:producer.fanout.queue1* 声明队列2:producer.fanout.queue2* 3. 把交换机与队列1绑定* 把交换机与队列2绑定*/@Configuration public class Demo04FanoutConfig {@Beanpublic Queue fanoutQueue1() {return QueueBuilder.durable("producer.fanout.queue1").build();}@Beanpublic Queue fanoutQueue2() {return QueueBuilder.durable("producer.fanout.queue2").build();}@Beanpublic FanoutExchange fanoutExchange() {return ExchangeBuilder.fanoutExchange("producer.fanout.exchange").build();}/*** 注意形参的名称。Spring在注入对象时,首先根据形参类型,从容器里查找对象。现在根据类型找到多个,根据形参名称byName注入* @param fanoutQueue1* @param fanoutExchange* @return*/@Beanpublic Binding fanoutQueue1Biding(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Binding fanoutQueue2Binding(Queue fanoutQueue2,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);} }
发送消息
@SpringBootTest public class Demo04FanoutTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test() {//参数1:交换机名称//参数2:路由key。fanout类型的交换机不需要使用路由key,把这个值写成空的//参数3:消息内容rabbitTemplate.convertAndSend("producer.fanout.exchange", "", "是一条Fanout广播消息");} }
消费者
@Slf4j @Component public class Demo04Fanoutlistener {@RabbitListener(queues = "producer.fanout.queue1")public void handleFanoutQueue1Msg(String msg) {log.info("消费者1从{}队列里收到消息:{}","producer.fanout.queue1",msg);}@RabbitListener(queues = "producer.fanout.queue2")public void handleFanoutQueue2Msg(String msg) {log.info("消费者2从{}队列里收到消息:{}","producer.fanout.queue2",msg);}}
测试:
6. Direct(Routing)
在上一个章节中,我们构建了一个简单的日志系统。我们能够向许多消费者广播日志消息。
在本节中,我们将向其添加一个功能:我们将使消费者能够仅订阅消息的子集。例如:
只能将关键错误消息定向到日志文件(以节省磁盘空间)
同时仍然能够在控制台上打印所有日志消息。
6.1 模式说明
队列在绑定交换机时,需要给队列指定一个Routing Key(路由key)
生产者在发送消息时,必须指定消息的Routing Key
交换机根据消息的RoutingKey进行判断:只有队列的RoutingKey 与 消息的RoutingKey完全相同,才会收到消息
6.2 示例代码
生产者
声明队列
/*** ↗ error-->direct.error.queue-->消费者1* 生产者-->direct.exchange* ↘ info---->direct.all.queue---->消费者2* ↘ error--->direct.all.queue---->消费者2* 需要:* 1. 声明Direct类型的交换机:direct.exchange* 2. 声明队列1:direct.error.queue,用于接收错误日志信息* 声明队列2:direct.all.queue, 用于接收所有日志信息* 3. 把交换机与队列1绑定:把 RoutingKey为error的消息,投递到队列1* 把交换机与队列2绑定:把 RoutingKey为error的消息,投递到队列2* 把交换机与队列2绑定:把 RoutingKey为info的消息, 投递到队列2*/ @Configuration public class Demo05DirectConfig {@Beanpublic Queue directQueue1() {return QueueBuilder.durable("producer.direct.queue1").build();}@Beanpublic Queue directQueue2() {return QueueBuilder.durable("producer.direct.queue2").build();}@Beanpublic DirectExchange directExchange() {return ExchangeBuilder.directExchange("producer.direct.exchange").build();}@Beanpublic Binding directQueue1Binding(Queue directQueue1, DirectExchange directExchange) {return BindingBuilder.bind(directQueue1).to(directExchange).with("error");}@Beanpublic Binding directQueue2BindingInfo(Queue directQueue2, DirectExchange directExchange) {return BindingBuilder.bind(directQueue2).to(directExchange).with("info");}@Beanpublic Binding direcQueue2BindingError(Queue directQueue2, DirectExchange directExchange) {return BindingBuilder.bind(directQueue2).to(directExchange).with("error");} }
发消息:
@SpringBootTest public class Demo05DirectTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test(){//rabbitTemplate.convertAndSend("producer.direct.exchange","error","这是一条错误日志");rabbitTemplate.convertAndSend("producer.direct.exchange","info","这是一条正常日志");} }
消费者
@Slf4j @Component public class Demo05DirectListener {@RabbitListener(queues = "producer.direct.queue1")public void handleDirectQueue1(String msg) {log.info("消费者1从{}队列里收到消息:{}", "producer.direct.queue1", msg);}@RabbitListener(queues = "producer.direct.queue2")public void handleDirectQueue2(String msg) {log.info("消费者2从{}队列里收到消息:{}", "producer.direct.queue2", msg);} }
测试:
7. Topic【重点】
在上一个章节中,我们改进了日志系统。我们没有使用仅能进行消息广播的
FANOUT
,而是使用了DIRECT
,实现了了有选择地接收日志。虽然使用
DIRECT
改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由,例如:
第一组消费者,要接收所有系统的所有日志消息,打印到控制台
第二组消费者,要接收所有系统的错误日志消息,和订单系统的所有日志消息,保存到磁盘
为了在日志系统中实现这一点,我们需要了解更复杂的
TOPIC
交换机。7.1 模式说明
RoutingKey:发送到
TOPIC
的消息不能有任意的routing键,它:
必须是由点分隔的单词列表
可以有任意多个单词,最多255个字节
可使用
*
星号,匹配一个单词可使用
#
,匹配0个或多个单词使用特定RoutingKey发送的消息,将被传递到使用匹配Key绑定的所有队列。
7.2 使用示例
生产者
/*** ↗#.error--->topic.queue1-->消费者1* 生产者-->topic.exchange* ↘order.*--->topic.queue2-->消费者2* 需要:* 1. 声明Topic类型的交换机:topic.exchange* 2. 声明队列1:topic.queue1* 声明队列2:topic.queue2* 3. 把交换机与队列1绑定:将RoutingKey为#.error的消息,投递到topic.queue1* 把交换机与队列2绑定:将RoutingKey为order.*的消息,投递到topic.queue2*/ @Configuration public class Demo05TopicConfig {@Beanpublic Queue topicQueue1(){return QueueBuilder.durable("topic.queue1").build();}@Beanpublic Queue topicQueue2(){return QueueBuilder.durable("topic.queue2").build();}@Beanpublic TopicExchange topicExchange(){return ExchangeBuilder.topicExchange("topic.exchange").build();}@Beanpublic Binding queue1Binding(TopicExchange topicExchange, Queue topicQueue1){return BindingBuilder.bind(topicQueue1).to(topicExchange).with("#.error");}@Beanpublic Binding queue2Binding(TopicExchange topicExchange, Queue topicQueue2) {return BindingBuilder.bind(topicQueue2).to(topicExchange).with("order.*");} }
发送消息
@SpringBootTest public class Demo05TopicTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test(){//路由为order.*的消息,投递到topic.queue2,将由消费者2接收rabbitTemplate.convertAndSend("topic.exchange", "order.create", "创建订单1");//路由为order.*的消息,投递到topic.queue2,将由消费者2接收rabbitTemplate.convertAndSend("topic.exchange", "order.cancel", "取消订单1");//路由为#.error的消息,投递到topic.queue1,将由消费者1接收//路由为order.*的消息,投递到topic.queue2,将由消费者2接收rabbitTemplate.convertAndSend("topic.exchange", "order.error", "订单1取消失败");} }
消费者
@Slf4j @Component public class Demo05TopicListener {@RabbitListener(queues = "topic.queue1")public void handleTopicQueue1Msg(String msg) {log.info("消费者1从{}接收到消息:{}", "topic.queue1", msg);}@RabbitListener(queues = "topic.queue2")public void handleTopicQueue2Msg(String msg) {log.info("消费者2从{}接收到消息:{}", "topic.queue1", msg);} }
8. 小结
1. 简单模式的使用:
生产者
声明队列:在一个配置类里,使用@Bean方式,声明了一个Queue对象
发送消息:使用RabbitTemplate对象的convertAndSend("队列名称", "消息内容");
消费者
在任意bean对象里增加一个方法
@RabbitListener(queues="队列名称")
public void 方法名(String msg){
//msg的值,就是接收到的消息内容
}
2. 工作队列模式:
主要解决的问题:单个消费者,处理消息的能力有限;增加消费者的数量,可以提升消费能力
生产者
声明队列:在一个配置类里,使用@Bean方式,声明了一个Queue对象
发送消息:使用RabbitTemplate对象的convertAndSend("队列名称", "消息内容");消费者
在任意bean对象里增加多个方法,每个方法都是一个消费者,可以处理消息
@RabbitListener(queues="队列名称")
public void 方法名(String msg){
//msg的值,就是接收到的消息内容
}
为了避免不同消费者,消费能力不均衡,导致:某些消费者很快处理完了,其它消费者处理的慢导致消息堆积
不采用平均分摊消息的方式(默认)
让消费者每次拉取1条消息,处理完再拉取:谁的性能强,谁就能拉取、处理更多的消息
4. Direct模式(Routing模式)
主要是用于:一条消息,可以自主决定发送到哪个队列上
生产者:
声明队列
声明交换机:DirectExchange
声明交换机与队列的绑定关系:每个绑定关系,都需要设置一个RoutingKey标识
发送消息时,用的方法是:
rabbitTemplate.convertAndSend("交换机名称", "消息标识", "消息内容")
消费者:
创建多个方法,每个方法监听不同的队列
5. Topic模式:
主要是用于:更灵活的消息分发。可以根据消息的RoutingKey,与交换机绑定的通配符进行对比
哪个队列的通配符跟消息RoutingKey匹配上了,就把消息发送到哪个队列上
生产者:
声明队列
声明交换机:TopicExchange
声明交换机与队列的绑定关系:每个绑定关系,都设置一个RoutingKey通配符
使用.分隔开的多个英文单词,最多255个字节
*表示一个单词
#表示0个或多个单词
发送消息时,使用的方法是:
rabbitTemplate.convertAndSend("交换机名称","消息标识","消息内容");
消费者:
创建多个方法,每个方法监听不同的队列
五、其它内容
这篇关于RabbitMQ入门 安装 SpringAMQP简单队列、工作队列、发布订阅(扇出模式,广播模式)、Direct模式(Roting模式)、Topic模式的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!