RabbitMQ系列(五)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(1)-直连交换机

本文主要是介绍RabbitMQ系列(五)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(1)-直连交换机,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(1)-直连交换机

文章目录

    • RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(1)-直连交换机
        • 1.发布/订阅 模式
          • 1.1 发布/订阅模式工作原理
        • 2.基本概念 交换机
        • 3.代码实战
          • 3.1 直连交换机Direct
            • 3.1.1 生产者
            • 3.1.2 消费者1
            • 3.1.3 消费者2
            • 3.1.4 执行结果
          • 3.2 相同的routingKey,绑定到不同的队列 结果
            • 3.2.1运行结果
          • 3.3 不同的routingKey,绑定到同一个队列 结果
            • 3.3.1 运行结果

1.发布/订阅 模式

发布/订阅模式就是以将消息发送给不同类型的消费者。做到发布一次,消费多个。下图是(RabbitMQ)的发布/订阅模式的图,可以满足不同的业务来处理不同的消息

乍一看,这不就是工作队列么,一个生产者,多个消费者???区别在哪里!!!
仔细看看 工作队列 RabbitMQ系列(四)RabbitMQ进阶-Queue队列特性 (二)工作队列 Work模式

看仔细喽,工作队列是一个队列,多个消费者
这个 发布订阅是 多个队列、多个队列可以对应多个消费者!中间还多个一个多了 exchange 交换机!!!

另外:
工作队列,两个消费者平均消费,能者多劳,但是消息只被消费1次
但是发布订阅的场景是 A队列需要接受到消息后,进行A业务的处理 ,B队列需要接收到消息后,进行B业务的处理

类似于一个员工离职了,财务部门收到离职消息要给他结算工资、账号部门接到离职消息要给他消除账号信息、党支部接到离职消息要给他转移组织关系,各个部门都接到消息后有自己的逻辑业务

1.1 发布/订阅模式工作原理

一个消费者将消息首先发送到交换器,交换器绑定到多个队列,然后被监听订阅该队列的消费者所接收并消费
对比 工作模式 可以看出来,我们当前的发布/订阅模式,多了 exchange 和多个队列,工作模式是1个队列绑定多个消费者,发布/订阅模式是 经过Exchange交换机后,绑定多个队列,多个消费者
在这里插入图片描述

2.基本概念 交换机

1. 发布者(producer)是发布消息的应用程序。
2. 队列(queue)用于消息存储的缓冲。
3. 消费者(consumer)是接收消息的应用程序。

RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。

发布者(producer)只需要把消息发送给一个交换机(exchange)。交换机非常简单,它一边从发布者方接收消息,一边把消息推送到队列。交换机必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的。
  在这里插入图片描述
最新版本的RabbitMQ有四种交换机类型,分别是Direct exchange、Fanout exchange、Topic exchange、Headers exchange。

  • Direct Exchange

直连交换机。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。
如果一个队列绑定到该交换机上要求路由键 “abc”,则只有被标记为“abc”的消息才被转发,不会转发abc.def,也不会转发dog.ghi,只会转发abc。
在这里插入图片描述

  • Fanout Exchange

扇形交换机。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
在这里插入图片描述

  • Topic Exchange

主题交换机。将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.” 只会匹配到“abc.def”。
在这里插入图片描述

  • Headers Exchanges

头交换机。很少使用,头交换机根据发送的消息内容中的headers属性进行匹配。头交换机类似与主题交换机,但是却和主题交换机有着很大的不同。主题交换机使用路由键来进行消息的路由,而头交换机使用消息属性来进行消息的分发,通过判断消息头的值能否与指定的绑定相匹配来确立路由规则

当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。而fanout,direct,topic 的路由键都需要要字符串形式的。

匹配规则x-match有下列两种类型:

x-match = all :表示所有的键值对都匹配才能接受到消息

x-match = any :表示只要有键值对匹配就能接受到消息

3.代码实战

依旧是Maven项目,项目 pom还是参考之前 RabbitMQ系列(三)RabbitMQ进阶-Queue队列特性 (一)简单队列

新建交换机枚举类

