RabbitMQ练习(Remote procedure call (RPC))

2024-09-01 09:36

本文主要是介绍RabbitMQ练习(Remote procedure call (RPC)),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

 1、RabbitMQ教程

《RabbitMQ Tutorials》icon-default.png?t=N7T8https://www.rabbitmq.com/tutorials

2、环境准备

参考:《RabbitMQ练习(Hello World)》。

确保RabbitMQ、Sender、Receiver容器正常安装和启动。

root@k0test1:~# docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
root@k0test1:~# docker start sender
root@k0test1:~# docker start receiver
root@k0test1:~# docker network inspect bridge

网络拓扑:

 

3、RPC练习

3.1 概述

前面练习了如何使用工作队列(Work queues)来在多个worker之间分配耗时的任务(time-consuming tasks)

但是,如果我们需要在远程计算机上运行一个函数并等待结果呢?那是一个不同的故事。这种模式通常被称为远程过程调用,或RPC。

在本练习中,我们将使用RabbitMQ来构建一个RPC系统:一个客户端和一个可扩展的RPC服务器。由于我们没有任何值得分配的耗时任务,我们将创建一个返回斐波那契数(Fibonacci numbers)虚拟RPC服务(dummy RPC service)

3.2 Client interface

为了展示如何使用RPC服务,将创建一个简单的客户端类(a simple client class)。这个类将暴露一个名为call的方法,该方法发送一个RPC请求,并阻塞直到收到响应。(原文:It's going to expose a method named call which sends an RPC request and blocks until the answer is received)

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print(f"fib(4) is {result}")

这段代码展示了如何使用一个RPC客户端来调用远程的斐波那契数计算服务。以下是代码的详细解释:

  1. fibonacci_rpc = FibonacciRpcClient():这行代码创建了一个FibonacciRpcClient的实例。FibonacciRpcClient是一个RPC客户端,用于发送请求到RPC服务器,并接收响应。

  2. result = fibonacci_rpc.call(4):这行代码调用了RPC客户端的call方法,并将数字4作为参数传递给服务器。服务器将计算斐波那契数列中的第4个数,并将结果返回给客户端。

  3. print(f"fib(4) is {result}"):这行代码打印出计算结果。格式化字符串"fib(4) is {result}"将计算得到的斐波那契数显示出来。

在这个例子中,客户端发送了一个请求到RPC服务器,请求计算斐波那契数列的第4个数,然后服务器返回了结果,客户端接收并打印了这个结果。这是一个典型的RPC模式的应用,允许客户端像调用本地函数一样调用远程服务。

 3.3 Callback queue

一般来说,通过RabbitMQ实现远程过程调用(RPC)是很简单的。客户端发送一个请求消息,服务器用一个响应消息来回复。为了能够接收到响应,客户端需要在请求中发送一个“回调”队列地址(原文:In order to receive a response the client needs to send a 'callback' queue address with the request.)。

result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queuechannel.basic_publish(exchange='',routing_key='rpc_queue',properties=pika.BasicProperties(reply_to = callback_queue,),body=request)# ... and some code to read a response message from the callback_queue ...

这段Python代码是使用RabbitMQ实现RPC(远程过程调用)的一个示例。下面是代码的中文解释:

  1. result = channel.queue_declare(queue='', exclusive=True):这行代码声明了一个私有的队列。queue_declare方法用于声明一个新的队列,如果队列不存在,则创建它。queue=''表示让RabbitMQ自动生成一个唯一的队列名称。exclusive=True表示这个队列是私有的,只对声明它的连接可见。

  2. callback_queue = result.method.queue:这行代码将声明的队列名称赋值给callback_queue变量,这个队列将被用作回调队列,以便服务器可以发送响应到这个队列。

  3. channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties(reply_to=callback_queue,), body=request):这行代码发布一个消息到RabbitMQ。exchange参数为空字符串,表示使用默认的交换机。routing_key='rpc_queue'表示消息将发送到名为rpc_queue的队列。properties参数是一个BasicProperties对象,其中reply_to属性设置为callback_queue,这告诉服务器将响应消息发送到这个回调队列。body参数是实际发送的消息体,这里假设是request变量。

  4. 注释# ... and some code to read a response message from the callback_queue ...:这部分代码应该包含从callback_queue读取响应消息的逻辑,但在这个示例中没有给出。

