205、使用消息队列实现 RPC(远程过程调用)模型的 服务器端 和 客户端

本文主要是介绍205、使用消息队列实现 RPC(远程过程调用)模型的 服务器端 和 客户端,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

  • ★ RPC模型(远程过程调用通信模型)
  • ▲ 完整过程:
  • 代码演示
    • 总体流程解释:
    • ConstantUtil 常量工具类
    • ConnectionUtil RabbitMQ连接工具类
    • Server 服务端
    • Client 客户端
    • 测试结果
      • 服务端
      • 客户端
  • 完整代码
    • ConstantUtil 常量工具类
    • ConnectionUtil RabbitMQ连接工具类
    • Server 服务端
    • Client 客户端
    • pom.xml

★ RPC模型(远程过程调用通信模型)

PRC 模型相当于一对一,跟调用方法一样,能拿到方法返回的结果,这就是典型的RPC模型(不仅要传参数,还需要拿到返回值)

reply:回答、答复
correlation :关联、相互关联

▲ 通过使用两个独享队列,可以让RabbitMQ实现RPC(远程过程调用)通信模型,

其通信过程其实也很简单:客户端向服务器消费的独享队列发送一条消息,服务器收到该消息后,对该消息进行处理,然后将处理结果发送给客户端消费的独享队列。

服务器端消费的独享队列负责保存调用参数,客户端消费的独享队列负责保存调用的返回值。

▲ 使用独享队列可以避免其他连接来读取队列的消息、只有当前连接才能读取该队列的消息,这样才能保证服务器能读到客户端发送的每条消息,客户端也能读到服务器返回的每条消息。

▲ 为了让服务器知道客户端所消费的独享队列,客户端发送消息时,应该将自己监听的队列名以 reply_to属性 发送给服务器

▲为了能准确识别服务器应答消息(返回值)与客户端请求消息(调用参数)之间的对应关系,
还需要为每条消息都增加一个 correlation_id 属性,两条具有相同 correlation_id 属性值的消息可认为是配对的两条消息。

【备注】:客户端送出的消息要包含2个属性:

  • reply_to:该属性指定了服务器要将返回的消息送回到哪个队列。

  • correlation_id:该属性指定了服务器返回的消息也要添加相同的correlation_id属性。

在这里插入图片描述

▲ 完整过程:

(1)服务器启动时,它会创建一个名为“rpc_queue”的独享队列(名称可以随意),并使用服务器端的消费者监听该独享队列的消息。(所有的RPC 调用,一定都是先从服务器端的启动开始的。)(2)客户端启动时,它会创建一个匿名(默认)(由RabbitMQ命名)的 独享队列,并使用客户端的消费者监听该独享队列的消息。(这个独享队列的名字也是 reply_to 属性的属性值)(3)客户端发送带有两个属性的消息:一个是代表应答队列名的 reply_to属性(该属性值就是第2步客户端所创建的独享队列名),另一个是代表消息标识的 correlation_id 属性。(4)将消息发送到服务器监听的rpc_queue队列中。(5)服务器从rpc_queue队列中读取消息,服务器调用处理程序对该消息进行计算,将计算结果以消息发送给 reply_to属性 指定的队列,并为消息添加相同的 correlation_id属性。(6)客户端从 reply_to 对应的队列中读取消息,当消息出现时,它会检查消息的 correlation_id属性。如果此属性的值与请求消息的 correlation_id 属性值匹配,将它返回给应用。————上面过程,其实就是对P2P模型的应用,因此无需使用自己的Exchange,而是使用系统自动创建的默认Exchange即可。

代码演示

需求:客户端发送个消息到服务端,服务端处理完再返回结果给客户端。
如图:需要有两个消息队列,一个是服务端
在这里插入图片描述

总体流程解释:

仅作为自己梳理代码流程的记录,大佬请直接忽略

Server 类是服务端 , Client 是客户端。

rpc_queue 是自己在服务端声明创建的消息队列,服务端监听着这个消息队列

amq.gen-3Nl6GNjR5BzPJ4N-By4p1g 是客户端声明创建的一个默认生成的消息队列。
(就是调用 Channel 的 queueDeclare() 方法声明队列时,不指定具体的消息队列的参数,全凭默认生成),客户端监听着这个默认的消息队列。

replyTo 的值是 amq.gen-3xxx 这个默认消息队列,作用是指定了服务器要将返回的消息送回到这个默认队列