package subscrib3;public enum ExchangeTypeEnum {DIRECT("exchange-direct-name", "direct"),FANOUT("exchange-fanout-name", "fanout"),TOPIC("exchange-topic-name", "topic"),HEADER("exchange-header-name", "headers"),UNKNOWN("unknown-exchange-name", "direct");/*** 交换机名字*/private String name;/*** 交换机类型*/private String type;ExchangeTypeEnum(String name, String type) {this.name = name;this.type = type;}public String getName() {return name;}public String getType() {return type;}public static ExchangeTypeEnum getEnum(String type) {ExchangeTypeEnum[] exchangeArrays = ExchangeTypeEnum.values();for (ExchangeTypeEnum exchange : exchangeArrays) {if (exchange.getName().equals(type)) {return exchange;}}return ExchangeTypeEnum.UNKNOWN;}}
3.1 直连交换机Direct

直连交换机就是让不同的Key精确的路由到不同的队列中,队列C的Key只有队列C可以接收到消息,队列D的key只有队列D可以收到消息,精确路由

3.1.1 生产者

在java包下 新建 subscribe3/direct包,然后在包下
新建 SubscribeConst

package subscrib3.direct;public class SubscribeConst {/*** 消息订阅队列 C*/public final static String SUBSCRIBE_QUEUE_NAME_DIRECT_C = "subscribe_queue_direct_C";/*** 消息订阅队列 D*/public final static String SUBSCRIBE_QUEUE_NAME_DIRECT_D = "subscribe_queue_direct_D";/*** 路由RoutingKey*/public final static String ROUTINGKEY_C = "rk_subscribe_queue_C";/*** 路由RoutingKey*/public final static String ROUTINGKEY_D = "rk_subscribe_queue_D";}

新建生产者 SubscribeProducerDirect

