MQ,RabbitMQ,SpringAMQP的原理与实操

2024-02-06 08:20

本文主要是介绍MQ,RabbitMQ,SpringAMQP的原理与实操,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

MQ

同步通信

image-20240202103233412

image-20240202105021949

image-20240202105123170

异步通信

image-20240202111930461

事件驱动优势:

  • 服务解耦

  • 性能提升,吞吐量提高

    image-20240202141023030

  • 服务没有强依赖,不担心级联失败问题

    image-20240202141137606

  • 流量消峰

    image-20240202141435355

​ 小结: 大多情况对时效性要求较高,所有大多数时间用同步。而如果不需要对方的结果,且吞吐量,并发量较高则需要使用异步通信

image-20240202141921703

MQ常见框架

MQ(MessageQueue),消息队列,字面来看就是存放消息的队列,也就是事件驱动架构中的Broker

消息:就是事件,比如支付成功了这个事件,在MQ中就是一个消息

image-20240202144211395

RabbitMQ,RocketMQ 适合处理业务(若需要优化定制则选Rocket,因为用Java写的)

Kafka 适合处理日志(海量数据且对数据安全性要求不高的场景),ActiveMQ用的较少

RabbitMQ

RabbitMQ概述与安装

RabbitMQ是基于Erlang语言(面向并发的语言,天生为分布式系统而设计的)开发的开源消息通信中间件,官网地址:https://www.rabbitmq.com/

参考课前资料(链接:https://pan.baidu.com/s/1JuVKKFpUXg8TFxa_FoV3Gg
提取码:1468) 来安装RabbitMQ

image-20240202144811905

之后在浏览器输入:http://192.168.83.130:15672/ 进入RabbitMQ管理页面,按docker run中设置的账号密码进行登录

结果如下

image-20240204101726227

mq整体架构

image-20240204103735587

小结

image-20240204103835121

常见消息模型

image-20240204105108372

HelloWorld 案例

image-20240204105310538

动手实践

案例: 完成官方Demo中的hello world案例(链接:https://pan.baidu.com/s/1JuVKKFpUXg8TFxa_FoV3Gg
提取码:1468)

image-20240204105416259

打开项目,将ip调成自己的rabbitmq使用虚拟机(或电脑)的ip,再运行一次PublisherTest中的 testSendMessage() 方法

发送一条消息。再运行ConsumerTest 中main方法来接收消息。

image-20240204112803673

小结

image-20240204135103572

SpringAMQP

AMOP(Advanced Message Queuing Protocol)高级消息队列协议,大大简化消息发送和接收的代码量,且与语言无关

SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

image-20240204145954201

image-20240204140927498

AMQP依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>    <groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在配置文件中添加mq连接信息

spring:rabbitmq:host: 192.168.83.130 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机 username: itcast # 用户名password: 123321 # 密码

Basic Queue 简单队列模型

案例:利用SpringAMQP实现HelloWorld中的基础消息队列功能

流程如下:

1.在父工程中引入spring-amqp的依赖,以及在publisher服务中编写配置

2.在publisher服务中利用RabbitTemplate的convertAndSend方法,发送消息到simple.queue这个队列

image-20240204145734357

SpringAMQP发送消息步骤:引入依赖和设置配置---->利用RabbitTemplate的convertAndSend方法

3.在consumer中编写代码,接收消息

image-20240204151638720

SpringAMQP接收消息步骤:引入依赖和设置配置—》定义类,添加Component注解,类中声明方法添加@RabbitListener注解

Work Queue 工作队列模型

Work queue,工作队列,可以提高消息处理速度,避免队列消息堆积

比如队列 一秒来50条消息 一个消费者一秒处理40条消息,那么需要两个消费者才能使得队列中消息被处理不丢失

image-20240204153355750

案例:实现一个队列绑定多个消费者

image-20240204153947098

问题:rabbitMQ消息预取,会将50条消息平均分给消费者1和消费者2,但消费者2处理速度慢,因此在1s内处理不完publisher发过来的50条消息

