RabbitMQ入门 安装 SpringAMQP简单队列、工作队列、发布订阅(扇出模式,广播模式)、Direct模式(Roting模式)、Topic模式

本文主要是介绍RabbitMQ入门 安装 SpringAMQP简单队列、工作队列、发布订阅(扇出模式,广播模式)、Direct模式(Roting模式)、Topic模式,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、RabbitMQ介绍

1. MQ介绍

1. 为什么要有MQ

同步调用与异步调用

程序里所有的通信,有两种形式:

  • 同步通信:通信时必须得到结果才会执行下一步

  • 异步通信:通信时不必等待结果,可以直接处理下一步

同步调用

解析:

同步调用的缺点

  • 业务链长,消耗时间增加,用户体验不好

  • 耦合性强

  • 流量洪峰服务器压力大

同步调用的好处:

  • 时效性强,可以立即得到结果

异步调用

解析:

异步调用的好处:

  • 异步调用,调用链短,用户等待时间短,体验好

  • 降低耦合,服务之间耦合性低了,任何一个服务出现问题,对其它服务的影响都非常小

  • 削峰填谷,中间件Broker具备一定的消息堆积与缓存能力,下游服务可以根据自己的能力慢慢处理

异步调用的坏处:

  • 业务复杂度增加:需要考虑数据一致性问题、消息丢失问题、消息重复消费问题、消息的顺序问题等等

  • 架构复杂度增加:对中间件Broker的依赖性增强了,必须保证Broker的高可用;一旦Broker出错,会对整个系统造成非常大的冲击

2. MQ介绍【面试】

什么是MQ

Message Queue,消息队列

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。

MQ的作用

  • 异步:实现服务之间异步通信

  • 削峰:实现流量的削峰填谷

  • 解耦:实现服务之间的耦合性降低

3. 常见的MQ

 在使用MQ时:

  • 追求可用性:Kafka、 RocketMQ 、RabbitMQ

  • 追求可靠性:RabbitMQ、RocketMQ

  • 追求吞吐能力:RocketMQ、Kafka

  • 追求消息低延迟:RabbitMQ、Kafka

2. RabbitMQ介绍

2.1 AMQP协议

AMQP,Advanced Message Queuing Protocol,高级消息队列,是一种网络协议。它是应用层协议的一个开发标准,为面向消息的中间件而设计。

基于此协议的客户端与消息中间件可传递消息,并不受客户端、中间件不同产品、不同编程语言的限制。

  • Publisher:消息发布者

  • Exchange:交换机。用于分发消息

  • Routes:路由。消息被分发到目标队列上的过程

  • Queue:消息队列。用于存储消息数据

  • Consumer:消息消费者。从队列里取出消息、处理消息

2.2 RabbitMQ

Rabbit公司基于AMQP协议标准,开发了RabbitMQ1.0。RabbitMQ采用Erlang语言开发,Erlang是专门为开发高并发和分布式系统的一种语言,在电信领域广泛使用。

官网地址:Messaging that just works — RabbitMQ

 

  • Broker:消息中间件,指的就是RabbitMQ服务器

  • Virtual Host:虚拟主机。当多个不同用户使用同一个RabbitMQ服务时,可以划分出多个vhost,每个用户在自己的vhost内创建exchange和queue等。

  • Connection:消息生产者、消费者 与 RabbitMQ之间建立的TCP连接

  • Channel:Channel作为轻量级的Connection,可以极大的减少建立Connection的开销。

    如果每一次访问RabbitMQ都建立一个TCP Connection,将会造成巨大的性能开销,性能非常低下。

    Channel是在Connection内部建立的逻辑连接,Channel之间完全隔离。如果应用程序支持多线程,通常每个线程创建独立的Channel进行通讯, AMQP method包含了channel id,帮助客户端和broker识别channel,所以channel之间是完全隔离的

  • Exchange:message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct(point-to-point), topic(publish-subscribe),fanout(multicast)

  • Queue:消息最终被送到这里,等待Consumer取走

  • Binding:exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange的查询表中,用作message的分发依据

3. 小结