correlationId 只是一个单纯的消息标识,可以给个1、2、3、4…作为消息标识

上面这些就是涉及到的一些点,下面就是流程:

首先,客户端会发送几个消息,Exchange时默认的,路由key 是 rpc_queue , 每个消息都携带者 replyTo 和 correlationId 这两个属性值;
(解释:如果消息发布者指定默认的Exchange,那么Exchange就会根据消息发布者发来的消息中携带的路由key(假如路由key叫 aaa) ,去找是否有同样名字叫aaa的消息队列,有的话就把消息分发给消息队列,没有的话该消息就会被丢弃);

这些消息会被默认的Exchange分发给 rpc_queue 这个消息队列。

服务端在声明 rpc_queue 这个消息队列的时候,把这个消息队列设置为独享类型(exclusive:true),那么 rpc_queue 这个消息队列里面的消息就只能被这个服务端消费,不能被其他消费者获取到消息。

因为服务端监听这个 rpc_queue 这个消息队列,所以服务端拿到这个消息队列的消息之后,就会把消息中的 replyTo 和 correlationId 先拿出来,然后同时对消息进行业务逻辑处理。

业务逻辑处理完消息后,服务端需要把这些处理后的消息返回给客户端。
重点就是,每个消息在客户端发来之后,都有一个 correlationId 标识,所以服务端在返回回去时,需要把处理好的消息的原本的correlationId 标识对应的设置回去。

(比如:客户端发来消息 A , A 携带的 correlationId 为 1 ,那么服务端在处理完 A 消息后,需要把 correlationId = 1 再设置回 这个消息 A (就是拿出来,处理完消息,再放回去),这样客户端在接收服务端返回来的处理过后的A消息时,才能根据 correlationId = 1 这个标识,得到想要的被处理过的A消息数据。
因为客户端发的消息可能有成千上万条,需要有这个 correlationId 作为消息的标识,才能准确拿到被处理过的想要的那条A消息)

服务端返回处理过的消息给客户端,也是一个发送消息的过程,所以发送消息指定的消息队列就是这个 replyTo(就是amq.gen-3xxx 这个默认消息队列),这个replyTo 也是从客户端发送来的消息中获取获取一个属性,作用是指定了服务端要将返回的消息送回到这个默认队列。

服务端把处理后的消息返回到 amq.gen-3xxx 这个默认消息队列,因为 客户端就是在监听amq.gen-3xxx 这个默认消息队列,所以客户端就能得到自己一开始发送给服务端,然后服务端处理完成后返回来的消息。

然后客户端就能根据 correlationId 这个标识,准确找到每个被处理修改过后的消息,而不至于找混。再根据需求去对处理过的消息进行业务操作。

(以上仅作为自己梳理代码流程的记录,大佬请直接忽略)
更简单点来说,就是

客户端声明并监听着默认队列 amq.gen-3xxx,然后发送消息到客户端,消息携带有correlationId 和 replyto 两个属性,路由key是rpc_queue,exchange是默认的。

服务端声明并监听 rpc_queue 消息队列,从该队列得到消息(messageA)后,从每个消息中获取该消息对应的 correlationId 和 replyto 两个属性的属性值,然后处理消息,对于处理完的消息(resultMessageA),需要把correlationId 和 replyto 两个属性的属性值重新设置回给resultMessageA,在通过Exchange分发回给 replyto 属性值中指定的消息队列(amq.gen-3xxx)。

然后客户端再从 amq.gen-3xxx 默认的消息队列中获取服务端处理并返回回来的消息,进行对应的消费。

ConstantUtil 常量工具类

在这里插入图片描述

ConnectionUtil RabbitMQ连接工具类

在这里插入图片描述

Server 服务端

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Client 客户端

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

测试结果

启动测试的时候,一定要先启动服务端,再启动客户端

服务端

在这里插入图片描述

客户端

在这里插入图片描述

QUEUE
在这里插入图片描述

完整代码

ConstantUtil 常量工具类