解决方案:让能者多劳,设置preFetch,控制预取消息的上限

image-20240204160513742

小结image-20240204161439493

发布、订阅模型-Fanout

image-20240204161952605

注意:exchange负责消息路由,而不是存储(queue负责存储),路由失败则消息丢失

Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的queue(广播)

案例:利用SpringAMQP演示FanoutExchange的使用

image-20240204163804072

step1 在consumer服务中声明Exchange、Queue、Binding(绑定关系)

image-20240204163828992

image-20240204164305716

step2 在consumer服务声明两个消费者

在consumer服务的SpringRabbitListener类中,添加两个方法,分别监听fanout.queue1和fanout.queue2:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}@RabbitListener(queues = "fanout.queue2") 
public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

step3 在publisher服务发送消息到FanoutExchange

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testFanoutExchange() {// 队列名称  String exchangeName = "itcast.fanout"; // 消息String message = "hello, everyone!";// 发送消息,参数分别是:交互机名称、RoutingKey(暂时为空)、消息		rabbitTemplate.convertAndSend(exchangeName, "", message);
}

小结

image-20240205092228233

发布、订阅模型-Direct

image-20240205092356181

案例:利用SpringAMQP演示DirectExchange的使用

image-20240205092544599

步骤一 在consumer服务声明Exchange、Queue

1.在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2,

2.并利用@RabbitListener声明Exchange、Queue、RoutingKey

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red","blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者1........接收到路由消息:【" + msg + "】" + LocalTime.now());
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red","yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者2........接收到消路由息:【" + msg + "】" + LocalTime.now());
}

步骤二 在publisher服务发送消息到DirectExchange

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testDirectExchange() {//交换机名字String exchangeName = "itcast.direct";//消息String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";//发送消息,参数依次为:交换机名称,RoutingKey,消息rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}

从blue->yellow->red 运行三次,得到结果如下

image-20240205104021565

小结

image-20240205104321850

发布、订阅模型-Topic

image-20240205104559605

案例 利用SpringAMQP演示TopicExchange的使用

image-20240205104825731

步骤一:在consumer服务声明Exchange、Queue

1.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2,

2.并利用@RabbitListener声明Exchange、Queue、RoutingKey

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "china.#"
))
public void listenTopicQueue1(String msg){System.out.println("消费者1........接收到路由消息:【" + msg + "】" + LocalTime.now());
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue2(String msg){System.out.println("消费者2........接收到消路由息:【" + msg + "】" + LocalTime.now());
}

步骤二:在publisher服务发送消息到TopicExchange

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testTopicExchange() {//交换机名字String exchangeName = "itcast.topic";//消息String message = "喜报!孙悟空大战哥斯拉,胜!";//发送消息,参数依次为:交换机名称,RoutingKey,消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

小结

image-20240205105655795

消息转化器

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,步骤如下:

​ 在publisher服务引入依赖

<dependency>   <groupId>com.fasterxml.jackson.core</groupId>   <artifactId>jackson-databind</artifactId>
</dependency>

​ 在publisher服务声明MessageConverter。(原本应该放到配置类中,但启动类也是配置类,所以可以放启动类中)

@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter(); 
}

image-20240205111950238

案例 测试发送Object类型消息

image-20240205111336946

结果如下(没有更改JDK序列化方式)

image-20240205111231469

使用json序列化器之后

image-20240205111303797

consumer接收消息过程

step1:加jackson依赖,依赖上面已经放父工程中,就不用做了

step2: 将pulisher中相同的MessageConverter放入consumer 启动类中(发送方与接收方必须相同)

@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter(); 
}

step3: 定义一个消费者,监听object.queue队列并消费消息

 @RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String,Object> msg){System.out.println("消费者........接收到对象消息:【" + msg + "】" + LocalTime.now());
}

image-20240205135854654

