1. 程式人生 > >Java並發編程(5)- J.U.C之AQS及其相關組件詳解

Java並發編程(5)- J.U.C之AQS及其相關組件詳解

cached 數字0 f11 一個 就會 interrupt 同步器 long 告訴

J.U.C之AQS-介紹

Java並發包(JUC)中提供了很多並發工具,這其中,很多我們耳熟能詳的並發工具,譬如ReentrangLock、Semaphore,而它們的實現都用到了一個共同的基類--AbstractQueuedSynchronizer(抽象隊列同步器),簡稱AQS。

AQS是JDK提供的一套用於實現基於FIFO等待隊列的阻塞鎖和相關的同步器的一個同步框架,它使用一個int類型的volatile變量(命名為state)來維護同步狀態,通過內置的FIFO隊列來完成資源獲取線程的排隊工作。

AbstractQueuedSynchronizer中對state的操作是原子的,且不能被繼承。所有的同步機制的實現均依賴於對改變量的原子操作。為了實現不同的同步機制,我們需要創建一個非共有的(non-public internal)擴展了AQS類的內部輔助類來實現相應的同步邏輯。

AbstractQueuedSynchronizer並不實現任何同步接口,它提供了一些可以被具體實現類直接調用的一些原子操作方法來重寫相應的同步邏輯。AQS同時提供了獨占模式(exclusive)和共享模式(shared)兩種不同的同步邏輯。一般情況下,子類只需要根據需求實現其中一種模式,當然也有同時實現兩種模式的同步類,如ReadWriteLock。

使用AQS能簡單且高效地構造出應用廣泛的大量的同步器,比如我們提到的ReentrantLock,Semaphore,其他的諸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基於AQS的。

當然,我們自己也能利用AQS非常輕松容易地構造出符合我們自己需求的同步器,由此可知AQS是Java並發包中最為核心的一個基類。

AbstractQueuedSynchronizer底層數據結構是一個雙向鏈表,屬於隊列的一種實現:
技術分享圖片

  • sync queue:同步隊列,其中head節點主要負責後面的調度
  • Condition queue:單向鏈表,不是必須的,只有程序中使用到Condition的時候才會存在,可能會有多個Condition queue

關於AQS裏的state狀態:

我們提到了AbstractQueuedSynchronizer維護了一個volatile int類型的變量,命名為state,用於表示當前同步狀態。volatile雖然不能保證操作的原子性,但是保證了當前變量state的可見性。state的訪問方式有三種:

getState()
setState()
compareAndSetState()

這三種操作均是原子操作,其中compareAndSetState的實現依賴於Unsafe的compareAndSwapInt()方法。


關於自定義資源共享方式:

