okhttp原始碼分析(三)
接著上一章分析request.body().writeTo();
public @Nullable RequestBody body() {
return body;
}
public abstract class RequestBody {
............
/** Returns a new request body that transmits the content of {@code file}. */
public static RequestBody create(final @Nullable MediaType contentType, final File file) {
if (file == null) throw new NullPointerException("content == null");
return new RequestBody() {
@Override public @Nullable MediaType contentType() {
return contentType;
}
@Override public long contentLength() {
return file.length();
}
@Override public void writeTo(BufferedSink sink) throws IOException {
Source source = null;
try {
source = Okio.source(file);
sink.writeAll(source);
} finally {
Util.closeQuietly(source);
}
}
};
}
}
在ConnectInterceptor中檢視httpCodec是如何建立的,RealConnection
public HttpCodec newCodec(OkHttpClient client, Interceptor.Chain chain,
StreamAllocation streamAllocation) throws SocketException {
if (http2Connection != null) {
return new Http2Codec(client, chain, streamAllocation, http2Connection);
} else {
socket.setSoTimeout(chain.readTimeoutMillis());
source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
// sink初始化 sink = Okio.buffer(Okio.sink(socket));
sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
return new Http1Codec(client, streamAllocation, source, sink);
}
}
1.Http2請求建立過程:Http2codec createRequestBody():
public Http2Codec(OkHttpClient client, Interceptor.Chain chain, StreamAllocation streamAllocation,
Http2Connection connection) {
this.client = client;
this.chain = chain;
this.streamAllocation = streamAllocation;
this.connection = connection;
protocol = client.protocols().contains(Protocol.H2C) ? Protocol.H2C : Protocol.HTTP_2;
}
@Override public Sink createRequestBody(Request request, long contentLength) {
return stream.getSink();
}
@Override public void writeRequestHeaders(Request request) throws IOException {
if (stream != null) return;
boolean hasRequestBody = request.body() != null;
List<Header> requestHeaders = http2HeadersList(request);
//初始化stream成員變數,Http2Connection connection
stream = connection.newStream(requestHeaders, hasRequestBody);
stream.readTimeout().timeout(chain.readTimeoutMillis(), TimeUnit.MILLISECONDS);
stream.writeTimeout().timeout(chain.writeTimeoutMillis(), TimeUnit.MILLISECONDS);
}
Http2Connection類:
public Http2Stream newStream(List<Header> requestHeaders, boolean out) throws IOException {
return newStream(0, requestHeaders, out);
}
private Http2Stream newStream(
int associatedStreamId, List<Header> requestHeaders, boolean out) throws IOException {
boolean outFinished = !out;
boolean inFinished = false;
boolean flushHeaders;
Http2Stream stream;
int streamId;
synchronized (writer) {
synchronized (this) {
if (nextStreamId > Integer.MAX_VALUE / 2) {
shutdown(REFUSED_STREAM);
}
if (shutdown) {
throw new ConnectionShutdownException();
}
streamId = nextStreamId;
nextStreamId += 2;
stream = new Http2Stream(streamId, this, outFinished, inFinished, requestHeaders);
flushHeaders = !out || bytesLeftInWriteWindow == 0L || stream.bytesLeftInWriteWindow == 0L;
if (stream.isOpen()) {
streams.put(streamId, stream);
}
}
if (associatedStreamId == 0) {
writer.synStream(outFinished, streamId, associatedStreamId, requestHeaders);
} else if (client) {
throw new IllegalArgumentException("client streams shouldn't have associated stream IDs");
} else { // HTTP/2 has a PUSH_PROMISE frame.
writer.pushPromise(associatedStreamId, streamId, requestHeaders);
}
}
if (flushHeaders) {
writer.flush();
}
return stream;
}
Http2Stream類: Http2Stream(int id, Http2Connection connection, boolean outFinished, boolean inFinished,
List<Header> requestHeaders) {
if (connection == null) throw new NullPointerException("connection == null");
if (requestHeaders == null) throw new NullPointerException("requestHeaders == null");
this.id = id;
this.connection = connection;
this.bytesLeftInWriteWindow =
connection.peerSettings.getInitialWindowSize();
this.source = new FramingSource(connection.okHttpSettings.getInitialWindowSize());
this.sink = new FramingSink();
this.source.finished = inFinished;
this.sink.finished = outFinished;
this.requestHeaders = requestHeaders;
}
......
public Sink getSink() {
synchronized (this) {
if (!hasResponseHeaders && !isLocallyInitiated()) {
throw new IllegalStateException("reply before requesting the sink");
}
}
return sink;
}
final class FramingSink implements Sink {
private static final long EMIT_BUFFER_SIZE = 16384;
/**
* Buffer of outgoing data. This batches writes of small writes into this sink as larges frames
* written to the outgoing connection. Batching saves the (small) framing overhead.
*/
private final Buffer sendBuffer = new Buffer();
boolean closed;
/**
* True if either side has cleanly shut down this stream. We shall send no more bytes.
*/
boolean finished;
@Override public void write(Buffer source, long byteCount) throws IOException {
assert (!Thread.holdsLock(Http2Stream.this));
sendBuffer.write(source, byteCount);
while (sendBuffer.size() >= EMIT_BUFFER_SIZE) {//迴圈寫入
emitFrame(false);
}
}
/**
* Emit a single data frame to the connection. The frame's size be limited by this stream's
* write window. This method will block until the write window is nonempty.
*/
private void emitFrame(boolean outFinished) throws IOException {
long toWrite;
synchronized (Http2Stream.this) {
writeTimeout.enter();
try {
while (bytesLeftInWriteWindow <= 0 && !finished && !closed && errorCode == null) {
waitForIo(); // Wait until we receive a WINDOW_UPDATE for this stream.
}
} finally {
writeTimeout.exitAndThrowIfTimedOut();
}
checkOutNotClosed(); // Kick out if the stream was reset or closed while waiting.
toWrite = Math.min(bytesLeftInWriteWindow, sendBuffer.size());
bytesLeftInWriteWindow -= toWrite;
}
writeTimeout.enter();
try {
connection.writeData(id, outFinished && toWrite == sendBuffer.size(), sendBuffer, toWrite);
} finally {
writeTimeout.exitAndThrowIfTimedOut();
}
}
Http2Connection writeData():
public void writeData(int streamId, boolean outFinished, Buffer buffer, long byteCount)
throws IOException {
if (byteCount == 0) { // Empty data frames are not flow-controlled.
writer.data(outFinished, streamId, buffer, 0);//寫入傳送完成訊息
return;
}
while (byteCount > 0) {
int toWrite;
synchronized (Http2Connection.this) {
try {
while (bytesLeftInWriteWindow <= 0) {
// Before blocking, confirm that the stream we're writing is still open. It's possible
// that the stream has since been closed (such as if this write timed out.)
if (!streams.containsKey(streamId)) {
throw new IOException("stream closed");
}
Http2Connection.this.wait(); // Wait until we receive a WINDOW_UPDATE.
}
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
toWrite = (int) Math.min(byteCount, bytesLeftInWriteWindow);
toWrite = Math.min(toWrite, writer.maxDataLength());
bytesLeftInWriteWindow -= toWrite;
}
byteCount -= toWrite;
//Http2Writer writer 在Http2Connection構造方法中 new=Http2Writer(builder.sink,client);
writer.data(outFinished && byteCount == 0, streamId, buffer, toWrite);
}
}
HttpWriter data()函式:
public synchronized void data(boolean outFinished, int streamId, Buffer source, int byteCount)
throws IOException {
if (closed) throw new IOException("closed");
byte flags = FLAG_NONE;
if (outFinished) flags |= FLAG_END_STREAM;
dataFrame(streamId, flags, source, byteCount);
}
void dataFrame(int streamId, byte flags, Buffer buffer, int byteCount) throws IOException {
byte type = TYPE_DATA;
frameHeader(streamId, byteCount, type, flags);
if (byteCount > 0) {
sink.write(buffer, byteCount);
}
}
sink變數是構造方法注入的,sink來自Http2Connection的構造方法,Http2Connection由RealConnection建立,經過Http2Codec->Http2Stream->Http2Connection傳遞過程。
private void startHttp2(int pingIntervalMillis) throws IOException {
socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
http2Connection = new Http2Connection.Builder(true)
.socket(socket, route.address().url().host(), source, sink)
.listener(this)
.pingIntervalMillis(pingIntervalMillis)
.build();
http2Connection.start();
}
以上方法在RealConnection的connet()函式呼叫,方法中的sink就是Http2Writer中的writer。sink在RealConnection的connectTls()函式或connectSocket()函式例項化的
..............
try {
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
} catch (NullPointerException npe) {
if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
throw new IOException(npe);
}
}
...............
Okio.sink():
public static Sink sink(Socket socket) throws IOException {
if (socket == null) {
throw new IllegalArgumentException("socket == null");
} else if (socket.getOutputStream() == null) {
throw new IOException("socket's output stream == null");
} else {
AsyncTimeout timeout = timeout(socket);
Sink sink = sink((OutputStream)socket.getOutputStream(), (Timeout)timeout);
return timeout.sink(sink);
}
}
private static Sink sink(final OutputStream out, final Timeout timeout) {
if (out == null) {
throw new IllegalArgumentException("out == null");
} else if (timeout == null) {
throw new IllegalArgumentException("timeout == null");
} else {
return new Sink() {
public void write(Buffer source, long byteCount) throws IOException {
Util.checkOffsetAndCount(source.size, 0L, byteCount);
while(byteCount > 0L) {
timeout.throwIfReached();
Segment head = source.head;
int toCopy = (int)Math.min(byteCount, (long)(head.limit - head.pos));
out.write(head.data, head.pos, toCopy);
head.pos += toCopy;
byteCount -= (long)toCopy;
source.size -= (long)toCopy;
if (head.pos == head.limit) {
source.head = head.pop();
SegmentPool.recycle(head);
}
}
}
public void flush() throws IOException {
out.flush();
}
public void close() throws IOException {
out.close();
}
public Timeout timeout() {
return timeout;
}
public String toString() {
return "sink(" + out + ")";
}
};
}
}
總結一下request.body().writeTo(Sink sink);的過程,writeTo函式中呼叫的sink.write(),其中的sink在CallServerInterceptor類intercept()函式中傳入的,型別為RealBufferSink,父類為BufferSink,RealBufferSink中封裝的CountingSink,CountingSink類中封裝了FramingSink,在request.body().writeTo(Sink sink);的writeTo中呼叫的RealBufferSink的write方法,將要傳送的資料寫入成員變數buffer中,然後又呼叫了,RealBufferSink中的型別為CountingSink的成員變數sink的write()方法,將要傳送的資料傳給CountingSink,CountingSink的writer方法中呼叫了FramingSink的write()方法,FramingSink的write()方法複雜操作迴圈有呼叫了Http2Connection的writeData()方法,writeData()方法中呼叫了Http2Writer類的data()方法,data()方法中呼叫了sink,sink來自Http2Connection的構造方法,Http2Connetion在RealConnection中建立,sink是在RealConnection例項化的。RealBufferSink封裝了CountingSingk,CountingSink封裝了FramingSink,FramingSink間接封裝了Sink,像不像一個單向列表。
2.Http1請求建立過程:Http1Codec createRequestBody():
public final class Http1Codec implements HttpCodec {
private static final int STATE_IDLE = 0; // Idle connections are ready to write request headers.
private static final int STATE_OPEN_REQUEST_BODY = 1;
private static final int STATE_WRITING_REQUEST_BODY = 2;
private static final int STATE_READ_RESPONSE_HEADERS = 3;
private static final int STATE_OPEN_RESPONSE_BODY = 4;
private static final int STATE_READING_RESPONSE_BODY = 5;
private static final int STATE_CLOSED = 6;
private static final int HEADER_LIMIT = 256 * 1024;
/** The client that configures this stream. May be null for HTTPS proxy tunnels. */
final OkHttpClient client;
/** The stream allocation that owns this stream. May be null for HTTPS proxy tunnels. */
final StreamAllocation streamAllocation;
final BufferedSource source;
final BufferedSink sink;
int state = STATE_IDLE;
private long headerLimit = HEADER_LIMIT;
public Http1Codec(OkHttpClient client, StreamAllocation streamAllocation, BufferedSource source,
BufferedSink sink) {
this.client = client;
this.streamAllocation = streamAllocation;
this.source = source;
this.sink = sink;
}
@Override public Sink createRequestBody(Request request, long contentLength) {
//分塊編碼(chunked)。
if ("chunked".equalsIgnoreCase(request.header("Transfer-Encoding"))) {
// Stream a request body of unknown length.
return newChunkedSink();
}
//返回資料大小
if (contentLength != -1) {
// Stream a request body of a known length.
return newFixedLengthSink(contentLength);
}
throw new IllegalStateException(
"Cannot stream a request body without chunked encoding or a known content length!");
}
private final class ChunkedSink implements Sink {
private final ForwardingTimeout timeout = new ForwardingTimeout(sink.timeout());
private boolean closed;
ChunkedSink() {
}
@Override public Timeout timeout() {
return timeout;
}
@Override public void write(Buffer source, long byteCount) throws IOException {
if (closed) throw new IllegalStateException("closed");
if (byteCount == 0) return;
//sink在Http1Codec的構造方法中注入的
sink.writeHexadecimalUnsignedLong(byteCount);
sink.writeUtf8("\r\n");
sink.write(source, byteCount);
sink.writeUtf8("\r\n");
}
..................
private final class FixedLengthSink implements Sink {
private final ForwardingTimeout timeout = new ForwardingTimeout(sink.timeout());
private boolean closed;
private long bytesRemaining;
FixedLengthSink(long bytesRemaining) {
this.bytesRemaining = bytesRemaining;
}
@Override public Timeout timeout() {
return timeout;
}
@Override public void write(Buffer source, long byteCount) throws IOException {
if (closed) throw new IllegalStateException("closed");
checkOffsetAndCount(source.size(), 0, byteCount);
if (byteCount > bytesRemaining) {
throw new ProtocolException("expected " + bytesRemaining
+ " bytes but received " + byteCount);
}
sink.write(source, byteCount);
bytesRemaining -= byteCount;
}
@Override public void flush() throws IOException {
if (closed) return; // Don't throw; this stream might have been closed on the caller's behalf.
sink.flush();
}
@Override public void close() throws IOException {
if (closed) return;
closed = true;
if (bytesRemaining > 0) throw new ProtocolException("unexpected end of stream");
detachTimeout(timeout);
state = STATE_READ_RESPONSE_HEADERS;
}
}
除了建立sink的方式不一樣以外和Http2一樣的執行流程。相關推薦
okhttp原始碼分析(三)
接著上一章分析request.body().writeTo(); public @Nullable RequestBody body() { return body; }public abstract class RequestBody { ........
OkHttp 3.7原始碼分析(三)——任務佇列
前面的部落格已經提到過,OkHttp的一個高效之處在於在內部維護了一個執行緒池,方便高效地執行非同步請求。本篇部落格將詳細介紹OkHttp的任務佇列機制。 1. 執行緒池的優點 OkHttp的任務佇列在內部維護了一個執行緒池用於執行具體的網路請求。而執行緒池
Android ADB 原始碼分析(三)
前言 之前分析的兩篇文章 Android Adb 原始碼分析(一) 嵌入式Linux:Android root破解原理(二) 寫完之後,都沒有寫到相關的實現程式碼,這篇文章寫下ADB的通訊流程的一些細節 看這篇文章之前,請先閱讀 Linux的SOCKET
Flume NG原始碼分析(三)使用Event介面表示資料流
Flume NG有4個主要的元件: Event表示在Flume各個Agent之間傳遞的資料流 Source表示從外部源接收Event資料流,然後傳遞給Channel Channel表示對從Source傳遞的Event資料流的臨時儲存 Sink表示從Channel中接收儲存的Event
GCC原始碼分析(三)——中間語言
原文連結:http://blog.csdn.net/sonicling/article/details/7915301 一、前言 很忙,很久沒更新部落格了,繼續沒寫完的gcc分析,爭取在傳說將要用C++重寫的gcc 5出來之前初略分析完。 二、符號表(GENERI
zigbee 之ZStack-2.5.1a原始碼分析(三)無線資料傳送和接收
前面說過SampleApp_Init和SampleApp_ProcessEvent是我們重點關注的函式,接下來分析無線傳送和接收相關的程式碼: 在SampleApp_ProcessEvent函式中: if ( events & SYS_EVENT_MSG ) { &nbs
Volley原始碼分析(三)
1.Volley原始碼分析(一) 2.Volley原始碼分析(二) 3.Volley原始碼分析(三) 4.XVolley-基於Volley的封裝的工具類 上一篇分析完了RequestQueue的大部分方法,add執行完後,Volley就會執行執行緒操作了,在第一篇
okhttp原始碼分析(一)——基本流程(超詳細)
1.okhttp原始碼分析(一)——基本流程(超詳細) 2.okhttp原始碼分析(二)——RetryAndFollowUpInterceptor過濾器 3.okhttp原始碼分析(三)——CacheInterceptor過濾器 4.okhttp原始碼分析(四)——Conn
Dubbo原始碼分析(三):Dubbo之服務端(Service)
如上圖所示的Dubbo的暴露服務的過程,不難看出它也和消費者端很像,也需要一個像reference的物件來維護service關聯的所有物件及其屬性,這裡的reference就是provider。由於ServiceBean實現了 Initializ
Spring component-scan原始碼分析(三) -- @Autowired等註解的處理
本篇文章分析注入註解(@Autowired、@Value等)的處理,其邏輯在AutowiredAnnotationBeanPostProcessor類中。 可以看到AutowiredAnnotationBeanPostProcessor類實現了一些增強處理的
Spring原始碼分析(三)(IoC容器的依賴注入)(2)
protected void populateBean(String beanName, RootBeanDefinition mbd, BeanWrapper bw) { //這裡取得在BeanDefinition中設定的property值,這些property來自對BeanDefini
Spring原始碼分析(三)(IoC容器的依賴注入)(1)
依賴注入的過程是使用者第一次向IoC容器索要Bean時才觸發的,當然也有例外,可以在BeanDefinition資訊中通過控制lazy-init屬性來讓容器完成對Bean的預例項化。這個預例項化實際上也是一個完成依賴注入的過程,但它是在初始化的過程中完成的。
groupcache原始碼分析(三)-- consistanthash
consistanthash.go檔案中是consistanthash模組的程式碼,這主要是提供了一致性hash的一些介面。一致性hash演算法,通常是用在查詢一個合適的下載節點時,使負載更平均,同時也使得某個節點故障不會導致大量的重新對映成本s,要了解一致性h
Java多執行緒之AQS(AbstractQueuedSynchronizer )實現原理和原始碼分析(三)
章節概覽、 1、回顧 上一章節,我們分析了ReentrantLock的原始碼: 2、AQS 佇列同步器概述 本章節我們深入分析下AQS(AbstractQueuedSynchronizer)佇列同步器原始碼,AQS是用來構建鎖或者其他同步元件的基礎框架。
libev原始碼分析(三)---ev_timer
ev_timer結構體: typedef struct ev_timer { int active; /* 是否已經啟用 */ int pending; /* 是否事件易產生,需要執行回撥 *
YOLOv2原始碼分析(三)
文章全部YOLOv2原始碼分析 接著上一講沒有講完的make_convolutional_layer函式 0x01 make_convolutional_layer //make_convolutional_laye
EventBus原始碼分析(三):post方法釋出事件【獲取事件的所有訂閱者,反射呼叫訂閱者事件處理方法】(2.4版本)
EventBus維護了一個重要的HashMap,這個HashMap的鍵是事件,值是該事件的訂閱者列表,因此post事件的時候就能夠從此HashMap中取出事件的訂閱者列表,對每個訂閱者反射呼叫事件處理方法。 private final Map<Cla
OKHttp原始碼解析(三)
public void readResponse() throws IOException { if(this.userResponse == null) { if(this.networkRequest == null
Muduo網路庫原始碼分析(三)執行緒間使用eventfd通訊和EventLoop::runInLoop系列函式
先說第一點,執行緒(程序)間通訊有很多種方式(pipe,socketpair),為什麼這裡選擇eventfd? eventfd 是一個比 pipe 更高效的執行緒間事件通知機制,一方面它比 pipe
spring4.2.9 java專案環境下ioc原始碼分析(三)——refresh之obtainFreshBeanFactory方法(@1準備工作與載入Resource)
obtainFreshBeanFactory方法從字面的意思看獲取新的Bean工廠,實際上這是一個過程,一個載入Xml資源並解析,根據解析結果組裝BeanDefinitions,然後初始化BeanFactory的過程。在載入Xml檔案之前,spring還做了一些其他的工作,比