1. 程式人生 > >okhttp原始碼分析(三)

okhttp原始碼分析(三)

接著上一章分析request.body().writeTo();

  public @Nullable RequestBody body() {
    return body;
  }
public abstract class RequestBody {
  ............
  /** Returns a new request body that transmits the content of {@code file}. */
  public static RequestBody create(final @Nullable MediaType contentType, final File file) {
    if (file == null) throw new NullPointerException("content == null");

    return new RequestBody() {
      @Override public @Nullable MediaType contentType() {
        return contentType;
      }

      @Override public long contentLength() {
        return file.length();
      }

      @Override public void writeTo(BufferedSink sink) throws IOException {
        Source source = null;
        try {
          source = Okio.source(file);
          sink.writeAll(source);
        } finally {
          Util.closeQuietly(source);
        }
      }
    
}; } }

在ConnectInterceptor中檢視httpCodec是如何建立的,RealConnection

  public HttpCodec newCodec(OkHttpClient client, Interceptor.Chain chain,
      StreamAllocation streamAllocation) throws SocketException {
    if (http2Connection != null) {
      return new Http2Codec(client, chain, streamAllocation, http2Connection);
} else { socket.setSoTimeout(chain.readTimeoutMillis()); source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS); // sink初始化 sink = Okio.buffer(Okio.sink(socket)); sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS); return new Http1Codec(client, streamAllocation, source, sink);
} }

1.Http2請求建立過程:Http2codec createRequestBody()

  public Http2Codec(OkHttpClient client, Interceptor.Chain chain, StreamAllocation streamAllocation,
      Http2Connection connection) {
    this.client = client;
    this.chain = chain;
    this.streamAllocation = streamAllocation;
    this.connection = connection;

    protocol = client.protocols().contains(Protocol.H2C) ? Protocol.H2C : Protocol.HTTP_2;
  }

  @Override public Sink createRequestBody(Request request, long contentLength) {
    return stream.getSink();
  }

  @Override public void writeRequestHeaders(Request request) throws IOException {
    if (stream != null) return;

    boolean hasRequestBody = request.body() != null;
    List<Header> requestHeaders = http2HeadersList(request);
    //初始化stream成員變數,Http2Connection connection
    stream = connection.newStream(requestHeaders, hasRequestBody);
    stream.readTimeout().timeout(chain.readTimeoutMillis(), TimeUnit.MILLISECONDS);
    stream.writeTimeout().timeout(chain.writeTimeoutMillis(), TimeUnit.MILLISECONDS);
  }

Http2Connection類:

  public Http2Stream newStream(List<Header> requestHeaders, boolean out) throws IOException {
    return newStream(0, requestHeaders, out);
  }

  private Http2Stream newStream(
      int associatedStreamId, List<Header> requestHeaders, boolean out) throws IOException {
    boolean outFinished = !out;
    boolean inFinished = false;
    boolean flushHeaders;
    Http2Stream stream;
    int streamId;

    synchronized (writer) {
      synchronized (this) {
        if (nextStreamId > Integer.MAX_VALUE / 2) {
          shutdown(REFUSED_STREAM);
        }
        if (shutdown) {
          throw new ConnectionShutdownException();
        }
        streamId = nextStreamId;
        nextStreamId += 2;
        stream = new Http2Stream(streamId, this, outFinished, inFinished, requestHeaders);
        flushHeaders = !out || bytesLeftInWriteWindow == 0L || stream.bytesLeftInWriteWindow == 0L;
        if (stream.isOpen()) {
          streams.put(streamId, stream);
        }
      }
      if (associatedStreamId == 0) {
        writer.synStream(outFinished, streamId, associatedStreamId, requestHeaders);
      } else if (client) {
        throw new IllegalArgumentException("client streams shouldn't have associated stream IDs");
      } else { // HTTP/2 has a PUSH_PROMISE frame.
        writer.pushPromise(associatedStreamId, streamId, requestHeaders);
      }
    }

    if (flushHeaders) {
      writer.flush();
    }

    return stream;
  }
