JAVA篇:Java IO (五) 管道流
5、管道流
管道流是用來在多個執行緒之間進行資訊傳遞的Java流,包括位元組管道讀取流PipedInputStream和位元組管道寫入流PipedOutputStream、字元管道讀取流PipedReader和字元管道寫入流PipedWriter。其中讀取流是讀取者/消費者/接收者,寫入流是寫入者/生產者/傳送者。
需要注意的是:
-
管道流僅用於多個執行緒之間傳遞資訊,若用在同一個執行緒中可能會造成死鎖。
-
管道流的輸入輸出是成對的,一個輸出流只能對應一個輸入流,使用建構函式或者connect函式進行連線。
-
一對管道流包含一個緩衝區,其預設值為1024個位元組,若要改變緩衝區大小,可以使用帶有引數的建構函式。
-
管道的讀寫操作是互相阻塞的,當緩衝區為空時,讀操作阻塞;當緩衝區滿時,寫操作阻塞。
-
管道依附於執行緒,因此若執行緒結束,則雖然管道流物件還在,仍然會報錯“read dead end”。
-
管道流的讀取方法與普通流不同,只有輸出流正確close時,輸出流才能讀到-1值。
PipedReader/PipedWriter和PipedInputStream/PipedOutputStream原始碼相似,思路相同,所以以下以PipedInputStream/PipedOutputStream原始碼為例。
5.1 PipedOutputStream原始碼分析
由於connect方法在PipedOutputStream中,所以從PipedOutputStream開始看。
java.io.PipedOutputStream繼承了基類OutputStream。包含PipedInputStream例項sink,在建構函式中可以與傳入管道輸入流進行連線。connect方法所做的是將傳入管道輸入流傳給引數sink,並且初始化一些引數和狀態。由於管道輸入輸出流是一一對應的,在進行連線前,connect方法會進行判斷,若雙方任何一個已有連線則丟擲異常。
/*對應的管道輸入流*/ private PipedInputStream sink; /*建構函式:連線傳入的管道輸入流*/ public PipedOutputStream(PipedInputStream snk) throwsIOException { connect(snk); } /*空引數建構函式:未進行連線*/ public PipedOutputStream() { } /*connect方法*/ public synchronized void connect(PipedInputStream snk) throws IOException { if (snk == null) { throw new NullPointerException(); } else if (sink != null || snk.connected) { throw new IOException("Already connected"); } sink = snk; snk.in = -1; snk.out = 0; snk.connected = true; }
PipedOutputStream是生產者/寫入者,將資料寫到“管道”中,由對應的PipedInputStream來讀取,不過緩衝區在PipedInputStream之中,上面connect時初始化的也是對應PipedInputStream中的引數,PipedOutputStream例項在寫入時,呼叫的是對應的消費者/讀取者來receive資料。
public void write(int b) throws IOException { if (sink == null) { throw new IOException("Pipe not connected"); } sink.receive(b); } public void write(byte b[], int off, int len) throws IOException { if (sink == null) { throw new IOException("Pipe not connected"); } else if (b == null) { throw new NullPointerException(); } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return; } sink.receive(b, off, len); }
其close()方法也只是呼叫對應PipedInputStream 的receivedLast方法來實現。
public void close() throws IOException { if (sink != null) { sink.receivedLast(); } }
PipedInputStream中,該方法將closeByWriter置為true並且喚醒所有等待執行緒,將所有資料寫入PipedInputStream的緩衝區。
synchronized void receivedLast() { closedByWriter = true; notifyAll(); }
而PipedInputStream的close()方法則是將closedByReader置為True。而closeByWriter和closedByReader兩個變數在PipedInputStream的receive以及read方法中有不同的作用,在closeByWriter和closedByReader任一為True的時候都不能再呼叫receive方法進行寫入,而在closeByWriter為True,而closedByReader為False時,若緩衝區仍有資料未讀取,則可以繼續讀取。
public void close() throws IOException { closedByReader = true; synchronized (this) { in = -1; } }
5.2 PipedInputStream 原始碼分析
從PipedOutputStream的原始碼可以看出來,PipedOutputStream做的只是連線對應的PipedInputStream 例項並在寫入時呼叫對應的receive方法,管道流具體的實現還是主要在PipedInputStream 之中。
java.io.PipedInputStream繼承了基類InputStream,其主要引數包含以下幾個部分:
-
負責連線與colse的引數
-
讀取執行緒與寫入執行緒
-
“管道”,即緩衝區相關引數
/*負責連線與close的引數*/ boolean closedByWriter = false; volatile boolean closedByReader = false; boolean connected = false; /*讀取執行緒與寫入執行緒*/ Thread readSide; Thread writeSide; /*“管道”,即緩衝區相關引數*/ private static final int DEFAULT_PIPE_SIZE = 1024; protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE; protected byte buffer[];//“管道” protected int in = -1;//指向下一個“寫入”的位置 protected int out = 0;//指向下一個“讀取”的位置
而構造方法所做的兩件事情分別是:1、為“管道”的緩衝區分配空間。2、連線對應的PipedOutputStream。四個構造方法的區別在於,當傳入了緩衝區大小則按照自定義大小分配空間,沒有緩衝區大小引數則使用預設大小,當傳入PipedOutputStream引數則進行連線,反之則暫時不進行連線。
/*構造方法1:使用預設“管道”大小,並連線傳入的PipedOutputStream*/ public PipedInputStream(PipedOutputStream src) throws IOException { this(src, DEFAULT_PIPE_SIZE); } /*構造方法2:自定義“管道”大小,並連線傳入的PipedOutputStream*/ public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException { initPipe(pipeSize); connect(src); } /*構造方法3:使用預設“管道”大小,並未進行連線*/ public PipedInputStream() { initPipe(DEFAULT_PIPE_SIZE); } /*構造方法4:自定義“管道”大小,並未進行連線*/ public PipedInputStream(int pipeSize) { initPipe(pipeSize); } /*為“管道”按照大小分配空間*/ private void initPipe(int pipeSize) { if (pipeSize <= 0) { throw new IllegalArgumentException("Pipe Size <= 0"); } buffer = new byte[pipeSize]; } /*PipedInputStream.connect()呼叫傳入PipedOutputStream的connect方法*/ public void connect(PipedOutputStream src) throws IOException { src.connect(this); }
主要問題在於receive系列方法及read系列方法。
PipedInputStream的receive方法,在功能上是實現了“寫入”的功能的,將傳入的資料寫入到“管道”之中。
首先涉及兩個方法checkStateForReceive和awaitSpace。
checkStateForReceive方法所做的是確認這對管道流可用:1、寫入者和讀取者是否已連線 2、是否關閉 3、讀取執行緒是否有效。
private void checkStateForReceive() throws IOException { if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByWriter || closedByReader) { throw new IOException("Pipe closed"); } else if (readSide != null && !readSide.isAlive()) { throw new IOException("Read end dead"); } }
awaitApace則是在“管道”緩衝區已滿的時候,阻塞資料寫入。ps:由於這個緩衝區使用時可以看做一個迴圈佇列,緩衝區已滿判斷條件是in==out,而判斷緩衝區為空的條件是in=-1(read的時候緩衝區為空會將in置為-1)。
private void awaitSpace() throws IOException { while (in == out) { checkStateForReceive(); /* full: kick any waiting readers */ notifyAll(); try { wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } }
兩個receive方法中receive(int b)
方法比較簡單,在判斷了管道流可用以及緩衝區未滿之後寫入,只是在寫入到了緩衝區隊尾(且緩衝區未滿)的時候會跳到隊頭繼續寫入。
而receive(byte b[], int off, int len)
方法在寫入位元組陣列的時候會複雜些。有關“管道”緩衝區,資料有以下三種情況:
-
緩衝區為空:in=-1,out=0,初始時以及在讀取的時候發現緩衝區為空會將in置為-1
-
緩衝區有資料但是未滿:in<out或者out<in
-
緩衝區已滿:int==out
receive(byte b[], int off, int len)
所做的是bytesToTransfer保留還有多少位元組未寫入,nextTransferAmount儲存下一個可寫入空間的大小,寫入後,若bytesToTransfer仍大於0,則繼續迴圈,判斷緩衝區情況,嘗試尋找下一個可寫入空間直至全部寫入。
synchronized void receive(byte b[], int off, int len) throws IOException { checkStateForReceive(); writeSide = Thread.currentThread(); int bytesToTransfer = len; while (bytesToTransfer > 0) { if (in == out)/*緩衝區已滿:阻塞*/ awaitSpace(); int nextTransferAmount = 0; /*判斷下一個可寫入的連續空間的大小*/ if (out < in) {/*當out<in,下一個可寫入的空間是in到隊尾*/ nextTransferAmount = buffer.length - in; } else if (in < out) { if (in == -1) {/*當緩衝區為空,下一個可寫入的空間是0到隊尾*/ in = out = 0; nextTransferAmount = buffer.length - in; } else {/*當in<out,下一個可寫入的空間是in-->out的空間*/ nextTransferAmount = out - in; } } if (nextTransferAmount > bytesToTransfer)/*如果空間足夠寫入則寫入全部*/ nextTransferAmount = bytesToTransfer; assert(nextTransferAmount > 0); System.arraycopy(b, off, buffer, in, nextTransferAmount); bytesToTransfer -= nextTransferAmount;/*如果空間不足夠寫入,則減去已寫入的部分,進入下一個迴圈找下一個可寫入的空間*/ off += nextTransferAmount; in += nextTransferAmount; if (in >= buffer.length) { in = 0; } } }
read方法包含read()
和read(byte b[], int off, int len)
。
read()
方法在判斷了是否有關聯PipedOutputStream,讀取流是否關閉和寫入流是否完全關閉之後開始嘗試讀取資料,有以下情況:
-
緩衝區為空,則先嚐試喚醒全部等待執行緒並等待,等待對應的寫入執行緒是否有未完成的寫入。若有則等待寫入後讀取,若無,則嘗試2次之後丟擲異常並退出。
-
緩衝區不為空,則直接讀取資料,更新引數
public synchronized int read() throws IOException { /*判斷是否有關聯PipedOutputStream,讀取流是否關閉和寫入流是否完全關閉*/ if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByReader) { throw new IOException("Pipe closed"); } else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } readSide = Thread.currentThread(); int trials = 2; /*如果緩衝區為空*/ while (in < 0) { /*緩衝區為空,並且寫入流已經關閉則結束並返回-1*/ if (closedByWriter) { /* closed by writer, return EOF */ return -1; } /*如果寫入執行緒不再活動,並且已經嘗試等待2次後仍無資料則丟擲異常*/ if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) { throw new IOException("Pipe broken"); } /*可能仍有寫入執行緒在等待的,read方法嘗試喚醒全部執行緒並等待,嘗試2次後退出並丟擲異常*/ notifyAll(); try { wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } /*若緩衝區不為空,讀取資料,並更新in和out*/ int ret = buffer[out++] & 0xFF; if (out >= buffer.length) { out = 0; } if (in == out) { /* now empty */ in = -1; } return ret; }
read(byte b[], int off, int len)
跟receive的思路相同,每次都是獲取可讀取的下一個連續空間的位元組數來嘗試讀取,該方法會嘗試讀取足夠多的位元組,如果緩衝區的位元組數<len會全部讀取並返回實際讀取的位元組數。不過在讀取第一個位元組的時候呼叫的read()
方法來進行一系列的判斷及操作,譬如說緩衝區為空時等待並喚醒可能存在的寫入執行緒來寫入後再讀取。
public synchronized int read(byte b[], int off, int len) throws IOException { /*判斷傳入引數的合理性*/ if (b == null) { throw new NullPointerException(); } else if (off < 0 || len < 0 || len > b.length - off) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return 0; } /* possibly wait on the first character */ /*嘗試呼叫read()讀取第一個位元組*/ int c = read(); if (c < 0) { return -1; } b[off] = (byte) c; int rlen = 1; /*當緩衝區不為空,並且讀取的位元組數仍不夠時則繼續讀取*/ while ((in >= 0) && (len > 1)) { /*獲取下一次可讀取連續空間的位元組數*/ int available; if (in > out) { available = Math.min((buffer.length - out), (in - out)); } else { available = buffer.length - out; } /*如果這次可讀連續空間的位元組數已經夠了,則只讀取len-1個位元組*/ if (available > (len - 1)) { available = len - 1; } /*讀取資料,並更新引數*/ System.arraycopy(buffer, out, b, off + rlen, available); out += available; rlen += available; len -= available; if (out >= buffer.length) { out = 0; } if (in == out) { /* now empty */ in = -1; } } return rlen; }
public void Test(){ try{ PipedInputStream pis = new PipedInputStream(); PipedOutputStream pos = new PipedOutputStream(pis); Wtr wtr = new Wtr(pos); Rdr rdr = new Rdr(pis); wtr.start(); rdr.start(); } catch (IOException e) { e.printStackTrace(); } } class Wtr extends Thread{ private PipedOutputStream writer; public Wtr(PipedOutputStream pos){ writer = pos; } @Override public void run(){ String s = "好好學習,天天向上"; byte[] buf = s.getBytes(); System.out.println("Send "+buf.length+" Bytes : "+s); try { writer.write(buf,0,buf.length); } catch (IOException e) { e.printStackTrace(); } System.out.println("Send done"); } } class Rdr extends Thread{ private PipedInputStream reader; public Rdr(PipedInputStream pis){ reader = pis; } @Override public void run(){ ByteArrayOutputStream bis = new ByteArrayOutputStream(); byte[] buf = new byte[1024]; int len = 0; try { len = reader.read(buf,0,1024); bis.write(buf,0,len); } catch (IOException e) { e.printStackTrace(); } System.out.println("read "+len+"Bytes : "+bis.toString()); } }
5.4 PipedReader和PipedWriter 應用程式碼
public void Test(){ try{ PipedReader prd = new PipedReader(); PipedWriter pwr = new PipedWriter(prd); Wtr wtr = new Wtr(pwr); Rdr rdr = new Rdr(prd); wtr.start(); rdr.start(); } catch (IOException e) { e.printStackTrace(); } } class Wtr extends Thread{ private PipedWriter writer; public Wtr(PipedWriter pwr){ writer = pwr; } @Override public void run(){ String s = "好好學習,天天向上"; char[] chr = s.toCharArray(); System.out.println("Send "+chr.length+" Bytes : "+s); try { writer.write(chr,0,chr.length); } catch (IOException e) { e.printStackTrace(); } System.out.println("Send done"); } } class Rdr extends Thread{ private PipedReader reader; public Rdr(PipedReader prd){ reader = prd; } @Override public void run(){ char[] chr = new char[1024]; int len = 0; try { len = reader.read(chr,0,1024); } catch (IOException e) { e.printStackTrace(); } System.out.println("read "+len+"Bytes : "+new String(chr)); } }當你深入瞭解,你就會發現世界如此廣袤,而你對世界的瞭解則是如此淺薄,請永遠保持謙卑的態度。