dubbo remoting 层之 exchange

2023-10-20 15:18
文章标签 dubbo exchange remoting

本文主要是介绍dubbo remoting 层之 exchange,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

补充点额外知识:

CompletableFuture 现在 只需要知道是一个和异步任务相关的东西即可.

AbstractTimerTask 实现的是 TimerTask 接口,先来分析下这个抽象类,因为这个类是其他任务类的父类.

// channel 集合
private final ChannelProvider channelProvider;
// 下次任务执行的间隔
private final Long tick;
// 是否取消
protected volatile boolean cancel = false;

lastRead 会从 channel 中获取读取的时间戳
lastWrite 会从 channel 中获取写的时间戳
now 返回当前时间
cancel 取消任务
reput 方法将任务重新放入调度器中.
run 方法执行任务
子类需要重写抽象方法 doTask,用于实现自己的业务逻辑,AbstractTimerTask 在实现的过程中,已经自动的实现了 reput 的功能.

CloseTimerTask 继承 AbstractTimerTask 类,实现关闭 channel 的功能,如果判定可以关闭了?现在距上次读/写时间超过空闲时间,则关闭 channel.

HeartbeatTimerTask 同样继承 AbstractTimerTask 类,实现心跳检测功能,比如说当前 channel 距上次读/写时间超过心跳时间(60s),就发送一个心跳事件去看下.

ReconnectTimerTask 继承 AbstractTimerTask 类,实现重连功能. 空闲时间必须心跳间隔的 2 倍以上,因为客户端可能会重试. 什么时候会触发重连了?1.channel 没有连接上,2.距上次读超过了空闲时间.

DefaultFuture 继承 CompletableFuture 类,首先关注下该类的重要属性.

private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
// put into waiting map.
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}

在来关注下 TimeoutCheckTask 这个任务,如果任务顺利做完,则一切 ok,如果没有顺利做完,则肯定有异常,要么服务端超时,要么客户端超时. 最后调用 DefaultFuture.received 方法进行处理. 然后到 doReceived 方法. 其实是不是有一个疑问?如果正常执行完,那么正常的 response 是在哪里设置进去的了?答案是 HeaderExchangeHandler.handleResponse 方法.

先去网上盗个图:

ExchangeChannel 继承 Channel,额外封装了两个方法:request,用于更贴近用户使用习惯.
ExchangeClient 继承 Client 和 ExchangeChannel 接口,这个表达的意思是客户端是 Channel 的具体化.

HeaderExchangeClient 实现 ExchangeClient 接口,同样采用了装饰者模式,HeaderExchangeClient 持有 client 和 channel 属性. client 本身就是从 channel 细化而来,所以 channel 是直接 new HeaderExchangeChannel(client). 其实这一步就是在增强 channel 的功能. 值得注意的是 client 端有重连任务和心跳任务. 我发现 HeaderExchangeClient 实现的方法,都是直接调用 client.xxx 或者 channel.xxx.

下面关注下两个任务.
两个任务的相关内容参考上面.

HeaderExchangeChannel 实现了 ExchangeChannel 接口,该类同样采用了装饰者模式,是功能的增强. 关于该类实现的核心方法,我们重点关注下 send 方法和 request 方法.

public void send(Object message, boolean sent) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + “, cause: The channel " + this + " is closed!”);
}
if (message instanceof Request
|| message instanceof Response
|| message instanceof String) {
channel.send(message, sent);
} else {
Request request = new Request();
request.setVersion(Version.getProtocolVersion());
request.setTwoWay(false);
request.setData(message);
channel.send(request, sent);
}
}

是不是感觉很怪异?但是又说不上来哪里怪了?HeaderExchangeChannel 的职责就是用Request对象包装请求体,但是如果之前已经有装饰器对请求体做过包装了,那就没必要再包装一层了. 那么问题来了?
1.啥时候回封装 Response 对象?
2.为啥 String 类型的消息可以直接被发送走?
经过高人指点,发现 HeaderExchangeHandler 中有这么一段处理逻辑:

public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
handleRequest(exchangeChannel, request);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}

我们再看下 request 方法:

public CompletableFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + “, cause: The channel " + this + " is closed!”);
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}

