1. 程式人生 > >Java執行緒安全佇列Queue

Java執行緒安全佇列Queue

在Java多執行緒應用中,佇列的使用率很高,多數生產消費模型的首選資料結構就是佇列。Java提供的執行緒安全的Queue可以分為阻塞佇列和非阻塞佇列,其中阻塞佇列的典型例子是BlockingQueue,非阻塞佇列的典型例子是ConcurrentLinkedQueue,在實際應用中要根據實際需要選用阻塞佇列或者非阻塞佇列。
注:什麼叫執行緒安全?這個首先要明確。執行緒安全的類 ,指的是類內共享的全域性變數的訪問必須保證是不受多執行緒形式影響的。如果由於多執行緒的訪問(比如修改、遍歷、檢視)而使這些變數結構被破壞或者針對這些變數操作的原子性被破壞,則這個類就不是執行緒安全的。
今天就聊聊這兩種Queue,本文分為以下兩個部分,用分割線分開:

BlockingQueue  阻塞演算法
ConcurrentLinkedQueue,非阻塞演算法

首先來看看BlockingQueue
Queue是什麼就不需要多說了吧,一句話:佇列是先進先出。相對的,棧是後進先出。如果不熟悉的話先找本基礎的資料結構的書看看吧。

BlockingQueue,顧名思義,“阻塞佇列”:可以提供阻塞功能的佇列。
首先,看看BlockingQueue提供的常用方法:

          可能報異常       返回boolean值     可能阻塞             設定等待時間
  入隊      add(e)          offer(e)        put(e)          offer(e,timeout, unit)
  出隊      remove()        poll()          take()           poll(timeout, unit)
  檢視      element()       peek()           無                   無 

從上表可以很明顯看出每個方法的作用,這個不用多說。我想說的是:

add(e) remove() element() 方法不會阻塞執行緒。當不滿足約束條件時,會丟擲IllegalStateException 異常。例如:當佇列被元素填滿後,再呼叫add(e),則會丟擲異常。
offer(e) poll() peek() 方法即不會阻塞執行緒,也不會丟擲異常。例如:當佇列被元素填滿後,再呼叫offer(e),則不會插入元素,函式返回false。
要想要實現阻塞功能,需要呼叫put(e) take() 方法。當不滿足約束條件時,會阻塞執行緒。

好,上點原始碼你就更明白了。以ArrayBlockingQueue類為例:
對於第一類方法,很明顯如果操作不成功就拋異常。而且可以看到其實呼叫的是第二類的方法,為什麼?因為第二類方法返回boolean啊。

public boolean add(E e) {  
     if (offer(e))  
         return true;  
     else  
         throw new IllegalStateException("Queue full");//佇列已滿,拋異常  
}  

public E remove() {  
    E x = poll();  
    if (x != null)  
        return x;  
    else  
        throw new NoSuchElementException();//佇列為空,拋異常  
}

