SpringBoot3整合RabbitMQ之四_发布订阅模型中的fanout模型

2024-04-08 06:12

本文主要是介绍SpringBoot3整合RabbitMQ之四_发布订阅模型中的fanout模型,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

SpringBoot3整合RabbitMQ之四_发布订阅模型中的fanout模型

文章目录

  • SpringBoot3整合RabbitMQ之四_发布订阅模型中的fanout模型
  • 3. 发布/订阅模型之fanout模型
    • 1. 说明
    • 1. 消息发布者
      • 1. 创建工作队列的配置类
      • 2. 发布消费Controller
    • 2. 消息消费者One
    • 3. 消息消费者Two
    • 4. 消息消费者Three
    • 5. 输出结果

3. 发布/订阅模型之fanout模型

在这里插入图片描述

1. 说明

RabbitMQ广播(Fnaout)模型,也称为发布/订阅模型,是一种消息传递模式,用于将消息发送给多个消费者。在广播模型中,生产者发送消息到一个交换机(Exchange),而交换机将消息广播给所有绑定到它上面的队列(Queue),每个队列都有一个消费者监听并处理消息。

下面是 RabbitMQ 广播模型的基本原理和步骤:

  1. 交换机(Exchange): 生产者将消息发送到交换机。交换机负责将消息路由到一个或多个与之绑定的队列。
  2. 队列(Queue): 每个消费者都有一个独立的队列。队列存储交换机发送的消息,并将其提供给消费者。
  3. 绑定(Binding): 将队列与交换机进行绑定,指定一个或多个交换机将消息发送到该队列。
  4. 消费者(Consumer): 消费者监听队列,并处理收到的消息。

在广播模型中,有两种类型的交换机可供选择:

  • Fanout Exchange(扇出交换机): 扇出交换机会将消息广播到绑定到它上面的所有队列,无视消息的路由键(Routing Key)。
  • Headers Exchange(头交换机): 头交换机根据消息的 header 信息来进行匹配,并将消息发送到与 header 匹配的队列。

下面是使用 RabbitMQ 广播模型的基本步骤:

  1. 创建交换机,指定交换机类型为 Fanout Exchange。
  2. 创建队列,并将其绑定到交换机上。
  3. 生产者发送消息到交换机。
  4. 消费者监听队列,接收并处理消息。

使用 RabbitMQ 广播模型可以实现消息的一对多传递,适用于需要向多个消费者发送相同消息的场景,比如日志系统、通知系统等。

1. 消息发布者

1. 创建工作队列的配置类

package com.happy.msg.config;import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** <p>** @Description: 工作队列模型_创建名称为 work_queue 的队列 <br>* </p>* @Datetime: 2024/3/27 18:18* @Author: Yuan · JinSheng <br>* @Since 2024/3/27 18:18*/
@Configuration
public class WorkQueueConfig {@BeanQueue workQueue() {return QueueBuilder.durable("work_queue").build();}
}

2. 发布消费Controller

package com.happy.msg.publisher;import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;/*** <p>** @Description: 生产消息的控制器 <br>* </p>* @Datetime: 2024/3/27 10:53* @Author: Yuan · JinSheng <br>* @Since 2024/3/27 10:53*/
@RestController
@RequestMapping("/work")
public class WorkQueuePublisherController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/send")public String sentMessage() {for (int i = 1; i <=10 ; i++) {rabbitTemplate.convertAndSend("work_queue", "work_queue队列第["+i+"]条消息,hello,rabbitmq"+i);}return "发送成功";}
}

2. 消息消费者One

package com.happy.msg.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;
/*** <p>* @Description: 工作队列模型_消息消费者一 <br>* </p>* @Datetime: 2024/3/28 20:28* @Author: Yuan · JinSheng <br>* @Since 2024/3/28 20:28*/
@Slf4j
@Component
public class WorkQueueConsumerOne {/**** @param message 消息* @Description: 监听work_queue队列中的消息,当客户端启动后,work_queue队列中的所有的消息都被此消费者消费并打印* @Author: Yuan · JinSheng*/@RabbitListener(queues = "work_queue")public void msg(Message message){byte[] messageBody = message.getBody();String msg = new String(messageBody);log.info("WorkQueueConsumerOne接收到work_queue队列中的消息==={},===接收时间==={}",msg, LocalDateTime.now());}
}
  1. 输出结果
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[1]条消息,hello,rabbitmq1,===接收时间===2024-03-29T10:34:34.468074100
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[3]条消息,hello,rabbitmq3,===接收时间===2024-03-29T10:34:34.469081600
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[5]条消息,hello,rabbitmq5,===接收时间===2024-03-29T10:34:34.469976
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[7]条消息,hello,rabbitmq7,===接收时间===2024-03-29T10:34:34.469976
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[9]条消息,hello,rabbitmq9,===接收时间===2024-03-29T10:34:34.470811800

