阐述Dubbo服务提供方的解码原理

2024-05-09 03:36

本文主要是介绍阐述Dubbo服务提供方的解码原理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1 解码概述

在Dubbo服务消费方(客户端)和服务提供方(服务端)进行网络通信时,服务提供方会通过socket把需要发送的内容序列化为二进制流后发出。接着二进制流通过网络流向服务提供方。服务提供方接收到该请求后会解析该请求包,对接收到的数据进行反序列化后对请求进行处理。

服务提供方接收到请求后解析请求包(即Dubbo协议的内容)的过程即服务提供方的解码过程。

2 解码原理解析

2.1 解码入口

服务提供方解析请求包最终是借助于 InternalDecoder 的 decode 方法来实现的。具体实现代码如下所示。

protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {ChannelBuffer message = new NettyBackedChannelBuffer(input);NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);// decode object.do {int saveReaderIndex = message.readerIndex();Object msg = codec.decode(channel, message);if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {message.readerIndex(saveReaderIndex);break;} else {//is it possible to go here ?if (saveReaderIndex == message.readerIndex()) {throw new IOException("Decode without read data.");}if (msg != null) {out.add(msg);}}} while (message.readable());
}

2.2 解码实现细节

在 InternalDecoder 的 decode 方法内部再调用了具体解码实现来进行解码。如 ExchangeCodec 的 decode 方法,实现如下所示。

public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {// 获取dubbo协议帧的字节数int readable = buffer.readableBytes();// 将dubbo协议头读取到数组headerbyte[] header = new byte[Math.min(readable, HEADER_LENGTH)];buffer.readBytes(header);// 解析dubbo协议帧数据部分return decode(channel, buffer, readable, header);
}protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {// check magic number.if (readable > 0 && header[0] != MAGIC_HIGH|| readable > 1 && header[1] != MAGIC_LOW) {int length = header.length;if (header.length < readable) {header = Bytes.copyOf(header, readable);buffer.readBytes(header, length, readable - length);}for (int i = 1; i < header.length - 1; i++) {if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {buffer.readerIndex(buffer.readerIndex() - header.length + i);header = Bytes.copyOf(header, i);break;}}return super.decode(channel, buffer, readable, header);}// check length.// (1)dubbo协议帧实际字节数小于协议头长度,说明该协议帧不是一个完整的协议帧if (readable < HEADER_LENGTH) {return DecodeResult.NEED_MORE_INPUT;}// get data length.int len = Bytes.bytes2int(header, 12);// When receiving response, how to exceed the length, then directly construct a response to the client.// see more detail from https://github.com/apache/dubbo/issues/7021.Object obj = finishRespWhenOverPayload(channel, len, header);if (null != obj) {return obj;}// dubbo协议帧理论上的字节数int tt = len + HEADER_LENGTH;// (2)dubbo协议帧实际字节数小于理论上的字节数,说明该协议帧不是一个完整的协议帧if (readable < tt) {return DecodeResult.NEED_MORE_INPUT;}// limit input stream.ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);try {return decodeBody(channel, is, header);} finally {if (is.available() > 0) {try {if (logger.isWarnEnabled()) {logger.warn(TRANSPORT_SKIP_UNUSED_STREAM, "", "", "Skip input stream " + is.available());}StreamUtils.skipUnusedStream(is);} catch (IOException e) {logger.warn(TRANSPORT_SKIP_UNUSED_STREAM, "", "", e.getMessage(), e);}}}
}

其中将dubbo协议头读取到header数组的实现如下所示。

    @Overridepublic void readBytes(byte[] dst) {readBytes(dst, 0, dst.length);}

对dubbo协议body进行解析则是调用了 decodeBody 方法,实现如下所示。

protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);// get request id.long id = Bytes.bytes2long(header, 4);if ((flag & FLAG_REQUEST) == 0) {// decode response.Response res = new Response(id);if ((flag & FLAG_EVENT) != 0) {res.setEvent(true);}// get status.byte status = header[3];res.setStatus(status);try {if (status == Response.OK) {Object data;if (res.isEvent()) {byte[] eventPayload = CodecSupport.getPayload(is);if (CodecSupport.isHeartBeat(eventPayload, proto)) {// heart beat response data is always null;data = null;} else {data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto), eventPayload);}} else {data = decodeResponseData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(channel, res, id));}res.setResult(data);} else {res.setErrorMessage(CodecSupport.deserialize(channel.getUrl(), is, proto).readUTF());}} catch (Throwable t) {res.setStatus(Response.CLIENT_ERROR);res.setErrorMessage(StringUtils.toString(t));}return res;} else {// decode request.Request req;try {Object data;if ((flag & FLAG_EVENT) != 0) {byte[] eventPayload = CodecSupport.getPayload(is);if (CodecSupport.isHeartBeat(eventPayload, proto)) {// heart beat response data is always null;req = new HeartBeatRequest(id);((HeartBeatRequest) req).setProto(proto);data = null;} else {req = new Request(id);data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto), eventPayload);}req.setEvent(true);} else {req = new Request(id);data = decodeRequestData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto));}req.setData(data);} catch (Throwable t) {// bad requestreq = new Request(id);req.setBroken(true);req.setData(t);}req.setVersion(Version.getProtocolVersion());req.setTwoWay((flag & FLAG_TWOWAY) != 0);return req;}
}

