RabbitMQ入门教程 For Java【6】 - Remote procedure call (RPC)

2024-02-25 19:38

本文主要是介绍RabbitMQ入门教程 For Java【6】 - Remote procedure call (RPC),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

我的开发环境:

操作系统: Windows7 64bit 

开发环境: JDK 1.7 - 1.7.0_55

开发工具: Eclipse Kepler SR2

RabbitMQ版本:  3.6.0

Elang版本: erl7.2.1

关于Windows7下安装RabbitMQ的教程请先在网上找一下,有空我再补安装教程。

源码地址

https://github.com/chwshuang/rabbitmq.git


        本教程中,我们将学习使用工作队列让多个消费者端来执行耗时的任务。比如我们需要通过远程服务器帮我们计算某个结果。这种模式通常被称之为远程方法调用或RPC.

        我们通过RabbitMQ搭建一个RPC系统,一个客户端和一个RPC服务器,客户端有一个斐波那契数列方面的问题需要解决(Fibonacci numbers),RPC服务器负责技术收到这个消息,然后计算结果,并且返回这个斐波那契数列。


客户端接口

        我们需要创建一个简单的客户端类,通过调用客户端的call方法,来计算结果。

[plain]  view plain copy
  1. FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();     
  2. String result = fibonacciRpc.call("4");  
  3. System.out.println( "fib(4) is " + result);  

远程方法调用的注意事项:

        RPC在软件开发中非常常见,也经常被批评。当一个程序员对代码不熟悉的时候,跟踪RPC的性能问题是出在本地还是远程服务器就非常麻烦,对于RPC的使用,有几点需要特别说明:

  • 使用远程调用时的本地函数最好独立出来
  • 保证代码组件之间的依赖关系清晰明了,并用日志记录不同的执行过程和时间
  • 发生客户端运行缓慢或者假死时,先确认RPC服务器是否还活着!
  • 尽量使用异步队列来处理RPC请求,尽量不要用同步阻塞的方式运行RPC请求

回调队列

        在RabbitMQ的RPC中,客户端发送请求后,还需要得到一个响应结果,我们需要像下面这样,在发送请求时,带上一个回调队列:

[plain]  view plain copy
  1. callbackQueueName = channel.queueDeclare().getQueue();  
  2.   
  3. BasicProperties props = new BasicProperties  
  4.                             .Builder()  
  5.                             .replyTo(callbackQueueName)  
  6.                             .build();  
  7.   
  8. channel.basicPublish("", "rpc_queue", props, message.getBytes());  
  9.   
  10. // ... then code to read a response message from the callback_queue ...  
        上面代码中,我们需要引入一个新的类
[plain]  view plain copy
  1. import com.rabbitmq.client.AMQP.BasicProperties;  


消息属性

传输一条消息,AMQP协议预定义了14个属性,下面几个是使用比较频繁的几个属性:

  • deliveryMode:配置一个消息是否持久化。(2表示持久化)这个在第二章中有说明。
  • contentType :用来描述编码的MIME类型。与html的MIME类型类似,例如,经常使用JSON编码是将此属性设置为一个很好的做法:application/json。
  • replyTo : 回调队列的名称。
  • correlationId:RPC响应请求的相关编号。这个在下一节讲。


关联编号  Correlation Id

        如果一个客户端有很多的计算任务,按照上面的代码,我们会为每个任务创建一个请求,然后等待返回的结果,这种方法貌似很耗时,如果把所有的任务都放到同一个连接中,那么我们又没法分辨出返回的结果是那个任务的?为了解决这个问题,RabbitMQ提供了一个correlationid属性来解决这个问题。RabbitMQ为每个请求提供唯一的编号,然后在返回队列里如果看到了这个编号,就知道我们的任务处理完成了,如果收到的编号不认识,就可以安全的忽略。

        你可能会疑问,如果忽略了,那么想知道这个返回结果的客户端是不是就收不到这个结果了?这个基本上不会出现,但是,理论上也可能发生,例如一个RPC服务器,在发送确认消息前挂了,你收到的消息可能就是不完整的。这种情况,RabbitMQ会重新发送任务处理请求。这也是为什么客户端必须处理这些重复请求以及RPC启用幂次模式。


总结:



RPC工作方式:

  1. 当客户端启动时,会创建一个匿名的回调队列
  2. 在RPC请求中,定义了两个属性:replyTo,表示回调队列的名称; correlationId,表示请求任务的唯一编号,用来区分不同请求的返回结果。
  3. 将请求发送到rpc_queue队列中
  4. RPC服务器等待rpc_queue队列的请求,如果有消息,就处理,它将计算结果发送到请求中的回调队列里。
  5. 客户端监听回调队列中的消息,如果有返回消息,它根据回调消息中的correlationid进行匹配计算结果。


工程代码


计算斐波那契数列的方法

[plain]  view plain copy
  1. private static int fib(int n) throws Exception {  
  2.     if (n == 0) return 0;  
  3.     if (n == 1) return 1;  
  4.     return fib(n-1) + fib(n-2);  
  5. }  

这个方法只是用来讲解我们的教程,你可别拿它在生产环境跑大数据!下面是客户端的代码

服务器端代码:

RPCServer.java

第一步仍然是建立连接、频道和声明队列。

如果我们运行多个RPC服务器,为了达到负载均衡,需要通过channel.basicQos来设置从队列中预取消息的个数。

我们通过basicConsume 访问队列,如果后消息任务来了,我们就开始工作,并将结果发送到回调队列中。

[plain]  view plain copy
  1. import com.rabbitmq.client.ConnectionFactory;  
  2. import com.rabbitmq.client.Connection;  
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.QueueingConsumer;  
  5. import com.rabbitmq.client.AMQP.BasicProperties;  
  6.   
  7. public class RPCServer {  
  8.   
  9.     private static final String RPC_QUEUE_NAME = "rpc_queue";  
  10.   
  11.     private static int fib(int n) {  
  12.         if (n == 0)  
  13.             return 0;  
  14.         if (n == 1)  
  15.             return 1;  
  16.         return fib(n - 1) + fib(n - 2);  
  17.     }  
  18.   
  19.     public static void main(String[] argv) {  
  20.         Connection connection = null;  
  21.         Channel channel = null;  
  22.         try {  
  23.             ConnectionFactory factory = new ConnectionFactory();  
  24.             factory.setHost("localhost");  
  25.   
  26.             connection = factory.newConnection();  
  27.             channel = connection.createChannel();  
  28.   
  29.             channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);  
  30.   
  31.             channel.basicQos(1);  
  32.   
  33.             QueueingConsumer consumer = new QueueingConsumer(channel);  
  34.             channel.basicConsume(RPC_QUEUE_NAME, false, consumer);  
  35.   
  36.             System.out.println("RPCServer [x] Awaiting RPC requests");  
  37.   
  38.             while (true) {  
  39.                 String response = null;  
  40.   
  41.                 QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
  42.   
  43.                 BasicProperties props = delivery.getProperties();  
  44.                 BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();  
  45.   
  46.                 try {  
  47.                     String message = new String(delivery.getBody(), "UTF-8");  
  48.                     int n = Integer.parseInt(message);  
  49.   
  50.                     System.out.println("RPCServer [.] fib(" + message + ")");  
  51.                     response = "" + fib(n);  
  52.                 } catch (Exception e) {  
  53.                     System.out.println(" [.] " + e.toString());  
  54.                     response = "";  
  55.                 } finally {  
  56.                     channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));  
  57.   
  58.                     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  
  59.                 }  
  60.             }  
  61.         } catch (Exception e) {  
  62.             e.printStackTrace();  
  63.         } finally {  
  64.             if (connection != null) {  
  65.                 try {  
  66.                     connection.close();  
  67.                 } catch (Exception ignore) {  
  68.                 }  
  69.             }  
  70.         }  
  71.     }  
  72. }  

客户端代码

RPCClient.java