3. 消息消费者Two

package com.happy.msg.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;
/*** <p>* @Description: 工作队列模型_消息消费者二<br>* </p>* @Datetime: 2024/3/28 20:30* @Author: Yuan · JinSheng <br>* @Since 2024/3/28 20:30*/
@Slf4j
@Component
public class WorkQueueConsumerTwo {/**** @param message 消息* @Description: 监听work_queue队列中的消息,当客户端启动后,work_queue队列中的所有的消息都被此消费者消费并打印* @Author: Yuan · JinSheng*/@RabbitListener(queues = "work_queue")public void msg(Message message){byte[] messageBody = message.getBody();String msg = new String(messageBody);log.info("WorkQueueConsumerTwo接收到work_queue队列中的消息==={},===接收时间==={}",msg, LocalDateTime.now());}
}

4. 消息消费者Three

package com.happy.msg.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;
/*** <p>* @Description: 工作队列模型_消息消费者三<br>* </p>* @Datetime: 2024/3/28 20:30* @Author: Yuan · JinSheng <br>* @Since 2024/3/28 20:30*/
@Slf4j
@Component
public class WorkQueueConsumerTwo {/**** @param message 消息* @Description: 监听work_queue队列中的消息,当客户端启动后,work_queue队列中的所有的消息都被此消费者消费并打印* @Author: Yuan · JinSheng*/@RabbitListener(queues = "work_queue")public void msg(Message message){byte[] messageBody = message.getBody();String msg = new String(messageBody);//log.info("WorkQueueConsumerTwo接收到work_queue队列中的消息==={},===接收时间==={}",msg, LocalDateTime.now());System.out.println("WorkQueueConsumerTwo接收到work_queue队列中的消息==="+msg+",===接收时间==="+LocalDateTime.now());}
}

5. 输出结果

可看到消息被三个消费者按相等数量消费,总共10条消息,消费者1消费了4条,其他两个消费者消费了3条消息

WorkQueueConsumerTwo接收到work_queue队列中的消息===work_queue队列第[3]条消息,hello,rabbitmq3,===接收时间===2024-03-29T10:39:32.942546200
WorkQueueConsumerThree接收到work_queue队列中的消息===work_queue队列第[2]条消息,hello,rabbitmq2,===接收时间===2024-03-29T10:39:32.942546200
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[1]条消息,hello,rabbitmq1,===接收时间===2024-03-29T10:39:32.942037700
WorkQueueConsumerTwo接收到work_queue队列中的消息===work_queue队列第[6]条消息,hello,rabbitmq6,===接收时间===2024-03-29T10:39:32.943806300
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[4]条消息,hello,rabbitmq4,===接收时间===2024-03-29T10:39:32.944336900
WorkQueueConsumerTwo接收到work_queue队列中的消息===work_queue队列第[9]条消息,hello,rabbitmq9,===接收时间===2024-03-29T10:39:32.944336900
WorkQueueConsumerThree接收到work_queue队列中的消息===work_queue队列第[5]条消息,hello,rabbitmq5,===接收时间===2024-03-29T10:39:32.944336900
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[7]条消息,hello,rabbitmq7,===接收时间===2024-03-29T10:39:32.944336900
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[10]条消息,hello,rabbitmq10,===接收时间===2024-03-29T10:39:32.944336900
WorkQueueConsumerThree接收到work_queue队列中的消息===work_queue队列第[8]条消息,hello,rabbitmq8,===接收时间===2024-03-29T10:39:32.944336900

这篇关于SpringBoot3整合RabbitMQ之四_发布订阅模型中的fanout模型的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/884778

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

高效+灵活,万博智云全球发布AWS无代理跨云容灾方案!

摘要 近日,万博智云推出了基于AWS的无代理跨云容灾解决方案,并与拉丁美洲,中东,亚洲的合作伙伴面向全球开展了联合发布。这一方案以AWS应用环境为基础,将HyperBDR平台的高效、灵活和成本效益优势与无代理功能相结合,为全球企业带来实现了更便捷、经济的数据保护。 一、全球联合发布 9月2日,万博智云CEO Michael Wong在线上平台发布AWS无代理跨云容灾解决方案的阐述视频,介绍了

Andrej Karpathy最新采访:认知核心模型10亿参数就够了,AI会打破教育不公的僵局

夕小瑶科技说 原创  作者 | 海野 AI圈子的红人,AI大神Andrej Karpathy,曾是OpenAI联合创始人之一,特斯拉AI总监。上一次的动态是官宣创办一家名为 Eureka Labs 的人工智能+教育公司 ,宣布将长期致力于AI原生教育。 近日,Andrej Karpathy接受了No Priors(投资博客)的采访,与硅谷知名投资人 Sara Guo 和 Elad G

springboot3打包成war包,用tomcat8启动

1、在pom中,将打包类型改为war <packaging>war</packaging> 2、pom中排除SpringBoot内置的Tomcat容器并添加Tomcat依赖,用于编译和测试,         *依赖时一定设置 scope 为 provided (相当于 tomcat 依赖只在本地运行和测试的时候有效,         打包的时候会排除这个依赖)<scope>provided

Retrieval-based-Voice-Conversion-WebUI模型构建指南

一、模型介绍 Retrieval-based-Voice-Conversion-WebUI(简称 RVC)模型是一个基于 VITS(Variational Inference with adversarial learning for end-to-end Text-to-Speech)的简单易用的语音转换框架。 具有以下特点 简单易用:RVC 模型通过简单易用的网页界面,使得用户无需深入了

透彻!驯服大型语言模型(LLMs)的五种方法,及具体方法选择思路

引言 随着时间的发展,大型语言模型不再停留在演示阶段而是逐步面向生产系统的应用,随着人们期望的不断增加,目标也发生了巨大的变化。在短短的几个月的时间里,人们对大模型的认识已经从对其zero-shot能力感到惊讶,转变为考虑改进模型质量、提高模型可用性。 「大语言模型(LLMs)其实就是利用高容量的模型架构(例如Transformer)对海量的、多种多样的数据分布进行建模得到,它包含了大量的先验

图神经网络模型介绍(1)

我们将图神经网络分为基于谱域的模型和基于空域的模型,并按照发展顺序详解每个类别中的重要模型。 1.1基于谱域的图神经网络         谱域上的图卷积在图学习迈向深度学习的发展历程中起到了关键的作用。本节主要介绍三个具有代表性的谱域图神经网络:谱图卷积网络、切比雪夫网络和图卷积网络。 (1)谱图卷积网络 卷积定理:函数卷积的傅里叶变换是函数傅里叶变换的乘积,即F{f*g}

秋招最新大模型算法面试,熬夜都要肝完它

💥大家在面试大模型LLM这个板块的时候,不知道面试完会不会复盘、总结,做笔记的习惯,这份大模型算法岗面试八股笔记也帮助不少人拿到过offer ✨对于面试大模型算法工程师会有一定的帮助,都附有完整答案,熬夜也要看完,祝大家一臂之力 这份《大模型算法工程师面试题》已经上传CSDN,还有完整版的大模型 AI 学习资料,朋友们如果需要可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费

【生成模型系列(初级)】嵌入(Embedding)方程——自然语言处理的数学灵魂【通俗理解】

【通俗理解】嵌入(Embedding)方程——自然语言处理的数学灵魂 关键词提炼 #嵌入方程 #自然语言处理 #词向量 #机器学习 #神经网络 #向量空间模型 #Siri #Google翻译 #AlexNet 第一节:嵌入方程的类比与核心概念【尽可能通俗】 嵌入方程可以被看作是自然语言处理中的“翻译机”,它将文本中的单词或短语转换成计算机能够理解的数学形式,即向量。 正如翻译机将一种语言

AI Toolkit + H100 GPU,一小时内微调最新热门文生图模型 FLUX

上个月,FLUX 席卷了互联网,这并非没有原因。他们声称优于 DALLE 3、Ideogram 和 Stable Diffusion 3 等模型,而这一点已被证明是有依据的。随着越来越多的流行图像生成工具(如 Stable Diffusion Web UI Forge 和 ComyUI)开始支持这些模型,FLUX 在 Stable Diffusion 领域的扩展将会持续下去。 自 FLU