/**
 * Moves eligible calls from [readyAsyncCalls] to [runningAsyncCalls] and hands them to the
 * executor service.
 *
 * Promotion stops as soon as `runningAsyncCalls` has reached `maxRequests`. A call whose
 * `callsPerHost` counter has reached `maxRequestsPerHost` is skipped and stays in the ready queue.
 * Calls are selected inside `synchronized(this)` and executed after that block has been left.
 *
 * @return true if `runningCallsCount()` was greater than zero at the end of the promotion pass.
 */
private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
        val i = readyAsyncCalls.iterator()
        while (i.hasNext()) {
            val asyncCall = i.next()

            if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
            if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

            i.remove()
            asyncCall.callsPerHost.incrementAndGet()
            executableCalls.add(asyncCall)
            runningAsyncCalls.add(asyncCall)
        }
        isRunning = runningCallsCount() > 0
    }

    // Start the promoted calls only after the synchronized block has been exited.
    for (asyncCall in executableCalls) {
        asyncCall.executeOn(executorService)
    }

    return isRunning
}
/**
 * Used by [AsyncCall.run] to signal completion.
 *
 * Decrements the call's per-host counter, then removes the call from [runningAsyncCalls] via the
 * generic `finished(calls, call)` overload.
 */
internal fun finished(call: AsyncCall) {
    call.callsPerHost.decrementAndGet()
    finished(runningAsyncCalls, call)
}
/**
 * Used by [Call.execute] to signal completion.
 *
 * Removes the call from [runningSyncCalls] via the generic `finished(calls, call)` overload.
 */
internal fun finished(call: RealCall) {
    finished(runningSyncCalls, call)
}
/**
 * Removes [call] from [calls], promotes further ready calls, and runs the idle callback when
 * nothing is running any more.
 *
 * @param calls the deque the finished call is expected to be in.
 * @param call the call that has completed.
 * @throws AssertionError if [call] was not present in [calls].
 */
private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
        if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
        // Capture the callback inside the synchronized block; it is invoked after the block.
        idleCallback = this.idleCallback
    }

    val isRunning = promoteAndExecute()

    if (!isRunning && idleCallback != null) {
        idleCallback.run()
    }
}
// When an exchange is present, verify that the previous interceptor kept the same host and port
// and that `calls` equals 1; both checks fail with a message naming interceptors[index - 1].
if (exchange != null) {
    check(exchange.finder.sameHostAndPort(request.url)) {
        "network interceptor ${interceptors[index - 1]} must retain the same host and port"
    }
    check(calls == 1) {
        "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
    }
}

// Note 1
// Call the next interceptor in the chain.
val next = copy(index = index + 1, request = request)
val interceptor = interceptors[index]

// The elvis branch throws if the interceptor returned null.
@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
    "interceptor $interceptor returned null")
/**
 * Looks up a cached candidate for the chain's request and computes a [CacheStrategy] from the
 * current time, the request and that candidate.
 */
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
    val call = chain.call()
    val cacheCandidate = cache?.get(chain.request())

    val now = System.currentTimeMillis()

    val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
    val networkRequest = strategy.networkRequest
    val cacheResponse = strategy.cacheResponse
    // NOTE(review): the excerpt stops here; the remainder of this function and its closing brace
    // are not present in this file.
// Decide from the response headers whether the cached response may be returned; when
// isCacheable(...) is false, produce a strategy that carries only the request.
if (!isCacheable(cacheResponse, request)) {
    return CacheStrategy(request, null)
}

val requestCaching = request.cacheControl
// If the request's cache control has noCache set, or hasConditions(request) is true, return a
// strategy without a cache response straight away.
if (requestCaching.noCache || hasConditions(request)) {
    return CacheStrategy(request, null)
}

val responseCaching = cacheResponse.cacheControl

// Age of the cached response: the time from its creation until now.
val ageMillis = cacheResponseAge()
// How long this response remains fresh.
var freshMillis = computeFreshnessLifetime()
// If the request specifies max-age (the oldest cached response it will accept), combine it with
// the response's freshness lifetime and keep the smaller of the two.
if (requestCaching.maxAgeSeconds != -1) {
    freshMillis = minOf(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds.toLong()))
}

// Freshness margin demanded by the request (min-fresh).
var minFreshMillis: Long = 0
if (requestCaching.minFreshSeconds != -1) {
    minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds.toLong())
}

// Cache-Control: must-revalidate - cacheable, but must be confirmed with the origin server again.
// Cache-Control: max-stale - the cache may still be used for the given time after it expired;
// without a value any staleness is acceptable, with a value only within that time.
// The former overrides the latter, so max-stale is read from the request only when the response
// does not require revalidation.
var maxStaleMillis: Long = 0
if (!responseCaching.mustRevalidate && requestCaching.maxStaleSeconds != -1) {
    maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds.toLong())
}

// No validation with the server required && (response age + min-fresh) <
// (freshness lifetime + max-stale): the cached response can be used.
if (!responseCaching.noCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
    val builder = cacheResponse.newBuilder()
    // Served past its freshness lifetime: attach the "Response is stale" warning.
    if (ageMillis + minFreshMillis >= freshMillis) {
        builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"")
    }
    val oneDayMillis = 24 * 60 * 60 * 1000L
    if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
        builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"")
    }
    return CacheStrategy(null, builder.build())
}

val conditionName: String
val conditionValue: String?
when {
    etag != null -> {
        conditionName = "If-None-Match"
        conditionValue = etag
    }
    // NOTE(review): the excerpt stops here; the remaining `when` branches and the closing braces
    // are not present in this file.
// Initialise an exchange for this call, copy the chain with that exchange attached, and proceed
// with the chain's request.
val realChain = chain as RealInterceptorChain
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
// Try to find a connection in the connection pool; if one is found, return it.
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
    val result = call.connection!!
    eventListener.connectionAcquired(call, result)
    return result
}

