本文主要是介绍Okhttp3 设计思想学习,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
前言
Okhttp
的源码其实并不难,难的是对于Http的协议的熟悉程度.比如如何缓存.其实Okhttp
参考了当前很多热门的浏览器源码而编写成的客户端.你可以看到很多线程数的设置都参考了主流浏览器.你可以把Okhttp
视为一个无渲染界面的浏览器即可.
Http缓存
想要看得懂Okhttp
拦截器你首先必须对Http协议自带的缓存机制有一定的了解.
这里给出两个参考
1 HTTP 缓存(建议切换到英文版本)
2 Wikipedia HTTP头字段
3 可能是最被误用的 HTTP 响应头之一 Cache-Control: must-revalidate
4 Cache-Control
- 缓存基础概念:
浏览器会将一些Http响应
存储到本地
,下次再次访问一个地址时,判断缓存是否存在且有效,如果有效直接返回磁盘存储的响应体即可.如果没有缓存重新请求服务器.如果缓存存在但是已经过期,那么进行缓存重校验
:带着一些特殊请求参数询问服务器缓存是否还可用,如果还可以用那么返回状态码304
且不包含实际响应体.如果缓存无效服务端直接重新返回新的资源,状态码为200
你会发现浏览器在处理缓存和重新校验过期缓存是否有效时会大量处理请求头,这里我们给出一个总结图(你可以通过上文的参考文献了解具体的缓存细节):
(可点击放大图片哦)
上面便是一个不太严谨的缓存流程,当然你需要注意的是仅有Get
能被缓存.
Okhttp
也实现上面的一套磁盘缓存流程.
Http1.1协议
我们知道Http1.1
默认实现了保持长连接和复用Socket
的功能,且大多数浏览器对于单个地址(ip和端口相同)套接字仅开启若干个链接.
上图是Chrome
浏览器中的某次请求每一行都是一个socket
.在okhttp
中最大开启5个相同地址套接字.
Http2.0 协议
我们知道Http2.0
有一个多路复用
的概念,请看下图
也就是说Okhttp同样要实现Http2.0
所有特性,当然本文不会分析这块源码.只是让读者在自行分析这块源码可以先了解Http2.0
在进行分析不然你不会明白Okhttp到底在写什么
源码分析
看完上面的前置的知识你会发现Okhttp
要实现的功能真的很庞大,这也是为什么Okhttp
能在众多网络框架脱颖而出的原因.
Okhttp
默认不开启缓存,所以为了分析源码流程你可以开启缓存以及tls/ssl证书方便抓包分析.
这里给出一个网上抄写的代码段:
/*** Created by PandaQ on 2016/11/10.* 封装的支持Https连接的Okhttp客户端* email : 767807368@qq.com*/public class HttpsUtils {//返回一个带缓存和信任所有证书的okhttppublic OkHttpClient getTrustAllClient() {//添加缓存Cache cache = new Cache(new File("todo 缓存路径", "http_cache"),// $0.05 worth of phone storage in 202050L * 1024L * 1024L // 50 MiB);OkHttpClient.Builder mBuilder = new OkHttpClient.Builder();//信任所有证书mBuilder.sslSocketFactory(createSSLSocketFactory(), mMyTrustManager).hostnameVerifier(new TrustAllHostnameVerifier()).cache(cache);return mBuilder.build();}private MyTrustManager mMyTrustManager;public SSLSocketFactory createSSLSocketFactory() {SSLSocketFactory ssfFactory = null;try {mMyTrustManager = new MyTrustManager();SSLContext sc = SSLContext.getInstance("TLS");sc.init(null, new TrustManager[]{mMyTrustManager}, new SecureRandom());ssfFactory = sc.getSocketFactory();} catch (Exception ignored) {ignored.printStackTrace();}return ssfFactory;}//实现X509TrustManager接口public class MyTrustManager implements X509TrustManager {@Overridepublic void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {}@Overridepublic void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {}@Overridepublic X509Certificate[] getAcceptedIssuers() {return new X509Certificate[0];}}//实现HostnameVerifier接口public static class TrustAllHostnameVerifier implements HostnameVerifier {@Overridepublic boolean verify(String hostname, SSLSession session) {return true;}}/*** 对外提供的获取支持自签名的okhttp客户端** @param certificate 自签名证书的输入流* @return 支持自签名的客户端*/public OkHttpClient getTrusClient(InputStream certificate) {X509TrustManager trustManager;SSLSocketFactory sslSocketFactory;try {trustManager = trustManagerForCertificates(certificate);SSLContext sslContext = SSLContext.getInstance("TLS");//使用构建出的trustManger初始化SSLContext对象sslContext.init(null, new TrustManager[]{trustManager}, null);//获得sslSocketFactory对象sslSocketFactory = sslContext.getSocketFactory();} catch (GeneralSecurityException e) {throw new RuntimeException(e);}return new OkHttpClient.Builder().sslSocketFactory(sslSocketFactory, trustManager).build();}/*** 获去信任自签证书的trustManager** @param in 自签证书输入流* @return 信任自签证书的trustManager* @throws GeneralSecurityException*/private X509TrustManager trustManagerForCertificates(InputStream in)throws GeneralSecurityException {CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");//通过证书工厂得到自签证书对象集合Collection<? extends Certificate> certificates = certificateFactory.generateCertificates(in);if (certificates.isEmpty()) {throw new IllegalArgumentException("expected non-empty set of trusted certificates");}//为证书设置一个keyStorechar[] password = "password".toCharArray(); // Any password will work.KeyStore keyStore = newEmptyKeyStore(password);int index = 0;//将证书放入keystore中for (Certificate certificate : certificates) {String certificateAlias = Integer.toString(index++);keyStore.setCertificateEntry(certificateAlias, certificate);}// Use it to build an X509 trust manager.//使用包含自签证书信息的keyStore去构建一个X509TrustManagerKeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());keyManagerFactory.init(keyStore, password);TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());trustManagerFactory.init(keyStore);TrustManager[] trustManagers = trustManagerFactory.getTrustManagers();if (trustManagers.length != 1 || !(trustManagers[0] instanceof X509TrustManager)) {throw new IllegalStateException("Unexpected default trust managers:"+ Arrays.toString(trustManagers));}return (X509TrustManager) trustManagers[0];}private KeyStore newEmptyKeyStore(char[] password) throws GeneralSecurityException {try {KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());InputStream in = null; // By convention, 'null' creates an empty key store.keyStore.load(null, password);return keyStore;} catch (IOException e) {throw new AssertionError(e);}}
}
本文分析okhttp4.9.0
版本.
分发请求事件
我们看一个小例子
fun main() {val httpsUtils = HttpsUtils()//得到一个val okHttpClient = httpsUtils.trustAllClientval request: Request = Request.Builder().url("https://publicobject.com/helloworld.txt").build()//okHttpClient.newCall返回对象为RealCallval newCall: RealCall = okHttpClient.newCall(request) as RealCall//同步调用val response = newCall.execute()//异步调用val response2 = newCall.enqueue(object : Callback {override fun onFailure(call: Call, e: IOException) {}override fun onResponse(call: Call, response: Response) {}})
}
我们知道okhttp有同步调用和异步调用方式如下:
class RealCall(val client: OkHttpClient,/** The application's original request unadulterated by redirects or auth headers. */val originalRequest: Request,val forWebSocket: Boolean
) : Call {//同步调用override fun execute(): Response {//..略try {//关注点 ------------------1client.dispatcher.executed(this)//责任链相关,这里我们先不关心//这里会执行责任链完成http请求return getResponseWithInterceptorChain()} finally {//完成后移除任务,取出另一个准备运行的任务client.dispatcher.finished(this)}}//异步调用override fun enqueue(responseCallback: Callback) {//关注点 ------------------2client.dispatcher.enqueue(AsyncCall(responseCallback))}
}
不管异步还是同步调用都会调用client.dispatcher
相关函数.
我们首先看看同步的情况client.dispatcher.executed(this)
class Dispatcher constructor() {/** Running synchronous calls. Includes canceled calls that haven't finished yet. */private val runningSyncCalls = ArrayDeque<RealCall>()//可以看到被Synchronized修饰,也就是多线程情况执行保证可见性有序性等@Synchronized internal fun executed(call: RealCall) {//放入集合中,这个集合存放所有同步运行的任务runningSyncCalls.add(call)}
}
我们同样看看异步相关函数:
client.dispatcher.enqueue(AsyncCall(responseCallback))
class Dispatcher constructor() {/** Running synchronous calls. Includes canceled calls that haven't finished yet. */// 存放同步运行的任务private val runningSyncCalls = ArrayDeque<RealCall>()/** Ready async calls in the order they'll be run. *///存放所有还未运行的异步任务private val readyAsyncCalls = ArrayDeque<AsyncCall>()/** Running asynchronous calls. Includes canceled calls that haven't finished yet. *///正在运行的异步任务private val runningAsyncCalls = ArrayDeque<AsyncCall>()internal fun enqueue(call: AsyncCall) {synchronized(this) {//存放到准备运行的异步队列中readyAsyncCalls.add(call)//判断是否为websocket//websocket无法共享socket套接字if (!call.call.forWebSocket) {//从正在运行的异步队列或者准备运行队列中获取一个相同域名call,//每个相同域名的请求会共享同一个计数器AtomicInteger实例//计数器用于统计相同域名请求数量val existingCall = findExistingCallWithHost(call.host)//将上一个call的计数器对象放入新的call中if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)}}//判断当前是否可以直接运行call任务而不是在readyAsyncCalls队列中promoteAndExecute()}private fun findExistingCallWithHost(host: String): AsyncCall? {//从正在运行的异步队列或者准备运行队列中获取一个相同域名callfor (existingCall in runningAsyncCalls) {if (existingCall.host == host) return existingCall}for (existingCall in readyAsyncCalls) {if (existingCall.host == host) return existingCall}return null}}
internal inner class AsyncCall(private val responseCallback: Callback) : Runnable {//这里Volatile用于保证callsPerHost这个对象赋值后立马对其他线程可见//AtomicInteger内部计数器已经保证可见性@Volatile var callsPerHost = AtomicInteger(0)private set//共享计数器fun reuseCallsPerHostFrom(other: AsyncCall) {this.callsPerHost = other.callsPerHost}
class Dispatcher constructor() {private fun promoteAndExecute(): Boolean {val executableCalls = mutableListOf<AsyncCall>()val isRunning: Boolean//使用粗锁synchronized(this) {//待运行的队列val i = readyAsyncCalls.iterator()while (i.hasNext()) {val asyncCall = i.next()//当前运行的http请求数量必须小于64个. var maxRequests = 64if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.//callsPerHost为AtomicInteger(0) 同一个域名的共享同一个AtomicInteger对象,是用于统计当前到底有多个相同域名请求,你可以enqueue函数中看到他们是如何共享这个计数器的//当前判断对同一个域名的请求是否大于5个.maxRequestsPerHost默认为5if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.//能运行到这个证明 符合运行条件,将这个任务取出放入runningAsyncCallsi.remove()//这个域名计数器加一asyncCall.callsPerHost.incrementAndGet()//放入集合中待会全部交付给线程池执行executableCalls.add(asyncCall)//放入运行队列runningAsyncCalls.add(asyncCall)}//runningCallsCount等于 runningAsyncCalls.size + runningSyncCalls.sizeisRunning = runningCallsCount() > 0}//从集合中取出然后交付给线程池执行for (i in 0 until executableCalls.size) {val asyncCall = executableCalls[i]//executorService是一个线程池,这里就是讲asyncCall丢到线程池执行asyncCall.executeOn(executorService)}return isRunning}
}
class internal inner class AsyncCall(private val responseCallback: Callback) {fun executeOn(executorService: ExecutorService) {//略过无关代码//丢到线程池执行executorService.execute(this)}}
我们最后看看executorService
线程池
class Dispatcher constructor() {//实际的线程池对象private var executorServiceOrNull: ExecutorService? = null//executorService只是一个代理对象实现了懒加载//@get:Synchronized防止数据竞争@get:Synchronized@get:JvmName("executorService") val executorService: ExecutorServiceget() {if (executorServiceOrNull == null) {//SynchronousQueue()配合Int.MAX_VALUE可以看到这个线程池是无限制的线程池//只要来任务就开辟线程或者复用线程进行调度//这里之所以构造这样的线程池我想是因为Okhttp已经外部严格控制了任务队列了,不需要线程池掌控调度executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))}return executorServiceOrNull!!}
}
当然最后会回调AsyncCall
的run
函数
internal inner class AsyncCall(private val responseCallback: Callback) : Runnable {override fun run() {//threadName是一个工具类,仅仅调用后面的闭包时修改线程名字,调用完修改回线程名字//这里主要方便调试threadName("OkHttp ${redactedUrl()}") {try {//这里主要构建责任链,责任链负责真正的网络请求,也就是网上看到//责任链处理完后得到响应结果返回给回调val response = getResponseWithInterceptorChain()//这里将结果回调responseCallback.onResponse(this@RealCall, response)} catch (e: IOException) {//回调错误responseCallback.onFailure(this@RealCall, e)} catch (t: Throwable) {//其他错误处理cancel()throw t} finally {//完成请求的后续处理client.dispatcher.finished(this)}}}}}
在分析责任链之前我们看看,请求完成之后是如何处理的,也就是client.dispatcher.finished(this)
class Dispatcher constructor() {internal fun finished(call: AsyncCall) {//同域名请求的计数器数量减一call.callsPerHost.decrementAndGet()finished(runningAsyncCalls, call)}private fun <T> finished(calls: Deque<T>, call: T) {val idleCallback: Runnable?synchronized(this) {//从runningAsyncCalls移除这个这个请求if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")idleCallback = this.idleCallback}//promoteAndExecute前面已经讲过了.取出待运行的任务运行//如果有运行那么isRunning返回trueval isRunning = promoteAndExecute()//如果没有运行的请求,那么回调idleCallback//这个idleCallback是okhttp留给上层开发者自行实现的.//如果你需要实现的话可以按照:okHttpClient.dispatcher.idleCallback= Runnable{}if (!isRunning && idleCallback != null) {idleCallback.run()}}
}
我们总结下Okhttp
任务的分发流程.
- 同步请求:直接将任务放入
runningSyncCalls
然后在当前线程运行. - 异步请求:先放入
readyAsyncCalls
,然后执行promoteAndExecute
函数,函数内部会判断是否能取出readyAsyncCalls
任务运行 - 异步请求结束:回调监听接口,再次执行
promoteAndExecute
函数 promoteAndExecute
内部细节:当前并发请求的是不超过64个,且同域名请求不能超过5个.如果满足上面的两个条件取出readyAsyncCalls
队列任务运行.
责任链
上面我们知道当okhttp
调用getResponseWithInterceptorChain()
后便得到结果.
getResponseWithInterceptorChain()
内部会构建一个责任链模式的代码,每个节点都负责处理一个任务,最后将结果返回即可.
class RealCall(val client: OkHttpClient,/** The application's original request unadulterated by redirects or auth headers. */val originalRequest: Request,val forWebSocket: Boolean
) : Call {@Throws(IOException::class)internal fun getResponseWithInterceptorChain(): Response {// Build a full stack of interceptors.val interceptors = mutableListOf<Interceptor>()//添加一个开发者定义的 应用拦截器.你可以从这里知道应用拦截器是第一个放入链的interceptors += client.interceptors//一个处理处理重试和重定向拦截器interceptors += RetryAndFollowUpInterceptor(client)//BridgeInterceptor用于添加一些默认的常用请求头interceptors += BridgeInterceptor(client.cookieJar)//缓存相关的拦截器interceptors += CacheInterceptor(client.cache)//ConnectInterceptor主要用来寻找创建或者复用套接字interceptors += ConnectInterceptor//如果当前不是websocket那么添加网络拦截器if (!forWebSocket) {//开发者自定义的网络拦截器,interceptors += client.networkInterceptors}//CallServerInterceptor主要是用来向转化封装的对象为字节序列然后发送到套接字//或者从套接字从读取结果转化为上层对象interceptors += CallServerInterceptor(forWebSocket)//用上述的集合构建成一个链对象val chain = RealInterceptorChain(call = this,interceptors = interceptors,index = 0,exchange = null,request = originalRequest,connectTimeoutMillis = client.connectTimeoutMillis,readTimeoutMillis = client.readTimeoutMillis,writeTimeoutMillis = client.writeTimeoutMillis)var calledNoMoreExchanges = falsetry {//执行各个拦截器val response = chain.proceed(originalRequest)//返回结果return response}
}
看完上面的代码相比你能更清楚的理解网络拦截器
和应用拦截器
的区别在哪.
网络拦截器
:位于几乎最底层,所以可得到重定向和前几个拦截器添加额外的请求头信息等.
应用拦截器
:位于责任链顶层,获取不到重定向和额外的请求头信息,但是可以获得最终的响应结果.
接下来我们将分析5个系统所提供的拦截器.
每个拦截器都需要实现Interceptor
接口:
fun interface Interceptor {@Throws(IOException::class)fun intercept(chain: Chain): Response
你可以在intercept
中修改或者短路掉请求或者响应.如果你想放行请求到下一个责任链调用chain.proceed(chain.request())
即可
- (1) RetryAndFollowUpInterceptor
一个用来进行容错处理
以及重定向
相关的拦截器
class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {@Throws(IOException::class)override fun intercept(chain: Interceptor.Chain): Response {val realChain = chain as RealInterceptorChainvar request = chain.requestval call = realChain.callvar followUpCount = 0var priorResponse: Response? = nullvar newExchangeFinder = true//用于存储在下文中错误重试,每次的异常 var recoveredFailures = listOf<IOException>()//这里开启一个死循环是用于错误重试while (true) {call.enterNetworkInterceptorExchange(request, newExchangeFinder)var response: Responsevar closeActiveExchange = true//try-FLAG_OUTERtry {//被取消就抛出一个IO异常结束整个责任链if (call.isCanceled()) {throw IOException("Canceled")}//try-FLAG_REQUESTtry {//放行请求 然后得到结果response = realChain.proceed(request)//newExchangeFinder = true} catch (e: RouteException) {//路由异常在ConnectInterceptor拦截器中触发.//RouteException异常一般为socket链接期间的错误,比如超时等//调用recover判断是否能进行重试操作,如果能重试那么recover函数返回trueif (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {throw e.firstConnectException.withSuppressed(recoveredFailures)} else {//存储错误到集合,然后重新执行请求recoveredFailures += e.firstConnectException}newExchangeFinder = false//重试执行continue} catch (e: IOException) {//IO错误一般是socket链接后的各种写入写出的错误 //判断是否能恢复if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {throw e.withSuppressed(recoveredFailures)} else {recoveredFailures += e}newExchangeFinder = falsecontinue}//END --- try-FLAG_REQUEST// Attach the prior response if it exists. Such responses never have a body.//priorResponse存储的是前一次响应,priorResponse一般都没有响应体,比如301,302等重定向的响应if (priorResponse != null) {//把之前的响应放入priorResponse变量中,也就是重定向可以获取到之前响应体//你可以调用得到结果的时候:response.priorResponse获取之前重定向的具体信息response = response.newBuilder().priorResponse(priorResponse.newBuilder().body(null).build()).build()}val exchange = call.interceptorScopedExchange//计算出当前是否需要重定向//如果重定向followUp不为空val followUp = followUpRequest(response, exchange)//等于null 证明不需要重定向等操作直接返回结果 if (followUp == null) {if (exchange != null && exchange.isDuplex) {call.timeoutEarlyExit()}closeActiveExchange = falsereturn response}//如果重定向有响应体也直接返回 val followUpBody = followUp.bodyif (followUpBody != null && followUpBody.isOneShot()) {closeActiveExchange = falsereturn response}response.body?.closeQuietly()//判断是重定向次数是否超过最大限度.MAX_FOLLOW_UPS默认为20if (++followUpCount > MAX_FOLLOW_UPS) {//重定向超过次数抛出异常throw ProtocolException("Too many follow-up requests: $followUpCount")}request = followUp//存储当前响应体,重新进入循环,下一轮循环完成重定向请求priorResponse = response} finally {call.exitNetworkInterceptorExchange(closeActiveExchange)}//END --- try-FLAG_OUTER}//END --- while (true)}// END --- override fun intercept(chain: Interceptor.Chain)
}// END --- class RetryAndFollowUpInterceptor
我们可以看到RetryAndFollowUpInterceptor
出现了异常的时候调用recover
判断是否能重新请求,调用followUpRequest
判断是否需要重定向等操作.
我们首先看看容错函数recover
private fun recover(e: IOException,call: RealCall,userRequest: Request,requestSendStarted: Boolean): Boolean {// The application layer has forbidden retries.//开发者手动禁止重试操作 比如:new OkHttpClient.Builder().retryOnConnectionFailure(false)if (!client.retryOnConnectionFailure) return false// We can't send the request body again.//requestSendStarted标识当前请求体是否已经发给了服务端,如果已经发送了okhttp禁止尝试//否则服务端可能受到两次请求的情况if (requestSendStarted && requestIsOneShot(e, userRequest)) return false// This exception is fatal.//如果当前错误是不可以容忍的比如SSL错误,这个错误重试几次都是错的//if (!isRecoverable(e, requestSendStarted)) return false// No more routes to attempt.//没有更多路由路径可以让我重新进行请求//路由:指的是如何 通过DNS得到的ip然后通过代理/非代理手段最终访问到服务端的一个方式,你可能通过dns得到多个ip,或者得到多个代理将他们组合在一起就是一个路由表.//切换路由:你可以简单理解切换一个代理或者ip再去访问(一个域名可以映射到多个ip).if (!call.retryAfterFailure()) return false// For failure recovery, use the same route selector with a new connection.return true}
关于路由选择您可以参阅OkHttp源码解析 (三)——代理和路由
isRecoverable
函数用于判断当前函数是否是可恢复的
private fun isRecoverable(e: IOException, requestSendStarted: Boolean): Boolean {// If there was a protocol problem, don't recover.//ProtocolException表示通信双方协议出错,比如返回了一个不认识的状态码,或者对方不按要求返回格式if (e is ProtocolException) {return false}// If there was an interruption don't recover, but if there was a timeout connecting to a route// we should try the next route (if there is one).if (e is InterruptedIOException) {return e is SocketTimeoutException && !requestSendStarted}// Look for known client-side or negotiation errors that are unlikely to be fixed by trying// again with a different route.if (e is SSLHandshakeException) {// If the problem was a CertificateException from the X509TrustManager,// do not retry.if (e.cause is CertificateException) {return false}}if (e is SSLPeerUnverifiedException) {// e.g. a certificate pinning error.return false}// An example of one we might want to retry with a different route is a problem connecting to a// proxy and would manifest as a standard IOException. Unless it is one we know we should not// retry, we return true and try a new route.return true}
上面的一些异常我们平常比较难需要这里仅作为了解,我们最后看看这个拦截器判断是否需要重定向followUpRequest
函数
private fun followUpRequest(userResponse: Response, exchange: Exchange?): Request? {val route = exchange?.connection?.route()val responseCode = userResponse.codeval method = userResponse.request.methodwhen (responseCode) {//状态407 表示需要认证所以 重新加入Proxy-Authorization头和认证信息 再次访问网络HTTP_PROXY_AUTH -> {val selectedProxy = route!!.proxyif (selectedProxy.type() != Proxy.Type.HTTP) {throw ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy")}return client.proxyAuthenticator.authenticate(route, userResponse)}//401认证失败 重新加入Authorization头和认证信息 再次访问网络HTTP_UNAUTHORIZED -> return client.authenticator.authenticate(route, userResponse)//一些重定向的状态码,比如302 301 307等,需要这里状态码直接重定向HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER -> {return buildRedirectRequest(userResponse, method)}//状态码 408 这个错误比较少见直接重新请求 HTTP_CLIENT_TIMEOUT -> {// 408's are rare in practice, but some servers like HAProxy use this response code. The// spec says that we may repeat the request without modifications. Modern browsers also// repeat the request (even non-idempotent ones.)if (!client.retryOnConnectionFailure) {// The application layer has directed us not to retry the request.return null}val requestBody = userResponse.request.bodyif (requestBody != null && requestBody.isOneShot()) {return null}val priorResponse = userResponse.priorResponseif (priorResponse != null && priorResponse.code == HTTP_CLIENT_TIMEOUT) {// We attempted to retry and got another timeout. Give up.return null}if (retryAfter(userResponse, 0) > 0) {return null}return userResponse.request}//略 503 错误HTTP_UNAVAILABLE -> {val priorResponse = userResponse.priorResponseif (priorResponse != null && priorResponse.code == HTTP_UNAVAILABLE) {// We attempted to retry and got another timeout. Give up.return null}if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) {// specifically received an instruction to retry without delayreturn userResponse.request}return null}//略HTTP_MISDIRECTED_REQUEST -> {// OkHttp can coalesce HTTP/2 connections even if the domain names are different. See// RealConnection.isEligible(). If we attempted this and the server returned HTTP 421, then// we can retry on a different connection.val requestBody = userResponse.request.bodyif (requestBody != null && requestBody.isOneShot()) {return null}if (exchange == null || !exchange.isCoalescedConnection) {return null}exchange.connection.noCoalescedConnections()return userResponse.request}//其他情况返回nullelse -> return null}}
followUpRequest
函数比较简单如果是正常的就是null,其他情况根据状态判断是否需要重新请求.返回null之后这个拦截器将直接返回响应结果
- (2) BridgeInterceptor
主要是控制添加一些常用的请求头已经cookies信息等,由于比较简单读者可以自行查看下面的源码,或者简单认为就是添加若干响应头和cookies即可
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {@Throws(IOException::class)override fun intercept(chain: Interceptor.Chain): Response {val userRequest = chain.request()val requestBuilder = userRequest.newBuilder()val body = userRequest.bodyif (body != null) {val contentType = body.contentType()if (contentType != null) {requestBuilder.header("Content-Type", contentType.toString())}val contentLength = body.contentLength()if (contentLength != -1L) {requestBuilder.header("Content-Length", contentLength.toString())requestBuilder.removeHeader("Transfer-Encoding")} else {requestBuilder.header("Transfer-Encoding", "chunked")requestBuilder.removeHeader("Content-Length")}}if (userRequest.header("Host") == null) {requestBuilder.header("Host", userRequest.url.toHostHeader())}if (userRequest.header("Connection") == null) {requestBuilder.header("Connection", "Keep-Alive")}// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing// the transfer stream.var transparentGzip = falseif (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {transparentGzip = truerequestBuilder.header("Accept-Encoding", "gzip")}val cookies = cookieJar.loadForRequest(userRequest.url)if (cookies.isNotEmpty()) {requestBuilder.header("Cookie", cookieHeader(cookies))}if (userRequest.header("User-Agent") == null) {requestBuilder.header("User-Agent", userAgent)}val networkResponse = chain.proceed(requestBuilder.build())cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)val responseBuilder = networkResponse.newBuilder().request(userRequest)if (transparentGzip &&"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&networkResponse.promisesBody()) {val responseBody = networkResponse.bodyif (responseBody != null) {val gzipSource = GzipSource(responseBody.source())val strippedHeaders = networkResponse.headers.newBuilder().removeAll("Content-Encoding").removeAll("Content-Length").build()responseBuilder.headers(strippedHeaders)val contentType = networkResponse.header("Content-Type")responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))}}return responseBuilder.build()}/** Returns a 'Cookie' HTTP request header with all cookies, like `a=b; c=d`. */private fun cookieHeader(cookies: List<Cookie>): String = buildString {cookies.forEachIndexed { index, cookie ->if (index > 0) append("; ")append(cookie.name).append('=').append(cookie.value)}}
}
- (3) CacheInterceptor
主要是用于缓存一些响应体,方便下次跳过网络请求,缓存会防止到磁盘中,你需要对Http缓存有一定的了解上文已经有过一小节做过总结.
class CacheInterceptor(internal val cache: Cache?) : Interceptor {@Throws(IOException::class)override fun intercept(chain: Interceptor.Chain): Response {val call = chain.call()//从磁盘中获取一个缓存,没有就返回null val cacheCandidate = cache?.get(chain.request())//当前时间用于判断磁盘缓存是否有效val now = System.currentTimeMillis()//这里就是判断磁盘的缓存是否有效,其结果通过strategy.networkRequest和strategy.cacheResponse反馈出来//内部具体的判断和本文章的一开始写的储备知识差不多,这里就不在看了val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()val networkRequest = strategy.networkRequestval cacheResponse = strategy.cacheResponsecache?.trackResponse(strategy)val listener = (call as? RealCall)?.eventListener ?: EventListener.NONEif (cacheCandidate != null && cacheResponse == null) {// The cache candidate wasn't applicable. Close it.cacheCandidate.body?.closeQuietly()}// If we're forbidden from using the network and the cache is insufficient, fail.//当两个都为空时表示,我们禁止从网络加载数据当前也无缓存,您可以按照如下方式启用: // Request.Builder().cacheControl( CacheControl.Builder().onlyIfCached().build())// cacheResponse == null 表示缓存//所以这种情况只能返回一个504的相应if (networkRequest == null && cacheResponse == null) {return Response.Builder().request(chain.request()).protocol(Protocol.HTTP_1_1).code(HTTP_GATEWAY_TIMEOUT)//504的状态码.message("Unsatisfiable Request (only-if-cached)").body(EMPTY_RESPONSE).sentRequestAtMillis(-1L).receivedResponseAtMillis(System.currentTimeMillis()).build().also {listener.satisfactionFailure(call, it)}}// If we don't need the network, we're done.//运行到这里证明networkRequest==null,cacheResponse!=null//那么证明缓存有效直接返回缓存即可if (networkRequest == null) {return cacheResponse!!.newBuilder().cacheResponse(stripBody(cacheResponse)).build().also {listener.cacheHit(call, it)}}//运行到这.证明networkRequest!=null,cacheResponse!=null//可见缓存存在但是缓存过期了需要进行网络验证缓存是否还有效if (cacheResponse != null) {listener.cacheConditionalHit(call, cacheResponse)} else if (cache != null) {listener.cacheMiss(call)}var networkResponse: Response? = null//请求网络networkResponse = chain.proceed(networkRequest)// If we have a cache response too, then we're doing a conditional get.//此时已经向服务端验证过缓存是否过期,如果过期networkResponse将返回完整的响应体if (cacheResponse != null) {//返回的状态为304,表示当前服务器资源和本地缓存一直,所以这里直接返回缓存即可if (networkResponse?.code == HTTP_NOT_MODIFIED) {//用缓存构建一个响应体准备返回val response = cacheResponse.newBuilder().headers(combine(cacheResponse.headers, networkResponse.headers)).sentRequestAtMillis(networkResponse.sentRequestAtMillis).receivedResponseAtMillis(networkResponse.receivedResponseAtMillis).cacheResponse(stripBody(cacheResponse)).networkResponse(stripBody(networkResponse)).build()networkResponse.body!!.close()// Update the cache after combining headers but before stripping the// Content-Encoding header (as performed by initContentStream()).cache!!.trackConditionalCacheHit()//更新缓存cache.update(cacheResponse, response)//返回结果return response.also {listener.cacheHit(call, it)}} else {//表示当前缓存过期了关闭缓存cacheResponse.body?.closeQuietly()}}//运行到这证明缓存已经无效,networkResponse是新的资源val response = networkResponse!!.newBuilder().cacheResponse(stripBody(cacheResponse)).networkResponse(stripBody(networkResponse)).build()//这里会将新的相应放入磁盘缓存中if (cache != null) {if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {// Offer this request to the cache.val cacheRequest = cache.put(response)return cacheWritingResponse(cacheRequest, response).also {if (cacheResponse != null) {// This will log a conditional cache miss only.listener.cacheMiss(call)}}}if (HttpMethod.invalidatesCache(networkRequest.method)) {try {cache.remove(networkRequest)} catch (_: IOException) {// The cache cannot be written.}}}//放入磁盘缓存结束//返回结果return response}}
上面的代码和文章一开始描述http存储知识行为是差不多,如果读者想仔细看判断缓存是否有效可以深入CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
源码查看,其内部源码您可以按照文章前面的总结合并来看.
-
(4) ConnectInterceptor
这个拦截器是比较难理解的,很难全部介绍,内部比较重要职责:
- 路由选择
- 创建socket
- 复用相同目标地址的socket
- socket链接
object ConnectInterceptor : Interceptor {@Throws(IOException::class)override fun intercept(chain: Interceptor.Chain): Response {val realChain = chain as RealInterceptorChain//内部进行了复杂的socket创建或者socketval exchange = realChain.call.initExchange(chain)val connectedChain = realChain.copy(exchange = exchange)return connectedChain.proceed(realChain.request)}
}
一看代码还挺短,但是全部都被封装到了realChain.call.initExchange
中.
class RealCall{internal fun initExchange(chain: RealInterceptorChain): Exchange {//..略val result = Exchange(this, eventListener, exchangeFinder, codec)return result;}
}
后面的源码很深所以直接给出核心步骤
class ExchangeFinder(private val connectionPool: RealConnectionPool,internal val address: Address,private val call: RealCall,private val eventListener: EventListener
){private fun findConnection(connectTimeout: Int,readTimeout: Int,writeTimeout: Int,pingIntervalMillis: Int,connectionRetryEnabled: Boolean): RealConnection {//从已经存在的连接池中选中一个socket返回//address就是本次链接的目的地址if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {val result = call.connection!!eventListener.connectionAcquired(call, result)return result}//如果没有就新建一个路由,内部会寻找dns和代理信息等localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)this.routeSelector = localRouteSelector//从路由选择中获取其中一个路由信息val localRouteSelection = localRouteSelector.next()routeSelection = localRouteSelection//比如说routes其中一个信息: 代理地址:127.0.0.1:8001 address[url:baidu.com,dns,支持的ssl/tls加密套件等 ]routes = localRouteSelection.routes//这个对象内部将会持有待会进行链接的socketval newConnection = RealConnection(connectionPool, route)//让我们的call持有call.connectionToCancel = newConnection//内部创建一个socket然后利用路由信息链接,如果存在代理那么会链接代理socketnewConnection.connect(connectTimeout,readTimeout,writeTimeout,pingIntervalMillis,connectionRetryEnabled,call,eventListener)synchronized(newConnection) {//放入一个链接池中,方便以后复用connectionPool.put(newConnection)}return newConnection}
}
newConnection.connect
函数内部含有利用路由信息创建socket的全部流程,这里我们不要过度关心,只需关注okhttp核心流程.
- (5) CallServerInterceptor
经过第四步我们已经得到了相关socket
,所以我们只需要向socket
写入对应的流即可.
所以CallServerInterceptor
就是将前面的所有的封装对象比如body(响应体),header(响应头)转化字节序列即可写入.
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {@Throws(IOException::class)override fun intercept(chain: Interceptor.Chain): Response {val realChain = chain as RealInterceptorChainval exchange = realChain.exchange!!val request = realChain.request//请求体val requestBody = request.bodyval sentRequestMillis = System.currentTimeMillis()//将请求头写入socket中exchange.writeRequestHeaders(request)var invokeStartEvent = truevar responseBuilder: Response.Builder? = null//permitsRequestBody函数判断当前不是Get和HEAD请求//因为GET和HEAD是没有请求体的if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {//如果请求头存在Expect: 100-continue,那么服务端必须必须返回一个状态码为100的响应才能继续发送请求体//具体参阅:https://developer.mozilla.org/zh-CN/docs/Web/HTTP/Headers/Expectif ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {//进入if证明存在这个请求头//刷新socket流exchange.flushRequest()//读取服务端响应,如果服务端响应头为100,那么readResponseHeaders会返回null//如果服务端不愿意接受responseBuilder那么响应头不为100此时responseBuilder就不为空//关于服务端如何处理 请参阅权威指南:https://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html#sec8.2.3responseBuilder = exchange.readResponseHeaders(expectContinue = true)exchange.responseHeadersStart()invokeStartEvent = false}//responseBuilder == null可以理解为正常post请求,开始写入请求体if (responseBuilder == null) {//当前是否支持 双工传输,一般情况都是false.一般需要客户端和服务端都支持才行.//简单理解就是平常http我们需要发送完整请求头和请求体才能接受响应信息,而双工支持只发送请求头,然后接受响应体和响应头最后在发送请请请求体//如果你公司的服务器都支持建议创建子类开启//具体可看 https://square.github.io/okhttp/4.x/okhttp/okhttp3/-request-body/is-duplex/ if (requestBody.isDuplex()) {exchange.flushRequest()val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()requestBody.writeTo(bufferedRequestBody)} else {//由于不支持双工所以只能把请求体发送了在读取响应// Write the request body if the "Expect: 100-continue" expectation was met.val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()requestBody.writeTo(bufferedRequestBody)//直接刷出请求体bufferedRequestBody.close()}} else {//表示没有请求需要写入exchange.noRequestBody()//isMultiplexed是http2才会返回true,下面的注释比较权威就不在自己描述了if (!exchange.connection.isMultiplexed) {// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection// from being reused. Otherwise we're still obligated to transmit the request body to// leave the connection in a consistent state.exchange.noNewExchangesOnConnection()}}} else {exchange.noRequestBody()}if (requestBody == null || !requestBody.isDuplex()) {//标记请求发送完成exchange.finishRequest()}//响应体if (responseBuilder == null) {//读取响应头responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!if (invokeStartEvent) {exchange.responseHeadersStart()invokeStartEvent = false}}//构建响应体var response = responseBuilder.request(request).handshake(exchange.connection.handshake()).sentRequestAtMillis(sentRequestMillis).receivedResponseAtMillis(System.currentTimeMillis()).build()var code = response.code//如果读取到状态码100,虽然没有发送Expect: 100-continue这样请求头(前面已经读取过100的状态码,正常不应该返回这样的)但是为了容错再次读取一次响应信息if (code == 100) {// Server sent a 100-continue even though we did not request one. Try again to read the actual// response status.responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!if (invokeStartEvent) {exchange.responseHeadersStart()}response = responseBuilder.request(request).handshake(exchange.connection.handshake()).sentRequestAtMillis(sentRequestMillis).receivedResponseAtMillis(System.currentTimeMillis()).build()code = response.code} //END --- 容错状态码100响应体 机制完成exchange.responseHeadersEnd(response)response = if (forWebSocket && code == 101) {//这里是升级http到websocket请求,读者有兴趣可以自行了解websocket机制// Connection is upgrading, but we need to ensure interceptors see a non-null response body.response.newBuilder().body(EMPTY_RESPONSE).build()} else {//读取响应体response.newBuilder().body(exchange.openResponseBody(response)).build()}//状态204和205不应该有响应体if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {throw ProtocolException("HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")}return response}
}
如果你对100的状态或者Http双工请求不理解可以先跳过.毕竟前者很少用,后者需要自己实现okhttp请求体去启用这个功能.
这篇关于Okhttp3 设计思想学习的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!