205、使用消息队列实现 RPC(远程过程调用)模型的 服务器端 和 客户端

本文主要是介绍205、使用消息队列实现 RPC(远程过程调用)模型的 服务器端 和 客户端,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

  • ★ RPC模型(远程过程调用通信模型)
  • ▲ 完整过程:
  • 代码演示
    • 总体流程解释:
    • ConstantUtil 常量工具类
    • ConnectionUtil RabbitMQ连接工具类
    • Server 服务端
    • Client 客户端
    • 测试结果
      • 服务端
      • 客户端
  • 完整代码
    • ConstantUtil 常量工具类
    • ConnectionUtil RabbitMQ连接工具类
    • Server 服务端
    • Client 客户端
    • pom.xml

★ RPC模型(远程过程调用通信模型)

PRC 模型相当于一对一,跟调用方法一样,能拿到方法返回的结果,这就是典型的RPC模型(不仅要传参数,还需要拿到返回值)

reply:回答、答复
correlation :关联、相互关联

▲ 通过使用两个独享队列,可以让RabbitMQ实现RPC(远程过程调用)通信模型,

其通信过程其实也很简单:客户端向服务器消费的独享队列发送一条消息,服务器收到该消息后,对该消息进行处理,然后将处理结果发送给客户端消费的独享队列。

服务器端消费的独享队列负责保存调用参数,客户端消费的独享队列负责保存调用的返回值。

▲ 使用独享队列可以避免其他连接来读取队列的消息、只有当前连接才能读取该队列的消息,这样才能保证服务器能读到客户端发送的每条消息,客户端也能读到服务器返回的每条消息。

▲ 为了让服务器知道客户端所消费的独享队列,客户端发送消息时,应该将自己监听的队列名以 reply_to属性 发送给服务器

▲为了能准确识别服务器应答消息(返回值)与客户端请求消息(调用参数)之间的对应关系,
还需要为每条消息都增加一个 correlation_id 属性,两条具有相同 correlation_id 属性值的消息可认为是配对的两条消息。

【备注】:客户端送出的消息要包含2个属性:

  • reply_to:该属性指定了服务器要将返回的消息送回到哪个队列。

  • correlation_id:该属性指定了服务器返回的消息也要添加相同的correlation_id属性。

在这里插入图片描述

▲ 完整过程:

(1)服务器启动时,它会创建一个名为“rpc_queue”的独享队列(名称可以随意),并使用服务器端的消费者监听该独享队列的消息。(所有的RPC 调用,一定都是先从服务器端的启动开始的。)(2)客户端启动时,它会创建一个匿名(默认)(由RabbitMQ命名)的 独享队列,并使用客户端的消费者监听该独享队列的消息。(这个独享队列的名字也是 reply_to 属性的属性值)(3)客户端发送带有两个属性的消息:一个是代表应答队列名的 reply_to属性(该属性值就是第2步客户端所创建的独享队列名),另一个是代表消息标识的 correlation_id 属性。(4)将消息发送到服务器监听的rpc_queue队列中。(5)服务器从rpc_queue队列中读取消息,服务器调用处理程序对该消息进行计算,将计算结果以消息发送给 reply_to属性 指定的队列,并为消息添加相同的 correlation_id属性。(6)客户端从 reply_to 对应的队列中读取消息,当消息出现时,它会检查消息的 correlation_id属性。如果此属性的值与请求消息的 correlation_id 属性值匹配,将它返回给应用。————上面过程,其实就是对P2P模型的应用,因此无需使用自己的Exchange,而是使用系统自动创建的默认Exchange即可。

代码演示

需求:客户端发送个消息到服务端,服务端处理完再返回结果给客户端。
如图:需要有两个消息队列,一个是服务端
在这里插入图片描述

总体流程解释:

仅作为自己梳理代码流程的记录,大佬请直接忽略

Server 类是服务端 , Client 是客户端。

rpc_queue 是自己在服务端声明创建的消息队列,服务端监听着这个消息队列

