RabbitMQ系列(五)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(1)-直连交换机

本文主要是介绍RabbitMQ系列(五)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(1)-直连交换机,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(1)-直连交换机

文章目录

    • RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(1)-直连交换机
        • 1.发布/订阅 模式
          • 1.1 发布/订阅模式工作原理
        • 2.基本概念 交换机
        • 3.代码实战
          • 3.1 直连交换机Direct
            • 3.1.1 生产者
            • 3.1.2 消费者1
            • 3.1.3 消费者2
            • 3.1.4 执行结果
          • 3.2 相同的routingKey,绑定到不同的队列 结果
            • 3.2.1运行结果
          • 3.3 不同的routingKey,绑定到同一个队列 结果
            • 3.3.1 运行结果

1.发布/订阅 模式

发布/订阅模式就是以将消息发送给不同类型的消费者。做到发布一次,消费多个。下图是(RabbitMQ)的发布/订阅模式的图,可以满足不同的业务来处理不同的消息

乍一看,这不就是工作队列么,一个生产者,多个消费者???区别在哪里!!!
仔细看看 工作队列 RabbitMQ系列(四)RabbitMQ进阶-Queue队列特性 (二)工作队列 Work模式

看仔细喽,工作队列是一个队列,多个消费者
这个 发布订阅是 多个队列、多个队列可以对应多个消费者!中间还多个一个多了 exchange 交换机!!!

另外:
工作队列,两个消费者平均消费,能者多劳,但是消息只被消费1次
但是发布订阅的场景是 A队列需要接受到消息后,进行A业务的处理 ,B队列需要接收到消息后,进行B业务的处理

类似于一个员工离职了,财务部门收到离职消息要给他结算工资、账号部门接到离职消息要给他消除账号信息、党支部接到离职消息要给他转移组织关系,各个部门都接到消息后有自己的逻辑业务

1.1 发布/订阅模式工作原理

一个消费者将消息首先发送到交换器,交换器绑定到多个队列,然后被监听订阅该队列的消费者所接收并消费
对比 工作模式 可以看出来,我们当前的发布/订阅模式,多了 exchange 和多个队列,工作模式是1个队列绑定多个消费者,发布/订阅模式是 经过Exchange交换机后,绑定多个队列,多个消费者
在这里插入图片描述

2.基本概念 交换机

1. 发布者(producer)是发布消息的应用程序。
2. 队列(queue)用于消息存储的缓冲。
3. 消费者(consumer)是接收消息的应用程序。

RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。

发布者(producer)只需要把消息发送给一个交换机(exchange)。交换机非常简单,它一边从发布者方接收消息,一边把消息推送到队列。交换机必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的。
  在这里插入图片描述
最新版本的RabbitMQ有四种交换机类型,分别是Direct exchange、Fanout exchange、Topic exchange、Headers exchange。

  • Direct Exchange

直连交换机。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。
如果一个队列绑定到该交换机上要求路由键 “abc”,则只有被标记为“abc”的消息才被转发,不会转发abc.def,也不会转发dog.ghi,只会转发abc。
在这里插入图片描述

  • Fanout Exchange

扇形交换机。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
在这里插入图片描述

  • Topic Exchange

主题交换机。将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.” 只会匹配到“abc.def”。
在这里插入图片描述

  • Headers Exchanges

头交换机。很少使用,头交换机根据发送的消息内容中的headers属性进行匹配。头交换机类似与主题交换机,但是却和主题交换机有着很大的不同。主题交换机使用路由键来进行消息的路由,而头交换机使用消息属性来进行消息的分发,通过判断消息头的值能否与指定的绑定相匹配来确立路由规则

当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。而fanout,direct,topic 的路由键都需要要字符串形式的。

匹配规则x-match有下列两种类型:

x-match = all :表示所有的键值对都匹配才能接受到消息

x-match = any :表示只要有键值对匹配就能接受到消息

3.代码实战

依旧是Maven项目,项目 pom还是参考之前 RabbitMQ系列(三)RabbitMQ进阶-Queue队列特性 (一)简单队列