AQS支持兩種資源共享方式:Exclusive(獨占,只有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。這樣方便使用者實現不同類型的同步組件,獨占式如ReentrantLock,共享式如Semaphore,CountDownLatch,組合式的如ReentrantReadWriteLock。總之,AQS為使用提供了底層支撐,如何組裝實現,使用者可以自由發揮。


關於同步器設計:

同步器的設計是基於模板方法模式的,一般的使用方式是這樣:

  • 使用者繼承AbstractQueuedSynchronizer並重寫指定的方法。(這些重寫方法很簡單,無非是對於共享資源state的獲取和釋放)
  • 將AQS組合在自定義同步組件的實現中,並調用其模板方法,而這些模板方法會調用使用者重寫的方法。這其實是模板方法模式的一個很經典的應用。

不同的自定義同步器爭用共享資源的方式也不同。自定義同步器在實現時只需要實現共享資源state的獲取與釋放方式即可,至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在底層實現好了。自定義同步器實現時主要實現以下幾種方法:

protected boolean isHeldExclusively()    // 該線程是否正在獨占資源。只有用到condition才需要去實現它。
protected boolean tryAcquire(int)        // 獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
protected boolean tryRelease(int)        // 獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
protected int tryAcquireShared(int)  // 共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源。
protected boolean tryReleaseShared(int)  // 共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待結點返回true,否則返回false。

如何使用:

首先,我們需要去繼承AbstractQueuedSynchronizer這個類,然後我們根據我們的需求去重寫相應的方法,比如要實現一個獨占鎖,那就去重寫tryAcquire,tryRelease方法,要實現共享鎖,就去重寫tryAcquireShared,tryReleaseShared;最後,在我們的組件中調用AQS中的模板方法就可以了,而這些模板方法是會調用到我們之前重寫的那些方法的。也就是說,我們只需要很小的工作量就可以實現自己的同步組件,重寫的那些方法,僅僅是一些簡單的對於共享資源state的獲取和釋放操作,至於像是獲取資源失敗,線程需要阻塞之類的操作,自然是AQS幫我們完成了。

具體實現的思路:

  1. 首先AQS內部維護了一個CLH隊列,來管理鎖
  2. 線程嘗試獲取鎖,如果獲取失敗,則將等待信息等包裝成一個Node結點,加入到同步隊列Sync queue裏
  3. 不斷重新嘗試獲取鎖(當前結點為head的直接後繼才會嘗試),如果獲取失敗,則會阻塞自己,直到被喚醒
  4. 當持有鎖的線程釋放鎖的時候,會喚醒隊列中的後繼線程

設計思想:

對於使用者來講,我們無需關心獲取資源失敗,線程排隊,線程阻塞/喚醒等一系列復雜的實現,這些都在AQS中為我們處理好了。我們只需要負責好自己的那個環節就好,也就是獲取/釋放共享資源state的姿勢。很經典的模板方法設計模式的應用,AQS為我們定義好頂級邏輯的骨架,並提取出公用的線程入隊列/出隊列,阻塞/喚醒等一系列復雜邏輯的實現,將部分簡單的可由使用者決定的操作邏輯延遲到子類中去實現即可。

基於AQS的同步組件:

  • CountDownLatch
  • Semaphore
  • CyclicBarrier
  • ReentrantLock
  • Condition
  • FutureTask

AQS小結:

  • 使用Node實現FIFO隊列,可以用於構建鎖或者其他同步裝置的基礎框架
  • 利用了一個int類型表示狀態,有一個state的成員變量,表示獲取鎖的線程數(0沒有線程獲取鎖,1有線程獲取鎖,大於1表示重入鎖的數量),和一個同步組件ReentrantLock。狀態信息通過procted級別的getState,setState,compareAndSetState進行操作
  • 使用方法是繼承,然後復寫AQS中的方法,基於模板方法模式
  • 子類通過繼承並通過實現它的方法管理其狀態{acquire和release}的方法操作狀態
  • 可以同時實現排它鎖和共享鎖的模式(獨占、共享)

CountDownLatch

CountDownLatch是一個同步工具類,它允許一個或多個線程一直等待,直到其他線程執行完後再執行。例如,應用程序的主線程希望在負責啟動框架服務的線程已經啟動所有框架服務之後執行。

CountDownLatch是通過一個計數器來實現的,計數器的初始化值為線程的數量。每當一個線程完成了自己的任務後,計數器的值就相應得減1。當計數器到達0時,表示所有的線程都已完成任務,然後在閉鎖上等待的線程就可以恢復執行任務。

技術分享圖片

CountDownLatch的構造函數源碼如下:

/**
 * Constructs a {@code CountDownLatch} initialized with the given count.
 *
 * @param count the number of times {@link #countDown} must be invoked
 *        before threads can pass through {@link #await}
 * @throws IllegalArgumentException if {@code count} is negative
 */
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

計數器count是閉鎖需要等待的線程數量,只能被設置一次,且CountDownLatch沒有提供任何機制去重新設置計數器count。

與CountDownLatch的第一次交互是主線程等待其他線程。主線程必須在啟動其他線程後立即調用CountDownLatch.await()方法。這樣主線程的操作就會在這個方法上阻塞,直到其他線程完成各自的任務。

其他N個線程必須引用CountDownLatch閉鎖對象,因為它們需要通知CountDownLatch對象,它們各自完成了任務;這種通知機制是通過CountDownLatch.countDown()方法來完成的;每調用一次,count的值就減1,因此當N個線程都調用這個方法,count的值就等於0,然後主線程就可以通過await()方法,恢復執行自己的任務。

註:該計數器的操作是原子性的

CountDownLatch使用場景:

  1. 實現最大的並行性:有時我們想同時啟動多個線程,實現最大程度的並行性。例如,我們想測試一個單例類。如果我們創建一個初始計數器為1的CountDownLatch,並讓其他所有線程都在這個鎖上等待,只需要調用一次countDown()方法就可以讓其他所有等待的線程同時恢復執行。
  2. 開始執行前等待N個線程完成各自任務:例如應用程序啟動類要確保在處理用戶請求前,所有N個外部系統都已經啟動和運行了。
  3. 死鎖檢測:一個非常方便的使用場景是你用N個線程去訪問共享資源,在每個測試階段線程數量不同,並嘗試產生死鎖。

使用示例

1.基本用法:

@Slf4j
public class CountDownLatchExample1 {
    private final static int THREAD_COUNT = 200;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        final CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);

        for (int i = 0; i < THREAD_COUNT; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    test(threadNum);
                } catch (InterruptedException e) {
                    log.error("", e);
                } finally {
                    // 為防止出現異常,放在finally更保險一些
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        log.info("finish");
        exec.shutdown();
    }

    private static void test(int threadNum) throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(100);
        log.info("{}", threadNum);
        TimeUnit.MILLISECONDS.sleep(100);
    }
}