[plain]  view plain copy
  1. import com.rabbitmq.client.ConnectionFactory;  
  2. import com.rabbitmq.client.Connection;  
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.QueueingConsumer;  
  5. import com.rabbitmq.client.AMQP.BasicProperties;  
  6. import java.util.UUID;  
  7.   
  8. public class RPCClient {  
  9.   
  10.     private Connection connection;  
  11.     private Channel channel;  
  12.     private String requestQueueName = "rpc_queue";  
  13.     private String replyQueueName;  
  14.     private QueueingConsumer consumer;  
  15.   
  16.     public RPCClient() throws Exception {  
  17.         ConnectionFactory factory = new ConnectionFactory();  
  18.         factory.setHost("localhost");  
  19.         connection = factory.newConnection();  
  20.         channel = connection.createChannel();  
  21.   
  22.         replyQueueName = channel.queueDeclare().getQueue();  
  23.         consumer = new QueueingConsumer(channel);  
  24.         channel.basicConsume(replyQueueName, true, consumer);  
  25.     }  
  26.   
  27.     public String call(String message) throws Exception {  
  28.         String response = null;  
  29.         String corrId = UUID.randomUUID().toString();  
  30.   
  31.         BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();  
  32.   
  33.         channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));  
  34.   
  35.         while (true) {  
  36.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
  37.             if (delivery.getProperties().getCorrelationId().equals(corrId)) {  
  38.                 response = new String(delivery.getBody(), "UTF-8");  
  39.                 break;  
  40.             }  
  41.         }  
  42.   
  43.         return response;  
  44.     }  
  45.   
  46.     public void close() throws Exception {  
  47.         connection.close();  
  48.     }  
  49.   
  50.     public static void main(String[] argv) {  
  51.         RPCClient fibonacciRpc = null;  
  52.         String response = null;  
  53.         try {  
  54.             fibonacciRpc = new RPCClient();  
  55.   
  56.             System.out.println("RPCClient [x] Requesting fib(30)");  
  57.             response = fibonacciRpc.call("30");  
  58.             System.out.println("RPCClient [.] Got '" + response + "'");  
  59.         } catch (Exception e) {  
  60.             e.printStackTrace();  
  61.         } finally {  
  62.             if (fibonacciRpc != null) {  
  63.                 try {  
  64.                     fibonacciRpc.close();  
  65.                 } catch (Exception ignore) {  
  66.                 }  
  67.             }  
  68.         }  
  69.     }  
  70. }  
在客户端,我们也建立一个连接和通道,并声明一个专用的“回调”队列

我们设置调队列中的唯一编号和回调队列名称

然后我们发送任务消息到RPC服务器

接下来循环监听回调队列中的每一个消息,找到与我们刚才发送任务消息编号相同的消息


总结:

这里的例子只是RabbitMQ中RPC服务的一个实现,你也可以根据业务需要实现更多。

rpc有一个优点,如果一个RPC服务器处理不来,可以再增加一个、两个、三个。

我们的例子中的代码还比较简单,还有很多问题没有解决:

如果没有发现服务器,客户端如何处理?

如果客户端的RPC请求超时了怎么办?

如果服务器出现了故障,发生了异常,是否将异常发送到客户端?

在处理消息前,怎样防止无效的消息?检查范围、类型?


如果你想还想继续了解RabbitMQ,你可以在RabbitMQ中安装管理插件,然后查看消息队列。

这篇关于RabbitMQ入门教程 For Java【6】 - Remote procedure call (RPC)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/746486

相关文章

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

Spring Security--Architecture Overview

1 核心组件 这一节主要介绍一些在Spring Security中常见且核心的Java类,它们之间的依赖,构建起了整个框架。想要理解整个架构,最起码得对这些类眼熟。 1.1 SecurityContextHolder SecurityContextHolder用于存储安全上下文(security context)的信息。当前操作的用户是谁,该用户是否已经被认证,他拥有哪些角色权限…这些都被保

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

Java进阶13讲__第12讲_1/2

多线程、线程池 1.  线程概念 1.1  什么是线程 1.2  线程的好处 2.   创建线程的三种方式 注意事项 2.1  继承Thread类 2.1.1 认识  2.1.2  编码实现  package cn.hdc.oop10.Thread;import org.slf4j.Logger;import org.slf4j.LoggerFactory

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟 开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚 第一站:海量资源,应有尽有 走进“智听

在cscode中通过maven创建java项目

在cscode中创建java项目 可以通过博客完成maven的导入 建立maven项目 使用快捷键 Ctrl + Shift + P 建立一个 Maven 项目 1 Ctrl + Shift + P 打开输入框2 输入 "> java create"3 选择 maven4 选择 No Archetype5 输入 域名6 输入项目名称7 建立一个文件目录存放项目,文件名一般为项目名8 确定