OkHttp 3.14.10源码分析(3)- Dispatcher - 线程资源管理和分配

2024-01-22 16:08

本文主要是介绍OkHttp 3.14.10源码分析(3)- Dispatcher - 线程资源管理和分配,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Dispatcher功能是什么?

java doc:

Policy on when async requests are executed.Each dispatcher uses an ExecutorService to run calls internally. If you supply your own executor, it should be able to run the configured maximum number of calls concurrently.

简单翻译就是:

  • 控制异步请求何时执行。
  • 每个dispatcher拥有一个ExecutorService执行异步请求。
  • 用户可配置自定义ExecutorService,要求最大可执行线程至少是maxRequests。
  • maxRequests可通过dispatcher的getMaxRequests()方法获取。
     

接下来简单介绍一下我对dispatcher功能的理解,这样有利于理解后面的内容:

  • Dispatcher主要管理异步请求任务策略,负责分配异步线程资源,控制异步连接数。
  • Dispatcher只覆盖异步任务调度策略层面的逻辑,往下的执行过程对其来说是透明的。
  • 对于同步任务,Dispatcher只是简单记录当前运行的任务任务实体(RealCall),并且是由RealCall主动注册和注销。
  • dispatcher和RealCall、AsyncCall的耦合性比较高,它们之间会相互调用,所以它们的代码往往要相互结合来看。
     

Dispatcher的主要属性

//最大异步任务数,注意是异步不包括同步的。
private int maxRequests = 64;
//对同一个主机的最大异步任务数,同样是异步不包括同步。
private int maxRequestsPerHost = 5;
//请求任务结束,如果当前预执行任务队列为空,线程进入空闲状态会回调该接口。
private @Nullable Runnable idleCallback;
//执行异步任务的线程池。
/** Executes calls. Created lazily. */
private @Nullable ExecutorService executorService;
//预执行的异步任务队列
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//正在执行的异步任务队列。
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//正在执行的同步任务队列。
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

maxRequests 和maxRequestsPerHost 为什么只记录异步请求数呢:

  • 如果用户使用单线程 + 同步任务请求,那么同时活跃的任务数肯定只有单个,没必要控制。
  • 如果用户使用多线程或者线程池 + 同步请求的话,那相当于用户自己定制和实现了异步请求策略,那么对于异步请求的管理肯定交给用户是最合适的,OkHttp也很难去管理用户的自定义实现。
  • 用户可以通过配置OkHttpClient来修改dispatcher的属性,从而扩展异步请求的策略。
     

Dispatcher的ExecutorService默认实现

在了解异步任务的执行流程之前,我们先来简单了解一下Dispatcher用来执行异步任务的默认线程池。
代码1:

public synchronized ExecutorService executorService() {if (executorService == null) {//0:表示没有核心线程,也就是没有常驻线程。//Integer.MAX_VALUE:表示活跃线程等同于最大整数,活跃线程不会常驻,有最大空闲存活时间限制。//60和TimeUnit.SECONDS:活跃线程的最大空闲存活时间是60秒//new SynchronousQueue<>():同步阻塞队列(大家可以网上找一下这方面资料了解一下),这个队列不存在容器属性,如果消费不及时,生成端put动作会被阻塞。<br/>在这里的效果就是,如果调用了ExecutorService.execute()后,如果没有空闲线程或者还没来得及创建线程,那么execute()会被阻塞,直到有线程来消费。//Util.threadFactory("OkHttp Dispatcher", false):线程工程,创建的线程添加名称前缀OkHttp Dispatcher;创建的线程为守护线程。//第六个参数executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));}return executorService;
}

总结重要的三点:

  • 线程池几乎不限制线程数。
  • 线程默认空闲存活60秒。
  • ExecutorService.execute()方法调用时,如果没有线程及时消费会一直阻塞。

Dispatcher异步任务调度策略

异步任务的执行策略的大概流程:

从上面可以看出涉及Dispatcher的两个关键方法:enqueue(AsyncCall)和promoteAndExecute()。下面就分别来分析这两个方法。

方法:enqueue(AsyncCall)
enqueue方法还没有对AsyncCall进行真正的资源分配和调度,只是对AsyncCall进行一些设置,真正的调度逻辑是由后面的promoteAndExecute()方法实现。
我们先来简单看一下enqueue方法的流程:
 

接着我们分析一下代码:

void enqueue(AsyncCall call) {synchronized (this) {//第一步:添加AsyncCall到预执行队列readyAsyncCalls.add(call);//第二步if (!call.get().forWebSocket) {AsyncCall existingCall = findExistingCallWithHost(call.host());if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);}}//第三步promoteAndExecute();
}

这方法就三部分,我相信第1、3步大家都是一眼就看穿了,所以就只分析一下第二步,其代码逻辑是设置同一Host的连接计数器:

2.1 同一Host的连接计数器主要是和maxRequestsPerHost属性做比较,目的是控制对同一Host服务器的连接数。

2.2 通过让具有相同Host的AsyncCall对象都共用一个计数器来实现。通过synchronized锁保证同一时间进入代码块的只有一个AsyncCall对象。

  • 通过synchronized锁保证同一时间进入代码块的只有一个AsyncCall对象。
  • 调用findExistingCallWithHost(call.host())方法:查找是否已经存在至少一个相同Host的AsyncCall对象,并且返回任意一个。
@Nullable 
private AsyncCall findExistingCallWithHost(String host) {for (AsyncCall existingCall : runningAsyncCalls) {if (existingCall.host().equals(host)) return existingCall;}for (AsyncCall existingCall : readyAsyncCalls) {if (existingCall.host().equals(host)) return existingCall;}return null;
}
  • 如果存在,就把之前AsyncCall对象的计数器也设置给当前的AsyncCall对象;如果不存在就直接使用当前AsyncCall对象的计数器。因为加了锁保护,这样就保证了,如果存在一段连续的时间段,该时间段内一直存在对某Host的异步请求在执行或者等待执行,那么对于该host,后面的AsyncCall对象都是共用第一个AsyncCall对象创建的计数器,直到在某个时间点不再存在连续的异步请求。
    final class AsyncCall extends NamedRunnable {private final Callback responseCallback;//同一Host的连接计数器private volatile AtomicInteger callsPerHost = new AtomicInteger(0);...//设置计数器void reuseCallsPerHostFrom(AsyncCall other) {this.callsPerHost = other.callsPerHost;}...

方法:promoteAndExecute()

promoteAndExecute()负责真正对AsyncCall进行资源的调度。
和上面一样,我们还是先来看一下简单的流程:

接着我们在解析一下代码:

private boolean promoteAndExecute() {assert (!Thread.holdsLock(this));//创建空的可执行AsyncCall集合List<AsyncCall> executableCalls = new ArrayList<>();boolean isRunning;//锁保护synchronized (this) {//对预执行队列进行迭代循环for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {AsyncCall asyncCall = i.next();//正在执行的队列size是否已经>=maxRequests,如果是跳出迭代循环。if (runningAsyncCalls.size() >= maxRequests,) break; //判断同一Host的连接计数器的值是否>=maxRequestsPerHost,如果是跳出迭代循环。if (asyncCall.callsPerHost().get() >= maxRequestsPerHost,) continue; //从迭代器弹出,也就是从readyAsyncCalls删除了。i.remove();//同一Host的连接计数器自增1asyncCall.callsPerHost().incrementAndGet();//添加到可执行集合。executableCalls.add(asyncCall);//添加到正在执行队列,也就是这时候asyncCall对象已经是被当作执行中状态的了。runningAsyncCalls.add(asyncCall);}isRunning = runningCallsCount() > 0;}//遍历可执行集合for (int i = 0, size = executableCalls.size(); i < size; i++) {AsyncCall asyncCall = executableCalls.get(i);//调用asyncCall.executeOn方法。asyncCall.executeOn(executorService());}return isRunning;
}

代码的重要步骤的解析我都加在上面注释里面了,相信也不难看懂。

但是最后还是要简单介绍一下“asyncCall.executeOn(executorService())”调用的执行逻辑。其实异步任务在线程资源层面的策略,是有OkHttpClient、Dispatcher和Call之间相互协作完成的,所以你单单只看Dispatcher的代码,你可能有点难以勾勒出一个相对清晰和完整的功能流程。

asyncCall.executeOn(executorService())执行流程

在开始理解AsyncCall#executeOn(ExecutorService)执行流程之前,先简单了解AsyncCall的一些基本性质:

  • AsyncCall是NamedRunnable的子类,NamedRunnable实现了Runnable接口,因此AsyncCall对象可以直接作为参数让方法“ExecutorService#execute(Runnable)”执行。
  • NamedRunnable实现了run()方法,run()方法的具体任务逻辑委派给子类execute()方法,因此“executorService#execute(Runnable)”主要执行的是AsyncCall的execute()方法。
     

下面我们来看两个具体的代码片段:

void executeOn(ExecutorService executorService) {assert (!Thread.holdsLock(client.dispatcher()));boolean success = false;try {//Call任务被线程池执行executorService.execute(this);success = true;} catch (RejectedExecutionException e) {...} finally {if (!success) {//重点是这里client.dispatcher().finished(this);}}
}@Override 
protected void execute() {boolean signalledCallback = false;transmitter.timeoutEnter();try {Response response = getResponseWithInterceptorChain();...} catch (IOException e) {...} catch (Throwable t) {...} finally {//其他都不看,先看这client.dispatcher().finished(this);}
}

因此接着上面Call.executeOn的流程继续画:

 

Dispatcher#finished(AsyncCall)功能:

  • 同一host连接计数器递减1。
  • 把当前asyncCall对象移出正在执行队列(runningAsyncCalls)。其实到这一步当前的AsyncCall对象的使命就已经完全结束了,后面是Dispatcher自身循环调用的逻辑。
  • 再次调用promoteAndExecute(),从预执行任务队列中拉取任务执行。
  • 如果预执行任务队列已经为空,调用线程空闲回调。

Dispatcher异步任务调度策略小结

到这里异步任务请求AsyncCall在Dispatcher中的整个生命周期就已经理清楚了。Dispatcher只覆盖AsyncCall在线程资源层面的执行策略,再往下的执行过程对其来说是透明的。
AsyncCall到底是从哪里进入Dispatcher的世界的,又在里面发生了什么,最后又是怎么样离它而去的?

Dispatcher同步任务策略

相对于异步任务,Dispatcher对于同步任务的管理是非常简单的,就只有两步:

第一步,同步请求任务RealCall对象在发起请求之前,由RealCall对象主动调用Dispatcher#executed(RealCall)方法,把当前RealCall对象添加到同步任务执行中队列。

注意:同步任务执行中队列是runningSyncCalls,不是runningAsyncCalls,后者是异步任务执行中队列。

synchronized void executed(RealCall call) {runningSyncCalls.add(call);
}

第二步,同步请求任务结束后,再由RealCall对象主动调用Dispatcher#finished(RealCall)方法,把当前RealCall对象从是runningSyncCalls中移除。

void finished(RealCall call) {finished(runningSyncCalls, call);
}

上面的流程都是由RealCall发起的,Dispatcher不存在发起执行的入口,这个和异步是不一样的。

@Override 
public Response execute() throws IOException {...try {//调用Dispatcher#executed(RealCall)client.dispatcher().executed(this);//真正执行请求动作return getResponseWithInterceptorChain();} finally {//调用Dispatcher#finished(RealCall)client.dispatcher().finished(this);}
}

总结

Dispatcher主要管理异步请求任务策略,负责分配异步线程资源,控制异步连接数,只覆盖策略层面的逻辑,往下的执行过程对其来说是透明的。而对于同步任务,Dispatcher只是简单记录当前运行的任务任务实体(RealCall),并且是由RealCall主动注册和注销。

这篇关于OkHttp 3.14.10源码分析(3)- Dispatcher - 线程资源管理和分配的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/633491

相关文章

Springboot中分析SQL性能的两种方式详解

《Springboot中分析SQL性能的两种方式详解》文章介绍了SQL性能分析的两种方式:MyBatis-Plus性能分析插件和p6spy框架,MyBatis-Plus插件配置简单,适用于开发和测试环... 目录SQL性能分析的两种方式:功能介绍实现方式:实现步骤:SQL性能分析的两种方式:功能介绍记录

最长公共子序列问题的深度分析与Java实现方式

《最长公共子序列问题的深度分析与Java实现方式》本文详细介绍了最长公共子序列(LCS)问题,包括其概念、暴力解法、动态规划解法,并提供了Java代码实现,暴力解法虽然简单,但在大数据处理中效率较低,... 目录最长公共子序列问题概述问题理解与示例分析暴力解法思路与示例代码动态规划解法DP 表的构建与意义动

Java多线程父线程向子线程传值问题及解决

《Java多线程父线程向子线程传值问题及解决》文章总结了5种解决父子之间数据传递困扰的解决方案,包括ThreadLocal+TaskDecorator、UserUtils、CustomTaskDeco... 目录1 背景2 ThreadLocal+TaskDecorator3 RequestContextH

java父子线程之间实现共享传递数据

《java父子线程之间实现共享传递数据》本文介绍了Java中父子线程间共享传递数据的几种方法,包括ThreadLocal变量、并发集合和内存队列或消息队列,并提醒注意并发安全问题... 目录通过 ThreadLocal 变量共享数据通过并发集合共享数据通过内存队列或消息队列共享数据注意并发安全问题总结在 J

C#使用DeepSeek API实现自然语言处理,文本分类和情感分析

《C#使用DeepSeekAPI实现自然语言处理,文本分类和情感分析》在C#中使用DeepSeekAPI可以实现多种功能,例如自然语言处理、文本分类、情感分析等,本文主要为大家介绍了具体实现步骤,... 目录准备工作文本生成文本分类问答系统代码生成翻译功能文本摘要文本校对图像描述生成总结在C#中使用Deep

异步线程traceId如何实现传递

《异步线程traceId如何实现传递》文章介绍了如何在异步请求中传递traceId,通过重写ThreadPoolTaskExecutor的方法和实现TaskDecorator接口来增强线程池,确保异步... 目录前言重写ThreadPoolTaskExecutor中方法线程池增强总结前言在日常问题排查中,

Go中sync.Once源码的深度讲解

《Go中sync.Once源码的深度讲解》sync.Once是Go语言标准库中的一个同步原语,用于确保某个操作只执行一次,本文将从源码出发为大家详细介绍一下sync.Once的具体使用,x希望对大家有... 目录概念简单示例源码解读总结概念sync.Once是Go语言标准库中的一个同步原语,用于确保某个操

Redis主从/哨兵机制原理分析

《Redis主从/哨兵机制原理分析》本文介绍了Redis的主从复制和哨兵机制,主从复制实现了数据的热备份和负载均衡,而哨兵机制可以监控Redis集群,实现自动故障转移,哨兵机制通过监控、下线、选举和故... 目录一、主从复制1.1 什么是主从复制1.2 主从复制的作用1.3 主从复制原理1.3.1 全量复制

Redis主从复制的原理分析

《Redis主从复制的原理分析》Redis主从复制通过将数据镜像到多个从节点,实现高可用性和扩展性,主从复制包括初次全量同步和增量同步两个阶段,为优化复制性能,可以采用AOF持久化、调整复制超时时间、... 目录Redis主从复制的原理主从复制概述配置主从复制数据同步过程复制一致性与延迟故障转移机制监控与维

Redis连接失败:客户端IP不在白名单中的问题分析与解决方案

《Redis连接失败:客户端IP不在白名单中的问题分析与解决方案》在现代分布式系统中,Redis作为一种高性能的内存数据库,被广泛应用于缓存、消息队列、会话存储等场景,然而,在实际使用过程中,我们可能... 目录一、问题背景二、错误分析1. 错误信息解读2. 根本原因三、解决方案1. 将客户端IP添加到Re