RabbitMQ(二)七种工作模式

2024-05-25 11:12
文章标签 模式 工作 rabbitmq 七种

本文主要是介绍RabbitMQ(二)七种工作模式,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 概述:工作模式(七种)
    • 1. "Hello World!"
    • 2. Work Queues(工作队列模式)
    • 3. Publish/Subscribe(发布订阅模式)
    • 4. Routing
    • 5. Topics
    • 6. RPC
    • 7. Publisher Confirms
  • 详细
    • 1. "Hello World!"
    • 2. Work Queues(工作队列模式)
    • 3. Publish/Subscribe(发布订阅模式)
    • 4. Routing
    • 5. Topics
    • 6. RPC
    • 7. Publisher Confirms
  • 小结

概述:工作模式(七种)

官网链接
https://www.rabbitmq.com/tutorials

1. “Hello World!”

在这里插入图片描述

2. Work Queues(工作队列模式)

多个消费者消费同一队列
在这里插入图片描述

3. Publish/Subscribe(发布订阅模式)

通过exchange广播到多个queue中
在这里插入图片描述

4. Routing

通过route key发布到指定queue
在这里插入图片描述

5. Topics

通过pattern匹配一个或者多个queue
在这里插入图片描述

6. RPC

同步调用
在这里插入图片描述

7. Publisher Confirms

可靠性投递

详细

// rabbitMQ版本:3.13-management
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>

1. “Hello World!”

2. Work Queues(工作队列模式)

在这里插入图片描述
多个消费者监听同一个队列,则各消费者之间对同一个消息是竞争的关系

Work Queues工作模式适用于任务较重或任务较多的情况,多消费者分摊任务
可以提高消息处理的效率。

//生产者import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  public class Producer {  public static final String QUEUE_NAME = "work_queue";  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  channel.queueDeclare(QUEUE_NAME,true,false,false,null);  for (int i = 1; i <= 10; i++) {  String body = i+"hello rabbitmq~~~";  channel.basicPublish("",QUEUE_NAME,null,body.getBytes());  }  channel.close();  connection.close();  }  }
//消费者1public class Consumer1 {  static final String QUEUE_NAME = "work_queue";  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  channel.queueDeclare(QUEUE_NAME,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("Consumer1 body:"+new String(body));  }  };  channel.basicConsume(QUEUE_NAME,true,consumer);  }  }
//消费者2
public class Consumer2 {static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME,true,false,false,null);Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Consumer2 body:"+new String(body));}};channel.basicConsume(QUEUE_NAME,true,consumer);}}

3. Publish/Subscribe(发布订阅模式)

交换机概念

  • 生产者不是把消息直接发送到队列,而是发送到交换机
  • 交换机接收消息,而如何处理消息取决于交换机的类型

交换机有如下3种常见类型

  • Fanout:广播,将消息发送给所有绑定到交换机的队列
  • Direct:定向,把消息交给符合指定routing key的队列
  • Topic:通配符,把消息交给符合routing pattern(路由模式)的队列

注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因
此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那
么消息会丢失!

在这里插入图片描述
在这里插入图片描述
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
发布订阅模式与工作队列模式的区别:

  • 工作队列模式本质上是绑定默认交换机
  • 发布订阅模式绑定指定交换机
  • 监听同一个队列的消费端程序彼此之间是竞争关系
  • 绑定同一个交换机的多个队列在发布订阅模式下,消息是广播的,每个队列都能接收到消息
//生产者import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.BuiltinExchangeType;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  public class Producer {  public static void main(String[] args) throws Exception {  // 1、获取连接Connection connection = ConnectionUtil.getConnection();  // 2、创建频道Channel channel = connection.createChannel();  // 参数1. exchange:交换机名称// 参数2. type:交换机类型//     DIRECT("direct"):定向//     FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。  //     TOPIC("topic"):通配符的方式  //     HEADERS("headers"):参数匹配  // 参数3. durable:是否持久化  // 参数4. autoDelete:自动删除  // 参数5. internal:内部使用。一般false  // 参数6. arguments:其它参数  String exchangeName = "test_fanout";  // 3、创建交换机channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);  // 4、创建队列String queue1Name = "test_fanout_queue1";  String queue2Name = "test_fanout_queue2";  channel.queueDeclare(queue1Name,true,false,false,null);  channel.queueDeclare(queue2Name,true,false,false,null);  // 5、绑定队列和交换机// 参数1. queue:队列名称  // 参数2. exchange:交换机名称  // 参数3. routingKey:路由键,绑定规则//   如果交换机的类型为fanout,routingKey设置为""  channel.queueBind(queue1Name,exchangeName,"");  channel.queueBind(queue2Name,exchangeName,"");  String body = "日志信息:张三调用了findAll方法...日志级别:info...";  // 6、发送消息channel.basicPublish(exchangeName,"",null,body.getBytes());  // 7、释放资源channel.close();  connection.close();  }  }
//消费者1import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
import java.io.IOException;  public class Consumer1 {  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  String queue1Name = "test_fanout_queue1";  channel.queueDeclare(queue1Name,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("body:"+new String(body));  System.out.println("队列 1 消费者 1 将日志信息打印到控制台.....");  }  };  channel.basicConsume(queue1Name,true,consumer);  }  }//消费者2
import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
import java.io.IOException;  public class Consumer2 {  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  String queue2Name = "test_fanout_queue2";  channel.queueDeclare(queue2Name,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("body:"+new String(body));  System.out.println("队列 2 消费者 2 将日志信息打印到控制台.....");  }  };  channel.basicConsume(queue2Name,true,consumer);  }  }