Http2Stream類:
  Http2Stream(int id, Http2Connection connection, boolean outFinished, boolean inFinished,
      List<Header> requestHeaders) {
    if (connection == null) throw new NullPointerException("connection == null");
    if (requestHeaders == null) throw new NullPointerException("requestHeaders == null");
    this.id = id;
    this.connection = connection;
    this.bytesLeftInWriteWindow =
        connection.peerSettings.getInitialWindowSize();
    this.source = new FramingSource(connection.okHttpSettings.getInitialWindowSize());
    this.sink = new FramingSink();
    this.source.finished = inFinished;
    this.sink.finished = outFinished;
    this.requestHeaders = requestHeaders;
  }
  ......
  public Sink getSink() {
    synchronized (this) {
      if (!hasResponseHeaders && !isLocallyInitiated()) {
        throw new IllegalStateException("reply before requesting the sink");
      }
    }
    return sink;
  }
  final class FramingSink implements Sink {
    private static final long EMIT_BUFFER_SIZE = 16384;

    /**
     * Buffer of outgoing data. This batches writes of small writes into this sink as larges frames
     * written to the outgoing connection. Batching saves the (small) framing overhead.
     */
    private final Buffer sendBuffer = new Buffer();

    boolean closed;

    /**
     * True if either side has cleanly shut down this stream. We shall send no more bytes.
     */
    boolean finished;

    @Override public void write(Buffer source, long byteCount) throws IOException {
      assert (!Thread.holdsLock(Http2Stream.this));
      sendBuffer.write(source, byteCount);
      while (sendBuffer.size() >= EMIT_BUFFER_SIZE) {//迴圈寫入
        emitFrame(false);
      }
    }

    /**
     * Emit a single data frame to the connection. The frame's size be limited by this stream's
     * write window. This method will block until the write window is nonempty.
     */
    private void emitFrame(boolean outFinished) throws IOException {
      long toWrite;
      synchronized (Http2Stream.this) {
        writeTimeout.enter();
        try {
          while (bytesLeftInWriteWindow <= 0 && !finished && !closed && errorCode == null) {
            waitForIo(); // Wait until we receive a WINDOW_UPDATE for this stream.
          }
        } finally {
          writeTimeout.exitAndThrowIfTimedOut();
        }

        checkOutNotClosed(); // Kick out if the stream was reset or closed while waiting.
        toWrite = Math.min(bytesLeftInWriteWindow, sendBuffer.size());
        bytesLeftInWriteWindow -= toWrite;
      }

      writeTimeout.enter();
      try {
        connection.writeData(id, outFinished && toWrite == sendBuffer.size(), sendBuffer, toWrite);
      } finally {
        writeTimeout.exitAndThrowIfTimedOut();
      }
    }

Http2Connection writeData():

  public void writeData(int streamId, boolean outFinished, Buffer buffer, long byteCount)
      throws IOException {
    if (byteCount == 0) { // Empty data frames are not flow-controlled.
      writer.data(outFinished, streamId, buffer, 0);//寫入傳送完成訊息
      return;
    }

    while (byteCount > 0) {
      int toWrite;
      synchronized (Http2Connection.this) {
        try {
          while (bytesLeftInWriteWindow <= 0) {
            // Before blocking, confirm that the stream we're writing is still open. It's possible
            // that the stream has since been closed (such as if this write timed out.)
            if (!streams.containsKey(streamId)) {
              throw new IOException("stream closed");
            }
            Http2Connection.this.wait(); // Wait until we receive a WINDOW_UPDATE.
          }
        } catch (InterruptedException e) {
          throw new InterruptedIOException();
        }

        toWrite = (int) Math.min(byteCount, bytesLeftInWriteWindow);
        toWrite = Math.min(toWrite, writer.maxDataLength());
        bytesLeftInWriteWindow -= toWrite;
      }

      byteCount -= toWrite;
     //Http2Writer writer 在Http2Connection構造方法中 new=Http2Writer(builder.sink,client);
      writer.data(outFinished && byteCount == 0, streamId, buffer, toWrite);
    }
  }

HttpWriter data()函式:

  public synchronized void data(boolean outFinished, int streamId, Buffer source, int byteCount)
      throws IOException {
    if (closed) throw new IOException("closed");
    byte flags = FLAG_NONE;
    if (outFinished) flags |= FLAG_END_STREAM;
    dataFrame(streamId, flags, source, byteCount);
  }

  void dataFrame(int streamId, byte flags, Buffer buffer, int byteCount) throws IOException {
    byte type = TYPE_DATA;
    frameHeader(streamId, byteCount, type, flags);
    if (byteCount > 0) {
      sink.write(buffer, byteCount);
    }
  }

sink變數是構造方法注入的,sink來自Http2Connection的構造方法,Http2Connection由RealConnection建立,經過Http2Codec->Http2Stream->Http2Connection傳遞過程。

  private void startHttp2(int pingIntervalMillis) throws IOException {
    socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
    http2Connection = new Http2Connection.Builder(true)
        .socket(socket, route.address().url().host(), source, sink)
        .listener(this)
        .pingIntervalMillis(pingIntervalMillis)
        .build();
    http2Connection.start();
  }

以上方法在RealConnection的connet()函式呼叫,方法中的sink就是Http2Writer中的writer。sink在RealConnection的connectTls()函式或connectSocket()函式例項化的

    ..............
    try {
      source = Okio.buffer(Okio.source(rawSocket));
      sink = Okio.buffer(Okio.sink(rawSocket));
    } catch (NullPointerException npe) {
      if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
        throw new IOException(npe);
      }
    }
    ...............