这段代码展示了客户端如何发送一个RPC请求,并设置回调队列以便接收服务器的响应。服务器需要监听rpc_queue队列,处理请求,并将响应消息发送到客户端指定的callback_queue

AMQP 0-9-1 协议预定义了一组共14个属性,这些属性与消息相关。除了以下几个常用的属性外,大多数属性很少使用:

  1. delivery_mode:标记消息是持久的(值为2)还是瞬时的(其他任何值)。你可能还记得这个属性是从第二个教程中学到的。
  2. content_type:用于描述消息编码的MIME类型。例如,对于常用的JSON编码,将此属性设置为application/json是一个好习惯。
  3. reply_to:通常用于命名一个回调队列。
  4. correlation_id:用于将RPC响应与请求相关联,非常有用。

这些属性在RabbitMQ中非常有用,因为它们允许你控制消息的行为,以及如何在不同的应用程序组件之间进行通信。例如,delivery_mode 可以帮助确保消息在RabbitMQ服务器重启后仍然可用,而 reply_tocorrelation_id 使得RPC模式的实现成为可能,允许服务器将响应发送回正确的客户端。content_type 则有助于接收方理解消息的内容格式,从而正确地解析消息。

3.4 Correlation id

在上述介绍的方法中,我们建议为每个RPC请求创建一个回调队列。这种方法效率不高,但幸运的是有更好的解决方案——让我们为每个客户端创建一个单一的回调队列。

这引发了一个新的问题:在队列中收到响应时,不清楚该响应属于哪个请求。这时,我们使用correlation_id属性。我们将为每个请求设置一个唯一的值。之后,当我们在回调队列中收到消息时,我们会查看这个属性,并基于它将响应与请求匹配起来。如果我们看到一个未知的correlation_id值,我们可以安全地丢弃该消息——它不属于我们的请求。

