1. 程式人生 > 其它 >Java多執行緒詳解5【面試+工作】

Java多執行緒詳解5【面試+工作】

Java多執行緒詳解【面試+工作】

Java執行緒:新特徵-訊號量

Java的訊號量實際上是一個功能完畢的計數器,對控制一定資源的消費與回收有著很重要的意義,訊號量常常用於多執行緒的程式碼中,並能監控有多少數目的執行緒等待獲取資源,並且通過訊號量可以得知可用資源的數目等等,這裡總是在強調“數目”二字,但不能指出來有哪些在等待,哪些資源可用。

因此,本人認為,這個訊號量類如果能返回數目,還能知道哪些物件在等待,哪些資源可使用,就非常完美了,僅僅拿到這些概括性的數字,對精確控制意義不是很大。目前還沒想到更好的用法。

下面是一個簡單例子:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/** 
* Java執行緒:新特徵-訊號量
* 
* @author leizhimin 2009-11-5 13:44:45 
*/ 
publicclass Test {
 publicstaticvoid main(String[] args) {
                MyPool myPool = new MyPool(20);
 //建立執行緒池
                ExecutorService threadPool = Executors.newFixedThreadPool(2); 
                MyThread t1 = new MyThread("任務A", myPool, 3); 
                MyThread t2 = new MyThread("任務B", myPool, 12); 
                MyThread t3 = new MyThread("任務C", myPool, 7); 
 //線上程池中執行任務
                threadPool.execute(t1); 
                threadPool.execute(t2); 
                threadPool.execute(t3); 
 //關閉池
                threadPool.shutdown(); 
        } 
} 

/** 
* 一個池 
*/ 
class MyPool { 
 private Semaphore sp;    //池相關的訊號量

 /** 
         * 池的大小,這個大小會傳遞給訊號量
         * 
         * @param size 池的大小
         */ 
        MyPool(int size) {
 this.sp =new Semaphore(size);
        } 

 public Semaphore getSp() {
 return sp;
        } 

 publicvoid setSp(Semaphore sp) {
 this.sp = sp;
        } 
} 

class MyThread extends Thread { 
 private String threadname;            //執行緒的名稱
 private MyPool pool;                        //自定義池
 privateint x;                                    //申請訊號量的大小

        MyThread(String threadname, MyPool pool, int x) {
 this.threadname = threadname;
 this.pool = pool;
 this.x = x;
        } 

 publicvoid run() {
 try {
 //從此訊號量獲取給定數目的許可
                        pool.getSp().acquire(x); 
 //todo:也許這裡可以做更復雜的業務
                        System.out.println(threadname + "成功獲取了" + x +"個許可!");
                } catch (InterruptedException e) {
                        e.printStackTrace(); 
                } finally {
 //釋放給定數目的許可,將其返回到訊號量。
                        pool.getSp().release(x); 
                        System.out.println(threadname + "釋放了" + x +"個許可!");
                } 
        } 
}
任務B成功獲取了12個許可!
任務B釋放了12個許可!
任務A成功獲取了3個許可!
任務C成功獲取了7個許可!
任務C釋放了7個許可!
任務A釋放了3個許可!

Process finished with exit code 0

從結果可以看出,訊號量僅僅是對池資源進行監控,但不保證執行緒的安全,因此,在使用時候,應該自己控制執行緒的安全訪問池資源。

Java執行緒:新特徵-阻塞佇列

阻塞佇列是Java5執行緒新特徵中的內容,Java定義了阻塞佇列的介面java.util.concurrent.BlockingQueue,阻塞佇列的概念是,一個指定長度的佇列,如果佇列滿了,新增新元素的操作會被阻塞等待,直到有空位為止。同樣,當佇列為空時候,請求佇列元素的操作同樣會阻塞等待,直到有可用元素為止。

有了這樣的功能,就為多執行緒的排隊等候的模型實現開闢了便捷通道,非常有用。

java.util.concurrent.BlockingQueue繼承了java.util.Queue介面,可以參看API文件。

下面給出一個簡單應用的例子:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;

