RabbitMQ的发布、订阅模式(广播)

2024-06-04 20:58

本文主要是介绍RabbitMQ的发布、订阅模式(广播),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在本文中,我们将会讲解另一种RabbitMQ消息传递模式。

即将同一条消息传递给多个接收者。这种模式也称之为发布、订阅模式

场景描述

在本文中,我们将会实现一个日志处理系统。
该系统包含两个部分:

  1. 第一部分是产生日志。
  2. 第二部分是接收日志并打印日志。

在运行的过程中,我们会启动多个接收日志并打印日志的服务。
我们希望可以看到每个服务都接收到全部的日志信息。也就是说,服务1产生的日志最终会广播至所有的接收者。

Exchanges

在之前的文章中,我们讲解了一个RabbitMQ模型由以下几个部分组成:

  • 消息生产者:产生消息的来源。
  • 队列:存储尚未处理的消息。
  • 消息处理者:接收消息并处理。

实际上,这个模型仅仅是一个简化版的RabbitMQ模型。
对于真实的RabbitMQ模型而言,消息生产者是不会直接将消息传入队列中的。相反,消息生产者会把消息发送给Exchanges(中转所),而Exchanges(中转所)在接收到消息后,才会把消息插入到队列中。
在Exchanges(中转所)中,实现的功能包括:

  • 该消息是否需要插入某个队列中
  • 该消息仅需要发送至一个队列还是需要发送至多个队列
  • 该消息是否根据某些Exchanges(中转所)的类型需要被忽略等等。

Exchanges包含如下几个类型:direct, topic, headers以及fanout。
在本文中,我们首先来学习fanout类型。
fanout的含义是将每条消息都广播发送给所有的消息接收者。
例如,可以实现如下:

channel.exchange_declare(exchange='logs',exchange_type='fanout')

即声明exchange的类型为fanout,且名称为logs。
在声明了exchange后,我们可以继续发布消息:

channel.basic_publish(exchange='logs',routing_key='',body=message)

临时队列

在之前的文章中,我们需要指定一个特定的队列名称,因为我们需要将一组Worker用于接收某个指定的队列名称中的消息。
但是,对于发布、订阅模式场景而言,我们需要的是:

  1. 接收全部的消息,而不是其中的一部分消息。
  2. 只接收最新产生的消息,而忽略之前传入的消息。

因此,我们需要完成以下两个部分的工作:

  1. 每次在连接到RabbitMQ时,创建一个空的队列。实现该功能的方式是我们可以创建一个随机名称的队列。

result = channel.queue_declare()
  1. 在不指定参数时,默认将会产生一个随机字符串组成的队列。

  2. 此外,我们需要在创建队列时添加一个额外的参数:

result = channel.queue_declare(exclusive=True)
  1. exclusive=True表示当消息接收者断开连接时,字段删除该队列。

将Exchange与消息队列进行关联

目前,我们已经创建了一个fanout类型的exchange。同时,在消息接收者中也创建了队列。
现在,我们需要做的是将exchange和消息队列关联起来。

channel.queue_bind(exchange='logs',queue=result.method.queue)

 

完整实现

最后,我们来给出消息生产者和消息接收者的完整实现:

消息生产者: emit_log.py

#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',routing_key='',body=message)
print(" [x] Sent %r" % message)
connection.close()

消息接收者: receive_logs.py

#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',exchange_type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):print(" [x] %r" % body)
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()

实际来看一下效果吧:
我们可以先启动两个消息接收者:
receiver1:

python receive_logs.py

receiver2:

python receive_logs.py

然后,我们来发送几条消息:

python emit_log.py
python emit_log.py

这篇关于RabbitMQ的发布、订阅模式(广播)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1031100

相关文章

如何开启和关闭3GB模式

https://jingyan.baidu.com/article/4d58d5414dfc2f9dd4e9c082.html

十四、观察者模式与访问者模式详解