2.比如有多個線程完成一個任務,但是這個任務只想給它一個指定的時間,超過這個任務就不繼續等待了,完成多少算多少:

// 等待指定的時間 參數1:等待時間,參數2:時間單位
countDownLatch.await(10, TimeUnit.MILLISECONDS);

關於CountDownLatch的其他例子可以參考我另一篇文章:

  • CountDownLatch類的使用

Semaphore

Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源。很多年以來,我都覺得從字面上很難理解Semaphore所表達的含義,只能把它比作是控制流量的紅綠燈,比如XX馬路要限制流量,只允許同時有一百輛車在這條路上行使,其他的都必須在路口等待,所以前一百輛車會看到綠燈,可以開進這條馬路,後面的車會看到紅燈,不能駛入XX馬路,但是如果前一百輛中有五輛車已經離開了XX馬路,那麽後面就允許有5輛車駛入馬路,這個例子裏說的車就是線程,駛入馬路就表示線程在執行,離開馬路就表示線程執行完成,看見紅燈就表示線程被阻塞,不能執行。

技術分享圖片

所以簡單來說,Semaphore主要作用就是可以控制同一時間並發執行的線程數。Semaphore有兩個構造函數,參數permits表示許可數,它最後傳遞給了AQS的state值。線程在運行時首先獲取許可,如果成功,許可數就減1,線程運行,當線程運行結束就釋放許可,許可數就加1。如果許可數為0,則獲取失敗,線程位於AQS的等待隊列中,它會被其它釋放許可的線程喚醒。在創建Semaphore對象的時候還可以指定它的公平性。一般常用非公平的信號量,非公平信號量是指在獲取許可時先嘗試獲取許可,而不必關心是否已有需要獲取許可的線程位於等待隊列中,如果獲取失敗,才會入列。而公平的信號量在獲取許可時首先要查看等待隊列中是否已有線程,如果有則入列。

技術分享圖片

使用場景:

Semaphore可以用於做流量控制,特別公用資源有限的應用場景,比如數據庫連接。假如有一個需求,要讀取幾萬個文件的數據,因為都是IO密集型任務,我們可以啟動幾十個線程並發的讀取,但是如果讀到內存後,還需要存儲到數據庫中,而數據庫的連接數只有10個,這時我們必須控制只有十個線程同時獲取數據庫連接保存數據,否則會報錯無法獲取數據庫連接。這個時候,我們就可以使用Semaphore來做流控。

使用示例

1.每次獲取1個許可示例:

public class SemaphoreExample1 {
    private final static int THREAD_COUNT = 200;

    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(10);

        for (int i = 0; i < THREAD_COUNT; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    // 獲取一個許可
                    semaphore.acquire();
                    System.out.println(threadNum);
                    // 釋放一個許可
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        exec.shutdown();
    }
}

在代碼中,雖然有200個線程在執行,但是只允許10個並發的執行。Semaphore的構造方法Semaphore(int permits) 接收一個整型的數字,表示可用的許可證數量。所以Semaphore(10)表示允許10個線程獲取許可證,也就是最大並發數是10。Semaphore的用法也很簡單,首先線程使用Semaphore的acquire()獲取一個許可證,使用完之後調用release()歸還許可證。還可以用tryAcquire()方法嘗試獲取許可證。

2.如何希望每次獲取多個許可的話,只需要在acquire()方法的參數中進行指定即可,如下示例:

// 獲取多個許可
semaphore.acquire(3);
System.out.println(threadNum);
// 釋放多個許可
semaphore.release(3);

3.當並發很高,想要超過允許的並發數之後,就丟棄不處理的話,可以使用Semaphore裏的tryAcquire()方法嘗試獲取許可,該方法返回boolean類型的值,我們可以通過判斷這個值來拋棄超過並發數的請求。如下示例:

public class SemaphoreExample3 {
    private final static int THREAD_COUNT = 200;

    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(10);

        for (int i = 0; i < THREAD_COUNT; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    // 嘗試獲取一個許可,若沒有獲取到許可的線程就會被拋棄,而不是阻塞
                    if (semaphore.tryAcquire()) {
                        System.out.println(threadNum);
                        // 釋放一個許可
                        semaphore.release();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }

        exec.shutdown();
    }
}

Semaphore中嘗試獲取許可的相關方法:
技術分享圖片

我們可以指定嘗試獲取許可的超時時間,例如我設置超時時間為1秒:

// 嘗試獲取一個許可,直到超過一秒
if (semaphore.tryAcquire(1, TimeUnit.SECONDS)) {
    System.out.println(threadNum);
    // 釋放一個許可
    semaphore.release();
}