package subscrib3.direct;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;import java.time.LocalDate;
import java.time.LocalTime;import static subscrib3.direct.SubscribeConst.*;public class SubscribeProducerDirect {/*** 生产 Direct直连 交换机的MQ消息*/public static void produceDirectExchangeMessage() throws Exception {// 获取到连接以及mq通道Connection connection = MqConnectUtil.getConnectionDefault();// 从连接中创建通道Channel channel = connection.createChannel();/*声明 直连交换机 交换机 String exchange,* 参数明细* 1、交换机名称* 2、交换机类型,direct*/channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());/*声明队列* 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(SUBSCRIBE_QUEUE_NAME_DIRECT_C, true, false, false, null);channel.queueDeclare(SUBSCRIBE_QUEUE_NAME_DIRECT_D, true, false, false, null);/*交换机和队列绑定String queue, String exchange, String routingKey* 参数明细* 1、队列名称* 2、交换机名称* 3、路由key rk.subscribe_queue_direct*/channel.queueBind(SUBSCRIBE_QUEUE_NAME_DIRECT_C, ExchangeTypeEnum.DIRECT.getName(), ROUTINGKEY_C);channel.queueBind(SUBSCRIBE_QUEUE_NAME_DIRECT_D, ExchangeTypeEnum.DIRECT.getName(), ROUTINGKEY_D);//定义消息内容(发布多条消息)String messageC = "Hello World! Time:" + LocalDate.now() + " " + LocalTime.now() + " id:C";/* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body* exchange - 交换机  DirectExchange* queuename - 队列信息* props - 参数信息* message 消息体 byte[]类型*/channel.basicPublish(ExchangeTypeEnum.DIRECT.getName(), ROUTINGKEY_C, null, messageC.getBytes());System.out.println("CCCCCCC ****  Producer  Sent Message: '" + messageC + "'");//        //定义消息内容(发布多条消息)String messageD = "Hello World! Time:" + LocalDate.now() + " " + LocalTime.now() + " id:D";/* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body* exchange - 交换机  DirectExchange* queuename - 队列信息* props - 参数信息* message 消息体 byte[]类型*/channel.basicPublish(ExchangeTypeEnum.DIRECT.getName(), ROUTINGKEY_D, null, messageD.getBytes());System.out.println("DDDDD ****  Producer  Sent Message: '" + messageD + "'");//关闭通道和连接channel.close();connection.close();}public static void main(String[] argv) throws Exception {//生产 10条 DirectExchange 的队列消息for (int i = 0; i < 10; i++) {produceDirectExchangeMessage();}}
}
3.1.2 消费者1

消费者1绑定 subscribe_queue_direct_C 队列

package subscrib3.direct;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;import static subscrib3.direct.SubscribeConst.ROUTINGKEY_C;
import static subscrib3.direct.SubscribeConst.SUBSCRIBE_QUEUE_NAME_DIRECT_C;public class SubscribeQueueConsumerDirect1 {public static void main(String[] argv) throws Exception {Connection connection = null;Channel channel = null;try {connection = MqConnectUtil.getConnectionDefault();channel = connection.createChannel();/*声明交换机 String exchange* 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());/*声明队列* 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(SUBSCRIBE_QUEUE_NAME_DIRECT_C, true, false, false, null);//交换机和队列绑定String queue, String exchange, String routingKey/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*/channel.queueBind(SUBSCRIBE_QUEUE_NAME_DIRECT_C, ExchangeTypeEnum.DIRECT.getName(), ROUTINGKEY_C);System.out.println(" **** Consumer->1 Waiting for messages. To exit press CTRL+C");QueueingConsumer consumer = new QueueingConsumer(channel);/* 消息确认机制* autoAck true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费* autoAck false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态*          并且服务器会认为该消费者已经挂掉,不会再给其发送消息,直到该消费者反馈*          !!!!!! 注意这里是 false,手动确认*/channel.basicConsume(SUBSCRIBE_QUEUE_NAME_DIRECT_C, false, consumer);int count = 0;while (count < 10) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" count:" + count + " **** Consumer->2 Received '" + message + "'");doSomeThing(message);//返回确认状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);count++;}} catch (Exception e) {e.printStackTrace();} finally {channel.close();connection.close();}}/*** 模拟处理复杂逻辑:休眠100ms** @param message* @throws Exception*/public static void doSomeThing(String message) throws Exception {//遍历Count ,sleep , 接收一条消息后休眠 100 毫秒,模仿复杂逻辑Thread.sleep(100);}
}
3.1.3 消费者2

消费者2 绑定 subscribe_queue_direct_D 队列

package subscrib3.direct;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;import java.io.IOException;import static subscrib3.direct.SubscribeConst.ROUTINGKEY_D;
import static subscrib3.direct.SubscribeConst.SUBSCRIBE_QUEUE_NAME_DIRECT_D;public class SubscribeQueueConsumerDirect2 {public static void main(String[] argv) throws IOException {Connection connection = null;Channel channel = null;try {connection = MqConnectUtil.getConnectionDefault();channel = connection.createChannel();/*声明交换机 String exchange* 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());/*声明队列* 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(SUBSCRIBE_QUEUE_NAME_DIRECT_D, true, false, false, null);//交换机和队列绑定String queue, String exchange, String routingKey/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*/channel.queueBind(SUBSCRIBE_QUEUE_NAME_DIRECT_D, ExchangeTypeEnum.DIRECT.getName(), ROUTINGKEY_D);System.out.println(" **** Consumer->2 Waiting for messages. To exit press CTRL+C");QueueingConsumer consumer = new QueueingConsumer(channel);/* 消息确认机制* autoAck true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费* autoAck false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态*          并且服务器会认为该消费者已经挂掉,不会再给其发送消息,直到该消费者反馈*          !!!!!! 注意这里是 false,手动确认*/channel.basicConsume(SUBSCRIBE_QUEUE_NAME_DIRECT_D, false, consumer);int count = 0;while (count < 10) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" count:" + count + " **** Consumer->2 Received '" + message + "'");doSomeThing(message);//返回确认状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);count++;}} catch (Exception e) {e.printStackTrace();} finally {channel.close();connection.close();}}/*** 模拟处理复杂逻辑:休眠100ms** @param message* @throws Exception*/public static void doSomeThing(String message) throws Exception {//遍历Count ,sleep , 接收一条消息后休眠 100 毫秒,模仿复杂逻辑Thread.sleep(100);}
}
3.1.4 执行结果

启动上生产者,生产10条消息,先看下 交换机及交换机绑定
exchange-direct-name 直连交换机通过 routingkey rk.subscribe_queue_direct 绑定了两个队列C和D
在这里插入图片描述
生产10条消息,分别设置不同的routingkey,可以看到 队列C和队列D各有10条,消息分别被分配到了C和D
在这里插入图片描述

队列中各10条数据

启动消费者C/D进行消费,消费完毕,队列消息清零
在这里插入图片描述
队列清零
在这里插入图片描述


到此我们完成了 通过RoutingKey去区分消费不同的消费者的消息订阅模式

3.2 相同的routingKey,绑定到不同的队列 结果

下面我们测试下,我们新建两个不同的队列,使用相同的routingKey,看下是不是每个队列都能消费到该消息,还是说竞争消费到同一条消息,C消费了D就不消费了
SameQueueConst.java

package subscrib3.same;public class SameQueueConst {/*** 消息订阅队列 C*/public final static String SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_C = "same_subscribe_queue_direct_C";/*** 消息订阅队列 D*/public final static String SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_D = "same_subscribe_queue_direct_D";/*** 路由RoutingKey*/public final static String SAME_ROUTINGKEY= "rk_same_subscribe_queue";}

生产者 SubscribeProducerDirect.java

package subscrib3.same;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;import java.time.LocalDate;
import java.time.LocalTime;import static subscrib3.direct.SubscribeConst.ROUTINGKEY_C;
import static subscrib3.direct.SubscribeConst.ROUTINGKEY_D;
import static subscrib3.same.SameQueueConst.*;public class SameSubscribeProducerDirect {/*** 生产 Direct直连 交换机的MQ消息*/public static void produceDirectExchangeMessage() throws Exception {// 获取到连接以及mq通道Connection connection = MqConnectUtil.getConnectionDefault();// 从连接中创建通道Channel channel = connection.createChannel();/*声明 直连交换机 交换机 String exchange,* 参数明细* 1、交换机名称* 2、交换机类型,direct*/channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());/*声明队列* 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_C, true, false, false, null);channel.queueDeclare(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_D, true, false, false, null);/*交换机和队列绑定String queue, String exchange, String routingKey* 参数明细* 1、队列名称* 2、交换机名称* 3、路由key rk.subscribe_queue_direct*/channel.queueBind(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_C, ExchangeTypeEnum.DIRECT.getName(), SAME_ROUTINGKEY);channel.queueBind(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_D, ExchangeTypeEnum.DIRECT.getName(), SAME_ROUTINGKEY);for (int i = 0; i < 10; i++) {//定义消息内容(发布多条消息)String messageC = "id=" + i + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now();/* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body* exchange - 交换机  DirectExchange* queuename - 队列信息* props - 参数信息* message 消息体 byte[]类型*/channel.basicPublish(ExchangeTypeEnum.DIRECT.getName(), SAME_ROUTINGKEY, null, messageC.getBytes());System.out.println("****  Producer  Sent Message: '" + messageC + "'");}//关闭通道和连接channel.close();connection.close();}public static void main(String[] argv) throws Exception {//生产 10条 DirectExchange 的队列消息produceDirectExchangeMessage();}
}

消费者1 SameSubscribeQueueConsumerDirect1.java

package subscrib3.same;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;import static subscrib3.same.SameQueueConst.SAME_ROUTINGKEY;
import static subscrib3.same.SameQueueConst.SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_C;public class SameSubscribeQueueConsumerDirect1 {public static void main(String[] argv) throws Exception {Connection connection = null;Channel channel = null;try {connection = MqConnectUtil.getConnectionDefault();channel = connection.createChannel();/*声明交换机 String exchange* 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());/*声明队列* 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_C, true, false, false, null);//交换机和队列绑定String queue, String exchange, String routingKey/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*/channel.queueBind(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_C, ExchangeTypeEnum.DIRECT.getName(), SAME_ROUTINGKEY);System.out.println(" **** Consumer->1 Waiting for messages. To exit press CTRL+C");QueueingConsumer consumer = new QueueingConsumer(channel);/* 消息确认机制* autoAck true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费* autoAck false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态*          并且服务器会认为该消费者已经挂掉,不会再给其发送消息,直到该消费者反馈*          !!!!!! 注意这里是 false,手动确认*/channel.basicConsume(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_C, false, consumer);int count = 0;while (count < 10) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" count:" + count + " **** Consumer->2 Received '" + message + "'");doSomeThing(message);//返回确认状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);count++;}} catch (Exception e) {e.printStackTrace();} finally {channel.close();connection.close();}}/*** 模拟处理复杂逻辑:休眠100ms** @param message* @throws Exception*/public static void doSomeThing(String message) throws Exception {//遍历Count ,sleep , 接收一条消息后休眠 100 毫秒,模仿复杂逻辑Thread.sleep(100);}
}

消费者2 SameSubscribeQueueConsumerDirect2.java

package subscrib3.same;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;import java.io.IOException;import static subscrib3.same.SameQueueConst.SAME_ROUTINGKEY;
import static subscrib3.same.SameQueueConst.SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_D;public class SameSubscribeQueueConsumerDirect2 {public static void main(String[] argv) throws IOException {Connection connection = null;Channel channel = null;try {connection = MqConnectUtil.getConnectionDefault();channel = connection.createChannel();/*声明交换机 String exchange* 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());/*声明队列* 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_D, true, false, false, null);//交换机和队列绑定String queue, String exchange, String routingKey/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*/channel.queueBind(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_D, ExchangeTypeEnum.DIRECT.getName(), SAME_ROUTINGKEY);System.out.println(" **** Consumer->2 Waiting for messages. To exit press CTRL+C");QueueingConsumer consumer = new QueueingConsumer(channel);/* 消息确认机制* autoAck true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费* autoAck false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态*          并且服务器会认为该消费者已经挂掉,不会再给其发送消息,直到该消费者反馈*          !!!!!! 注意这里是 false,手动确认*/channel.basicConsume(SAME_SUBSCRIBE_QUEUE_NAME_DIRECT_D, false, consumer);int count = 0;while (count < 10) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" count:" + count + " **** Consumer->2 Received '" + message + "'");doSomeThing(message);//返回确认状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);count++;}} catch (Exception e) {e.printStackTrace();} finally {channel.close();connection.close();}}/*** 模拟处理复杂逻辑:休眠100ms** @param message* @throws Exception*/public static void doSomeThing(String message) throws Exception {//遍历Count ,sleep , 接收一条消息后休眠 100 毫秒,模仿复杂逻辑Thread.sleep(100);}
}
3.2.1运行结果

生产10条 ,C和D全都收到10条
在这里插入图片描述

运行消费者C和D,消费完毕
在这里插入图片描述
由此可以看出,多个队列绑定同一个直连exchange的key,每个消费者都会消费这条消息
类似于广播fanout

3.3 不同的routingKey,绑定到同一个队列 结果

下面我们把上面的代码改下,新建1个队列,绑定两个Routingkey,看看发往 两个routingKey的消息,当前队列是否会收到两个key的,还是说key只能绑定一个,另一个会被覆盖,不会接受消息

3.3.1 运行结果

在这里插入图片描述

开启消费者
每个id有两条消息被消费,消费完毕,队列清零
在这里插入图片描述
看下队列绑定关系
该队列绑定多个Routingkey和Exchange,说明一个队列可以同时绑定多个不同的Routingkey,Routingkey之间互相不影响
在这里插入图片描述

!!! 特别注意一下 channel和connect 建立完毕后要 close,如果不close,会出现意想不到的效果
比如我的消费者代码里, while(count < 10 ) 换成 while(true) 一直循环等待消息消费,然后把 channel及connect的close代码删掉
这样你循环等待消费消息时候,链接是保持的,这样下次再运行 生产者,生产10条消息,channel的链接是保持的,这样就会出现部分消息丢失、并不是10条消息全都分配到C队列中的情况
!!!!

下篇我们介绍 RabbitMQ系列(六)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(2)-扇形交换机

这篇关于RabbitMQ系列(五)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(1)-直连交换机的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/965435

相关文章

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Java进阶13讲__第12讲_1/2

多线程、线程池 1.  线程概念 1.1  什么是线程 1.2  线程的好处 2.   创建线程的三种方式 注意事项 2.1  继承Thread类 2.1.1 认识  2.1.2  编码实现  package cn.hdc.oop10.Thread;import org.slf4j.Logger;import org.slf4j.LoggerFactory

hdu1180(广搜+优先队列)

此题要求最少到达目标点T的最短时间,所以我选择了广度优先搜索,并且要用到优先队列。 另外此题注意点较多,比如说可以在某个点停留,我wa了好多两次,就是因为忽略了这一点,然后参考了大神的思想,然后经过反复修改才AC的 这是我的代码 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<

高效+灵活,万博智云全球发布AWS无代理跨云容灾方案!

摘要 近日,万博智云推出了基于AWS的无代理跨云容灾解决方案,并与拉丁美洲,中东,亚洲的合作伙伴面向全球开展了联合发布。这一方案以AWS应用环境为基础,将HyperBDR平台的高效、灵活和成本效益优势与无代理功能相结合,为全球企业带来实现了更便捷、经济的数据保护。 一、全球联合发布 9月2日,万博智云CEO Michael Wong在线上平台发布AWS无代理跨云容灾解决方案的阐述视频,介绍了

在JS中的设计模式的单例模式、策略模式、代理模式、原型模式浅讲

1. 单例模式(Singleton Pattern) 确保一个类只有一个实例,并提供一个全局访问点。 示例代码: class Singleton {constructor() {if (Singleton.instance) {return Singleton.instance;}Singleton.instance = this;this.data = [];}addData(value)

科研绘图系列:R语言扩展物种堆积图(Extended Stacked Barplot)

介绍 R语言的扩展物种堆积图是一种数据可视化工具,它不仅展示了物种的堆积结果,还整合了不同样本分组之间的差异性分析结果。这种图形表示方法能够直观地比较不同物种在各个分组中的显著性差异,为研究者提供了一种有效的数据解读方式。 加载R包 knitr::opts_chunk$set(warning = F, message = F)library(tidyverse)library(phyl

【生成模型系列(初级)】嵌入(Embedding)方程——自然语言处理的数学灵魂【通俗理解】

【通俗理解】嵌入(Embedding)方程——自然语言处理的数学灵魂 关键词提炼 #嵌入方程 #自然语言处理 #词向量 #机器学习 #神经网络 #向量空间模型 #Siri #Google翻译 #AlexNet 第一节:嵌入方程的类比与核心概念【尽可能通俗】 嵌入方程可以被看作是自然语言处理中的“翻译机”,它将文本中的单词或短语转换成计算机能够理解的数学形式,即向量。 正如翻译机将一种语言

[MySQL表的增删改查-进阶]

🌈个人主页:努力学编程’ ⛅个人推荐: c语言从初阶到进阶 JavaEE详解 数据结构 ⚡学好数据结构,刷题刻不容缓:点击一起刷题 🌙心灵鸡汤:总有人要赢,为什么不能是我呢 💻💻💻数据库约束 🔭🔭🔭约束类型 not null: 指示某列不能存储 NULL 值unique: 保证某列的每行必须有唯一的值default: 规定没有给列赋值时的默认值.primary key:

poj 3190 优先队列+贪心

题意: 有n头牛,分别给他们挤奶的时间。 然后每头牛挤奶的时候都要在一个stall里面,并且每个stall每次只能占用一头牛。 问最少需要多少个stall,并输出每头牛所在的stall。 e.g 样例: INPUT: 51 102 43 65 84 7 OUTPUT: 412324 HINT: Explanation of the s

poj 2431 poj 3253 优先队列的运用

poj 2431: 题意: 一条路起点为0, 终点为l。 卡车初始时在0点,并且有p升油,假设油箱无限大。 给n个加油站,每个加油站距离终点 l 距离为 x[i],可以加的油量为fuel[i]。 问最少加几次油可以到达终点,若不能到达,输出-1。 解析: 《挑战程序设计竞赛》: “在卡车开往终点的途中,只有在加油站才可以加油。但是,如果认为“在到达加油站i时,就获得了一