1. 程式人生 > >wait和notify方法實現執行緒間的通訊

wait和notify方法實現執行緒間的通訊

    使用wait和notify方法實現執行緒間的通訊需要注意以下兩點:

  • wait和notify必須配合synchronized去使用。
  • wait可以釋放鎖,notify不釋放鎖。

1.wait和notify的簡單應用

    使用wait和notify設計如下程式,t1執行緒向一個List中新增元素,當List的長度為5時向t2執行緒發出通知;t2等待執行緒t1的通知,當t2執行緒接收到t1的通知時,中斷執行緒。程式程式碼如下:

public class ListAdd {
    private volatile static List list = new ArrayList();

    public void add() {
        list.add("Hello World");
    }

    public int size() {
        return list.size();
    }

    public static void main(String[] args) {

        final ListAdd listAdd = new ListAdd();

        // 例項化出來一個 lock
        // 當使用wait 和 notify 的時候 , 一定要配合著synchronized關鍵字去使用
        final Object lock = new Object();

        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    synchronized (lock) {
                        for (int i = 0; i < 10; i++) {
                            listAdd.add();
                            System.out.println("當前執行緒:" + Thread.currentThread().getName() + "添加了一個元素..");
                            Thread.sleep(500);
                            if (listAdd.size() == 5) {
                                System.out.println("已經發出通知..");
                                lock.notify();
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }, "t1");

        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (lock) {
                    if (listAdd.size() != 5) {
                        try {
                            System.out.println("t2進入...");
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("當前執行緒:" + Thread.currentThread().getName() + "收到通知執行緒停止..");
                    throw new RuntimeException();
                }
            }
        }, "t2");
        t2.start();
        t1.start();
    }
}

程式的執行結果如下:

t2進入...
當前執行緒:t1添加了一個元素..
當前執行緒:t1添加了一個元素..
當前執行緒:t1添加了一個元素..
當前執行緒:t1添加了一個元素..
當前執行緒:t1添加了一個元素..
已經發出通知..
當前執行緒:t1添加了一個元素..
當前執行緒:t1添加了一個元素..
當前執行緒:t1添加了一個元素..
當前執行緒:t1添加了一個元素..
當前執行緒:t1添加了一個元素..
10
當前執行緒:t2收到通知執行緒停止..
Exception in thread "t2" java.lang.RuntimeException
	at com.bjsxt.base.conn008.ListAdd$2.run(ListAdd.java:66)
	at java.lang.Thread.run(Thread.java:748)

首先宣告一個lock鎖物件,然後啟動t2執行緒,獲得lock鎖,此時list中的元素等於5,t2執行緒等待並釋放鎖。 t1執行緒獲得lock鎖,迴圈執行元素新增操作,當list的長度為5的時候,傳送notify通知,但t1執行緒仍然持有lock鎖,繼續執行,直到執行緒執行結束釋放鎖。t2執行緒重新獲得lock鎖,並被喚醒,繼續執行程式到結束。

2.CountDownLatch

上面的程式存在什麼樣的問題呢?

在上面的程式中,當list元素達到5個的時候,並沒有實時的去釋放鎖,而必須等到程式執行結束,如果我們想實時的釋放鎖可以使用CountDownLatch。

public class ListAdd2 {
    private volatile static List list = new ArrayList();

    public void add() {
        list.add("Hello World");
    }

    public int size() {
        return list.size();
    }

    public static void main(String[] args) {

        final ListAdd2 list2 = new ListAdd2();

        // 1 例項化出來一個 lock
        // 當使用wait 和 notify 的時候 , 一定要配合著synchronized關鍵字去使用
        // 建構函式中的數字表示countDown呼叫的次數
        final CountDownLatch countDownLatch = new CountDownLatch(1);

        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 10; i++) {
                        list2.add();
                        System.out.println("當前執行緒:" + Thread.currentThread().getName() + "添加了一個元素..");
                        if (list2.size() == 5) {
                            System.out.println("已經發出通知..");
                            countDownLatch.countDown();
                        }
                        Thread.sleep(500);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }, "t1");

        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                if (list2.size() != 5) {
                    try {
                        System.out.println("t2進入...");
                        countDownLatch.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("當前執行緒:" + Thread.currentThread().getName() + "收到通知執行緒停止..");
                throw new RuntimeException();
            }
        }, "t2");
        t2.start();
        t1.start();
    }
}