/** 
* Java執行緒:新特徵-阻塞佇列
* 
* @author leizhimin 2009-11-5 14:59:15 
*/ 
publicclass Test {
 publicstaticvoid main(String[] args)throws InterruptedException {
                BlockingQueue bqueue = new ArrayBlockingQueue(20);
 for (int i = 0; i < 30; i++) {
 //將指定元素新增到此佇列中,如果沒有可用空間,將一直等待(如果有必要)。
                        bqueue.put(i); 
                        System.out.println("向阻塞佇列中添加了元素:" + i);
                } 
                System.out.println("程式到此執行結束,即將退出----");
        } 
}
輸出結果:
向阻塞佇列中添加了元素:0
向阻塞佇列中添加了元素:1 
向阻塞佇列中添加了元素:2 
向阻塞佇列中添加了元素:3 
向阻塞佇列中添加了元素:4 
向阻塞佇列中添加了元素:5 
向阻塞佇列中添加了元素:6 
向阻塞佇列中添加了元素:7 
向阻塞佇列中添加了元素:8 
向阻塞佇列中添加了元素:9 
向阻塞佇列中添加了元素:10 
向阻塞佇列中添加了元素:11 
向阻塞佇列中添加了元素:12 
向阻塞佇列中添加了元素:13 
向阻塞佇列中添加了元素:14 
向阻塞佇列中添加了元素:15 
向阻塞佇列中添加了元素:16 
向阻塞佇列中添加了元素:17 
向阻塞佇列中添加了元素:18 
向阻塞佇列中添加了元素:19

可以看出,輸出到元素19時候,就一直處於等待狀態,因為佇列滿了,程式阻塞了。

這裡沒有用多執行緒來演示,沒有這個必要。

另外,阻塞佇列還有更多實現類,用來滿足各種複雜的需求:ArrayBlockingQueue, DelayQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue,具體的API差別也很小。

Java執行緒:新特徵-阻塞棧

對於阻塞棧,與阻塞佇列相似。不同點在於棧是“後入先出”的結構,每次操作的是棧頂,而佇列是“先進先出”的結構,每次操作的是佇列頭。

這裡要特別說明一點的是,阻塞棧是Java6的新特徵。

Java為阻塞棧定義了介面:java.util.concurrent.BlockingDeque,其實現類也比較多,具體可以檢視JavaAPI文件。

下面看一個簡單例子:

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

/** 
* Java執行緒:新特徵-阻塞棧
* 
* @author leizhimin 2009-11-5 15:34:29 
*/ 
publicclass Test {
 publicstaticvoid main(String[] args)throws InterruptedException {
                BlockingDeque bDeque = new LinkedBlockingDeque(20);
 for (int i = 0; i < 30; i++) {
 //將指定元素新增到此阻塞棧中,如果沒有可用空間,將一直等待(如果有必要)。
                        bDeque.putFirst(i); 
                        System.out.println("向阻塞棧中添加了元素:" + i);
                } 
                System.out.println("程式到此執行結束,即將退出----");
        } 
}
輸出結果:
向阻塞棧中添加了元素:0
向阻塞棧中添加了元素:1 
向阻塞棧中添加了元素:2 
向阻塞棧中添加了元素:3 
向阻塞棧中添加了元素:4 
向阻塞棧中添加了元素:5 
向阻塞棧中添加了元素:6 
向阻塞棧中添加了元素:7 
向阻塞棧中添加了元素:8 
向阻塞棧中添加了元素:9 
向阻塞棧中添加了元素:10 
向阻塞棧中添加了元素:11 
向阻塞棧中添加了元素:12 
向阻塞棧中添加了元素:13 
向阻塞棧中添加了元素:14 
向阻塞棧中添加了元素:15 
向阻塞棧中添加了元素:16 
向阻塞棧中添加了元素:17 
向阻塞棧中添加了元素:18 
向阻塞棧中添加了元素:19

從上面結果可以看到,程式並沒結束,二是阻塞住了,原因是棧已經滿了,後面追加元素的操作都被阻塞了。

Java執行緒:新特徵-條件變數

