1. 程式人生 > >【Java TCP/IP Socket】深入剖析socket——TCP通信中由於底層隊列填滿而造成的死鎖問題(含代碼)

【Java TCP/IP Socket】深入剖析socket——TCP通信中由於底層隊列填滿而造成的死鎖問題(含代碼)

parameter 兩個 因此 tar 機制 至少 基礎 named 測試

基礎準備

首先需要明白數據傳輸的底層實現機制,在http://blog.csdn.net/ns_code/article/details/15813809這篇博客中有詳細的介紹,在上面的博客中,我們提到了SendQ和RecvQ緩沖隊列,這兩個緩沖區的容量在具體實現時會受一定的限制,雖然它們使用的實際內存大小會動態地增長和收縮,但還是需要一個硬性的限制,以防止行為異常的程序所控制的單一TCP連接將系統的內存全部消耗。正式由於緩沖區的容量有限,它們可能會被填滿,事實也正是如此,如果與TCP的流量控制機制結合使用,則可能導致一種形式的死鎖。

一旦RecvQ已滿,TCP流控制機制就會產生作用(使用流控制機制的目的是為了保證發送者不會傳輸太多數據,從而超出了接收系統的處理能力),它將阻止傳輸發送端主機的SendQ中的任何數據,直到接收者調用輸入流的read()方法將RecvQ中的數據移除一部分到Delivered中,從而騰出了空間。發送端可以持續地寫出數據,直到SendQ隊列被填滿,如果SendQ隊列已滿時調用輸出流的write()方法,則會阻塞等待,直到有一些字節被傳輸到RecvQ隊列中,如果此時RecvQ隊列也被填滿了,所有的操作都將停止,直到接收端調用了輸入流的read()方法將一些字節傳輸到了Delivered隊列中。

引出問題

我們假設SendQ隊列和RecvQ隊列的大小分別為SQS和RQS。將一個大小為n的字節數組傳遞給發送端write()方法調用,其中n > SQS,直到有至少n-SQS字節的數據傳遞到接收端主機的RecvQ隊列後,該方法才返回。如果n的大小超過了SQS+RQS,write()方法將在接收端從輸入流讀取了至少n-(SQS+RQS)字節後才會返回。如果接收端沒有調用read()方法,大數據量的發送是無法成功的。特別是連接的兩端同時分別調用它的輸出流的write()方法,而他們的緩沖區大小又大於SQS+RQS時,將會發生死鎖:兩個write操作都不能完成,兩個程序都將永遠保持阻塞狀態。

下面考慮一個具體的例子,即主機A上的程序和主機B上的程序之間的TCP連接。假設A和B上的SQS和RQS都是500字節,下圖展示了兩個程序試圖同時發送1500字節時的情況。主機A上的程序中的前500字節已經傳輸到另一端,另外500字節已經復制到了主機A的SendQ隊列中,余下的500字節則無法發送,write()方法將無法返回,直到主機B上程序的RecvQ隊列有空間空出來,然而不幸的是B上的程序也遇到了同樣的情況,而二者都沒有及時調用read()方法從自己的RecvQ隊列中讀取數據到Delivered隊列中。因此,兩個程序的write()方法調用都永遠無法返回,產生死鎖。因此,在寫程序時,要仔細設計協議,以避免在兩個方向上傳輸大量數據時產生死鎖。

示例分析

回顧前面幾篇博客中的TCP通信的示例代碼,基本都是只調用一次write()方法將所有的數據寫出,而且我們測試的數據量也不大。考慮一個壓縮字節的Demo,客戶端從文件中讀取字節,發送到服務端,服務端將受到的文件壓縮後反饋給客戶端。

這裏先給出代碼,客戶端代碼如下:

import java.io.FileInputStream;  
import java.io.FileOutputStream;  
import java.io.IOException;  
import java.io.InputStream;  
import java.io.OutputStream;  
import java.net.Socket;  
  
public class CompressClientNoDeadlock {  
  
  public static final int BUFSIZE = 256;  // Size of read buffer  
  
  public static void main(String[] args) throws IOException {  
  
    if (args.length != 3)  // Test for correct #  of args  
      throw new IllegalArgumentException("Parameter(s): <Server> <Port> <File>");  
  
    String server = args[0];               // Server name or IP address  
    int port = Integer.parseInt(args[1]);  // Server port  
    String filename = args[2];             // File to read data from  
  
    // Open input and output file (named input.gz)  
    final FileInputStream fileIn = new FileInputStream(filename);  
    FileOutputStream fileOut = new FileOutputStream(filename + ".gz");  
    
    // Create socket connected to server on specified port  
    final Socket sock = new Socket(server, port);  
  
    // Send uncompressed byte stream to server  
    Thread thread = new Thread() {  
      public void run() {  
        try {  
          SendBytes(sock, fileIn);  
        } catch (Exception ignored) {}  
      }  
    };  
    thread.start();  
  
    // Receive compressed byte stream from server  
    InputStream sockIn = sock.getInputStream();  
    int bytesRead;                      // Number of bytes read  
    byte[] buffer = new byte[BUFSIZE];  // Byte buffer  
    while ((bytesRead = sockIn.read(buffer)) != -1) {  
      fileOut.write(buffer, 0, bytesRead);  
      System.out.print("R");   // Reading progress indicator  
    }  
    System.out.println();      // End progress indicator line  
  
    sock.close();     // Close the socket and its streams  
    fileIn.close();   // Close file streams  
    fileOut.close();  
  }  
  
