Android网络请求框架OkHttp源码分析

2024-08-25 13:08

本文主要是介绍Android网络请求框架OkHttp源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、开篇

阅读源码要有目的,关于OkHttp,我们要搞清楚以下几点:

  1. OkHttp网络请求的流程;
  2. 拦截器的作用及执行流程(责任链);
  3. OkHttp缓存控制;
  4. OkHttp的连接复用机制。

本文基于写作时的最新的OkHttp 4.9.1版本。

2、OkHttp的使用

OkHttp的使用步骤如下:

  1. 构建OkHttpClient对象,可以有两种方式

    // 方式一,直接new
    val client = OkHttpClient()// 方式二,Builder模式
    val client = OkHttpClient.Builder()// 增加拦截器,可以实现类似日志记录、增加全局请求字段等类似的功能//.addInterceptor()// 增加网络拦截器//.addNetworkInterceptor()// 设置请求超时时间, 0表示不限时间.callTimeout(10, TimeUnit.SECONDS)// 设置超时时间.connectTimeout(5, TimeUnit.SECONDS)// 设置读超时时间.readTimeout(5, TimeUnit.SECONDS)// 设置写超时时间.writeTimeout(5, TimeUnit.SECONDS)// 设置缓存//.cache().build()
    

    建议采用第二种方式,可以更加灵活的进行各项配置。其中仍然有许多参数可以设置,这里只列出了一些最常见的。

  2. 构建请求

    val request = Request.Builder()// 增加请求头.addHeader("key", "value")// 设置请求头,注意和addHeader的区别.header("key", "value")// 缓存控制.cacheControl(CacheControl.FORCE_NETWORK)// GET请求//.get()// POST请求//.post("{\"key\": \"value\"}".toRequestBody("application/json".toMediaType()))// 也可以通过method来灵活指定请求方法.method("GET", null)// 请求的URL.url("https://www.baidu.com").build()
    

    这里需要注意addHeader和header两个方法的区别。Http中,一个header的key是可以对应多个value的,addHeader方法是给相应的key增加的一个value,而header方法则是把现有的value全部清空,然后增加一个value。

  3. 发起请求并处理请求结果。OkHttp提供了两种请求方式:同步请求和异步请求。同步请求会在当前线程执行请求并返回结果,所以可能会阻塞线程。异步请求会在后台线程池执行请求,并通过回调的方式回传结果给调用者。

    // 同步请求
    val response = client.newCall(request).execute()
    // 输出结果
    println(response.body?.string() ?: "")// 异步请求
    client.newCall(request).enqueue(object : Callback {override fun onFailure(call: Call, e: IOException) {}override fun onResponse(call: Call, response: Response) {println(response.body?.string() ?: "")}
    })
    

这里不讨论WebSocket的相关用法。

2、OkHttp的请求流程

从上面我们可以看到,无论是同步请求还是异步请求,都先调用了OkHttpClinent的newcall方法:

override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

此方法非常简单,就是new了一个RealCall对象,同步请求的时候,执行的是RealCall#execute方法:

override fun execute(): Response {// 一个RealCall对象不能执行两次check(executed.compareAndSet(false, true)) { "Already Executed" }timeout.enter()callStart()try {client.dispatcher.executed(this)return getResponseWithInterceptorChain()} finally {client.dispatcher.finished(this)}
}

同步请求的方法非常简单,从方法名可以看出getResponseWithInterceptorChain()是真正获取请求响应的地方。

异步请求的时候执行的是RealCall#enqueue方法:

override fun enqueue(responseCallback: Callback) {check(executed.compareAndSet(false, true)) { "Already Executed" }callStart()client.dispatcher.enqueue(AsyncCall(responseCallback))
}

AsyncCall是Realcall的内部类,同时它也实现了Runnable接口,所以主要的执行逻辑肯定是在其run方法:

