OkHttp 3.14.10源码分析(3)- Dispatcher - 线程资源管理和分配

2024-01-22 16:08

本文主要是介绍OkHttp 3.14.10源码分析(3)- Dispatcher - 线程资源管理和分配,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Dispatcher功能是什么?

java doc:

Policy on when async requests are executed.Each dispatcher uses an ExecutorService to run calls internally. If you supply your own executor, it should be able to run the configured maximum number of calls concurrently.

简单翻译就是:

  • 控制异步请求何时执行。
  • 每个dispatcher拥有一个ExecutorService执行异步请求。
  • 用户可配置自定义ExecutorService,要求最大可执行线程至少是maxRequests。
  • maxRequests可通过dispatcher的getMaxRequests()方法获取。
     

接下来简单介绍一下我对dispatcher功能的理解,这样有利于理解后面的内容:

  • Dispatcher主要管理异步请求任务策略,负责分配异步线程资源,控制异步连接数。
  • Dispatcher只覆盖异步任务调度策略层面的逻辑,往下的执行过程对其来说是透明的。
  • 对于同步任务,Dispatcher只是简单记录当前运行的任务任务实体(RealCall),并且是由RealCall主动注册和注销。
  • dispatcher和RealCall、AsyncCall的耦合性比较高,它们之间会相互调用,所以它们的代码往往要相互结合来看。
     

Dispatcher的主要属性

//最大异步任务数,注意是异步不包括同步的。
private int maxRequests = 64;
//对同一个主机的最大异步任务数,同样是异步不包括同步。
private int maxRequestsPerHost = 5;
//请求任务结束,如果当前预执行任务队列为空,线程进入空闲状态会回调该接口。
private @Nullable Runnable idleCallback;
//执行异步任务的线程池。
/** Executes calls. Created lazily. */
private @Nullable ExecutorService executorService;
//预执行的异步任务队列
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//正在执行的异步任务队列。
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//正在执行的同步任务队列。
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

maxRequests 和maxRequestsPerHost 为什么只记录异步请求数呢:

  • 如果用户使用单线程 + 同步任务请求,那么同时活跃的任务数肯定只有单个,没必要控制。
  • 如果用户使用多线程或者线程池 + 同步请求的话,那相当于用户自己定制和实现了异步请求策略,那么对于异步请求的管理肯定交给用户是最合适的,OkHttp也很难去管理用户的自定义实现。
  • 用户可以通过配置OkHttpClient来修改dispatcher的属性,从而扩展异步请求的策略。
     

Dispatcher的ExecutorService默认实现

在了解异步任务的执行流程之前,我们先来简单了解一下Dispatcher用来执行异步任务的默认线程池。
代码1:

public synchronized ExecutorService executorService() {if (executorService == null) {//0:表示没有核心线程,也就是没有常驻线程。//Integer.MAX_VALUE:表示活跃线程等同于最大整数,活跃线程不会常驻,有最大空闲存活时间限制。//60和TimeUnit.SECONDS:活跃线程的最大空闲存活时间是60秒//new SynchronousQueue<>():同步阻塞队列(大家可以网上找一下这方面资料了解一下),这个队列不存在容器属性,如果消费不及时,生成端put动作会被阻塞。<br/>在这里的效果就是,如果调用了ExecutorService.execute()后,如果没有空闲线程或者还没来得及创建线程,那么execute()会被阻塞,直到有线程来消费。//Util.threadFactory("OkHttp Dispatcher", false):线程工程,创建的线程添加名称前缀OkHttp Dispatcher;创建的线程为守护线程。//第六个参数executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));}return executorService;
}

总结重要的三点:

  • 线程池几乎不限制线程数。
  • 线程默认空闲存活60秒。
  • ExecutorService.execute()方法调用时,如果没有线程及时消费会一直阻塞。

Dispatcher异步任务调度策略

异步任务的执行策略的大概流程:

从上面可以看出涉及Dispatcher的两个关键方法:enqueue(AsyncCall)和promoteAndExecute()。下面就分别来分析这两个方法。

方法:enqueue(AsyncCall)
enqueue方法还没有对AsyncCall进行真正的资源分配和调度,只是对AsyncCall进行一些设置,真正的调度逻辑是由后面的promoteAndExecute()方法实现。
我们先来简单看一下enqueue方法的流程:
 

接着我们分析一下代码:

void enqueue(AsyncCall call) {synchronized (this) {//第一步:添加AsyncCall到预执行队列readyAsyncCalls.add(call);//第二步if (!call.get().forWebSocket) {AsyncCall existingCall = findExistingCallWithHost(call.host());if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);}}//第三步promoteAndExecute();
}

这方法就三部分,我相信第1、3步大家都是一眼就看穿了,所以就只分析一下第二步,其代码逻辑是设置同一Host的连接计数器:

2.1 同一Host的连接计数器主要是和maxRequestsPerHost属性做比较,目的是控制对同一Host服务器的连接数。

2.2 通过让具有相同Host的AsyncCall对象都共用一个计数器来实现。通过synchronized锁保证同一时间进入代码块的只有一个AsyncCall对象。

  • 通过synchronized锁保证同一时间进入代码块的只有一个AsyncCall对象。
  • 调用findExistingCallWithHost(call.host())方法:查找是否已经存在至少一个相同Host的AsyncCall对象,并且返回任意一个。
@Nullable 
private AsyncCall findExistingCallWithHost(String host) {for (AsyncCall existingCall : runningAsyncCalls) {if (existingCall.host().equals(host)) return existingCall;}for (AsyncCall existingCall : readyAsyncCalls) {if (existingCall.host().equals(host)) return existingCall;}return null;
}
  • 如果存在,就把之前AsyncCall对象的计数器也设置给当前的AsyncCall对象;如果不存在就直接使用当前AsyncCall对象的计数器。因为加了锁保护,这样就保证了,如果存在一段连续的时间段,该时间段内一直存在对某Host的异步请求在执行或者等待执行,那么对于该host,后面的AsyncCall对象都是共用第一个AsyncCall对象创建的计数器,直到在某个时间点不再存在连续的异步请求。
    final class AsyncCall extends NamedRunnable {private final Callback responseCallback;//同一Host的连接计数器private volatile AtomicInteger callsPerHost = new AtomicInteger(0);...//设置计数器void reuseCallsPerHostFrom(AsyncCall other) {this.callsPerHost = other.callsPerHost;}...

方法:promoteAndExecute()

promoteAndExecute()负责真正对AsyncCall进行资源的调度。
和上面一样,我们还是先来看一下简单的流程:

接着我们在解析一下代码:

private boolean promoteAndExecute() {assert (!Thread.holdsLock(this));//创建空的可执行AsyncCall集合List<AsyncCall> executableCalls = new ArrayList<>();boolean isRunning;//锁保护synchronized (this) {//对预执行队列进行迭代循环for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {AsyncCall asyncCall = i.next();//正在执行的队列size是否已经>=maxRequests,如果是跳出迭代循环。if (runningAsyncCalls.size() >= maxRequests,) break; //判断同一Host的连接计数器的值是否>=maxRequestsPerHost,如果是跳出迭代循环。if (asyncCall.callsPerHost().get() >= maxRequestsPerHost,) continue; //从迭代器弹出,也就是从readyAsyncCalls删除了。i.remove();//同一Host的连接计数器自增1asyncCall.callsPerHost().incrementAndGet();//添加到可执行集合。executableCalls.add(asyncCall);//添加到正在执行队列,也就是这时候asyncCall对象已经是被当作执行中状态的了。runningAsyncCalls.add(asyncCall);}isRunning = runningCallsCount() > 0;}//遍历可执行集合for (int i = 0, size = executableCalls.size(); i < size; i++) {AsyncCall asyncCall = executableCalls.get(i);//调用asyncCall.executeOn方法。asyncCall.executeOn(executorService());}return isRunning;
}

代码的重要步骤的解析我都加在上面注释里面了,相信也不难看懂。

但是最后还是要简单介绍一下“asyncCall.executeOn(executorService())”调用的执行逻辑。其实异步任务在线程资源层面的策略,是有OkHttpClient、Dispatcher和Call之间相互协作完成的,所以你单单只看Dispatcher的代码,你可能有点难以勾勒出一个相对清晰和完整的功能流程。

asyncCall.executeOn(executorService())执行流程

在开始理解AsyncCall#executeOn(ExecutorService)执行流程之前,先简单了解AsyncCall的一些基本性质:

  • AsyncCall是NamedRunnable的子类,NamedRunnable实现了Runnable接口,因此AsyncCall对象可以直接作为参数让方法“ExecutorService#execute(Runnable)”执行。
  • NamedRunnable实现了run()方法,run()方法的具体任务逻辑委派给子类execute()方法,因此“executorService#execute(Runnable)”主要执行的是AsyncCall的execute()方法。
     

下面我们来看两个具体的代码片段:

void executeOn(ExecutorService executorService) {assert (!Thread.holdsLock(client.dispatcher()));boolean success = false;try {//Call任务被线程池执行executorService.execute(this);success = true;} catch (RejectedExecutionException e) {...} finally {if (!success) {//重点是这里client.dispatcher().finished(this);}}
}@Override 
protected void execute() {boolean signalledCallback = false;transmitter.timeoutEnter();try {Response response = getResponseWithInterceptorChain();...} catch (IOException e) {...} catch (Throwable t) {...} finally {//其他都不看,先看这client.dispatcher().finished(this);}
}

因此接着上面Call.executeOn的流程继续画:

 

Dispatcher#finished(AsyncCall)功能:

  • 同一host连接计数器递减1。
  • 把当前asyncCall对象移出正在执行队列(runningAsyncCalls)。其实到这一步当前的AsyncCall对象的使命就已经完全结束了,后面是Dispatcher自身循环调用的逻辑。
  • 再次调用promoteAndExecute(),从预执行任务队列中拉取任务执行。
  • 如果预执行任务队列已经为空,调用线程空闲回调。

Dispatcher异步任务调度策略小结

到这里异步任务请求AsyncCall在Dispatcher中的整个生命周期就已经理清楚了。Dispatcher只覆盖AsyncCall在线程资源层面的执行策略,再往下的执行过程对其来说是透明的。
AsyncCall到底是从哪里进入Dispatcher的世界的,又在里面发生了什么,最后又是怎么样离它而去的?

Dispatcher同步任务策略

相对于异步任务,Dispatcher对于同步任务的管理是非常简单的,就只有两步:

第一步,同步请求任务RealCall对象在发起请求之前,由RealCall对象主动调用Dispatcher#executed(RealCall)方法,把当前RealCall对象添加到同步任务执行中队列。

注意:同步任务执行中队列是runningSyncCalls,不是runningAsyncCalls,后者是异步任务执行中队列。

synchronized void executed(RealCall call) {runningSyncCalls.add(call);
}

第二步,同步请求任务结束后,再由RealCall对象主动调用Dispatcher#finished(RealCall)方法,把当前RealCall对象从是runningSyncCalls中移除。

void finished(RealCall call) {finished(runningSyncCalls, call);
}

上面的流程都是由RealCall发起的,Dispatcher不存在发起执行的入口,这个和异步是不一样的。

@Override 
public Response execute() throws IOException {...try {//调用Dispatcher#executed(RealCall)client.dispatcher().executed(this);//真正执行请求动作return getResponseWithInterceptorChain();} finally {//调用Dispatcher#finished(RealCall)client.dispatcher().finished(this);}
}

总结

Dispatcher主要管理异步请求任务策略,负责分配异步线程资源,控制异步连接数,只覆盖策略层面的逻辑,往下的执行过程对其来说是透明的。而对于同步任务,Dispatcher只是简单记录当前运行的任务任务实体(RealCall),并且是由RealCall主动注册和注销。

这篇关于OkHttp 3.14.10源码分析(3)- Dispatcher - 线程资源管理和分配的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/633491

相关文章

Go标准库常见错误分析和解决办法

《Go标准库常见错误分析和解决办法》Go语言的标准库为开发者提供了丰富且高效的工具,涵盖了从网络编程到文件操作等各个方面,然而,标准库虽好,使用不当却可能适得其反,正所谓工欲善其事,必先利其器,本文将... 目录1. 使用了错误的time.Duration2. time.After导致的内存泄漏3. jsO

Python实现无痛修改第三方库源码的方法详解

《Python实现无痛修改第三方库源码的方法详解》很多时候,我们下载的第三方库是不会有需求不满足的情况,但也有极少的情况,第三方库没有兼顾到需求,本文将介绍几个修改源码的操作,大家可以根据需求进行选择... 目录需求不符合模拟示例 1. 修改源文件2. 继承修改3. 猴子补丁4. 追踪局部变量需求不符合很

Spring事务中@Transactional注解不生效的原因分析与解决

《Spring事务中@Transactional注解不生效的原因分析与解决》在Spring框架中,@Transactional注解是管理数据库事务的核心方式,本文将深入分析事务自调用的底层原理,解释为... 目录1. 引言2. 事务自调用问题重现2.1 示例代码2.2 问题现象3. 为什么事务自调用会失效3

Spring Boot3虚拟线程的使用步骤详解

《SpringBoot3虚拟线程的使用步骤详解》虚拟线程是Java19中引入的一个新特性,旨在通过简化线程管理来提升应用程序的并发性能,:本文主要介绍SpringBoot3虚拟线程的使用步骤,... 目录问题根源分析解决方案验证验证实验实验1:未启用keep-alive实验2:启用keep-alive扩展建

找不到Anaconda prompt终端的原因分析及解决方案

《找不到Anacondaprompt终端的原因分析及解决方案》因为anaconda还没有初始化,在安装anaconda的过程中,有一行是否要添加anaconda到菜单目录中,由于没有勾选,导致没有菜... 目录问题原因问http://www.chinasem.cn题解决安装了 Anaconda 却找不到 An

Spring定时任务只执行一次的原因分析与解决方案

《Spring定时任务只执行一次的原因分析与解决方案》在使用Spring的@Scheduled定时任务时,你是否遇到过任务只执行一次,后续不再触发的情况?这种情况可能由多种原因导致,如未启用调度、线程... 目录1. 问题背景2. Spring定时任务的基本用法3. 为什么定时任务只执行一次?3.1 未启用

SpringBoot使用OkHttp完成高效网络请求详解

《SpringBoot使用OkHttp完成高效网络请求详解》OkHttp是一个高效的HTTP客户端,支持同步和异步请求,且具备自动处理cookie、缓存和连接池等高级功能,下面我们来看看SpringB... 目录一、OkHttp 简介二、在 Spring Boot 中集成 OkHttp三、封装 OkHttp

C++ 各种map特点对比分析

《C++各种map特点对比分析》文章比较了C++中不同类型的map(如std::map,std::unordered_map,std::multimap,std::unordered_multima... 目录特点比较C++ 示例代码 ​​​​​​代码解释特点比较1. std::map底层实现:基于红黑

Java终止正在运行的线程的三种方法

《Java终止正在运行的线程的三种方法》停止一个线程意味着在任务处理完任务之前停掉正在做的操作,也就是放弃当前的操作,停止一个线程可以用Thread.stop()方法,但最好不要用它,本文给大家介绍了... 目录前言1. 停止不了的线程2. 判断线程是否停止状态3. 能停止的线程–异常法4. 在沉睡中停止5

Spring、Spring Boot、Spring Cloud 的区别与联系分析

《Spring、SpringBoot、SpringCloud的区别与联系分析》Spring、SpringBoot和SpringCloud是Java开发中常用的框架,分别针对企业级应用开发、快速开... 目录1. Spring 框架2. Spring Boot3. Spring Cloud总结1. Sprin