同步调用和异步调用
    同步调用:调用后必须得到结果,才会执行下一步
                     好处:

                              时效性强,每一步操作都能够及时得到结果
                     缺点:
                             耦合性太强,任何一个环节出错,都可能导致整个调用链出现问题
                             业务链太长,调用的时间花费比较长,用户体验差
                             难以处理流量洪峰,如果要增加并发能力,整个调用链所有环节都要增加
    异步调用:调用后不必等待结果,可以直接执行下一步
                     好处:
                            业务链短了,异步调用只要发出一条消息给broker,就可以执行下一步了,

                            不需要等等消息的消费

                     坏处:
                            业务复杂度增加了:需要考虑数据一致性问题,消息丢失的问题、消息重

                                                            复消费问题、消息的顺序问题
                            系统架构更复杂了:必须要保证Broker的高可用,否则一旦Broker出错,

                                                             对整个系统影响太大


常见的MQ有哪些:
    RabbitMQ:追求高性能
    RocketMQ:追求高稳定
    Kafka:追求高吞吐量
AMQP协议:是一种消息通信协议,规定了消息中间件要有以下结构
    Publisher:生产者,发布消息的服务是生产者
    Consumer:消费者,接收处理消息的服务是消费者
    Exchange:交换机,用于分发路由消息,把一条消息分发到对应的队列上
    Queue:队列,真正缓存堆积消息的队列,等待消费者进行处理
RabbitMQ:是根据AMQP协议设计出来的消息中间件
    Producer:生产者,发送消息的服务
    Consumer:消费者,接收消息的服务
    Connection:连接对象,无论是生产者还是消费者,都必须和RabbitMQ建立连接才可以通信
    Channel:通道,是一种轻量的连接,使用Channel进行通信,更灵活,消耗更小
    VirtualHost:虚拟主机,是对Exchange和Queue的逻辑分组,实现不同环境的隔离。不同VirtualHost之间是互相隔离的
    Exchange:交换机
    Queue:队列

二、RabbitMQ安装

1. 拉取RabbitMQ镜像

1. 拉取RabbitMQ镜像

  • 方式一:在线拉取镜像

    docker pull rabbitmq:3.8-management

    方式二:从本地加载镜像【我们采用这种方式】

    把资料中的《mq.tar》上传到虚拟机CentOS里

    在CentOS里执行命令加载镜像:

  • #先切换到mq.tar所在的目录#再执行命令加载镜像
    docker load -i mq.tar#加载后,查看一下镜像。找一下有没有rabbitmq这个镜像
    docker images

2. 安装RabbitMQ

执行下面的命令来运行MQ容器:

docker run \-e RABBITMQ_DEFAULT_USER=itcast \-e RABBITMQ_DEFAULT_PASS=123321 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \-d \--restart=always \rabbitmq:3-management
  • Java程序连接RabbitMQ:使用端口5672

  • RabbitMQ控制台: http://ip:15672, 登录帐号:itcast, 密码:123321

3. RabbitMQ控制台

打开浏览器输入地址:http://ip:15672, 登录帐号:itcast, 密码:123321

在控制台里,可以查看、管理 交换机、队列等等

三、RabbitMQ入门

目标

使用RabbitMQ的简单模式,发送消息、接收消息

实现

RabbitMQ的简单模式,是所有消息模式里最简单、最基础的一种。如下图所示:

  • P:Producer或者Publisher,消息的生产者(或者叫 消息发布者)

  • C:Consumer,消息的消费者

  • queue:消息队列,图中红色部分即是

我们使用这种模式,实现入门案例:

  • 生产者把消息投递到消息队列里

  • 消费者从消息队列里获取消息

1. 创建工程

  • 创建project,作为父工程使用

  • 删除src文件夹

  • 修改pom.xml添加依赖坐标

<dependencies><!-- RabbitMQ的客户端依赖包 --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency>
</dependencies>

2. 消息生产者

  • 在工程里创建Module:demo01-producer

  • 编写测试类

