/** * A concrete interceptor chain that carries the entire interceptor chain: all application * interceptors, the OkHttp core, all network interceptors, and finally the network caller. */ publicfinalclassRealInterceptorChainimplementsInterceptor.Chain{...}
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection)throws IOException { if (index >= interceptors.size()) thrownew AssertionError();
calls++;
// If we already have a stream, confirm that the incoming request will use it. if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) { thrownew IllegalStateException("network interceptor " + interceptors.get(index - 1) + " must retain the same host and port"); }
// If we already have a stream, confirm that this is the only call to chain.proceed(). if (this.httpCodec != null && calls > 1) { thrownew IllegalStateException("network interceptor " + interceptors.get(index - 1) + " must call proceed() exactly once"); }
// Call the next interceptor in the chain. RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec, connection, index + 1, request, call, eventListener, connectTimeout, readTimeout, writeTimeout); Interceptor interceptor = interceptors.get(index); Response response = interceptor.intercept(next); //这里调用的就是特定过滤器实现Interceptor接口的自身的intercept方法
// Confirm that the next interceptor made its required call to chain.proceed(). if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) { thrownew IllegalStateException("network interceptor " + interceptor + " must call proceed() exactly once"); }
// Confirm that the intercepted response isn't null. if (response == null) { thrownew NullPointerException("interceptor " + interceptor + " returned null"); }
if (response.body() == null) { thrownew IllegalStateException( "interceptor " + interceptor + " returned a response with no body"); }
int followUpCount = 0; Response priorResponse = null; while (true) { if (canceled) { //请求已取消 streamAllocation.release(); thrownew IOException("Canceled"); }
Response response; boolean releaseConnection = true; try { //执行请求 response = realChain.proceed(request, streamAllocation, null, null); releaseConnection = false; } catch (RouteException e) { // The attempt to connect via a route failed. The request will not have been sent. if (!recover(e.getLastConnectException(), streamAllocation, false, request)) { // 判断能否从前一个请求恢复,不行就抛出异常 throw e.getLastConnectException(); } releaseConnection = false; //重试 continue; } catch (IOException e) { // An attempt to communicate with a server failed. The request may have been sent. // 先判断当前请求是否已经发送了 boolean requestSendStarted = !(e instanceof ConnectionShutdownException); // 同样的重试判断 if (!recover(e, streamAllocation, requestSendStarted, request)) throw e; releaseConnection = false; continue; //重试 } finally { // We're throwing an unchecked exception. Release any resources. // 没有捕获到的异常,最终要释放 if (releaseConnection) { streamAllocation.streamFailed(null); streamAllocation.release(); } }
// Attach the prior response if it exists. Such responses never have a body. //priorResponse 是用来保存前一个 Response 的,这里可以看到将前一个Response和当前Response结合到一起了, //对应的场景是,当获得Response后,发现需要重定向,则将当前Response设置给priorResponse,再执行一遍流程, //直到不需要重定向了,则将priorResponse和Response结合起来。 if (priorResponse != null) { response = response.newBuilder() .priorResponse(priorResponse.newBuilder() .body(null) .build()) .build(); }
privatebooleanrecover(IOException e, boolean requestSendStarted, Request userRequest){ streamAllocation.streamFailed(e);
// The application layer has forbidden retries. // 如果OkHttpClient直接配置拒绝失败重连 // 默认创建的OkHttpClient的retryOnConnectionFailure属性是true if (!client.retryOnConnectionFailure()) returnfalse;
// We can't send the request body again. //如果请求已经发送,并且这个请求体是一个UnrepeatableRequestBody类型,则不能重试 //StreamedRequestBody实现了UnrepeatableRequestBody接口,是个流类型,不会被缓存,所以只能执行一次 if (requestSendStarted && userRequest.body() instanceof UnrepeatableRequestBody) returnfalse;
// This exception is fatal. //一些严重问题就不要重试了 if (!isRecoverable(e, requestSendStarted)) returnfalse;
// No more routes to attempt. //没有更多的路由就不要重试了 if (!streamAllocation.hasMoreRoutes()) returnfalse;
// For failure recovery, use the same route selector with a new connection. returntrue; }
/** * This request body streams bytes from an application thread to an OkHttp dispatcher thread via a * pipe. Because the data is not buffered it can only be transmitted once. */ finalclassStreamedRequestBodyextendsOutputStreamRequestBodyimplementsUnrepeatableRequestBody{}
privatebooleanisRecoverable(IOException e, boolean requestSendStarted){ // If there was a protocol problem, don't recover. if (e instanceof ProtocolException) {//如果是协议问题 returnfalse; }
// If there was an interruption don't recover, but if there was a timeout connecting to a route // we should try the next route (if there is one). if (e instanceof InterruptedIOException) {// 中断 //超时问题并且请求还没有被发送,可以重试 //其他就不要重试了 return e instanceof SocketTimeoutException && !requestSendStarted; }
// Look for known client-side or negotiation errors that are unlikely to be fixed by trying // again with a different route. if (e instanceof SSLHandshakeException) { // If the problem was a CertificateException from the X509TrustManager, // do not retry. //证书或安全原因。就不要重试 if (e.getCause() instanceof CertificateException) { returnfalse; } } if (e instanceof SSLPeerUnverifiedException) { // e.g. a certificate pinning error. returnfalse; }
// An example of one we might want to retry with a different route is a problem connecting to a // proxy and would manifest as a standard IOException. Unless it is one we know we should not // retry, we return true and try a new route. returntrue; }
/** * Returns true if there's another route to attempt. Every address has at least one route. */ publicbooleanhasNext(){ return hasNextInetSocketAddress() || hasNextProxy() || hasNextPostponed(); }
这个判断表明:没有更多的可以使用的路由,则不要重试了(第四种拒绝重连的方式)。这里大概说明一下routeSelection 是用 List 保存的。
1 2 3 4 5 6 7 8 9
catch (RouteException e) { // The attempt to connect via a route failed. The request will not have been sent. if (!recover(e.getLastConnectException(), false, request)) { throw e.getLastConnectException(); } releaseConnection = false; //重试。。。 continue; }
catch (IOException e) { // An attempt to communicate with a server failed. The request may have been sent. //先判断当前请求是否已经发送了 boolean requestSendStarted = !(e instanceof ConnectionShutdownException); //同样的重试判断 if (!recover(e, requestSendStarted, request)) throw e; releaseConnection = false; //重试。。。 continue; }
finally { // We're throwing an unchecked exception. Release any resources. if (releaseConnection) { //没有捕获到异常,最终要释放。 streamAllocation.streamFailed(null); streamAllocation.release(); } }
1 2 3 4 5 6 7 8
// Attach the prior response if it exists. Such responses never have a body. if (priorResponse != null) { response = response.newBuilder() .priorResponse(priorResponse.newBuilder() .body(null) .build()) .build(); }
Request followUp = followUpRequest(response); //=========================followUpRequest()============================== /** * Figures out the HTTP request to make in response to receiving {@code userResponse}. This will * either add authentication headers, follow redirects or handle a client request timeout. If a * follow-up is either unnecessary or not applicable, this returns null. */ private Request followUpRequest(Response userResponse)throws IOException { if (userResponse == null) thrownew IllegalStateException(); Connection connection = streamAllocation.connection(); Route route = connection != null ? connection.route() : null; int responseCode = userResponse.code();
final String method = userResponse.request().method(); switch (responseCode) { case HTTP_PROXY_AUTH: Proxy selectedProxy = route != null ? route.proxy() : client.proxy(); if (selectedProxy.type() != Proxy.Type.HTTP) { thrownew ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy"); } return client.proxyAuthenticator().authenticate(route, userResponse);
case HTTP_UNAUTHORIZED: return client.authenticator().authenticate(route, userResponse);
case HTTP_PERM_REDIRECT: case HTTP_TEMP_REDIRECT: // "If the 307 or 308 status code is received in response to a request other than GET // or HEAD, the user agent MUST NOT automatically redirect the request" if (!method.equals("GET") && !method.equals("HEAD")) { returnnull; } // fall-through case HTTP_MULT_CHOICE: case HTTP_MOVED_PERM: case HTTP_MOVED_TEMP: case HTTP_SEE_OTHER: // Does the client allow redirects? if (!client.followRedirects()) returnnull;
// Don't follow redirects to unsupported protocols. if (url == null) returnnull;
// If configured, don't follow redirects between SSL and non-SSL. boolean sameScheme = url.scheme().equals(userResponse.request().url().scheme()); if (!sameScheme && !client.followSslRedirects()) returnnull;
// Most redirects don't include a request body. Request.Builder requestBuilder = userResponse.request().newBuilder(); if (HttpMethod.permitsRequestBody(method)) { finalboolean maintainBody = HttpMethod.redirectsWithBody(method); if (HttpMethod.redirectsToGet(method)) { requestBuilder.method("GET", null); } else { RequestBody requestBody = maintainBody ? userResponse.request().body() : null; requestBuilder.method(method, requestBody); } if (!maintainBody) { requestBuilder.removeHeader("Transfer-Encoding"); requestBuilder.removeHeader("Content-Length"); requestBuilder.removeHeader("Content-Type"); } }
// When redirecting across hosts, drop all authentication headers. This // is potentially annoying to the application layer since they have no // way to retain them. if (!sameConnection(userResponse, url)) { requestBuilder.removeHeader("Authorization"); } //重新构造了一个 Request return requestBuilder.url(url).build();
case HTTP_CLIENT_TIMEOUT: // 408's are rare in practice, but some servers like HAProxy use this response code. The // spec says that we may repeat the request without modifications. Modern browsers also // repeat the request (even non-idempotent ones.) if (userResponse.request().body() instanceof UnrepeatableRequestBody) { returnnull; }
if (!sameConnection(response, followUp.url())) { streamAllocation.release(); streamAllocation = new StreamAllocation( client.connectionPool(), createAddress(followUp.url()), callStackTrace); } elseif (streamAllocation.codec() != null) { thrownew IllegalStateException("Closing the body of " + response + " didn't close its backing stream. Bad interceptor?"); }
long now = System.currentTimeMillis(); //根据response,time,request创建一个缓存策略,用于判断怎样是使用缓存 CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get(); Request networkRequest = strategy.networkRequest; Response cacheResponse = strategy.cacheResponse;
if (cache != null) { cache.trackResponse(strategy); }
if (cacheCandidate != null && cacheResponse == null) { closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it. }
// If we're forbidden from using the network and the cache is insufficient, fail. //如果缓存策略中禁止使用网络,并且缓存又为空,则构建一个response直接返回,注意返回码为504 if (networkRequest == null && cacheResponse == null) { returnnew Response.Builder() .request(chain.request()) .protocol(Protocol.HTTP_1_1) .code(504) .message("Unsatisfiable Request (only-if-cached)") .body(Util.EMPTY_RESPONSE) .sentRequestAtMillis(-1L) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); }
// If we don't need the network, we're done. //如果不使用网络,但是又缓存,直接返回缓存 if (networkRequest == null) { return cacheResponse.newBuilder() .cacheResponse(stripBody(cacheResponse)) .build(); }
// Fetch the response from the rest of the chain, making sure the cache candidate's body is
// closed if that attempt does not produce a response.
Response networkResponse = null;
try {
  // Hand the request straight to the following interceptors.
  networkResponse = chain.proceed(networkRequest);
} finally {
  // If we're crashing on I/O or otherwise, don't leak the cache body.
  if (networkResponse == null && cacheCandidate != null) {
    closeQuietly(cacheCandidate.body());
  }
}
// If we have a cache response too, then we're doing a conditional get. //当缓存响应和网络响应同时存在的时候,选择用哪个 if (cacheResponse != null) { if (networkResponse.code() == HTTP_NOT_MODIFIED) { //如果返回码是304,客户端有缓冲的文档并发出了一个条件性的请求 //(一般是提供If-Modified-Since头表示用户只想到指定日期更新文档) //服务器告诉客户,原来缓冲的文档还可以继续使用 //则使用缓存的响应 Response response = cacheResponse.newBuilder() .headers(combine(cacheResponse.headers(), networkResponse.headers())) .sentRequestAtMillis(networkResponse.sentRequestAtMillis()) .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis()) .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build(); networkResponse.body().close();
// Update the cache after combining headers but before stripping the // Content-Encoding header (as performed by initContentStream()). cache.trackConditionalCacheHit(); cache.update(cacheResponse, response); return response; } else { closeQuietly(cacheResponse.body()); } }
/** * Remove any cache entries for the supplied {@code request}. This is invoked when the client * invalidates the cache, such as when making POST requests. */ voidremove(Request request)throws IOException;
/** * Handles a conditional request hit by updating the stored cache response with the headers from * {@code network}. The cached response body is not updated. If the stored response has changed * since {@code cached} was returned, this does nothing. */ voidupdate(Response cached, Response network);
/** Track an conditional GET that was satisfied by this cache. */ voidtrackConditionalCacheHit();
/** Track an HTTP response being satisfied with {@code cacheStrategy}. */ voidtrackResponse(CacheStrategy cacheStrategy); }
/** * Returns a snapshot of the entry named {@code key}, or null if it doesn't exist is not currently * readable. If a value is returned, it is moved to the head of the LRU queue. */ publicsynchronized Snapshot get(String key)throws IOException { initialize();//总结来说就是对journalFile文件的操作,有则删除无用冗余的信息,构建新文件,没有则new一个新的
if (initialized) { return; // Already initialized. }
// If a bkp file exists, use it instead. if (fileSystem.exists(journalFileBackup)) { // If journal file also exists just delete backup file. if (fileSystem.exists(journalFile)) { fileSystem.delete(journalFileBackup); } else { fileSystem.rename(journalFileBackup, journalFile); } }//经过上述判断后的结果只有两种:1.什么都没有;2.有journalFile文件
// Prefer to pick up where we left off. if (fileSystem.exists(journalFile)) {//journalFile文件动作 try { readJournal(); processJournal(); initialized = true; return; } catch (IOException journalIsCorrupt) { Platform.get().log(WARN, "DiskLruCache " + directory + " is corrupt: " + journalIsCorrupt.getMessage() + ", removing", journalIsCorrupt); }
// The cache is corrupted, attempt to delete the contents of the directory. This can throw and // we'll let that propagate out as it likely means there is a severe filesystem problem. try { delete();//到了这里表示有缓存损坏导致异常,则删除缓存目录下所有文件 } finally { closed = false; } }
// If we ended on a truncated line, rebuild the journal before appending to it. if (!source.exhausted()) {//表示是否还多余字节。如果没有多余字节,返回true,有多余字节返回false rebuildJournal(); } else { journalWriter = newJournalWriter();//获取这个文件的Sink,以便Writer } } finally { Util.closeQuietly(source); } }
/** Lengths of this entry's files. */ finallong[] lengths; final File[] cleanFiles;//用于保存持久数据,作用是读取,最后格式:key.0 final File[] dirtyFiles;//用于保存编辑的临时数据,作用是写,最后的格式:ley.0.tmp ... }
//日志中操作的记录数=总行数-lruEntries中实际add的行数 redundantOpCount = lineCount - lruEntries.size(); //source.exhausted()表示是否还多余字节,如果没有多余字节,返回true,有多余字节返回false // If we ended on a truncated line, rebuild the journal before appending to it. if (!source.exhausted()) { //如果有多余的字节,则重新构建下journal文件 rebuildJournal(); } else { //获取这个文件的Sink,以便Writer journalWriter = newJournalWriter(); }
/** * Creates a new journal that omits redundant information. This replaces the current journal if it * exists. */ synchronizedvoidrebuildJournal()throws IOException { if (journalWriter != null) { journalWriter.close(); }
// Prefer to pick up where we left off. if (fileSystem.exists(journalFile)) { try { readJournal(); processJournal(); initialized = true; return; } catch (IOException journalIsCorrupt) { Platform.get().log(WARN, "DiskLruCache " + directory + " is corrupt: " + journalIsCorrupt.getMessage() + ", removing", journalIsCorrupt); }
// The cache is corrupted, attempt to delete the contents of the directory. This can throw and // we'll let that propagate out as it likely means there is a severe filesystem problem. try { delete(); } finally { closed = false; } }
/** * Computes the initial size and collects garbage as a part of opening the cache. Dirty entries * are assumed to be inconsistent and will be deleted. */ privatevoidprocessJournal()throws IOException { fileSystem.delete(journalFileTmp);//删除journalFileTmp文件 for (Iterator<Entry> i = lruEntries.values().iterator(); i.hasNext(); ) { Entry entry = i.next(); if (entry.currentEditor == null) {//表明数据是 CLEAN,循环记录 size for (int t = 0; t < valueCount; t++) { size += entry.lengths[t]; } } else {//表明数据时DIRTY,删除 entry.currentEditor = null; for (int t = 0; t < valueCount; t++) { fileSystem.delete(entry.cleanFiles[t]); fileSystem.delete(entry.dirtyFiles[t]); } i.remove(); } } }
... // The cache is corrupted, attempt to delete the contents of the directory. This can throw and // we'll let that propagate out as it likely means there is a severe filesystem problem. try { delete(); } finally { closed = false; } }
/** * Returns a snapshot of the entry named {@code key}, or null if it doesn't exist is not currently * readable. If a value is returned, it is moved to the head of the LRU queue. */ publicsynchronized Snapshot get(String key)throws IOException { initialize();
Snapshot snapshot = entry.snapshot(); if (snapshot == null) returnnull;
redundantOpCount++; journalWriter.writeUtf8(READ).writeByte(' ').writeUtf8(key).writeByte('\n'); if (journalRebuildRequired()) { executor.execute(cleanupRunnable); }
return snapshot; }
到这里 initialize() 总算分析完了,接下来回到 get()。总结一下 get() 方法的主要操作:
初始化日志文件和lruEntries
检查保证 key 正确后获取缓存中保存的 Entry
操作计数器 +1
往日志文件中写入这次的 READ 操作
根据redundantOpCount判断是否需要清理日志信息
需要则开启线程清理
无论是否触发了清理,最后都返回缓存快照(Snapshot)
1 2 3 4 5 6 7 8 9
/**
 * Decides whether the journal is due for a rebuild. That is the case only when at least 2000
 * redundant operations have accumulated and there are at least as many redundant operations as
 * entries in {@code lruEntries}.
 */
boolean journalRebuildRequired() {
  final int minimumRedundantOps = 2000;
  if (redundantOpCount < minimumRedundantOps) {
    // Too few redundant operations to be worth compacting.
    return false;
  }
  return redundantOpCount >= lruEntries.size();
}
Snapshot snapshot(){ if (!Thread.holdsLock(DiskLruCache.this)) thrownew AssertionError();
Source[] sources = new Source[valueCount]; long[] lengths = this.lengths.clone(); // Defensive copy since these can be zeroed out. try { for (int i = 0; i < valueCount; i++) { //可以看到这里其实就是将cleanFiles传给了sources sources[i] = fileSystem.source(cleanFiles[i]); } returnnew Snapshot(key, sequenceNumber, sources, lengths); } catch (FileNotFoundException e) { // A file must have been deleted manually! for (int i = 0; i < valueCount; i++) { if (sources[i] != null) { Util.closeQuietly(sources[i]); } else { break; } } // Since the entry is no longer valid, remove it so the metadata is accurate (i.e. the cache // size.) try { removeEntry(this); } catch (IOException ignored) { } returnnull; } }
long now = System.currentTimeMillis(); //根据response,time,request创建一个缓存策略,用于判断怎样使用缓存 CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get(); Request networkRequest = strategy.networkRequest; Response cacheResponse = strategy.cacheResponse;
if (cache != null) { cache.trackResponse(strategy); }
if (cacheCandidate != null && cacheResponse == null) { closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it. }
// If we're forbidden from using the network and the cache is insufficient, fail. //如果缓存策略中禁止使用网络,并且缓存又为空,则构建一个Resposne直接返回,注意返回码=504 if (networkRequest == null && cacheResponse == null) { returnnew Response.Builder() .request(chain.request()) .protocol(Protocol.HTTP_1_1) .code(504) .message("Unsatisfiable Request (only-if-cached)") .body(Util.EMPTY_RESPONSE) .sentRequestAtMillis(-1L) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); }
// If we don't need the network, we're done. //不使用网络,但是又缓存,直接返回缓存 if (networkRequest == null) { return cacheResponse.newBuilder() .cacheResponse(stripBody(cacheResponse)) .build(); }
Response networkResponse = null; try { //直接走后续过滤器 networkResponse = chain.proceed(networkRequest); } finally { // If we're crashing on I/O or otherwise, don't leak the cache body. if (networkResponse == null && cacheCandidate != null) { closeQuietly(cacheCandidate.body()); } }
// If we have a cache response too, then we're doing a conditional get. //当缓存响应和网络响应同时存在的时候,选择用哪个 if (cacheResponse != null) { if (networkResponse.code() == HTTP_NOT_MODIFIED) { //如果返回码是304,客户端有缓冲的文档并发出了一个条件性的请求(一般是提供If-Modified-Since头表示客户 // 只想比指定日期更新的文档)。服务器告诉客户,原来缓冲的文档还可以继续使用。 //则使用缓存的响应 Response response = cacheResponse.newBuilder() .headers(combine(cacheResponse.headers(), networkResponse.headers())) .sentRequestAtMillis(networkResponse.sentRequestAtMillis()) .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis()) .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build(); networkResponse.body().close();
// Update the cache after combining headers but before stripping the // Content-Encoding header (as performed by initContentStream()). cache.trackConditionalCacheHit(); cache.update(cacheResponse, response); return response; } else { closeQuietly(cacheResponse.body()); } } //使用网络响应 Response response = networkResponse.newBuilder() .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build(); //所以默认创建的OkHttpClient是没有缓存的 if (cache != null) { //将响应缓存 if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) { // Offer this request to the cache. //缓存Resposne的Header信息 CacheRequest cacheRequest = cache.put(response); //缓存body return cacheWritingResponse(cacheRequest, response); } //只能缓存GET....不然移除request if (HttpMethod.invalidatesCache(networkRequest.method())) { try { cache.remove(networkRequest); } catch (IOException ignored) { // The cache cannot be written. } } }
if (HttpMethod.invalidatesCache(response.request().method())) { //Okhttp只能缓存 GET 请求! try { remove(response.request()); } catch (IOException ignored) { // The cache cannot be written. } returnnull; } if (!requestMethod.equals("GET")) { //Okhttp只能缓存 GET 请求! // Don't cache non-GET responses. We're technically allowed to cache // HEAD requests and some POST requests, but the complexity of doing // so is high and the benefit is low. returnnull; }
if (HttpHeaders.hasVaryAll(response)) { returnnull; }
private Response cacheWritingResponse(final CacheRequest cacheRequest, Response response) throws IOException { // Some apps return a null body; for compatibility we treat that like a null cache request. if (cacheRequest == null) return response; Sink cacheBodyUnbuffered = cacheRequest.body(); if (cacheBodyUnbuffered == null) return response; //获得body final BufferedSource source = response.body().source(); final BufferedSink cacheBody = Okio.buffer(cacheBodyUnbuffered);
Source cacheWritingSource = new Source() { boolean cacheRequestClosed;
@Overridepubliclongread(Buffer sink, long byteCount)throws IOException { long bytesRead; try { bytesRead = source.read(sink, byteCount); } catch (IOException e) { if (!cacheRequestClosed) { cacheRequestClosed = true; cacheRequest.abort(); // Failed to write a complete cache response. } throw e; }
if (bytesRead == -1) { if (!cacheRequestClosed) { cacheRequestClosed = true; cacheBody.close(); // The cache response is complete! } return -1; } //读的时候会将body写入 sink.copyTo(cacheBody.buffer(), sink.size() - bytesRead, bytesRead); cacheBody.emitCompleteSegments(); return bytesRead; }