4. Routing

在这里插入图片描述

路由key相当于交叉路口指示牌根据不同指示牌走不同的路

通过『路由绑定』的方式,把交换机和队列关联起来

  • 交换机和队列通过路由键进行绑定
  • 生产者发送消息时不仅要指定交换机,还要指定路由键
  • 交换机接收到消息会发送到路由键绑定的队列
  • 在编码上与 Publish/Subscribe发布与订阅模式的区别:
    • 交换机的类型为:Direct
    • 队列绑定交换机的时候需要指定routing key
//生产者public class Producer {  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  String exchangeName = "test_direct";  // 创建交换机  channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);  // 创建队列  String queue1Name = "test_direct_queue1";  String queue2Name = "test_direct_queue2";  // 声明(创建)队列  channel.queueDeclare(queue1Name,true,false,false,null);  channel.queueDeclare(queue2Name,true,false,false,null);  // 队列绑定交换机  // 队列1绑定error  channel.queueBind(queue1Name,exchangeName,"error");  // 队列2绑定info error warning  channel.queueBind(queue2Name,exchangeName,"info");  channel.queueBind(queue2Name,exchangeName,"error");  channel.queueBind(queue2Name,exchangeName,"warning");  String message = "日志信息:张三调用了delete方法.错误了,日志级别error";// 发送消息  channel.basicPublish(exchangeName,"error",null,message.getBytes());System.out.println(message);  // 释放资源  channel.close();  connection.close();  }  }
//消费者1public class Producer {  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  String exchangeName = "test_direct";  // 创建交换机  channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);  // 创建队列  String queue1Name = "test_direct_queue1";  String queue2Name = "test_direct_queue2";  // 声明(创建)队列  channel.queueDeclare(queue1Name,true,false,false,null);  channel.queueDeclare(queue2Name,true,false,false,null);  // 队列绑定交换机  // 队列1绑定error  channel.queueBind(queue1Name,exchangeName,"error");  // 队列2绑定info error warning  channel.queueBind(queue2Name,exchangeName,"info");  channel.queueBind(queue2Name,exchangeName,"error");  channel.queueBind(queue2Name,exchangeName,"warning");  String message = "日志信息:张三调用了delete方法.错误了,日志级别error";// 发送消息  channel.basicPublish(exchangeName,"error",null,message.getBytes());System.out.println(message);  // 释放资源  channel.close();  connection.close();  }  }//消费者2
public class Consumer2 {  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  String queue2Name = "test_direct_queue2";  channel.queueDeclare(queue2Name,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("body:"+new String(body));  System.out.println("Consumer2 将日志信息存储到数据库.....");  }  };  channel.basicConsume(queue2Name,true,consumer);  }  }

5. Topics

在这里插入图片描述

  • Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队
    列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用
    通配符
  • Routingkey一般都是由一个或多个单词组成,多个单词之间以“.”分割,
    例如:item.insert
  • 通配符规则:
    • #:匹配零个或多个词
    • *:匹配一个词
//生产者public class Producer {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String exchangeName = "test_topic";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);String queue1Name = "test_topic_queue1";String queue2Name = "test_topic_queue2";channel.queueDeclare(queue1Name, true, false, false, null);channel.queueDeclare(queue2Name, true, false, false, null);// 绑定队列和交换机// 参数1. queue:队列名称// 参数2. exchange:交换机名称// 参数3. routingKey:路由键,绑定规则//      如果交换机的类型为fanout ,routingKey设置为""// routing key 常用格式:系统的名称.日志的级别。// 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库channel.queueBind(queue1Name, exchangeName, "#.error");channel.queueBind(queue1Name, exchangeName, "order.*");channel.queueBind(queue2Name, exchangeName, "*.*");// 分别发送消息到队列:order.info、goods.info、goods.errorString body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]";
//        channel.basicPublish(exchangeName,"order.info",null,body.getBytes());//        body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]";
//        channel.basicPublish(exchangeName,"goods.info",null,body.getBytes());body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]";channel.basicPublish(exchangeName, "goods.error", null, body.getBytes());channel.close();connection.close();}}
//消费者public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String QUEUE_NAME = "test_topic_queue1";channel.queueDeclare(QUEUE_NAME, true, false, false, null);Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:" + new String(body));}};channel.basicConsume(QUEUE_NAME, true, consumer);}}//消费者2
public class Consumer2 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String QUEUE_NAME = "test_topic_queue2";channel.queueDeclare(QUEUE_NAME, true, false, false, null);Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:" + new String(body));}};channel.basicConsume(QUEUE_NAME, true, consumer);}}