amq.gen-3Nl6GNjR5BzPJ4N-By4p1g 是客户端声明创建的一个默认生成的消息队列。
(就是调用 Channel 的 queueDeclare() 方法声明队列时,不指定具体的消息队列的参数,全凭默认生成),客户端监听着这个默认的消息队列。

replyTo 的值是 amq.gen-3xxx 这个默认消息队列,作用是指定了服务器要将返回的消息送回到这个默认队列

correlationId 只是一个单纯的消息标识,可以给个1、2、3、4…作为消息标识

上面这些就是涉及到的一些点,下面就是流程:

首先,客户端会发送几个消息,Exchange时默认的,路由key 是 rpc_queue , 每个消息都携带者 replyTo 和 correlationId 这两个属性值;
(解释:如果消息发布者指定默认的Exchange,那么Exchange就会根据消息发布者发来的消息中携带的路由key(假如路由key叫 aaa) ,去找是否有同样名字叫aaa的消息队列,有的话就把消息分发给消息队列,没有的话该消息就会被丢弃);

这些消息会被默认的Exchange分发给 rpc_queue 这个消息队列。

服务端在声明 rpc_queue 这个消息队列的时候,把这个消息队列设置为独享类型(exclusive:true),那么 rpc_queue 这个消息队列里面的消息就只能被这个服务端消费,不能被其他消费者获取到消息。

因为服务端监听这个 rpc_queue 这个消息队列,所以服务端拿到这个消息队列的消息之后,就会把消息中的 replyTo 和 correlationId 先拿出来,然后同时对消息进行业务逻辑处理。

业务逻辑处理完消息后,服务端需要把这些处理后的消息返回给客户端。
重点就是,每个消息在客户端发来之后,都有一个 correlationId 标识,所以服务端在返回回去时,需要把处理好的消息的原本的correlationId 标识对应的设置回去。

(比如:客户端发来消息 A , A 携带的 correlationId 为 1 ,那么服务端在处理完 A 消息后,需要把 correlationId = 1 再设置回 这个消息 A (就是拿出来,处理完消息,再放回去),这样客户端在接收服务端返回来的处理过后的A消息时,才能根据 correlationId = 1 这个标识,得到想要的被处理过的A消息数据。
因为客户端发的消息可能有成千上万条,需要有这个 correlationId 作为消息的标识,才能准确拿到被处理过的想要的那条A消息)

服务端返回处理过的消息给客户端,也是一个发送消息的过程,所以发送消息指定的消息队列就是这个 replyTo(就是amq.gen-3xxx 这个默认消息队列),这个replyTo 也是从客户端发送来的消息中获取获取一个属性,作用是指定了服务端要将返回的消息送回到这个默认队列。

服务端把处理后的消息返回到 amq.gen-3xxx 这个默认消息队列,因为 客户端就是在监听amq.gen-3xxx 这个默认消息队列,所以客户端就能得到自己一开始发送给服务端,然后服务端处理完成后返回来的消息。

然后客户端就能根据 correlationId 这个标识,准确找到每个被处理修改过后的消息,而不至于找混。再根据需求去对处理过的消息进行业务操作。

(以上仅作为自己梳理代码流程的记录,大佬请直接忽略)
更简单点来说,就是

客户端声明并监听着默认队列 amq.gen-3xxx,然后发送消息到客户端,消息携带有correlationId 和 replyto 两个属性,路由key是rpc_queue,exchange是默认的。

服务端声明并监听 rpc_queue 消息队列,从该队列得到消息(messageA)后,从每个消息中获取该消息对应的 correlationId 和 replyto 两个属性的属性值,然后处理消息,对于处理完的消息(resultMessageA),需要把correlationId 和 replyto 两个属性的属性值重新设置回给resultMessageA,在通过Exchange分发回给 replyto 属性值中指定的消息队列(amq.gen-3xxx)。

然后客户端再从 amq.gen-3xxx 默认的消息队列中获取服务端处理并返回回来的消息,进行对应的消费。

ConstantUtil 常量工具类