Okio.sink():

    public static Sink sink(Socket socket) throws IOException {
        if (socket == null) {
            throw new IllegalArgumentException("socket == null");
        } else if (socket.getOutputStream() == null) {
            throw new IOException("socket's output stream == null");
        } else {
            AsyncTimeout timeout = timeout(socket);
            Sink sink = sink((OutputStream)socket.getOutputStream(), (Timeout)timeout);
            return timeout.sink(sink);
        }
    }
    private static Sink sink(final OutputStream out, final Timeout timeout) {
        if (out == null) {
            throw new IllegalArgumentException("out == null");
        } else if (timeout == null) {
            throw new IllegalArgumentException("timeout == null");
        } else {
            return new Sink() {
                public void write(Buffer source, long byteCount) throws IOException {
                    Util.checkOffsetAndCount(source.size, 0L, byteCount);

                    while(byteCount > 0L) {
                        timeout.throwIfReached();
                        Segment head = source.head;
                        int toCopy = (int)Math.min(byteCount, (long)(head.limit - head.pos));
                      out.write(head.data, head.pos, toCopy);
                        head.pos += toCopy;
                        byteCount -= (long)toCopy;
                        source.size -= (long)toCopy;
                        if (head.pos == head.limit) {
                            source.head = head.pop();
                            SegmentPool.recycle(head);
                        }
                    }

                }

                public void flush() throws IOException {
                    out.flush();
                }

                public void close() throws IOException {
                    out.close();
                }

                public Timeout timeout() {
                    return timeout;
                }

                public String toString() {
                    return "sink(" + out + ")";
                }
            };
        }
    }
總結一下request.body().writeTo(Sink sink);的過程,writeTo函式中呼叫的sink.write(),其中的sink在CallServerInterceptor類intercept()函式中傳入的,型別為RealBufferSink,父類為BufferSink,RealBufferSink中封裝的CountingSink,CountingSink類中封裝了FramingSink,在request.body().writeTo(Sink sink);的writeTo中呼叫的RealBufferSink的write方法,將要傳送的資料寫入成員變數buffer中,然後又呼叫了,RealBufferSink中的型別為CountingSink的成員變數sink的write()方法,將要傳送的資料傳給CountingSink,CountingSink的writer方法中呼叫了FramingSink的write()方法,FramingSink的write()方法複雜操作迴圈有呼叫了Http2Connection的writeData()方法,writeData()方法中呼叫了Http2Writer類的data()方法,data()方法中呼叫了sink,sink來自Http2Connection的構造方法,Http2Connetion在RealConnection中建立,sink是在RealConnection例項化的。
RealBufferSink封裝了CountingSingk,CountingSink封裝了FramingSink,FramingSink間接封裝了Sink,像不像一個單向列表。


2.Http1請求建立過程:Http1Codec createRequestBody():

public final class Http1Codec implements HttpCodec {
  private static final int STATE_IDLE = 0; // Idle connections are ready to write request headers.
  private static final int STATE_OPEN_REQUEST_BODY = 1;
  private static final int STATE_WRITING_REQUEST_BODY = 2;
  private static final int STATE_READ_RESPONSE_HEADERS = 3;
  private static final int STATE_OPEN_RESPONSE_BODY = 4;
  private static final int STATE_READING_RESPONSE_BODY = 5;
  private static final int STATE_CLOSED = 6;
  private static final int HEADER_LIMIT = 256 * 1024;

  /** The client that configures this stream. May be null for HTTPS proxy tunnels. */
  final OkHttpClient client;
  /** The stream allocation that owns this stream. May be null for HTTPS proxy tunnels. */
  final StreamAllocation streamAllocation;

  final BufferedSource source;
  final BufferedSink sink;
  int state = STATE_IDLE;
  private long headerLimit = HEADER_LIMIT;