6. RPC

在这里插入图片描述
远程过程调用,本质上是同步调用,和我们使用OpenFeign调用远程接口一

7. Publisher Confirms

发送端消息确认

小结

直接发送到队列:底层使用了默认交换机

  • 经过交换机发送到队列
  • Fanout:没有Routing key直接绑定队列
  • Direct:通过Routing key绑定队列,消息发送到绑定的队列上
    • 一个交换机绑定一个队列:定点发送
    • 一个交换机绑定多个队列:广播发送
  • Topic:针对Routing key使用通配符

这篇关于RabbitMQ(二)七种工作模式的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1001343

相关文章

找完工作该补充的东西

首先: 锻炼身体,包括乒乓球,羽毛球,都必须练习,学习,锻炼身体等是一个很重要的与人交际沟通的方式; 打牌,娱乐:会玩是一个人很重要的交际沟通的法宝; 摄影:这个是一个兴趣爱好,也是提高自己的审美,生活品质,当然也是与人沟通的重要途径; 做饭:这个的话就是对自己,对朋友非常有益的一件事情;

如何开启和关闭3GB模式

https://jingyan.baidu.com/article/4d58d5414dfc2f9dd4e9c082.html

十四、观察者模式与访问者模式详解

21.观察者模式 21.1.课程目标 1、 掌握观察者模式和访问者模式的应用场景。 2、 掌握观察者模式在具体业务场景中的应用。 3、 了解访问者模式的双分派。 4、 观察者模式和访问者模式的优、缺点。 21.2.内容定位 1、 有 Swing开发经验的人群更容易理解观察者模式。 2、 访问者模式被称为最复杂的设计模式。 21.3.观察者模式 观 察 者 模 式 ( Obser

工作流Activiti初体验—流程撤回【二】

已经玩工作流了,打算还是研究一下撤回的功能。但是流程图里面并不带撤回的组件,所以需要自己动态改造一下,还是延续上一个流程继续试验撤回功能。《工作流Activiti初体验【一】》 完整流程图 我们研究一下分发任务撤回到发起任务,其他环节的撤回类似 撤回的原理大概如下: 将分发任务后面的方向清空,把发起任务拼接到原来的判断网关,然后结束分发任务,这样流程就到发起任务了 此时的流程如上图,

工作流Activiti初体验【一】

在这里记录一下我的Activiti历程:(以下示例不涉及真实业务,所有逻辑均建立在学习的基础上) bpmn图 发起任务我设置了一个权限组user1,只要是这个权限的用户都可以发起任务 分发任务我设置了一个用户组,用户组中每个用户都可以处理这步流程,只要有一个人处理这步任务,分发的流程就算结束了 分发任务这一环节还有个判断,允许任务下发和不允许任务下发 任务分发完成则来到子流程,每个被分

常用MQ消息中间件Kafka、ZeroMQ和RabbitMQ对比及RabbitMQ详解

1、概述   在现代的分布式系统和实时数据处理领域,消息中间件扮演着关键的角色,用于解决应用程序之间的通信和数据传递的挑战。在众多的消息中间件解决方案中,Kafka、ZeroMQ和RabbitMQ 是备受关注和广泛应用的代表性系统。它们各自具有独特的特点和优势,适用于不同的应用场景和需求。   Kafka 是一个高性能、可扩展的分布式消息队列系统,被设计用于处理大规模的数据流和实时数据传输。它

七种排序方式总结

/*2018.01.23*A:YUAN*T:其中排序算法:冒泡排序,简单排序,直接插入排序,希尔排序,堆排序,归并排序,快速排序*/#include <stdio.h>#include <math.h>#include <malloc.h>#define MAXSIZE 10000#define FALSE 0#define TRUE 1typedef struct {i

Builder模式的实现

概念 在创建复杂对象时,将创建该对象的工作交给一个建造者,这个建造者就是一个Builder。在日常的开发中,常常看到,如下这些代码: AlertDialog的实现 AlertDialog.Builder builder = new AlertDialog.Builder(context);builder.setMessage("你好建造者");builder.setTitle

[分布式网络通讯框架]----ZooKeeper下载以及Linux环境下安装与单机模式部署(附带每一步截图)

首先进入apache官网 点击中间的see all Projects->Project List菜单项进入页面 找到zookeeper,进入 在Zookeeper主页的顶部点击菜单Project->Releases,进入Zookeeper发布版本信息页面,如下图: 找到需要下载的版本 进行下载既可,这里我已经下载过3.4.10,所以以下使用3.4.10进行演示其他的步骤。