package cn.ljh.rabbitmq.util;//常量
public class ConstantUtil
{//消息队列实现 RPC(远程过程调用)模型 之 服务器端----------------//消息队列public final static String RPC_QUEUE = "rpc_queue";// ------------topic类型的Exchange,需要的相关常量----------public final static String QUEUET01 = "qt_01";public final static String QUEUET02 = "qt_02";// topic 通配符类型的 Exchangepublic static final String EXCHANGE_NAME_TOPIC = "myex03.topic";// Exchange 绑定 Queue 队列的路由key  ,通配符类型      *:匹配一个单词。#:匹配零个或多个单词。public static final String[] ROUTING_TOPIC_PATTERNS = {"*.crazyit.*", "*.org", "edu.#"};// 生产者发送消息给Excahnge携带的路由keypublic static final String[] ROUTING_TOPIC_KEYS = { "www.crazyit.org", "www.crazyit.cn","edu.crazyit.org", "crazyit.org", "fkjava.org", "edu.fkjava.org", "edu.fkjava", "edu.org"};//-------------------------------------------------------// 消息队列的名称public final static String QUEUE01 = "queue_01";public final static String QUEUE02 = "queue_02";// Exchange的名称public static final String EXCHANGE_NAME = "myex02.direct";// 三个路由key定义成一个数组的名称public static final String[] ROUTING_KEYS = {"info", "error", "warning"};}

ConnectionUtil RabbitMQ连接工具类

package cn.ljh.rabbitmq.util;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;//连接工具
public class ConnectionUtil
{//获取连接的方法public static Connection getConnection() throws IOException, TimeoutException{//创建连接工厂----这个ConnectionFactory源码可以看出有构造器,所以直接new一个出来ConnectionFactory connectionFactory =  new ConnectionFactory();//设置连接信息connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("ljh");connectionFactory.setPassword("123456");connectionFactory.setVirtualHost("/"); //连接虚拟主机//从连接工厂获取连接Connection connection = connectionFactory.newConnection();//返回连接return connection;}
}

Server 服务端

package cn.ljh.rabbitmq.producer;import cn.ljh.rabbitmq.util.ConnectionUtil;
import cn.ljh.rabbitmq.util.ConstantUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.concurrent.TimeoutException;//消息队列实现 RPC(远程过程调用)模型 之 服务器端
public class Server
{public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel。Channel channel = conn.createChannel();//3、使用系统自动创建的默认Exchange,无需声明Exchange//消息队列设置为独占(exclusive:true)--------------------------------------------------------------------------------------声明消息队列channel.queueDeclare(ConstantUtil.RPC_QUEUE,true,   /* 是否持久化 */true,  /* 是否只允许只有这个消息队列的消息消费者才可以消费这个消息队列的消息 */false, /* 是否自动删除 */null); /* 指定这个消息队列的额外参数属性 *///不需要关闭资源,因为它也要监听自己消费消息的队列//4、调用Channel 的 basicConsume()方法开始消费消息----------------------------------------------------------------------------1、服务端监听并消费消息channel.basicConsume(ConstantUtil.RPC_QUEUE, /* 消费这个名字的消费队列里面的消息 */true,new DefaultConsumer(channel){//处理消息:当这个 ConstantUtil.RPC_QUEUE 消息队列收到消息的时候,这个方法就会被触发。重写这个方法:@Overridepublic void handleDelivery(String consumerTag,Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,AMQP.BasicProperties properties /* 消息的那些属性 */,byte[] body /*body:消息的消息体*/) throws IOException{//把消息体中的消息拿出来,此处读取到的消息,就相当于调用参数-------------------------------------------------------2、服务端获取消息队列的消息String param = new String(body, StandardCharsets.UTF_8);//之前只需要用到消息,现在需要额外读取消息里面携带的两个属性:reply_to 和 correlation_id//消息的属性都存放在 AMQP.BasicProperties 这个属性里面,从这个属性获取 reply_to 和 correlation_idString replyTo = properties.getReplyTo();System.err.println("replyTo:  " + replyTo);String correlationId = properties.getCorrelationId();System.err.println("correlationId:  " + correlationId);//调用服务器的处理消息的方法,最终得到处理后的结果。该方法可以是任意的业务处理,该方法的返回值result是要被送回客户端的。------3、服务端处理消费消息String result = format(param);//printf:格式化输出函数   %s:输出字符串  %n:换行System.err.printf("服务端 收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",envelope.getExchange(), envelope.getRoutingKey(), param);//发送消息的方法,需要把返回值result发送回客户端-------------------------------------------------------4、服务端处理消费完的消息返回客户端的操作channel.basicPublish("", /* 使用默认的Exchange */replyTo,/* 此处的routing key 应该填 reply_to 属性;  reply_to: 该属性指定了服务器要将返回的消息送回到哪个队列 *///把从客户端的 AMQP.BasicProperties 属性获取到的correlationId,再作为参数传回去,用于客户端和服务器的匹配。new AMQP.BasicProperties().builder().correlationId(correlationId) /* 也需要返回额外的 correlation_id,要与从客户端消息中读取的 correlation_id 完全一样 */.deliveryMode(2) /* 设置这个消息是持久化类型的 */.build(), /*这个.build()的作用就是构建得到这个 BasicProperties 对象,这个对象就包含了 correlationId 属性因为服务器端返回的消息一定要有这个correlationId。 */result.getBytes(StandardCharsets.UTF_8));}});}//模拟服务器端消费消息要做的处理业务逻辑操作public static String format(String name){//此处模拟让服务器处理这里的业务有快有慢的情况,看correlation_id 能不能还是把数据对应上int rand = (new Random().nextInt(40) + 20) * 30;try{Thread.sleep(rand);} catch (InterruptedException e){e.printStackTrace();}return "《" + name + "》";}}

Client 客户端

package cn.ljh.rabbitmq.consumer;import cn.ljh.rabbitmq.util.ConnectionUtil;
import cn.ljh.rabbitmq.util.ConstantUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;//消息队列实现 RPC(远程过程调用)模型 之 客户端
public class Client
{// paramMap 保存了 correlationid 与 参数(消息)之间的对应关系public static Map<String, String> paramMap = new ConcurrentHashMap<>();//客户端发送的消息(参数)public static String[] params = new String[]{"火影忍者", "七龙珠", "哆啦A梦", "蜡笔小新"};public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel 消息通道Channel channel = conn.createChannel();//3、调用 Channel 的 queueDeclare() 方法声明队列,声明一个有 RabbitMQ 自动创建的、自动命名的、持久化的、独享的、会自动删除的【默认队列】AMQP.Queue.DeclareOk declareOk = channel.queueDeclare();System.out.println("declareOk  "+declareOk);//.getQueue() 用于得到默认队列的返回值,也就是默认队列的名字,之前声明是我们自己设置队列名,这里用默认的队列,就用.getQueue() 得到队列名。String queueName = declareOk.getQueue();System.out.println("queueName: "+queueName);//4、调用Channel 的 basicConsume()方法开始处理消费消息-----------------------------------------------------------------2、客户端监听服务端处理完消息后返回来的消息channel.basicConsume(queueName /*消费这个消费队列里面的消息*/,true /*消息的确认模式:是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,new DefaultConsumer(channel){//处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:@Overridepublic void handleDelivery(String consumerTag,Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,AMQP.BasicProperties properties /*消息的那些属性*/,byte[] body /*body:消息的消息体*/) throws IOException{//把消息体中的消息拿出来String resultMessage = new String(body, StandardCharsets.UTF_8);//此处,需要指定每个返回值对应的是哪个参数,靠的就是correlation_idString correlationId = properties.getCorrelationId();//根据服务器端返回的消息中的correlation_id 获取对应的参数String param = paramMap.get(correlationId);System.err.println("客户端发出去的消息内容:"+param +"  , 服务端处理后返回来的消息内容:"+resultMessage);//printf:格式化输出函数   %s:输出字符串  %n:换行System.out.printf("客户端 收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",envelope.getExchange(), envelope.getRoutingKey(), resultMessage);//得到服务器的返回值之后,整个调用过程就完成了,此时就应该从 Map 中删除这组 key-value对了( correlationId 与 参数的对应关系 )。paramMap.remove(correlationId);}});//客户端发送消息---------------------------------------------------------------------------代码运行后先执行这段--------------------1、客户端发送消息for (int i = 0 ; i < params.length ; i++){paramMap.put( i + "" , params[i] );channel.basicPublish("", /* 使用默认的Exchange */ConstantUtil.RPC_QUEUE, /* 客户端发送消息携带的路由key是服务端监听的消息队列的名字,且使用了默认的Exchange,这就意味着消息会被发送给服务器监听的那个消息队列 */new AMQP.BasicProperties().builder().correlationId(i + "")  /* 设置 correlation_id 属性;  correlation_id:该属性指定了服务器返回的消息也要添加相同的correlation_id属性*/.replyTo(queueName)     /* reply_to: 该属性指定了服务器要将返回的消息送回到哪个队列 , 设置 reply_to 属性 */.deliveryMode(2)  /* 持久化消息 */.build(),          /* 构建这个BasicProperties对象,这个对象主要存这个correlationId属性 */params[i].getBytes(StandardCharsets.UTF_8));}}
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.ljh</groupId><artifactId>rabbitmq_rpc</artifactId><version>1.0.0</version><name>rabbitmq_rpc</name><!--  属性  --><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><java.version>11</java.version></properties><!--  依赖  --><dependencies><!-- RabbitMQ 的依赖库 --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.13.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.16</version><scope>compile</scope></dependency></dependencies></project>

这篇关于205、使用消息队列实现 RPC(远程过程调用)模型的 服务器端 和 客户端的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/211195

相关文章

Python调用Orator ORM进行数据库操作

《Python调用OratorORM进行数据库操作》OratorORM是一个功能丰富且灵活的PythonORM库,旨在简化数据库操作,它支持多种数据库并提供了简洁且直观的API,下面我们就... 目录Orator ORM 主要特点安装使用示例总结Orator ORM 是一个功能丰富且灵活的 python O

Java实现检查多个时间段是否有重合

《Java实现检查多个时间段是否有重合》这篇文章主要为大家详细介绍了如何使用Java实现检查多个时间段是否有重合,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录流程概述步骤详解China编程步骤1:定义时间段类步骤2:添加时间段步骤3:检查时间段是否有重合步骤4:输出结果示例代码结语作

Java中String字符串使用避坑指南

《Java中String字符串使用避坑指南》Java中的String字符串是我们日常编程中用得最多的类之一,看似简单的String使用,却隐藏着不少“坑”,如果不注意,可能会导致性能问题、意外的错误容... 目录8个避坑点如下:1. 字符串的不可变性:每次修改都创建新对象2. 使用 == 比较字符串,陷阱满

Python使用国内镜像加速pip安装的方法讲解

《Python使用国内镜像加速pip安装的方法讲解》在Python开发中,pip是一个非常重要的工具,用于安装和管理Python的第三方库,然而,在国内使用pip安装依赖时,往往会因为网络问题而导致速... 目录一、pip 工具简介1. 什么是 pip?2. 什么是 -i 参数?二、国内镜像源的选择三、如何

使用C++实现链表元素的反转

《使用C++实现链表元素的反转》反转链表是链表操作中一个经典的问题,也是面试中常见的考题,本文将从思路到实现一步步地讲解如何实现链表的反转,帮助初学者理解这一操作,我们将使用C++代码演示具体实现,同... 目录问题定义思路分析代码实现带头节点的链表代码讲解其他实现方式时间和空间复杂度分析总结问题定义给定

Linux使用nload监控网络流量的方法

《Linux使用nload监控网络流量的方法》Linux中的nload命令是一个用于实时监控网络流量的工具,它提供了传入和传出流量的可视化表示,帮助用户一目了然地了解网络活动,本文给大家介绍了Linu... 目录简介安装示例用法基础用法指定网络接口限制显示特定流量类型指定刷新率设置流量速率的显示单位监控多个

Java覆盖第三方jar包中的某一个类的实现方法

《Java覆盖第三方jar包中的某一个类的实现方法》在我们日常的开发中,经常需要使用第三方的jar包,有时候我们会发现第三方的jar包中的某一个类有问题,或者我们需要定制化修改其中的逻辑,那么应该如何... 目录一、需求描述二、示例描述三、操作步骤四、验证结果五、实现原理一、需求描述需求描述如下:需要在

JavaScript中的reduce方法执行过程、使用场景及进阶用法

《JavaScript中的reduce方法执行过程、使用场景及进阶用法》:本文主要介绍JavaScript中的reduce方法执行过程、使用场景及进阶用法的相关资料,reduce是JavaScri... 目录1. 什么是reduce2. reduce语法2.1 语法2.2 参数说明3. reduce执行过程

如何使用Java实现请求deepseek

《如何使用Java实现请求deepseek》这篇文章主要为大家详细介绍了如何使用Java实现请求deepseek功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1.deepseek的api创建2.Java实现请求deepseek2.1 pom文件2.2 json转化文件2.2

Java调用DeepSeek API的最佳实践及详细代码示例

《Java调用DeepSeekAPI的最佳实践及详细代码示例》:本文主要介绍如何使用Java调用DeepSeekAPI,包括获取API密钥、添加HTTP客户端依赖、创建HTTP请求、处理响应、... 目录1. 获取API密钥2. 添加HTTP客户端依赖3. 创建HTTP请求4. 处理响应5. 错误处理6.