override fun run() {threadName("OkHttp ${redactedUrl()}") {var signalledCallback = falsetimeout.enter()try {val response = getResponseWithInterceptorChain()signalledCallback = trueresponseCallback.onResponse(this@RealCall, response)} catch (e: IOException) {if (signalledCallback) {// Do not signal the callback twice!Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)} else {responseCallback.onFailure(this@RealCall, e)}} catch (t: Throwable) {cancel()if (!signalledCallback) {val canceledException = IOException("canceled due to $t")canceledException.addSuppressed(t)responseCallback.onFailure(this@RealCall, canceledException)}throw t} finally {client.dispatcher.finished(this)}}
}

这里我们可以看到callback回调执行的情况,同时也可以看出,它也是通过调用getResponseWithInterceptorChain()获取请求响应的。所以同步异步只是执行的线程不一样,本质上是一样的。

接下来我们来看看getResponseWithInterceptorChain()都干了些什么

internal fun getResponseWithInterceptorChain(): Response {// Build a full stack of interceptors.val interceptors = mutableListOf<Interceptor>()interceptors += client.interceptorsinterceptors += RetryAndFollowUpInterceptor(client)interceptors += BridgeInterceptor(client.cookieJar)interceptors += CacheInterceptor(client.cache)interceptors += ConnectInterceptorif (!forWebSocket) {interceptors += client.networkInterceptors}interceptors += CallServerInterceptor(forWebSocket)val chain = RealInterceptorChain(call = this,interceptors = interceptors,index = 0,exchange = null,request = originalRequest,connectTimeoutMillis = client.connectTimeoutMillis,readTimeoutMillis = client.readTimeoutMillis,writeTimeoutMillis = client.writeTimeoutMillis)var calledNoMoreExchanges = falsetry {val response = chain.proceed(originalRequest)if (isCanceled()) {response.closeQuietly()throw IOException("Canceled")}return response} catch (e: IOException) {calledNoMoreExchanges = truethrow noMoreExchanges(e) as Throwable} finally {if (!calledNoMoreExchanges) {noMoreExchanges(null)}}
}

这里做的事情很简单,把开发者设置的各种拦截器和一些内置的拦截器封装成RealInterceptorChain,通过它获取请求响应并返回。注意拦截器的顺序,先是client.interceptors,也就是OkHttpClient.Builder#addInterceptor设置的拦截器,然后是RetryAndFollowUpInterceptor、BridgeInterceptor、CacheInterceptor、ConnectInterceptor,再后面是client.networkInterceptors也就是OkHttpClient.Builder#addNetworkInterceptor设置的拦截器,最后是CallServerInterceptor。这些内置的拦截器基本上从命名上就可以看出它们的作用。

3、Dispatcher

可以看到无论同步还是异步,都调用了client.dispatcher的相关方法,它是okhttp3.Dispatcher的实例。我们知道一般叫做XXDispatcher或者Dispatcher的类都是负责分发和调度的。

再看一下同步请求的代码

override fun execute(): Response {// 一个RealCall对象不能执行两次check(executed.compareAndSet(false, true)) { "Already Executed" }timeout.enter()callStart()try {client.dispatcher.executed(this)return getResponseWithInterceptorChain()} finally {client.dispatcher.finished(this)}
}

这里先后调用了Dispatcher.executed()和Dispatcher.finished()

@Synchronized internal fun executed(call: RealCall) {runningSyncCalls.add(call)
}/** Used by [Call.execute] to signal completion. */
internal fun finished(call: RealCall) {finished(runningSyncCalls, call)
}private fun <T> finished(calls: Deque<T>, call: T) {val idleCallback: Runnable?synchronized(this) {if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")idleCallback = this.idleCallback}val isRunning = promoteAndExecute()if (!isRunning && idleCallback != null) {idleCallback.run()}
}

也就是在执行请求的时候,把这个RealCall对象添加到runningSyncCalls队列中,在请求结束之后移除。这个runningSyncCalls的作用是记录这个请求以便判断当前的状态以及方便统一取消请求,不涉及线程的调度。

再看异步请求

override fun enqueue(responseCallback: Callback) {check(executed.compareAndSet(false, true)) { "Already Executed" }callStart()client.dispatcher.enqueue(AsyncCall(responseCallback))
}

这里调用了Dispatcher的enqueue方法

internal fun enqueue(call: AsyncCall) {synchronized(this) {readyAsyncCalls.add(call)// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to// the same host.if (!call.call.forWebSocket) {val existingCall = findExistingCallWithHost(call.host)if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)}}promoteAndExecute()
}private fun promoteAndExecute(): Boolean {this.assertThreadDoesntHoldLock()val executableCalls = mutableListOf<AsyncCall>()val isRunning: Booleansynchronized(this) {val i = readyAsyncCalls.iterator()while (i.hasNext()) {val asyncCall = i.next()if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.i.remove()asyncCall.callsPerHost.incrementAndGet()executableCalls.add(asyncCall)runningAsyncCalls.add(asyncCall)}isRunning = runningCallsCount() > 0}for (i in 0 until executableCalls.size) {val asyncCall = executableCalls[i]asyncCall.executeOn(executorService)}return isRunning
}

这里先把这个AsyncCall添加到就绪队列readyAsyncCalls,然后在promoteAndExecute()中检查检查当前是否达到最大并发请求数量和单个Host的最大并发请求数量,如果都没有,就把asyncCall添加到执行队列runningAsyncCalls中,最后调用了asyncCall.executeOn(executorService),在线程池中执行这个请求。

执行完成后,会从runningAsyncCalls中移除这个AsyncCall。

internal inner class AsyncCall(private val responseCallback: Callback) : Runnable {...fun executeOn(executorService: ExecutorService) {client.dispatcher.assertThreadDoesntHoldLock()var success = falsetry {executorService.execute(this)success = true} catch (e: RejectedExecutionException) {val ioException = InterruptedIOException("executor rejected")ioException.initCause(e)noMoreExchanges(ioException)responseCallback.onFailure(this@RealCall, ioException)} finally {if (!success) {// 从执行队列移除 client.dispatcher.finished(this) // This call is no longer running!}}}// 这个方法上面贴过了,节省篇幅,省略中间的代码override fun run() {threadName("OkHttp ${redactedUrl()}") {var signalledCallback = falsetimeout.enter()try {...} finally {// 从执行队列移除 client.dispatcher.finished(this)}}}
}

也就是通过ExecutorService来执行请求。我们可以通过OkHttpClient.Builder#dispatcher()设置自定义的Dispatcher和ExecutorService。
默认的ExecutorService如下

@get:JvmName("executorService") val executorService: ExecutorServiceget() {if (executorServiceOrNull == null) {executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))}return executorServiceOrNull!!
}

4、拦截器责任链的执行

再回头看getResponseWithInterceptorChain()

internal fun getResponseWithInterceptorChain(): Response {...val chain = RealInterceptorChain(call = this,interceptors = interceptors,index = 0,exchange = null,request = originalRequest,connectTimeoutMillis = client.connectTimeoutMillis,readTimeoutMillis = client.readTimeoutMillis,writeTimeoutMillis = client.writeTimeoutMillis)var calledNoMoreExchanges = falsetry {val response = chain.proceed(originalRequest)...return response} catch (e: IOException) {...} finally {...}
}

这里调用了RealInterceptorChain#proceed

@Throws(IOException::class)override fun proceed(request: Request): Response {...// Call the next interceptor in the chain.val next = copy(index = index + 1, request = request)val interceptor = interceptors[index]@Suppress("USELESS_ELVIS")val response = interceptor.intercept(next) ?: throw NullPointerException("interceptor $interceptor returned null")if (exchange != null) {check(index + 1 >= interceptors.size || next.calls == 1) {"network interceptor $interceptor must call proceed() exactly once"}}check(response.body != null) { "interceptor $interceptor returned a response with no body" }return response
}

这里主要是调用了第index个拦截器的intercept方法,index是RealInterceptorChain对象构造时的传进来的。注意到intercept是RealInterceptorChain对象,只是传入的index是当前index+1。

各个Interceptor的intercept方法如果能处理当前的请求,那么则返回处理结果对应的Response对象,否则应该调用入参的proceed方法,继续在责任链中传递。这就是请求责任链的执行过程。

如果开发者没有设置自定义的拦截器,那么责任链中的顺序是RetryAndFollowUpInterceptor -> BridgeInterceptor -> CacheInterceptor -> ConnectInterceptor -> CallServerInterceptor,接下来我们一个个分析这些拦截器的作用。

4.1 RetryAndFollowUpInterceptor

RetryAndFollowUpInterceptor负责重试和重定向的处理

@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {val realChain = chain as RealInterceptorChainvar request = chain.requestval call = realChain.callvar followUpCount = 0var priorResponse: Response? = nullvar newExchangeFinder = truevar recoveredFailures = listOf<IOException>()while (true) {call.enterNetworkInterceptorExchange(request, newExchangeFinder)var response: Responsevar closeActiveExchange = truetry {// 如果请求已经取消了,直接抛出异常if (call.isCanceled()) {throw IOException("Canceled")}try {// 继续在责任链中传递,获取responseresponse = realChain.proceed(request)newExchangeFinder = true} catch (e: RouteException) {...continue} catch (e: IOException) {...continue}// Attach the prior response if it exists. Such responses never have a body.if (priorResponse != null) {response = response.newBuilder().priorResponse(priorResponse.newBuilder().body(null).build()).build()}val exchange = call.interceptorScopedExchange// 处理重定向,生成新的Requestval followUp = followUpRequest(response, exchange)// 如果没有新的Request生成,那么这个就是最终结果,直接返回if (followUp == null) {if (exchange != null && exchange.isDuplex) {call.timeoutEarlyExit()}closeActiveExchange = falsereturn response}val followUpBody = followUp.bodyif (followUpBody != null && followUpBody.isOneShot()) {closeActiveExchange = falsereturn response}response.body?.closeQuietly()// 重定向超过最大限制次数,抛出异常if (++followUpCount > MAX_FOLLOW_UPS) {throw ProtocolException("Too many follow-up requests: $followUpCount")}request = followUppriorResponse = response} finally {call.exitNetworkInterceptorExchange(closeActiveExchange)}}
}@Throws(IOException::class)
private fun followUpRequest(userResponse: Response, exchange: Exchange?): Request? {val route = exchange?.connection?.route()val responseCode = userResponse.codeval method = userResponse.request.method// 通过状态码来判断是否重定向when (responseCode) {HTTP_PROXY_AUTH -> {val selectedProxy = route!!.proxyif (selectedProxy.type() != Proxy.Type.HTTP) {throw ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy")}return client.proxyAuthenticator.authenticate(route, userResponse)}HTTP_UNAUTHORIZED -> return client.authenticator.authenticate(route, userResponse)HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER -> {return buildRedirectRequest(userResponse, method)}HTTP_CLIENT_TIMEOUT -> {// 408's are rare in practice, but some servers like HAProxy use this response code. The// spec says that we may repeat the request without modifications. Modern browsers also// repeat the request (even non-idempotent ones.)if (!client.retryOnConnectionFailure) {// The application layer has directed us not to retry the request.return null}val requestBody = userResponse.request.bodyif (requestBody != null && requestBody.isOneShot()) {return null}val priorResponse = userResponse.priorResponseif (priorResponse != null && priorResponse.code == HTTP_CLIENT_TIMEOUT) {// We attempted to retry and got another timeout. Give up.return null}if (retryAfter(userResponse, 0) > 0) {return null}return userResponse.request}HTTP_UNAVAILABLE -> {val priorResponse = userResponse.priorResponseif (priorResponse != null && priorResponse.code == HTTP_UNAVAILABLE) {// We attempted to retry and got another timeout. Give up.return null}if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) {// specifically received an instruction to retry without delayreturn userResponse.request}return null}HTTP_MISDIRECTED_REQUEST -> {// OkHttp can coalesce HTTP/2 connections even if the domain names are different. See// RealConnection.isEligible(). If we attempted this and the server returned HTTP 421, then// we can retry on a different connection.val requestBody = userResponse.request.bodyif (requestBody != null && requestBody.isOneShot()) {return null}if (exchange == null || !exchange.isCoalescedConnection) {return null}exchange.connection.noCoalescedConnections()return userResponse.request}else -> return null}
}

这是一个死循环,除非抛出异常或者得到最终的Response才会返回。请求超时会通过抛出异常的方式退出循环。RetryAndFollowUpInterceptor没有直接处理请求,而是对责任链返回的Response进行了重试、状态码判断和重定向处理。

4.2 BridgeInterceptor

/*** Bridges from application code to network code. First it builds a network request from a user* request. Then it proceeds to call the network. Finally it builds a user response from the network* response.*/
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {@Throws(IOException::class)override fun intercept(chain: Interceptor.Chain): Response {val userRequest = chain.request()val requestBuilder = userRequest.newBuilder()val body = userRequest.bodyif (body != null) {val contentType = body.contentType()if (contentType != null) {requestBuilder.header("Content-Type", contentType.toString())}val contentLength = body.contentLength()if (contentLength != -1L) {requestBuilder.header("Content-Length", contentLength.toString())requestBuilder.removeHeader("Transfer-Encoding")} else {requestBuilder.header("Transfer-Encoding", "chunked")requestBuilder.removeHeader("Content-Length")}}if (userRequest.header("Host") == null) {requestBuilder.header("Host", userRequest.url.toHostHeader())}if (userRequest.header("Connection") == null) {requestBuilder.header("Connection", "Keep-Alive")}// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing// the transfer stream.var transparentGzip = falseif (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {transparentGzip = truerequestBuilder.header("Accept-Encoding", "gzip")}val cookies = cookieJar.loadForRequest(userRequest.url)if (cookies.isNotEmpty()) {requestBuilder.header("Cookie", cookieHeader(cookies))}if (userRequest.header("User-Agent") == null) {requestBuilder.header("User-Agent", userAgent)}val networkResponse = chain.proceed(requestBuilder.build())cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)val responseBuilder = networkResponse.newBuilder().request(userRequest)if (transparentGzip &&"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&networkResponse.promisesBody()) {val responseBody = networkResponse.bodyif (responseBody != null) {val gzipSource = GzipSource(responseBody.source())val strippedHeaders = networkResponse.headers.newBuilder().removeAll("Content-Encoding").removeAll("Content-Length").build()responseBuilder.headers(strippedHeaders)val contentType = networkResponse.header("Content-Type")responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))}}return responseBuilder.build()}/** Returns a 'Cookie' HTTP request header with all cookies, like `a=b; c=d`. */private fun cookieHeader(cookies: List<Cookie>): String = buildString {cookies.forEachIndexed { index, cookie ->if (index > 0) append("; ")append(cookie.name).append('=').append(cookie.value)}}
}

可以看到,BridgeInterceptor也没有直接处理请求,而是把开发者的Request对象进行了重新包装,加上了“Content-Length”等HTTP标准的请求头信息。同时BridgeInterceptor也对返回结果进行了相应的处理。

4.3 CacheInterceptor

CacheInterceptor从类名就可以看出是处理缓存的拦截器

@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {val call = chain.call()val cacheCandidate = cache?.get(chain.request())val now = System.currentTimeMillis()// 计算缓存策略val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()val networkRequest = strategy.networkRequestval cacheResponse = strategy.cacheResponsecache?.trackResponse(strategy)val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE// cacheCandidate不为空但是cacheResponse,缓存的结果已经无效if (cacheCandidate != null && cacheResponse == null) {// The cache candidate wasn't applicable. Close it.cacheCandidate.body?.closeQuietly()}// 如果请求策略是只检索缓存而不允许进行网络请求,但是本地没有相应的缓存,则请求失败,返回失败的Responseif (networkRequest == null && cacheResponse == null) {return Response.Builder().request(chain.request()).protocol(Protocol.HTTP_1_1).code(HTTP_GATEWAY_TIMEOUT).message("Unsatisfiable Request (only-if-cached)").body(EMPTY_RESPONSE).sentRequestAtMillis(-1L).receivedResponseAtMillis(System.currentTimeMillis()).build().also {listener.satisfactionFailure(call, it)}}// 如果不需要网络请求,直接返回缓存的结果if (networkRequest == null) {return cacheResponse!!.newBuilder().cacheResponse(stripBody(cacheResponse)).build().also {listener.cacheHit(call, it)}}if (cacheResponse != null) { listener.cacheConditionalHit(call, cacheResponse)} else if (cache != null) { // 缓存没有命中listener.cacheMiss(call)}var networkResponse: Response? = nulltry {// 真正处理网络请求的地方networkResponse = chain.proceed(networkRequest)} finally {// If we're crashing on I/O or otherwise, don't leak the cache body.if (networkResponse == null && cacheCandidate != null) {cacheCandidate.body?.closeQuietly()}}// If we have a cache response too, then we're doing a conditional get.if (cacheResponse != null) {// HTTP状态码为HTTP_NOT_MODIFIED(304)if (networkResponse?.code == HTTP_NOT_MODIFIED) {val response = cacheResponse.newBuilder().headers(combine(cacheResponse.headers, networkResponse.headers)).sentRequestAtMillis(networkResponse.sentRequestAtMillis).receivedResponseAtMillis(networkResponse.receivedResponseAtMillis).cacheResponse(stripBody(cacheResponse)).networkResponse(stripBody(networkResponse)).build()networkResponse.body!!.close()// Update the cache after combining headers but before stripping the// Content-Encoding header (as performed by initContentStream()).cache!!.trackConditionalCacheHit()cache.update(cacheResponse, response)return response.also {listener.cacheHit(call, it)}} else {cacheResponse.body?.closeQuietly()}}val response = networkResponse!!.newBuilder().cacheResponse(stripBody(cacheResponse)).networkResponse(stripBody(networkResponse)).build()// 处理缓存if (cache != null) {if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {// Offer this request to the cache.val cacheRequest = cache.put(response)return cacheWritingResponse(cacheRequest, response).also {if (cacheResponse != null) {// This will log a conditional cache miss only.listener.cacheMiss(call)}}}if (HttpMethod.invalidatesCache(networkRequest.method)) {try {cache.remove(networkRequest)} catch (_: IOException) {// The cache cannot be written.}}}return response
}

这里逻辑非常清晰,根据缓存策略和本地缓存的有效性来决定是否请求网络。

  1. 如果请求策略是只检索缓存而不允许进行网络请求,但是本地没有相应的有效缓存,则请求失败。
  2. 如果本地有缓存且缓存策略规定不需要网络请求,直接返回缓存的结果。
  3. 如果HTTP状态码是304,那么说明本地缓存就是有效的响应数据,直接包装成Response返回。
  4. 责任链继续处理,并把本地缓存(如有)包装到cacheResponse字段,根据缓存策略决定是否更新本地缓存,返回结果。

接着我们跟进看看这个缓存的策略是怎么计算的,也就是CacheStrategy.Factory#compute(),此外注意CacheStrategy.Factory的三个构造参数,当前时间、当前请求对象以及本地缓存结果,而CacheStrategy有两个构造参数,一是网络请求对象,而是缓存结果对象。

fun compute(): CacheStrategy {val candidate = computeCandidate()// 本地没有缓存,但是request中指定只检索缓存,那么可以预测最终结果是请求失败if (candidate.networkRequest != null && request.cacheControl.onlyIfCached) {return CacheStrategy(null, null)}return candidate
}private fun computeCandidate(): CacheStrategy {// 本地没有缓存if (cacheResponse == null) {return CacheStrategy(request, null)}// handshake为空视为缓存无效if (request.isHttps && cacheResponse.handshake == null) {return CacheStrategy(request, null)}// 如果该请求不应该存在本地缓存,那么本地缓存是无效的。如果多次同样的请求缓存策略是一致的,那么这个检查就没有必要。if (!isCacheable(cacheResponse, request)) {return CacheStrategy(request, null)}// 根据调用者给request设置的缓存策略和请求头信息(If-Modified-Since、If-None-Match)判断是否使用本地缓存val requestCaching = request.cacheControlif (requestCaching.noCache || hasConditions(request)) {return CacheStrategy(request, null)}val responseCaching = cacheResponse.cacheControlval ageMillis = cacheResponseAge()var freshMillis = computeFreshnessLifetime()if (requestCaching.maxAgeSeconds != -1) {freshMillis = minOf(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds.toLong()))}var minFreshMillis: Long = 0if (requestCaching.minFreshSeconds != -1) {minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds.toLong())}var maxStaleMillis: Long = 0if (!responseCaching.mustRevalidate && requestCaching.maxStaleSeconds != -1) {maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds.toLong())}// 缓存足够fresh,不需要网络请求,只是用本地缓存if (!responseCaching.noCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {val builder = cacheResponse.newBuilder()if (ageMillis + minFreshMillis >= freshMillis) {builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"")}val oneDayMillis = 24 * 60 * 60 * 1000Lif (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"")}return CacheStrategy(null, builder.build())}// Find a condition to add to the request. If the condition is satisfied, the response body// will not be transmitted.val conditionName: Stringval conditionValue: String?when {etag != null -> {conditionName = "If-None-Match"conditionValue = etag}lastModified != null -> {conditionName = "If-Modified-Since"conditionValue = lastModifiedString}servedDate != null -> {conditionName = "If-Modified-Since"conditionValue = servedDateString}else -> return CacheStrategy(request, null) // No condition! Make a regular request.}val conditionalRequestHeaders = request.headers.newBuilder()conditionalRequestHeaders.addLenient(conditionName, conditionValue!!)val conditionalRequest = request.newBuilder().headers(conditionalRequestHeaders.build()).build()return CacheStrategy(conditionalRequest, cacheResponse)
}

由此我们可以看到,缓存是由request中cacheControl和HTTP协议本身共同控制的。需要注意的是,OkHttpClient.Builder中的cache默认是null,也就是说默认是不启用缓存的。你也可以通过以下方式开启缓存

val client = OkHttpClient.Builder().cache(Cache(File("your cache path"), 10 * 1024 * 1024)).build()

OkHttp的Cache是LRU Cache,也就是最近最少使用算法实现缓存,具体的实现类是okhttp3.internal.cache.DiskLruCache。

4.4 ConnectInterceptor

ConnectInterceptor,连接拦截器,连接复用就在这里。ConnectInterceptor类的代码尤为简单

object ConnectInterceptor : Interceptor {@Throws(IOException::class)override fun intercept(chain: Interceptor.Chain): Response {val realChain = chain as RealInterceptorChainval exchange = realChain.call.initExchange(chain)val connectedChain = realChain.copy(exchange = exchange)return connectedChain.proceed(realChain.request)}
}

主要是调用了Realcall#initExchange方法,来看看这个方法做了什么

internal fun initExchange(chain: RealInterceptorChain): Exchange {synchronized(this) {check(expectMoreExchanges) { "released" }check(!responseBodyOpen)check(!requestBodyOpen)}val exchangeFinder = this.exchangeFinder!!val codec = exchangeFinder.find(client, chain)val result = Exchange(this, eventListener, exchangeFinder, codec)this.interceptorScopedExchange = resultthis.exchange = resultsynchronized(this) {this.requestBodyOpen = truethis.responseBodyOpen = true}if (canceled) throw IOException("Canceled")return result
}

主要是做了一些检查,然后通过exchangeFinder获取到一个ExchangeCodec对象codec,然后new了一个Exchange对象返回。

先看.ExchangeFinder#find

fun find(client: OkHttpClient,chain: RealInterceptorChain): ExchangeCodec {try {val resultConnection = findHealthyConnection(connectTimeout = chain.connectTimeoutMillis,readTimeout = chain.readTimeoutMillis,writeTimeout = chain.writeTimeoutMillis,pingIntervalMillis = client.pingIntervalMillis,connectionRetryEnabled = client.retryOnConnectionFailure,doExtensiveHealthChecks = chain.request.method != "GET")return resultConnection.newCodec(client, chain)} catch ...
}@Throws(IOException::class)
private fun findHealthyConnection(connectTimeout: Int,readTimeout: Int,writeTimeout: Int,pingIntervalMillis: Int,connectionRetryEnabled: Boolean,doExtensiveHealthChecks: Boolean): RealConnection {while (true) {val candidate = findConnection(connectTimeout = connectTimeout,readTimeout = readTimeout,writeTimeout = writeTimeout,pingIntervalMillis = pingIntervalMillis,connectionRetryEnabled = connectionRetryEnabled)// Confirm that the connection is good.if (candidate.isHealthy(doExtensiveHealthChecks)) {return candidate}// If it isn't, take it out of the pool.candidate.noNewExchanges()// Make sure we have some routes left to try. One example where we may exhaust all the routes// would happen if we made a new connection and it immediately is detected as unhealthy.if (nextRouteToTry != null) continueval routesLeft = routeSelection?.hasNext() ?: trueif (routesLeft) continueval routesSelectionLeft = routeSelector?.hasNext() ?: trueif (routesSelectionLeft) continuethrow IOException("exhausted all routes")}
}/*** Returns a connection to host a new stream. This prefers the existing connection if it exists,* then the pool, finally building a new connection.** This checks for cancellation before each blocking operation.*/
@Throws(IOException::class)
private fun findConnection(connectTimeout: Int,readTimeout: Int,writeTimeout: Int,pingIntervalMillis: Int,connectionRetryEnabled: Boolean): RealConnection {// 请求已经取消了,直接抛出异常if (call.isCanceled()) throw IOException("Canceled")// 先检查call本身是否已经被分配了连接val callConnection = call.connection if (callConnection != null) {var toClose: Socket? = nullsynchronized(callConnection) {if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {toClose = call.releaseConnectionNoEvents()}}// 如果已经分配连接,重用这个连接if (call.connection != null) {check(toClose == null)return callConnection}// 连接已经释放了toClose?.closeQuietly()eventListener.connectionReleased(call, callConnection)}// We need a new connection. Give it fresh stats.refusedStreamCount = 0connectionShutdownCount = 0otherFailureCount = 0// 从连接池获取一个连接if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {val result = call.connection!!eventListener.connectionAcquired(call, result)return result}// 连接池没有符合条件的连接,那么需要建立新的连接// 先查找对应的路由val routes: List<Route>?val route: Routeif (nextRouteToTry != null) {// Use a route from a preceding coalesced connection.routes = nullroute = nextRouteToTry!!nextRouteToTry = null} else if (routeSelection != null && routeSelection!!.hasNext()) {// Use a route from an existing route selection.routes = nullroute = routeSelection!!.next()} else {// Compute a new route selection. This is a blocking operation!var localRouteSelector = routeSelectorif (localRouteSelector == null) {localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)this.routeSelector = localRouteSelector}val localRouteSelection = localRouteSelector.next()routeSelection = localRouteSelectionroutes = localRouteSelection.routesif (call.isCanceled()) throw IOException("Canceled")// Now that we have a set of IP addresses, make another attempt at getting a connection from// the pool. We have a better chance of matching thanks to connection coalescing.if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {val result = call.connection!!eventListener.connectionAcquired(call, result)return result}route = localRouteSelection.next()}// 建立连接val newConnection = RealConnection(connectionPool, route)call.connectionToCancel = newConnectiontry {newConnection.connect(connectTimeout,readTimeout,writeTimeout,pingIntervalMillis,connectionRetryEnabled,call,eventListener)} finally {call.connectionToCancel = null}call.client.routeDatabase.connected(newConnection.route())// If we raced another call connecting to this host, coalesce the connections. This makes for 3// different lookups in the connection pool!if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {val result = call.connection!!nextRouteToTry = routenewConnection.socket().closeQuietly()eventListener.connectionAcquired(call, result)return result}synchronized(newConnection) {connectionPool.put(newConnection)call.acquireConnectionNoEvents(newConnection)}eventListener.connectionAcquired(call, newConnection)return newConnection
}

4.5 CallServerInterceptor

CallServerInterceptor才是真正向服务器请求数据的地方,事实上HTTP协议通信的本质也不过是通过Socket进行数据的收发,而HTTP本身有它规定的格式,双方遵循协议进行数据解析和数据发送就完成了HTTP请求。这里很多细节我们在App开发中不需要全部了解,所以这里也不深入去论述了。

class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {@Throws(IOException::class)override fun intercept(chain: Interceptor.Chain): Response {val realChain = chain as RealInterceptorChainval exchange = realChain.exchange!!val request = realChain.requestval requestBody = request.bodyval sentRequestMillis = System.currentTimeMillis()exchange.writeRequestHeaders(request)var invokeStartEvent = truevar responseBuilder: Response.Builder? = nullif (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100// Continue" response before transmitting the request body. If we don't get that, return// what we did get (such as a 4xx response) without ever transmitting the request body.if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {exchange.flushRequest()responseBuilder = exchange.readResponseHeaders(expectContinue = true)exchange.responseHeadersStart()invokeStartEvent = false}if (responseBuilder == null) {if (requestBody.isDuplex()) {// Prepare a duplex body so that the application can send a request body later.exchange.flushRequest()val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()requestBody.writeTo(bufferedRequestBody)} else {// Write the request body if the "Expect: 100-continue" expectation was met.val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()requestBody.writeTo(bufferedRequestBody)bufferedRequestBody.close()}} else {exchange.noRequestBody()if (!exchange.connection.isMultiplexed) {// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection// from being reused. Otherwise we're still obligated to transmit the request body to// leave the connection in a consistent state.exchange.noNewExchangesOnConnection()}}} else {exchange.noRequestBody()}if (requestBody == null || !requestBody.isDuplex()) {exchange.finishRequest()}if (responseBuilder == null) {responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!if (invokeStartEvent) {exchange.responseHeadersStart()invokeStartEvent = false}}var response = responseBuilder.request(request).handshake(exchange.connection.handshake()).sentRequestAtMillis(sentRequestMillis).receivedResponseAtMillis(System.currentTimeMillis()).build()var code = response.codeif (code == 100) {// Server sent a 100-continue even though we did not request one. Try again to read the actual// response status.responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!if (invokeStartEvent) {exchange.responseHeadersStart()}response = responseBuilder.request(request).handshake(exchange.connection.handshake()).sentRequestAtMillis(sentRequestMillis).receivedResponseAtMillis(System.currentTimeMillis()).build()code = response.code}exchange.responseHeadersEnd(response)response = if (forWebSocket && code == 101) {// Connection is upgrading, but we need to ensure interceptors see a non-null response body.response.newBuilder().body(EMPTY_RESPONSE).build()} else {response.newBuilder().body(exchange.openResponseBody(response)).build()}if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||"close".equals(response.header("Connection"), ignoreCase = true)) {exchange.noNewExchangesOnConnection()}if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {throw ProtocolException("HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")}return response}
}

5、总结

OkHttp使用了责任链设计模式来处理网络请求,实际上真正会处理请求的其实只有CacheInterceptor和CallServerInterceptor。OkHttp本身默认不开启缓存,但是提供了默认的文件缓存实现DiskLruCache。另外我们可以实现自定义的拦截器去实现一些定制化的功能,通过OkHttpClient.Builder的addInterceptor和addNetworkInterceptor添加到责任链的不同位置。

这篇关于Android网络请求框架OkHttp源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1105633

相关文章

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

Linux 网络编程 --- 应用层

一、自定义协议和序列化反序列化 代码: 序列化反序列化实现网络版本计算器 二、HTTP协议 1、谈两个简单的预备知识 https://www.baidu.com/ --- 域名 --- 域名解析 --- IP地址 http的端口号为80端口,https的端口号为443 url为统一资源定位符。CSDNhttps://mp.csdn.net/mp_blog/creation/editor

Android实现任意版本设置默认的锁屏壁纸和桌面壁纸(两张壁纸可不一致)

客户有些需求需要设置默认壁纸和锁屏壁纸  在默认情况下 这两个壁纸是相同的  如果需要默认的锁屏壁纸和桌面壁纸不一样 需要额外修改 Android13实现 替换默认桌面壁纸: 将图片文件替换frameworks/base/core/res/res/drawable-nodpi/default_wallpaper.*  (注意不能是bmp格式) 替换默认锁屏壁纸: 将图片资源放入vendo

Android平台播放RTSP流的几种方案探究(VLC VS ExoPlayer VS SmartPlayer)

技术背景 好多开发者需要遴选Android平台RTSP直播播放器的时候,不知道如何选的好,本文针对常用的方案,做个大概的说明: 1. 使用VLC for Android VLC Media Player(VLC多媒体播放器),最初命名为VideoLAN客户端,是VideoLAN品牌产品,是VideoLAN计划的多媒体播放器。它支持众多音频与视频解码器及文件格式,并支持DVD影音光盘,VCD影

ASIO网络调试助手之一:简介

多年前,写过几篇《Boost.Asio C++网络编程》的学习文章,一直没机会实践。最近项目中用到了Asio,于是抽空写了个网络调试助手。 开发环境: Win10 Qt5.12.6 + Asio(standalone) + spdlog 支持协议: UDP + TCP Client + TCP Server 独立的Asio(http://www.think-async.com)只包含了头文件,不依

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

poj 3181 网络流,建图。

题意: 农夫约翰为他的牛准备了F种食物和D种饮料。 每头牛都有各自喜欢的食物和饮料,而每种食物和饮料都只能分配给一头牛。 问最多能有多少头牛可以同时得到喜欢的食物和饮料。 解析: 由于要同时得到喜欢的食物和饮料,所以网络流建图的时候要把牛拆点了。 如下建图: s -> 食物 -> 牛1 -> 牛2 -> 饮料 -> t 所以分配一下点: s  =  0, 牛1= 1~

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者