21.观察者模式 21.1.课程目标 1、 掌握观察者模式和访问者模式的应用场景。 2、 掌握观察者模式在具体业务场景中的应用。 3、 了解访问者模式的双分派。 4、 观察者模式和访问者模式的优、缺点。 21.2.内容定位 1、 有 Swing开发经验的人群更容易理解观察者模式。 2、 访问者模式被称为最复杂的设计模式。 21.3.观察者模式 观 察 者 模 式 ( Obser

WordPress网创自动采集并发布插件

网创教程:WordPress插件网创自动采集并发布 阅读更新:随机添加文章的阅读数量,购买数量,喜欢数量。 使用插件注意事项 如果遇到404错误,请先检查并调整网站的伪静态设置,这是最常见的问题。需要定制化服务,请随时联系我。 本次更新内容 我们进行了多项更新和优化,主要包括: 界面设置:用户现在可以更便捷地设置文章分类和发布金额。代码优化:改进了采集和发布代码,提高了插件的稳定

AI赋能天气:微软研究院发布首个大规模大气基础模型Aurora

编者按:气候变化日益加剧,高温、洪水、干旱,频率和强度不断增加的全球极端天气给整个人类社会都带来了难以估计的影响。这给现有的天气预测模型提出了更高的要求——这些模型要更准确地预测极端天气变化,为政府、企业和公众提供更可靠的信息,以便做出及时的准备和响应。为了应对这一挑战,微软研究院开发了首个大规模大气基础模型 Aurora,其超高的预测准确率、效率及计算速度,实现了目前最先进天气预测系统性能的显著

常用MQ消息中间件Kafka、ZeroMQ和RabbitMQ对比及RabbitMQ详解

1、概述   在现代的分布式系统和实时数据处理领域,消息中间件扮演着关键的角色,用于解决应用程序之间的通信和数据传递的挑战。在众多的消息中间件解决方案中,Kafka、ZeroMQ和RabbitMQ 是备受关注和广泛应用的代表性系统。它们各自具有独特的特点和优势,适用于不同的应用场景和需求。   Kafka 是一个高性能、可扩展的分布式消息队列系统,被设计用于处理大规模的数据流和实时数据传输。它

Builder模式的实现

概念 在创建复杂对象时,将创建该对象的工作交给一个建造者,这个建造者就是一个Builder。在日常的开发中,常常看到,如下这些代码: AlertDialog的实现 AlertDialog.Builder builder = new AlertDialog.Builder(context);builder.setMessage("你好建造者");builder.setTitle

[分布式网络通讯框架]----ZooKeeper下载以及Linux环境下安装与单机模式部署(附带每一步截图)

首先进入apache官网 点击中间的see all Projects->Project List菜单项进入页面 找到zookeeper,进入 在Zookeeper主页的顶部点击菜单Project->Releases,进入Zookeeper发布版本信息页面,如下图: 找到需要下载的版本 进行下载既可,这里我已经下载过3.4.10,所以以下使用3.4.10进行演示其他的步骤。

RabbitMQ实践——临时队列

临时队列是一种自动删除队列。当这个队列被创建后,如果没有消费者监听,则会一直存在,还可以不断向其发布消息。但是一旦的消费者开始监听,然后断开监听后,它就会被自动删除。 新建自动删除队列 我们创建一个名字叫queue.auto.delete的临时队列 绑定 我们直接使用默认交换器,所以不用创建新的交换器,也不用建立绑定关系。 实验 发布消息 我们在后台管理页面的默认交换器下向这个队列

物联网系统运维——移动电商应用发布,Tomcat应用服务器,实验CentOS 7安装JDK与Tomcat,配置Tomcat Web管理界面

一.Tomcat应用服务器 1.Tomcat介绍 Tomcat是- -个免费的开源的Ser Ivet容器,它是Apache基金会的Jakarta 项目中的一个核心项目,由Apache, Sun和其他一 些公司及个人共同开发而成。Tomcat是一一个小型的轻量级应用服务器,在中小型系统和并发访问用户不是很多的场合下被普遍使用,是开发和调试JSP程序的首选。 在Tomcat中,应用程序的成部署很简