SpringBoot实现RabbitMQ的WorkQueue(SpringAMQP 实现WorkQueue)

2024-03-28 09:20

本文主要是介绍SpringBoot实现RabbitMQ的WorkQueue(SpringAMQP 实现WorkQueue),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 1. 前言
  • 2. 思路
  • 3. 消息发送
  • 4. 消息接收
    • 4.1 能者多劳
  • 总结


在这里插入图片描述

1. 前言

上一篇文章,实现了用 SpringBoot实现RabbitMQ的简单队列, 篇文章 操作 用SpringBoot实现RabbitMQWorkQueue(SpringAMQP 实现WorkQueue)

Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息

在这里插入图片描述

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时就可以使用 work 模型,多个消费者共同处理消息处理,速度就能大大提高了。

注意:不一定是两个消费者。


2. 思路

代码实现思路:

1.在 publisher 服务中定义测试方法,产生50条消息(每隔20ms发送一条),发送到 test2024.simple.queue

2.在 consumer 服务中定义两个消息监听者,都监听simple.queue队列

3.消费者1处理50条消息(每隔20ms处理一条),消费者2处理50条消息(每隔100ms处理一条)


3. 消息发送

发送消息前先把 服务接收 的服务停止调.

循环 50 次, 每次休息 20 ms , 发送消息到指定队列

@Test
public void test01() throws InterruptedException {//  声明队列名称String queueName = "test2024.simple.queue";String message = "work_";for (int i = 1; i <= 50; i++) {// 发送消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}

发送完消息后, 我们看到消息整整齐齐的在 队列里躺着.

在这里插入图片描述


4. 消息接收

来连个监听器监听指定的队列, 但是两个监听队列的消费时间不一样.

@Component
public class SpringRabbitListener {//  监听指定队列,Spring只要接收到该队列的消息就会接收消息@RabbitListener(queues = "test2024.simple.queue")public void rabbitListener1(String message){System.out.println("1号接收器-接收到消息:" + message);;}//  监听指定队列,Spring只要接收到该队列的消息就会接收消息@RabbitListener(queues = "test2024.simple.queue")public void rabbitListener2(String message){System.out.println("2号接收器-接收到消息:" + message);;}}

此时启动消费者服务,然后再启动发送消息.

在这里插入图片描述

可以看到 1 号接收器很快消费了 25 条, 然后 2 号接收器缓慢的完成自己的… 1 号接收器没有给 2 号接收器帮忙. 那么咋生产环境中 ,就会造成服务闲置的情况下不能及时消费消息.

说明:阐述上述原因是因为队列平均分配给每个消费者,即使当前消费者没有消费完,队列也会将消息分配给消费者。然后消费者一个一个消息消费,即使消费很快的消费者,消费完毕,而消费很慢的消费者一直在消费。这样很不合理。应该是哪个消费者消费快应该多消费。哪个消费者消费慢应该少消费。

4.1 能者多劳

在 Spring 中有一个简单的配置叫预取 prefetch,可以解决这个问题。我们修改 consumer 服务的 application.yml 文件,添加配置:

 listener:simple:prefetch: 1   # 消费者每次最多只能预取一条消息,当消费完这条消息后,才能获取下一个消息,这样做的好处是消费能力强的消费者,处理的消息就会更多===》能者多劳

在这里插入图片描述

做了如上配置后,启动消费者服务,再次发送消息:

在这里插入图片描述

我们看到消费能力强的1号接收器完成了更多的工作,这样就达到了能者多劳.


总结

Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量

在这里插入图片描述



这篇关于SpringBoot实现RabbitMQ的WorkQueue(SpringAMQP 实现WorkQueue)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/855186

相关文章

Java对象和JSON字符串之间的转换方法(全网最清晰)

《Java对象和JSON字符串之间的转换方法(全网最清晰)》:本文主要介绍如何在Java中使用Jackson库将对象转换为JSON字符串,并提供了一个简单的工具类示例,该工具类支持基本的转换功能,... 目录前言1. 引入 Jackson 依赖2. 创建 jsON 工具类3. 使用示例转换 Java 对象为

SpringBoot快速接入OpenAI大模型的方法(JDK8)

《SpringBoot快速接入OpenAI大模型的方法(JDK8)》本文介绍了如何使用AI4J快速接入OpenAI大模型,并展示了如何实现流式与非流式的输出,以及对函数调用的使用,AI4J支持JDK8... 目录使用AI4J快速接入OpenAI大模型介绍AI4J-github快速使用创建SpringBoot

Vue项目的甘特图组件之dhtmlx-gantt使用教程和实现效果展示(推荐)

《Vue项目的甘特图组件之dhtmlx-gantt使用教程和实现效果展示(推荐)》文章介绍了如何使用dhtmlx-gantt组件来实现公司的甘特图需求,并提供了一个简单的Vue组件示例,文章还分享了一... 目录一、首先 npm 安装插件二、创建一个vue组件三、业务页面内 引用自定义组件:四、dhtmlx

Java中的Cursor使用详解

《Java中的Cursor使用详解》本文介绍了Java中的Cursor接口及其在大数据集处理中的优势,包括逐行读取、分页处理、流控制、动态改变查询、并发控制和减少网络流量等,感兴趣的朋友一起看看吧... 最近看代码,有一段代码涉及到Cursor,感觉写法挺有意思的。注意是Cursor,而不是Consumer

解决java.lang.NullPointerException问题(空指针异常)

《解决java.lang.NullPointerException问题(空指针异常)》本文详细介绍了Java中的NullPointerException异常及其常见原因,包括对象引用为null、数组元... 目录Java.lang.NullPointerException(空指针异常)NullPointer

javaScript在表单提交时获取表单数据的示例代码

《javaScript在表单提交时获取表单数据的示例代码》本文介绍了五种在JavaScript中获取表单数据的方法:使用FormData对象、手动提取表单数据、使用querySelector获取单个字... 方法 1:使用 FormData 对象FormData 是一个方便的内置对象,用于获取表单中的键值

Vue ElementUI中Upload组件批量上传的实现代码

《VueElementUI中Upload组件批量上传的实现代码》ElementUI中Upload组件批量上传通过获取upload组件的DOM、文件、上传地址和数据,封装uploadFiles方法,使... ElementUI中Upload组件如何批量上传首先就是upload组件 <el-upl

前端知识点之Javascript选择输入框confirm用法

《前端知识点之Javascript选择输入框confirm用法》:本文主要介绍JavaScript中的confirm方法的基本用法、功能特点、注意事项及常见用途,文中通过代码介绍的非常详细,对大家... 目录1. 基本用法2. 功能特点①阻塞行为:confirm 对话框会阻塞脚本的执行,直到用户作出选择。②

Docker部署Jenkins持续集成(CI)工具的实现

《Docker部署Jenkins持续集成(CI)工具的实现》Jenkins是一个流行的开源自动化工具,广泛应用于持续集成(CI)和持续交付(CD)的环境中,本文介绍了使用Docker部署Jenkins... 目录前言一、准备工作二、设置变量和目录结构三、配置 docker 权限和网络四、启动 Jenkins

SpringBoot项目注入 traceId 追踪整个请求的日志链路(过程详解)

《SpringBoot项目注入traceId追踪整个请求的日志链路(过程详解)》本文介绍了如何在单体SpringBoot项目中通过手动实现过滤器或拦截器来注入traceId,以追踪整个请求的日志链... SpringBoot项目注入 traceId 来追踪整个请求的日志链路,有了 traceId, 我们在排