public class Demo01ProducerSimple {public static void main(String[] args) throws IOException, TimeoutException {//1. 连接RabbitMQ服务器ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.200.137");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");//2. 获取通道ChannelConnection connection = factory.newConnection();Channel channel = connection.createChannel();//3. 声明队列String queueName = "test_simple.queue";//  参数1:队列名称//  参数2:是否 持久化队列(持久化队列:重启RabbitMQ服务后,队列仍然存在)//  参数3:是否 独占队列(第一个消费者独占队列)//  参数4:是否 自动删除队列(自动删除队列:当所有消费者都断开连接后,队列自动删除)//  参数5:队列的附加参数channel.queueDeclare(queueName, true, false, false, null);//4. 发送消息:简单模式,交换机设置为空,路由key设置为队列名称//  参数1:交换机名称。简单模式时,设置为空字符串//  参数2:路由key。简单模式时,设置为队列名称//  参数3:消息的附加参数//  参数4:消息内容channel.basicPublish("", queueName, null, "这是一条简单消息".getBytes());//5. 释放资源,关闭连接channel.close();connection.close();}

3. 消费消费者

  • 在工程里创建Module:demo01-consumer

  • 编写测试类

    注意:消费者的代码要放到main方法里,不要写到单元测试方法内

public class Demo01ConsumerSimple {public static void main(String[] args) throws IOException, TimeoutException {//1. 连接RabbitMQ服务器ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.200.137");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");//2. 获取连接Connection connection = factory.newConnection();Channel channel = connection.createChannel();//3. 声明消息队列(如果消息队列不存在,则创建消息队列;如果已存在,则不创建)String queueName = "test_simple.queue";channel.queueDeclare(queueName, true, false, false, null);//4. 监听消息//  参数1:队列名称。要从哪个队列里获取消息//  参数2:是否自动ack。当成功消费消息之后,会自动给RabbitMQ返回一个标识。RabbitMQ收到后,就知道消息已经被成功消费了,不会重新推送这条消息//  参数3:处理消息的回调        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("收到消息:" + new String(body));}});//等待接收消息。不要关闭Channel通道和Connection连接System.out.println("等待消息中……");}}

4. 测试

  1. 运行生产者,发送消息

  2. 启动消费者,监听消息

  3. 可以看到消费者已经收到消息了

四、SpringAMQP

1. 准备代码环境

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAmqp的官方地址:Spring AMQP

AMQP全称:Advanced Message Queuing Protocol

AMQP翻译:高级消息队列协议

是一个:进程间传递异步消息的网络协议

SpringAMQP提供了三个功能

  • 自动声明队列交换机及其绑定关系

  • 基于注解监听器模式,异步接收消息

  • 封装了RabbitTemplate工具,用于发送消息

为了演示SpringAMQP的功能,我们需要先准备代码环境,步骤如下:

  1. 创建project,删除其src目录,然后添加依赖

  2. 创建生产者模块

  3. 创建消费者模块

1.1 创建project

创建maven类型的project,不选择骨架,直接设置工程坐标然后下一步

创建后,删除src目录

修改pom.xml添加依赖坐标如下:

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.9.RELEASE</version>
</parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target>
</properties><dependencies><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
</dependencies>

1.2 创建生产者模块

在project里创建module,起名称为demo-producer

依赖

不需要添加依赖,父工程里已经导入了依赖,子模块直接继承父工程里的依赖就足够了

配置

修改配置文件application.yaml

spring:application:name: demo-producerrabbitmq:host: 192.168.200.137 #RabbitMQ服务的ipport: 5672            #RabbitMQ服务的端口username: itcast	  #RabbitMQ的帐号password: 123321	  #RabbitMQ的密码

引导类

创建引导类,没有什么特殊的要求

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class DemoProducerApplication {public static void main(String[] args) {SpringApplication.run(DemoProducerApplication.class, args);}
}

1.3 创建消费者模块

在project里创建module,起名称为demo-consumer

依赖

不需要添加依赖,父工程里已经导入了依赖,子模块直接继承父工程里的依赖就足够了

配置

修改配置文件application.yaml

spring:application:name: demo-producerrabbitmq:host: 192.168.200.137 #RabbitMQ服务的ipport: 5672            #RabbitMQ服务的端口username: itcast	  #RabbitMQ的帐号password: 123321	  #RabbitMQ的密码

引导类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class DemoConsumerApplication {public static void main(String[] args) {SpringApplication.run(DemoConsumerApplication.class, args);}
}

2. RabbitMQ工作模式

RabbitMQ提供了6种工作模式,参考:RabbitMQ Tutorials — RabbitMQ

  • basic queue:简单模式

  • work queues:工作队列集群消费

  • Publish/Subscribe:发布订阅模式,也称为Fanout,是一种消息广播模式