對於第二類方法,很標準的ReentrantLock使用方式(不熟悉的朋友看一下這個帖子http://hellosure.iteye.com/blog/1121157),另外對於insert和extract的實現沒啥好說的。
注:先不看阻塞與否,這ReentrantLock的使用方式就能說明這個類是執行緒安全類。

public boolean offer(E e) {  
        if (e == null) throw new NullPointerException();  
        final ReentrantLock lock = this.lock;  
        lock.lock();  
        try {  
            if (count == items.length)//佇列已滿,返回false  
                return false;  
            else {  
                insert(e);//insert方法中發出了notEmpty.signal();  
                return true;  
            }  
        } finally {  
            lock.unlock();  
        }  
    }  

public E poll() {  
        final ReentrantLock lock = this.lock;  
        lock.lock();  
        try {  
            if (count == 0)//佇列為空,返回false  
                return null;  
            E x = extract();//extract方法中發出了notFull.signal();  
            return x;  
        } finally {  
            lock.unlock();  
        }  
    }

對於第三類方法,這裡面涉及到Condition類,簡要提一下,
await方法指:造成當前執行緒在接到訊號或被中斷之前一直處於等待狀態。
signal方法指:喚醒一個等待執行緒。

public void put(E e) throws InterruptedException {  
        if (e == null) throw new NullPointerException();  
        final E[] items = this.items;  
        final ReentrantLock lock = this.lock;  
        lock.lockInterruptibly();  
        try {  
            try {  
                while (count == items.length)//如果佇列已滿,等待notFull這個條件,這時當前執行緒被阻塞  
                    notFull.await();  
            } catch (InterruptedException ie) {  
                notFull.signal(); //喚醒受notFull阻塞的當前執行緒  
                throw ie;  
            }  
            insert(e);  
        } finally {  
            lock.unlock();  
        }  
    }  

public E take() throws InterruptedException {  
        final ReentrantLock lock = this.lock;  
        lock.lockInterruptibly();  
        try {  
            try {  
                while (count == 0)//如果佇列為空,等待notEmpty這個條件,這時當前執行緒被阻塞  
                    notEmpty.await();  
            } catch (InterruptedException ie) {  
                notEmpty.signal();//喚醒受notEmpty阻塞的當前執行緒  
                throw ie;  
            }  
            E x = extract();  
            return x;  
        } finally {  
            lock.unlock();  
        }  
    }

第四類方法就是指在有必要時等待指定時間,就不詳細說了。

再來看看BlockingQueue介面的具體實現類吧:

  • ArrayBlockingQueue,其建構函式必須帶一個int引數來指明其大小;

  • LinkedBlockingQueue,若其建構函式帶一個規定大小的引數,生成的BlockingQueue有大小限制,若不帶大小引數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定;

  • PriorityBlockingQueue,其所含物件的排序不是FIFO,而是依據物件的自然排序順序或者是建構函式的Comparator決定的順序。

上面是用ArrayBlockingQueue舉得例子,下面看看LinkedBlockingQueue:
首先,既然是連結串列,就應該有Node節點,它是一個內部靜態類:

static class Node<E> {
    /** The item, volatile to ensure barrier separating write and read */
    volatile E item;
    Node<E> next;
    Node(E x) { item = x; }
}

然後,對於連結串列來說,肯定需要兩個變數來標示頭和尾:

    /** 頭指標 */
    private transient Node<E> head;//head.next是佇列的頭元素
    /** 尾指標 */
    private transient Node<E> last;//last.next是null

那麼,對於入隊和出隊就很自然能理解了:

private void enqueue(E x) {    
    last = last.next = new Node<E>(x);  //入隊是為last再找個下家  
}    

private E dequeue() {    
    Node<E> first = head.next;  //出隊是把head.next取出來,然後將head向後移一位  
    head = first;    
    E x = first.item;    
    first.item = null;    
    return x;    
}

另外,LinkedBlockingQueue相對於ArrayBlockingQueue還有不同是,有兩個ReentrantLock,且佇列現有元素的大小由一個AtomicInteger物件標示。
注:AtomicInteger類是以原子的方式操作整型變數。


    private final AtomicInteger count =new AtomicInteger(0);
    /** 用於讀取的獨佔鎖*/
    private final ReentrantLock takeLock =new ReentrantLock();
    /** 佇列是否為空的條件 */
    private final Condition notEmpty = takeLock.newCondition();
    /** 用於寫入的獨佔鎖 */
    private final ReentrantLock putLock =new ReentrantLock();
    /** 佇列是否已滿的條件 */
    private final Condition notFull = putLock.newCondition();

有兩個Condition很好理解,在ArrayBlockingQueue也是這樣做的。但是為什麼需要兩個ReentrantLock呢?下面會慢慢道來。
讓我們來看看offer和poll方法的程式碼:


 public boolean offer(E e) {  
     if (e == null) throw new NullPointerException();  
     final AtomicInteger count = this.count;  
     if (count.get() == capacity)  
         return false;  
     int c = -1;  
     final ReentrantLock putLock = this.putLock;//入隊當然用putLock   
     putLock.lock();  
     try {  
         if (count.get() < capacity) {  
             enqueue(e); //入隊  
             c = count.getAndIncrement(); //隊長度+1  
             if (c + 1 < capacity)  
                 notFull.signal(); //佇列沒滿,當然可以解鎖了  
         }  
     } finally {  
         putLock.unlock();  
     }  
     if (c == 0)  
         signalNotEmpty();//這個方法裡發出了notEmpty.signal();  
     return c >= 0;  
 }  

public E poll() {  
     final AtomicInteger count = this.count;  
     if (count.get() == 0)  
         return null;  
     E x = null;  
     int c = -1;  
     final ReentrantLock takeLock = this.takeLock;出隊當然用takeLock   
     takeLock.lock();  
     try {  
         if (count.get() > 0) {  
             x = dequeue();//出隊  
             c = count.getAndDecrement();//隊長度-1  
             if (c > 1)  
                 notEmpty.signal();//佇列沒空,解鎖  
         }  
     } finally {  
         takeLock.unlock();  
     }  
     if (c == capacity)  
         signalNotFull();//這個方法裡發出了notFull.signal();  
     return x;  
 }

看看原始碼發現和上面ArrayBlockingQueue的很類似,關鍵的問題在於:為什麼要用兩個ReentrantLockputLock和takeLock?
我們仔細想一下,入隊操作其實操作的只有隊尾引用last,並且沒有牽涉到head。而出隊操作其實只針對head,和last沒有關係。那麼就是說入隊和出隊的操作完全不需要公用一把鎖,所以就設計了兩個鎖,這樣就實現了多個不同任務的執行緒入隊的同時可以進行出隊的操作,另一方面由於兩個操作所共同使用的count是AtomicInteger型別的,所以完全不用考慮計數器遞增遞減的問題。
另外,還有一點需要說明一下:await()和singal()這兩個方法執行時都會檢查當前執行緒是否是獨佔鎖的當前執行緒,如果不是則丟擲java.lang.IllegalMonitorStateException異常。所以可以看到在原始碼中這兩個方法都出現在Lock的保護塊中。

下面再來說說ConcurrentLinkedQueue,它是一個無鎖的併發執行緒安全的佇列。
以下部分的內容參照了這個帖子http://yanxuxin.iteye.com/blog/586943
對比鎖機制的實現,使用無鎖機制的難點在於要充分考慮執行緒間的協調。簡單的說就是多個執行緒對內部資料結構進行訪問時,如果其中一個執行緒執行的中途因為一些原因出現故障,其他的執行緒能夠檢測並幫助完成剩下的操作。這就需要把對資料結構的操作過程精細的劃分成多個狀態或階段,考慮每個階段或狀態多執行緒訪問會出現的情況。
ConcurrentLinkedQueue有兩個volatile的執行緒共享變數:head,tail。要保證這個佇列的執行緒安全就是保證對這兩個Node的引用的訪問(更新,檢視)的原子性和可見性,由於volatile本身能夠保證可見性,所以就是對其修改的原子性要被保證。
下面通過offer方法的實現來看看在無鎖情況下如何保證原子性:


public boolean offer(E e) {    
    if (e == null) throw new NullPointerException();    
    Node<E> n = new Node<E>(e, null);    
    for (;;) {    
        Node<E> t = tail;    
        Node<E> s = t.getNext();    
        if (t == tail) { //------------------------------a    
            if (s == null) { //---------------------------b    
                if (t.casNext(s, n)) { //-------------------c    
                    casTail(t, n); //------------------------d    
                    return true;    
                }    
            } else {    
                casTail(t, s); //----------------------------e    
            }    
        }    
    }    
}

此方法的迴圈內首先獲得尾指標和其next指向的物件,由於tail和Node的next均是volatile的,所以保證了獲得的分別都是最新的值。

  • 程式碼a:t==tail是最上層的協調,如果其他執行緒改變了tail的引用,則說明現在獲得不是最新的尾指標需要重新迴圈獲得最新的值。
  • 程式碼b:s==null的判斷。靜止狀態下tail的next一定是指向null的,但是多執行緒下的另一個狀態就是中間態:tail的指向沒有改變,但是其next已經指向新的結點,即完成tail引用改變前的狀態,這時候s!=null。這裡就是協調的典型應用,直接進入程式碼e去協調參與中間態的執行緒去完成最後的更新,然後重新迴圈獲得新的tail開始自己的新一次的入隊嘗試。另外值得注意的是a,b之間,其他的執行緒可能會改變tail的指向,使得協調的操作失敗。從這個步驟可以看到無鎖實現的複雜性。
  • 程式碼c:t.casNext(s, n)是入隊的第一步,因為入隊需要兩步:更新Node的next,改變tail的指向。程式碼c之前可能發生tail引用指向的改變或者進入更新的中間態,這兩種情況均會使得t指向的元素的next屬性被原子的改變,不再指向null。這時程式碼c操作失敗,重新進入迴圈。
  • 程式碼d:這是完成更新的最後一步了,就是更新tail的指向,最有意思的協調在這兒又有了體現。從程式碼看casTail(t, n)不管是否成功都會接著返回true標誌著更新的成功。首先如果成功則表明本執行緒完成了兩步的更新,返回true是理所當然的;如果 casTail(t, n)不成功呢?要清楚的是完成程式碼c則代表著更新進入了中間態,程式碼d不成功則是tail的指向被其他執行緒改變。意味著對於其他的執行緒而言:它們得到的是中間態的更新,s!=null,進入程式碼e幫助本執行緒執行最後一步並且先於本執行緒成功。這樣本執行緒雖然程式碼d失敗了,但是是由於別的執行緒的協助先完成了,所以返回true也就理所當然了。
    通過分析這個入隊的操作,可以清晰的看到無鎖實現的每個步驟和狀態下多執行緒之間的協調和工作。
    注:上面這大段文字看起來很累,先能看懂多少看懂多少,現在看不懂先不急,下面還會提到這個演算法,並且用示意圖說明,就易懂很多了。

在使用ConcurrentLinkedQueue時要注意,如果直接使用它提供的函式,比如add或者poll方法,這樣我們自己不需要做任何同步。
但如果是非原子操作,比如:


if(!queue.isEmpty()) {    
   queue.poll(obj);    
}

我們很難保證,在呼叫了isEmpty()之後,poll()之前,這個queue沒有被其他執行緒修改。所以對於這種情況,我們還是需要自己同步:

synchronized(queue) {    
    if(!queue.isEmpty()) {    
       queue.poll(obj);    
    }    
}

注:這種需要進行自己同步的情況要視情況而定,不是任何情況下都需要這樣做。

另外還說一下,ConcurrentLinkedQueue的size()是要遍歷一遍集合的,所以儘量要避免用size而改用isEmpty(),以免效能過慢。

好,最後想說點什麼呢,阻塞演算法其實很好理解,簡單點理解就是加鎖,比如在BlockingQueue中看到的那樣,再往前推點,那就是synchronized。相比而言,非阻塞演算法的設計和實現都很困難,要通過低階的原子性來支援併發。下面就簡要的介紹一下非阻塞演算法,以下部分的內容參照了一篇很經典的文章http://www.ibm.com/developerworks/cn/java/j-jtp04186/
注:我覺得可以這樣理解,阻塞對應同步,非阻塞對應併發。也可以說:同步是阻塞模式,非同步是非阻塞模式

舉個例子來說明什麼是非阻塞演算法:非阻塞的計數器
首先,使用同步的執行緒安全的計數器程式碼如下

public final class Counter {  
    private long value = 0;  
    public synchronized long getValue() {  
        return value;  
    }  
    public synchronized long increment() {  
        return ++value;  
    }  
}

下面的程式碼顯示了一種最簡單的非阻塞演算法:使用 AtomicInteger的compareAndSet()(CAS方法)的計數器。compareAndSet()方法規定“將這個變數更新為新值,但是如果從我上次看到這個變數之後其他執行緒修改了它的值,那麼更新就失敗”


public class NonblockingCounter {  
    private AtomicInteger value;//前面提到過,AtomicInteger類是以原子的方式操作整型變數。  
    public int getValue() {  
        return value.get();  
    }  
    public int increment() {  
        int v;  
        do {  
            v = value.get();  
        while (!value.compareAndSet(v, v + 1));  
        return v + 1;  
    }  
}

非阻塞版本相對於基於鎖的版本有幾個效能優勢。首先,它用硬體的原生形態代替 JVM 的鎖定程式碼路徑,從而在更細的粒度層次上(獨立的記憶體位置)進行同步,失敗的執行緒也可以立即重試,而不會被掛起後重新排程。更細的粒度降低了爭用的機會,不用重新排程就能重試的能力也降低了爭用的成本。即使有少量失敗的 CAS 操作,這種方法仍然會比由於鎖爭用造成的重新排程快得多。
NonblockingCounter 這個示例可能簡單了些,但是它演示了所有非阻塞演算法的一個基本特徵——有些演算法步驟的執行是要冒險的,因為知道如果 CAS 不成功可能不得不重做。非阻塞演算法通常叫作樂觀演算法,因為它們繼續操作的假設是不會有干擾。如果發現干擾,就會回退並重試。在計數器的示例中,冒險的步驟是遞增——它檢索舊值並在舊值上加一,希望在計算更新期間值不會變化。如果它的希望落空,就會再次檢索值,並重做遞增計算。

再來一個例子,Michael-Scott 非阻塞佇列演算法的插入操作,ConcurrentLinkedQueue 就是用這個演算法實現的,現在來結合示意圖分析一下,很明朗:

    public class LinkedQueue <E> {  
        private static class Node <E> {  
            final E item;  
            final AtomicReference<Node<E>> next;  
            Node(E item, Node<E> next) {  
                this.item = item;  
                this.next = new AtomicReference<Node<E>>(next);  
            }  
        }  
        private AtomicReference<Node<E>> head  
            = new AtomicReference<Node<E>>(new Node<E>(null, null));  
        private AtomicReference<Node<E>> tail = head;  
        public boolean put(E item) {  
            Node<E> newNode = new Node<E>(item, null);  
            while (true) {  
                Node<E> curTail = tail.get();  
                Node<E> residue = curTail.next.get();  
                if (curTail == tail.get()) {  
                    if (residue == null) /* A */ {  
                        if (curTail.next.compareAndSet(null, newNode)) /* C */ {  
                            tail.compareAndSet(curTail, newNode) /* D */ ;  
                            return true;  
                        }  
                    } else {  
                        tail.compareAndSet(curTail, residue) /* B */;  
                    }  
                }  
            }  
        }  
    }  

看看這程式碼完全就是ConcurrentLinkedQueue 原始碼啊。
插入一個元素涉及頭指標和尾指標兩個指標更新,這兩個更新都是通過 CAS 進行的:從隊列當前的最後節點(C)連結到新節點,並把尾指標移動到新的最後一個節點(D)。如果第一步失敗,那麼佇列的狀態不變,插入執行緒會繼續重試,直到成功。一旦操作成功,插入被當成生效,其他執行緒就可以看到修改。還需要把尾指標移動到新節點的位置上,但是這項工作可以看成是 “清理工作”,因為任何處在這種情況下的執行緒都可以判斷出是否需要這種清理,也知道如何進行清理。

佇列總是處於兩種狀態之一:正常狀態(或稱靜止狀態,圖 1 和 圖 3)或中間狀態(圖 2)。在插入操作之前和第二個 CAS(D)成功之後,佇列處在靜止狀態;在第一個 CAS(C)成功之後,佇列處在中間狀態。在靜止狀態時,尾指標指向的連結節點的 next 欄位總為 null,而在中間狀態時,這個欄位為非 null。任何執行緒通過比較 tail.next 是否為 null,就可以判斷出佇列的狀態,這是讓執行緒可以幫助其他執行緒 “完成” 操作的關鍵。


上圖顯示的是:有兩個元素,處在靜止狀態的佇列

插入操作在插入新元素(A)之前,先檢查佇列是否處在中間狀態。如果是在中間狀態,那麼肯定有其他執行緒已經處在元素插入的中途,在步驟(C)和(D)之間。不必等候其他執行緒完成,當前執行緒就可以 “幫助” 它完成操作,把尾指標向前移動(B)。如果有必要,它還會繼續檢查尾指標並向前移動指標,直到佇列處於靜止狀態,這時它就可以開始自己的插入了。
第一個 CAS(C)可能因為兩個執行緒競爭訪問隊列當前的最後一個元素而失敗;在這種情況下,沒有發生修改,失去 CAS 的執行緒會重新裝入尾指標並再次嘗試。如果第二個 CAS(D)失敗,插入執行緒不需要重試 —— 因為其他執行緒已經在步驟(B)中替它完成了這個操作!

上圖顯示的是:處在插入中間狀態的佇列,在新元素插入之後,尾指標更新之前

上圖顯示的是:在尾指標更新後,佇列重新處在靜止狀態