OkHttp源码解析(一)- Dispatcher

2024-02-18 17:32

本文主要是介绍OkHttp源码解析(一)- Dispatcher,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

OkHttp的配置、使用步骤这里就不展开描述了,网络上有很多优秀的文章,这里主要是对学习源码中理解到的知识进行概括总结。

该文章根据OkHttp-3.11.0版本进行分析

首先介绍一下OkHttp的两种请求方式(不是get、post、put等等这些请求方式):

  • 请求方式
    1. 同步
      调用方式:call.execute
      阻塞线程

    2. 异步
      调用方式:call.enqueue
      不阻塞线程
      回调在子线程,不能做更新UI操作

再介绍一下OkHttp中核心的两个东西之一 - Dispatcher,也是这篇文章主要分析的东西:

  • Dispatcher

    意义:分发器,内部维护请求的状态(异步或同步),维护3个队列(ReadyAsyn:异步准备执行队列、RunningAsyn:异步执行中队列、RunningSyn:同步执行中队列),维护一个线程池(核心线程为0,最大线程数为Integer.MAX_VALUE,空闲时间60秒)。这里的线程池,虽然最大线程数定义为了Integer的最大值,但是在执行过程中会将执行的最大并发请求数默认设置为64个,所以

我们先来看看 Dispatcher 是如何构建出来的。
在我们使用OkHttp进行初始化的时候,是通过OkHttpClient的静态内部类 Builder 进行创建(这里使用了建造者模式,有兴趣的可以去看一下设计模式,OkHttp中大量运用了这种设计模式进行对象的创建),Builder 的构造方法中第一步就是创建了 Dispatcher 对象:

// OkHttpClient类
public Builder() {dispatcher = new Dispatcher();//省略其他代码
}//Dispatcher类
public Dispatcher(ExecutorService executorService) {this.executorService = executorService;
}public Dispatcher() {
}

可以看到,Dispatcher 的创建是调用了空参的构造方法,那么里面的线程池是什么时候创建的呢?
我们先看一下 Dispatcher 类中几个关键的属性:

  //并发最大请求数,正在执行的异步请求队列数超过最大请求数时,会把新的请求放在准备的异步请求队列private int maxRequests = 64;//同一Host的并发最大请求数,也就是说假设对于192.168.0.1 最多只能同时请求5次(未完成且未取消状态的请求),超过5个也会把新的同一Host的请求放入准备的异步请求队列private int maxRequestsPerHost = 5;private @Nullable Runnable idleCallback;//线程池,根据注释我们可以知道是用于执行异步请求的,并且是懒加载,也就是用到的时候再初始化/** Executes calls. Created lazily. */private @Nullable ExecutorService executorService;//准备的异步请求队列/** Ready async calls in the order they'll be run. */private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();//正在执行的异步请求队列,包含未完成的被取消的请求/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();//正在执行的同步请求队列,包含未完成的被取消的请求/** Running synchronous calls. Includes canceled calls that haven't finished yet. */private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

代码中我添加了中文注释,理解了这几个属性的含义对于接下来看源码的过程有帮助。代码注释中有说到线程池的创建是lazily的,用到再创建,那么我们从OkHttp开始请求时分析:

//同步
Response response = client.newCall(request).execute();//异步
client.newCall(request).enqueue(new Callback() {@Overridepublic void onFailure(Call call, IOException e) {}@Overridepublic void onResponse(Call call, Response response) throws IOException {}});

我们点到execute的源码里面看到是 Call 接口,那么必定是实现类做了操作,我们跟踪 newCall() 方法:

  @Override public Call newCall(Request request) {return RealCall.newRealCall(this, request, false /* for web socket */);}

可以看到里面构建了一个 RealCall ,不管是同步还是异步,其实都是 RealCall 在执行,那我们就看 RealCall 的 同步/异步 实现:

  //同步@Override public Response execute() throws IOException {synchronized (this) {if (executed) throw new IllegalStateException("Already Executed");executed = true;}captureCallStackTrace();eventListener.callStart(this);try {client.dispatcher().executed(this);Response result = getResponseWithInterceptorChain();if (result == null) throw new IOException("Canceled");return result;} catch (IOException e) {eventListener.callFailed(this, e);throw e;} finally {client.dispatcher().finished(this);}}

从源码我们一眼就能看到 RealCall 的同步方法中将执行的逻辑交给了 Dispatcher,其中同步方法交给 Dispatcher 之后马上就执行 getResponseWithInterceptorChain() 方法了,这里涉及了拦截器链的流程,也是真正执行请求的地方,放到另一篇文章分析。Dispatcherexecuted()

  /** Used by {@code Call#execute} to signal it is in-flight. */synchronized void executed(RealCall call) {runningSyncCalls.add(call);}

