RabbitMQ消息队列(五):Routing 消息路由(转)

2024-03-16 10:32

本文主要是介绍RabbitMQ消息队列(五):Routing 消息路由(转),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  上篇文章中,我们构建了一个简单的日志系统。接下来,我们将丰富它:能够使用不同的severity来监听不同等级的log。比如我们希望只有error的log才保存到磁盘上。

1. Bindings绑定

    上篇文章中我们是这么做的绑定:

[python]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. channel.queue_bind(exchange=exchange_name,  
  2.                    queue=queue_name)  
    绑定其实就是关联了exchange和queue。或者这么说:queue对exchagne的内容感兴趣,exchange要把它的Message deliver到queue中。

    实际上,绑定可以带 routing_key  这个参数。其实这个参数的名称和 basic_publish  的参数名是相同了。为了避免混淆,我们把它成为binding key。
    使用一个key来创建binding :
[python]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. channel.queue_bind(exchange=exchange_name,  
  2.                    queue=queue_name,  
  3.                    routing_key='black')  
对于fanout的exchange来说,这个参数是被忽略的。

2. Direct exchange

  Direct exchange的路由算法非常简单:通过binding key的完全匹配,可以通过下图来说明。 

    exchange X和两个queue绑定在一起。Q1的binding key是orange。Q2的binding key是black和green。
    当P publish key是orange时,exchange会把它放到Q1。如果是black或者green那么就会到Q2。其余的Message都会被丢弃。

3. Multiple bindings

多个queue绑定同一个key是可以的。对于下图的例子,Q1和Q2都绑定了black。也就是说,对于routing key是black的Message,会被deliver到Q1和Q2。其余的Message都会被丢弃。

   

4. Emitting logs

首先是我们要创建一个direct的exchange:

[python]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. channel.exchange_declare(exchange='direct_logs',  
  2.                          type='direct')  
我们将使用log的severity作为routing key,这样Consumer可以针对不同severity的log进行不同的处理。
publish:

[python]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. channel.basic_publish(exchange='direct_logs',  
  2.                       routing_key=severity,  
  3.                       body=message)  
我们使用三种severity:'info', 'warning', 'error'.


5. Subscribing

对于queue,我们需要绑定severity:

[python]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. result = channel.queue_declare(exclusive=True)  
  2. queue_name = result.method.queue  
  3.   
  4. for severity in severities:  
  5.     channel.queue_bind(exchange='direct_logs',  
  6.                        queue=queue_name,  
  7.                        routing_key=severity)  

6. 最终版本

The code for emit_log_direct.py:

[python]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. #!/usr/bin/env python  
  2. import pika  
  3. import sys  
  4.   
  5. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  6.         host='localhost'))  
  7. channel = connection.channel()  
  8.   
  9. channel.exchange_declare(exchange='direct_logs',  
  10.                          type='direct')  
  11.   
  12. severity = sys.argv[1if len(sys.argv) > 1 else 'info'  
  13. message = ' '.join(sys.argv[2:]) or 'Hello World!'  
  14. channel.basic_publish(exchange='direct_logs',  
  15.                       routing_key=severity,  
  16.                       body=message)  
  17. print " [x] Sent %r:%r" % (severity, message)  
  18. connection.close()  

The code for  receive_logs_direct.py :

[python]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. #!/usr/bin/env python  
  2. import pika  
  3. import sys  
  4.   
  5. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  6.         host='localhost'))  
  7. channel = connection.channel()  
  8.   
  9. channel.exchange_declare(exchange='direct_logs',  
  10.                          type='direct')  
  11.   
  12. result = channel.queue_declare(exclusive=True)  
  13. queue_name = result.method.queue  
  14.   
  15. severities = sys.argv[1:]  
  16. if not severities:  
  17.     print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \  
  18.                          (sys.argv[0],)  
  19.     sys.exit(1)  
  20.   
  21. for severity in severities:  
  22.     channel.queue_bind(exchange='direct_logs',  
  23.                        queue=queue_name,  
  24.                        routing_key=severity)  
  25.   
  26. print ' [*] Waiting for logs. To exit press CTRL+C'  
  27.   
  28. def callback(ch, method, properties, body):  
  29.     print " [x] %r:%r" % (method.routing_key, body,)  
  30.   
  31. channel.basic_consume(callback,  
  32.                       queue=queue_name,  
  33.                       no_ack=True)  
  34.   
  35. channel.start_consuming()  