1.为什么 request 方法不需要判断 request 的类型了?类似与 send 方法那样判断?

我的理解是调用问题吧. send 方法用于单方面的通知(也不一就,但是有一点可以确定的是 send 方法只管发送,发送完后就不管结果了),而 request 类似于请求/应答模式,是有结果的.
我发现,类似于 ExecutionChannelHandler 类,调用的都是 channel.send 方法,而没有使用 request. 所以,request 方法是不需要额外的进行判断的.

ExchangeHandler 继承 ChannelHandler 和 TelnetHandler,该类之定义了一个方法,CompletableFuture reply(ExchangeChannel channel, Object request).

ExchangeHandlerDispatcher 实现 ExchangeHandler 接口,该类运用了组合模式,有三个比较关键的成员变量:

private final ReplierDispatcher replierDispatcher;private final ChannelHandlerDispatcher handlerDispatcher;private final TelnetHandler telnetHandler;

该三个成员变量,分别管不同的方法.

ExchangeHandlerAdapter 主要是为了做适配 TelnetHandlerAdapter 和 ExchangeHandler.

ExchangeServer 继承 Server,扩展了两个和 ExchangeChannel 相关的方法.

HeaderExchangeServer 实现 ExchangeServer,同样采用了装饰者模式,装饰 Server. 需要注意的是,空闲检查在 Server 和 Client 的处理是不一样的,Server 是关闭 Channel,而 Client 是重连.
当服务端关闭时,会发送 readonly 事件,通知客户端服务不可用.

ExchangeServerDelegate 为 ExchangeServer 的代理.

Exchanger 类似 Transporter

HeaderExchangeHandler 和 netty 中 handler 的概念很类似,我们看下它是如何处理事件的.

我们可以看到,connected 的时候,放入了 read 时间戳和 write 时间戳. 同时将 channel 包装成 ExchangeHandler.
public void connected(Channel channel) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
handler.connected(exchangeChannel);
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}

然后看下 sent 方法:

public void sent(Channel channel, Object message) throws RemotingException {
Throwable exception = null;
try {
channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
handler.sent(exchangeChannel, message);
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
} catch (Throwable t) {
exception = t;
}
// Request 类型的消息,才会有发送. 因为 Request 类型的消息,需要得到结果,所以需要标记已发送.
if (message instanceof Request) {
Request request = (Request) message;
DefaultFuture.sent(channel, request);
}
if (exception != null) {
if (exception instanceof RuntimeException) {
throw (RuntimeException) exception;
} else if (exception instanceof RemotingException) {
throw (RemotingException) exception;
} else {
throw new RemotingException(channel.getLocalAddress(), channel.getRemoteAddress(),
exception.getMessage(), exception);
}
}
}

这个类看的还不是很明白,有待调试…

客户端调用链:

注意:这里的 handler 其实是 NettyClient.

这个 handler 是如何被组装起来的?

首先看下 client,发现调用 wrapChannelHandler 的时候,会增加 MultiMessageHandler 和 HeartbeatHandler 的功能. 同时会进一步使用 dispatcher 获得一个 handler,默认的是 AllChannelHandler.

public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
super(url, wrapChannelHandler(url, handler));
}

那 DecodeHandler 是如何被封装进去的了?

答案在 DubboPotocol 类中:

client = Exchangers.connect(url, requestHandler);

然后到 HeaderExchange.connect 方法中:

public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

最后的那个 DubboProtocol$1@3481 是 requestHandler

那数据时如何发送的了?通过 ReferenceCountExchangeClient.request -> HeaderExchangeClient.request -> HeaderExchangeChannel.request(将 object 封装成 Request 对象)

Request 对象在传输的过程中,会调用编解码器进行处理.

服务端调用链:

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

这篇关于dubbo remoting 层之 exchange的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/247932

相关文章

springboot+dubbo实现时间轮算法

《springboot+dubbo实现时间轮算法》时间轮是一种高效利用线程资源进行批量化调度的算法,本文主要介绍了springboot+dubbo实现时间轮算法,文中通过示例代码介绍的非常详细,对大家... 目录前言一、参数说明二、具体实现1、HashedwheelTimer2、createWheel3、n