可以看到,就是将 RealCall 添加到维护的同步队列中。添加进去了,那么什么时候会将这个call移除呢?在 try catch代码块后面有一个finally代码块,这个代码块中执行了 Dispatcherfinished() 方法,注意,这里传的this是 RealCall 对象,我们跳转进去:

  /** Used by {@code Call#execute} to signal completion. */void finished(RealCall call) {finished(runningSyncCalls, call, false);}

这里传进去的队列是同步队列 runningSyncCalls,并且第三个参数是 false :

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {int runningCallsCount;Runnable idleCallback;synchronized (this) {if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");if (promoteCalls) promoteCalls();runningCallsCount = runningCallsCount();idleCallback = this.idleCallback;}if (runningCallsCount == 0 && idleCallback != null) {idleCallback.run();}
}

根据传参我们可以知道里面代码的执行流程是:1、从同步队列中移除call,移除失败时会抛出异常,然后判断 promoteCalls 参数是否为true,这里同步请求为false,不会调用 promoteCalls() 方法,这个方法在异步请求流程中会被调用。2、判断当前正在运行的请求数是否为0,如果为0并且idleCallback不为空时,会执行这个idleCallback。

接着我们再看异步方法的执行流程:

  //异步@Override public void enqueue(Callback responseCallback) {synchronized (this) {if (executed) throw new IllegalStateException("Already Executed");executed = true;}captureCallStackTrace();eventListener.callStart(this);client.dispatcher().enqueue(new AsyncCall(responseCallback));}

异步方法中会创建一个 AsyncCall 传入 Dispatcherenqueue 方法中,我们暂且不看 AsyncCall 类,我们先跟踪 Dispatcherenqueue() 方法:

  synchronized void enqueue(AsyncCall call) {if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {runningAsyncCalls.add(call);executorService().execute(call);} else {readyAsyncCalls.add(call);}}

方法中做了两个判断:1、判断当前正在执行的请求数是否超过最大请求数限制(源码中初始值为64);2、判断当前同一Host的执行请求数是否超过最大限制数(源码中初始值为5)。

如果判断通过则加入正在执行队列,并且交给线程池去执行,线程池也在此时创建:

public synchronized ExecutorService executorService() {if (executorService == null) {executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));}return executorService;
}

判断不通过则加入准备队列,等待执行。