除此之外,還可以嘗試獲取多個許可,並且指定超時時間:

// 嘗試獲取多個許可,直到超過一秒
if (semaphore.tryAcquire(3, 1, TimeUnit.SECONDS)) {
    System.out.println(threadNum);
    // 釋放多個許可
    semaphore.release(3);
}

Semaphore中其他一些常用的方法:

int availablePermits()             // 返回此信號量中當前可用的許可證數。
int getQueueLength()               // 返回正在等待獲取許可證的線程數。
boolean hasQueuedThreads()         // 是否有線程正在等待獲取許可證。
void reducePermits(int reduction)  // 減少reduction個許可證。是個protected方法。
Collection getQueuedThreads()      // 返回所有等待獲取許可證的線程集合。是個protected方法。

CyclicBarrier

CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續幹活。當某個線程調用了await方法之後,就會進入等待狀態,並將計數器-1,直到所有線程調用await方法使計數器為0,才可以繼續執行,由於計數器可以重復使用,所以我們又叫他循環屏障。CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每個線程調用await方法告訴CyclicBarrier我已經到達了屏障,然後當前線程被阻塞。

技術分享圖片

CyclicBarrier的應用場景:

CyclicBarrier可以用於多線程計算數據,最後合並計算結果的應用場景。比如我們用一個Excel保存了用戶所有銀行流水,每個Sheet保存一個帳戶近一年的每筆銀行流水,現在需要統計用戶的日均銀行流水,先用多線程處理每個sheet裏的銀行流水,都執行完之後,得到每個sheet的日均銀行流水,最後,再用barrierAction用這些線程的計算結果,計算出整個Excel的日均銀行流水。

CyclicBarrier和CountDownLatch的區別:

  • CountDownLatch的計數器只能使用一次。而CyclicBarrier的計數器可以使用reset() 方法重置。所以CyclicBarrier能處理更為復雜的業務場景,比如如果計算發生錯誤,可以重置計數器,並讓線程們重新執行一次。
  • CountDownLatch主要用於實現一個或n個線程需要等待其他線程完成某項操作之後,才能繼續往下執行,描述的是一個或n個線程等待其他線程的關系,而CyclicBarrier是多個線程相互等待,知道滿足條件以後再一起往下執行。描述的是多個線程相互等待的場景
  • CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得CyclicBarrier阻塞的線程數量。isBroken方法用來知道阻塞的線程是否被中斷。

CyclicBarrier方法列表:
技術分享圖片

使用示例

1.基本使用:

@Slf4j
public class CyclicBarrierExample1 {
    // 給定一個值,說明有多少個線程同步等待
    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int num = i;
            // 延遲1秒,方便觀察
            Thread.sleep(1000);
            exec.execute(() -> {
                try {
                    CyclicBarrierExample1.race(num);
                } catch (Exception e) {
                    log.error("", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void race(int num) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", num);
        // 阻塞線程
        barrier.await();
        log.info("{} continue", num);
    }
}

以防await無限阻塞進程,我們可以設置await的超時時間,修改race方法代碼如下:

private static void race(int num) throws Exception {
    Thread.sleep(1000);
    log.info("{} is ready", num);
    try {
        // 由於設置了超時時間後阻塞的線程可能會被中斷,拋出BarrierException異常,如果想繼續往下執行,需要加上try-catch
        barrier.await(2000, TimeUnit.MILLISECONDS);
    } catch (InterruptedException | TimeoutException | BrokenBarrierException e) {
        // isBroken方法用來知道阻塞的線程是否被中斷
        log.warn("exception occurred {} {}. isBroken : {}", e.getClass().getName(), e.getMessage(), barrier.isBroken());
    }
    log.info("{} continue", num);
}

如果希望當所有線程到達屏障後就執行一個runnable的話,可以使用CyclicBarrier(int parties, Runnable barrierAction)構造函數傳遞一個runnable實例。如下示例:

/**
 * 當線程全部到達屏障時,優先執行這裏傳入的runnable
 */
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> log.info("callback is running"));

ReentrantLock

在Java裏一共有兩類鎖,一類是synchornized同步鎖,還有一種是JUC裏提供的鎖Lock,Lock是個接口,其核心實現類就是ReentrantLock。

synchornized與ReentrantLock的區別對比如下表:

對比維度 synchornized ReentrantLock
可重入性(線程進入鎖的時候計數器就自增1,計數器下降為0則會釋放鎖) 可重入 可重入
鎖的實現 JVM實現,很難操作源碼 JDK實現,可以觀察其源碼
性能 在引入偏向鎖、輕量級鎖/自旋鎖後性能大大提升,官方建議無特殊要求時盡量使用synchornized,並且新版本的一些jdk源碼都由之前的ReentrantLock改成了synchornized 與優化後的synchornized相差不大
功能區別 方便簡潔,由編譯器負責加鎖和釋放鎖 需手工操作鎖的加鎖和釋放
鎖粒度 粗粒度,不靈活 細粒度,可靈活控制
可否指定公平鎖 不可以 可以
可否放棄鎖 不可以 可以

ReentrantLock實現:

  • 采用自旋鎖,循環調用CAS操作來實現加鎖,避免了使線程進入內核態的阻塞狀態。想盡辦法避免線程進入內核態的阻塞狀態,是我們分析和理解鎖設計的關鍵鑰匙。

ReentrantLock獨有的功能:

  • 可指定是公平鎖還是非公平鎖,所謂公平鎖就是先等待的線程先獲得鎖
  • 提供了一個Condition類,可以分組喚醒需要喚醒的線程
  • 提供能夠中斷等待鎖的線程的機制,lock.lockInterruptibly()

在ReentrantLock中,對於公平和非公平的定義是通過對同步器AQS的擴展加以實現的,也就是在tryAcquire的實現上做了語義的控制。

這裏提到一個鎖獲取的公平性問題,如果在絕對時間上,先對鎖進行獲取的請求一定被先滿足,那麽這個鎖是公平的,反之,是不公平的,也就是說等待時間最長的線程最有機會獲取鎖,也可以說鎖的獲取是有序的。ReentrantLock這個鎖提供了一個構造函數,能夠控制這個鎖是否是公平的。

而鎖的名字也是說明了這個鎖具備了重復進入的可能,也就是說能夠讓當前線程多次的進行對鎖的獲取操作,這樣的最大次數限制是Integer.MAX_VALUE,約21億次左右。

事實上公平的鎖機制往往沒有非公平的效率高,因為公平的獲取鎖沒有考慮到操作系統對線程的調度因素,這樣造成JVM對於等待中的線程調度次序和操作系統對線程的調度之間的不匹配。對於鎖的快速且重復的獲取過程中,連續獲取的概率是非常高的,而公平鎖會壓制這種情況,雖然公平性得以保障,但是響應比卻下降了,但是並不是任何場景都是以TPS作為唯一指標的,因為公平鎖能夠減少“饑餓”發生的概率,等待越久的請求越是能夠得到優先滿足。

要放棄synchronized?

從上邊的介紹,看上去ReentrantLock不僅擁有synchronized的所有功能,而且有一些功能synchronized無法實現的特性。性能方面,ReentrantLock也不比synchronized差,那麽到底我們要不要放棄使用synchronized呢?答案是不要這樣做。

J.U.C包中的鎖定類是用於高級情況和高級用戶的工具,除非說你對Lock的高級特性有特別清楚的了解以及有明確的需要,或這有明確的證據表明同步已經成為可伸縮性的瓶頸的時候,否則我們還是繼續使用synchronized。相比較這些高級的鎖定類,synchronized還是有一些優勢的,比如synchronized不可能忘記釋放鎖。還有當JVM使用synchronized管理鎖定請求和釋放時,JVM在生成線程轉儲時能夠包括鎖定信息,這些信息對調試非常有價值,它們可以標識死鎖以及其他異常行為的來源。

如何選擇鎖:

  • 若業務邏輯需使用到鎖的高級功能去實現,那麽就可以選擇ReentrantLock
  • 需要細粒度操作鎖時,選擇ReentrantLock
  • 對ReentrantLock的機制很了解,有足夠經驗能夠避免死鎖的出現的開發者,可以選擇ReentrantLock,不建議對鎖機制不是很熟悉的開發者使用ReentrantLock
  • 對鎖的需求較簡單,使用synchornized
  • 初級開發者建議使用synchornized

使用示例

基本使用:

@Slf4j
public class LockExample2 {
    /**
     * 請求總數
     */
    public static int clientTotal = 5000;

    /**
     * 同時並發執行的線程數量
     */
    public static int threadTotal = 200;

    /**
     * 計數
     */
    private static int count = 0;

    /**
     * 鎖對象,默認是使用非公平鎖,可以傳入true和false來決定使用公平所還是非公平鎖
     */
    private final static Lock LOCK = new ReentrantLock();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(threadTotal);
        CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            executorService.execute(() -> {
                try {
                    // 從信號量獲取執行許可,若並發達到設定的數量,那麽就不會獲取到許可,將會阻塞當前線程,直到能夠獲取到執行許可為止
                    semaphore.acquire();
                    LockExample2.add();
                    // 釋放當前線程
                    semaphore.release();
                } catch (InterruptedException e) {
                    log.error("", e);
                }
                countDownLatch.countDown();
            });
        }

        countDownLatch.await();
        executorService.shutdown();
        log.info("count: {}", count);
    }

