微服务:Rabbitmq的WorkQueue模型的使用、默认消费方式(消息队列中间件)

本文主要是介绍微服务:Rabbitmq的WorkQueue模型的使用、默认消费方式(消息队列中间件),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • WorkQueue模型
      • 控制预取消息个数

WorkQueue模型

当然,一个队列,可以由多个消费者去监听。

来实现一下.

生产者:

    @Testpublic void testWorkQueue() throws InterruptedException {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, message-";for (int i = 0; i < 50; i++) {// 发送消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(30); // 每次}}

消费者(这里我们弄两个):

@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:" + msg);}@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage2(String msg) throws InterruptedException {System.out.println("消费者2接收到消息:" + msg);}
}

启动看一下结果:

先启动消费者,再发送大量消息:

在这里插入图片描述

这里是因为mq有预分配,一人一半,消费能力一样,所以看起来像是轮流一人执行了一次一样,其实不是,后面会说到。

先发送大量消息,再启动消费者:

在这里插入图片描述

这里是因为消费者1先启动了,2还没启动呢,就被1消费完了。

所以我们改造一下测试代码,让消费者消费能力不同,同时让消费者先都启动,然后再送大量消息:

    @RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:" + msg);Thread.sleep(20);}@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage2(String msg) throws InterruptedException {System.out.println("消费者2接收到消息:" + msg);Thread.sleep(200);}
消费者2接收到消息:hello, message-0
消费者1接收到消息:hello, message-1
消费者1接收到消息:hello, message-3
消费者1接收到消息:hello, message-5
消费者2接收到消息:hello, message-2
消费者1接收到消息:hello, message-7
消费者1接收到消息:hello, message-9
消费者1接收到消息:hello, message-11
消费者2接收到消息:hello, message-4
消费者1接收到消息:hello, message-13
消费者1接收到消息:hello, message-15
消费者1接收到消息:hello, message-17
消费者1接收到消息:hello, message-19
消费者2接收到消息:hello, message-6
消费者1接收到消息:hello, message-21
消费者1接收到消息:hello, message-23
消费者1接收到消息:hello, message-25
消费者2接收到消息:hello, message-8
消费者1接收到消息:hello, message-27
消费者1接收到消息:hello, message-29
消费者1接收到消息:hello, message-31
消费者2接收到消息:hello, message-10
消费者1接收到消息:hello, message-33
消费者1接收到消息:hello, message-35
消费者1接收到消息:hello, message-37
消费者2接收到消息:hello, message-12
消费者1接收到消息:hello, message-39
消费者1接收到消息:hello, message-41
消费者1接收到消息:hello, message-43
消费者2接收到消息:hello, message-14
消费者1接收到消息:hello, message-45
消费者1接收到消息:hello, message-47
消费者1接收到消息:hello, message-49
消费者2接收到消息:hello, message-16
消费者2接收到消息:hello, message-18
消费者2接收到消息:hello, message-20
消费者2接收到消息:hello, message-22
消费者2接收到消息:hello, message-24
消费者2接收到消息:hello, message-26
消费者2接收到消息:hello, message-28
消费者2接收到消息:hello, message-30
消费者2接收到消息:hello, message-32
消费者2接收到消息:hello, message-34
消费者2接收到消息:hello, message-36
消费者2接收到消息:hello, message-38
消费者2接收到消息:hello, message-40
消费者2接收到消息:hello, message-42
消费者2接收到消息:hello, message-44
消费者2接收到消息:hello, message-46
消费者2接收到消息:hello, message-48

可以看到,其实rabbitmq默认有预分配(预取,每个消费者和队列中有一个通道,存放预取的消息),平均分消息,然后各自独立消费,所以消费者2要比消费者1消费完25条(50/2)消息时间长。

显然是不合理的,我们可以改造一下:

控制预取消息个数

配置中prefetch设置为1,每次消费完消息才取下一个。(能力越大,责任越大,消费快的,消费越多)

spring:rabbitmq:host: ip # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: guest # 用户名password: guest # 密码listener:simple:prefetch: 1

重新试一下:

消费者2接收到消息:hello, message-0
消费者1接收到消息:hello, message-1
消费者1接收到消息:hello, message-2
消费者1接收到消息:hello, message-3
消费者1接收到消息:hello, message-4
消费者2接收到消息:hello, message-5
消费者1接收到消息:hello, message-6
消费者1接收到消息:hello, message-7
消费者1接收到消息:hello, message-8
消费者1接收到消息:hello, message-9
消费者2接收到消息:hello, message-10
消费者1接收到消息:hello, message-11
消费者1接收到消息:hello, message-12
消费者1接收到消息:hello, message-13
消费者1接收到消息:hello, message-14
消费者2接收到消息:hello, message-15
消费者1接收到消息:hello, message-16
消费者1接收到消息:hello, message-17
消费者1接收到消息:hello, message-18
消费者2接收到消息:hello, message-19
消费者1接收到消息:hello, message-20
消费者1接收到消息:hello, message-21
消费者1接收到消息:hello, message-22
消费者1接收到消息:hello, message-23
消费者2接收到消息:hello, message-24
消费者1接收到消息:hello, message-25
消费者1接收到消息:hello, message-26
消费者1接收到消息:hello, message-27
消费者1接收到消息:hello, message-28
消费者2接收到消息:hello, message-29
消费者1接收到消息:hello, message-30
消费者1接收到消息:hello, message-31
消费者1接收到消息:hello, message-32
消费者1接收到消息:hello, message-33
消费者2接收到消息:hello, message-34
消费者1接收到消息:hello, message-35
消费者1接收到消息:hello, message-36
消费者1接收到消息:hello, message-37
消费者1接收到消息:hello, message-38
消费者2接收到消息:hello, message-39
消费者1接收到消息:hello, message-40
消费者1接收到消息:hello, message-41
消费者1接收到消息:hello, message-42
消费者1接收到消息:hello, message-43
消费者2接收到消息:hello, message-44
消费者1接收到消息:hello, message-45
消费者1接收到消息:hello, message-46
消费者1接收到消息:hello, message-47
消费者1接收到消息:hello, message-48
消费者2接收到消息:hello, message-49

这样,消费能力大的(消费者1),消费的越多。

这篇关于微服务:Rabbitmq的WorkQueue模型的使用、默认消费方式(消息队列中间件)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1026672

相关文章

鸿蒙中@State的原理使用详解(HarmonyOS 5)

《鸿蒙中@State的原理使用详解(HarmonyOS5)》@State是HarmonyOSArkTS框架中用于管理组件状态的核心装饰器,其核心作用是实现数据驱动UI的响应式编程模式,本文给大家介绍... 目录一、@State在鸿蒙中是做什么的?二、@Spythontate的基本原理1. 依赖关系的收集2.

Python基础语法中defaultdict的使用小结

《Python基础语法中defaultdict的使用小结》Python的defaultdict是collections模块中提供的一种特殊的字典类型,它与普通的字典(dict)有着相似的功能,本文主要... 目录示例1示例2python的defaultdict是collections模块中提供的一种特殊的字

Spring中配置ContextLoaderListener方式

《Spring中配置ContextLoaderListener方式》:本文主要介绍Spring中配置ContextLoaderListener方式,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录Spring中配置ContextLoaderLishttp://www.chinasem.cntene

C++ Sort函数使用场景分析

《C++Sort函数使用场景分析》sort函数是algorithm库下的一个函数,sort函数是不稳定的,即大小相同的元素在排序后相对顺序可能发生改变,如果某些场景需要保持相同元素间的相对顺序,可使... 目录C++ Sort函数详解一、sort函数调用的两种方式二、sort函数使用场景三、sort函数排序

在Android平台上实现消息推送功能

《在Android平台上实现消息推送功能》随着移动互联网应用的飞速发展,消息推送已成为移动应用中不可或缺的功能,在Android平台上,实现消息推送涉及到服务端的消息发送、客户端的消息接收、通知渠道(... 目录一、项目概述二、相关知识介绍2.1 消息推送的基本原理2.2 Firebase Cloud Me

Java String字符串的常用使用方法

《JavaString字符串的常用使用方法》String是JDK提供的一个类,是引用类型,并不是基本的数据类型,String用于字符串操作,在之前学习c语言的时候,对于一些字符串,会初始化字符数组表... 目录一、什么是String二、如何定义一个String1. 用双引号定义2. 通过构造函数定义三、St

Pydantic中Optional 和Union类型的使用

《Pydantic中Optional和Union类型的使用》本文主要介绍了Pydantic中Optional和Union类型的使用,这两者在处理可选字段和多类型字段时尤为重要,文中通过示例代码介绍的... 目录简介Optional 类型Union 类型Optional 和 Union 的组合总结简介Pyd

Vue3使用router,params传参为空问题

《Vue3使用router,params传参为空问题》:本文主要介绍Vue3使用router,params传参为空问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录vue3使用China编程router,params传参为空1.使用query方式传参2.使用 Histo

Spring Security基于数据库的ABAC属性权限模型实战开发教程

《SpringSecurity基于数据库的ABAC属性权限模型实战开发教程》:本文主要介绍SpringSecurity基于数据库的ABAC属性权限模型实战开发教程,本文给大家介绍的非常详细,对大... 目录1. 前言2. 权限决策依据RBACABAC综合对比3. 数据库表结构说明4. 实战开始5. MyBA

AJAX请求上传下载进度监控实现方式

《AJAX请求上传下载进度监控实现方式》在日常Web开发中,AJAX(AsynchronousJavaScriptandXML)被广泛用于异步请求数据,而无需刷新整个页面,:本文主要介绍AJAX请... 目录1. 前言2. 基于XMLHttpRequest的进度监控2.1 基础版文件上传监控2.2 增强版多