java grpc四种模式介绍

2024-09-06 11:44
文章标签 java 模式 介绍 四种 grpc

本文主要是介绍java grpc四种模式介绍,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

GRPC功能

一.GRPC的响应模式

1.GRPC的四种响应模式

(1)UNARY(简单模式)
  • 也称简单 RPC,即客户端发起一次请求,服务端响应处理后返回一个结果给客户端。
(2) SERVER_STREAMING(服务端流模式)
  • 客户端发起一次请求,服务端可以连续返回数据流(即分批次返回场景)。
(3)CLIENT_STREAMING(客户端流模式)
  • 服务端数据流模式相反,客户端持续向服务端发送数据流,在发送结束后,由服务端返回一个响应。
(4)BIDI_STREAMING(双向流模式)
  • 客户端和服务端都可以向对方多次收发数据。
(5)对应io.grpc.MethodDescriptor.MethodType的枚举
public enum MethodType {/*** One request message followed by one response message.*/UNARY,/*** Zero or more request messages with one response message.*/CLIENT_STREAMING,/*** One request message followed by zero or more response messages.*/SERVER_STREAMING,/*** Zero or more request and response messages arbitrarily interleaved in time.*/BIDI_STREAMING,/*** Cardinality and temporal relationships are not known. Implementations should not make* buffering assumptions and should largely treat the same as {@link #BIDI_STREAMING}.*/UNKNOWN;/*** Returns {@code true} for {@code UNARY} and {@code SERVER_STREAMING}, which do not permit the* client to stream.** @since 1.0.0*/public final boolean clientSendsOneMessage() {return this == UNARY || this == SERVER_STREAMING;}/*** Returns {@code true} for {@code UNARY} and {@code CLIENT_STREAMING}, which do not permit the* server to stream.** @since 1.0.0*/public final boolean serverSendsOneMessage() {return this == UNARY || this == CLIENT_STREAMING;}}

2. proto文件定义

// @link https://github.com/grpc/grpc-java/blob/master/examples/src/main/proto/grpc/examples/echo/echo.proto
syntax = "proto3";import "google/protobuf/any.proto";option java_multiple_files = true;
//生成java代码的package
option java_package = "com.zzc.rpc.grpc.entity";
//创建的javaBean的文件名
//option java_outer_classname = "DemoProto";
//可以生成rpc接口
//option java_generic_services = true;//声明一个服务名称
service DemoService {//request is unary echo. 简单模式rpc unaryRequest (RequestGrpc) returns (ResponseGrpc) {}// serverStreamingRequest is server side streaming.服务端流模式rpc serverStreamingRequest(RequestGrpc) returns (stream ResponseGrpc) {}// clientStreamingRequest is client side streaming. 客户端流模式rpc clientStreamingRequest(stream RequestGrpc) returns (ResponseGrpc) {}// bidirectionalStreamingRequest is bidi streaming. 双向流模式rpc bidirectionalStreamingRequest(stream RequestGrpc) returns (stream ResponseGrpc) {}
}message MetadataGrpc {string type = 3;string clientIp = 8;map<string, string> headers = 7;
}message RequestGrpc {MetadataGrpc metadata = 2;google.protobuf.Any body = 3;
}message ResponseGrpc {int32 code = 5;string msg = 6;MetadataGrpc metadata = 2;google.protobuf.Any body = 3;
}

3.代码示例

(1)代码结构

代码目录结构