    private static void add() {
        // 加鎖
        LOCK.lock();
        try {
            count++;
        } finally {
            // 解鎖
            LOCK.unlock();
        }
    }
}

在ReentrantLock 中,lock()方法是一個無條件的鎖,與synchronize意思差不多,但是另一個方法 tryLock()方法只有在成功獲取了鎖的情況下才會返回true,如果別的線程當前正持有鎖,則會立即返回false。如果為這個方法加上timeout參數,則會在等待timeout的時間才會返回false或者在獲取到鎖的時候返回true。

其他常用方法:

boolean isHeldByCurrentThread();   // 當前線程是否保持鎖定
boolean isLocked()  // 是否存在任意線程持有鎖資源
void lockInterruptbly()  // 如果當前線程未被中斷,則獲取鎖定;如果已中斷,則拋出異常(InterruptedException)
int getHoldCount()   // 查詢當前線程保持此鎖定的個數,即調用lock()方法的次數
int getQueueLength()   // 返回正等待獲取此鎖定的預估線程數
int getWaitQueueLength(Condition condition)  // 返回與此鎖定相關的約定condition的線程預估數
boolean hasQueuedThread(Thread thread)  // 當前線程是否在等待獲取鎖資源
boolean hasQueuedThreads()  // 是否有線程在等待獲取鎖資源
boolean hasWaiters(Condition condition)  // 是否存在指定Condition的線程正在等待鎖資源
boolean isFair()   // 是否使用的是公平鎖

Condition

Condition是一個多線程間協調通信的工具類,使得某個,或者某些線程一起等待某個條件(Condition),只有當該條件具備( signal 或者 signalAll方法被調用)時 ,這些等待線程才會被喚醒,從而重新爭奪鎖。

Condition可以非常靈活的操作線程的喚醒,下面是一個線程等待與喚醒的例子,其中用1、2、3、4序號標出了日誌輸出順序:

@Slf4j
public class LockExample6 {
    public static void main(String[] args) {
        // 構建ReentrantLock實例
        ReentrantLock reentrantLock = new ReentrantLock();
        // 從reentrantLock實例裏獲取condition實例
        Condition condition = reentrantLock.newCondition();

        // 線程1
        new Thread(() -> {
            try {
                // 線程1調用了lock方法,這時線程1就會加入到了AQS的等待隊裏面去
                reentrantLock.lock();
                log.info("wait signal"); // 1 等待信號
                // 調用await方法後,線程1就會從AQS隊列裏移除,這裏其實就已經釋放了鎖,然後線程1會馬上進入到condition隊列裏面去,等待一個信號
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("get signal");  // 4 得到信號
            // 線程1釋放鎖,整個過程執行完畢
            reentrantLock.unlock();
        }).start();

        // 線程2
        new Thread(() -> {
            // 由於線程1中調用了await釋放了鎖的關系,所以線程2就會被喚醒獲取到鎖,加入到AQS等待隊列中
            reentrantLock.lock();
            log.info("get lock");  // 2 獲取鎖
            try {
                // 睡眠3秒
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 調用signalAll發送信號的方法,此時condition等待隊列裏線程1所在的節點元素就會被取出,然後重新放到AQS等待隊列裏(註意此時線程1還沒有被喚醒)
            condition.signalAll();
            log.info("send signal ~ ");   // 3 發送信號
            // 線程2釋放鎖,這時候AQS隊列中只剩下線程1,然後AQS會按照從頭到尾的順序喚醒線程,於是線程1開始執行
            reentrantLock.unlock();
        }).start();
    }
}

可以看到,整個協調通信的過程是靠線程所在的節點在AQS的等待隊列和condition的等待隊列中來回移動實現的。condition作為一個條件類很好的維護了一個等待信號的隊列,並在signal 或者 signalAll方法被調用後,將等待的線程節點重新放回AQS的等待隊列中,從而實現喚醒線程的操作。


ReentrantReadWriteLock

ReentrantReadWriteLock是Lock的另一種實現方式,我們已經知道了ReentrantLock是一個排他鎖,同一時間只允許一個線程訪問,而ReentrantReadWriteLock允許多個讀線程同時訪問,但不允許寫線程和讀線程、寫線程和寫線程同時訪問。在沒有任何讀寫鎖的時候才能取得寫入的鎖,可用於實現悲觀讀取。相對於排他鎖,提高了並發性。在實際應用中,大部分情況下對共享數據(如緩存)的訪問都是讀操作遠多於寫操作,這時ReentrantReadWriteLock能夠提供比排他鎖更好的並發性和吞吐量,所以讀寫鎖適用於讀多寫少的情況。但讀多寫少的場景下可能會令寫入線程遭遇饑餓,即寫入線程遲遲無法獲取到鎖資源而處於等待狀態。

與互斥鎖相比,使用讀寫鎖能否提升性能則取決於讀寫操作期間讀取數據相對於修改數據的頻率,以及數據的爭用——即在同一時間試圖對該數據執行讀取或寫入操作的線程數。

讀寫鎖內部維護了兩個鎖,一個用於讀操作,一個用於寫操作。所有 ReadWriteLock實現都必須保證 writeLock操作的內存同步效果也要保持與相關 readLock的聯系。也就是說,成功獲取讀鎖的線程會看到寫入鎖之前版本所做的所有更新。

ReentrantReadWriteLock支持以下功能:

1.非公平模式(默認):連續競爭的非公平鎖可能無限期地推遲一個或多個reader或writer線程,但吞吐量通常要高於公平鎖。

2.公平模式:線程利用一個近似到達順序的策略來爭奪進入。當釋放當前保持的鎖時,可以為等待時間最長的單個writer線程分配寫入鎖,如果有一組等待時間大於所有正在等待的writer線程的reader,將為該組分配讀者鎖。試圖獲得公平寫入鎖的非重入的線程將會阻塞,除非讀取鎖和寫入鎖都自由(這意味著沒有等待線程)。

3.支持可重入。讀線程在獲取了讀鎖後還可以獲取讀鎖;寫線程在獲取了寫鎖之後既可以再次獲取寫鎖又可以獲取讀鎖

4.還允許從寫入鎖降級為讀取鎖,其實現方式是:先獲取寫入鎖,然後獲取讀取鎖,最後釋放寫入鎖。但是,從讀取鎖升級到寫入鎖是不允許的

5.讀取鎖和寫入鎖都支持鎖獲取期間的中斷

6.Condition支持。僅寫入鎖提供了一個 Conditon 實現;讀取鎖不支持 Conditon ,readLock().newCondition() 會拋出 UnsupportedOperationException。

7.監測:此類支持一些確定是讀取鎖還是寫入鎖的方法。這些方法設計用於監視系統狀態,而不是同步控制。

例如我現在有一個類,裏面有一個map集合,我們都知道操作map時都是讀多寫少的,所以我希望在對其讀寫的時候能夠進行一些線程安全的保護,這時我們就可以使用到ReentrantReadWriteLock。示例代碼如下:

public class LockExample3 {
    private final Map<String, Data> map = new TreeMap<>();
    private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private final Lock readLock = readWriteLock.readLock();
    private final Lock writeLock = readWriteLock.writeLock();

    public Data get(String key) {
        // 讀鎖
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public Set<String> getAllKeys() {
        // 讀鎖
        readLock.lock();
        try {
            return map.keySet();
        } finally {
            readLock.unlock();
        }
    }

    public Data put(String key, Data value) {
        // 在沒有任何讀寫鎖的時候才會進行寫入操作
        writeLock.lock();
        try {
            return map.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }

    class Data {
    }
}

StempedLock

StampedLock是Java8引入的一種新的鎖機制,簡單的理解,可以認為它是讀寫鎖的一個改進版本,讀寫鎖雖然分離了讀和寫的功能,使得讀與讀之間可以完全並發,但是讀和寫之間依然是沖突的,讀鎖會完全阻塞寫鎖,它使用的依然是悲觀的鎖策略。如果有大量的讀線程,它也有可能引起寫線程的饑餓。而StampedLock則提供了一種樂觀的讀策略,這種樂觀策略的鎖非常類似於無鎖的操作,使得樂觀鎖完全不會阻塞寫線程。

StempedLock控制鎖有三種形式,分別是寫,讀,和樂觀讀,重點在樂觀鎖。一個StempedLock,狀態是由版本和模式兩個部分組成。鎖獲取的方法返回的是一個數字作為票據(Stempe),他用相應的鎖狀態來表示並控制相關的訪問,數字0表示沒有寫鎖被授權訪問,在讀鎖上分為悲觀讀和樂觀讀。

所謂的樂觀讀模式,也就是若讀的操作很多,寫的操作很少的情況下,你可以樂觀地認為,寫入與讀取同時發生幾率很少,因此不悲觀地使用完全的讀取鎖定,程序可以查看讀取資料之後,是否遭到寫入執行的變更,再采取後續的措施(重新讀取變更信息,或者拋出異常) ,這一個小小改進,可大幅度提高程序的吞吐量

適用場景:

樂觀讀取模式僅用於短時間讀取操作時經常能夠降低競爭和提高吞吐量。當然,它的使用在本質上是脆弱的。樂觀讀取的區域應該只包括字段,並且在validation之後用局部變量持有它們從而在後續使用。樂觀模式下讀取的字段值很可能是非常不一致的,所以它應該只用於那些你熟悉如何展示數據,從而你可以不斷檢查一致性和調用方法validate

優化點:

1.樂觀讀不阻塞悲觀讀和寫操作,有利於獲得寫鎖

2.隊列頭結點采用有限次數SPINS次自旋(增加開銷),增加獲得鎖幾率(因為闖入的線程會競爭鎖),有效夠降低上下文切換

3.讀模式的集合通過一個公共節點被聚集在一起(cowait鏈),當隊列尾節點為RMODE,通過CAS方法將該節點node添加至尾節點的cowait鏈中,node成為cowait中的頂元素,cowait構成了一個LIFO隊列。

4.不支持鎖重入,如果只悲觀讀鎖和寫鎖,效率沒有ReentrantReadWriteLock高。

基本使用示例:

public class LockExample5 {
    private final static StampedLock LOCK = new StampedLock();

    private static void add() {
        // 加寫鎖
        long stamp = LOCK.writeLock();
        try {
            count++;
        } finally {
            // 解鎖需要傳入加鎖時返回的stamp
            LOCK.unlock(stamp);
        }
    }
}

其實在StempedLock的源碼中,提供了一段示例代碼,但沒有相應的註釋,所以這裏對該示例代碼給出一些註釋。如下:

class Point {
    private double x, y;
    private final StampedLock sl = new StampedLock();

    void move(double deltaX, double deltaY) { // an exclusively locked method
        long stamp = sl.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            sl.unlockWrite(stamp);
        }
    }

    // 樂觀讀鎖案例
    double distanceFromOrigin() { // A read-only method
        long stamp = sl.tryOptimisticRead(); //獲得一個樂觀讀鎖
        double currentX = x, currentY = y;  //將兩個字段讀入本地局部變量
        if (!sl.validate(stamp)) { //檢查發出樂觀讀鎖後同時是否有其他寫鎖發生?
            stamp = sl.readLock();  //如果沒有,我們再次獲得一個讀悲觀鎖
            try {
                currentX = x; // 將兩個字段讀入本地局部變量
                currentY = y; // 將兩個字段讀入本地局部變量
            } finally {
                sl.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }

    // 悲觀讀鎖案例
    void moveIfAtOrigin(double newX, double newY) { // upgrade
        // Could instead start with optimistic, not read mode
        long stamp = sl.readLock();
        try {
            while (x == 0.0 && y == 0.0) { //循環,檢查當前狀態是否符合
                long ws = sl.tryConvertToWriteLock(stamp); //將讀鎖轉為寫鎖
                if (ws != 0L) { //這是確認轉為寫鎖是否成功
                    stamp = ws; //如果成功 替換票據
                    x = newX; //進行狀態改變
                    y = newY;  //進行狀態改變
                    break;
                } else { //如果不能成功轉換為寫鎖
                    sl.unlockRead(stamp);  //我們顯式釋放讀鎖
                    stamp = sl.writeLock();  //顯式直接進行寫鎖 然後再通過循環再試
                }
            }
        } finally {
            sl.unlock(stamp); //釋放讀鎖或寫鎖
        }
    }
}

下圖是和ReadWritLock相比,在一個線程情況下,是讀速度其4倍左右,寫是1倍:
技術分享圖片

下圖是六個線程情況下,讀性能是其幾十倍,寫性能也是近10倍左右:
技術分享圖片

下圖是吞吐量提高:
技術分享圖片

StampedLock 小結:

StampedLock 對吞吐量有巨大的改進,特別是在讀線程越來越多的場景下。但StampedLock有一個復雜的API,對於加鎖操作,很容易誤用其他方法。StampedLock 可以說是Lock的一個很好的補充,吞吐量以及性能上的提升足以打動很多人了,但並不是說要替代之前Lock的東西,畢竟它還是有些應用場景的,起碼API比StampedLock容易入手

總結關於鎖的幾個類:

  • synchronized:JVM實現,不但可以通過一些監控工具監控,而且在出現未知異常的時候JVM也會自動幫我們釋放鎖
  • ReentrantLock、ReentrantRead/WriteLock、StempedLock 他們都是對象層面的鎖定,要想保證鎖一定被釋放,要放到finally裏面,才會更安全一些。StempedLock對性能有很大的改進,特別是在讀線程越來越多的情況下。

如何使用:

  1. 在只有少量競爭者的時候,synchronized是一個很好的鎖的實現
  2. 競爭者不少,但是增長量是可以預估的,ReentrantLock是一個很好的鎖的通用實現(適合使用場景的才是最好的,不是越高級越好)

部分參考:

https://blog.csdn.net/luoyuyou/article/details/30259877
http://www.importnew.com/14941.html

Java並發編程(5)- J.U.C之AQS及其相關組件詳解