執行結果如下:

t2進入...
當前執行緒:t1添加了一個元素..
當前執行緒:t1添加了一個元素..
當前執行緒:t1添加了一個元素..
當前執行緒:t1添加了一個元素..
當前執行緒:t1添加了一個元素..
已經發出通知..
當前執行緒:t2收到通知執行緒停止..
Exception in thread "t2" java.lang.RuntimeException
	at com.bjsxt.base.conn008.ListAdd2$2.run(ListAdd2.java:67)
	at java.lang.Thread.run(Thread.java:748)
當前執行緒:t1添加了一個元素..
當前執行緒:t1添加了一個元素..
當前執行緒:t1添加了一個元素..
當前執行緒:t1添加了一個元素..
當前執行緒:t1添加了一個元素..

從執行結果我們可以看出,list中的元素達到5個的時候,等待的執行緒t2會立即得到執行,此時t1執行緒會繼續執行,並不會受到阻塞。final CountDownLatch countDownLatch = new CountDownLatch(1)中的建構函式數字,表示t2執行緒繼續執行時countDown需要呼叫的次數。

3.wait和notify實現佇列

使用wait和notify實現一個BlockingQueue(阻塞佇列),支援阻塞的放入和得到資料。需要實現兩個簡單的方法put和take操作。

put(anObject):把anObject加到Blocking裡。如果BlockingQueue中沒有空間,則呼叫此方法的執行緒被阻塞,直到BlockingQueue裡面有空間再繼續。

take:取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻塞進入等待狀態直到BlockingQueue有新的資料被加入。

public class MyQueue {

    //1 需要一個承裝元素的集合 
    private LinkedList<Object> list = new LinkedList<Object>();
    //2 需要一個計數器
    private AtomicInteger count = new AtomicInteger(0);
    //3 需要制定上限和下限
    private final int minSize = 0;
    private final int maxSize;

    //4 構造方法
    public MyQueue(int size) {
        this.maxSize = size;
    }

    //5 初始化一個物件 用於加鎖
    private final Object lock = new Object();

    //put(anObject): 把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷,直到BlockingQueue裡面有空間再繼續.
    public void put(Object obj) {
        synchronized (lock) {
            while (count.get() == this.maxSize) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //1 加入元素
            list.add(obj);
            //2 計數器累加
            count.incrementAndGet();
            //3 通知另外一個執行緒(喚醒)
            lock.notify();
            System.out.println("新加入的元素為:" + obj);
        }
    }

    //take: 取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的資料被加入.
    public Object take() {
        Object ret = null;
        synchronized (lock) {
            while (count.get() == this.minSize) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //1 做移除元素操作
            ret = list.removeFirst();
            //2 計數器遞減
            count.decrementAndGet();
            //3 喚醒另外一個執行緒
            lock.notify();
        }
        return ret;
    }

    public int getSize() {
        return this.count.get();
    }

    public static void main(String[] args) {

        final MyQueue mq = new MyQueue(5);
        mq.put("a");
        mq.put("b");
        mq.put("c");
        mq.put("d");
        mq.put("e");

        System.out.println("當前容器的長度:" + mq.getSize());

        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                mq.put("f");
                mq.put("g");
            }
        }, "t1");
        t1.start();
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                Object o1 = mq.take();
                System.out.println("移除的元素為:" + o1);
                Object o2 = mq.take();
                System.out.println("移除的元素為:" + o2);
            }
        }, "t2");
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        t2.start();
    }
}

程式的執行結果如下:

新加入的元素為:a
新加入的元素為:b
新加入的元素為:c
新加入的元素為:d
新加入的元素為:e
當前容器的長度:5
移除的元素為:a
新加入的元素為:f
移除的元素為:b
新加入的元素為:g