这篇关于MQ,RabbitMQ,SpringAMQP的原理与实操的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/683699

相关文章

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于

hdu4407(容斥原理)

题意:给一串数字1,2,......n,两个操作:1、修改第k个数字,2、查询区间[l,r]中与n互质的数之和。 解题思路:咱一看,像线段树,但是如果用线段树做,那么每个区间一定要记录所有的素因子,这样会超内存。然后我就做不来了。后来看了题解,原来是用容斥原理来做的。还记得这道题目吗?求区间[1,r]中与p互质的数的个数,如果不会的话就先去做那题吧。现在这题是求区间[l,r]中与n互质的数的和

RabbitMQ练习(AMQP 0-9-1 Overview)

1、What is AMQP 0-9-1 AMQP 0-9-1(高级消息队列协议)是一种网络协议,它允许遵从该协议的客户端(Publisher或者Consumer)应用程序与遵从该协议的消息中间件代理(Broker,如RabbitMQ)进行通信。 AMQP 0-9-1模型的核心概念包括消息发布者(producers/publisher)、消息(messages)、交换机(exchanges)、

hdu4407容斥原理

题意: 有一个元素为 1~n 的数列{An},有2种操作(1000次): 1、求某段区间 [a,b] 中与 p 互质的数的和。 2、将数列中某个位置元素的值改变。 import java.io.BufferedInputStream;import java.io.BufferedReader;import java.io.IOException;import java.io.Inpu

hdu4059容斥原理

求1-n中与n互质的数的4次方之和 import java.io.BufferedInputStream;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStream;import java.io.InputStreamReader;import java.io.PrintWrit

RabbitMQ使用及与spring boot整合

1.MQ   消息队列(Message Queue,简称MQ)——应用程序和应用程序之间的通信方法   应用:不同进程Process/线程Thread之间通信   比较流行的中间件:     ActiveMQ     RabbitMQ(非常重量级,更适合于企业级的开发)     Kafka(高吞吐量的分布式发布订阅消息系统)     RocketMQ   在高并发、可靠性、成熟度等

寻迹模块TCRT5000的应用原理和功能实现(基于STM32)

目录 概述 1 认识TCRT5000 1.1 模块介绍 1.2 电气特性 2 系统应用 2.1 系统架构 2.2 STM32Cube创建工程 3 功能实现 3.1 代码实现 3.2 源代码文件 4 功能测试 4.1 检测黑线状态 4.2 未检测黑线状态 概述 本文主要介绍TCRT5000模块的使用原理,包括该模块的硬件实现方式,电路实现原理,还使用STM32类

TL-Tomcat中长连接的底层源码原理实现

长连接:浏览器告诉tomcat不要将请求关掉。  如果不是长连接,tomcat响应后会告诉浏览器把这个连接关掉。    tomcat中有一个缓冲区  如果发送大批量数据后 又不处理  那么会堆积缓冲区 后面的请求会越来越慢。

PHP原理之内存管理中难懂的几个点

PHP的内存管理, 分为俩大部分, 第一部分是PHP自身的内存管理, 这部分主要的内容就是引用计数, 写时复制, 等等面向应用的层面的管理. 而第二部分就是今天我要介绍的, zend_alloc中描写的关于PHP自身的内存管理, 包括它是如何管理可用内存, 如何分配内存等. 另外, 为什么要写这个呢, 因为之前并没有任何资料来介绍PHP内存管理中使用的策略, 数据结构, 或者算法. 而在我们

Smarty模板执行原理

为了实现程序的业务逻辑和内容表现页面的分离从而提高开发速度,php 引入了模板引擎的概念,php 模板引擎里面最流行的可以说是smarty了,smarty因其功能强大而且速度快而被广大php web开发者所认可。本文将记录一下smarty模板引擎的工作执行原理,算是加深一下理解。 其实所有的模板引擎的工作原理是差不多的,无非就是在php程序里面用正则匹配将模板里面的标签替换为php代码从而将两者