(2)maven依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.zzc.rpc</groupId><artifactId>rpc-design</artifactId><version>1.0-SNAPSHOT</version></parent><groupId>com.zzc.rpc.grpc</groupId><artifactId>grpc-demo</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><grpc.version>1.65.1</grpc.version><protostuff.version>1.8.0</protostuff.version></properties><dependencies><dependency><groupId>io.grpc</groupId><artifactId>grpc-netty</artifactId><version>${grpc.version}</version></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-protobuf</artifactId><version>${grpc.version}</version></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-stub</artifactId><version>${grpc.version}</version></dependency><dependency><groupId>io.protostuff</groupId><artifactId>protostuff-core</artifactId><version>${protostuff.version}</version></dependency><dependency><groupId>io.protostuff</groupId><artifactId>protostuff-runtime</artifactId><version>${protostuff.version}</version></dependency><!-- log4j2日志门面 --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId></dependency><!-- log4j2日志框架 --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId></dependency><!-- slf4j日志门面 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></dependency><!--log4j2的适配器,为slf4j绑定日志框架 --><!-- 依赖org.slf4j:slf4j-api:1.7.25 --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><scope>provided</scope></dependency></dependencies><build><!-- SPI机制加载类 --><resources><resource><directory>src/main/resources</directory><filtering>true</filtering><includes><include>*.*</include><include>META-INF/services/*</include></includes></resource></resources><!-- 通过proto文件生成java文件相关 --><extensions><extension><groupId>kr.motd.maven</groupId><artifactId>os-maven-plugin</artifactId><version>1.5.0.Final</version></extension></extensions><plugins><plugin><groupId>org.xolstice.maven.plugins</groupId><artifactId>protobuf-maven-plugin</artifactId><version>0.6.1</version><configuration><protocArtifact>com.google.protobuf:protoc:3.22.4:exe:${os.detected.classifier}</protocArtifact><pluginId>grpc-java</pluginId><pluginArtifact>io.grpc:protoc-gen-grpc-java:1.65.1:exe:${os.detected.classifier}</pluginArtifact></configuration><executions><execution><goals><goal>compile</goal><goal>compile-custom</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.13.0</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build></project>
(3)通过proto生成grpc相关的java文件

生成Java文件

(4)具体实现
entity实现对像
  • Payload 接口,继承实现该接口的,会被ServerLoader加载
package com.zzc.rpc.grpc.entity;public interface Payload {}
  • Request 请求对象
package com.zzc.rpc.grpc.entity;import java.util.Map;
import java.util.TreeMap;public class Request implements Payload {/*** 创建header集合,并且不根据key字符大小写匹配排序的*/private final Map<String, String> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);private String requestId;public void putHeader(String key, String value) {headers.put(key, value);}public void putAllHeader(Map<String, String> headers) {if (headers == null || headers.isEmpty()) {return;}this.headers.putAll(headers);}public String getHeader(String key) {return headers.get(key);}public String getHeader(String key, String defaultValue) {String value = headers.get(key);return (value == null) ? defaultValue : value;}public String getRequestId() {return requestId;}public void setRequestId(String requestId) {this.requestId = requestId;}public Map<String, String> getHeaders() {return headers;}public void clearHeaders() {this.headers.clear();}
}
  • Response 响应类
package com.zzc.rpc.grpc.entity;public class Response implements Payload {private int resultCode = 0;private int code;private String msg;private String requestId;public String getRequestId() {return requestId;}public void setRequestId(String requestId) {this.requestId = requestId;}public boolean isSuccess() {return this.resultCode == 0;}public int getResultCode() {return resultCode;}public void setResultCode(int resultCode) {this.resultCode = resultCode;}public String getMsg() {return msg;}public void setMsg(String msg) {this.msg = msg;}public int getCode() {return code;}public void setCode(int code) {this.code = code;}public void setErrorInfo(int errorCode, String errorMsg) {this.resultCode = errorCode;this.code = errorCode;this.msg = errorMsg;}public static Response build() {return new Response();}public Response code(int code) {this.code = code;return this;}public Response msg(String msg) {this.msg = msg;return this;}}
  • 请求头

package com.zzc.rpc.grpc.entity;public class RequestMeta {private String connectionId = "";private String clientIp = "";private String clientVersion = "";public String getClientVersion() {return clientVersion;}public void setClientVersion(String clientVersion) {this.clientVersion = clientVersion;}public String getConnectionId() {return connectionId;}public void setConnectionId(String connectionId) {this.connectionId = connectionId;}public String getClientIp() {return clientIp;}public void setClientIp(String clientIp) {this.clientIp = clientIp;}}
在resources中添加SPI扫描配置
  • 在resources下新建META-INF/services/目录,然后新建接口全限定名的文件:com.zzc.rpc.grpc.entity.Payload,里面加上我们需要用到的实现类。
com.zzc.rpc.grpc.entity.Request
com.zzc.rpc.grpc.entity.Response
添加utils工具类
  • ByteBufferBackedInputStream buffer对象读取