條件變數是Java5執行緒中很重要的一個概念,顧名思義,條件變數就是表示條件的一種變數。但是必須說明,這裡的條件是沒有實際含義的,僅僅是個標記而已,並且條件的含義往往通過程式碼來賦予其含義。

這裡的條件和普通意義上的條件表示式有著天壤之別。

條件變數都實現了java.util.concurrent.locks.Condition介面,條件變數的例項化是通過一個Lock物件上呼叫newCondition()方法來獲取的,這樣,條件就和一個鎖物件繫結起來了。因此,Java中的條件變數只能和鎖配合使用,來控制併發程式訪問競爭資源的安全。

條件變數的出現是為了更精細控制執行緒等待與喚醒,在Java5之前,執行緒的等待與喚醒依靠的是Object物件的wait()和notify()/notifyAll()方法,這樣的處理不夠精細。

而在Java5中,一個鎖可以有多個條件,每個條件上可以有多個執行緒等待,通過呼叫await()方法,可以讓執行緒在該條件下等待。當呼叫signalAll()方法,又可以喚醒該條件下的等待的執行緒。有關Condition介面的API可以具體參考JavaAPI文件。

條件變數比較抽象,原因是他不是自然語言中的條件概念,而是程式控制的一種手段。

下面以一個銀行存取款的模擬程式為例來揭蓋Java多執行緒條件變數的神祕面紗:

有一個賬戶,多個使用者(執行緒)在同時操作這個賬戶,有的存款有的取款,存款隨便存,取款有限制,不能透支,任何試圖透支的操作都將等待裡面有足夠存款才執行操作。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** 
* Java執行緒:條件變數 
* 
* @author leizhimin 2009-11-5 10:57:29 
*/ 
publicclass Test {
 publicstaticvoid main(String[] args) {
 //建立併發訪問的賬戶
           MyCount myCount = new MyCount("95599200901215522", 10000);
 //建立一個執行緒池
                ExecutorService pool = Executors.newFixedThreadPool(2); 
                Thread t1 = new SaveThread("張三", myCount, 2000); 
                Thread t2 = new SaveThread("李四", myCount, 3600); 
                Thread t3 = new DrawThread("王五", myCount, 2700); 
                Thread t4 = new SaveThread("老張", myCount, 600); 
                Thread t5 = new DrawThread("老牛", myCount, 1300); 
                Thread t6 = new DrawThread("胖子", myCount, 800); 
 //執行各個執行緒
                pool.execute(t1); 
                pool.execute(t2); 
                pool.execute(t3); 
                pool.execute(t4); 
                pool.execute(t5); 
                pool.execute(t6); 
 //關閉執行緒池
                pool.shutdown(); 
        } 
} 

/** 
* 存款執行緒類 
*/ 
class SaveThreadextends Thread {
 private String name;                //操作人
 private MyCount myCount;        //賬戶
 privateint x;                            //存款金額

        SaveThread(String name, MyCount myCount, int x) {
 this.name = name;
 this.myCount = myCount;
 this.x = x;
        } 

 publicvoid run() {
                myCount.saving(x, name); 
        } 
} 

/** 
* 取款執行緒類 
*/ 
class DrawThreadextends Thread {
 private String name;                //操作人
 private MyCount myCount;        //賬戶
 privateint x;                            //存款金額

        DrawThread(String name, MyCount myCount, int x) {
 this.name = name;
 this.myCount = myCount;
 this.x = x;
        } 

 publicvoid run() {
                myCount.drawing(x, name); 
        } 
} 


/** 
* 普通銀行賬戶,不可透支 
*/ 
class MyCount { 
 private String oid;                        //賬號
 privateint cash;                            //賬戶餘額
 private Lock lock =new ReentrantLock();                //賬戶鎖
 private Condition _save = lock.newCondition();    //存款條件
 private Condition _draw = lock.newCondition();    //取款條件

        MyCount(String oid, int cash) {
 this.oid = oid;
 this.cash = cash;
        } 

 /** 
         * 存款 
         * 
         * @param x        操作金額
         * @param name 操作人
         */ 
 publicvoid saving(int x, String name) {
                lock.lock();                        //獲取鎖
 if (x > 0) {
                        cash += x;                    //存款
             System.out.println(name + "存款" + x +",當前餘額為" + cash);
                } 
                _draw.signalAll();            //喚醒所有等待執行緒。
                lock.unlock();                    //釋放鎖
        } 

