RabbitMQ系列(六)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(2)-扇形交换机

本文主要是介绍RabbitMQ系列(六)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(2)-扇形交换机,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(2)-扇形交换机

文章目录

    • RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(2)-扇形交换机
        • 1.发布/订阅 模式
        • 2.Direct 直连交换机
        • 3.Fanout 扇形交换机
          • 3.1 代码实战
            • 3.1.1 生产者
            • 3.1.2 消费者1
            • 3.1.3 消费者2
            • 3.1.4 执行结果

1.发布/订阅 模式

本篇文章紧接 发布订阅模式前篇 RabbitMQ系列(五)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(1)
本片文章接着讲 发布订阅模式的另外几种交换机

2.Direct 直连交换机

在上篇文章中讲过 RabbitMQ系列(五)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(1)

3.Fanout 扇形交换机

扇型交换机(fanout)很简单,你可能从名字上就能猜测出来,它把消息发送给它所知道的所有队列,常用场景就是日志系统,本条日志要打印到每一个异步系统中,所以每个系统都要写进自己的log。

这次我们采用一个Fanout exchange,然后绑定多个队列,不同的队列还使用不同的RoutingKey,生产者生产消息时候,不设置RoutingKey及设置RoutingKey看一下效果。

3.1 代码实战

先创建包,新建包fanout,包下面新建交换机枚举ExchangeTypeEnum

package fanout;public enum ExchangeTypeEnum {DIRECT("exchange-direct-name", "direct"),FANOUT("exchange-fanout-name", "fanout"),TOPIC("exchange-topic-name", "topic"),HEADER("exchange-header-name", "headers"),UNKNOWN("unknown-exchange-name", "direct");/*** 交换机名字*/private String name;/*** 交换机类型*/private String type;ExchangeTypeEnum(String name, String type) {this.name = name;this.type = type;}public String getName() {return name;}public String getType() {return type;}public static ExchangeTypeEnum getEnum(String type) {ExchangeTypeEnum[] exchangeArrays = ExchangeTypeEnum.values();for (ExchangeTypeEnum exchange : exchangeArrays) {if (exchange.getName().equals(type)) {return exchange;}}return ExchangeTypeEnum.UNKNOWN;}}

创建一个常量类FanoutConst

package fanout;public class FanoutConst {/*** 消息订阅队列 C*/public final static String SUBSCRIBE_QUEUE_NAME_FANOUT_F = "subscribe_queue_fanout_F";/*** 消息订阅队列 D*/public final static String SUBSCRIBE_QUEUE_NAME_FANOUT_G = "subscribe_queue_fanout_G";/*** 路由RoutingKey*/public final static String ROUTINGKEY_F = "rk_subscribe_queue_F";/*** 路由RoutingKey*/public final static String ROUTINGKEY_G = "rk_subscribe_queue_G";}
3.1.1 生产者

队列绑定设置RoutingKey,但是我们的 channel.basicPublish(ExchangeTypeEnum.FANOUT.getName(), “”, null, message.getBytes()); basicPublish生产消息 的RoutingKey是空的,并没有指定 rk