这篇关于阐述Dubbo服务提供方的解码原理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/972274

相关文章

Android 悬浮窗开发示例((动态权限请求 | 前台服务和通知 | 悬浮窗创建 )

《Android悬浮窗开发示例((动态权限请求|前台服务和通知|悬浮窗创建)》本文介绍了Android悬浮窗的实现效果,包括动态权限请求、前台服务和通知的使用,悬浮窗权限需要动态申请并引导... 目录一、悬浮窗 动态权限请求1、动态请求权限2、悬浮窗权限说明3、检查动态权限4、申请动态权限5、权限设置完毕后

TP-Link PDDNS服将于务6月30日正式停运:用户需转向第三方DDNS服务

《TP-LinkPDDNS服将于务6月30日正式停运:用户需转向第三方DDNS服务》近期,路由器制造巨头普联(TP-Link)在用户群体中引发了一系列重要变动,上个月,公司发出了一则通知,明确要求所... 路由器厂商普联(TP-Link)上个月发布公告要求所有用户必须完成实名认证后才能继续使用普联提供的 D

MySQL中的MVCC底层原理解读

《MySQL中的MVCC底层原理解读》本文详细介绍了MySQL中的多版本并发控制(MVCC)机制,包括版本链、ReadView以及在不同事务隔离级别下MVCC的工作原理,通过一个具体的示例演示了在可重... 目录简介ReadView版本链演示过程总结简介MVCC(Multi-Version Concurr

微服务架构之使用RabbitMQ进行异步处理方式

《微服务架构之使用RabbitMQ进行异步处理方式》本文介绍了RabbitMQ的基本概念、异步调用处理逻辑、RabbitMQ的基本使用方法以及在SpringBoot项目中使用RabbitMQ解决高并发... 目录一.什么是RabbitMQ?二.异步调用处理逻辑:三.RabbitMQ的基本使用1.安装2.架构

Java中使用Java Mail实现邮件服务功能示例

《Java中使用JavaMail实现邮件服务功能示例》:本文主要介绍Java中使用JavaMail实现邮件服务功能的相关资料,文章还提供了一个发送邮件的示例代码,包括创建参数类、邮件类和执行结... 目录前言一、历史背景二编程、pom依赖三、API说明(一)Session (会话)(二)Message编程客

windos server2022的配置故障转移服务的图文教程

《windosserver2022的配置故障转移服务的图文教程》本文主要介绍了windosserver2022的配置故障转移服务的图文教程,以确保服务和应用程序的连续性和可用性,文中通过图文介绍的非... 目录准备环境:步骤故障转移群集是 Windows Server 2022 中提供的一种功能,用于在多个

解决systemctl reload nginx重启Nginx服务报错:Job for nginx.service invalid问题

《解决systemctlreloadnginx重启Nginx服务报错:Jobfornginx.serviceinvalid问题》文章描述了通过`systemctlstatusnginx.se... 目录systemctl reload nginx重启Nginx服务报错:Job for nginx.javas

Redis主从/哨兵机制原理分析

《Redis主从/哨兵机制原理分析》本文介绍了Redis的主从复制和哨兵机制,主从复制实现了数据的热备份和负载均衡,而哨兵机制可以监控Redis集群,实现自动故障转移,哨兵机制通过监控、下线、选举和故... 目录一、主从复制1.1 什么是主从复制1.2 主从复制的作用1.3 主从复制原理1.3.1 全量复制

Redis主从复制的原理分析

《Redis主从复制的原理分析》Redis主从复制通过将数据镜像到多个从节点,实现高可用性和扩展性,主从复制包括初次全量同步和增量同步两个阶段,为优化复制性能,可以采用AOF持久化、调整复制超时时间、... 目录Redis主从复制的原理主从复制概述配置主从复制数据同步过程复制一致性与延迟故障转移机制监控与维

SpringCloud配置动态更新原理解析

《SpringCloud配置动态更新原理解析》在微服务架构的浩瀚星海中,服务配置的动态更新如同魔法一般,能够让应用在不重启的情况下,实时响应配置的变更,SpringCloud作为微服务架构中的佼佼者,... 目录一、SpringBoot、Cloud配置的读取二、SpringCloud配置动态刷新三、更新@R