 /** 
         * 取款 
         * 
         * @param x        操作金額
         * @param name 操作人
         */ 
 publicvoid drawing(int x, String name) {
                lock.lock();                                 //獲取鎖
 try {
 if (cash - x < 0) {
                                _draw.await();             //阻塞取款操作
                        } else {
                                cash -= x;                     //取款
          System.out.println(name + "取款" + x +",當前餘額為" + cash);
                        } 
                        _save.signalAll();             //喚醒所有存款操作
                } catch (InterruptedException e) {
                        e.printStackTrace(); 
                } finally {
                        lock.unlock();                     //釋放鎖
                } 
        } 
}
李四存款3600,當前餘額為13600
張三存款2000,當前餘額為15600
老張存款600,當前餘額為16200
老牛取款1300,當前餘額為14900
胖子取款800,當前餘額為14100
王五取款2700,當前餘額為11400

Process finished with exit code 0

假如我們不用鎖和條件變數,如何實現此功能呢?

下面是實現程式碼:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** 
* Java執行緒:不用條件變數
* 
* @author leizhimin 2009-11-5 10:57:29 
*/ 
publicclass Test {
 publicstaticvoid main(String[] args) {
 //建立併發訪問的賬戶
                MyCount myCount = new MyCount("95599200901215522", 10000);
 //建立一個執行緒池
                ExecutorService pool = Executors.newFixedThreadPool(2); 
                Thread t1 = new SaveThread("張三", myCount, 2000); 
                Thread t2 = new SaveThread("李四", myCount, 3600); 
                Thread t3 = new DrawThread("王五", myCount, 2700); 
                Thread t4 = new SaveThread("老張", myCount, 600); 
                Thread t5 = new DrawThread("老牛", myCount, 1300); 
                Thread t6 = new DrawThread("胖子", myCount, 800); 
 //執行各個執行緒
                pool.execute(t1); 
                pool.execute(t2); 
                pool.execute(t3); 
                pool.execute(t4); 
                pool.execute(t5); 
                pool.execute(t6); 
 //關閉執行緒池
                pool.shutdown(); 
        } 
} 

/** 
* 存款執行緒類 
*/ 
class SaveThreadextends Thread {
 private String name;                //操作人
 private MyCount myCount;        //賬戶
 privateint x;                            //存款金額

        SaveThread(String name, MyCount myCount, int x) {
 this.name = name;
 this.myCount = myCount;
 this.x = x;
        } 

 publicvoid run() {
                myCount.saving(x, name); 
        } 
} 

/** 
* 取款執行緒類 
*/ 
class DrawThreadextends Thread {
 private String name;                //操作人
 private MyCount myCount;        //賬戶
 privateint x;                            //存款金額

        DrawThread(String name, MyCount myCount, int x) {
 this.name = name;
 this.myCount = myCount;
 this.x = x;
        } 

 publicvoid run() {
                myCount.drawing(x, name); 
        } 
} 

/** 
* 普通銀行賬戶,不可透支 
*/ 
class MyCount { 
 private String oid;                        //賬號
 privateint cash;                            //賬戶餘額

        MyCount(String oid, int cash) {
 this.oid = oid;
 this.cash = cash;
        } 

 /** 
         * 存款 
         * 
         * @param x        操作金額
         * @param name 操作人
         */ 
 publicsynchronizedvoid saving(int x, String name) {
 if (x > 0) {
                        cash += x;                    //存款
                   System.out.println(name + "存款" + x +",當前餘額為" + cash);
                } 
                notifyAll();            //喚醒所有等待執行緒。
        } 

