1. 程式人生 > >Okhttp檔案上傳原始碼分析

Okhttp檔案上傳原始碼分析

     一直搞開發,也使用各種框架,但是基本上也從來沒研究過這些框架的底層是如何實現的。像我們客戶端,一般重要的事情就是:網路請求、圖片處理、操作資料庫、展現View介面,剛好今天哥們有問我一個okhttp網路請求的問題,幫他看完了之後,就詳細的走了一遍okhttp框架實現檔案上傳的基本流程,也算是對網路請求這塊的東西學習一下。

     先來看一下我按照思路整理出來的詳細的流程圖。說起流程圖,大家還不能單獨看,必須要結合原始碼一起,流程圖時刻可以幫助我們找到當前的點,但是沒有原始碼,光看流程圖,你根本不知道這是幹什麼的,所以大家參考的時候,還是需要對照原始碼一起分析,哦,還需要說一下,我當前使用的是okhttp-3.2.0.jar的包,可能不同的版本,細節實現會有一些差別。

     流程圖當中幾個重要的節點我都要紅色標註出來了。我要在客戶端測試的程式碼非常簡單,因為程式碼片現在越來越多了,這裡就不貼程式碼了,直接上一張圖,因為程式碼很少,所以大家看著也方便。


     按照上圖的流程,我們把整個檔案上傳的過程三為三個小節來分析:1、RealCall.getResponse之前的邏輯;2、RealCall類的getResponse方法中呼叫HttpEngine.sendRequest傳送請求;3、RealCall類的getResponse方法中呼叫HttpEngine.readResponse讀取響應。

     1、RealCall.getResponse之前的邏輯

     這一步中的邏輯都比較簡單,就是一些資料封裝類的操作,首先獲取到我們要上傳的檔案,以我們的目標檔案為引數建立一個RequestBody物件,我們來看一下RequestBody類的create(MediaType.parse(TYPE), file)方法的實現:

    public static RequestBody create(final MediaType contentType, final File file) {
        if(file == null) {
            throw new NullPointerException("content == null");
        } else {
            return new RequestBody() {
                public MediaType contentType() {
                    return contentType;
                }

                public long contentLength() {
                    return file.length();
                }

                public void writeTo(BufferedSink sink) throws IOException {
                    Source source = null;

                    try {
                        source = Okio.source(file);
                        sink.writeAll(source);
                    } finally {
                        Util.closeQuietly(source);
                    }

                }
            };
        }
    }
     這個方法的實現非常簡單,就是直接構造了一個RequestBody物件返回了,需要給大家說的就是writeTo方法,從這個方法邏輯中也可以看出,我們上傳檔案時候,就是在這裡把檔案轉換成Source物件,然後通過BufferedSink寫入流,最後傳送出去的。接著後面的兩句建立requestBody、requestPostFile我們就不展開了,就是根據我們的資料建立一個請求物件。我們繼續來看最後一句程式碼,client.newCall(requestPostFile)這一句就是建立一個RealCall物件,傳入的引數requestPostFile當中儲存了我們要上傳的檔案資料索引,我們繼續來看RealCall類的enqueue方法:
    public void enqueue(Callback responseCallback) {
        this.enqueue(responseCallback, false);
    }

    void enqueue(Callback responseCallback, boolean forWebSocket) {
        synchronized(this) {
            if(this.executed) {
                throw new IllegalStateException("Already Executed");
            }

            this.executed = true;
        }

        this.client.dispatcher().enqueue(new RealCall.AsyncCall(responseCallback, forWebSocket));
    }
     這裡的實現也比較簡單,就是判斷當前的Call物件是否已執行,一個Call物件不能重複執行,這點和我們的執行緒物件一樣。然後以傳進來的Callback物件構造一個RealCall.AsyncCall,這裡的forWebSocket是上面設定的,為false,同時獲取當前client的分發器,並將構造好的RealCall.AsyncCall交給分發器執行。Dispatcher分發器當中使用的是一個執行緒池來處理我們的請求的,在它的enqueue方法中就直接執行我們的請求,RealCall.AsyncCall是繼承NamedRunnable,而NamedRunnable又實現了Runnable,所以當任務被處理時,就會回撥RealCall.AsyncCall類的execute方法,我們來看一下它的實現:
        protected void execute() {
            boolean signalledCallback = false;

            try {
                Response e = RealCall.this.getResponseWithInterceptorChain(this.forWebSocket);
                if(RealCall.this.canceled) {
                    signalledCallback = true;
                    this.responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
                } else {
                    signalledCallback = true;
                    this.responseCallback.onResponse(RealCall.this, e);
                }
            } catch (IOException var6) {
                if(signalledCallback) {
                    Internal.logger.log(Level.INFO, "Callback failure for " + RealCall.this.toLoggableString(), var6);
                } else {
                    this.responseCallback.onFailure(RealCall.this, var6);
                }
            } finally {
                RealCall.this.client.dispatcher().finished(this);
            }

        }

     看一下這個方法的程式碼,再對比一下我們的流程圖,就可以看到,最終回撥到我們應用層就是在這裡的。而返回給我們的Response物件就是通過getResponseWithInterceptorChain獲取到的。當我們的請求因異常而失敗時,okhttp框架中都會將其取消,所以最終的結果就根據RealCall.this.canceled標誌位來判斷,如果取消了,那麼肯定是請求失敗了,否則請求就是成功的,最後請求完成後,將當前的call從分發器中移除。好,對照著流程圖,接下來我們看一下RealCall類的getResponseWithInterceptorChain方法的實現,它當中就是直接呼叫ApplicationInterceptorChain內部類物件,然後直接呼叫它的proceed方法。構造它時傳進來的第一個引數index為0,而我們當前的實現中,client中的攔截器數量也是0,所以就直接呼叫RealCall.this.getResponse進行處理。這個方法也是比較核心的方法了,從我們的流程圖就可以看出來,在它當中呼叫了我們下面要講的兩具節點方法:sendRequest()、readResponse(),傳送請求,讀取響應,讀取響應完成後,會將結果儲存在HttpEngine類的成員變數userResponse當中,最後將它通過回撥返回給應用層。我們來看一下getResponse方法的實現:

    Response getResponse(Request request, boolean forWebSocket) throws IOException {
        RequestBody body = request.body();
        if(body != null) {
            Builder followUpCount = request.newBuilder();
            MediaType releaseConnection = body.contentType();
            if(releaseConnection != null) {
                followUpCount.header("Content-Type", releaseConnection.toString());
            }

            long response = body.contentLength();
            if(response != -1L) {
                followUpCount.header("Content-Length", Long.toString(response));
                followUpCount.removeHeader("Transfer-Encoding");
            } else {
                followUpCount.header("Transfer-Encoding", "chunked");
                followUpCount.removeHeader("Content-Length");
            }

            request = followUpCount.build();
        }

        this.engine = new HttpEngine(this.client, request, false, false, forWebSocket, (StreamAllocation)null, (RetryableSink)null, (Response)null);
        int var20 = 0;

        while(!this.canceled) {
            boolean var21 = true;
            boolean var15 = false;

            StreamAllocation streamAllocation;
            label173: {
                label172: {
                    try {
                        HttpEngine followUp;
                        try {
                            var15 = true;
                            this.engine.sendRequest();
                            this.engine.readResponse();
                            var21 = false;
                            var15 = false;
                            break label173;
                        } catch (RequestException var16) {
                            throw var16.getCause();
                        } catch (RouteException var17) {
                            followUp = this.engine.recover(var17.getLastConnectException(), (Sink)null);
                            if(followUp == null) {
                                throw var17.getLastConnectException();
                            }
                        } catch (IOException var18) {
                            followUp = this.engine.recover(var18, (Sink)null);
                            if(followUp != null) {
                                var21 = false;
                                this.engine = followUp;
                                var15 = false;
                                break label172;
                            }

                            throw var18;
                        }

                        var21 = false;
                        this.engine = followUp;
                        var15 = false;
                    } finally {
                        if(var15) {
                            if(var21) {
                                StreamAllocation streamAllocation1 = this.engine.close();
                                streamAllocation1.release();
                            }

                        }
                    }

                    if(var21) {
                        streamAllocation = this.engine.close();
                        streamAllocation.release();
                    }
                    continue;
                }

                if(var21) {
                    streamAllocation = this.engine.close();
                    streamAllocation.release();
                }
                continue;
            }

            if(var21) {
                StreamAllocation var22 = this.engine.close();
                var22.release();
            }

            Response var23 = this.engine.getResponse();
            Request var24 = this.engine.followUpRequest();
            if(var24 == null) {
                if(!forWebSocket) {
                    this.engine.releaseStreamAllocation();
                }

                return var23;
            }

            streamAllocation = this.engine.close();
            ++var20;
            if(var20 > 20) {
                streamAllocation.release();
                throw new ProtocolException("Too many follow-up requests: " + var20);
            }

            if(!this.engine.sameConnection(var24.url())) {
                streamAllocation.release();
                streamAllocation = null;
            }

            this.engine = new HttpEngine(this.client, var24, false, false, forWebSocket, streamAllocation, (RetryableSink)null, var23);
        }

        this.engine.releaseStreamAllocation();
        throw new IOException("Canceled");
    }
     先構造一個HttpEngine物件,然後以!this.canceled為條件進行迴圈,while迴圈中的邏輯採用了java label的標籤寫法,應該和C++中的goto是一樣的道理,從實際開發中來看,label標籤已經很少採用了,所以一般也不建議大家使用這樣的寫法,這樣的程式碼易讀性不好。這裡主要就是第一個try程式碼塊中處理請求,沒有異常就直接跳出label173,然後呼叫this.engine.followUpRequest()取出下一個請求,再以它的返回值為引數構造一個新的HttpEngine,繼續在while迴圈中處理。當然如果followUpRequest取回來的為空,那也就是說只有一個請求,並且已經處理完了,那就直接返回當前的結果var23了。這個方法當中的一些變數命名也不是很好,根本看不出來是啥意思,不知道開發者是怎麼想的。這個方法分析完了,就要進入主要的邏輯了。

     2、RealCall類的getResponse方法中呼叫HttpEngine.sendRequest傳送請求

     再來對照流程圖,從中也可以看到,這一步的任務就是傳送訊息頭,我們來看一下這個方法的實現:

    public void sendRequest() throws RequestException, RouteException, IOException {
        if(this.cacheStrategy == null) {
            if(this.httpStream != null) {
                throw new IllegalStateException();
            } else {
                Request request = this.networkRequest(this.userRequest);
                InternalCache responseCache = Internal.instance.internalCache(this.client);
                Response cacheCandidate = responseCache != null?responseCache.get(request):null;
                long now = System.currentTimeMillis();
                this.cacheStrategy = (new Factory(now, request, cacheCandidate)).get();
                this.networkRequest = this.cacheStrategy.networkRequest;
                this.cacheResponse = this.cacheStrategy.cacheResponse;
                if(responseCache != null) {
                    responseCache.trackResponse(this.cacheStrategy);
                }

                if(cacheCandidate != null && this.cacheResponse == null) {
                    Util.closeQuietly(cacheCandidate.body());
                }

                if(this.networkRequest == null && this.cacheResponse == null) {
                    this.userResponse = (new Builder()).request(this.userRequest).priorResponse(stripBody(this.priorResponse)).protocol(Protocol.HTTP_1_1).code(504).message("Unsatisfiable Request (only-if-cached)").body(EMPTY_BODY).build();
                } else if(this.networkRequest == null) {
                    this.userResponse = this.cacheResponse.newBuilder().request(this.userRequest).priorResponse(stripBody(this.priorResponse)).cacheResponse(stripBody(this.cacheResponse)).build();
                    this.userResponse = this.unzip(this.userResponse);
                } else {
                    boolean success = false;

                    try {
                        this.httpStream = this.connect();
                        this.httpStream.setHttpEngine(this);
                        if(this.writeRequestHeadersEagerly()) {
                            long contentLength = OkHeaders.contentLength(request);
                            if(this.bufferRequestBody) {
                                if(contentLength > 2147483647L) {
                                    throw new IllegalStateException("Use setFixedLengthStreamingMode() or setChunkedStreamingMode() for requests larger than 2 GiB.");
                                }

                                if(contentLength != -1L) {
                                    this.httpStream.writeRequestHeaders(this.networkRequest);
                                    this.requestBodyOut = new RetryableSink((int)contentLength);
                                } else {
                                    this.requestBodyOut = new RetryableSink();
                                }
                            } else {
                                this.httpStream.writeRequestHeaders(this.networkRequest);
                                this.requestBodyOut = this.httpStream.createRequestBody(this.networkRequest, contentLength);
                            }
                        }

                        success = true;
                    } finally {
                        if(!success && cacheCandidate != null) {
                            Util.closeQuietly(cacheCandidate.body());
                        }

                    }

                }
            }
        }
    }
     在這個方法中,networkRequest不為空,所以就進入最後一個else分支進行處理,第一句this.httpStream = this.connect()就是和服務端取得連線的過程,返回一個HttpStream物件,這個過程非常複雜,也非常重要。因為後邊的資料傳輸就是在這個連線的基礎上進行的,像常用的HTTP連線,BT連線等等的都會有這個過程,大家可以去看一下android bluetooth藍芽模組的原始碼,也是相同的,我們這裡就不展開了。bufferRequestBody是構造HttpEngine物件時傳進來的,值為false,所以接下來就是呼叫writeRequestHeaders、createRequestBody來處理了。connect()返回來的是一個Http2xStream物件,它是實現了HttpStream的,writeRequestHeaders是由它來實現的,我們來看一下writeRequestHeaders方法的實現:
    public void writeRequestHeaders(Request request) throws IOException {
        if(this.stream == null) {
            this.httpEngine.writingRequestHeaders();
            boolean permitsRequestBody = this.httpEngine.permitsRequestBody(request);
            List requestHeaders = this.framedConnection.getProtocol() == Protocol.HTTP_2?http2HeadersList(request):spdy3HeadersList(request);
            boolean hasResponseBody = true;
            this.stream = this.framedConnection.newStream(requestHeaders, permitsRequestBody, hasResponseBody);
            this.stream.readTimeout().timeout((long)this.httpEngine.client.readTimeoutMillis(), TimeUnit.MILLISECONDS);
            this.stream.writeTimeout().timeout((long)this.httpEngine.client.writeTimeoutMillis(), TimeUnit.MILLISECONDS);
        }
    }
     這個方法當中就是將當前的請求頭進行轉換,得到一個List requestHeaders資料,然後將它作為引數,呼叫FramedConnection類的newStream方法,我們接著來看newStream方法的實現:
    public FramedStream newStream(List requestHeaders, boolean out, boolean in) throws IOException {
        return this.newStream(0, requestHeaders, out, in);
    }

    private FramedStream newStream(int associatedStreamId, List requestHeaders, boolean out, boolean in) throws IOException {
        boolean outFinished = !out;
        boolean inFinished = !in;
        FrameWriter var9 = this.frameWriter;
        FramedStream stream;
        synchronized(this.frameWriter) {
            int streamId;
            synchronized(this) {
                if(this.shutdown) {
                    throw new IOException("shutdown");
                }

                streamId = this.nextStreamId;
                this.nextStreamId += 2;
                stream = new FramedStream(streamId, this, outFinished, inFinished, requestHeaders);
                if(stream.isOpen()) {
                    this.streams.put(Integer.valueOf(streamId), stream);
                    this.setIdle(false);
                }
            }

            if(associatedStreamId == 0) {
                this.frameWriter.synStream(outFinished, inFinished, streamId, associatedStreamId, requestHeaders);
            } else {
                if(this.client) {
                    throw new IllegalArgumentException("client streams shouldn\'t have associated stream IDs");
                }

                this.frameWriter.pushPromise(associatedStreamId, streamId, requestHeaders);
            }
        }

        if(!out) {
            this.frameWriter.flush();
        }

        return stream;
    }
     這個方法當中就是構造一個FramedStream物件,associatedStreamId值為0,是呼叫時傳進來的,然後呼叫成員變數this.frameWriter.synStream方法將資料寫入流中,往下的我們就不跟蹤了。

     再回來,對照流程圖,看一下Http2xStream類的createRequestBody方法。它當就很簡單,就是呼叫getSink()將上一步返回來的FramedStream物件的成員變數sink取出來返回給呼叫者。注意,這裡返回的實體是FramedStream.FramedDataSink物件,它是實現了Sink介面的。

     好了,請求頭髮送完了,再來對照一下流程圖,接下來就是讀取響應了。

     3、RealCall類的getResponse方法中呼叫HttpEngine.readResponse讀取響應

     我們來看一下這個方法的程式碼:

    public void readResponse() throws IOException {
        if(this.userResponse == null) {
            if(this.networkRequest == null && this.cacheResponse == null) {
                throw new IllegalStateException("call sendRequest() first!");
            } else if(this.networkRequest != null) {
                Response networkResponse;
                if(this.forWebSocket) {
                    this.httpStream.writeRequestHeaders(this.networkRequest);
                    networkResponse = this.readNetworkResponse();
                } else if(!this.callerWritesRequestBody) {
                    networkResponse = (new HttpEngine.NetworkInterceptorChain(0, this.networkRequest)).proceed(this.networkRequest);
                } else {
                    if(this.bufferedRequestBody != null && this.bufferedRequestBody.buffer().size() > 0L) {
                        this.bufferedRequestBody.emit();
                    }

                    if(this.sentRequestMillis == -1L) {
                        if(OkHeaders.contentLength(this.networkRequest) == -1L && this.requestBodyOut instanceof RetryableSink) {
                            long responseCache = ((RetryableSink)this.requestBodyOut).contentLength();
                            this.networkRequest = this.networkRequest.newBuilder().header("Content-Length", Long.toString(responseCache)).build();
                        }

                        this.httpStream.writeRequestHeaders(this.networkRequest);
                    }

                    if(this.requestBodyOut != null) {
                        if(this.bufferedRequestBody != null) {
                            this.bufferedRequestBody.close();
                        } else {
                            this.requestBodyOut.close();
                        }

                        if(this.requestBodyOut instanceof RetryableSink) {
                            this.httpStream.writeRequestBody((RetryableSink)this.requestBodyOut);
                        }
                    }

                    networkResponse = this.readNetworkResponse();
                }

                this.receiveHeaders(networkResponse.headers());
                if(this.cacheResponse != null) {
                    if(validate(this.cacheResponse, networkResponse)) {
                        this.userResponse = this.cacheResponse.newBuilder().request(this.userRequest).priorResponse(stripBody(this.priorResponse)).headers(combine(this.cacheResponse.headers(), networkResponse.headers())).cacheResponse(stripBody(this.cacheResponse)).networkResponse(stripBody(networkResponse)).build();
                        networkResponse.body().close();
                        this.releaseStreamAllocation();
                        InternalCache responseCache1 = Internal.instance.internalCache(this.client);
                        responseCache1.trackConditionalCacheHit();
                        responseCache1.update(this.cacheResponse, stripBody(this.userResponse));
                        this.userResponse = this.unzip(this.userResponse);
                        return;
                    }

                    Util.closeQuietly(this.cacheResponse.body());
                }

                this.userResponse = networkResponse.newBuilder().request(this.userRequest).priorResponse(stripBody(this.priorResponse)).cacheResponse(stripBody(this.cacheResponse)).networkResponse(stripBody(networkResponse)).build();
                if(hasBody(this.userResponse)) {
                    this.maybeCache();
                    this.userResponse = this.unzip(this.cacheWritingResponse(this.storeRequest, this.userResponse));
                }

            }
        }
    }
     在這裡的邏輯中,構造HttpEngine物件時,傳入的兩個引數forWebSocket、callerWritesRequestBody都是false,所以這裡就執行第二個else if分支。可以看到,在okhttp框架中,有很多攔截器,它們以Chain的形式組成一條鏈,至於這塊的東西,我沒有去深入研究,就不展開了,我們繼續我們的流程分析。這裡就是構造一個HttpEngine.NetworkInterceptorChain物件,然後呼叫它的proceed方法,從我們的流程圖當中也可以看到,真正的檔案傳輸就是在這裡進行的。我們來看一下proceed方法的邏輯:
        public Response proceed(Request request) throws IOException {
            ++this.calls;
            if(this.index > 0) {
                Interceptor response = (Interceptor)HttpEngine.this.client.networkInterceptors().get(this.index - 1);
                Address code = this.connection().route().address();
                if(!request.url().host().equals(code.url().host()) || request.url().port() != code.url().port()) {
                    throw new IllegalStateException("network interceptor " + response + " must retain the same host and port");
                }

                if(this.calls > 1) {
                    throw new IllegalStateException("network interceptor " + response + " must call proceed() exactly once");
                }
            }

            if(this.index < HttpEngine.this.client.networkInterceptors().size()) {
                HttpEngine.NetworkInterceptorChain var7 = HttpEngine.this.new NetworkInterceptorChain(this.index + 1, request);
                Interceptor var10 = (Interceptor)HttpEngine.this.client.networkInterceptors().get(this.index);
                Response interceptedResponse = var10.intercept(var7);
                if(var7.calls != 1) {
                    throw new IllegalStateException("network interceptor " + var10 + " must call proceed() exactly once");
                } else if(interceptedResponse == null) {
                    throw new NullPointerException("network interceptor " + var10 + " returned null");
                } else {
                    return interceptedResponse;
                }
            } else {
                HttpEngine.this.httpStream.writeRequestHeaders(request);
                HttpEngine.this.networkRequest = request;
                if(HttpEngine.this.permitsRequestBody(request) && request.body() != null) {
                    Sink var5 = HttpEngine.this.httpStream.createRequestBody(request, request.body().contentLength());
                    BufferedSink var8 = Okio.buffer(var5);
                    request.body().writeTo(var8);
                    var8.close();
                }

                Response var6 = HttpEngine.this.readNetworkResponse();
                int var9 = var6.code();
                if((var9 == 204 || var9 == 205) && var6.body().contentLength() > 0L) {
                    throw new ProtocolException("HTTP " + var9 + " had non-zero Content-Length: " + var6.body().contentLength());
                } else {
                    return var6;
                }
            }
        }

     我們在上一步構造HttpEngine.NetworkInterceptorChain物件時,傳入的第一個引數index為0,而在這個呼叫過程當中,client中的攔截器的數量也是0,所以就直接執行到最後的一個else分支當中了。在這裡判斷我們的request.body() != null,因為我們要上傳檔案,所以這個條件為true,那麼就呼叫構造好一個BufferedSink物件,然後呼叫request.body().writeTo(var8)把我們的目標檔案寫入流中。request.body()獲取到的是一個RequestBody物件,但是它的writeTo方法定義的是一個虛擬函式,而實現就是在我們一開始呼叫create(final MediaType contentType, final File file)構造Request請求當中,直接new了一個new RequestBody(),大家可以回頭看看我們本篇的第一個程式碼片,這裡的writeTo就是回撥到那裡的,而回調時的引數BufferedSink也是在回撥前構造好的。writeTo方法中先呼叫Okio.source(file),將我們的目標檔案轉換成Source型別,然後再sink.writeAll(source)把資料寫進去,這裡的流傳輸底層實現應該都是一樣的,在看藍芽模組檔案傳輸的程式碼時,也可以看到這樣的邏輯,我們的檔案如何傳過去的呢?就是把檔案通過Stream轉換成流,然後將流資料通過Socket傳送給目標。

     好了,我們再回到HttpEngine.NetworkInterceptorChain類的proceed方法當中,檔案資料寫完後,就呼叫HttpEngine.this.readNetworkResponse()讀取響應,最終返回給應用層的Response也就是在這裡生成的。這裡完成後,再往上退一步,就到了RealCall類的getResponse方法中,請求已經處理完了,響應資料也拿回來了,最後通過this.engine.getResponse()獲取到響應,然後就返回給應用層了。

     在這篇文章中,我們只是大概走了一下Okhttp網路框架中的一個最簡單的流程,還有很多非常重要的細節沒有深入研究,比如將檔案讀入到流之後,最後如何通過Socket傳送過去的,中間過程還有涉及到很多非常重要的物件,如檔案轉換後得到的Source,寫檔案時使用的BufferedSink等等,還有很多重量級物件,大家如果有興趣,可以自己研究一下。雖然我們沒有進一步深入分析這些細節,但是從這個過程中,我們還是瞭解到了網路通訊的一些實現,也希望能給大家帶來一些幫助。

     這節課就到這裡了,謝謝大家,也希望大家關注我的部落格!!