1. 程式人生 > >多執行緒、多程序通訊 (java實現)

多執行緒、多程序通訊 (java實現)

程序間通訊方式

1.管道(匿名管道 Pipe)//   PipedInputStream  、PipedOutputStream

2.命名管道(NamedPipe/FIFO)//java 不支援?

3.訊號(Signal) // wait() notify() notifyall() 、管程

4.訊息佇列(MessageQueue)

5.共享記憶體 (SharedMemory)  //java 不支援?

6.記憶體對映(MappedMemory)

7.訊號量(Semaphore)//理解其原子性操作,也是訊號量之所以安全的原因

8.套接字(Socket)  //應該都熟悉

管道:

管道(Pipe/匿名管道):在Linux下“一切皆檔案”,其實這裡的管道就是一個檔案。管道實現程序通訊就是讓兩個程序都能訪問該檔案。 管道流是JAVA中執行緒通訊的常用方式之一,

管道的特徵: 
單向的,也就是說,兩個程序都能訪問這個檔案,假設程序1往檔案內寫東西,那麼程序2 就只能讀取檔案的內容。 簡而言之資料只能從一端流向另一端而不應該雙向流動否則會造成混亂。如果要兩個執行緒之間互通訊,則需要兩個管道流。
②只能用於具有血緣關係的程序間通訊,通常用於父子程序建通訊 
③管道是基於位元組流來通訊的 //byte型別
④依賴於檔案系統,它的生命週期隨程序的結束結束

⑤其本身自帶同步互斥效果半雙工通訊

管道容量大小:64K

使用管道需要注意的4種特殊情況:
(1)如果所有指向管道寫端的檔案描述符都關閉了,而仍然有程序從管道的讀端讀資料,那麼檔案內的所有內容被讀完後再次read就會返回0,就像讀到檔案結尾。//強讀返回0


(2)如果有指向管道寫端的檔案描述符沒有關閉(管道寫段的引用計數大於0),而持有管道寫端的程序沒有向管道內寫入資料,假如這時有程序從管道讀端讀資料,那麼讀完管道內剩餘的資料後就會阻塞等待,直到有資料可讀才讀取資料並返回。//空讀會阻塞
(3)如果所有指向管道讀端的檔案描述符都關閉,此時有程序通過寫端檔案描述符向管道內寫資料時,則該程序就會收到SIGPIPE訊號,並異常終止。//強寫則發生異常
(4)如果有指向管道讀端的檔案描述符沒有關閉(管道讀端的引用計數大於0),而持有管道讀端的程序沒有從管道內讀資料,假如此時有程序通過管道寫段寫資料,那麼管道被寫滿後就會被阻塞,直到管道內有空位置後才寫入資料並返回。//寫滿則阻塞

JAVA實現方式

管道流是JAVA中執行緒通訊的常用方式之一,基本流程如下:
1)建立管道輸出流PipedOutputStream pos和管道輸入流PipedInputStream pis
2)將pos和pis匹配,pos.connect(pis);
3)將pos賦給資訊輸入執行緒,pis賦給資訊獲取執行緒,就可以實現執行緒間的通訊了

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class testPipeConnection {

    public static void main(String[] args) {
        /**
         * 建立管道輸出流
         */
        PipedOutputStream pos = new PipedOutputStream();
        /**
         * 建立管道輸入流
         */
        PipedInputStream pis = new PipedInputStream();
        try {
            /**
             * 將管道輸入流與輸出流連線 此過程也可通過過載的建構函式來實現
             */
            pos.connect(pis);
        } catch (IOException e) {
            e.printStackTrace();
        }
        /**
         * 建立生產者執行緒
         */
        Producer p = new Producer(pos);
        /**
         * 建立消費者執行緒
         */
        Consumer1 c1 = new Consumer1(pis);
        /**
         * 啟動執行緒
         */
        p.start();
        c1.start();
    }
}

/**
 * 生產者執行緒(與一個管道輸入流相關聯)
 * 
 */
class Producer extends Thread {
    private PipedOutputStream pos;

    public Producer(PipedOutputStream pos) {
        this.pos = pos;
    }

