RabbitMQ(1) 三种exchange总结

2024-08-21 15:32

本文主要是介绍RabbitMQ(1) 三种exchange总结,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

……List……

1.AMQP

2.几个基本概念

3.三种 exchange

4.思考总结



……1.AMQP……


        在AMQP(高级消息队列协议)协议中,queue、exchange、binding构成了协议的核心。exchange是一个重要的组件,那么如何理解exchange呢?刚刚开始学习消息队列的时候,想起作品展时候做的一个基于SMTP协议的邮箱。不同的是MQ中没有发送者和接受者的概念,也不是客户端服务器的概念。MQ专注于应用程序之间的消息通信,是一种生产者和消费者之间的关系。exchange是producer和consumer之间消息交换机。




……2.几个基本概念……


        在介绍exchange类型之前,需要了解几个基本的概念。

  • Broker:消息队列服务器实体
  • exchange:消息交换机,指定消息规则,处理消息和队列之间的关系
  • queue:队列载体,消息投入队列中
  • binding:绑定,把exchange和queue按照路由规则绑定起来
  • Routing Key:路由关键字。exchange根据这个进行消息投递
  • vhost:虚拟消息服务器,每个RabbitMQ服务器都能够创建虚拟消息服务器。
  • producer:生产者,投递消息的程序
  • consumer:消费者,接受消息的程序
  • channel:信道(重要概念),打开信道才能进行通信,一个channel代码一个会话任务。



……3.三种exchange……

        

           vhost通过创建channel连接程序,会根据routingKey将M从exchange到queue,那根据什么规则进行投递消息?如果有好多不同类型的M,那么应该如何选择?AMQP中定义了不同类型的exchange就可以发挥作用了。一种三种类型:fanout,topic,direct。每一种类型实现不同的路由算法。



3.1 Fanout Exchange 不规则路由


         将收到的消息广播到绑定的队列上。当你发送一条消息到fanout exchange上,它会把消息投递给所有附加到此交换机上面的队列。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。 


生产者(提供服务的系统):

package cn.itcast.rabbitmq.ps;import cn.itcast.rabbitmq.util.ConnectionUtil;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Send {private final static String EXCHANGE_NAME = "test_exchange_fanout";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 消息内容String message = "商品已经新增,id = 1000";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();}
}


消费者(调用服务的系统):

package cn.itcast.rabbitmq.ps;import cn.itcast.rabbitmq.util.ConnectionUtil;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;public class Recv {private final static String QUEUE_NAME = "test_queue_fanout_1";private final static String EXCHANGE_NAME = "test_exchange_fanout";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" 前台系统: '" + message + "'");Thread.sleep(10);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}


3.2 Direct Exchange 处理路由


     生产者在发送消息的时候都需要指定一个routingkey和exchange,exchange在接到rooutingkey的时候与该exchange关联的所有binding中的bindingkey进行比较,如果相等,则发送到binding对应的queue中。


生产者(提供服务的系统):

package cn.itcast.rabbitmq.routing;import cn.itcast.rabbitmq.util.ConnectionUtil;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Send {private final static String EXCHANGE_NAME = "test_exchange_direct";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 消息内容String message = "删除商品, id = 1001";channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();}
}


消费者(调用服务的系统):
package cn.itcast.rabbitmq.routing;import cn.itcast.rabbitmq.util.ConnectionUtil;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;public class Recv {private final static String QUEUE_NAME = "test_queue_direct_1";private final static String EXCHANGE_NAME = "test_exchange_direct";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" 前台系统: '" + message + "'");Thread.sleep(10);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}


3.3 Topic Exchange 通配符路由


         将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。这个很像Javascript中的正则表达式。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。


生产者(提供服务的系统):

package cn.itcast.rabbitmq.topic;import cn.itcast.rabbitmq.util.ConnectionUtil;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Send {private final static String EXCHANGE_NAME = "test_exchange_topic";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "topic");// 消息内容String message = "删除商品,id = 1001";channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();}
}


消费者(调用服务的系统):

消费者1:

package cn.itcast.rabbitmq.topic;import cn.itcast.rabbitmq.util.ConnectionUtil;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;public class Recv {private final static String QUEUE_NAME = "test_queue_topic_1";private final static String EXCHANGE_NAME = "test_exchange_topic";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" 前台系统: '" + message + "'");Thread.sleep(10);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}


消费者2:

package cn.itcast.rabbitmq.topic;import cn.itcast.rabbitmq.util.ConnectionUtil;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;public class Recv2 {private final static String QUEUE_NAME = "test_queue_topic_2";private final static String EXCHANGE_NAME = "test_exchange_topic";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" 搜索系统: '" + message + "'");Thread.sleep(10);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}



……4.思考总结……


          总结一下生产者和消费者都需要做什么事情。多学习多思考。




这篇关于RabbitMQ(1) 三种exchange总结的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1093551

相关文章

Go语言中三种容器类型的数据结构详解

《Go语言中三种容器类型的数据结构详解》在Go语言中,有三种主要的容器类型用于存储和操作集合数据:本文主要介绍三者的使用与区别,感兴趣的小伙伴可以跟随小编一起学习一下... 目录基本概念1. 数组(Array)2. 切片(Slice)3. 映射(Map)对比总结注意事项基本概念在 Go 语言中,有三种主要

Python中连接不同数据库的方法总结

《Python中连接不同数据库的方法总结》在数据驱动的现代应用开发中,Python凭借其丰富的库和强大的生态系统,成为连接各种数据库的理想编程语言,下面我们就来看看如何使用Python实现连接常用的几... 目录一、连接mysql数据库二、连接PostgreSQL数据库三、连接SQLite数据库四、连接Mo

微服务架构之使用RabbitMQ进行异步处理方式

《微服务架构之使用RabbitMQ进行异步处理方式》本文介绍了RabbitMQ的基本概念、异步调用处理逻辑、RabbitMQ的基本使用方法以及在SpringBoot项目中使用RabbitMQ解决高并发... 目录一.什么是RabbitMQ?二.异步调用处理逻辑:三.RabbitMQ的基本使用1.安装2.架构

Git提交代码详细流程及问题总结

《Git提交代码详细流程及问题总结》:本文主要介绍Git的三大分区,分别是工作区、暂存区和版本库,并详细描述了提交、推送、拉取代码和合并分支的流程,文中通过代码介绍的非常详解,需要的朋友可以参考下... 目录1.git 三大分区2.Git提交、推送、拉取代码、合并分支详细流程3.问题总结4.git push

Spring IOC的三种实现方式详解

《SpringIOC的三种实现方式详解》:本文主要介绍SpringIOC的三种实现方式,在Spring框架中,IOC通过依赖注入来实现,而依赖注入主要有三种实现方式,构造器注入、Setter注入... 目录1. 构造器注入(Cons编程tructor Injection)2. Setter注入(Setter

Kubernetes常用命令大全近期总结

《Kubernetes常用命令大全近期总结》Kubernetes是用于大规模部署和管理这些容器的开源软件-在希腊语中,这个词还有“舵手”或“飞行员”的意思,使用Kubernetes(有时被称为“... 目录前言Kubernetes 的工作原理为什么要使用 Kubernetes?Kubernetes常用命令总

linux报错INFO:task xxxxxx:634 blocked for more than 120 seconds.三种解决方式

《linux报错INFO:taskxxxxxx:634blockedformorethan120seconds.三种解决方式》文章描述了一个Linux最小系统运行时出现的“hung_ta... 目录1.问题描述2.解决办法2.1 缩小文件系统缓存大小2.2 修改系统IO调度策略2.3 取消120秒时间限制3

Linux alias的三种使用场景方式

《Linuxalias的三种使用场景方式》文章介绍了Linux中`alias`命令的三种使用场景:临时别名、用户级别别名和系统级别别名,临时别名仅在当前终端有效,用户级别别名在当前用户下所有终端有效... 目录linux alias三种使用场景一次性适用于当前用户全局生效,所有用户都可调用删除总结Linux

VUE动态绑定class类的三种常用方式及适用场景详解

《VUE动态绑定class类的三种常用方式及适用场景详解》文章介绍了在实际开发中动态绑定class的三种常见情况及其解决方案,包括根据不同的返回值渲染不同的class样式、给模块添加基础样式以及根据设... 目录前言1.动态选择class样式(对象添加:情景一)2.动态添加一个class样式(字符串添加:情

Python中实现进度条的多种方法总结

《Python中实现进度条的多种方法总结》在Python编程中,进度条是一个非常有用的功能,它能让用户直观地了解任务的进度,提升用户体验,本文将介绍几种在Python中实现进度条的常用方法,并通过代码... 目录一、简单的打印方式二、使用tqdm库三、使用alive-progress库四、使用progres