BlockingQueue介面及其實現類的原始碼分析
BlockingQueue是一個阻塞佇列的介面,提供了一系列的介面方法。其中方法主要可以分為三類,包括Insert相關的add、offer、put,remove相關的remove()、poll()、take()方法,以及檢視相關的peek()、element方法等。阻塞佇列是執行緒安全的容器,其元素不允許為null,否則會丟擲空指標異常。阻塞佇列可以用於生產者消費者場景中去。
常見的實現類有ArrayBlockingQueue以及LinkedBlockingQueue兩種,下面來詳細來看一下ArrayBlockingQueue以及LinkedBlockingQueue的具體實現。
ArrayBlockingQueue重要引數及建構函式
/** 佇列中元素的資料結構 */ final Object[] items; /** 隊首標記,用於出隊操作 */ int takeIndex; /** 隊尾標記,用於入隊操作 */ int putIndex; /** 佇列中元素的個數 */ int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /**對於任何操作都需要加鎖,這裡是一個可重入鎖 */ final ReentrantLock lock; /** 用於通知取元素的條件 */ private final Condition notEmpty; /** 控制放元素的條件 */ private final Condition notFull;
/** * 創造一個給定容量的非公平鎖的阻塞佇列 */ public ArrayBlockingQueue(int capacity) { this(capacity, false); } /** * 創造一個給定容量的陣列,並根據是否公平建立相應的公平鎖。初始化條件。 */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
ArrayBlockingQueue入隊操作
以常用的add方法為例子,在這個方法內部實際上是呼叫了offer方法,而offer方法在加鎖的基礎之上呼叫了enqueue方法來將元素放在陣列的尾部,並喚醒那些阻塞了的取元素方法。
add、offer、put的主要區別如下:
add方法在成功是返回true,如果失敗就丟擲異常。
offer方法在新增成功返回true,新增失敗返回false。
put方法如果新增不成功就會一直等待。不會出現丟節點的情況一般。
/**
* 將元素插入佇列中,如果成功則返回true,否則的話丟擲異常。可以看出其本身還是呼叫了offer方法。
*
* <p>This implementation returns <tt>true</tt> if <tt>offer</tt> succeeds,
* else throws an <tt>IllegalStateException</tt>.
*
*
*/
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning {@code true} upon success and {@code false} if this queue
* is full. This method is generally preferable to method {@link #add},
* which can fail to insert an element only by throwing an exception.
*
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
checkNotNull(e);//判斷是否為空,如果元素為null,則丟擲異常
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
/**
* 將元素插入陣列尾部,並喚醒等待取元素的執行緒。
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
/**
*向陣列中插入元素,如果沒有空間就等待。
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
ArrayBlockingQueue出隊操作
出隊操作也是要加鎖的,以remove為例,從頭開始遍歷,一直到尾部標記的地方為止,當找到一個和所給元素相等的元素時,刪除這個節點的元素。至於take方法和poll方法都是刪除頭部元素,區別在於take方法會等待,而poll方法如果沒有元素可取則會直接返回null。
/**
* Removes a single instance of the specified element from this queue,
* if it is present. More formally, removes an element {@code e} such
* that {@code o.equals(e)}, if this queue contains one or more such
* elements.
* Returns {@code true} if this queue contained the specified element
* (or equivalently, if this queue changed as a result of the call).
*
* <p>Removal of interior elements in circular array based queues
* is an intrinsically slow and disruptive operation, so should
* be undertaken only in exceptional circumstances, ideally
* only when the queue is known not to be accessible by other
* threads.
*
* @param o element to be removed from this queue, if present
* @return {@code true} if this queue changed as a result of the call
*/
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)//當元素為0時,會自動阻塞到條件佇列中去。知道被其他方法喚醒。
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
ArrayBlockingQueue總結
ArrayBlockingQueue本質上就是對一個數組進行操作,它嚴格不允許元素為null,但是這些操作都是加了鎖的操作,因此它們都是執行緒安全的,並且可以根據實際情況來選擇鎖的公平非公平,公平鎖可以嚴格保證執行緒不會飢餓,而非公平鎖則可以提高系統的吞吐量。並且由於它還是一個佇列,對於佇列的操作也大多數都是在頭部或者是尾部的操作。除了鎖之外,ArrayBlockingQueue還提供了兩個condition來實現等待操作,這裡的方法其實就是把那些被阻塞的執行緒放在Condition佇列中,然後當有signal操作是就喚醒最前面的執行緒執行。整體而言這些操作就是在陣列的基礎之上加鎖,相對簡單。
LinkedBlockingQueue原始碼分析
類似與ArrayList和LinkedList,和ArrayBlockingQueue對應的是LinkedBlockingqueue,不同的地方在於一個底層是陣列實現,一個底層是當連結串列來實現。再就是一個底層只有一把鎖,而另一個有兩把鎖。首先來看一下其中的重要引數。
LinkedBlockingQueue有兩把鎖,對應著隊尾和隊頭的操作,也就是說新增和刪除操作不互斥,這和上面的是不一樣的,因此其併發量要高於ArrayBlockingQueue。
/**
* 這裡的節點相對簡單,單向連結串列。
*/
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
/** 記錄連結串列容量 */
private final int capacity;
/**元素數目 */
private final AtomicInteger count = new AtomicInteger();
/**
* 頭結點
*/
transient Node<E> head;
/**
* 尾節點
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
LinkedBlockingQueue入隊操作
以offer方法為例子,當我們嘗試入隊一個null時就會丟擲異常。其他情況下當容量不夠時就返回false,否則就可以給這個入隊操作進行加鎖,當元素個數小於容量時就新增節點到尾部然後給count+1,喚醒其他被阻塞的新增元素執行緒。這裡獲取的count是還沒有加一之前的值,因此它的值如果為0,那麼至少佇列中還是有一個元素的,可以喚醒消費執行緒消費了。
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning {@code true} upon success and {@code false} if this queue
* is full.
* When using a capacity-restricted queue, this method is generally
* preferable to method {@link BlockingQueue#add add}, which can fail to
* insert an element only by throwing an exception.
*
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException();//為null丟擲異常
final AtomicInteger count = this.count;
if (count.get() == capacity) // 量不足返回false.
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();//獲取put鎖
try {
if (count.get() < capacity) {//count小於容量在隊尾入隊
enqueue(node);
c = count.getAndIncrement();//count加一
if (c + 1 < capacity)
notFull.signal();//仍然有剩餘容量,喚醒等待的put執行緒
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();//喚醒消費執行緒
return c >= 0;
}
/**
* 這個方法意味這last.next=node,也就是把node作為last的下一個節點。
*然後將last=last.next,也就是把last向後移。
* @param node the node
*/
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
/**
* Signals a waiting take. Called only from put/offer (which do not
* otherwise ordinarily lock takeLock.)
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
LinkedBlockingQueue出隊操作
先來看一下remove方法的出隊操作。由於remove方法要刪除的元素不固定,可能出現在佇列中的任何地方,因此需要同時鎖住隊首和隊尾兩把鎖,然後從頭到尾諸葛遍歷元素,當找到元素之後就刪除這個元素。這裡其實類似於單鏈表中的節點刪除,不同的地方在於要加鎖!
/**
* Removes a single instance of the specified element from this queue,
* if it is present. More formally, removes an element {@code e} such
* that {@code o.equals(e)}, if this queue contains one or more such
* elements.
* Returns {@code true} if this queue contained the specified element
* (or equivalently, if this queue changed as a result of the call).
*
* @param o element to be removed from this queue, if present
* @return {@code true} if this queue changed as a result of the call
*/
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
poll 方法返回隊頭元素並刪除之。如果佇列中沒有資料就返回null,否則的話就說明有元素,那麼就可以講一個元素出列,同時將count值減一,C>1意味著佇列中至少有一個元素,因此可以喚醒等待著的消費執行緒進行消費。當c的值等於容量時,此時c的實際值是容量減一,可以喚醒等待新增元素的執行緒進行新增。因為他們之前最有可能會被阻塞。 public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();//獲得count值並將其減一
if (c > 1)
notEmpty.signal();//至少有一個元素,喚醒等待著的消費執行緒。
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();喚醒
return x;
}
LinkedBlockingQueue總結
LinkedBlockingQueue本質上是一個連結串列,區別於普通連結串列的地方在於它在隊首和隊尾的地方加了兩把鎖,分別對應於生產元素和消費元素,因此和獨佔鎖比起來會快一些(畢竟可以首尾同時操作),它本身的元素也是不允許為null的。
ArrayBlockingQueue和LinkedBlockingQueue比較
1.ABQ的底層是陣列實現,LBQ的底層是連結串列實現。
2.ABQ加鎖只加一把,並且是全域性的,而LBQ的鎖有兩把,分別對應著隊首和隊尾,同時也有兩個Condition佇列。也就是說,ABQ的取元素和放元素是互斥的,而LBQ則沒有相互關聯,因此就併發性而言,LBQ要優於ABQ。
3.ABQ 的容量是必須需要的,並且不可以擴容,而LBQ的大小可以指定,也可以不指定,不指定的情況下其值為最大整數值。
4.ABQ 支援公平鎖和非公平鎖,而LBQ不可指定,其本身內部使用的也是非公平鎖。
參考資料:
相關推薦
BlockingQueue介面及其實現類的原始碼分析
BlockingQueue是一個阻塞佇列的介面,提供了一系列的介面方法。其中方法主要可以分為三類,包括Insert相關的add、offer、put,remove相關的remove()、poll()、take()方法,以及檢視相關的peek()、element方法等
併發Queue之BlockingQueue介面及其實現類
1、下面先簡單介紹BlockingQueue介面的五個實現:ArrayBlockingQueue:基於陣列的阻塞佇列實現,在ArrayBlockingQueue內部,維護了一個定長的陣列,以便快取佇列中的資料物件,其內部沒實現讀寫分離,也就意味著生產和消費者不能完全並行。長度
java中List介面的實現類 ArrayList,LinkedList,Vector 的區別 list實現類原始碼分析
java面試中經常被問到list常用的類以及內部實現機制,平時開發也經常用到list集合類,因此做一個原始碼級別的分析和比較之間的差異。 首先看一下List介面的的繼承關係: list介面繼承Col
BlockingQueue介面及實現類分析
1 BlockingQueue 介面及其實現類 BlocingQueue介面定義如下,僅列舉幾個常用方法: put(E) 在佇列尾部放入元素,若佇列滿則等待;take() 取佇列頭部元素返回,若佇列空則等待;offer(E) 在佇列尾部放入元素,若成功則返回tru
四,Java集合類(2)——Set介面及其實現類
1,Set介面及其實現類 Set集合與Collection基本相同,沒有提供任何額外的方法。實際上Set就是Collection,只是行為略有不同。Set集合不允許包含相同的元素,如果試圖把兩個相同的元素加入同一個Set集合中,新增操作失敗,add()方法返回
常用集合類及其特性一之List介面及其實現類
list介面: 1>元素有序(插入有序),元素可重複( 可重複的更確切概念即e1.equals(e2) )。 2>如果列表本身允許null元素的話,允許多個null值。 3>list藉口提供了特殊的迭代器ListIterater,除了允許Itera
Java容器學習筆記(二) Set介面及其實現類的相關知識總結
在Java容器學習筆記(一)中概述了Collection的基本概念及介面實現,並且總結了它的一個重要子介面List及其子類的實現和用法。 本篇主要總結Set介面及其實現類的用法,包括HashSet(無序不重複),LinkedHashSet(按放入順序有序不重複),TreeS
netty中的發動機--EventLoop及其實現類NioEventLoop的原始碼分析
EventLoop 在之前介紹Bootstrap的初始化以及啟動過程時,我們多次接觸了NioEventLoopGroup這個類,關於這個類的理解,還需要了解netty的執行緒模型。NioEventLoopGroup可以理解為一組執行緒,這些執行緒每一個都可以獨立地處理多個channel產生的io事件。 Nio
[五]類載入機制雙親委派機制 底層程式碼實現原理 原始碼分析 java類載入雙親委派機制是如何實現的
Launcher啟動類 本文是雙親委派機制的原始碼分析部分,類載入機制中的雙親委派模型對於jvm的穩定執行是非常重要的 不過原始碼其實比較簡單,接下來簡單介紹一下 我們先從啟動類說起 有一個Launcher類 sun.misc.Launcher; 仔細看下這簡
commons-pool2原始碼走讀(一) 池物件PooledObject介面及其實現
commons-pool2原始碼走讀(一) 池物件PooledObject<T>介面及其實現 PooledObject<T>用來定義池物件的一個wrapper 介面,用於跟蹤物件的附加資訊,比如狀態、建立時間、使用時間等。這個類的實現必須是執行緒安全的。
java基礎類庫學習(二.3)List子介面的實現類
List子介面的實現類:ArrayList/Vector/LinkedList List集合:元素有序。可重複的集合,List集合預設按元素的新增順序設定元素的索引,通過索引來訪問物件 List集合原始碼? public interface List<E> extends
java基礎類庫學習(二.2)Set子介面的實現類
Set子介面的實現類:HashSet/LinkedHashSet/TreeSet/EnumSet/SortedSet Set子介面和Collection父介面原始碼對比? 1Collection父介面原始碼 public interface Collection&l
Java中的執行緒池及其實現類ThreadPoolExecutor
前言:像我們連線資料庫一樣,需要不斷地建立連線,銷燬連線,如果都是人為地一個個建立和銷燬的話會很費勁,所以就誕生了資料庫連線池,執行緒池的產生也是同樣的道理。 執行緒池預先建立了若干數量的執行緒,並且不能由使用者直接對執行緒的建立進行控制,在這個前提下重複使用固定或較為固定數目的執行緒來完成任務
Java_59_陣列_模擬ArrayList容器的底層實現JDK原始碼分析程式碼
package cn.pmcse.myCollection; public class ArrayList { private Object[] value; private in
在介面的實現類裡使用@Override註解報錯
問題分析 @Override註解用來檢測子類對父類或介面的方法的重寫是否正確,但有一次我在Eclipse裡對介面的實現類裡使用@Override註解卻報錯,不過在父類的子類裡使用該註解卻是正常的。 百度了下才知道原來這是jdk1.5時的一個bug,在1.6時已經被修復;那麼問題來了,我使用的jdk是1.8
caffe Layer基類原始碼分析
建構函式 //標頭檔案 include/caffe/layer.hpp //實現檔案 src/caffe/layer.cpp // src/caffe/layer.cu /*
介面呼叫實現類&& 為什麼Autowired定義在介面上
1、介面與回撥 package edu.cqu.interfaceTest; import java.awt.Toolkit; import java.awt.event.ActionEvent; import java.awt.event.ActionListener; import java.u
Java中List及其實現類的解析
集合: 集合,集合是java中提供的一種容器,可以用來儲存多個數據。集合的長度是可變的,集合中儲存的元素必須是引用型別資料。 集合繼承關係圖: List: &n
JavaEE開發service層為什麼要分介面和實現類?
面向介面開發。多人分模組開發時,寫service(業務層)的人將介面定義好提交到SVN,其它層的人直接可以呼叫介面方法,而寫service層的人也可以通過實現類寫具體方法邏輯。達到多人同時開發。
為什麼需要一個介面,一個介面的實現類,而不是直接呼叫類裡的方法
作者:Dion連結:https://www.zhihu.com/question/20111251/answer/14012223來源:知乎著作權歸作者所有。商業轉載請聯絡作者獲得授權,非商業轉載請註明出處。“介面是個規範”,這句沒錯。“不如直接就在這個類中寫實現方法豈不是更