新建交换机枚举类

package subscrib3;public enum ExchangeTypeEnum {DIRECT("exchange-direct-name", "direct"),FANOUT("exchange-fanout-name", "fanout"),TOPIC("exchange-topic-name", "topic"),HEADER("exchange-header-name", "headers"),UNKNOWN("unknown-exchange-name", "direct");/*** 交换机名字*/private String name;/*** 交换机类型*/private String type;ExchangeTypeEnum(String name, String type) {this.name = name;this.type = type;}public String getName() {return name;}public String getType() {return type;}public static ExchangeTypeEnum getEnum(String type) {ExchangeTypeEnum[] exchangeArrays = ExchangeTypeEnum.values();for (ExchangeTypeEnum exchange : exchangeArrays) {if (exchange.getName().equals(type)) {return exchange;}}return ExchangeTypeEnum.UNKNOWN;}}
3.1 直连交换机Direct

直连交换机就是让不同的Key精确的路由到不同的队列中,队列C的Key只有队列C可以接收到消息,队列D的key只有队列D可以收到消息,精确路由

3.1.1 生产者

在java包下 新建 subscribe3/direct包,然后在包下
新建 SubscribeConst

package subscrib3.direct;public class SubscribeConst {/*** 消息订阅队列 C*/public final static String SUBSCRIBE_QUEUE_NAME_DIRECT_C = "subscribe_queue_direct_C";/*** 消息订阅队列 D*/public final static String SUBSCRIBE_QUEUE_NAME_DIRECT_D = "subscribe_queue_direct_D";/*** 路由RoutingKey*/public final static String ROUTINGKEY_C = "rk_subscribe_queue_C";/*** 路由RoutingKey*/public final static String ROUTINGKEY_D = "rk_subscribe_queue_D";}

新建生产者 SubscribeProducerDirect