  public Http1Codec(OkHttpClient client, StreamAllocation streamAllocation, BufferedSource source,
      BufferedSink sink) {
    this.client = client;
    this.streamAllocation = streamAllocation;
    this.source = source;
    this.sink = sink;
  }

  @Override public Sink createRequestBody(Request request, long contentLength) {
    //分塊編碼(chunked)。
    if ("chunked".equalsIgnoreCase(request.header("Transfer-Encoding"))) {
      // Stream a request body of unknown length.
      return newChunkedSink();
    }
    //返回資料大小
    if (contentLength != -1) {
      // Stream a request body of a known length.
      return newFixedLengthSink(contentLength);
    }

    throw new IllegalStateException(
        "Cannot stream a request body without chunked encoding or a known content length!");
  }

  private final class ChunkedSink implements Sink {
    private final ForwardingTimeout timeout = new ForwardingTimeout(sink.timeout());
    private boolean closed;

    ChunkedSink() {
    }

    @Override public Timeout timeout() {
      return timeout;
    }

    @Override public void write(Buffer source, long byteCount) throws IOException {
      if (closed) throw new IllegalStateException("closed");
      if (byteCount == 0) return;
      //sink在Http1Codec的構造方法中注入的
      sink.writeHexadecimalUnsignedLong(byteCount);
      sink.writeUtf8("\r\n");
      sink.write(source, byteCount);
      sink.writeUtf8("\r\n");
    }
   ..................
 private final class FixedLengthSink implements Sink {
    private final ForwardingTimeout timeout = new ForwardingTimeout(sink.timeout());
    private boolean closed;
    private long bytesRemaining;

    FixedLengthSink(long bytesRemaining) {
      this.bytesRemaining = bytesRemaining;
    }

    @Override public Timeout timeout() {
      return timeout;
    }

    @Override public void write(Buffer source, long byteCount) throws IOException {
      if (closed) throw new IllegalStateException("closed");
      checkOffsetAndCount(source.size(), 0, byteCount);
      if (byteCount > bytesRemaining) {
        throw new ProtocolException("expected " + bytesRemaining
            + " bytes but received " + byteCount);
      }
      sink.write(source, byteCount);
      bytesRemaining -= byteCount;
    }

    @Override public void flush() throws IOException {
      if (closed) return; // Don't throw; this stream might have been closed on the caller's behalf.
      sink.flush();
    }