package fanout;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;import java.time.LocalDate;
import java.time.LocalTime;import static fanout.FanoutConst.*;public class FanoutProducer {/*** 生产 Direct直连 交换机的MQ消息*/public static void produceFanoutExchangeMessage(Integer i) throws Exception {// 获取到连接以及mq通道Connection connection = MqConnectUtil.getConnectionDefault();// 从连接中创建通道Channel channel = connection.createChannel();/*声明 直连交换机 交换机 String exchange,* 参数明细* 1、交换机名称* 2、交换机类型,fanout*/channel.exchangeDeclare(ExchangeTypeEnum.FANOUT.getName(), ExchangeTypeEnum.FANOUT.getType());/*声明队列* 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(SUBSCRIBE_QUEUE_NAME_FANOUT_F, true, false, false, null);channel.queueDeclare(SUBSCRIBE_QUEUE_NAME_FANOUT_G, true, false, false, null);/*交换机和队列绑定String queue, String exchange, String routingKey* 参数明细* 1、队列名称* 2、交换机名称* 3、路由key rk.subscribe_queue_direct*/channel.queueBind(SUBSCRIBE_QUEUE_NAME_FANOUT_F, ExchangeTypeEnum.FANOUT.getName(), ROUTINGKEY_F);channel.queueBind(SUBSCRIBE_QUEUE_NAME_FANOUT_G, ExchangeTypeEnum.FANOUT.getName(), ROUTINGKEY_G);//定义消息内容(发布多条消息)String message = "id=" + i + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now();/* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body* exchange - 交换机  DirectExchange* queuename - 队列信息* props - 参数信息* message 消息体 byte[]类型*/channel.basicPublish(ExchangeTypeEnum.FANOUT.getName(), "", null, message.getBytes());System.out.println(" ****  Producer  Sent Message: '" + message + "'");//关闭通道和连接channel.close();connection.close();}public static void main(String[] argv) throws Exception {//生产 10条 FanoutExchange 的队列消息for (int i = 0; i < 10; i++) {produceFanoutExchangeMessage(i);}}
}
3.1.2 消费者1

消费者1 消费 F队列

package fanout;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;import static fanout.FanoutConst.ROUTINGKEY_F;
import static fanout.FanoutConst.SUBSCRIBE_QUEUE_NAME_FANOUT_F;public class FanoutConsumer1 {public static void main(String[] argv) throws Exception {Connection connection = null;Channel channel = null;try {connection = MqConnectUtil.getConnectionDefault();channel = connection.createChannel();/*声明交换机 String exchange* 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(ExchangeTypeEnum.FANOUT.getName(), ExchangeTypeEnum.FANOUT.getType());/*声明队列* 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(SUBSCRIBE_QUEUE_NAME_FANOUT_F, true, false, false, null);//交换机和队列绑定String queue, String exchange, String routingKey/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*/channel.queueBind(SUBSCRIBE_QUEUE_NAME_FANOUT_F, ExchangeTypeEnum.FANOUT.getName(), ROUTINGKEY_F);System.out.println(" **** Consumer->1 Waiting for messages. To exit press CTRL+C");QueueingConsumer consumer = new QueueingConsumer(channel);/* 消息确认机制* autoAck true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费* autoAck false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态*          并且服务器会认为该消费者已经挂掉,不会再给其发送消息,直到该消费者反馈*          !!!!!! 注意这里是 false,手动确认*/channel.basicConsume(SUBSCRIBE_QUEUE_NAME_FANOUT_F, false, consumer);int count = 0;while (count < 10) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" count:" + count + " **** Consumer->1 Received '" + message + "'");doSomeThing(message);//返回确认状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);count++;}} catch (Exception e) {e.printStackTrace();} finally {channel.close();connection.close();}}/*** 模拟处理复杂逻辑:休眠100ms** @param message* @throws Exception*/public static void doSomeThing(String message) throws Exception {//遍历Count ,sleep , 接收一条消息后休眠 100 毫秒,模仿复杂逻辑Thread.sleep(100);}
}
3.1.3 消费者2

消费者2 消费 G队列

package fanout;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;import static fanout.FanoutConst.ROUTINGKEY_F;
import static fanout.FanoutConst.SUBSCRIBE_QUEUE_NAME_FANOUT_G;public class FanoutConsumer2 {public static void main(String[] argv) throws Exception {Connection connection = null;Channel channel = null;try {connection = MqConnectUtil.getConnectionDefault();channel = connection.createChannel();/*声明交换机 String exchange* 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(ExchangeTypeEnum.FANOUT.getName(), ExchangeTypeEnum.FANOUT.getType());/*声明队列* 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(SUBSCRIBE_QUEUE_NAME_FANOUT_G, true, false, false, null);//交换机和队列绑定String queue, String exchange, String routingKey/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*/channel.queueBind(SUBSCRIBE_QUEUE_NAME_FANOUT_G, ExchangeTypeEnum.FANOUT.getName(), ROUTINGKEY_F);System.out.println(" **** Consumer->2 Waiting for messages. To exit press CTRL+C");QueueingConsumer consumer = new QueueingConsumer(channel);/* 消息确认机制* autoAck true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费* autoAck false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态*          并且服务器会认为该消费者已经挂掉,不会再给其发送消息,直到该消费者反馈*          !!!!!! 注意这里是 false,手动确认*/channel.basicConsume(SUBSCRIBE_QUEUE_NAME_FANOUT_G, false, consumer);int count = 0;while (count < 10) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" count:" + count + " **** Consumer->2 Received '" + message + "'");doSomeThing(message);//返回确认状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);count++;}} catch (Exception e) {e.printStackTrace();} finally {channel.close();connection.close();}}/*** 模拟处理复杂逻辑:休眠100ms** @param message* @throws Exception*/public static void doSomeThing(String message) throws Exception {//遍历Count ,sleep , 接收一条消息后休眠 100 毫秒,模仿复杂逻辑Thread.sleep(100);}
}
3.1.4 执行结果

生产10条消息,消费者1和2全都收到了10条消息
在这里插入图片描述
执行消费者1和消费者2,消息消费完毕,队列清空
在这里插入图片描述

下面我们测试一下,如果我们的生产者,在生产消息的时候随便设置一个RoutingKey,我们到底能不能在队列中找到消息
代码设置 rk为123456

channel.basicPublish(ExchangeTypeEnum.FANOUT.getName(), "123456", null, message.getBytes());

看下队列,依旧是每一个队列中都有10条消息,这说明,FANOUT交换机,生产者设置的RK并不能生效,而是根据绑定的队列来进行所有队列收发消息的
在这里插入图片描述


下篇我们介绍 RabbitMQ系列(七)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(3)-主题交换机

这篇关于RabbitMQ系列(六)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(2)-扇形交换机的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/965436

相关文章

使用Python构建一个Hexo博客发布工具

《使用Python构建一个Hexo博客发布工具》虽然Hexo的命令行工具非常强大,但对于日常的博客撰写和发布过程,我总觉得缺少一个直观的图形界面来简化操作,下面我们就来看看如何使用Python构建一个... 目录引言Hexo博客系统简介设计需求技术选择代码实现主框架界面设计核心功能实现1. 发布文章2. 加

售价599元起! 华为路由器X1/Pro发布 配置与区别一览

《售价599元起!华为路由器X1/Pro发布配置与区别一览》华为路由器X1/Pro发布,有朋友留言问华为路由X1和X1Pro怎么选择,关于这个问题,本期图文将对这二款路由器做了期参数对比,大家看... 华为路由 X1 系列已经正式发布并开启预售,将在 4 月 25 日 10:08 正式开售,两款产品分别为华

SpringKafka错误处理(重试机制与死信队列)

《SpringKafka错误处理(重试机制与死信队列)》SpringKafka提供了全面的错误处理机制,通过灵活的重试策略和死信队列处理,下面就来介绍一下,具有一定的参考价值,感兴趣的可以了解一下... 目录引言一、Spring Kafka错误处理基础二、配置重试机制三、死信队列实现四、特定异常的处理策略五

利用Python快速搭建Markdown笔记发布系统

《利用Python快速搭建Markdown笔记发布系统》这篇文章主要为大家详细介绍了使用Python生态的成熟工具,在30分钟内搭建一个支持Markdown渲染、分类标签、全文搜索的私有化知识发布系统... 目录引言:为什么要自建知识博客一、技术选型:极简主义开发栈二、系统架构设计三、核心代码实现(分步解析

微信公众号脚本-获取热搜自动新建草稿并发布文章

《微信公众号脚本-获取热搜自动新建草稿并发布文章》本来想写一个自动化发布微信公众号的小绿书的脚本,但是微信公众号官网没有小绿书的接口,那就写一个获取热搜微信普通文章的脚本吧,:本文主要介绍微信公众... 目录介绍思路前期准备环境要求获取接口token获取热搜获取热搜数据下载热搜图片给图片加上标题文字上传图片

Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统

《PythonFastAPI+Celery+RabbitMQ实现分布式图片水印处理系统》这篇文章主要为大家详细介绍了PythonFastAPI如何结合Celery以及RabbitMQ实现简单的分布式... 实现思路FastAPI 服务器Celery 任务队列RabbitMQ 作为消息代理定时任务处理完整

Linux系统配置NAT网络模式的详细步骤(附图文)

《Linux系统配置NAT网络模式的详细步骤(附图文)》本文详细指导如何在VMware环境下配置NAT网络模式,包括设置主机和虚拟机的IP地址、网关,以及针对Linux和Windows系统的具体步骤,... 目录一、配置NAT网络模式二、设置虚拟机交换机网关2.1 打开虚拟机2.2 管理员授权2.3 设置子

Spring Boot + MyBatis Plus 高效开发实战从入门到进阶优化(推荐)

《SpringBoot+MyBatisPlus高效开发实战从入门到进阶优化(推荐)》本文将详细介绍SpringBoot+MyBatisPlus的完整开发流程,并深入剖析分页查询、批量操作、动... 目录Spring Boot + MyBATis Plus 高效开发实战:从入门到进阶优化1. MyBatis

SpringKafka消息发布之KafkaTemplate与事务支持功能

《SpringKafka消息发布之KafkaTemplate与事务支持功能》通过本文介绍的基本用法、序列化选项、事务支持、错误处理和性能优化技术,开发者可以构建高效可靠的Kafka消息发布系统,事务支... 目录引言一、KafkaTemplate基础二、消息序列化三、事务支持机制四、错误处理与重试五、性能优

新特性抢先看! Ubuntu 25.04 Beta 发布:Linux 6.14 内核

《新特性抢先看!Ubuntu25.04Beta发布:Linux6.14内核》Canonical公司近日发布了Ubuntu25.04Beta版,这一版本被赋予了一个活泼的代号——“Plu... Canonical 昨日(3 月 27 日)放出了 Beta 版 Ubuntu 25.04 系统镜像,代号“Pluc