package subscrib3.direct;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;import java.time.LocalDate;
import java.time.LocalTime;import static subscrib3.direct.SubscribeConst.*;public class SubscribeProducerDirect {/*** 生产 Direct直连 交换机的MQ消息*/public static void produceDirectExchangeMessage() throws Exception {// 获取到连接以及mq通道Connection connection = MqConnectUtil.getConnectionDefault();// 从连接中创建通道Channel channel = connection.createChannel();/*声明 直连交换机 交换机 String exchange,* 参数明细* 1、交换机名称* 2、交换机类型,direct*/channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());/*声明队列* 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(SUBSCRIBE_QUEUE_NAME_DIRECT_C, true, false, false, null);channel.queueDeclare(SUBSCRIBE_QUEUE_NAME_DIRECT_D, true, false, false, null);/*交换机和队列绑定String queue, String exchange, String routingKey* 参数明细* 1、队列名称* 2、交换机名称* 3、路由key rk.subscribe_queue_direct*/channel.queueBind(SUBSCRIBE_QUEUE_NAME_DIRECT_C, ExchangeTypeEnum.DIRECT.getName(), ROUTINGKEY_C);channel.queueBind(SUBSCRIBE_QUEUE_NAME_DIRECT_D, ExchangeTypeEnum.DIRECT.getName(), ROUTINGKEY_D);//定义消息内容(发布多条消息)String messageC = "Hello World! Time:" + LocalDate.now() + " " + LocalTime.now() + " id:C";/* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body* exchange - 交换机  DirectExchange* queuename - 队列信息* props - 参数信息* message 消息体 byte[]类型*/channel.basicPublish(ExchangeTypeEnum.DIRECT.getName(), ROUTINGKEY_C, null, messageC.getBytes());System.out.println("CCCCCCC ****  Producer  Sent Message: '" + messageC + "'");//        //定义消息内容(发布多条消息)String messageD = "Hello World! Time:" + LocalDate.now() + " " + LocalTime.now() + " id:D";/* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body* exchange - 交换机  DirectExchange* queuename - 队列信息* props - 参数信息* message 消息体 byte[]类型*/channel.basicPublish(ExchangeTypeEnum.DIRECT.getName(), ROUTINGKEY_D, null, messageD.getBytes());System.out.println("DDDDD ****  Producer  Sent Message: '" + messageD + "'");//关闭通道和连接channel.close();connection.close();}public static void main(String[] argv) throws Exception {//生产 10条 DirectExchange 的队列消息for (int i = 0; i < 10; i++) {produceDirectExchangeMessage();}}
}
3.1.2 消费者1

消费者1绑定 subscribe_queue_direct_C 队列

package subscrib3.direct;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;import static subscrib3.direct.SubscribeConst.ROUTINGKEY_C;
import static subscrib3.direct.SubscribeConst.SUBSCRIBE_QUEUE_NAME_DIRECT_C;public class SubscribeQueueConsumerDirect1 {public static void main(String[] argv) throws Exception {Connection connection = null;Channel channel = null;try {connection = MqConnectUtil.getConnectionDefault();channel = connection.createChannel();/*声明交换机 String exchange* 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());/*声明队列* 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(SUBSCRIBE_QUEUE_NAME_DIRECT_C, true, false, false, null);//交换机和队列绑定String queue, String exchange, String routingKey/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*/channel.queueBind(SUBSCRIBE_QUEUE_NAME_DIRECT_C, ExchangeTypeEnum.DIRECT.getName(), ROUTINGKEY_C);System.out.println(" **** Consumer->1 Waiting for messages. To exit press CTRL+C");QueueingConsumer consumer = new QueueingConsumer(channel);/* 消息确认机制* autoAck true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费* autoAck false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态*          并且服务器会认为该消费者已经挂掉,不会再给其发送消息,直到该消费者反馈*          !!!!!! 注意这里是 false,手动确认*/channel.basicConsume(SUBSCRIBE_QUEUE_NAME_DIRECT_C, false, consumer);int count = 0;while (count < 10) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" count:" + count + " **** Consumer->2 Received '" + message + "'");doSomeThing(message);//返回确认状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);count++;}} catch (Exception e) {e.printStackTrace();} finally {channel.close();connection.close();}}/*** 模拟处理复杂逻辑:休眠100ms** @param message* @throws Exception*/public static void doSomeThing(String message) throws Exception {//遍历Count ,sleep , 接收一条消息后休眠 100 毫秒,模仿复杂逻辑Thread.sleep(100);}
}
3.1.3 消费者2

消费者2 绑定 subscribe_queue_direct_D 队列

package subscrib3.direct;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;import java.io.IOException;import static subscrib3.direct.SubscribeConst.ROUTINGKEY_D;
import static subscrib3.direct.SubscribeConst.SUBSCRIBE_QUEUE_NAME_DIRECT_D;public class SubscribeQueueConsumerDirect2 {public static void main(String[] argv) throws IOException {Connection connection = null;Channel channel = null;try {connection = MqConnectUtil.getConnectionDefault();channel = connection.createChannel();/*声明交换机 String exchange* 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());/*声明队列* 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(SUBSCRIBE_QUEUE_NAME_DIRECT_D, true, false, false, null);//交换机和队列绑定String queue, String exchange, String routingKey/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*/channel.queueBind(SUBSCRIBE_QUEUE_NAME_DIRECT_D, ExchangeTypeEnum.DIRECT.getName(), ROUTINGKEY_D);System.out.println(" **** Consumer->2 Waiting for messages. To exit press CTRL+C");QueueingConsumer consumer = new QueueingConsumer(channel);/* 消息确认机制* autoAck true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费* autoAck false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态*          并且服务器会认为该消费者已经挂掉,不会再给其发送消息,直到该消费者反馈*          !!!!!! 注意这里是 false,手动确认*/channel.basicConsume(SUBSCRIBE_QUEUE_NAME_DIRECT_D, false, consumer);int count = 0;while (count < 10) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" count:" + count + " **** Consumer->2 Received '" + message + "'");doSomeThing(message);//返回确认状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);count++;}} catch (Exception e) {e.printStackTrace();} finally {channel.close();connection.close();}}/*** 模拟处理复杂逻辑:休眠100ms** @param message* @throws Exception*/public static void doSomeThing(String message) throws Exception {//遍历Count ,sleep , 接收一条消息后休眠 100 毫秒,模仿复杂逻辑Thread.sleep(100);}
}
3.1.4 执行结果

启动上生产者,生产10条消息,先看下 交换机及交换机绑定
exchange-direct-name 直连交换机通过 routingkey rk.subscribe_queue_direct 绑定了两个队列C和D
在这里插入图片描述
生产10条消息,分别设置不同的routingkey,可以看到 队列C和队列D各有10条,消息分别被分配到了C和D
在这里插入图片描述

队列中各10条数据

启动消费者C/D进行消费,消费完毕,队列消息清零
在这里插入图片描述
队列清零
在这里插入图片描述


到此我们完成了 通过RoutingKey去区分消费不同的消费者的消息订阅模式

3.2 相同的routingKey,绑定到不同的队列 结果

下面我们测试下,我们新建两个不同的队列,使用相同的routingKey,看下是不是每个队列都能消费到该消息,还是说竞争消费到同一条消息,C消费了D就不消费了
SameQueueConst.java

package subscrib3.same;public class SameQueueConst {/*** 消息订阅队列 C*/public final static String SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_C = "same_subscribe_queue_direct_C";/*** 消息订阅队列 D*/public final static String SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_D = "same_subscribe_queue_direct_D";/*** 路由RoutingKey*/public final static String SAME_ROUTINGKEY= "rk_same_subscribe_queue";}

生产者 SubscribeProducerDirect.java

package subscrib3.same;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;import java.time.LocalDate;
import java.time.LocalTime;import static subscrib3.direct.SubscribeConst.ROUTINGKEY_C;
import static subscrib3.direct.SubscribeConst.ROUTINGKEY_D;
import static subscrib3.same.SameQueueConst.*;public class SameSubscribeProducerDirect {/*** 生产 Direct直连 交换机的MQ消息*/public static void produceDirectExchangeMessage() throws Exception {// 获取到连接以及mq通道Connection connection = MqConnectUtil.getConnectionDefault();// 从连接中创建通道Channel channel = connection.createChannel();/*声明 直连交换机 交换机 String exchange,* 参数明细* 1、交换机名称* 2、交换机类型,direct*/channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());/*声明队列* 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_C, true, false, false, null);channel.queueDeclare(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_D, true, false, false, null);/*交换机和队列绑定String queue, String exchange, String routingKey* 参数明细* 1、队列名称* 2、交换机名称* 3、路由key rk.subscribe_queue_direct*/channel.queueBind(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_C, ExchangeTypeEnum.DIRECT.getName(), SAME_ROUTINGKEY);channel.queueBind(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_D, ExchangeTypeEnum.DIRECT.getName(), SAME_ROUTINGKEY);for (int i = 0; i < 10; i++) {//定义消息内容(发布多条消息)String messageC = "id=" + i + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now();/* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body* exchange - 交换机  DirectExchange* queuename - 队列信息* props - 参数信息* message 消息体 byte[]类型*/channel.basicPublish(ExchangeTypeEnum.DIRECT.getName(), SAME_ROUTINGKEY, null, messageC.getBytes());System.out.println("****  Producer  Sent Message: '" + messageC + "'");}//关闭通道和连接channel.close();connection.close();}public static void main(String[] argv) throws Exception {//生产 10条 DirectExchange 的队列消息produceDirectExchangeMessage();}
}

消费者1 SameSubscribeQueueConsumerDirect1.java

package subscrib3.same;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;import static subscrib3.same.SameQueueConst.SAME_ROUTINGKEY;
import static subscrib3.same.SameQueueConst.SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_C;public class SameSubscribeQueueConsumerDirect1 {public static void main(String[] argv) throws Exception {Connection connection = null;Channel channel = null;try {connection = MqConnectUtil.getConnectionDefault();channel = connection.createChannel();/*声明交换机 String exchange* 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());/*声明队列* 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_C, true, false, false, null);//交换机和队列绑定String queue, String exchange, String routingKey/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*/channel.queueBind(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_C, ExchangeTypeEnum.DIRECT.getName(), SAME_ROUTINGKEY);System.out.println(" **** Consumer->1 Waiting for messages. To exit press CTRL+C");QueueingConsumer consumer = new QueueingConsumer(channel);/* 消息确认机制* autoAck true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费* autoAck false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态*          并且服务器会认为该消费者已经挂掉,不会再给其发送消息,直到该消费者反馈*          !!!!!! 注意这里是 false,手动确认*/channel.basicConsume(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_C, false, consumer);int count = 0;while (count < 10) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" count:" + count + " **** Consumer->2 Received '" + message + "'");doSomeThing(message);//返回确认状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);count++;}} catch (Exception e) {e.printStackTrace();} finally {channel.close();connection.close();}}/*** 模拟处理复杂逻辑:休眠100ms** @param message* @throws Exception*/public static void doSomeThing(String message) throws Exception {//遍历Count ,sleep , 接收一条消息后休眠 100 毫秒,模仿复杂逻辑Thread.sleep(100);}
}

消费者2 SameSubscribeQueueConsumerDirect2.java

package subscrib3.same;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;import java.io.IOException;import static subscrib3.same.SameQueueConst.SAME_ROUTINGKEY;
import static subscrib3.same.SameQueueConst.SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_D;public class SameSubscribeQueueConsumerDirect2 {public static void main(String[] argv) throws IOException {Connection connection = null;Channel channel = null;try {connection = MqConnectUtil.getConnectionDefault();channel = connection.createChannel();/*声明交换机 String exchange* 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());/*声明队列* 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_D, true, false, false, null);//交换机和队列绑定String queue, String exchange, String routingKey/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*/channel.queueBind(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_D, ExchangeTypeEnum.DIRECT.getName(), SAME_ROUTINGKEY);System.out.println(" **** Consumer->2 Waiting for messages. To exit press CTRL+C");QueueingConsumer consumer = new QueueingConsumer(channel);/* 消息确认机制* autoAck true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费* autoAck false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态*          并且服务器会认为该消费者已经挂掉,不会再给其发送消息,直到该消费者反馈*          !!!!!! 注意这里是 false,手动确认*/channel.basicConsume(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_D, false, consumer);int count = 0;while (count < 10) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" count:" + count + " **** Consumer->2 Received '" + message + "'");doSomeThing(message);//返回确认状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);count++;}} catch (Exception e) {e.printStackTrace();} finally {channel.close();connection.close();}}/*** 模拟处理复杂逻辑:休眠100ms** @param message* @throws Exception*/public static void doSomeThing(String message) throws Exception {//遍历Count ,sleep , 接收一条消息后休眠 100 毫秒,模仿复杂逻辑Thread.sleep(100);}
}
3.2.1运行结果

生产10条 ,C和D全都收到10条
在这里插入图片描述

运行消费者C和D,消费完毕
在这里插入图片描述
由此可以看出,多个队列绑定同一个直连exchange的key,每个消费者都会消费这条消息
类似于广播fanout

3.3 不同的routingKey,绑定到同一个队列 结果

下面我们把上面的代码改下,新建1个队列,绑定两个Routingkey,看看发往 两个routingKey的消息,当前队列是否会收到两个key的,还是说key只能绑定一个,另一个会被覆盖,不会接受消息

3.3.1 运行结果

在这里插入图片描述

开启消费者
每个id有两条消息被消费,消费完毕,队列清零
在这里插入图片描述
看下队列绑定关系
该队列绑定多个Routingkey和Exchange,说明一个队列可以同时绑定多个不同的Routingkey,Routingkey之间互相不影响
在这里插入图片描述

!!! 特别注意一下 channel和connect 建立完毕后要 close,如果不close,会出现意想不到的效果
比如我的消费者代码里, while(count < 10 ) 换成 while(true) 一直循环等待消息消费,然后把 channel及connect的close代码删掉
这样你循环等待消费消息时候,链接是保持的,这样下次再运行 生产者,生产10条消息,channel的链接是保持的,这样就会出现部分消息丢失、并不是10条消息全都分配到C队列中的情况
!!!!

下篇我们介绍 RabbitMQ系列(六)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(2)-扇形交换机

这篇关于RabbitMQ系列(五)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(1)-直连交换机的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/965435

相关文章

Rust中的Drop特性之解读自动化资源清理的魔法

《Rust中的Drop特性之解读自动化资源清理的魔法》Rust通过Drop特性实现了自动清理机制,确保资源在对象超出作用域时自动释放,避免了手动管理资源时可能出现的内存泄漏或双重释放问题,智能指针如B... 目录自动清理机制:Rust 的析构函数提前释放资源:std::mem::drop android的妙

SpringBoot中整合RabbitMQ(测试+部署上线最新完整)的过程

《SpringBoot中整合RabbitMQ(测试+部署上线最新完整)的过程》本文详细介绍了如何在虚拟机和宝塔面板中安装RabbitMQ,并使用Java代码实现消息的发送和接收,通过异步通讯,可以优化... 目录一、RabbitMQ安装二、启动RabbitMQ三、javascript编写Java代码1、引入

JavaScript中的reduce方法执行过程、使用场景及进阶用法

《JavaScript中的reduce方法执行过程、使用场景及进阶用法》:本文主要介绍JavaScript中的reduce方法执行过程、使用场景及进阶用法的相关资料,reduce是JavaScri... 目录1. 什么是reduce2. reduce语法2.1 语法2.2 参数说明3. reduce执行过程

如何通过Python实现一个消息队列

《如何通过Python实现一个消息队列》这篇文章主要为大家详细介绍了如何通过Python实现一个简单的消息队列,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录如何通过 python 实现消息队列如何把 http 请求放在队列中执行1. 使用 queue.Queue 和 reque

定价129元!支持双频 Wi-Fi 5的华为AX1路由器发布

《定价129元!支持双频Wi-Fi5的华为AX1路由器发布》华为上周推出了其最新的入门级Wi-Fi5路由器——华为路由AX1,建议零售价129元,这款路由器配置如何?详细请看下文介... 华为 Wi-Fi 5 路由 AX1 已正式开售,新品支持双频 1200 兆、配有四个千兆网口、提供可视化智能诊断功能,建

Java实现状态模式的示例代码

《Java实现状态模式的示例代码》状态模式是一种行为型设计模式,允许对象根据其内部状态改变行为,本文主要介绍了Java实现状态模式的示例代码,文中通过示例代码介绍的非常详细,需要的朋友们下面随着小编来... 目录一、简介1、定义2、状态模式的结构二、Java实现案例1、电灯开关状态案例2、番茄工作法状态案例

解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)

《解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)》该文章介绍了使用Redis的阻塞队列和Stream流的消息队列来优化秒杀系统的方案,通过将秒杀流程拆分为两条流水线,使用Redi... 目录Redis秒杀优化方案(阻塞队列+Stream流的消息队列)什么是消息队列?消费者组的工作方式每

微服务架构之使用RabbitMQ进行异步处理方式

《微服务架构之使用RabbitMQ进行异步处理方式》本文介绍了RabbitMQ的基本概念、异步调用处理逻辑、RabbitMQ的基本使用方法以及在SpringBoot项目中使用RabbitMQ解决高并发... 目录一.什么是RabbitMQ?二.异步调用处理逻辑:三.RabbitMQ的基本使用1.安装2.架构

Python进阶之Excel基本操作介绍

《Python进阶之Excel基本操作介绍》在现实中,很多工作都需要与数据打交道,Excel作为常用的数据处理工具,一直备受人们的青睐,本文主要为大家介绍了一些Python中Excel的基本操作,希望... 目录概述写入使用 xlwt使用 XlsxWriter读取修改概述在现实中,很多工作都需要与数据打交

Redis延迟队列的实现示例

《Redis延迟队列的实现示例》Redis延迟队列是一种使用Redis实现的消息队列,本文主要介绍了Redis延迟队列的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习... 目录一、什么是 Redis 延迟队列二、实现原理三、Java 代码示例四、注意事项五、使用 Redi