    public void run() {
        int i = 0;
        try {
            while(true)
            {
            this.sleep(3000);
            pos.write(i);
            i++;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

/**
 * 消費者執行緒(與一個管道輸入流相關聯)
 * 
 */
class Consumer1 extends Thread {
    private PipedInputStream pis;

    public Consumer1(PipedInputStream pis) {
        this.pis = pis;
    }

    public void run() {
        try {
            while(true)
            {
            System.out.println("consumer1:"+pis.read());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
命名管道(Name的Pipe/FIFO)
Sorry,Java 不支援命名管道!!

Java中的管道只先於執行緒間通訊,如果要跨程序,就要使用JNI了。

命名管道克服了管道沒有名字的限制,因此,除具有管道所具有的功能外,它還允許無親緣關係程序間的通訊。命名管道在檔案系統中有對應的檔名。命名管道通過命令mkfifo或系統呼叫mkfifo來建立。

訊息佇列 MessageQueue

 Linux下訊息佇列用於運行於同一臺機器上的程序間通訊,它和管道很相似,是一個在系統核心中用來儲存訊息的佇列,它在系統核心中是以訊息連結串列的形式出現。訊息連結串列中節點的結構用msg宣告。事實上,它是一種正逐漸被淘汰的通訊方式,我們可以用流管道或者套介面的方式來取代它,所以,我們對此方式也不再解釋,也建議讀者忽略這種方式。

訊息佇列是訊息的連結表,包括Posix訊息佇列system V訊息佇列。有足夠許可權的程序可以向佇列中新增訊息,被賦予讀許可權的程序則可以讀走佇列中的訊息。訊息佇列克服了訊號承載資訊量少,管道只能承載無格式位元組流以及緩衝區大小受限等缺

訊息佇列的主要特點是非同步處理,主要目的是減少請求響應時間和解耦。所以主要的使用場景就是將比較耗時而且不需要即時(同步)返回結果的操作作為訊息放入訊息佇列。同時由於使用了訊息佇列,只要保證訊息格式不變,訊息的傳送方和接收方並不需要彼此聯絡,也不需要受對方的影響,即解耦和。

在android系統中每個執行緒都有一個訊息佇列 Looper ,activity執行緒預設是開啟訊息迴圈機制的,子執行緒需要手動開啟。

共享記憶體shared memory/記憶體對映Mapped memory

共享記憶體:使得多個程序可以訪問同一塊記憶體空間,是最快的可用IPC形式,因為資料不需要在不同的程序間複製。是針對其他通訊機制執行效率較低而設計的。通常由一個程序建立一塊共享記憶體區,其餘程序對這塊記憶體區進行 讀寫。往往與其它通訊機制,如訊號量結合使用,來達到程序間的同步及互斥。

記憶體對映:記憶體對映允許任何多個程序間通訊,每一個使用該機制的程序通過把一個共享的檔案對映到自己的程序地址空間來實現它。

linux得到共享記憶體有兩種方式:對映/dev/mem裝置和記憶體映像檔案。

java應用中只能建立映像檔案共享記憶體(記憶體對映)。

共享記憶體的使用有如下幾個特點:   

    (1)可以被多個程序開啟訪問;     

     (2)讀寫操作的程序在執行讀寫操作時其他程序不能進行寫操作;   

     (3)多個程序可以交替對某一共享記憶體執行寫操作;     

    (4)一個程序執行了記憶體的寫操作後,不影響其他程序對該記憶體的訪問。同時其他程序對更新後的記憶體具有可見性。     

     (5)在程序執行寫操作時如果異常退出,對其他程序寫操作禁止應自動解除。    

記憶體對映在java中的實現

jdk1.4中提供的類MappedByteBuffer為我們實現共享記憶體提供了較好的方法。該緩衝區實際上是一個磁碟檔案的記憶體映像。二者的變化將保持同步,即記憶體資料發生變化會立刻反映到磁碟檔案中,這樣會有效的保證共享記憶體的實現。   

將共享記憶體和磁碟檔案建立聯絡的是檔案通道類:FileChannel。該類的加入是JDK為了統一對外部裝置(檔案、網路介面等)的訪問方法,並且加強了多執行緒對同一檔案進行存取的安全性。例如讀寫操作統一成read和write。這裡只是用它來建立共享記憶體用,它建立了共享記憶體和磁碟檔案之間的一個通道。 

開啟一個檔案建立一個檔案通道可以用RandomAccessFile類中的方法getChannel。該方法將直接返回一個檔案通道。該檔案通道由於對應的檔案設為隨機存取檔案,一方面可以進行讀寫兩種操作,另一方面使用它不會破壞映像檔案的內容(如果用FileOutputStream直接開啟一個映像檔案會將該檔案的大小置為0,當然資料會全部丟失)。這裡,如果用   FileOutputStream和FileInputStream則不能理想的實現共享記憶體的要求,因為這兩個類同時實現自由的讀寫操作要困難得多。   


//   獲得一個只讀的隨機存取檔案物件 
RandomAccessFile   RAFile   =   new   RandomAccessFile(filename,"r");  
//   獲得相應的檔案通道 
FileChannel   fc   =   RAFile.getChannel(); 
//   取得檔案的實際大小,以便映像到共享記憶體 
int   size   =   (int)fc.size();  
//   獲得共享記憶體緩衝區,該共享記憶體只讀 
MappedByteBuffer   mapBuf   =   fc.map(FileChannel.MAP_RO,0,size); 
  

//   獲得一個可讀寫的隨機存取檔案物件 
RAFile   =   new   RandomAccessFile(filename,"rw"); 
//   獲得相應的檔案通道 
fc   =   RAFile.getChannel();  
//   取得檔案的實際大小,以便映像到共享記憶體 
size   =   (int)fc.size(); 
//   獲得共享記憶體緩衝區,該共享記憶體可讀寫 
mapBuf   =   fc.map(FileChannel.MAP_RW,0,size);  
//   獲取頭部訊息:存取許可權 
mode   =   mapBuf.getInt();  

對共享記憶體讀寫時應該考慮同步的問題

共享記憶體在java應用中,經常有如下兩種種應用:   
  (1)永久物件配置     
  在java伺服器應用中,使用者可能會在執行過程中配置一些引數,而這些引數需要永久有效,當伺服器應用重新啟動後,這些配置引數仍然可以對應用起作用。這就可以用到該文中的共享記憶體。該共享記憶體中儲存了伺服器的執行引數和一些物件執行特性。可以在應用啟動時讀入以啟用以前配置的引數。   
    
  (2)查詢共享資料     
  一個應用(例   sys.java)是系統的服務程序,其系統的執行狀態記錄在共享記憶體中,其中執行狀態可能是不斷變化的。為了隨時瞭解系統的執行狀態,啟動另一個應用(例   mon.java),該應用查詢該共享記憶體,彙報系統的執行狀態。   
    

  可見,共享記憶體在java應用中還是很有用的,只要組織好共享記憶體的資料結構,共享記憶體就可以在應用開發中發揮很不錯的作用。

訊號(Signal)

參考:訊號和訊號量的比較 以及在java中的實現

訊號是比較複雜的通訊方式,用於通知接受程序有某種事件發生,除了用於程序間通訊外,程序還可以傳送 訊號給程序本身;linux除了支援Unix早期訊號語義函式sigal外,還支援語義符合Posix.1標準的訊號函式sigaction(實際上,該函式是基於BSD的,BSD為了實現可靠訊號機制,又能夠統一對外介面,用sigaction函式重新實現了signal函式)。

訊號(Signal)是一種處理非同步事件的通訊方式,用於通知其他程序或者自己本身,來告知將有某種事件發生。在Java中,訊號機制通過wait(),notify()和notifyAll()來實現。其中wait()使得當前呼叫wait()的執行緒掛起,並釋放已經獲得的wait()所在程式碼塊的鎖;notify()用於隨即喚醒一個被wait()掛起的執行緒進入執行緒排程佇列;notifyAll()用於喚醒所有被wait()掛起的執行緒進入執行緒排程佇列。

(java 實現管程時可以使用wait 和notify實現)

訊號量(Semaphore)

Semaphore是用來保護一個或者多個共享資源的訪問,Semaphore內部維護了一個計數器,其值為可以訪問的共享資源的個數。一個執行緒要訪問共享資源,先獲得訊號量,如果訊號量的計數器值大於1,意味著有共享資源可以訪問,則使其計數器值減去1,再訪問共享資源。

如果計數器值為0,執行緒進入休眠。當某個執行緒使用完共享資源後,釋放訊號量,並將訊號量內部的計數器加1,之前進入休眠的執行緒將被喚醒並再次試圖獲得訊號量。

主要作為程序間以及同一程序不同執行緒之間的同步手段。

Semaphore 的原子操作檢查數值、修改變數值、以及可能發生的睡眠操作均作為一個單一的不可分割的原子操作。

java實現

Semaphore semaphore = new Semaphore(10,true);  
semaphore.acquire();  
//do something here  
semaphore.release();