  • Routing:路由模式,也称为Direct模式

  • Topics:主题模式

  • RPC远程调用模式:远程调用,其实算不上MQ,这里不做介绍

无论哪种模式,可能都需要用到交换机、队列、交换机与队列的绑定。如何声明这些队列和交换机?

  • 在控制台页面上直接创建。不经常用

  • 使用@Bean的方式,声明交换机、队列、绑定关系

  • 使用注解方式,可以在消费者一方直接声明交换机、队列、绑定关系,适合于复杂的绑定关系声明

3. basic queue简单队列 

3.1 模式说明

basic queue:基本 队列

basic queue是RabbitMQ中最简单的一种队列模式:生产者把消息直接发送到队列queue消费者从queue里直接接收消息

 解析:

3.2 使用示例

生产者

生产者向某一队列发送消息时必须先声明队列;否则消息会发送失败。

我们使用@Bean的方式声明队列

声明队列

  1. 创建一个配置类Demo01SimpleConfig,类上添加@Configuration注解

  2. 类里定义一上方法,返回值为Queue,在方法里定义队列。方法上加注解@Bean

  3. /*** 生产者-->simple.queue-->消费者* 需要:声明队列simple.queue*/
    @Configuration
    public class Demo02SimpleConfig {/*** 声明 一个队列,队列名称为simple.queue* 注意:Queue类是org.springframework.amqp.core.Queue,不要导错了*/@Beanpublic Queue simpleQueue(){return QueueBuilder.durable("producer.simple.queue").build();}
    }

    发送消息


@SpringBootTest
public class Demo02SimpleTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test(){//参数1:队列名称//参数2:消息内容rabbitTemplate.convertAndSend("producer.simple.queue","生产者,制造中...");}
}

消费者

  1. 创建一个类,用于监听消息。类上需要添加@Component注解

  2. 类里定义一个方法,用于处理消息。

    方法上需要添加注解 @RabbitListener(queues = "队列名称")

    方法上需要添加一个String类型的形参:是接收到的消息内容

    

/***监听器  消费者 接收者 收听广播*/
@Slf4j
@Component
public class Demo02SimpleListener {@RabbitListener(queues = "producer.simple.queue")public void handleSimpleQueueMsg(String msg){log.info("从{}队列里接收到消息:{}","producer.simple.queue",msg);}
}

4. work queues工作队列

假如只有一个消费者处理消息,那么处理消息的速度就有可能赶不上发送消息的速度。该如何同时处理更多的消息呢?

可以在同一个队列创建多个竞争的消费者,以便消费者可以同时处理更多的消息

4.1 模式说明

多个消费者相互竞争从同一个队列里获取消息生产者发送的消息将被所有消费者分摊

消费

注意: 一个队列里一条消息只能被消费一次不可能多个消费者同时消费处理

           对于任务过重,或者任务较多的情况,使用工作队列可以提高任务处理的速度

例如:短信通知服务 订单完成后要发短信通知

4.2 示例代码

生产者

声明队列

创建配置类Demo02WorkQueueConfig里使用@Bean声明队列

/***                  ↗消费者1* 生产者-->work.queue*                  ↘消费者2* 需要:声明队列:work.queue*/
@Configuration
public class Demo03WorkQueuesConfig {/*** 声明 一个队列,队列名称是 work.queue* 注意:Queue类是org.springframework.amqp.core.Queue,不要导错了
*/@Beanpublic Queue workQueue(){return QueueBuilder.durable("producer.work.queue").build();}
}

发送消息

创建测试类,添加测试方法,发送消息:

@SpringBootTest
public class Demo02WorkQueueTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test() {for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("producer.work.queue", "这是work队列消息" + i);}}
}

测试:

消费者

@Slf4j
@Component
public class Demo03WorkQueueListener {@RabbitListener(queues = "producer.work.queue")public void handleWorkQueueMsg1(String msg){log.info("消费者1从{}队列里接收到消息{}","producer.work.queue",msg);}@RabbitListener(queues = "producer.work.queue")public void handleWorkQueueMsg2(String msg){log.info("消费者2从{}队列里接收到消息{}","producer.work.queue",msg);}
}

测试:

4.3 注意事项

在WorkQueues模式的默认情况下,一个队列里的所有消息,将平均分配给每个消费者

这种情况并没有考虑到消费者的实际处理能力,显然是有问题的。

例如:生产者发送了50条消息,有两个消费者,各接收到了25条消息。假如

