OkHttp 原始碼分析(五)——ConnectInterceptor
0、前言
在前面的文章中,我們分析了http的快取策略和Okhttp快取攔截器的快取機制,我們知道,在沒有快取命中的情況下,需要對網路資源進行請求,這時候攔截鏈就來到ConnectInterceptor。
ConnectInterceptor的主要作用是和伺服器建立連線,在連線建立後通過okio獲取通向服務端的輸入流Source和輸出流Sink。
1、原始碼分析
public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; Request request = realChain.request(); StreamAllocation streamAllocation = realChain.streamAllocation(); // We need the network to satisfy this request. Possibly for validating a conditional GET. boolean doExtensiveHealthChecks = !request.method().equals("GET"); HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks); RealConnection connection = streamAllocation.connection(); return realChain.proceed(request, streamAllocation, httpCodec, connection); }
ConnectInterceptor攔截器的intercept方法很簡單,首先拿到StreamAllocation物件streamAllocation,這個物件在RetryAndFollowUpInterceptor攔截器中已經初始化好了,這在RetryAndFollowUpInterceptor攔截器分析中提到過。在拿到streamAllocation後,後續的連線建立工作都交給streamAllocation來完成。
ConnectionPool
我們知道http連線的需要三次握手、四次揮手操作,如果每次請求進行握手連線和揮手釋放資源,則會消耗很多不必要的時間。針對這種情況,Http協議提供了keep-alive的請求頭欄位,來保持長連線,以便下次進行請求時不必進行重新握手操作。OkHttp為了方便管理所有連線,採用連線池ConnectionPool。
ConnectionPool的主要功能就是為了降低由於頻繁建立連線導致的網路延遲。它實現了複用連線的策略。ConnectionPool雙端佇列Deque<RealConnection>
來儲存它所管理的所有RealConnection
private final Deque<RealConnection> connections = new ArrayDeque<>();
ConnectionPool會對連線池中的最大空閒連線以及連線的保活時間進行控制,maxIdleConnections和keepAliveDurationNs成員分別體現對最大空閒連線數及連線保活時間的控制。ConnectionPool的初始化由OkhttpClient的Builder類完成,預設最大空閒連線數為5、保活時間5分鐘。此外,我們也可以初始化OkHttpClient自定義ConnectionPool。ConnectionPool有提供put、get、evictAll等操作,但對連線池的操作,是通過Internal.instance進行的。
public abstract class Internal {
public static void initializeInstanceForTests() {
// Needed in tests to ensure that the instance is actually pointing to something.
new OkHttpClient();
}
public static Internal instance;
...//
}
StreamAllocation
StreamAllocation是用來建立執行HTTP請求所需網路設施的元件,如其名字所顯示的那樣,分配Stream。StreamAllocation的建構函式如下:
public StreamAllocation(ConnectionPool connectionPool, Address address, Call call,
EventListener eventListener, Object callStackTrace) {
this.connectionPool = connectionPool;
this.address = address;
this.call = call;
this.eventListener = eventListener;
this.routeSelector = new RouteSelector(address, routeDatabase(), call, eventListener);
this.callStackTrace = callStackTrace;
}
回到上面的程式碼StreamAllocation.newStream方法:
public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
這個方法程式碼也不多,主要通過findHealthyConnection去查詢是否有可服用的連線,有則複用,無則返回一個新建的連線:
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
//死迴圈獲取一個連線
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
//連線池同步獲取,上面找到的連線是否是一個新的連線,如果是的話,就直接返回了,就是我們需要找
// 的連線了
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
//如果不是一個新的連線,判斷是否一個健康的連線。
//不健康的RealConnection條件為如下幾種情況:
//RealConnection物件 socket沒有關閉
//socket的輸入流沒有關閉
//socket的輸出流沒有關閉
//http2時連線沒有關閉
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
繼續看findConnection方法:
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
RealConnection result = null;
Route selectedRoute = null;
Connection releasedConnection;
Socket toClose;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
// Attempt to use an already-allocated connection. We need to be careful here because our
// already-allocated connection may have been restricted from creating new streams.
releasedConnection = this.connection;
toClose = releaseIfNoNewStreams();
if (this.connection != null) {
// We had an already-allocated connection and it's good.
result = this.connection;
releasedConnection = null;
}
if (!reportedAcquired) {
// If the connection was never reported acquired, don't report it as released!
releasedConnection = null;
}
if (result == null) {
// Attempt to get a connection from the pool.
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
}
closeQuietly(toClose);
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
return result;
}
// If we need a route selection, make one. This is a blocking operation.
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
Internal.instance.get(connectionPool, address, this, route);
if (connection != null) {
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
}
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
acquire(result, false);
}
}
// If we found a pooled connection on the 2nd time around, we're done.
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}
// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
reportedAcquired = true;
// Pool the connection.
Internal.instance.put(connectionPool, result);
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
eventListener.connectionAcquired(call, result);
return result;
}
findConnection這個方法程式碼比較長,大致流程如下:
判斷streamAllocation物件內部是否有可複用的connection物件;
如果streamAllocation物件無可用的connection物件,則通過Internal.instance從連線池中查詢可用的connection物件;
如果連線池中仍未找到,則遍歷所有路由路徑,嘗試再次從ConnectionPool中尋找可複用的連線;
前面三步都未找到,則新建一個連線,進行TCP + TLS握手
將新建的連線放入連線池中,並返回結果
上面關鍵的步驟在於通過Internal.instance的get方法在連線池中查詢可複用的連線,上面提到了連線池的操作是通過Internal.instance進行的,Internal.instance的get方法最終呼叫的是ConnectionPool的get方法:
RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection, true);
return connection;
}
}
return null;
}
ConnectionPool的get方法遍歷connections佇列,通過isEligible檢測連線是否可複用,可複用則通過streamAllocation的acquire方法繫結:
public void acquire(RealConnection connection, boolean reportedAcquired) {
assert (Thread.holdsLock(connectionPool));
if (this.connection != null) throw new IllegalStateException();
this.connection = connection;
this.reportedAcquired = reportedAcquired;
connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
}
isEligible方法判斷連線是否可用:
public boolean isEligible(Address address, @Nullable Route route) {
// 如果這個連線不接受新的流,返回false.
if (allocations.size() >= allocationLimit || noNewStreams) return false;
//判斷host是否匹配
// If the non-host fields of the address don't overlap, we're done.
if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;
// If the host exactly matches, we're done: this connection can carry the address.
if (address.url().host().equals(this.route().address().url().host())) {
return true; // This connection is a perfect match.
}
// At this point we don't have a hostname match. But we still be able to carry the request if
// our connection coalescing requirements are met. See also:
// https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
// https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/
//上面條件不滿足,判斷是否http2
// 1. This connection must be HTTP/2.
if (http2Connection == null) return false;
// 2. 路由必須共享一個IP地址。這要求我們為兩個主機都有DNS地址,這隻發生在路由規劃之後。我們
//不能合併使用代理的連線,因為代理沒有告訴我們源伺服器的IP地址。
if (route == null) return false;
if (route.proxy().type() != Proxy.Type.DIRECT) return false;
if (this.route.proxy().type() != Proxy.Type.DIRECT) return false;
if (!this.route.socketAddress().equals(route.socketAddress())) return false;
// 3. 此連線的伺服器證書必須覆蓋新主機
if (route.address().hostnameVerifier() != OkHostnameVerifier.INSTANCE) return false;
if (!supportsUrl(address.url())) return false;
// 4. Certificate pinning must match the host.
try {
address.certificatePinner().check(address.url().host(), handshake().peerCertificates());
} catch (SSLPeerUnverifiedException e) {
return false;
}
return true; // The caller's address can be carried by this connection.
}
新建流(HttpCodec)
回到StreamAllocation的newStream方法,在獲取到Connection物件後,接下來就是新建流,即HttpCodec物件。HttpCodec物件是封裝了底層IO的可以直接用來收發資料的元件(依賴okio庫),它會將請求的資料序列化之後傳送到網路,並將接收的資料反序列化為應用程式方便操作的格式。
public interface HttpCodec {
/**
* The timeout to use while discarding a stream of input data. Since this is used for connection
* reuse, this timeout should be significantly less than the time it takes to establish a new
* connection.
*/
int DISCARD_STREAM_TIMEOUT_MILLIS = 100;
/** Returns an output stream where the request body can be streamed. */
Sink createRequestBody(Request request, long contentLength);
/** This should update the HTTP engine's sentRequestMillis field. */
void writeRequestHeaders(Request request) throws IOException;
/** Flush the request to the underlying socket. */
void flushRequest() throws IOException;
/** Flush the request to the underlying socket and signal no more bytes will be transmitted. */
void finishRequest() throws IOException;
/**
* Parses bytes of a response header from an HTTP transport.
*
* @param expectContinue true to return null if this is an intermediate response with a "100"
* response code. Otherwise this method never returns null.
*/
Response.Builder readResponseHeaders(boolean expectContinue) throws IOException;
/** Returns a stream that reads the response body. */
ResponseBody openResponseBody(Response response) throws IOException;
/**
* Cancel this stream. Resources held by this stream will be cleaned up, though not synchronously.
* That may happen later by the connection pool thread.
*/
void cancel();
}
HttpCodec作用:
建立請求體,以用於傳送請求體資料。
寫入請求頭
結束請求傳送
讀取響應頭部。
開啟請求體,以用於後續獲取請求體資料。
取消請求執行
拿到HttpCodec物件後,回到攔截器處,將