本文主要是介绍手写RPC第一版,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
手写RPC第一版
- 前言
- 创建项目
- goods-api
- goods-provider
- user-server
- 最后
前言
通过学习,将自己掌握的的技术记录下来,以便后面学习。
创建项目
创建两个项目,一个用户系统user-server,一个商品系统goods-server。系统goods-server系统有两个Module分别是goods-api(商品系统的接口模块),goods-provider(提供具体的服务)。user-server和goods-server分别部署到不同的服务器上,模拟goods-server服务中提供一个IGoodsService接口,接口中描述两个方法,queryGoodsList(),queryGoodsById(String id)。user-server服务通过socket远程调用GoodsServer中的api。
接下来我们先写goods-server这个服务
goods-api
创建一个RpcRequest类,该类作为user-server和goods-server两个服务间通过Socket远程调用时传输的数据,所以实现Serializable,序列化及反序列化。
import java.io.Serializable;
import java.util.Arrays;/*** 用于socket传输数据,所以要实现Serializable接口。*/
public class RpcRequest implements Serializable {// 类名称private String className;// 方法名称private String methodName;// 方法参数private Object[] args;// 参数类型private Class[] types;public String getClassName() {return className;}public void setClassName(String className) {this.className = className;}public String getMethodName() {return methodName;}public void setMethodName(String methodName) {this.methodName = methodName;}public Object[] getArgs() {return args;}public void setArgs(Object[] args) {this.args = args;}public Class[] getTypes() {return types;}public void setTypes(Class[] types) {this.types = types;}@Overridepublic String toString() {return "RpcRequest{" +"className='" + className + '\'' +", methodName='" + methodName + '\'' +", args=" + Arrays.toString(args) +", types=" + Arrays.toString(types) +'}';}
}
创建一个接口,用于客户端调用。
/*** 用户客户端调用的api*/
public interface IGoodsService {String queryGoodsList();String queryGoodsById(String id);
}
goods-provider
创建一个GoodsServiceImpl,实现IGoodsService中的方法
public class GoodsServiceImpl implements IGoodsService {@Overridepublic String queryGoodsList() {return "EXECUTE QUERY_LIST METHOD";}@Overridepublic String queryGoodsById(String id) {return "EXECUTE QUERY_INFO METHOD";}
}
创建一个RpcProxyServer类,用来提供发布服务的方法publisher(),其中正常的Socket通信,会产生IO阻塞(客户端A调用服务,处于阻塞状态时,客户端B是无法调用服务的),所以第一版本采用线程池的方式,去提高服务端处理多客户端请求的性能。ProcessHandler这个类用来处理服务端接收到的请求以及返回客户端的数据。
public class RpcProxyServer {private final ExecutorService executorService = Executors.newCachedThreadPool();public void publisher(Object service, int port) {ServerSocket serverSocket = null;try {serverSocket = new ServerSocket(port);// 阻塞while (true) {// 获得Socket对象Socket socket = serverSocket.accept();// 通过线程池提供服务处理客户端远程调用的能力executorService.execute(new ProcessHandler(service, socket));}} catch (IOException e) {e.printStackTrace();}}
}
ProcessHandler 类在run方法中通过ObjectInputStream获取客户端请求的参数RpcRequest,在通过invoke方法反射调用服务端的服务,在通过ObjectOutputStream将result返回给客户端。
public class ProcessHandler implements Runnable {private Object service;private Socket socket;public ProcessHandler(Object service, Socket socket) {this.service = service;this.socket = socket;}@Overridepublic void run() {ObjectInputStream inputStream = null;ObjectOutputStream outputStream = null;try {// 读取客户端请求过来的数据inputStream = new ObjectInputStream(socket.getInputStream());RpcRequest request = (RpcRequest) inputStream.readObject();// 根据客户端请求参数调用服务端具体的服务Object result = invoke(request);System.out.println("服务端处理的结果:" + result);// 将服务端返回的数据写会给客户端outputStream = new ObjectOutputStream(socket.getOutputStream());outputStream.writeObject(result);outputStream.flush();} catch (Exception e) {e.printStackTrace();} finally {// 关闭流if (inputStream != null) {try {inputStream.close();} catch (IOException e) {e.printStackTrace();}}if (outputStream != null) {try {outputStream.close();} catch (IOException e) {e.printStackTrace();}}}}// 根据客户端请求参数调用服务端具体的服务private Object invoke(RpcRequest request) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {// 获取请求的类名String className = request.getClassName();// 获得具体的类Class<?> clazz = Class.forName(className);// 通过方法名称及参数类型,获得具体的方法Method method = clazz.getMethod(request.getMethodName(), request.getTypes());// 通过反射调用方法return method.invoke(service, request.getArgs());}
}
user-server
接下来我们先写user-server这个服务
首先写一个动态代理类,提供clientProxy方法,该方法需要传入客户端Socket连接的Ip,和端口,以及要代理哪个类
public class RpcProxyClient {public <T> T clientProxy(final Class<T> interfaceCls, final String host, final int port) {return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class<?>[] {interfaceCls}, new RemoteInvocationHandler(host, port));}
}
代理对象的具体操作在RemoteInvocationHandler中去实现。首先组装RpcRequest参数,然后通过RpcNetTransport远程调用服务端
public class RemoteInvocationHandler implements InvocationHandler {private String host;private int port;public RemoteInvocationHandler(String host, int port) {this.host = host;this.port = port;}@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 组装RpcRequestRpcRequest request = new RpcRequest();request.setClassName(method.getDeclaringClass().getName());request.setMethodName(method.getName());request.setArgs(args);request.setTypes(method.getParameterTypes());// 调用远程服务RpcNetTransport netTransport = new RpcNetTransport(host, port);Object result = netTransport.send(request);return result;}
}
RpcNetTransport 提供一个newSocket()方法,通过host,port获取Socket连接。然后提供send()方法,远程调用服务端。
public class RpcNetTransport {private String host;private int port;public RpcNetTransport(String host, int port) {this.host = host;this.port = port;}public Socket newSocket() throws IOException {Socket socket = new Socket(host, port);return socket;}public Object send(RpcRequest request) {ObjectOutputStream outputStream = null;ObjectInputStream inputStream = null;try {// 获得Socket连接Socket socket = newSocket();// 向服务器写数据outputStream = new ObjectOutputStream(socket.getOutputStream());outputStream.writeObject(request);outputStream.flush();// 读取服务器返回数据inputStream = new ObjectInputStream(socket.getInputStream());Object result = inputStream.readObject();return result;} catch (IOException e) {e.printStackTrace();} catch (ClassNotFoundException e) {e.printStackTrace();} finally {// 关闭流if (inputStream != null) {try {inputStream.close();} catch (IOException e) {e.printStackTrace();}}if (outputStream != null) {try {outputStream.close();} catch (IOException e) {e.printStackTrace();}}}return null;}
}
服务端测试类
public class App {public static void main( String[] args ) {IGoodsService goodsService = new GoodsServiceImpl();RpcProxyServer server = new RpcProxyServer();server.publisher(goodsService, 8080);}
}
客户端测试类
public class App {public static void main( String[] args ) {RpcProxyClient client = new RpcProxyClient();IGoodsService goodsService = client.clientProxy(IGoodsService.class, "localhost", 8080);System.out.println(goodsService.queryGoodsById("test"));System.out.println(goodsService.queryGoodsList());System.out.println(goodsService.queryGoodsList());System.out.println(goodsService.queryGoodsList());System.out.println(goodsService.queryGoodsList());System.out.println(goodsService.queryGoodsList());System.out.println(goodsService.queryGoodsList());System.out.println(goodsService.queryGoodsList());}
}
服务端打印结果:
服务端处理的结果:EXECUTE QUERY_INFO METHOD
服务端处理的结果:EXECUTE QUERY_LIST METHOD
服务端处理的结果:EXECUTE QUERY_LIST METHOD
服务端处理的结果:EXECUTE QUERY_LIST METHOD
服务端处理的结果:EXECUTE QUERY_LIST METHOD
服务端处理的结果:EXECUTE QUERY_LIST METHOD
服务端处理的结果:EXECUTE QUERY_LIST METHOD
服务端处理的结果:EXECUTE QUERY_LIST METHOD
客户端打印结果:
EXECUTE QUERY_INFO METHOD
EXECUTE QUERY_LIST METHOD
EXECUTE QUERY_LIST METHOD
EXECUTE QUERY_LIST METHOD
EXECUTE QUERY_LIST METHOD
EXECUTE QUERY_LIST METHOD
EXECUTE QUERY_LIST METHOD
EXECUTE QUERY_LIST METHOD
最后
本文是自己学习后,变成自己的知识,默写出来的。主要是用来以后自己查看方便使用。下篇文章将基于这个版本实现通过注解的方式调用和发布服务。
这篇关于手写RPC第一版的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!