我们想把warning和error的log记录到一个文件中:

[python]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. $ python receive_logs_direct.py warning error > logs_from_rabbit.log  
打印所有log到屏幕:

[python]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. $ python receive_logs_direct.py info warning error  
  2.  [*] Waiting for logs. To exit press CTRL+C  


尊重原创,转载请注明出处 anzhsoft: http://blog.csdn.net/anzhsoft/article/details/19630147

参考资料:

1. http://www.rabbitmq.com/tutorials/tutorial-four-python.html

这篇关于RabbitMQ消息队列(五):Routing 消息路由(转)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/815182

相关文章

在Android平台上实现消息推送功能

《在Android平台上实现消息推送功能》随着移动互联网应用的飞速发展,消息推送已成为移动应用中不可或缺的功能,在Android平台上,实现消息推送涉及到服务端的消息发送、客户端的消息接收、通知渠道(... 目录一、项目概述二、相关知识介绍2.1 消息推送的基本原理2.2 Firebase Cloud Me

Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统

《PythonFastAPI+Celery+RabbitMQ实现分布式图片水印处理系统》这篇文章主要为大家详细介绍了PythonFastAPI如何结合Celery以及RabbitMQ实现简单的分布式... 实现思路FastAPI 服务器Celery 任务队列RabbitMQ 作为消息代理定时任务处理完整

SpringKafka消息发布之KafkaTemplate与事务支持功能

《SpringKafka消息发布之KafkaTemplate与事务支持功能》通过本文介绍的基本用法、序列化选项、事务支持、错误处理和性能优化技术,开发者可以构建高效可靠的Kafka消息发布系统,事务支... 目录引言一、KafkaTemplate基础二、消息序列化三、事务支持机制四、错误处理与重试五、性能优

SpringIntegration消息路由之Router的条件路由与过滤功能

《SpringIntegration消息路由之Router的条件路由与过滤功能》本文详细介绍了Router的基础概念、条件路由实现、基于消息头的路由、动态路由与路由表、消息过滤与选择性路由以及错误处理... 目录引言一、Router基础概念二、条件路由实现三、基于消息头的路由四、动态路由与路由表五、消息过滤

Spring Boot整合消息队列RabbitMQ的实现示例

《SpringBoot整合消息队列RabbitMQ的实现示例》本文主要介绍了SpringBoot整合消息队列RabbitMQ的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的... 目录RabbitMQ 简介与安装1. RabbitMQ 简介2. RabbitMQ 安装Spring

springboot rocketmq配置生产者和消息者的步骤

《springbootrocketmq配置生产者和消息者的步骤》本文介绍了如何在SpringBoot中集成RocketMQ,包括添加依赖、配置application.yml、创建生产者和消费者,并展... 目录1. 添加依赖2. 配置application.yml3. 创建生产者4. 创建消费者5. 使用在

SpringBoot中整合RabbitMQ(测试+部署上线最新完整)的过程

《SpringBoot中整合RabbitMQ(测试+部署上线最新完整)的过程》本文详细介绍了如何在虚拟机和宝塔面板中安装RabbitMQ,并使用Java代码实现消息的发送和接收,通过异步通讯,可以优化... 目录一、RabbitMQ安装二、启动RabbitMQ三、javascript编写Java代码1、引入

如何通过Python实现一个消息队列

《如何通过Python实现一个消息队列》这篇文章主要为大家详细介绍了如何通过Python实现一个简单的消息队列,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录如何通过 python 实现消息队列如何把 http 请求放在队列中执行1. 使用 queue.Queue 和 reque

Go路由注册方法详解

《Go路由注册方法详解》Go语言中,http.NewServeMux()和http.HandleFunc()是两种不同的路由注册方式,前者创建独立的ServeMux实例,适合模块化和分层路由,灵活性高... 目录Go路由注册方法1. 路由注册的方式2. 路由器的独立性3. 灵活性4. 启动服务器的方式5.

关于Gateway路由匹配规则解读

《关于Gateway路由匹配规则解读》本文详细介绍了SpringCloudGateway的路由匹配规则,包括基本概念、常用属性、实际应用以及注意事项,路由匹配规则决定了请求如何被转发到目标服务,是Ga... 目录Gateway路由匹配规则一、基本概念二、常用属性三、实际应用四、注意事项总结Gateway路由