if (call.isCanceled()) throw IOException("Canceled")

// Second pool lookup, this time passing the candidate routes.
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
    val result = call.connection!!
    eventListener.connectionAcquired(call, result)
    return result
}
route = localRouteSelection.next() }
// Establish the connection.
val newConnection = RealConnection(connectionPool, route)
// Publish the connection on the call while it is connecting; cleared again in `finally`.
call.connectionToCancel = newConnection
try {
    newConnection.connect(
        connectTimeout,
        readTimeout,
        writeTimeout,
        pingIntervalMillis,
        connectionRetryEnabled,
        call,
        eventListener
    )
} finally {
    call.connectionToCancel = null
}
call.client.routeDatabase.connected(newConnection.route())

// Check whether the connection is multiplexed; if so, coalesce: use the pooled connection,
// remember this route as the next one to try, and close the socket that was just opened.
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
    val result = call.connection!!
    nextRouteToTry = route
    newConnection.socket().closeQuietly()
    eventListener.connectionAcquired(call, result)
    return result
}
var routeException: RouteException? = null
val connectionSpecs = route.address.connectionSpecs
val connectionSpecSelector = ConnectionSpecSelector(connectionSpecs)

if (route.address.sslSocketFactory == null) {
    // No SSL socket factory: cleartext must be among the connection specs and permitted by the
    // platform for this host.
    if (ConnectionSpec.CLEARTEXT !in connectionSpecs) {
        throw RouteException(UnknownServiceException(
            "CLEARTEXT communication not enabled for client"))
    }
    val host = route.address.url.host
    if (!Platform.get().isCleartextTrafficPermitted(host)) {
        throw RouteException(UnknownServiceException(
            "CLEARTEXT communication to $host not permitted by network security policy"))
    }
} else {
    // An SSL socket factory is set: H2_PRIOR_KNOWLEDGE is rejected for this route.
    if (Protocol.H2_PRIOR_KNOWLEDGE in route.address.protocols) {
        throw RouteException(UnknownServiceException(
            "H2_PRIOR_KNOWLEDGE cannot be used with HTTPS"))
    }
}
while (true) {
    try {
        // If a tunnel is required, build it, i.e. HTTPS accessed through an HTTP proxy.
        if (route.requiresTunnel()) {
            connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)
            if (rawSocket == null) {
                // We were unable to connect the tunnel but properly closed down our resources.
                break
            }
        } else {
            // Establish the socket connection.
            connectSocket(connectTimeout, readTimeout, call, eventListener)
        }
        establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)
        eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol)
        break
    } catch (e: IOException) {
        // Close both sockets quietly and reset the connection state fields.
        socket?.closeQuietly()
        rawSocket?.closeQuietly()
        socket = null
        rawSocket = null
        source = null
        sink = null
        handshake = null
        protocol = null
        http2Connection = null
        allocationLimit = 1
        // NOTE(review): the excerpt stops here; the rest of the catch block and the closing
        // braces of `catch` and `while` are not present in this file.
/**
 * Writes the request headers (and the body, when the method permits one) to the chain's exchange
 * and reads the response headers back, handling the `Expect: 100-continue` handshake.
 */
override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.exchange!!
    val request = realChain.request
    val requestBody = request.body
    val sentRequestMillis = System.currentTimeMillis()

    exchange.writeRequestHeaders(request)

    var invokeStartEvent = true
    var responseBuilder: Response.Builder? = null
    // Original note: "check whether the request is a POST". The code tests whether the method
    // permits a request body and a body is actually present.
    if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
        // If the request carries an "Expect: 100-continue" header, wait for the server's response
        // before sending the body; if that response is not received, return what was received.
        if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
            exchange.flushRequest()
            responseBuilder = exchange.readResponseHeaders(expectContinue = true)
            exchange.responseHeadersStart()
            invokeStartEvent = false
        }
        if (responseBuilder == null) {
            if (requestBody.isDuplex()) {
                // Duplex body: flush the request first; the body sink is not closed here.
                exchange.flushRequest()
                val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
                requestBody.writeTo(bufferedRequestBody)
            } else {
                val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
                requestBody.writeTo(bufferedRequestBody)
                bufferedRequestBody.close()
            }
        } else {
            // Response headers were already read during the expect-continue step, so no body is sent.
            exchange.noRequestBody()
            if (!exchange.connection.isMultiplexed) {
                exchange.noNewExchangesOnConnection()
            }
        }
    } else {
        exchange.noRequestBody()
    }

    if (requestBody == null || !requestBody.isDuplex()) {
        exchange.finishRequest()
    }
    if (responseBuilder == null) {
        responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
        if (invokeStartEvent) {
            exchange.responseHeadersStart()
            invokeStartEvent = false
        }
    }
    var response = responseBuilder
        .request(request)
        .handshake(exchange.connection.handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build()
    var code = response.code
    // A 100 response means the "Expect: 100-continue" request succeeded; read the response
    // headers once more to obtain the actual response.
    if (code == 100) {
        responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
        if (invokeStartEvent) {
            exchange.responseHeadersStart()
        }
        response = responseBuilder
            .request(request)
            .handshake(exchange.connection.handshake())
            .sentRequestAtMillis(sentRequestMillis)
            .receivedResponseAtMillis(System.currentTimeMillis())
            .build()
        code = response.code
    }
    // NOTE(review): the excerpt stops here; the remainder of this function and its closing brace
    // are not present in this file.