你可能会问,为什么我们应该忽略回调队列中的未知消息,而不是以错误失败?这是由于服务器端可能存在竞争条件。虽然不太可能,但RPC服务器可能在我们收到答案后但在发送请求确认消息之前就崩溃了。如果发生这种情况,重启的RPC服务器将再次处理请求。这就是为什么在客户端我们必须优雅地处理重复的响应,而且RPC应该是幂等的(原文: That's why on the client we must handle the duplicate responses gracefully, and the RPC should ideally be idempotent.)。

RPC(远程过程调用)应该是幂等的,意味着对于同一个请求,无论服务器端接收到多少次相同的RPC调用,其结果都应该是一致的,不会对系统状态产生额外的影响。换句话说,即使一个请求被重复发送多次,它也只会产生一次实际的效应或结果。

幂等性在分布式系统中非常重要,特别是在网络通信和服务器处理中可能存在延迟或重试的情况下。如果RPC调用不是幂等的,那么重复的请求可能会导致数据不一致或系统状态错误。例如,如果一个RPC调用是用来增加某个计数器的值,幂等性保证了无论这个调用被执行多少次,计数器的最终值都只会增加一次。

在实际应用中,确保RPC调用的幂等性可以通过以下方式实现:

  1. 唯一标识符:为每个请求分配一个唯一的ID,这样即使请求被重复发送,服务器也可以通过这个ID识别并避免重复处理。
  2. 状态检查:在执行操作之前,服务器可以检查请求是否已经被处理过,如果已经处理,则不再执行。
  3. 事务控制:使用数据库事务或其他形式的事务控制来确保操作的原子性,即要么完全执行,要么完全不执行,从而避免部分执行导致的问题。
  4. 重试机制:设计客户端时,可以包含重试逻辑,但要确保重试不会对系统状态产生负面影响。

幂等性是分布式系统设计中的一个关键原则,有助于提高系统的健壮性和可靠性。

3.5 Summary 

我们的RPC将按照以下方式工作:

  • 当客户端启动时,它会创建一个匿名的专用回调队列(an anonymous exclusive callback queue)
  • 对于RPC请求,客户端发送一个带有两个属性的消息:reply_to,设置为回调队列,correlation_id,为每个请求设置一个唯一值。
  • 请求被发送到一个名为rpc_queue的队列。
  • RPC工作器(RPC worker)(也就是服务器)在该队列上等待请求。当出现请求时,它执行任务,并将结果消息发送回客户端,使用的队列来自reply_to字段。
  • 客户端在回调队列上等待数据。当出现一条消息时,它会检查correlation_id属性。如果它与请求中的值匹配,它将响应返回给应用程序。

 3.6 代码说明

3.6.1 rpc_server.py

进入receiver容器,vi编写rpc_server.py:

root@receiver:/# vi rpc_server.py
root@receiver:/# cat rpc_server.py 
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2'))channel = connection.channel()channel.queue_declare(queue='rpc_queue')def fib(n):if n == 0:return 0elif n == 1:return 1else:return fib(n - 1) + fib(n - 2)def on_request(ch, method, props, body):n = int(body)print(f" [.] fib({n})")response = fib(n)ch.basic_publish(exchange='',routing_key=props.reply_to,properties=pika.BasicProperties(correlation_id = \props.correlation_id),body=str(response))ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)print(" [x] Awaiting RPC requests")
channel.start_consuming()
root@receiver:/# 

这段代码是一个使用Python编写的RabbitMQ RPC(远程过程调用)服务器的实现。它使用pika库与RabbitMQ进行交互。下面是对代码中每个部分的详细说明:

  1. import pika:导入pika库,这是一个Python客户端库,用于与RabbitMQ消息代理进行通信。

  2. 创建RabbitMQ连接:

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2'))

    这行代码创建了一个到RabbitMQ服务器的阻塞式连接。host='172.17.0.2'指定了RabbitMQ服务器的IP地址。

  3. 创建通信通道:

    channel = connection.channel()

    通过连接创建了一个通信通道,用于发送和接收消息。

  4. 声明队列:

    channel.queue_declare(queue='rpc_queue')

    声明了一个名为rpc_queue的队列。如果该队列不存在,则会被创建。

  5. 定义斐波那契函数:

    def fib(n):if n == 0:return 0elif n == 1:return 1else:return fib(n - 1) + fib(n - 2)

    这是一个递归函数,用于计算斐波那契数列的第n项。

  6. 定义请求处理回调函数:

    def on_request(ch, method, props, body):n = int(body)print(f" [.] fib({n})")response = fib(n)ch.basic_publish(exchange='',routing_key=props.reply_to,properties=pika.BasicProperties(correlation_id=props.correlation_id),body=str(response))ch.basic_ack(delivery_tag=method.delivery_tag)

    这个函数是当队列rpc_queue接收到消息时会被调用的回调函数。它执行以下操作:

    • 将消息体(body)转换为整数n
    • 调用fib函数计算斐波那契数。
    • 打印正在处理的请求。
    • 使用basic_publish方法将响应消息发送到客户端指定的回复队列(reply_to)。
    • 使用basic_ack方法确认消息已被处理。
  7. 设置QoS(服务质量):

    channel.basic_qos(prefetch_count=1)

    这行代码设置了预取数量为1,意味着每次只有一个消息被分派给消费者,直到消费者确认该消息。

  8. 设置消息消费:

    channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

    这行代码设置了消息消费,指定了队列rpc_queue和消息处理回调函数on_request

  9. 打印等待RPC请求的消息:

    print(" [x] Awaiting RPC requests")
  10. 开始消费消息:

    channel.start_consuming()

    这行代码启动了消息通道的消费过程,使得服务器开始监听和处理消息。

整个代码的目的是创建一个RPC服务器,它监听rpc_queue队列中的请求,对每个请求计算斐波那契数,并将结果发送回请求者。这个服务器是阻塞式的,意味着它会一直运行,直到被显式地停止或发生错误。

以下具体解释on_request是怎么被调用的:

on_request 函数被调用的过程涉及到RabbitMQ的RPC(远程过程调用)模式和消息确认机制。以下是详细步骤:

  1. 客户端发送RPC请求:客户端向RabbitMQ发送一个RPC请求消息,这个消息会被发送到指定的队列(在这个例子中是'rpc_queue')。

  2. 消息到达队列:RabbitMQ将客户端发送的消息放入'rpc_queue'队列中。

  3. 服务端监听队列:服务端(即上述代码)通过channel.basic_consume方法设置了一个监听器,指定了队列'rpc_queue'和消息处理回调函数on_request

  4. 消息被消费:当RabbitMQ中的'rpc_queue'队列接收到新消息时,它会通知服务端。

  5. 回调函数被触发:服务端收到通知后,调用on_request函数。这个函数接收四个参数:

    • ch:当前的通道对象。
    • method:描述了消息的接收方法。
    • props:包含了消息的属性,如reply_to(客户端期望接收响应的队列名称)和correlation_id(用于匹配请求和响应的相关ID)。
    • body:消息体,包含了客户端发送的数据。
  6. 处理请求on_request函数内部,首先将消息体body转换为整数n,然后调用fib函数计算斐波那契数。

  7. 发送响应:计算完成后,使用ch.basic_publish方法将计算结果发送回客户端。这里会使用reply_to属性指定的路由键,以及correlation_id来确保响应能够正确匹配到原始请求。

  8. 确认消息:最后,通过ch.basic_ack方法确认消息已被处理,这样RabbitMQ就知道可以移除这条消息了。

整个过程是异步的,服务端在channel.start_consuming()调用后进入等待状态,一旦有消息到达,就会触发on_request函数的执行。

服务器代码相当直接明了:

首先,我们建立连接并声明队列rpc_queue。 我们声明了斐波那契函数。它假设输入的是有效的正整数。(不要指望这个函数能处理大数字,这可能是最慢的递归实现方式)。 我们为basic_consume声明了一个回调函数on_request,这是RPC服务器的核心。当收到请求时,它会被执行。它完成工作并将响应发送回去。 我们可能想要运行多个服务器进程。为了将负载平均分配给多个服务器,我们需要设置prefetch_count参数。

3.6.2 rpc_client.py

进入sender容器,vi编写rpc_client.py:

root@sender:/# vi rpc_client.py
root@sender:/# cat rpc_client.py 
import pika
import uuidclass FibonacciRpcClient(object):def __init__(self):self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2'))self.channel = self.connection.channel()result = self.channel.queue_declare(queue='', exclusive=True)self.callback_queue = result.method.queueself.channel.basic_consume(queue=self.callback_queue,on_message_callback=self.on_response,auto_ack=True)self.response = Noneself.corr_id = Nonedef on_response(self, ch, method, props, body):if self.corr_id == props.correlation_id:self.response = bodydef call(self, n):self.response = Noneself.corr_id = str(uuid.uuid4())self.channel.basic_publish(exchange='',routing_key='rpc_queue',properties=pika.BasicProperties(reply_to=self.callback_queue,correlation_id=self.corr_id,),body=str(n))while self.response is None:self.connection.process_data_events(time_limit=None)return int(self.response)fibonacci_rpc = FibonacciRpcClient()print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(f" [.] Got {response}")
root@sender:/# 

这段代码是一个使用Python编写的RabbitMQ RPC(远程过程调用)客户端的实现。它使用pika库与RabbitMQ进行交互。下面是对代码中每个部分的详细说明:

  1. import pikaimport uuid:导入pika库用于与RabbitMQ进行通信,导入uuid库用于生成唯一的标识符。

  2. 定义FibonacciRpcClient类:

    class FibonacciRpcClient(object):

    这个类封装了RPC客户端的功能。

  3. 类初始化方法__init__

    def __init__(self):self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2'))self.channel = self.connection.channel()
    • 创建一个到RabbitMQ服务器的阻塞式连接,服务器的IP地址是172.17.0.2
    • 通过连接创建了一个通信通道。
  4. 声明一个独占队列:

    result = self.channel.queue_declare(queue='', exclusive=True)
    self.callback_queue = result.method.queue
    • 使用queue_declare声明一个独占队列,这个队列在连接关闭时会被删除。
    • 将声明的队列名称存储在self.callback_queue中,用于接收服务器的响应。
  5. 设置消息消费:

    self.channel.basic_consume(queue=self.callback_queue,on_message_callback=self.on_response,auto_ack=True)
    • 使用basic_consume设置消息消费,指定队列、消息处理回调函数on_response和自动确认消息。
  6. 定义on_response方法:

    def on_response(self, ch, method, props, body):if self.corr_id == props.correlation_id:self.response = body
    • 这个方法是当客户端接收到响应时会被调用的回调函数。
    • 检查响应的消息属性中的correlation_id是否与请求时设置的correlation_id相匹配,如果匹配,则将响应的消息体存储在self.response中。
  7. 定义call方法:

    def call(self, n):self.response = Noneself.corr_id = str(uuid.uuid4())self.channel.basic_publish(exchange='',routing_key='rpc_queue',properties=pika.BasicProperties(reply_to=self.callback_queue,correlation_id=self.corr_id,),body=str(n))while self.response is None:self.connection.process_data_events(time_limit=None)return int(self.response)
    • 这个方法用于发送RPC请求并等待响应。
    • 首先,将self.response设置为None,生成一个新的correlation_id
    • 使用basic_publish方法发送RPC请求,指定交换机、路由键、消息属性(包括回复队列和相关ID)和消息体。
    • 通过循环等待响应,使用process_data_events方法处理数据事件,直到收到响应。
    • 收到响应后,将响应的消息体转换为整数并返回。
  8. 创建FibonacciRpcClient实例并发送请求:

    fibonacci_rpc = FibonacciRpcClient()
    print(" [x] Requesting fib(30)")
    response = fibonacci_rpc.call(30)
    print(f" [.] Got {response}")
    • 创建FibonacciRpcClient类的实例。
    • 发送请求计算斐波那契数列的第30项,并打印响应结果。

整个代码的目的是创建一个RPC客户端,它发送请求到RabbitMQ服务器,等待服务器处理请求并返回结果。这个客户端是阻塞式的,意味着它会一直等待直到收到响应。

客户端代码稍微复杂一些:

  1. 我们建立一个连接,创建一个通道(channel),并声明一个用于接收响应的独占回调队列。
  2. 我们订阅(basic_consume)这个回调队列,以便我们能够接收RPC响应。
  3. on_response回调函数在每次响应时执行一个非常简单的任务,对于每个响应消息,它检查correlation_id是否是我们正在寻找的那个。如果是,它将响应保存在self.response中,并中断消费循环。
  4. 接下来,我们定义了主要的call方法——它执行实际的RPC请求。
  5. call方法中,我们生成一个唯一的correlation_id并保存它——on_response回调函数将使用这个值来捕获适当的响应。
  6. 同样在call方法中,我们发布了请求消息(basic_publish),附带两个属性:reply_tocorrelation_id
  7. 最后,我们等待适当的响应到达,并将响应返回给用户。

3.7 开始测试

 1、启动RPC服务器(Server)

root@receiver:/# python3 rpc_server.py[x] Awaiting RPC requests

 2、查看mq server信息

root@5d37b5b451ba:/# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
rpc_queue       0root@5d37b5b451ba:/# rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
name    type
amq.topic       topic
amq.fanout      fanout
amq.direct      direct
amq.headers     headers
amq.match       headers
amq.rabbitmq.trace      topicdirect
root@5d37b5b451ba:/# rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name     source_kind     destination_name        destination_kind        routing_key     argumentsexchange        rpc_queue       queue   rpc_queue       []
root@5d37b5b451ba:/# 

3、启动RPC客户端(Client),验证结果

root@sender:/# python3 rpc_client.py[x] Requesting fib(30) <--观察容器终端输出,确认客户端已发送请求并正在等待响应。[.] Got 832040  <--客户端将等待服务器处理请求并返回结果。一旦服务器处理完毕,客户端将接收响应并显示结果。
root@sender:/# 

4、服务器端显示内容

root@receiver:/# python3 rpc_server.py[x] Awaiting RPC requests[.] fib(30)

5、一些说明

展示的设计并非RPC服务的唯一可能实现方式,但它有一些重要的优点:

  1. 如果RPC服务器运行太慢,你可以通过简单地运行另一个服务器来扩展。尝试在新的控制台中运行第二个rpc_server.py

  2. 在客户端,RPC只需要发送和接收一条消息。不需要像queue_declare这样的同步调用。因此,对于单个RPC请求,RPC客户端只需要一个网络往返。

尽管如此,我们的代码仍然相当简单,并没有尝试解决更复杂(但重要)的问题,比如:

  • 如果没有服务器运行,客户端应该如何反应?
  • 客户端是否应该有RPC的某种超时机制?
  • 如果服务器出现故障并引发异常,是否应该将异常转发给客户端?
  • 在处理之前,如何保护系统免受无效输入消息的影响(例如,检查边界)?

这些问题都是构建健壮的RPC服务时需要考虑的实际问题,它们涉及到错误处理、超时机制、异常处理和安全验证等方面。

4、Wireshark抓包

4.1 抓包方式

参考:《RabbitMQ练习(Hello World)》

4.2 抓包信息

典型数据包:

1、rpc client发送rpc请求时,携带reply_to和correlation_id:

 

Frame 47: 156 bytes on wire (1248 bits), 156 bytes captured (1248 bits) on interface docker0, id 0
Ethernet II, Src: 02:42:ac:11:00:03 (02:42:ac:11:00:03), Dst: 02:42:ac:11:00:02 (02:42:ac:11:00:02)
Internet Protocol Version 4, Src: 172.17.0.3 (172.17.0.3), Dst: 172.17.0.2 (172.17.0.2)
Transmission Control Protocol, Src Port: 52506, Dst Port: 5672, Seq: 492, Ack: 673, Len: 90
Advanced Message Queuing ProtocolType: Content header (2)Channel: 1Length: 82Class ID: Basic (60)Weight: 0Body size: 2Property flags: 0x0600PropertiesCorrelation-Id: c1e5f310-0996-45fb-a3d8-bf2081cd891cReply-To: amq.gen-p169XGGXAjsVdvgeJjMlOw

这段信息描述的是AMQP(Advanced Message Queuing Protocol,高级消息队列协议)中的一个消息头(Content header),它包含了消息的元数据和属性。下面是对这些字段的解释:

  1. Type: 消息头的类型,这里指的是内容头(Content header)。

  2. Channel: 消息发送的通道号,这里是1。

  3. Length: 消息头的长度,这里是82字节。

  4. Class ID: 消息所属的类标识符,Basic (60)表示这是一个基本类消息。

  5. Weight: 消息的优先级权重,这里设置为0,表示没有特别的优先级。

  6. Body size: 消息体的大小,这里是2字节。

  7. Property flags: 属性标志,这里是0x0600,表示设置了某些属性。

  8. Properties:

    • Correlation-Idc1e5f310-0996-45fb-a3d8-bf2081cd891c,这是消息的唯一标识符,用于将响应与请求关联起来。
    • Reply-Toamq.gen-p169XGGXAjsVdvgeJjMlOw,这是回复队列的名称,当消费者处理完消息后,会将响应发送到这个队列。

AMQP是一种提供高度可靠的异步消息传递协议,广泛应用于分布式系统中。消息头中的这些属性使得消息传递更加灵活和可靠,例如,通过Correlation-Id可以将请求和响应正确地匹配起来,而Reply-To属性则允许消费者指定一个队列来接收响应消息。

4.3 流量图

 5、小结

这篇文章是 RabbitMQ 官方教程的第六部分的练习,主题是关于如何使用 Python 客户端库 Pika 实现远程过程调用(RPC)。以下是主要内容小结:

1、预备条件

  • 假设 RabbitMQ 已经安装并运行在标准端口(5672)上。
  • 如果使用不同的主机、端口或凭据,需要调整连接设置。
  • 使用 Pika 客户端库版本 1.0.0。

2、RPC 客户端接口

  • 教程提供了一个简单的 RPC 客户端类,它公开了一个名为 call 的方法,用于发送 RPC 请求并阻塞等待响应。

3、RPC 的注意事项

  • RPC 是计算中常见的模式,但也常受到批评,因为它可能导致系统不可预测和调试复杂化。
  • 建议明确哪些函数调用是本地的,哪些是远程的,并且文档化系统,处理错误情况。

4、回调队列

  • 客户端发送请求消息时,需要提供一个“回调”队列地址,以便接收响应。
  • 教程中展示了如何声明一个独占的回调队列,并使用它来接收响应。

5、correlation_id

  • 为了匹配请求和响应,教程介绍了 correlation_id 属性的使用。每个请求都被赋予一个唯一的 correlation_id
  • 如果客户端在回调队列中收到一个未知的 correlation_id,它应该安全地丢弃该消息,因为这表示该响应不属于客户端的任何请求。

6、RPC 工作流程

  • 客户端启动时创建一个匿名的独占回调队列。
  • 客户端发送带有 reply_to 和 correlation_id 属性的 RPC 请求消息。
  • RPC 工作者(服务器)在 rpc_queue 队列上等待请求,处理请求后将结果消息发送回客户端。
  • 客户端在回调队列上等待数据,当消息出现时,检查 correlation_id 属性,如果匹配,则将响应返回给应用程序。

7、服务器和客户端代码

  • 教程提供了 rpc_server.py 和 rpc_client.py 的示例代码,展示了如何实现 RPC 服务器和客户端。

8、扩展性和问题

  • 如果 RPC 服务器运行缓慢,可以通过运行另一个服务器实例来扩展。
  • RPC 客户端只需要一次网络往返即可完成单个 RPC 请求。
  • 代码示例相对简单,没有解决一些更复杂但重要的问题,例如服务器不可用时客户端的反应、RPC 超时、服务器异常的处理以及对无效输入消息的保护。

9、实验和资源

  • 教程建议使用 RabbitMQ 管理 UI 来查看队列,这可能对实验很有帮助。

这个教程为读者提供了一个使用 RabbitMQ 和 Python 实现 RPC 机制的实用指南,包括代码示例和一些最佳实践建议。

这篇关于RabbitMQ练习(Remote procedure call (RPC))的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1126566

相关文章

Springboot使用RabbitMQ实现关闭超时订单(示例详解)

《Springboot使用RabbitMQ实现关闭超时订单(示例详解)》介绍了如何在SpringBoot项目中使用RabbitMQ实现订单的延时处理和超时关闭,通过配置RabbitMQ的交换机、队列和... 目录1.maven中引入rabbitmq的依赖:2.application.yml中进行rabbit

SpringBoot整合Canal+RabbitMQ监听数据变更详解

《SpringBoot整合Canal+RabbitMQ监听数据变更详解》在现代分布式系统中,实时获取数据库的变更信息是一个常见的需求,本文将介绍SpringBoot如何通过整合Canal和Rabbit... 目录需求步骤环境搭建整合SpringBoot与Canal实现客户端Canal整合RabbitMQSp

RabbitMQ练习(AMQP 0-9-1 Overview)

1、What is AMQP 0-9-1 AMQP 0-9-1(高级消息队列协议)是一种网络协议,它允许遵从该协议的客户端(Publisher或者Consumer)应用程序与遵从该协议的消息中间件代理(Broker,如RabbitMQ)进行通信。 AMQP 0-9-1模型的核心概念包括消息发布者(producers/publisher)、消息(messages)、交换机(exchanges)、

【Rust练习】12.枚举

练习题来自:https://practice-zh.course.rs/compound-types/enum.html 1 // 修复错误enum Number {Zero,One,Two,}enum Number1 {Zero = 0,One,Two,}// C语言风格的枚举定义enum Number2 {Zero = 0.0,One = 1.0,Two = 2.0,}fn m

MySql 事务练习

事务(transaction) -- 事务 transaction-- 事务是一组操作的集合,是一个不可分割的工作单位,事务会将所有的操作作为一个整体一起向系统提交或撤销请求-- 事务的操作要么同时成功,要么同时失败-- MySql的事务默认是自动提交的,当执行一个DML语句,MySql会立即自动隐式提交事务-- 常见案例:银行转账-- 逻辑:A给B转账1000:1.查询

RabbitMQ使用及与spring boot整合

1.MQ   消息队列(Message Queue,简称MQ)——应用程序和应用程序之间的通信方法   应用:不同进程Process/线程Thread之间通信   比较流行的中间件:     ActiveMQ     RabbitMQ(非常重量级,更适合于企业级的开发)     Kafka(高吞吐量的分布式发布订阅消息系统)     RocketMQ   在高并发、可靠性、成熟度等

html css jquery选项卡 代码练习小项目

在学习 html 和 css jquery 结合使用的时候 做好是能尝试做一些简单的小功能,来提高自己的 逻辑能力,熟悉代码的编写语法 下面分享一段代码 使用html css jquery选项卡 代码练习 <div class="box"><dl class="tab"><dd class="active">手机</dd><dd>家电</dd><dd>服装</dd><dd>数码</dd><dd

AutoGen Function Call 函数调用解析(一)

目录 一、AutoGen Function Call 1.1 register_for_llm 注册调用 1.2 register_for_execution 注册执行 1.3 三种注册方法 1.3.1 函数定义和注册分开 1.3.2 定义函数时注册 1.3.3  register_function 函数注册 二、实例 本文主要对 AutoGen Function Call

014.Python爬虫系列_解析练习

我 的 个 人 主 页:👉👉 失心疯的个人主页 👈👈 入 门 教 程 推 荐 :👉👉 Python零基础入门教程合集 👈👈 虚 拟 环 境 搭 建 :👉👉 Python项目虚拟环境(超详细讲解) 👈👈 PyQt5 系 列 教 程:👉👉 Python GUI(PyQt5)文章合集 👈👈 Oracle数据库教程:👉👉 Oracle数据库文章合集 👈👈 优

ORACLE语法-包(package)、存储过程(procedure)、游标(cursor)以及java对Result结果集的处理

陈科肇 示例: 包规范 CREATE OR REPLACE PACKAGE PACK_WMS_YX IS-- Author : CKZ-- Created : 2015/8/28 9:52:29-- Purpose : 同步数据-- Public type declarations,游标 退休订单TYPE retCursor IS REF CURSOR;-- RETURN vi_co_co