 /** 
         * 取款 
         * 
         * @param x        操作金額
         * @param name 操作人
         */ 
 publicsynchronizedvoid drawing(int x, String name) {
 if (cash - x < 0) {
 try {
                                wait(); 
                        } catch (InterruptedException e1) {
                                e1.printStackTrace(); 
                        } 
                } else {
                        cash -= x;                     //取款
                  System.out.println(name + "取款" + x +",當前餘額為" + cash);
                } 
                notifyAll();             //喚醒所有存款操作
        } 
}
輸出結果為:
李四存款3600,當前餘額為13600
王五取款2700,當前餘額為10900
老張存款600,當前餘額為11500
老牛取款1300,當前餘額為10200
胖子取款800,當前餘額為9400
張三存款2000,當前餘額為11400

Process finished with exit code 0

結合先前同步程式碼知識,舉一反三,將此例改為同步程式碼塊來實現,

程式碼如下:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** 
* Java執行緒:改為同步程式碼塊
* 
* @author leizhimin 2009-11-5 10:57:29 
*/ 
publicclass Test {
 publicstaticvoid main(String[] args) {
 //建立併發訪問的賬戶
                MyCount myCount = new MyCount("95599200901215522", 10000);
 //建立一個執行緒池
                ExecutorService pool = Executors.newFixedThreadPool(2); 
                Thread t1 = new SaveThread("張三", myCount, 2000); 
                Thread t2 = new SaveThread("李四", myCount, 3600); 
                Thread t3 = new DrawThread("王五", myCount, 2700); 
                Thread t4 = new SaveThread("老張", myCount, 600); 
                Thread t5 = new DrawThread("老牛", myCount, 1300); 
                Thread t6 = new DrawThread("胖子", myCount, 800); 
 //執行各個執行緒
                pool.execute(t1); 
                pool.execute(t2); 
                pool.execute(t3); 
                pool.execute(t4); 
                pool.execute(t5); 
                pool.execute(t6); 
 //關閉執行緒池
                pool.shutdown(); 
        } 
} 

/** 
* 存款執行緒類 
*/ 
class SaveThreadextends Thread {
 private String name;                //操作人
 private MyCount myCount;        //賬戶
 privateint x;                            //存款金額

        SaveThread(String name, MyCount myCount, int x) {
 this.name = name;
 this.myCount = myCount;
 this.x = x;
        } 

 publicvoid run() {
                myCount.saving(x, name); 
        } 
} 

/** 
* 取款執行緒類 
*/ 
class DrawThreadextends Thread {
 private String name;                //操作人
 private MyCount myCount;        //賬戶
 privateint x;                            //存款金額

        DrawThread(String name, MyCount myCount, int x) {
 this.name = name;
 this.myCount = myCount;
 this.x = x;
        } 

 publicvoid run() {
                myCount.drawing(x, name); 
        } 
} 

/** 
* 普通銀行賬戶,不可透支 
*/ 
class MyCount { 
 private String oid;                        //賬號
 privateint cash;                            //賬戶餘額

        MyCount(String oid, int cash) {
 this.oid = oid;
 this.cash = cash;
        } 

 /** 
         * 存款 
         * 
         * @param x        操作金額
         * @param name 操作人
         */ 
 publicvoid saving(int x, String name) {
 if (x > 0) {
 synchronized (this) {
                                cash += x;                    //存款
                 System.out.println(name + "存款" + x +",當前餘額為" + cash);
                                notifyAll();            //喚醒所有等待執行緒。
                        } 
                } 
        } 

 /** 
         * 取款 
         * 
         * @param x        操作金額
         * @param name 操作人
         */ 
 publicsynchronizedvoid drawing(int x, String name) {
 synchronized (this) {
 if (cash - x < 0) {
 try {
                                        wait(); 
                                } catch (InterruptedException e1) {
                                        e1.printStackTrace(); 
                                } 
                        } else {
                                cash -= x;                     //取款
                  System.out.println(name + "取款" + x +",當前餘額為" + cash);
                        } 
                } 
                notifyAll();             //喚醒所有存款操作
        } 
}

李四存款3600,當前餘額為13600 王五取款2700,當前餘額為10900 老張存款600,當前餘額為11500 老牛取款1300,當前餘額為10200 胖子取款800,當前餘額為9400 張三存款2000,當前餘額為11400 Process finished with exit code 0

對比以上三種方式,從控制角度上講,第一種最靈活,第二種程式碼最簡單,第三種容易犯錯。