本文主要是介绍RabbitMQ练习(Remote procedure call (RPC)),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
1、RabbitMQ教程
《RabbitMQ Tutorials》https://www.rabbitmq.com/tutorials
2、环境准备
参考:《RabbitMQ练习(Hello World)》。
确保RabbitMQ、Sender、Receiver容器正常安装和启动。
root@k0test1:~# docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
root@k0test1:~# docker start sender
root@k0test1:~# docker start receiver
root@k0test1:~# docker network inspect bridge
网络拓扑:
3、RPC练习
3.1 概述
前面练习了如何使用工作队列(Work queues)来在多个worker之间分配耗时的任务(time-consuming tasks)。
但是,如果我们需要在远程计算机上运行一个函数并等待结果呢?那是一个不同的故事。这种模式通常被称为远程过程调用,或RPC。
在本练习中,我们将使用RabbitMQ来构建一个RPC系统:一个客户端和一个可扩展的RPC服务器。由于我们没有任何值得分配的耗时任务,我们将创建一个返回斐波那契数(Fibonacci numbers)的虚拟RPC服务(dummy RPC service)。
3.2 Client interface
为了展示如何使用RPC服务,将创建一个简单的客户端类(a simple client class)。这个类将暴露一个名为call
的方法,该方法发送一个RPC请求,并阻塞直到收到响应。(原文:It's going to expose a method named call
which sends an RPC request and blocks until the answer is received)
fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print(f"fib(4) is {result}")
这段代码展示了如何使用一个RPC客户端来调用远程的斐波那契数计算服务。以下是代码的详细解释:
fibonacci_rpc = FibonacciRpcClient()
:这行代码创建了一个FibonacciRpcClient
的实例。FibonacciRpcClient
是一个RPC客户端,用于发送请求到RPC服务器,并接收响应。
result = fibonacci_rpc.call(4)
:这行代码调用了RPC客户端的call
方法,并将数字4作为参数传递给服务器。服务器将计算斐波那契数列中的第4个数,并将结果返回给客户端。
print(f"fib(4) is {result}")
:这行代码打印出计算结果。格式化字符串"fib(4) is {result}"
将计算得到的斐波那契数显示出来。在这个例子中,客户端发送了一个请求到RPC服务器,请求计算斐波那契数列的第4个数,然后服务器返回了结果,客户端接收并打印了这个结果。这是一个典型的RPC模式的应用,允许客户端像调用本地函数一样调用远程服务。
3.3 Callback queue
一般来说,通过RabbitMQ实现远程过程调用(RPC)是很简单的。客户端发送一个请求消息,服务器用一个响应消息来回复。为了能够接收到响应,客户端需要在请求中发送一个“回调”队列地址(原文:In order to receive a response the client needs to send a 'callback' queue address with the request.)。
result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queuechannel.basic_publish(exchange='',routing_key='rpc_queue',properties=pika.BasicProperties(reply_to = callback_queue,),body=request)# ... and some code to read a response message from the callback_queue ...
这段Python代码是使用RabbitMQ实现RPC(远程过程调用)的一个示例。下面是代码的中文解释:
result = channel.queue_declare(queue='', exclusive=True)
:这行代码声明了一个私有的队列。queue_declare
方法用于声明一个新的队列,如果队列不存在,则创建它。queue=''
表示让RabbitMQ自动生成一个唯一的队列名称。exclusive=True
表示这个队列是私有的,只对声明它的连接可见。
callback_queue = result.method.queue
:这行代码将声明的队列名称赋值给callback_queue
变量,这个队列将被用作回调队列,以便服务器可以发送响应到这个队列。
channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties(reply_to=callback_queue,), body=request)
:这行代码发布一个消息到RabbitMQ。exchange
参数为空字符串,表示使用默认的交换机。routing_key='rpc_queue'
表示消息将发送到名为rpc_queue
的队列。properties
参数是一个BasicProperties
对象,其中reply_to
属性设置为callback_queue
,这告诉服务器将响应消息发送到这个回调队列。body
参数是实际发送的消息体,这里假设是request
变量。注释
# ... and some code to read a response message from the callback_queue ...
:这部分代码应该包含从callback_queue
读取响应消息的逻辑,但在这个示例中没有给出。这段代码展示了客户端如何发送一个RPC请求,并设置回调队列以便接收服务器的响应。服务器需要监听
rpc_queue
队列,处理请求,并将响应消息发送到客户端指定的callback_queue
。
AMQP 0-9-1 协议预定义了一组共14个属性,这些属性与消息相关。除了以下几个常用的属性外,大多数属性很少使用:
- delivery_mode:标记消息是持久的(值为2)还是瞬时的(其他任何值)。你可能还记得这个属性是从第二个教程中学到的。
- content_type:用于描述消息编码的MIME类型。例如,对于常用的JSON编码,将此属性设置为
application/json
是一个好习惯。- reply_to:通常用于命名一个回调队列。
- correlation_id:用于将RPC响应与请求相关联,非常有用。
这些属性在RabbitMQ中非常有用,因为它们允许你控制消息的行为,以及如何在不同的应用程序组件之间进行通信。例如,
delivery_mode
可以帮助确保消息在RabbitMQ服务器重启后仍然可用,而reply_to
和correlation_id
使得RPC模式的实现成为可能,允许服务器将响应发送回正确的客户端。content_type
则有助于接收方理解消息的内容格式,从而正确地解析消息。
3.4 Correlation id
在上述介绍的方法中,我们建议为每个RPC请求创建一个回调队列。这种方法效率不高,但幸运的是有更好的解决方案——让我们为每个客户端创建一个单一的回调队列。
这引发了一个新的问题:在队列中收到响应时,不清楚该响应属于哪个请求。这时,我们使用correlation_id
属性。我们将为每个请求设置一个唯一的值。之后,当我们在回调队列中收到消息时,我们会查看这个属性,并基于它将响应与请求匹配起来。如果我们看到一个未知的correlation_id
值,我们可以安全地丢弃该消息——它不属于我们的请求。
你可能会问,为什么我们应该忽略回调队列中的未知消息,而不是以错误失败?这是由于服务器端可能存在竞争条件。虽然不太可能,但RPC服务器可能在我们收到答案后但在发送请求确认消息之前就崩溃了。如果发生这种情况,重启的RPC服务器将再次处理请求。这就是为什么在客户端我们必须优雅地处理重复的响应,而且RPC应该是幂等的(原文: That's why on the client we must handle the duplicate responses gracefully, and the RPC should ideally be idempotent.)。
RPC(远程过程调用)应该是幂等的,意味着对于同一个请求,无论服务器端接收到多少次相同的RPC调用,其结果都应该是一致的,不会对系统状态产生额外的影响。换句话说,即使一个请求被重复发送多次,它也只会产生一次实际的效应或结果。
幂等性在分布式系统中非常重要,特别是在网络通信和服务器处理中可能存在延迟或重试的情况下。如果RPC调用不是幂等的,那么重复的请求可能会导致数据不一致或系统状态错误。例如,如果一个RPC调用是用来增加某个计数器的值,幂等性保证了无论这个调用被执行多少次,计数器的最终值都只会增加一次。
在实际应用中,确保RPC调用的幂等性可以通过以下方式实现:
- 唯一标识符:为每个请求分配一个唯一的ID,这样即使请求被重复发送,服务器也可以通过这个ID识别并避免重复处理。
- 状态检查:在执行操作之前,服务器可以检查请求是否已经被处理过,如果已经处理,则不再执行。
- 事务控制:使用数据库事务或其他形式的事务控制来确保操作的原子性,即要么完全执行,要么完全不执行,从而避免部分执行导致的问题。
- 重试机制:设计客户端时,可以包含重试逻辑,但要确保重试不会对系统状态产生负面影响。
幂等性是分布式系统设计中的一个关键原则,有助于提高系统的健壮性和可靠性。
3.5 Summary
我们的RPC将按照以下方式工作:
- 当客户端启动时,它会创建一个匿名的专用回调队列(an anonymous exclusive callback queue)。
- 对于RPC请求,客户端发送一个带有两个属性的消息:reply_to,设置为回调队列,correlation_id,为每个请求设置一个唯一值。
- 请求被发送到一个名为rpc_queue的队列。
- RPC工作器(RPC worker)(也就是服务器)在该队列上等待请求。当出现请求时,它执行任务,并将结果消息发送回客户端,使用的队列来自reply_to字段。
- 客户端在回调队列上等待数据。当出现一条消息时,它会检查correlation_id属性。如果它与请求中的值匹配,它将响应返回给应用程序。
3.6 代码说明
3.6.1 rpc_server.py
进入receiver容器,vi编写rpc_server.py:
root@receiver:/# vi rpc_server.py
root@receiver:/# cat rpc_server.py
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2'))channel = connection.channel()channel.queue_declare(queue='rpc_queue')def fib(n):if n == 0:return 0elif n == 1:return 1else:return fib(n - 1) + fib(n - 2)def on_request(ch, method, props, body):n = int(body)print(f" [.] fib({n})")response = fib(n)ch.basic_publish(exchange='',routing_key=props.reply_to,properties=pika.BasicProperties(correlation_id = \props.correlation_id),body=str(response))ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)print(" [x] Awaiting RPC requests")
channel.start_consuming()
root@receiver:/#
这段代码是一个使用Python编写的RabbitMQ RPC(远程过程调用)服务器的实现。它使用
pika
库与RabbitMQ进行交互。下面是对代码中每个部分的详细说明:
import pika
:导入pika
库,这是一个Python客户端库,用于与RabbitMQ消息代理进行通信。创建RabbitMQ连接:
connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2'))
这行代码创建了一个到RabbitMQ服务器的阻塞式连接。
host='172.17.0.2'
指定了RabbitMQ服务器的IP地址。创建通信通道:
channel = connection.channel()
通过连接创建了一个通信通道,用于发送和接收消息。
声明队列:
channel.queue_declare(queue='rpc_queue')
声明了一个名为
rpc_queue
的队列。如果该队列不存在,则会被创建。定义斐波那契函数:
def fib(n):if n == 0:return 0elif n == 1:return 1else:return fib(n - 1) + fib(n - 2)
这是一个递归函数,用于计算斐波那契数列的第
n
项。定义请求处理回调函数:
def on_request(ch, method, props, body):n = int(body)print(f" [.] fib({n})")response = fib(n)ch.basic_publish(exchange='',routing_key=props.reply_to,properties=pika.BasicProperties(correlation_id=props.correlation_id),body=str(response))ch.basic_ack(delivery_tag=method.delivery_tag)
这个函数是当队列
rpc_queue
接收到消息时会被调用的回调函数。它执行以下操作:
- 将消息体(
body
)转换为整数n
。- 调用
fib
函数计算斐波那契数。- 打印正在处理的请求。
- 使用
basic_publish
方法将响应消息发送到客户端指定的回复队列(reply_to
)。- 使用
basic_ack
方法确认消息已被处理。设置QoS(服务质量):
channel.basic_qos(prefetch_count=1)
这行代码设置了预取数量为1,意味着每次只有一个消息被分派给消费者,直到消费者确认该消息。
设置消息消费:
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
这行代码设置了消息消费,指定了队列
rpc_queue
和消息处理回调函数on_request
。打印等待RPC请求的消息:
print(" [x] Awaiting RPC requests")
开始消费消息:
channel.start_consuming()
这行代码启动了消息通道的消费过程,使得服务器开始监听和处理消息。
整个代码的目的是创建一个RPC服务器,它监听
rpc_queue
队列中的请求,对每个请求计算斐波那契数,并将结果发送回请求者。这个服务器是阻塞式的,意味着它会一直运行,直到被显式地停止或发生错误。
以下具体解释on_request是怎么被调用的:
on_request
函数被调用的过程涉及到RabbitMQ的RPC(远程过程调用)模式和消息确认机制。以下是详细步骤:
客户端发送RPC请求:客户端向RabbitMQ发送一个RPC请求消息,这个消息会被发送到指定的队列(在这个例子中是'rpc_queue')。
消息到达队列:RabbitMQ将客户端发送的消息放入'rpc_queue'队列中。
服务端监听队列:服务端(即上述代码)通过
channel.basic_consume
方法设置了一个监听器,指定了队列'rpc_queue'和消息处理回调函数on_request
。消息被消费:当RabbitMQ中的'rpc_queue'队列接收到新消息时,它会通知服务端。
回调函数被触发:服务端收到通知后,调用
on_request
函数。这个函数接收四个参数:
ch
:当前的通道对象。method
:描述了消息的接收方法。props
:包含了消息的属性,如reply_to
(客户端期望接收响应的队列名称)和correlation_id
(用于匹配请求和响应的相关ID)。body
:消息体,包含了客户端发送的数据。处理请求:
on_request
函数内部,首先将消息体body
转换为整数n
,然后调用fib
函数计算斐波那契数。发送响应:计算完成后,使用
ch.basic_publish
方法将计算结果发送回客户端。这里会使用reply_to
属性指定的路由键,以及correlation_id
来确保响应能够正确匹配到原始请求。确认消息:最后,通过
ch.basic_ack
方法确认消息已被处理,这样RabbitMQ就知道可以移除这条消息了。整个过程是异步的,服务端在
channel.start_consuming()
调用后进入等待状态,一旦有消息到达,就会触发on_request
函数的执行。
服务器代码相当直接明了:
首先,我们建立连接并声明队列rpc_queue
。 我们声明了斐波那契函数。它假设输入的是有效的正整数。(不要指望这个函数能处理大数字,这可能是最慢的递归实现方式)。 我们为basic_consume
声明了一个回调函数on_request
,这是RPC服务器的核心。当收到请求时,它会被执行。它完成工作并将响应发送回去。 我们可能想要运行多个服务器进程。为了将负载平均分配给多个服务器,我们需要设置prefetch_count
参数。
3.6.2 rpc_client.py
进入sender容器,vi编写rpc_client.py:
root@sender:/# vi rpc_client.py
root@sender:/# cat rpc_client.py
import pika
import uuidclass FibonacciRpcClient(object):def __init__(self):self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2'))self.channel = self.connection.channel()result = self.channel.queue_declare(queue='', exclusive=True)self.callback_queue = result.method.queueself.channel.basic_consume(queue=self.callback_queue,on_message_callback=self.on_response,auto_ack=True)self.response = Noneself.corr_id = Nonedef on_response(self, ch, method, props, body):if self.corr_id == props.correlation_id:self.response = bodydef call(self, n):self.response = Noneself.corr_id = str(uuid.uuid4())self.channel.basic_publish(exchange='',routing_key='rpc_queue',properties=pika.BasicProperties(reply_to=self.callback_queue,correlation_id=self.corr_id,),body=str(n))while self.response is None:self.connection.process_data_events(time_limit=None)return int(self.response)fibonacci_rpc = FibonacciRpcClient()print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(f" [.] Got {response}")
root@sender:/#
这段代码是一个使用Python编写的RabbitMQ RPC(远程过程调用)客户端的实现。它使用
pika
库与RabbitMQ进行交互。下面是对代码中每个部分的详细说明:
import pika
和import uuid
:导入pika
库用于与RabbitMQ进行通信,导入uuid
库用于生成唯一的标识符。定义
FibonacciRpcClient
类:class FibonacciRpcClient(object):
这个类封装了RPC客户端的功能。
类初始化方法
__init__
:def __init__(self):self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2'))self.channel = self.connection.channel()
- 创建一个到RabbitMQ服务器的阻塞式连接,服务器的IP地址是
172.17.0.2
。- 通过连接创建了一个通信通道。
声明一个独占队列:
result = self.channel.queue_declare(queue='', exclusive=True) self.callback_queue = result.method.queue
- 使用
queue_declare
声明一个独占队列,这个队列在连接关闭时会被删除。- 将声明的队列名称存储在
self.callback_queue
中,用于接收服务器的响应。设置消息消费:
self.channel.basic_consume(queue=self.callback_queue,on_message_callback=self.on_response,auto_ack=True)
- 使用
basic_consume
设置消息消费,指定队列、消息处理回调函数on_response
和自动确认消息。定义
on_response
方法:def on_response(self, ch, method, props, body):if self.corr_id == props.correlation_id:self.response = body
- 这个方法是当客户端接收到响应时会被调用的回调函数。
- 检查响应的消息属性中的
correlation_id
是否与请求时设置的correlation_id
相匹配,如果匹配,则将响应的消息体存储在self.response
中。定义
call
方法:def call(self, n):self.response = Noneself.corr_id = str(uuid.uuid4())self.channel.basic_publish(exchange='',routing_key='rpc_queue',properties=pika.BasicProperties(reply_to=self.callback_queue,correlation_id=self.corr_id,),body=str(n))while self.response is None:self.connection.process_data_events(time_limit=None)return int(self.response)
- 这个方法用于发送RPC请求并等待响应。
- 首先,将
self.response
设置为None
,生成一个新的correlation_id
。- 使用
basic_publish
方法发送RPC请求,指定交换机、路由键、消息属性(包括回复队列和相关ID)和消息体。- 通过循环等待响应,使用
process_data_events
方法处理数据事件,直到收到响应。- 收到响应后,将响应的消息体转换为整数并返回。
创建
FibonacciRpcClient
实例并发送请求:fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(f" [.] Got {response}")
- 创建
FibonacciRpcClient
类的实例。- 发送请求计算斐波那契数列的第30项,并打印响应结果。
整个代码的目的是创建一个RPC客户端,它发送请求到RabbitMQ服务器,等待服务器处理请求并返回结果。这个客户端是阻塞式的,意味着它会一直等待直到收到响应。
客户端代码稍微复杂一些:
- 我们建立一个连接,创建一个通道(channel),并声明一个用于接收响应的独占回调队列。
- 我们订阅(basic_consume)这个回调队列,以便我们能够接收RPC响应。
on_response
回调函数在每次响应时执行一个非常简单的任务,对于每个响应消息,它检查correlation_id
是否是我们正在寻找的那个。如果是,它将响应保存在self.response
中,并中断消费循环。- 接下来,我们定义了主要的
call
方法——它执行实际的RPC请求。 - 在
call
方法中,我们生成一个唯一的correlation_id
并保存它——on_response
回调函数将使用这个值来捕获适当的响应。 - 同样在
call
方法中,我们发布了请求消息(basic_publish),附带两个属性:reply_to
和correlation_id
。 - 最后,我们等待适当的响应到达,并将响应返回给用户。
3.7 开始测试
1、启动RPC服务器(Server)
root@receiver:/# python3 rpc_server.py[x] Awaiting RPC requests
2、查看mq server信息
root@5d37b5b451ba:/# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
rpc_queue 0root@5d37b5b451ba:/# rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
name type
amq.topic topic
amq.fanout fanout
amq.direct direct
amq.headers headers
amq.match headers
amq.rabbitmq.trace topicdirect
root@5d37b5b451ba:/# rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name source_kind destination_name destination_kind routing_key argumentsexchange rpc_queue queue rpc_queue []
root@5d37b5b451ba:/#
3、启动RPC客户端(Client),验证结果
root@sender:/# python3 rpc_client.py[x] Requesting fib(30) <--观察容器终端输出,确认客户端已发送请求并正在等待响应。[.] Got 832040 <--客户端将等待服务器处理请求并返回结果。一旦服务器处理完毕,客户端将接收响应并显示结果。
root@sender:/#
4、服务器端显示内容
root@receiver:/# python3 rpc_server.py[x] Awaiting RPC requests[.] fib(30)
5、一些说明
展示的设计并非RPC服务的唯一可能实现方式,但它有一些重要的优点:
-
如果RPC服务器运行太慢,你可以通过简单地运行另一个服务器来扩展。尝试在新的控制台中运行第二个
rpc_server.py
。 -
在客户端,RPC只需要发送和接收一条消息。不需要像
queue_declare
这样的同步调用。因此,对于单个RPC请求,RPC客户端只需要一个网络往返。
尽管如此,我们的代码仍然相当简单,并没有尝试解决更复杂(但重要)的问题,比如:
- 如果没有服务器运行,客户端应该如何反应?
- 客户端是否应该有RPC的某种超时机制?
- 如果服务器出现故障并引发异常,是否应该将异常转发给客户端?
- 在处理之前,如何保护系统免受无效输入消息的影响(例如,检查边界)?
这些问题都是构建健壮的RPC服务时需要考虑的实际问题,它们涉及到错误处理、超时机制、异常处理和安全验证等方面。
4、Wireshark抓包
4.1 抓包方式
参考:《RabbitMQ练习(Hello World)》
4.2 抓包信息
典型数据包:
1、rpc client发送rpc请求时,携带reply_to和correlation_id:
Frame 47: 156 bytes on wire (1248 bits), 156 bytes captured (1248 bits) on interface docker0, id 0
Ethernet II, Src: 02:42:ac:11:00:03 (02:42:ac:11:00:03), Dst: 02:42:ac:11:00:02 (02:42:ac:11:00:02)
Internet Protocol Version 4, Src: 172.17.0.3 (172.17.0.3), Dst: 172.17.0.2 (172.17.0.2)
Transmission Control Protocol, Src Port: 52506, Dst Port: 5672, Seq: 492, Ack: 673, Len: 90
Advanced Message Queuing ProtocolType: Content header (2)Channel: 1Length: 82Class ID: Basic (60)Weight: 0Body size: 2Property flags: 0x0600PropertiesCorrelation-Id: c1e5f310-0996-45fb-a3d8-bf2081cd891cReply-To: amq.gen-p169XGGXAjsVdvgeJjMlOw
这段信息描述的是AMQP(Advanced Message Queuing Protocol,高级消息队列协议)中的一个消息头(Content header),它包含了消息的元数据和属性。下面是对这些字段的解释:
Type: 消息头的类型,这里指的是内容头(Content header)。
Channel: 消息发送的通道号,这里是1。
Length: 消息头的长度,这里是82字节。
Class ID: 消息所属的类标识符,
Basic (60)
表示这是一个基本类消息。Weight: 消息的优先级权重,这里设置为0,表示没有特别的优先级。
Body size: 消息体的大小,这里是2字节。
Property flags: 属性标志,这里是
0x0600
,表示设置了某些属性。Properties:
- Correlation-Id:
c1e5f310-0996-45fb-a3d8-bf2081cd891c
,这是消息的唯一标识符,用于将响应与请求关联起来。- Reply-To:
amq.gen-p169XGGXAjsVdvgeJjMlOw
,这是回复队列的名称,当消费者处理完消息后,会将响应发送到这个队列。AMQP是一种提供高度可靠的异步消息传递协议,广泛应用于分布式系统中。消息头中的这些属性使得消息传递更加灵活和可靠,例如,通过
Correlation-Id
可以将请求和响应正确地匹配起来,而Reply-To
属性则允许消费者指定一个队列来接收响应消息。
4.3 流量图
5、小结
这篇文章是 RabbitMQ 官方教程的第六部分的练习,主题是关于如何使用 Python 客户端库 Pika 实现远程过程调用(RPC)。以下是主要内容小结:
1、预备条件
- 假设 RabbitMQ 已经安装并运行在标准端口(5672)上。
- 如果使用不同的主机、端口或凭据,需要调整连接设置。
- 使用 Pika 客户端库版本 1.0.0。
2、RPC 客户端接口
- 教程提供了一个简单的 RPC 客户端类,它公开了一个名为
call
的方法,用于发送 RPC 请求并阻塞等待响应。
3、RPC 的注意事项
- RPC 是计算中常见的模式,但也常受到批评,因为它可能导致系统不可预测和调试复杂化。
- 建议明确哪些函数调用是本地的,哪些是远程的,并且文档化系统,处理错误情况。
4、回调队列
- 客户端发送请求消息时,需要提供一个“回调”队列地址,以便接收响应。
- 教程中展示了如何声明一个独占的回调队列,并使用它来接收响应。
5、correlation_id
- 为了匹配请求和响应,教程介绍了
correlation_id
属性的使用。每个请求都被赋予一个唯一的correlation_id
。 - 如果客户端在回调队列中收到一个未知的
correlation_id
,它应该安全地丢弃该消息,因为这表示该响应不属于客户端的任何请求。
6、RPC 工作流程
- 客户端启动时创建一个匿名的独占回调队列。
- 客户端发送带有
reply_to
和correlation_id
属性的 RPC 请求消息。 - RPC 工作者(服务器)在
rpc_queue
队列上等待请求,处理请求后将结果消息发送回客户端。 - 客户端在回调队列上等待数据,当消息出现时,检查
correlation_id
属性,如果匹配,则将响应返回给应用程序。
7、服务器和客户端代码
- 教程提供了
rpc_server.py
和rpc_client.py
的示例代码,展示了如何实现 RPC 服务器和客户端。
8、扩展性和问题
- 如果 RPC 服务器运行缓慢,可以通过运行另一个服务器实例来扩展。
- RPC 客户端只需要一次网络往返即可完成单个 RPC 请求。
- 代码示例相对简单,没有解决一些更复杂但重要的问题,例如服务器不可用时客户端的反应、RPC 超时、服务器异常的处理以及对无效输入消息的保护。
9、实验和资源
- 教程建议使用 RabbitMQ 管理 UI 来查看队列,这可能对实验很有帮助。
这个教程为读者提供了一个使用 RabbitMQ 和 Python 实现 RPC 机制的实用指南,包括代码示例和一些最佳实践建议。
这篇关于RabbitMQ练习(Remote procedure call (RPC))的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!