wait和notify方法實現執行緒間的通訊
使用wait和notify方法實現執行緒間的通訊需要注意以下兩點:
- wait和notify必須配合synchronized去使用。
- wait可以釋放鎖,notify不釋放鎖。
1.wait和notify的簡單應用
使用wait和notify設計如下程式,t1執行緒向一個List中新增元素,當List的長度為5時向t2執行緒發出通知;t2等待執行緒t1的通知,當t2執行緒接收到t1的通知時,中斷執行緒。程式程式碼如下:
public class ListAdd { private volatile static List list = new ArrayList(); public void add() { list.add("Hello World"); } public int size() { return list.size(); } public static void main(String[] args) { final ListAdd listAdd = new ListAdd(); // 例項化出來一個 lock // 當使用wait 和 notify 的時候 , 一定要配合著synchronized關鍵字去使用 final Object lock = new Object(); Thread t1 = new Thread(new Runnable() { @Override public void run() { try { synchronized (lock) { for (int i = 0; i < 10; i++) { listAdd.add(); System.out.println("當前執行緒:" + Thread.currentThread().getName() + "添加了一個元素.."); Thread.sleep(500); if (listAdd.size() == 5) { System.out.println("已經發出通知.."); lock.notify(); } } } } catch (InterruptedException e) { e.printStackTrace(); } } }, "t1"); Thread t2 = new Thread(new Runnable() { @Override public void run() { synchronized (lock) { if (listAdd.size() != 5) { try { System.out.println("t2進入..."); lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("當前執行緒:" + Thread.currentThread().getName() + "收到通知執行緒停止.."); throw new RuntimeException(); } } }, "t2"); t2.start(); t1.start(); } }
程式的執行結果如下:
t2進入... 當前執行緒:t1添加了一個元素.. 當前執行緒:t1添加了一個元素.. 當前執行緒:t1添加了一個元素.. 當前執行緒:t1添加了一個元素.. 當前執行緒:t1添加了一個元素.. 已經發出通知.. 當前執行緒:t1添加了一個元素.. 當前執行緒:t1添加了一個元素.. 當前執行緒:t1添加了一個元素.. 當前執行緒:t1添加了一個元素.. 當前執行緒:t1添加了一個元素.. 10 當前執行緒:t2收到通知執行緒停止.. Exception in thread "t2" java.lang.RuntimeException at com.bjsxt.base.conn008.ListAdd$2.run(ListAdd.java:66) at java.lang.Thread.run(Thread.java:748)
首先宣告一個lock鎖物件,然後啟動t2執行緒,獲得lock鎖,此時list中的元素等於5,t2執行緒等待並釋放鎖。 t1執行緒獲得lock鎖,迴圈執行元素新增操作,當list的長度為5的時候,傳送notify通知,但t1執行緒仍然持有lock鎖,繼續執行,直到執行緒執行結束釋放鎖。t2執行緒重新獲得lock鎖,並被喚醒,繼續執行程式到結束。
2.CountDownLatch
上面的程式存在什麼樣的問題呢?
在上面的程式中,當list元素達到5個的時候,並沒有實時的去釋放鎖,而必須等到程式執行結束,如果我們想實時的釋放鎖可以使用CountDownLatch。
public class ListAdd2 { private volatile static List list = new ArrayList(); public void add() { list.add("Hello World"); } public int size() { return list.size(); } public static void main(String[] args) { final ListAdd2 list2 = new ListAdd2(); // 1 例項化出來一個 lock // 當使用wait 和 notify 的時候 , 一定要配合著synchronized關鍵字去使用 // 建構函式中的數字表示countDown呼叫的次數 final CountDownLatch countDownLatch = new CountDownLatch(1); Thread t1 = new Thread(new Runnable() { @Override public void run() { try { for (int i = 0; i < 10; i++) { list2.add(); System.out.println("當前執行緒:" + Thread.currentThread().getName() + "添加了一個元素.."); if (list2.size() == 5) { System.out.println("已經發出通知.."); countDownLatch.countDown(); } Thread.sleep(500); } } catch (InterruptedException e) { e.printStackTrace(); } } }, "t1"); Thread t2 = new Thread(new Runnable() { @Override public void run() { if (list2.size() != 5) { try { System.out.println("t2進入..."); countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("當前執行緒:" + Thread.currentThread().getName() + "收到通知執行緒停止.."); throw new RuntimeException(); } }, "t2"); t2.start(); t1.start(); } }
執行結果如下:
t2進入...
當前執行緒:t1添加了一個元素..
當前執行緒:t1添加了一個元素..
當前執行緒:t1添加了一個元素..
當前執行緒:t1添加了一個元素..
當前執行緒:t1添加了一個元素..
已經發出通知..
當前執行緒:t2收到通知執行緒停止..
Exception in thread "t2" java.lang.RuntimeException
at com.bjsxt.base.conn008.ListAdd2$2.run(ListAdd2.java:67)
at java.lang.Thread.run(Thread.java:748)
當前執行緒:t1添加了一個元素..
當前執行緒:t1添加了一個元素..
當前執行緒:t1添加了一個元素..
當前執行緒:t1添加了一個元素..
當前執行緒:t1添加了一個元素..
從執行結果我們可以看出,list中的元素達到5個的時候,等待的執行緒t2會立即得到執行,此時t1執行緒會繼續執行,並不會受到阻塞。final CountDownLatch countDownLatch = new CountDownLatch(1)中的建構函式數字,表示t2執行緒繼續執行時countDown需要呼叫的次數。
3.wait和notify實現佇列
使用wait和notify實現一個BlockingQueue(阻塞佇列),支援阻塞的放入和得到資料。需要實現兩個簡單的方法put和take操作。
put(anObject):把anObject加到Blocking裡。如果BlockingQueue中沒有空間,則呼叫此方法的執行緒被阻塞,直到BlockingQueue裡面有空間再繼續。
take:取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻塞進入等待狀態直到BlockingQueue有新的資料被加入。
public class MyQueue {
//1 需要一個承裝元素的集合
private LinkedList<Object> list = new LinkedList<Object>();
//2 需要一個計數器
private AtomicInteger count = new AtomicInteger(0);
//3 需要制定上限和下限
private final int minSize = 0;
private final int maxSize;
//4 構造方法
public MyQueue(int size) {
this.maxSize = size;
}
//5 初始化一個物件 用於加鎖
private final Object lock = new Object();
//put(anObject): 把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷,直到BlockingQueue裡面有空間再繼續.
public void put(Object obj) {
synchronized (lock) {
while (count.get() == this.maxSize) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//1 加入元素
list.add(obj);
//2 計數器累加
count.incrementAndGet();
//3 通知另外一個執行緒(喚醒)
lock.notify();
System.out.println("新加入的元素為:" + obj);
}
}
//take: 取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的資料被加入.
public Object take() {
Object ret = null;
synchronized (lock) {
while (count.get() == this.minSize) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//1 做移除元素操作
ret = list.removeFirst();
//2 計數器遞減
count.decrementAndGet();
//3 喚醒另外一個執行緒
lock.notify();
}
return ret;
}
public int getSize() {
return this.count.get();
}
public static void main(String[] args) {
final MyQueue mq = new MyQueue(5);
mq.put("a");
mq.put("b");
mq.put("c");
mq.put("d");
mq.put("e");
System.out.println("當前容器的長度:" + mq.getSize());
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
mq.put("f");
mq.put("g");
}
}, "t1");
t1.start();
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
Object o1 = mq.take();
System.out.println("移除的元素為:" + o1);
Object o2 = mq.take();
System.out.println("移除的元素為:" + o2);
}
}, "t2");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
t2.start();
}
}
程式的執行結果如下:
新加入的元素為:a
新加入的元素為:b
新加入的元素為:c
新加入的元素為:d
新加入的元素為:e
當前容器的長度:5
移除的元素為:a
新加入的元素為:f
移除的元素為:b
新加入的元素為:g