  • 消费者1,每秒能处理100条消息。 很快就能处理完消息

  • 消费者2,每秒能处理10条消息。 消息堆积越来越多

解决这个问题其实非常简单:让每个消费者一次性只拉取1条消息

修改消费者的配置文件application.yaml:

spring:rabbitmq:listener:simple:prefetch: 1 #消费者一次抓取几条消息

5. Publish/Subscribe发布订阅(Fanout)

在上一章节中,我们创建了一个工作队列。工作队列背后的假设是,每个任务只传递给一个消费者。在这一节中,我们将做一些完全不同的事情——我们将向多个消费者传递一条消息。这种模式称为“发布/订阅”

5.1 模式说明

                

  • P:生产者,用于发送消息。但是生产者要把消息发给交换机(X)

  • X:Exchange交换机

    • 接收消息,接收生产者发送的消息

    • 处理消息,把消息投递给某个或某些队列,或者把消息丢弃。具体会如何操作,由Exchange类型决定:

      • Fanout:广播,把消息交给绑定的所有队列(交换机绑定了哪些队列,就把消息投递给这些队列)

      • Direct:定向,把消息交给符合指定routing key的队列

      • Topic:通配符,把消息交给符合routing pattern的队列

    • 注意:交换机只负责转发消息,不具备存储消息的能力。所以如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,消息将会丢失

  • Queue:消息队列,接收消息、缓存消息

  • C:消费者,等待消息、处理消息

5.2 示例代码

为了说明这种模式,我们将构建一个简单的日志系统。它将由两个程序组成:

  • 生产者程序将发出日志消息

  • 消费者程序将接收日志消息

    • 第一组消费者,接收到日志消息并保存到磁盘上