package com.zzc.rpc.grpc.utils;import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;/*** buff读取*/
public class ByteBufferBackedInputStream extends InputStream {protected final ByteBuffer _b;public ByteBufferBackedInputStream(ByteBuffer buf) { _b = buf; }@Override public int available() { return _b.remaining(); }@Overridepublic int read() throws IOException { return _b.hasRemaining() ? (_b.get() & 0xFF) : -1; }@Overridepublic int read(byte[] bytes, int off, int len) throws IOException {if (!_b.hasRemaining()) return -1;len = Math.min(len, _b.remaining());_b.get(bytes, off, len);return len;}
}
  • PayloadRegistry,加载继承Payload的类,实际上Request和Response可以做成abstract类型,然后接收对象只有继承Payload就可以注册了
package com.zzc.rpc.grpc.utils;import com.zzc.rpc.grpc.entity.Payload;
import com.zzc.rpc.grpc.entity.Request;
import com.zzc.rpc.grpc.entity.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.ServiceLoader;/*** 加载继承Payload的类,实际上Request和Response可以做成abstract类型,然后接收对象只有继承Payload就可以注册了*/
public class PayloadRegistry {private static final Logger log = LoggerFactory.getLogger(PayloadRegistry.class);private static final Map<String, Class<?>> REGISTRY_REQUEST = new HashMap<>();static boolean initialized = false;public static void init() {log.info("start init PayloadRegistry...");scan();}private static synchronized void scan() {if (initialized) {return;}log.info("start scan");ServiceLoader<Payload> payloads = ServiceLoader.load(Payload.class);for (Payload payload : payloads) {log.info("scan classname:{}, class:{}", payload.getClass().getSimpleName(), payload.getClass());register(payload.getClass().getSimpleName(), payload.getClass());}initialized = true;}static void register(String type, Class<?> clazz) {if (Modifier.isAbstract(clazz.getModifiers())) {//抽象类型的不注册return;}if (REGISTRY_REQUEST.containsKey(type)) {throw new RuntimeException(String.format("Fail to register, type:%s ,clazz:%s ", type, clazz.getName()));}log.info("register type:{}, class:{}", type, clazz);REGISTRY_REQUEST.put(type, clazz);}public static Class<?> getClassByType(String type) {return REGISTRY_REQUEST.get(type);}public static void main(String[] args) {init();}
}
  • GRPCUtils 工具类,转换Request、Response等对象
package com.zzc.rpc.grpc.utils;import com.alibaba.fastjson2.JSON;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import com.zzc.rpc.grpc.entity.MetadataGrpc;
import com.zzc.rpc.grpc.entity.Request;
import com.zzc.rpc.grpc.entity.RequestGrpc;
import com.zzc.rpc.grpc.entity.RequestMeta;
import com.zzc.rpc.grpc.entity.Response;
import com.zzc.rpc.grpc.entity.ResponseGrpc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;public class GRPCUtils {private static final Logger log = LoggerFactory.getLogger(GRPCUtils.class);public static RequestGrpc convert(Request request, RequestMeta meta) {RequestGrpc.Builder payloadBuilder = RequestGrpc.newBuilder();MetadataGrpc.Builder metadataBuilder = MetadataGrpc.newBuilder();if (meta != null) {metadataBuilder.putAllHeaders(request.getHeaders()).setType(request.getClass().getSimpleName());}metadataBuilder.setClientIp("127.0.0.1");payloadBuilder.setMetadata(metadataBuilder.build());// request body .byte[] jsonBytes = convertRequestToByte(request);return payloadBuilder.setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(jsonBytes))).build();}public static RequestGrpc convert(Request request) {MetadataGrpc newMeta = MetadataGrpc.newBuilder().setType(request.getClass().getSimpleName()).setClientIp("127.0.0.1").putAllHeaders(request.getHeaders()).build();byte[] jsonBytes = convertRequestToByte(request);RequestGrpc.Builder builder = RequestGrpc.newBuilder();return builder.setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(jsonBytes))).setMetadata(newMeta).build();}private static byte[] convertRequestToByte(Request request) {Map<String, String> requestHeaders = new HashMap<>(request.getHeaders());request.clearHeaders();byte[] jsonBytes = JSON.toJSONBytes(request);request.putAllHeader(requestHeaders);return jsonBytes;}public static ResponseGrpc convert(Response response) {byte[] jsonBytes = JSON.toJSONBytes(response);MetadataGrpc.Builder metaBuilder = MetadataGrpc.newBuilder().setType(response.getClass().getSimpleName());return ResponseGrpc.newBuilder().setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(jsonBytes))).setMetadata(metaBuilder.build()).build();}public static <T> T parse(RequestGrpc payload) {Class classType = PayloadRegistry.getClassByType(payload.getMetadata().getType());log.info("parse classType:{}", classType);if (classType != null) {ByteString byteString = payload.getBody().getValue();ByteBuffer byteBuffer = byteString.asReadOnlyByteBuffer();T obj = JSON.parseObject(new ByteBufferBackedInputStream(byteBuffer), classType);if (obj instanceof Request) {((Request) obj).putAllHeader(payload.getMetadata().getHeadersMap());}return obj;} else {throw new RuntimeException( "Unknown payload type:" + payload.getMetadata().getType());}}public static <T> T parse(ResponseGrpc payload) {Class classType = PayloadRegistry.getClassByType(payload.getMetadata().getType());log.info("parse classType:{}", classType);if (classType != null) {ByteString byteString = payload.getBody().getValue();ByteBuffer byteBuffer = byteString.asReadOnlyByteBuffer();T obj = JSON.parseObject(new ByteBufferBackedInputStream(byteBuffer), classType);if (obj instanceof Request) {((Request) obj).putAllHeader(payload.getMetadata().getHeadersMap());}return obj;} else {throw new RuntimeException( "Unknown payload type:" + payload.getMetadata().getType());}}}
应用实现
  • Grpc服务端实现
package com.zzc.rpc.grpc;import com.alibaba.fastjson2.JSON;
import com.zzc.rpc.grpc.entity.DemoServiceGrpc;
import com.zzc.rpc.grpc.entity.Request;
import com.zzc.rpc.grpc.entity.RequestGrpc;
import com.zzc.rpc.grpc.entity.Response;
import com.zzc.rpc.grpc.entity.ResponseGrpc;
import com.zzc.rpc.grpc.utils.GRPCUtils;
import com.zzc.rpc.grpc.utils.PayloadRegistry;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class GrpcServer {static {//使用spi加载继承Payload的类PayloadRegistry.init();}private int port = 8001;private Server server;private void start() {try {server = ServerBuilder.forPort(port).addService(new GrpcServerImpl()).build().start();} catch (Exception e) {log.error("start error.");}Runtime.getRuntime().addShutdownHook(new Thread(){@Overridepublic void run() {GrpcServer.this.stop();}});}private void stop() {if (server != null) {server.isShutdown();}}private void blockUntilShutdown() {if (server != null) {try {server.awaitTermination();} catch (InterruptedException e) {log.error("blockUntilShutdown error.", e);}}}class GrpcServerImpl extends DemoServiceGrpc.DemoServiceImplBase {/*** 简单模式*/@Overridepublic void unaryRequest(RequestGrpc requestGrpc, StreamObserver<ResponseGrpc> responseObserver) {Request request = GRPCUtils.parse(requestGrpc);log.info("receive unaryRequest data:{}", JSON.toJSONString(request));//返回请求Response response = Response.build().code(0).msg("UNARY request success.");responseObserver.onNext(GRPCUtils.convert(response));responseObserver.onCompleted();}/*** 服务端流模式*/@Overridepublic void serverStreamingRequest(RequestGrpc requestGrpc, StreamObserver<ResponseGrpc> responseObserver) {Request request = GRPCUtils.parse(requestGrpc);log.info("receive serverStreamingRequest data:{}", JSON.toJSONString(request));//返回请求Response response = Response.build().code(0).msg("SERVER_STREAMING request success.");responseObserver.onNext(GRPCUtils.convert(response));responseObserver.onCompleted();}/*** 客户端流模式,可以多次接收客户端的请求,然后只返回一次*/@Overridepublic StreamObserver<RequestGrpc> clientStreamingRequest(StreamObserver<ResponseGrpc> responseObserver) {return new StreamObserver<RequestGrpc>() {@Overridepublic void onNext(RequestGrpc requestGrpc) {//多次接收客户端的数据Request request = GRPCUtils.parse(requestGrpc);log.info("receive clientStreamingRequest data:{}", JSON.toJSONString(request));}@Overridepublic void onError(Throwable throwable) {responseObserver.onError(new StatusException(Status.INTERNAL));log.error("clientStreamingRequest error.", throwable);}@Overridepublic void onCompleted() {//然后返回一次Response response = Response.build().code(0).msg("CLIENT_STREAMING request success.");responseObserver.onNext(GRPCUtils.convert(response));responseObserver.onCompleted();}};}/*** 双向流模式*/@Overridepublic StreamObserver<RequestGrpc> bidirectionalStreamingRequest(StreamObserver<ResponseGrpc> responseObserver) {StreamObserver<RequestGrpc> streamObserver = new StreamObserver<RequestGrpc>() {@Overridepublic void onNext(RequestGrpc requestGrpc) {Request request = GRPCUtils.parse(requestGrpc);log.info("receive bidirectionalStreamingRequest data:{}", JSON.toJSONString(request));Response response = Response.build().code(0).msg("SERVER_STREAMING request success.");responseObserver.onNext(GRPCUtils.convert(response));}@Overridepublic void onError(Throwable throwable) {responseObserver.onError(new StatusException(Status.INTERNAL));log.error("bidirectionalStreamingRequest error.", throwable);}@Overridepublic void onCompleted() {Response response = Response.build().code(0).msg("BIDI_STREAMING request success.");responseObserver.onNext(GRPCUtils.convert(response));responseObserver.onCompleted();}};return streamObserver;}}public static void main(String[] args) {GrpcServer grpcServer = new GrpcServer();grpcServer.start();grpcServer.blockUntilShutdown();}}
  • Grpc客户端实现
package com.zzc.rpc.grpc;import com.alibaba.fastjson2.JSON;
import com.zzc.rpc.grpc.entity.DemoServiceGrpc;
import com.zzc.rpc.grpc.entity.Payload;
import com.zzc.rpc.grpc.entity.Request;
import com.zzc.rpc.grpc.entity.RequestGrpc;
import com.zzc.rpc.grpc.entity.Response;
import com.zzc.rpc.grpc.entity.ResponseGrpc;
import com.zzc.rpc.grpc.utils.GRPCUtils;
import com.zzc.rpc.grpc.utils.PayloadRegistry;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;@Slf4j
public class GrpcClient {static {PayloadRegistry.init();}private final ManagedChannel channel;private final DemoServiceGrpc.DemoServiceBlockingStub blockingStub;private final DemoServiceGrpc.DemoServiceStub stub;public GrpcClient(String host, int port) {this.channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();this.blockingStub = DemoServiceGrpc.newBlockingStub(channel);this.stub = DemoServiceGrpc.newStub(channel);}public void shutdown() {try {channel.shutdown().awaitTermination(30, TimeUnit.SECONDS);} catch (InterruptedException e) {log.error("");}}public void unarySend() {//Payload request = Payload.newBuilder().setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(JSON.toJSONBytes(name)))).build();Request request = new Request();request.setRequestId("aaaaaaaaaaaaaaaaaaa");request.putHeader("Test", "teste-hader");RequestGrpc requestGrpc = GRPCUtils.convert(request);ResponseGrpc responseGrpc = blockingStub.unaryRequest(requestGrpc);Response rev = GRPCUtils.parse(responseGrpc);log.info("response msg:{}", JSON.toJSONString(rev));}public void clientStreamingSend() {Request request1 = new Request();request1.setRequestId("bbbbbbbbbbbbbbb");request1.putHeader("Test", "teste-hader");RequestGrpc requestGrpc1 = GRPCUtils.convert(request1);Request request2 = new Request();request2.setRequestId("bbbbbbbbbbbbbbb");request2.putHeader("Test", "teste-hader");RequestGrpc requestGrpc2 = GRPCUtils.convert(request2);StreamObserver<ResponseGrpc> streamObserver = new StreamObserver<ResponseGrpc>() {@Overridepublic void onNext(ResponseGrpc responseGrpc) {Response response = GRPCUtils.parse(responseGrpc);log.info("onNext response msg:{}, response type:{}", JSON.toJSONString(response), responseGrpc.getMetadata().getType());}@Overridepublic void onError(Throwable throwable) {log.info("onError");}@Overridepublic void onCompleted() {//发送完数据之后的处理,比如多线程的并发执行的 countDown就可以在这里执行log.info("onCompleted clientStreamingSend.");}};StreamObserver<RequestGrpc> clientStreamingRequest = stub.clientStreamingRequest(streamObserver);try {clientStreamingRequest.onNext(requestGrpc1);//发送第一次clientStreamingRequest.onNext(requestGrpc2);//发送第二次clientStreamingRequest.onCompleted();} catch (Exception e) {clientStreamingRequest.onError(e);log.error("clientStreamingSend error.", e);}}public void serverStreamingSend() {Request request = new Request();request.setRequestId("ddddddddddddddddddddddddd");request.putHeader("Test", "teste-hader");RequestGrpc requestGrpc = GRPCUtils.convert(request);StreamObserver<ResponseGrpc> streamObserver = new StreamObserver<ResponseGrpc>() {@Overridepublic void onNext(ResponseGrpc responseGrpc) {//服务端返回的结果,可以通过事件或接口回调等方式传递给外部使用Response response = GRPCUtils.parse(responseGrpc);log.info("serverStreamingSend response:{}", JSON.toJSONString(response));}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onCompleted() {//发送完数据之后的处理,比如多线程的并发执行的 countDown就可以在这里执行}};stub.serverStreamingRequest(requestGrpc, streamObserver);//同步阻塞执行/*Iterator<ResponseGrpc> responseGrpcs = blockingStub.serverStreamingRequest(requestGrpc);while (responseGrpcs.hasNext()) {ResponseGrpc responseGrpc = responseGrpcs.next();Response response = GRPCUtils.parse(responseGrpc);log.info("serverStreamingSend response:{}", JSON.toJSONString(response));}*/}private void bidirectionalStreamingSend() {CountDownLatch countDownLatch = new CountDownLatch(2);StreamObserver<ResponseGrpc> streamObserver = new StreamObserver<ResponseGrpc>() {@Overridepublic void onNext(ResponseGrpc responseGrpc) {Response response = GRPCUtils.parse(responseGrpc);log.info("bidirectionalStreamingSend response:{}", JSON.toJSONString(response));}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onCompleted() {countDownLatch.countDown();}};StreamObserver<RequestGrpc> bidirectionalStreamingRequest = stub.bidirectionalStreamingRequest(streamObserver);for (int i = 0; i < 4; i ++) {Request request = new Request();request.setRequestId("eeeeeeeeeeeeeeeeeeeeeeeee");request.putHeader("Test", "teste-hader");bidirectionalStreamingRequest.onNext(GRPCUtils.convert(request));}bidirectionalStreamingRequest.onCompleted();}public static void main(String[] args) throws InterruptedException {GrpcClient client = new GrpcClient("127.0.0.1", 8001);//简单模式client.unarySend();Thread.sleep(5000);//客户端流模式client.clientStreamingSend();Thread.sleep(5000);//服务端流模式client.serverStreamingSend();Thread.sleep(5000);//双向流模式client.bidirectionalStreamingSend();Thread.sleep(5000);client.shutdown();Thread.sleep(5000);//等待channel的消息传输完成等}}

这篇关于java grpc四种模式介绍的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1141913

相关文章

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

Spring Security--Architecture Overview

1 核心组件 这一节主要介绍一些在Spring Security中常见且核心的Java类,它们之间的依赖,构建起了整个框架。想要理解整个架构,最起码得对这些类眼熟。 1.1 SecurityContextHolder SecurityContextHolder用于存储安全上下文(security context)的信息。当前操作的用户是谁,该用户是否已经被认证,他拥有哪些角色权限…这些都被保

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

性能测试介绍

性能测试是一种测试方法,旨在评估系统、应用程序或组件在现实场景中的性能表现和可靠性。它通常用于衡量系统在不同负载条件下的响应时间、吞吐量、资源利用率、稳定性和可扩展性等关键指标。 为什么要进行性能测试 通过性能测试,可以确定系统是否能够满足预期的性能要求,找出性能瓶颈和潜在的问题,并进行优化和调整。 发现性能瓶颈:性能测试可以帮助发现系统的性能瓶颈,即系统在高负载或高并发情况下可能出现的问题

水位雨量在线监测系统概述及应用介绍

在当今社会,随着科技的飞速发展,各种智能监测系统已成为保障公共安全、促进资源管理和环境保护的重要工具。其中,水位雨量在线监测系统作为自然灾害预警、水资源管理及水利工程运行的关键技术,其重要性不言而喻。 一、水位雨量在线监测系统的基本原理 水位雨量在线监测系统主要由数据采集单元、数据传输网络、数据处理中心及用户终端四大部分构成,形成了一个完整的闭环系统。 数据采集单元:这是系统的“眼睛”,

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数