本文主要是介绍java grpc四种模式介绍,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
GRPC功能
一.GRPC的响应模式
1.GRPC的四种响应模式
(1)UNARY(简单模式)
- 也称简单 RPC,即客户端发起一次请求,服务端响应处理后返回一个结果给客户端。
(2) SERVER_STREAMING(服务端流模式)
- 客户端发起一次请求,服务端可以连续返回数据流(即分批次返回场景)。
(3)CLIENT_STREAMING(客户端流模式)
- 服务端数据流模式相反,客户端持续向服务端发送数据流,在发送结束后,由服务端返回一个响应。
(4)BIDI_STREAMING(双向流模式)
- 客户端和服务端都可以向对方多次收发数据。
(5)对应io.grpc.MethodDescriptor.MethodType的枚举
public enum MethodType {/*** One request message followed by one response message.*/UNARY,/*** Zero or more request messages with one response message.*/CLIENT_STREAMING,/*** One request message followed by zero or more response messages.*/SERVER_STREAMING,/*** Zero or more request and response messages arbitrarily interleaved in time.*/BIDI_STREAMING,/*** Cardinality and temporal relationships are not known. Implementations should not make* buffering assumptions and should largely treat the same as {@link #BIDI_STREAMING}.*/UNKNOWN;/*** Returns {@code true} for {@code UNARY} and {@code SERVER_STREAMING}, which do not permit the* client to stream.** @since 1.0.0*/public final boolean clientSendsOneMessage() {return this == UNARY || this == SERVER_STREAMING;}/*** Returns {@code true} for {@code UNARY} and {@code CLIENT_STREAMING}, which do not permit the* server to stream.** @since 1.0.0*/public final boolean serverSendsOneMessage() {return this == UNARY || this == CLIENT_STREAMING;}}
2. proto文件定义
// @link https://github.com/grpc/grpc-java/blob/master/examples/src/main/proto/grpc/examples/echo/echo.proto
syntax = "proto3";import "google/protobuf/any.proto";option java_multiple_files = true;
//生成java代码的package
option java_package = "com.zzc.rpc.grpc.entity";
//创建的javaBean的文件名
//option java_outer_classname = "DemoProto";
//可以生成rpc接口
//option java_generic_services = true;//声明一个服务名称
service DemoService {//request is unary echo. 简单模式rpc unaryRequest (RequestGrpc) returns (ResponseGrpc) {}// serverStreamingRequest is server side streaming.服务端流模式rpc serverStreamingRequest(RequestGrpc) returns (stream ResponseGrpc) {}// clientStreamingRequest is client side streaming. 客户端流模式rpc clientStreamingRequest(stream RequestGrpc) returns (ResponseGrpc) {}// bidirectionalStreamingRequest is bidi streaming. 双向流模式rpc bidirectionalStreamingRequest(stream RequestGrpc) returns (stream ResponseGrpc) {}
}message MetadataGrpc {string type = 3;string clientIp = 8;map<string, string> headers = 7;
}message RequestGrpc {MetadataGrpc metadata = 2;google.protobuf.Any body = 3;
}message ResponseGrpc {int32 code = 5;string msg = 6;MetadataGrpc metadata = 2;google.protobuf.Any body = 3;
}
3.代码示例
(1)代码结构
(2)maven依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.zzc.rpc</groupId><artifactId>rpc-design</artifactId><version>1.0-SNAPSHOT</version></parent><groupId>com.zzc.rpc.grpc</groupId><artifactId>grpc-demo</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><grpc.version>1.65.1</grpc.version><protostuff.version>1.8.0</protostuff.version></properties><dependencies><dependency><groupId>io.grpc</groupId><artifactId>grpc-netty</artifactId><version>${grpc.version}</version></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-protobuf</artifactId><version>${grpc.version}</version></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-stub</artifactId><version>${grpc.version}</version></dependency><dependency><groupId>io.protostuff</groupId><artifactId>protostuff-core</artifactId><version>${protostuff.version}</version></dependency><dependency><groupId>io.protostuff</groupId><artifactId>protostuff-runtime</artifactId><version>${protostuff.version}</version></dependency><!-- log4j2日志门面 --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId></dependency><!-- log4j2日志框架 --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId></dependency><!-- slf4j日志门面 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></dependency><!--log4j2的适配器,为slf4j绑定日志框架 --><!-- 依赖org.slf4j:slf4j-api:1.7.25 --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><scope>provided</scope></dependency></dependencies><build><!-- SPI机制加载类 --><resources><resource><directory>src/main/resources</directory><filtering>true</filtering><includes><include>*.*</include><include>META-INF/services/*</include></includes></resource></resources><!-- 通过proto文件生成java文件相关 --><extensions><extension><groupId>kr.motd.maven</groupId><artifactId>os-maven-plugin</artifactId><version>1.5.0.Final</version></extension></extensions><plugins><plugin><groupId>org.xolstice.maven.plugins</groupId><artifactId>protobuf-maven-plugin</artifactId><version>0.6.1</version><configuration><protocArtifact>com.google.protobuf:protoc:3.22.4:exe:${os.detected.classifier}</protocArtifact><pluginId>grpc-java</pluginId><pluginArtifact>io.grpc:protoc-gen-grpc-java:1.65.1:exe:${os.detected.classifier}</pluginArtifact></configuration><executions><execution><goals><goal>compile</goal><goal>compile-custom</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.13.0</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build></project>
(3)通过proto生成grpc相关的java文件
(4)具体实现
entity实现对像
- Payload 接口,继承实现该接口的,会被ServerLoader加载
package com.zzc.rpc.grpc.entity;public interface Payload {}
- Request 请求对象
package com.zzc.rpc.grpc.entity;import java.util.Map;
import java.util.TreeMap;public class Request implements Payload {/*** 创建header集合,并且不根据key字符大小写匹配排序的*/private final Map<String, String> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);private String requestId;public void putHeader(String key, String value) {headers.put(key, value);}public void putAllHeader(Map<String, String> headers) {if (headers == null || headers.isEmpty()) {return;}this.headers.putAll(headers);}public String getHeader(String key) {return headers.get(key);}public String getHeader(String key, String defaultValue) {String value = headers.get(key);return (value == null) ? defaultValue : value;}public String getRequestId() {return requestId;}public void setRequestId(String requestId) {this.requestId = requestId;}public Map<String, String> getHeaders() {return headers;}public void clearHeaders() {this.headers.clear();}
}
- Response 响应类
package com.zzc.rpc.grpc.entity;public class Response implements Payload {private int resultCode = 0;private int code;private String msg;private String requestId;public String getRequestId() {return requestId;}public void setRequestId(String requestId) {this.requestId = requestId;}public boolean isSuccess() {return this.resultCode == 0;}public int getResultCode() {return resultCode;}public void setResultCode(int resultCode) {this.resultCode = resultCode;}public String getMsg() {return msg;}public void setMsg(String msg) {this.msg = msg;}public int getCode() {return code;}public void setCode(int code) {this.code = code;}public void setErrorInfo(int errorCode, String errorMsg) {this.resultCode = errorCode;this.code = errorCode;this.msg = errorMsg;}public static Response build() {return new Response();}public Response code(int code) {this.code = code;return this;}public Response msg(String msg) {this.msg = msg;return this;}}
- 请求头
package com.zzc.rpc.grpc.entity;public class RequestMeta {private String connectionId = "";private String clientIp = "";private String clientVersion = "";public String getClientVersion() {return clientVersion;}public void setClientVersion(String clientVersion) {this.clientVersion = clientVersion;}public String getConnectionId() {return connectionId;}public void setConnectionId(String connectionId) {this.connectionId = connectionId;}public String getClientIp() {return clientIp;}public void setClientIp(String clientIp) {this.clientIp = clientIp;}}
在resources中添加SPI扫描配置
- 在resources下新建META-INF/services/目录,然后新建接口全限定名的文件:com.zzc.rpc.grpc.entity.Payload,里面加上我们需要用到的实现类。
com.zzc.rpc.grpc.entity.Request
com.zzc.rpc.grpc.entity.Response
添加utils工具类
- ByteBufferBackedInputStream buffer对象读取
package com.zzc.rpc.grpc.utils;import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;/*** buff读取*/
public class ByteBufferBackedInputStream extends InputStream {protected final ByteBuffer _b;public ByteBufferBackedInputStream(ByteBuffer buf) { _b = buf; }@Override public int available() { return _b.remaining(); }@Overridepublic int read() throws IOException { return _b.hasRemaining() ? (_b.get() & 0xFF) : -1; }@Overridepublic int read(byte[] bytes, int off, int len) throws IOException {if (!_b.hasRemaining()) return -1;len = Math.min(len, _b.remaining());_b.get(bytes, off, len);return len;}
}
- PayloadRegistry,加载继承Payload的类,实际上Request和Response可以做成abstract类型,然后接收对象只有继承Payload就可以注册了
package com.zzc.rpc.grpc.utils;import com.zzc.rpc.grpc.entity.Payload;
import com.zzc.rpc.grpc.entity.Request;
import com.zzc.rpc.grpc.entity.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.ServiceLoader;/*** 加载继承Payload的类,实际上Request和Response可以做成abstract类型,然后接收对象只有继承Payload就可以注册了*/
public class PayloadRegistry {private static final Logger log = LoggerFactory.getLogger(PayloadRegistry.class);private static final Map<String, Class<?>> REGISTRY_REQUEST = new HashMap<>();static boolean initialized = false;public static void init() {log.info("start init PayloadRegistry...");scan();}private static synchronized void scan() {if (initialized) {return;}log.info("start scan");ServiceLoader<Payload> payloads = ServiceLoader.load(Payload.class);for (Payload payload : payloads) {log.info("scan classname:{}, class:{}", payload.getClass().getSimpleName(), payload.getClass());register(payload.getClass().getSimpleName(), payload.getClass());}initialized = true;}static void register(String type, Class<?> clazz) {if (Modifier.isAbstract(clazz.getModifiers())) {//抽象类型的不注册return;}if (REGISTRY_REQUEST.containsKey(type)) {throw new RuntimeException(String.format("Fail to register, type:%s ,clazz:%s ", type, clazz.getName()));}log.info("register type:{}, class:{}", type, clazz);REGISTRY_REQUEST.put(type, clazz);}public static Class<?> getClassByType(String type) {return REGISTRY_REQUEST.get(type);}public static void main(String[] args) {init();}
}
- GRPCUtils 工具类,转换Request、Response等对象
package com.zzc.rpc.grpc.utils;import com.alibaba.fastjson2.JSON;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import com.zzc.rpc.grpc.entity.MetadataGrpc;
import com.zzc.rpc.grpc.entity.Request;
import com.zzc.rpc.grpc.entity.RequestGrpc;
import com.zzc.rpc.grpc.entity.RequestMeta;
import com.zzc.rpc.grpc.entity.Response;
import com.zzc.rpc.grpc.entity.ResponseGrpc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;public class GRPCUtils {private static final Logger log = LoggerFactory.getLogger(GRPCUtils.class);public static RequestGrpc convert(Request request, RequestMeta meta) {RequestGrpc.Builder payloadBuilder = RequestGrpc.newBuilder();MetadataGrpc.Builder metadataBuilder = MetadataGrpc.newBuilder();if (meta != null) {metadataBuilder.putAllHeaders(request.getHeaders()).setType(request.getClass().getSimpleName());}metadataBuilder.setClientIp("127.0.0.1");payloadBuilder.setMetadata(metadataBuilder.build());// request body .byte[] jsonBytes = convertRequestToByte(request);return payloadBuilder.setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(jsonBytes))).build();}public static RequestGrpc convert(Request request) {MetadataGrpc newMeta = MetadataGrpc.newBuilder().setType(request.getClass().getSimpleName()).setClientIp("127.0.0.1").putAllHeaders(request.getHeaders()).build();byte[] jsonBytes = convertRequestToByte(request);RequestGrpc.Builder builder = RequestGrpc.newBuilder();return builder.setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(jsonBytes))).setMetadata(newMeta).build();}private static byte[] convertRequestToByte(Request request) {Map<String, String> requestHeaders = new HashMap<>(request.getHeaders());request.clearHeaders();byte[] jsonBytes = JSON.toJSONBytes(request);request.putAllHeader(requestHeaders);return jsonBytes;}public static ResponseGrpc convert(Response response) {byte[] jsonBytes = JSON.toJSONBytes(response);MetadataGrpc.Builder metaBuilder = MetadataGrpc.newBuilder().setType(response.getClass().getSimpleName());return ResponseGrpc.newBuilder().setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(jsonBytes))).setMetadata(metaBuilder.build()).build();}public static <T> T parse(RequestGrpc payload) {Class classType = PayloadRegistry.getClassByType(payload.getMetadata().getType());log.info("parse classType:{}", classType);if (classType != null) {ByteString byteString = payload.getBody().getValue();ByteBuffer byteBuffer = byteString.asReadOnlyByteBuffer();T obj = JSON.parseObject(new ByteBufferBackedInputStream(byteBuffer), classType);if (obj instanceof Request) {((Request) obj).putAllHeader(payload.getMetadata().getHeadersMap());}return obj;} else {throw new RuntimeException( "Unknown payload type:" + payload.getMetadata().getType());}}public static <T> T parse(ResponseGrpc payload) {Class classType = PayloadRegistry.getClassByType(payload.getMetadata().getType());log.info("parse classType:{}", classType);if (classType != null) {ByteString byteString = payload.getBody().getValue();ByteBuffer byteBuffer = byteString.asReadOnlyByteBuffer();T obj = JSON.parseObject(new ByteBufferBackedInputStream(byteBuffer), classType);if (obj instanceof Request) {((Request) obj).putAllHeader(payload.getMetadata().getHeadersMap());}return obj;} else {throw new RuntimeException( "Unknown payload type:" + payload.getMetadata().getType());}}}
应用实现
- Grpc服务端实现
package com.zzc.rpc.grpc;import com.alibaba.fastjson2.JSON;
import com.zzc.rpc.grpc.entity.DemoServiceGrpc;
import com.zzc.rpc.grpc.entity.Request;
import com.zzc.rpc.grpc.entity.RequestGrpc;
import com.zzc.rpc.grpc.entity.Response;
import com.zzc.rpc.grpc.entity.ResponseGrpc;
import com.zzc.rpc.grpc.utils.GRPCUtils;
import com.zzc.rpc.grpc.utils.PayloadRegistry;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class GrpcServer {static {//使用spi加载继承Payload的类PayloadRegistry.init();}private int port = 8001;private Server server;private void start() {try {server = ServerBuilder.forPort(port).addService(new GrpcServerImpl()).build().start();} catch (Exception e) {log.error("start error.");}Runtime.getRuntime().addShutdownHook(new Thread(){@Overridepublic void run() {GrpcServer.this.stop();}});}private void stop() {if (server != null) {server.isShutdown();}}private void blockUntilShutdown() {if (server != null) {try {server.awaitTermination();} catch (InterruptedException e) {log.error("blockUntilShutdown error.", e);}}}class GrpcServerImpl extends DemoServiceGrpc.DemoServiceImplBase {/*** 简单模式*/@Overridepublic void unaryRequest(RequestGrpc requestGrpc, StreamObserver<ResponseGrpc> responseObserver) {Request request = GRPCUtils.parse(requestGrpc);log.info("receive unaryRequest data:{}", JSON.toJSONString(request));//返回请求Response response = Response.build().code(0).msg("UNARY request success.");responseObserver.onNext(GRPCUtils.convert(response));responseObserver.onCompleted();}/*** 服务端流模式*/@Overridepublic void serverStreamingRequest(RequestGrpc requestGrpc, StreamObserver<ResponseGrpc> responseObserver) {Request request = GRPCUtils.parse(requestGrpc);log.info("receive serverStreamingRequest data:{}", JSON.toJSONString(request));//返回请求Response response = Response.build().code(0).msg("SERVER_STREAMING request success.");responseObserver.onNext(GRPCUtils.convert(response));responseObserver.onCompleted();}/*** 客户端流模式,可以多次接收客户端的请求,然后只返回一次*/@Overridepublic StreamObserver<RequestGrpc> clientStreamingRequest(StreamObserver<ResponseGrpc> responseObserver) {return new StreamObserver<RequestGrpc>() {@Overridepublic void onNext(RequestGrpc requestGrpc) {//多次接收客户端的数据Request request = GRPCUtils.parse(requestGrpc);log.info("receive clientStreamingRequest data:{}", JSON.toJSONString(request));}@Overridepublic void onError(Throwable throwable) {responseObserver.onError(new StatusException(Status.INTERNAL));log.error("clientStreamingRequest error.", throwable);}@Overridepublic void onCompleted() {//然后返回一次Response response = Response.build().code(0).msg("CLIENT_STREAMING request success.");responseObserver.onNext(GRPCUtils.convert(response));responseObserver.onCompleted();}};}/*** 双向流模式*/@Overridepublic StreamObserver<RequestGrpc> bidirectionalStreamingRequest(StreamObserver<ResponseGrpc> responseObserver) {StreamObserver<RequestGrpc> streamObserver = new StreamObserver<RequestGrpc>() {@Overridepublic void onNext(RequestGrpc requestGrpc) {Request request = GRPCUtils.parse(requestGrpc);log.info("receive bidirectionalStreamingRequest data:{}", JSON.toJSONString(request));Response response = Response.build().code(0).msg("SERVER_STREAMING request success.");responseObserver.onNext(GRPCUtils.convert(response));}@Overridepublic void onError(Throwable throwable) {responseObserver.onError(new StatusException(Status.INTERNAL));log.error("bidirectionalStreamingRequest error.", throwable);}@Overridepublic void onCompleted() {Response response = Response.build().code(0).msg("BIDI_STREAMING request success.");responseObserver.onNext(GRPCUtils.convert(response));responseObserver.onCompleted();}};return streamObserver;}}public static void main(String[] args) {GrpcServer grpcServer = new GrpcServer();grpcServer.start();grpcServer.blockUntilShutdown();}}
- Grpc客户端实现
package com.zzc.rpc.grpc;import com.alibaba.fastjson2.JSON;
import com.zzc.rpc.grpc.entity.DemoServiceGrpc;
import com.zzc.rpc.grpc.entity.Payload;
import com.zzc.rpc.grpc.entity.Request;
import com.zzc.rpc.grpc.entity.RequestGrpc;
import com.zzc.rpc.grpc.entity.Response;
import com.zzc.rpc.grpc.entity.ResponseGrpc;
import com.zzc.rpc.grpc.utils.GRPCUtils;
import com.zzc.rpc.grpc.utils.PayloadRegistry;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;@Slf4j
public class GrpcClient {static {PayloadRegistry.init();}private final ManagedChannel channel;private final DemoServiceGrpc.DemoServiceBlockingStub blockingStub;private final DemoServiceGrpc.DemoServiceStub stub;public GrpcClient(String host, int port) {this.channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();this.blockingStub = DemoServiceGrpc.newBlockingStub(channel);this.stub = DemoServiceGrpc.newStub(channel);}public void shutdown() {try {channel.shutdown().awaitTermination(30, TimeUnit.SECONDS);} catch (InterruptedException e) {log.error("");}}public void unarySend() {//Payload request = Payload.newBuilder().setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(JSON.toJSONBytes(name)))).build();Request request = new Request();request.setRequestId("aaaaaaaaaaaaaaaaaaa");request.putHeader("Test", "teste-hader");RequestGrpc requestGrpc = GRPCUtils.convert(request);ResponseGrpc responseGrpc = blockingStub.unaryRequest(requestGrpc);Response rev = GRPCUtils.parse(responseGrpc);log.info("response msg:{}", JSON.toJSONString(rev));}public void clientStreamingSend() {Request request1 = new Request();request1.setRequestId("bbbbbbbbbbbbbbb");request1.putHeader("Test", "teste-hader");RequestGrpc requestGrpc1 = GRPCUtils.convert(request1);Request request2 = new Request();request2.setRequestId("bbbbbbbbbbbbbbb");request2.putHeader("Test", "teste-hader");RequestGrpc requestGrpc2 = GRPCUtils.convert(request2);StreamObserver<ResponseGrpc> streamObserver = new StreamObserver<ResponseGrpc>() {@Overridepublic void onNext(ResponseGrpc responseGrpc) {Response response = GRPCUtils.parse(responseGrpc);log.info("onNext response msg:{}, response type:{}", JSON.toJSONString(response), responseGrpc.getMetadata().getType());}@Overridepublic void onError(Throwable throwable) {log.info("onError");}@Overridepublic void onCompleted() {//发送完数据之后的处理,比如多线程的并发执行的 countDown就可以在这里执行log.info("onCompleted clientStreamingSend.");}};StreamObserver<RequestGrpc> clientStreamingRequest = stub.clientStreamingRequest(streamObserver);try {clientStreamingRequest.onNext(requestGrpc1);//发送第一次clientStreamingRequest.onNext(requestGrpc2);//发送第二次clientStreamingRequest.onCompleted();} catch (Exception e) {clientStreamingRequest.onError(e);log.error("clientStreamingSend error.", e);}}public void serverStreamingSend() {Request request = new Request();request.setRequestId("ddddddddddddddddddddddddd");request.putHeader("Test", "teste-hader");RequestGrpc requestGrpc = GRPCUtils.convert(request);StreamObserver<ResponseGrpc> streamObserver = new StreamObserver<ResponseGrpc>() {@Overridepublic void onNext(ResponseGrpc responseGrpc) {//服务端返回的结果,可以通过事件或接口回调等方式传递给外部使用Response response = GRPCUtils.parse(responseGrpc);log.info("serverStreamingSend response:{}", JSON.toJSONString(response));}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onCompleted() {//发送完数据之后的处理,比如多线程的并发执行的 countDown就可以在这里执行}};stub.serverStreamingRequest(requestGrpc, streamObserver);//同步阻塞执行/*Iterator<ResponseGrpc> responseGrpcs = blockingStub.serverStreamingRequest(requestGrpc);while (responseGrpcs.hasNext()) {ResponseGrpc responseGrpc = responseGrpcs.next();Response response = GRPCUtils.parse(responseGrpc);log.info("serverStreamingSend response:{}", JSON.toJSONString(response));}*/}private void bidirectionalStreamingSend() {CountDownLatch countDownLatch = new CountDownLatch(2);StreamObserver<ResponseGrpc> streamObserver = new StreamObserver<ResponseGrpc>() {@Overridepublic void onNext(ResponseGrpc responseGrpc) {Response response = GRPCUtils.parse(responseGrpc);log.info("bidirectionalStreamingSend response:{}", JSON.toJSONString(response));}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onCompleted() {countDownLatch.countDown();}};StreamObserver<RequestGrpc> bidirectionalStreamingRequest = stub.bidirectionalStreamingRequest(streamObserver);for (int i = 0; i < 4; i ++) {Request request = new Request();request.setRequestId("eeeeeeeeeeeeeeeeeeeeeeeee");request.putHeader("Test", "teste-hader");bidirectionalStreamingRequest.onNext(GRPCUtils.convert(request));}bidirectionalStreamingRequest.onCompleted();}public static void main(String[] args) throws InterruptedException {GrpcClient client = new GrpcClient("127.0.0.1", 8001);//简单模式client.unarySend();Thread.sleep(5000);//客户端流模式client.clientStreamingSend();Thread.sleep(5000);//服务端流模式client.serverStreamingSend();Thread.sleep(5000);//双向流模式client.bidirectionalStreamingSend();Thread.sleep(5000);client.shutdown();Thread.sleep(5000);//等待channel的消息传输完成等}}
这篇关于java grpc四种模式介绍的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!