    • 第二组消费者,接收到日志消息并打印到控制台

生产者

声明队列

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/***                       ↗ producer.fanout.queue1---消费者1* 生产者-->fanout.exchange*                       ↘ producer.fanout.queue1---消费者2** 需要:*      1. 声明Fanout类型的交换机:fanout.exchange*      2. 声明队列1:producer.fanout.queue1*         声明队列2:producer.fanout.queue2*      3. 把交换机与队列1绑定*         把交换机与队列2绑定*/@Configuration
public class Demo04FanoutConfig {@Beanpublic Queue fanoutQueue1() {return QueueBuilder.durable("producer.fanout.queue1").build();}@Beanpublic Queue fanoutQueue2() {return QueueBuilder.durable("producer.fanout.queue2").build();}@Beanpublic FanoutExchange fanoutExchange() {return ExchangeBuilder.fanoutExchange("producer.fanout.exchange").build();}/*** 注意形参的名称。Spring在注入对象时,首先根据形参类型,从容器里查找对象。现在根据类型找到多个,根据形参名称byName注入* @param fanoutQueue1* @param fanoutExchange* @return*/@Beanpublic Binding fanoutQueue1Biding(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Binding fanoutQueue2Binding(Queue fanoutQueue2,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

        发送消息

@SpringBootTest
public class Demo04FanoutTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test() {//参数1:交换机名称//参数2:路由key。fanout类型的交换机不需要使用路由key,把这个值写成空的//参数3:消息内容rabbitTemplate.convertAndSend("producer.fanout.exchange", "", "是一条Fanout广播消息");}
}

消费者

@Slf4j
@Component
public class Demo04Fanoutlistener {@RabbitListener(queues = "producer.fanout.queue1")public void handleFanoutQueue1Msg(String msg) {log.info("消费者1从{}队列里收到消息:{}","producer.fanout.queue1",msg);}@RabbitListener(queues = "producer.fanout.queue2")public void handleFanoutQueue2Msg(String msg) {log.info("消费者2从{}队列里收到消息:{}","producer.fanout.queue2",msg);}}

测试:

6. Direct(Routing)

在上一个章节中,我们构建了一个简单的日志系统。我们能够向许多消费者广播日志消息。

在本节中,我们将向其添加一个功能:我们将使消费者能够仅订阅消息的子集。例如:

  • 只能将关键错误消息定向到日志文件(以节省磁盘空间)

  • 同时仍然能够在控制台上打印所有日志消息。

6.1 模式说明

  • 队列在绑定交换机时,需要给队列指定一个Routing Key(路由key)

  • 生产者在发送消息时,必须指定消息的Routing Key

  • 交换机根据消息的RoutingKey进行判断:只有队列的RoutingKey 与 消息的RoutingKey完全相同,才会收到消息

6.2 示例代码

生产者

声明队列

/***                      ↗ error-->direct.error.queue-->消费者1* 生产者-->direct.exchange*                      ↘ info---->direct.all.queue---->消费者2*                      ↘ error--->direct.all.queue---->消费者2* 需要:*      1. 声明Direct类型的交换机:direct.exchange*      2. 声明队列1:direct.error.queue,用于接收错误日志信息*         声明队列2:direct.all.queue,  用于接收所有日志信息*      3. 把交换机与队列1绑定:把 RoutingKey为error的消息,投递到队列1*         把交换机与队列2绑定:把 RoutingKey为error的消息,投递到队列2*         把交换机与队列2绑定:把 RoutingKey为info的消息, 投递到队列2*/
@Configuration
public class Demo05DirectConfig {@Beanpublic Queue directQueue1() {return QueueBuilder.durable("producer.direct.queue1").build();}@Beanpublic Queue directQueue2() {return QueueBuilder.durable("producer.direct.queue2").build();}@Beanpublic DirectExchange directExchange() {return ExchangeBuilder.directExchange("producer.direct.exchange").build();}@Beanpublic Binding directQueue1Binding(Queue directQueue1, DirectExchange directExchange) {return BindingBuilder.bind(directQueue1).to(directExchange).with("error");}@Beanpublic Binding directQueue2BindingInfo(Queue directQueue2, DirectExchange directExchange) {return BindingBuilder.bind(directQueue2).to(directExchange).with("info");}@Beanpublic Binding direcQueue2BindingError(Queue directQueue2, DirectExchange directExchange) {return BindingBuilder.bind(directQueue2).to(directExchange).with("error");}
}

发消息:

@SpringBootTest
public class Demo05DirectTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test(){//rabbitTemplate.convertAndSend("producer.direct.exchange","error","这是一条错误日志");rabbitTemplate.convertAndSend("producer.direct.exchange","info","这是一条正常日志");}
}

消费者

@Slf4j
@Component
public class Demo05DirectListener {@RabbitListener(queues = "producer.direct.queue1")public void handleDirectQueue1(String msg) {log.info("消费者1从{}队列里收到消息:{}", "producer.direct.queue1", msg);}@RabbitListener(queues = "producer.direct.queue2")public void handleDirectQueue2(String msg) {log.info("消费者2从{}队列里收到消息:{}", "producer.direct.queue2", msg);}
}

测试:

7. Topic【重点】

在上一个章节中,我们改进了日志系统。我们没有使用仅能进行消息广播的FANOUT,而是使用了DIRECT,实现了了有选择地接收日志。

虽然使用DIRECT改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由,例如:

  • 第一组消费者,要接收所有系统的所有日志消息,打印到控制台

  • 第二组消费者,要接收所有系统的错误日志消息,和订单系统的所有日志消息,保存到磁盘

为了在日志系统中实现这一点,我们需要了解更复杂的TOPIC交换机。

7.1 模式说明

​​​​​​

  • RoutingKey:发送到TOPIC的消息不能有任意的routing键,它:

    • 必须是由点分隔的单词列表

    • 可以有任意多个单词,最多255个字节

    • 可使用* 星号,匹配一个单词

    • 可使用#,匹配0个或多个单词

  • 使用特定RoutingKey发送的消息,将被传递到使用匹配Key绑定的所有队列。

7.2 使用示例

生产者

/***                      ↗#.error--->topic.queue1-->消费者1* 生产者-->topic.exchange*                      ↘order.*--->topic.queue2-->消费者2* 需要:*      1. 声明Topic类型的交换机:topic.exchange*      2. 声明队列1:topic.queue1*         声明队列2:topic.queue2*      3. 把交换机与队列1绑定:将RoutingKey为#.error的消息,投递到topic.queue1*         把交换机与队列2绑定:将RoutingKey为order.*的消息,投递到topic.queue2*/
@Configuration
public class Demo05TopicConfig {@Beanpublic Queue topicQueue1(){return QueueBuilder.durable("topic.queue1").build();}@Beanpublic Queue topicQueue2(){return QueueBuilder.durable("topic.queue2").build();}@Beanpublic TopicExchange topicExchange(){return ExchangeBuilder.topicExchange("topic.exchange").build();}@Beanpublic Binding queue1Binding(TopicExchange topicExchange, Queue topicQueue1){return BindingBuilder.bind(topicQueue1).to(topicExchange).with("#.error");}@Beanpublic Binding queue2Binding(TopicExchange topicExchange, Queue topicQueue2) {return BindingBuilder.bind(topicQueue2).to(topicExchange).with("order.*");}
}

发送消息

@SpringBootTest
public class Demo05TopicTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test(){//路由为order.*的消息,投递到topic.queue2,将由消费者2接收rabbitTemplate.convertAndSend("topic.exchange", "order.create", "创建订单1");//路由为order.*的消息,投递到topic.queue2,将由消费者2接收rabbitTemplate.convertAndSend("topic.exchange", "order.cancel", "取消订单1");//路由为#.error的消息,投递到topic.queue1,将由消费者1接收//路由为order.*的消息,投递到topic.queue2,将由消费者2接收rabbitTemplate.convertAndSend("topic.exchange", "order.error", "订单1取消失败");}
}

消费者

@Slf4j
@Component
public class Demo05TopicListener {@RabbitListener(queues = "topic.queue1")public void handleTopicQueue1Msg(String msg) {log.info("消费者1从{}接收到消息:{}", "topic.queue1", msg);}@RabbitListener(queues = "topic.queue2")public void handleTopicQueue2Msg(String msg) {log.info("消费者2从{}接收到消息:{}", "topic.queue1", msg);}
}

8. 小结

1. 简单模式的使用:
  
 生产者
        声明队列:在一个配置类里,使用@Bean方式,声明了一个Queue对象
        发送消息:使用RabbitTemplate对象的convertAndSend("队列名称", "消息内容");
    
消费者
        在任意bean对象里增加一个方法
        @RabbitListener(queues="队列名称")
        public void 方法名(String msg){
            //msg的值,就是接收到的消息内容
        }


2. 工作队列模式:
       主要解决的问题单个消费者处理消息的能力有限

                                   增加消费者的数量,可以提升消费能力

             
生产者
        声明队列:在一个配置类里,使用@Bean方式,声明了一个Queue对象
        发送消息:使用RabbitTemplate对象的convertAndSend("队列名称", "消息内容");

消费者
        在任意bean对象里增加多个方法,每个方法都是一个消费者,可以处理消息
        @RabbitListener(queues="队列名称")
        public void 方法名(String msg){
            //msg的值,就是接收到的消息内容
        }
为了避免不同消费者,消费能力不均衡,

导致:某些消费者很快处理完了,其它消费者处理的慢导致消息堆积
不采用平均分摊消息的方式(默认)
让消费者每次拉取1条消息,处理完再拉取:谁的性能强,谁就能拉取、处理更多的消息


4. Direct模式(Routing模式)
    主要是用于:一条消息,可以自主决定发送到哪个队列上
    生产者:
        声明队列
        声明交换机:DirectExchange
        声明交换机与队列的绑定关系:每个绑定关系,都需要设置一个RoutingKey标识
        发送消息时,用的方法是:
            rabbitTemplate.convertAndSend("交换机名称", "消息标识", "消息内容")
    消费者:
        创建多个方法,每个方法监听不同的队列


5. Topic模式:
    主要是用于:更灵活的消息分发。可以根据消息的RoutingKey,与交换机绑定的通配符进行对比
        哪个队列的通配符跟消息RoutingKey匹配上了,就把消息发送到哪个队列上
    生产者:
        声明队列
        声明交换机:TopicExchange
        声明交换机与队列的绑定关系:每个绑定关系,都设置一个RoutingKey通配符
            使用.分隔开的多个英文单词,最多255个字节
            *表示一个单词
            #表示0个或多个单词
        发送消息时,使用的方法是:
            rabbitTemplate.convertAndSend("交换机名称","消息标识","消息内容");
    消费者:
        创建多个方法,每个方法监听不同的队列

五、其它内容

        

这篇关于RabbitMQ入门 安装 SpringAMQP简单队列、工作队列、发布订阅(扇出模式,广播模式)、Direct模式(Roting模式)、Topic模式的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/787905

相关文章

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Zookeeper安装和配置说明

一、Zookeeper的搭建方式 Zookeeper安装方式有三种,单机模式和集群模式以及伪集群模式。 ■ 单机模式:Zookeeper只运行在一台服务器上,适合测试环境; ■ 伪集群模式:就是在一台物理机上运行多个Zookeeper 实例; ■ 集群模式:Zookeeper运行于一个集群上,适合生产环境,这个计算机集群被称为一个“集合体”(ensemble) Zookeeper通过复制来实现

CentOS7安装配置mysql5.7 tar免安装版

一、CentOS7.4系统自带mariadb # 查看系统自带的Mariadb[root@localhost~]# rpm -qa|grep mariadbmariadb-libs-5.5.44-2.el7.centos.x86_64# 卸载系统自带的Mariadb[root@localhost ~]# rpm -e --nodeps mariadb-libs-5.5.44-2.el7

Centos7安装Mongodb4

1、下载源码包 curl -O https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-rhel70-4.2.1.tgz 2、解压 放到 /usr/local/ 目录下 tar -zxvf mongodb-linux-x86_64-rhel70-4.2.1.tgzmv mongodb-linux-x86_64-rhel70-4.2.1/

hdu1180(广搜+优先队列)

此题要求最少到达目标点T的最短时间,所以我选择了广度优先搜索,并且要用到优先队列。 另外此题注意点较多,比如说可以在某个点停留,我wa了好多两次,就是因为忽略了这一点,然后参考了大神的思想,然后经过反复修改才AC的 这是我的代码 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<

高效+灵活,万博智云全球发布AWS无代理跨云容灾方案!

摘要 近日,万博智云推出了基于AWS的无代理跨云容灾解决方案,并与拉丁美洲,中东,亚洲的合作伙伴面向全球开展了联合发布。这一方案以AWS应用环境为基础,将HyperBDR平台的高效、灵活和成本效益优势与无代理功能相结合,为全球企业带来实现了更便捷、经济的数据保护。 一、全球联合发布 9月2日,万博智云CEO Michael Wong在线上平台发布AWS无代理跨云容灾解决方案的阐述视频,介绍了

csu 1446 Problem J Modified LCS (扩展欧几里得算法的简单应用)

这是一道扩展欧几里得算法的简单应用题,这题是在湖南多校训练赛中队友ac的一道题,在比赛之后请教了队友,然后自己把它a掉 这也是自己独自做扩展欧几里得算法的题目 题意:把题意转变下就变成了:求d1*x - d2*y = f2 - f1的解,很明显用exgcd来解 下面介绍一下exgcd的一些知识点:求ax + by = c的解 一、首先求ax + by = gcd(a,b)的解 这个

hdu2289(简单二分)

虽说是简单二分,但是我还是wa死了  题意:已知圆台的体积,求高度 首先要知道圆台体积怎么求:设上下底的半径分别为r1,r2,高为h,V = PI*(r1*r1+r1*r2+r2*r2)*h/3 然后以h进行二分 代码如下: #include<iostream>#include<algorithm>#include<cstring>#include<stack>#includ

Centos7安装JDK1.8保姆版

工欲善其事,必先利其器。这句话同样适用于学习Java编程。在开始Java的学习旅程之前,我们必须首先配置好适合的开发环境。 通过事先准备好这些工具和配置,我们可以避免在学习过程中遇到因环境问题导致的代码异常或错误。一个稳定、高效的开发环境能够让我们更加专注于代码的学习和编写,提升学习效率,减少不必要的困扰和挫折感。因此,在学习Java之初,投入一些时间和精力来配置好开发环境是非常值得的。这将为我

在JS中的设计模式的单例模式、策略模式、代理模式、原型模式浅讲

1. 单例模式(Singleton Pattern) 确保一个类只有一个实例,并提供一个全局访问点。 示例代码: class Singleton {constructor() {if (Singleton.instance) {return Singleton.instance;}Singleton.instance = this;this.data = [];}addData(value)