在这里插入图片描述

ConnectionUtil RabbitMQ连接工具类

在这里插入图片描述

Server 服务端

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Client 客户端

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

测试结果

启动测试的时候,一定要先启动服务端,再启动客户端

服务端

在这里插入图片描述

客户端

在这里插入图片描述

QUEUE
在这里插入图片描述

完整代码

ConstantUtil 常量工具类

package cn.ljh.rabbitmq.util;//常量
public class ConstantUtil
{//消息队列实现 RPC(远程过程调用)模型 之 服务器端----------------//消息队列public final static String RPC_QUEUE = "rpc_queue";// ------------topic类型的Exchange,需要的相关常量----------public final static String QUEUET01 = "qt_01";public final static String QUEUET02 = "qt_02";// topic 通配符类型的 Exchangepublic static final String EXCHANGE_NAME_TOPIC = "myex03.topic";// Exchange 绑定 Queue 队列的路由key  ,通配符类型      *:匹配一个单词。#:匹配零个或多个单词。public static final String[] ROUTING_TOPIC_PATTERNS = {"*.crazyit.*", "*.org", "edu.#"};// 生产者发送消息给Excahnge携带的路由keypublic static final String[] ROUTING_TOPIC_KEYS = { "www.crazyit.org", "www.crazyit.cn","edu.crazyit.org", "crazyit.org", "fkjava.org", "edu.fkjava.org", "edu.fkjava", "edu.org"};//-------------------------------------------------------// 消息队列的名称public final static String QUEUE01 = "queue_01";public final static String QUEUE02 = "queue_02";// Exchange的名称public static final String EXCHANGE_NAME = "myex02.direct";// 三个路由key定义成一个数组的名称public static final String[] ROUTING_KEYS = {"info", "error", "warning"};}

ConnectionUtil RabbitMQ连接工具类

package cn.ljh.rabbitmq.util;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;//连接工具
public class ConnectionUtil
{//获取连接的方法public static Connection getConnection() throws IOException, TimeoutException{//创建连接工厂----这个ConnectionFactory源码可以看出有构造器,所以直接new一个出来ConnectionFactory connectionFactory =  new ConnectionFactory();//设置连接信息connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("ljh");connectionFactory.setPassword("123456");connectionFactory.setVirtualHost("/"); //连接虚拟主机//从连接工厂获取连接Connection connection = connectionFactory.newConnection();//返回连接return connection;}
}

Server 服务端

package cn.ljh.rabbitmq.producer;import cn.ljh.rabbitmq.util.ConnectionUtil;
import cn.ljh.rabbitmq.util.ConstantUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.concurrent.TimeoutException;//消息队列实现 RPC(远程过程调用)模型 之 服务器端
public class Server
{public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel。Channel channel = conn.createChannel();//3、使用系统自动创建的默认Exchange,无需声明Exchange//消息队列设置为独占(exclusive:true)--------------------------------------------------------------------------------------声明消息队列channel.queueDeclare(ConstantUtil.RPC_QUEUE,true,   /* 是否持久化 */true,  /* 是否只允许只有这个消息队列的消息消费者才可以消费这个消息队列的消息 */false, /* 是否自动删除 */null); /* 指定这个消息队列的额外参数属性 *///不需要关闭资源,因为它也要监听自己消费消息的队列//4、调用Channel 的 basicConsume()方法开始消费消息----------------------------------------------------------------------------1、服务端监听并消费消息channel.basicConsume(ConstantUtil.RPC_QUEUE, /* 消费这个名字的消费队列里面的消息 */true,new DefaultConsumer(channel){//处理消息:当这个 ConstantUtil.RPC_QUEUE 消息队列收到消息的时候,这个方法就会被触发。重写这个方法:@Overridepublic void handleDelivery(String consumerTag,Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,AMQP.BasicProperties properties /* 消息的那些属性 */,byte[] body /*body:消息的消息体*/) throws IOException{//把消息体中的消息拿出来,此处读取到的消息,就相当于调用参数-------------------------------------------------------2、服务端获取消息队列的消息String param = new String(body, StandardCharsets.UTF_8);//之前只需要用到消息,现在需要额外读取消息里面携带的两个属性:reply_to 和 correlation_id//消息的属性都存放在 AMQP.BasicProperties 这个属性里面,从这个属性获取 reply_to 和 correlation_idString replyTo = properties.getReplyTo();System.err.println("replyTo:  " + replyTo);String correlationId = properties.getCorrelationId();System.err.println("correlationId:  " + correlationId);//调用服务器的处理消息的方法,最终得到处理后的结果。该方法可以是任意的业务处理,该方法的返回值result是要被送回客户端的。------3、服务端处理消费消息String result = format(param);//printf:格式化输出函数   %s:输出字符串  %n:换行System.err.printf("服务端 收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",envelope.getExchange(), envelope.getRoutingKey(), param);//发送消息的方法,需要把返回值result发送回客户端-------------------------------------------------------4、服务端处理消费完的消息返回客户端的操作channel.basicPublish("", /* 使用默认的Exchange */replyTo,/* 此处的routing key 应该填 reply_to 属性;  reply_to: 该属性指定了服务器要将返回的消息送回到哪个队列 *///把从客户端的 AMQP.BasicProperties 属性获取到的correlationId,再作为参数传回去,用于客户端和服务器的匹配。new AMQP.BasicProperties().builder().correlationId(correlationId) /* 也需要返回额外的 correlation_id,要与从客户端消息中读取的 correlation_id 完全一样 */.deliveryMode(2) /* 设置这个消息是持久化类型的 */.build(), /*这个.build()的作用就是构建得到这个 BasicProperties 对象,这个对象就包含了 correlationId 属性因为服务器端返回的消息一定要有这个correlationId。 */result.getBytes(StandardCharsets.UTF_8));}});}//模拟服务器端消费消息要做的处理业务逻辑操作public static String format(String name){//此处模拟让服务器处理这里的业务有快有慢的情况,看correlation_id 能不能还是把数据对应上int rand = (new Random().nextInt(40) + 20) * 30;try{Thread.sleep(rand);} catch (InterruptedException e){e.printStackTrace();}return "《" + name + "》";}}

Client 客户端

package cn.ljh.rabbitmq.consumer;import cn.ljh.rabbitmq.util.ConnectionUtil;
import cn.ljh.rabbitmq.util.ConstantUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;//消息队列实现 RPC(远程过程调用)模型 之 客户端
public class Client
{// paramMap 保存了 correlationid 与 参数(消息)之间的对应关系public static Map<String, String> paramMap = new ConcurrentHashMap<>();//客户端发送的消息(参数)public static String[] params = new String[]{"火影忍者", "七龙珠", "哆啦A梦", "蜡笔小新"};public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel 消息通道Channel channel = conn.createChannel();//3、调用 Channel 的 queueDeclare() 方法声明队列,声明一个有 RabbitMQ 自动创建的、自动命名的、持久化的、独享的、会自动删除的【默认队列】AMQP.Queue.DeclareOk declareOk = channel.queueDeclare();System.out.println("declareOk  "+declareOk);//.getQueue() 用于得到默认队列的返回值,也就是默认队列的名字,之前声明是我们自己设置队列名,这里用默认的队列,就用.getQueue() 得到队列名。String queueName = declareOk.getQueue();System.out.println("queueName: "+queueName);//4、调用Channel 的 basicConsume()方法开始处理消费消息-----------------------------------------------------------------2、客户端监听服务端处理完消息后返回来的消息channel.basicConsume(queueName /*消费这个消费队列里面的消息*/,true /*消息的确认模式:是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,new DefaultConsumer(channel){//处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:@Overridepublic void handleDelivery(String consumerTag,Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,AMQP.BasicProperties properties /*消息的那些属性*/,byte[] body /*body:消息的消息体*/) throws IOException{//把消息体中的消息拿出来String resultMessage = new String(body, StandardCharsets.UTF_8);//此处,需要指定每个返回值对应的是哪个参数,靠的就是correlation_idString correlationId = properties.getCorrelationId();//根据服务器端返回的消息中的correlation_id 获取对应的参数String param = paramMap.get(correlationId);System.err.println("客户端发出去的消息内容:"+param +"  , 服务端处理后返回来的消息内容:"+resultMessage);//printf:格式化输出函数   %s:输出字符串  %n:换行System.out.printf("客户端 收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",envelope.getExchange(), envelope.getRoutingKey(), resultMessage);//得到服务器的返回值之后,整个调用过程就完成了,此时就应该从 Map 中删除这组 key-value对了( correlationId 与 参数的对应关系 )。paramMap.remove(correlationId);}});//客户端发送消息---------------------------------------------------------------------------代码运行后先执行这段--------------------1、客户端发送消息for (int i = 0 ; i < params.length ; i++){paramMap.put( i + "" , params[i] );channel.basicPublish("", /* 使用默认的Exchange */ConstantUtil.RPC_QUEUE, /* 客户端发送消息携带的路由key是服务端监听的消息队列的名字,且使用了默认的Exchange,这就意味着消息会被发送给服务器监听的那个消息队列 */new AMQP.BasicProperties().builder().correlationId(i + "")  /* 设置 correlation_id 属性;  correlation_id:该属性指定了服务器返回的消息也要添加相同的correlation_id属性*/.replyTo(queueName)     /* reply_to: 该属性指定了服务器要将返回的消息送回到哪个队列 , 设置 reply_to 属性 */.deliveryMode(2)  /* 持久化消息 */.build(),          /* 构建这个BasicProperties对象,这个对象主要存这个correlationId属性 */params[i].getBytes(StandardCharsets.UTF_8));}}
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.ljh</groupId><artifactId>rabbitmq_rpc</artifactId><version>1.0.0</version><name>rabbitmq_rpc</name><!--  属性  --><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><java.version>11</java.version></properties><!--  依赖  --><dependencies><!-- RabbitMQ 的依赖库 --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.13.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.16</version><scope>compile</scope></dependency></dependencies></project>

这篇关于205、使用消息队列实现 RPC(远程过程调用)模型的 服务器端 和 客户端的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/211195

相关文章

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

作业提交过程之HDFSMapReduce

作业提交全过程详解 (1)作业提交 第1步:Client调用job.waitForCompletion方法,向整个集群提交MapReduce作业。 第2步:Client向RM申请一个作业id。 第3步:RM给Client返回该job资源的提交路径和作业id。 第4步:Client提交jar包、切片信息和配置文件到指定的资源提交路径。 第5步:Client提交完资源后,向RM申请运行MrAp

Makefile简明使用教程

文章目录 规则makefile文件的基本语法:加在命令前的特殊符号:.PHONY伪目标: Makefilev1 直观写法v2 加上中间过程v3 伪目标v4 变量 make 选项-f-n-C Make 是一种流行的构建工具,常用于将源代码转换成可执行文件或者其他形式的输出文件(如库文件、文档等)。Make 可以自动化地执行编译、链接等一系列操作。 规则 makefile文件

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

hdu1180(广搜+优先队列)

此题要求最少到达目标点T的最短时间,所以我选择了广度优先搜索,并且要用到优先队列。 另外此题注意点较多,比如说可以在某个点停留,我wa了好多两次,就是因为忽略了这一点,然后参考了大神的思想,然后经过反复修改才AC的 这是我的代码 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<

使用opencv优化图片(画面变清晰)

文章目录 需求影响照片清晰度的因素 实现降噪测试代码 锐化空间锐化Unsharp Masking频率域锐化对比测试 对比度增强常用算法对比测试 需求 对图像进行优化,使其看起来更清晰,同时保持尺寸不变,通常涉及到图像处理技术如锐化、降噪、对比度增强等 影响照片清晰度的因素 影响照片清晰度的因素有很多,主要可以从以下几个方面来分析 1. 拍摄设备 相机传感器:相机传