到这里我们就明白了线程池是在什么时机创建的,并且也明白了 Dispatcher 在请求过程中扮演了什么样的角色。
紧接着我们还需要弄明白异步请求过程中两个队列(运行中队列、准备队列)是如何配合工作的,Dispatcher中的 enqueue() 方法会利用线程池去执行异步请求,这时我们就需要跳转到 AsyncCall 类查看:

  final class AsyncCall extends NamedRunnable {private final Callback responseCallback;AsyncCall(Callback responseCallback) {super("OkHttp %s", redactedUrl());this.responseCallback = responseCallback;}//省略部分代码@Override protected void execute() {boolean signalledCallback = false;try {Response response = getResponseWithInterceptorChain();if (retryAndFollowUpInterceptor.isCanceled()) {signalledCallback = true;responseCallback.onFailure(RealCall.this, new IOException("Canceled"));} else {signalledCallback = true;responseCallback.onResponse(RealCall.this, response);}} catch (IOException e) {if (signalledCallback) {// Do not signal the callback twice!Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);} else {eventListener.callFailed(RealCall.this, e);responseCallback.onFailure(RealCall.this, e);}} finally {client.dispatcher().finished(this);}}}

因为 AsyncCall 是一个 NamedRunnable 的实现类,最终会执行到重写的 execute() 方法,方法当中我们又看到了在同步方法中出现过的熟悉的身影 getResponseWithInterceptorChain() ,同样的,我们放到另一个文章中分析。try catch代码块也跟了一个 finally 代码块,这里调用的 finished() 方法中传入的是 AsyncCall 对象,我们跳转进去:

  /** Used by {@code AsyncCall#run} to signal completion. */void finished(AsyncCall call) {finished(runningAsyncCalls, call, true);}

这里调用了3个参数的 finished() 方法,并且传入的第一个参数是运行中的异步队列 runningAsyncCalls ,第三个参数和同步时调用的传参 false 不同,这里为true。我们再看一下重载的3个参数的 finished() 方法, 这里只列出关键的几行代码:

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {//当前分析流程的核心代码synchronized (this) {if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");if (promoteCalls) promoteCalls();}
}

和同步请求的流程一样,会先从运行中的异步队列 runningAsyncCalls 移除call,紧接着因为第三个参数为true,会执行 promoteCalls() 方法:

private void promoteCalls() {//判断目前运行中的异步队列是否超过最大请求数限制if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.//判断准备的异步队列是否为空if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.//遍历准备的异步队列,将其中的请求逐个取出放入运行中队列,并且让线程池执行请求for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {AsyncCall call = i.next();if (runningCallsForHost(call) < maxRequestsPerHost) {i.remove();runningAsyncCalls.add(call);executorService().execute(call);}//每次for循环的最后再判断运行中队列是否超过最大请求数限制,如果超过就结束循环if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.}}

这里就是异步的准备队列、运行中队列通过 Dispatcher 相互配合工作的核心代码,至此 Dispatcher 的调度工作就分析完了。

这篇关于OkHttp源码解析(一)- Dispatcher的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/721911

相关文章

深度解析Python中递归下降解析器的原理与实现

《深度解析Python中递归下降解析器的原理与实现》在编译器设计、配置文件处理和数据转换领域,递归下降解析器是最常用且最直观的解析技术,本文将详细介绍递归下降解析器的原理与实现,感兴趣的小伙伴可以跟随... 目录引言:解析器的核心价值一、递归下降解析器基础1.1 核心概念解析1.2 基本架构二、简单算术表达

深度解析Java @Serial 注解及常见错误案例

《深度解析Java@Serial注解及常见错误案例》Java14引入@Serial注解,用于编译时校验序列化成员,替代传统方式解决运行时错误,适用于Serializable类的方法/字段,需注意签... 目录Java @Serial 注解深度解析1. 注解本质2. 核心作用(1) 主要用途(2) 适用位置3

Java MCP 的鉴权深度解析

《JavaMCP的鉴权深度解析》文章介绍JavaMCP鉴权的实现方式,指出客户端可通过queryString、header或env传递鉴权信息,服务器端支持工具单独鉴权、过滤器集中鉴权及启动时鉴权... 目录一、MCP Client 侧(负责传递,比较简单)(1)常见的 mcpServers json 配置

从原理到实战解析Java Stream 的并行流性能优化

《从原理到实战解析JavaStream的并行流性能优化》本文给大家介绍JavaStream的并行流性能优化:从原理到实战的全攻略,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的... 目录一、并行流的核心原理与适用场景二、性能优化的核心策略1. 合理设置并行度:打破默认阈值2. 避免装箱

Maven中生命周期深度解析与实战指南

《Maven中生命周期深度解析与实战指南》这篇文章主要为大家详细介绍了Maven生命周期实战指南,包含核心概念、阶段详解、SpringBoot特化场景及企业级实践建议,希望对大家有一定的帮助... 目录一、Maven 生命周期哲学二、default生命周期核心阶段详解(高频使用)三、clean生命周期核心阶

深入解析C++ 中std::map内存管理

《深入解析C++中std::map内存管理》文章详解C++std::map内存管理,指出clear()仅删除元素可能不释放底层内存,建议用swap()与空map交换以彻底释放,针对指针类型需手动de... 目录1️、基本清空std::map2️、使用 swap 彻底释放内存3️、map 中存储指针类型的对象

Java Scanner类解析与实战教程

《JavaScanner类解析与实战教程》JavaScanner类(java.util包)是文本输入解析工具,支持基本类型和字符串读取,基于Readable接口与正则分隔符实现,适用于控制台、文件输... 目录一、核心设计与工作原理1.底层依赖2.解析机制A.核心逻辑基于分隔符(delimiter)和模式匹

Java+AI驱动实现PDF文件数据提取与解析

《Java+AI驱动实现PDF文件数据提取与解析》本文将和大家分享一套基于AI的体检报告智能评估方案,详细介绍从PDF上传、内容提取到AI分析、数据存储的全流程自动化实现方法,感兴趣的可以了解下... 目录一、核心流程:从上传到评估的完整链路二、第一步:解析 PDF,提取体检报告内容1. 引入依赖2. 封装

深度解析Python yfinance的核心功能和高级用法

《深度解析Pythonyfinance的核心功能和高级用法》yfinance是一个功能强大且易于使用的Python库,用于从YahooFinance获取金融数据,本教程将深入探讨yfinance的核... 目录yfinance 深度解析教程 (python)1. 简介与安装1.1 什么是 yfinance?

99%的人都选错了! 路由器WiFi双频合一还是分开好的专业解析与适用场景探讨

《99%的人都选错了!路由器WiFi双频合一还是分开好的专业解析与适用场景探讨》关于双频路由器的“双频合一”与“分开使用”两种模式,用户往往存在诸多疑问,本文将从多个维度深入探讨这两种模式的优缺点,... 在如今“没有WiFi就等于与世隔绝”的时代,越来越多家庭、办公室都开始配置双频无线路由器。但你有没有注