    @Override public void close() throws IOException {
      if (closed) return;
      closed = true;
      if (bytesRemaining > 0) throw new ProtocolException("unexpected end of stream");
      detachTimeout(timeout);
      state = STATE_READ_RESPONSE_HEADERS;
    }
  }
除了建立sink的方式不一樣以外和Http2一樣的執行流程。



相關推薦

okhttp原始碼分析

接著上一章分析request.body().writeTo(); public @Nullable RequestBody body() { return body; }public abstract class RequestBody { ........

OkHttp 3.7原始碼分析——任務佇列

前面的部落格已經提到過,OkHttp的一個高效之處在於在內部維護了一個執行緒池,方便高效地執行非同步請求。本篇部落格將詳細介紹OkHttp的任務佇列機制。 1. 執行緒池的優點 OkHttp的任務佇列在內部維護了一個執行緒池用於執行具體的網路請求。而執行緒池

Android ADB 原始碼分析

前言 之前分析的兩篇文章 Android Adb 原始碼分析(一) 嵌入式Linux:Android root破解原理(二)   寫完之後,都沒有寫到相關的實現程式碼,這篇文章寫下ADB的通訊流程的一些細節 看這篇文章之前,請先閱讀 Linux的SOCKET

Flume NG原始碼分析使用Event介面表示資料流

Flume NG有4個主要的元件: Event表示在Flume各個Agent之間傳遞的資料流 Source表示從外部源接收Event資料流,然後傳遞給Channel Channel表示對從Source傳遞的Event資料流的臨時儲存 Sink表示從Channel中接收儲存的Event

GCC原始碼分析——中間語言

原文連結:http://blog.csdn.net/sonicling/article/details/7915301 一、前言   很忙,很久沒更新部落格了,繼續沒寫完的gcc分析,爭取在傳說將要用C++重寫的gcc 5出來之前初略分析完。 二、符號表(GENERI

zigbee 之ZStack-2.5.1a原始碼分析無線資料傳送和接收

前面說過SampleApp_Init和SampleApp_ProcessEvent是我們重點關注的函式,接下來分析無線傳送和接收相關的程式碼: 在SampleApp_ProcessEvent函式中: if ( events & SYS_EVENT_MSG ) {  &nbs

Volley原始碼分析

1.Volley原始碼分析(一) 2.Volley原始碼分析(二) 3.Volley原始碼分析(三) 4.XVolley-基於Volley的封裝的工具類 上一篇分析完了RequestQueue的大部分方法,add執行完後,Volley就會執行執行緒操作了,在第一篇

okhttp原始碼分析——基本流程超詳細

1.okhttp原始碼分析(一)——基本流程(超詳細) 2.okhttp原始碼分析(二)——RetryAndFollowUpInterceptor過濾器 3.okhttp原始碼分析(三)——CacheInterceptor過濾器 4.okhttp原始碼分析(四)——Conn

Dubbo原始碼分析:Dubbo之服務端Service

         如上圖所示的Dubbo的暴露服務的過程,不難看出它也和消費者端很像,也需要一個像reference的物件來維護service關聯的所有物件及其屬性,這裡的reference就是provider。由於ServiceBean實現了  Initializ

Spring component-scan原始碼分析 -- @Autowired等註解的處理

本篇文章分析注入註解(@Autowired、@Value等)的處理,其邏輯在AutowiredAnnotationBeanPostProcessor類中。 可以看到AutowiredAnnotationBeanPostProcessor類實現了一些增強處理的

Spring原始碼分析IoC容器的依賴注入2

protected void populateBean(String beanName, RootBeanDefinition mbd, BeanWrapper bw) { //這裡取得在BeanDefinition中設定的property值,這些property來自對BeanDefini

Spring原始碼分析IoC容器的依賴注入1

    依賴注入的過程是使用者第一次向IoC容器索要Bean時才觸發的,當然也有例外,可以在BeanDefinition資訊中通過控制lazy-init屬性來讓容器完成對Bean的預例項化。這個預例項化實際上也是一個完成依賴注入的過程,但它是在初始化的過程中完成的。

groupcache原始碼分析-- consistanthash

consistanthash.go檔案中是consistanthash模組的程式碼,這主要是提供了一致性hash的一些介面。一致性hash演算法,通常是用在查詢一個合適的下載節點時,使負載更平均,同時也使得某個節點故障不會導致大量的重新對映成本s,要了解一致性h

Java多執行緒之AQSAbstractQueuedSynchronizer 實現原理和原始碼分析

章節概覽、 1、回顧 上一章節,我們分析了ReentrantLock的原始碼: 2、AQS 佇列同步器概述 本章節我們深入分析下AQS(AbstractQueuedSynchronizer)佇列同步器原始碼,AQS是用來構建鎖或者其他同步元件的基礎框架。

libev原始碼分析---ev_timer

ev_timer結構體: typedef struct ev_timer { int active; /* 是否已經啟用 */ int pending; /* 是否事件易產生,需要執行回撥 *

YOLOv2原始碼分析

文章全部YOLOv2原始碼分析 接著上一講沒有講完的make_convolutional_layer函式 0x01 make_convolutional_layer //make_convolutional_laye

EventBus原始碼分析:post方法釋出事件【獲取事件的所有訂閱者,反射呼叫訂閱者事件處理方法】2.4版本

EventBus維護了一個重要的HashMap,這個HashMap的鍵是事件,值是該事件的訂閱者列表,因此post事件的時候就能夠從此HashMap中取出事件的訂閱者列表,對每個訂閱者反射呼叫事件處理方法。 private final Map<Cla

OKHttp原始碼解析

public void readResponse() throws IOException { if(this.userResponse == null) { if(this.networkRequest == null

Muduo網路庫原始碼分析執行緒間使用eventfd通訊和EventLoop::runInLoop系列函式

先說第一點,執行緒(程序)間通訊有很多種方式(pipe,socketpair),為什麼這裡選擇eventfd? eventfd 是一個比 pipe 更高效的執行緒間事件通知機制,一方面它比 pipe

spring4.2.9 java專案環境下ioc原始碼分析——refresh之obtainFreshBeanFactory方法@1準備工作與載入Resource

obtainFreshBeanFactory方法從字面的意思看獲取新的Bean工廠,實際上這是一個過程,一個載入Xml資源並解析,根據解析結果組裝BeanDefinitions,然後初始化BeanFactory的過程。在載入Xml檔案之前,spring還做了一些其他的工作,比