  public static void SendBytes(Socket sock, InputStream fileIn)  
      throws IOException {  
  
    OutputStream sockOut = sock.getOutputStream();  
    int bytesRead;                      // Number of bytes read  
    byte[] buffer = new byte[BUFSIZE];  // Byte buffer  
    while ((bytesRead = fileIn.read(buffer)) != -1) {  
      sockOut.write(buffer, 0, bytesRead);  
      System.out.print("W");   // Writing progress indicator  
    }  
    sock.shutdownOutput();     // Done sending  
  }  
}  

死鎖問題的產生原因在客戶端上,因此,服務端的具體代碼我們不再給出,服務端采取邊讀邊寫的策略。

下面我們邊對上面可能產生的問題進行分析。對該示例而言,當需要傳遞的文件容量不是很大時,程序運行正常,也能得到預期的結果,但如果嘗試運行該客戶端並傳遞給它一個大文件,改文件壓縮後仍然很大(在此,大的精確定義取決於程序運行的系統,不過壓縮後依然超過2MB的文件應該就可以使改程序產生死鎖問題),那麽客戶端將打印出一堆W後停止,而且不會打印出任何R,程序也不會終止。

為什麽會產生這種情況呢?我們來看程序,客戶端很明顯是一邊讀取本地文件中的數據,一邊調用輸出流的write()方法,將數據送入客戶端主機的SendQ隊列,直到文件中的數據被讀取完,客戶端才調用輸入流的read()方法,讀取服務端發送回來的數據。

考慮這種情況:客戶端和服務端的SendQ隊列和RecvQ隊列中都有500字節的數據空間,而客戶端發送了一個10000字節的文件,同時假設對於這個文件,服務端讀取1000字節並返回500字節,即壓縮比為2:1,當客戶端發送了2000字節後,服務端將最終全部讀取這些字節,並發回1000字節,由於客戶端此時並沒有調用輸入流的read()方法從客戶端主機的RecvQ隊列中移出數據到Delivered,因此,此時客戶端的RecvQ隊列和服務端的SendQ隊列都被填滿了,此時客戶端還在繼續發送數據,又發送了1000字節的數據,並且被服務端全部讀取,但此時服務端的write操作嘗試都已被阻塞,不能繼續發送數據給客戶端,當客戶端再發送了另外的1000字節數據後,客戶端的SendQ隊列和服務端的RecvQ隊列都將被填滿,後續的客戶端write操作也將阻塞,從而形成死鎖。

解決方案

如何解決這個問題呢?造成死鎖產生的原因是因為客戶端在發送數據的同時,沒有及時讀取反饋回來的數據,從而使數據都阻塞在了底層的傳輸隊列中。

方案一是在編寫客戶端程序時,使客戶端一邊循環調用輸出流的read()方法向服務端發送數據,一邊循環調用輸入流的read()方法讀取從服務端反饋回來的數據,但這也不能完全保證不會產生死鎖。

更好的解決方案是在不同的線程中執行客戶端的write循環和read循環。一個線程從文件中反復讀取未壓縮的字節並將其發送給服務器,直到文件的結尾,然後調用該套接字的shutdownOutput()方法。另一個線程從服務端的輸入流中不斷讀取壓縮後的字節,並將其寫入輸出文件,直到到達了輸入流的結尾(服務器關閉了套接字)。這樣,便可以實現一邊發送,一邊讀取,而且如果一個線程阻塞了,另一個線程仍然可以獨立執行。這樣我們可以對客戶端代碼進行簡單的修改,將SendByes()方法調用放到一個線程中:

Thread thread = new Thread() {  
  public void run() {  
    try {  
      SendBytes(sock, fileIn);  
    } catch (Exception ignored) {}  
  }  
};  
thread.start();  

當然,解決這個問題也可以不使用多線程,而是使用NIO機制(Channel和Selector)。

轉自:http://blog.csdn.net/ns_code/article/details/15939993

【Java TCP/IP Socket】深入剖析socket——TCP通信中由於底層隊列填滿而造成的死鎖問題(含代碼)