Exchange 服务器地址列表的配置方法与注意事项

Exchange Server 是微软推出的一款企业级邮件服务器软件,广泛应用于企业内部邮件系统的搭建与管理。配置 Exchange 服务器地址列表是其中一个关键环节。本文将详细介绍 Exchange 服务器地址列表的配置方法与注意事项,帮助系统管理员顺利完成这一任务。 内容目录 1. 引言 2. 准备工作 3. 配置地址列表 3.1 创建地址列表 3.2 使用 Exchange

Dubbo学习入门

本文参考自:Dubbo用户手册(中文)http://dubbo.apache.org/books/dubbo-user-book/ 现在的参考文档地址:http://dubbo.apache.org/zh-cn/docs/user/quick-start.html 入门请参考自《Dubbo用户手册(中文)》第一节,在手册第二节说明如何快速启动Dubbo,下面就顺着手册的使用方式,自己搭建一个快

dubbo服务过程

dubbo服务暴露过程 Dubbo通过实现ApplicationListener接口,监听了容器刷新的事件,再容器刷新后调用onApplicationEvent方法,这个是服务暴露的启动点通过识别带目标注解的服务类通过动态代理统一暴露出Invoker,如何通过配置文件以及目标协议(SPI机制)封装成exporter存储起来如果是本地注册,将exporter存入ServiceConfig的缓存,如

Dubbo缓存

是的,Dubbo 可以对服务调用结果进行缓存。通过缓存结果,可以减少重复调用、降低服务提供者的负载,并提高系统的响应速度和吞吐量。Dubbo 内置了多种缓存机制,开发者可以根据不同的业务需求选择合适的缓存策略。 1. Dubbo 结果缓存的工作原理 Dubbo 的结果缓存功能是在服务消费者一侧实现的。当一个服务消费者调用某个服务时,Dubbo 会首先检查本地缓存中是否有该服务的结果。如果缓存中

springboot+dubbo+zk 入门篇(windows单机版)

一、下载安装zk注册中心并启动:     官网地址:http://www.apache.org/dyn/closer.cgi/zookeeper/     我的是zookeeper-3.3.6版本的。下载之后需要修改下文件:进入zk的conf目录。复制下zoo_sample.cfg     这个文件并重命名为zoo.cfg,然后把修改该文件内容,下面是我的,这个只是单机配置: # 心跳时间间隔

dubbo 服务消费原理分析之引用服务配置

文章目录 前言一、服务监听ContextRefreshedEvent1、AbstractApplicationContext.refresh2、AbstractApplicationContext.finishRefresh3、DubboDeployApplicationListener.onApplicationEvent4、DefaultModuleDeployer .referServ

dubbo是什么?,能做什么?以及其工作流程

1.Dubbo是什么?能做什么? Dubbo是阿里巴巴开源的基于Java的高性能RPC分布式服务框架,现已成为Apache基金会孵化项目,致力于提供高性能和透明化的RPC远程服务调用方案,以及SOA服务治理方案 简单来说,dubbo就是个服务框架,如果没有分布式的需求,其实是不需要用的,只有在分布式的时候,才有dubbo这样的分布式服务框架的请求,本质上是个远程服务调用的分布式框架 其核心部

Dubbo依赖包

Dubbo 是一个高性能的 RPC 框架,用于构建分布式服务治理系统。要使用 Dubbo,项目中需要引入一些关键的依赖包。这些依赖包提供了 Dubbo 的核心功能、服务注册与发现、网络通信、序列化等能力。 一、Dubbo 核心依赖包 Dubbo 的核心依赖包包含了实现 RPC 功能的基础组件,如服务暴露、调用、负载均衡、容错机制等。以下是 Dubbo 必须依赖的核心包: 1. dubbo

zookeeper/dubbo使用记录

zookeeper版本号:zookeeper-3.4.6 在windows上使用的时候将 1,conf目录下的zoo_sample.cfg名字修改为zoo.cfg,里面内容可以不变 2,执行bin目录下的zkServer.cmd 这样zookeeper服务就启动了。 自己在本地使用zookeeper与dubbo时行接口调用时使用的时需要的部分jar包: dubbo-