1. 程式人生 > 其它 >淺談 Java 併發程式設計中的若干核心技術

淺談 Java 併發程式設計中的若干核心技術

作者:一字馬胡 原文:http://www.jianshu.com/p/5f499f8212e7

索引

  1. Java執行緒
  2. 執行緒模型
  3. Java執行緒池
  4. Future(各種Future)
  5. Fork/Join框架
  6. volatile
  7. CAS(原子操作)
  8. AQS(併發同步框架)
  9. synchronized(同步鎖)
  10. 併發佇列(阻塞佇列)

本文僅分析java併發程式設計中的若干核心問題,對於上面沒有提到但是又和java併發程式設計有密切關係的技術將會不斷新增進來完善文章,本文將長期更新,不斷迭代。本文試圖從一個更高的視覺來總結Java語言中的併發程式設計內容,希望閱讀完本文之後,可以收穫一些內容,至少應該知道在java中做併發程式設計實踐的時候應該注意什麼,應該關注什麼,如何保證執行緒安全,以及如何選擇合適的工具來滿足需求。

當然,更深層次的內容就會涉及到jvm層面的知識,包括底層對java記憶體的管理,對執行緒的管理等較為核心的問題,當然,本文的定位在於抽象與總結,更為具體而深入的內容就需要自己去實踐,考慮到可能篇幅過長、重複描述某些內容,以及自身技術深度等原因,本文將在深度和廣度上做一些權衡,某些內容會做一些深入的分析,而有些內容會一帶而過,點到為止,總之,本文就當是對學習java併發程式設計內容的一個總結,以及給哪些希望快速瞭解java併發程式設計內容的讀者拋磚引玉,不足之處還望指正。

Java執行緒

一般來說,在java中實現高併發是基於多執行緒程式設計的,所謂併發,也就是多個執行緒同時工作,來處理我們的業務,在機器普遍多核心的今天,併發程式設計的意義極為重大,因為我們有多個cpu供執行緒使用,如果我們的應用依然只使用單執行緒模式來工作的話,對極度浪費機器資源的。所以,學習java併發知識的首要問題是:如何建立一個執行緒,並且讓這個執行緒做一些事情?這是java併發程式設計內容的起點,下面將分別介紹多個建立執行緒,並且讓執行緒做一些事情的方法。

繼承Thread類

繼承Thread類,然後重寫run方法,這是第一種建立執行緒的方法。run方法裡面就是我們要做的事情,可以在run方法裡面寫我們想要在新的執行緒裡面執行的任務,下面是一個小例子,我們繼承了Thread類,並且在run方法裡面打印出了當然執行緒的名字,然後sleep1秒中之後就退出了:

/**
 * Created by hujian06 on 2017/10/31.
 * 
 * the demo of thread
 */
public class ThreadDemo {

    public static void main(String ... args) {

        AThread aThread = new AThread();

        //start the thread
        aThread.start();

    }

}

class AThread extends Thread {
    @Override
    public void run() {
        System.out.println("Current Thread Name:" +
                Thread.currentThread().getName());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

如果我們想要啟動這個執行緒,只需要像上面程式碼中那樣,呼叫Thread類的start方法就可以了。

實現Runnable介面

啟動一個執行緒的第二種方法是實現Runnable介面,然後實現其run方法,將你想要在新執行緒裡面執行的業務程式碼寫在run方法裡面,下面的例子展示了這種方法啟動執行緒的示例,實現的功能和上面的第一種示例是一樣的:

/**
 * Created by hujian06 on 2017/10/31.
 *
 * the demo of Runnable
 */
public class ARunnableaDemo {

    public static void main(String ... args) {

        ARunnanle aRunnanle = new ARunnanle();
        Thread thread = new Thread(aRunnanle);

        thread.start();

    }

}

class ARunnanle implements Runnable {

    @Override
    public void run() {
        System.out.println("Current Thread Name:" +
                Thread.currentThread().getName());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在啟動執行緒的時候,依然還是使用了Thread這個類,只是我們在建構函式中將我們實現的Runnable物件傳遞進去了,所以在我們執行Thread類的start方法的時候,實際執行的內容是我們的Runnable的run方法。

使用FutureTask

啟動一個新的執行緒的第三種方法是使用FutureTask,下面來看一下FutureTask的類圖,就可以明白為什麼可以使用FutureTask來啟動一個新的執行緒了:

從FutureTask的類圖中可以看出,FutureTask實現了Runnable介面和Future介面,所以它兼備Runnable和Future兩種特性,下面先來看看如何使用FutureTask來啟動一個新的執行緒:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
 * Created by hujian06 on 2017/10/31.
 *
 * the demo of FutureTask
 */
public class FutureTaskDemo {

    public static void main(String ... args) {

        ACallAble callAble = new ACallAble();

        FutureTask<String> futureTask = new FutureTask<>(callAble);

        Thread thread = new Thread(futureTask);

        thread.start();

        do {

        }while (!futureTask.isDone());

        try {
            String result = futureTask.get();

            System.out.println("Result:" + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

    }

}

class ACallAble implements Callable<String> {

    @Override
    public String call() throws Exception {
        Thread.sleep(1000);
        return "Thread-Name:" + 
                Thread.currentThread().getName();
    }
}

可以看到,使用FutureTask來啟動一個執行緒之後,我們可以監控這個執行緒是否完成,上面的示例中主執行緒會一直等待這個新建立的執行緒直到它返回,其實只要是Future提供的介面,我們在FutureTask中都可以使用,這極大的方便了我們,Future在併發程式設計中的意義極為重要,Future代表一個未來會發生的東西,它是一種暗示,一種佔位符,它示意我們它可能不會立即得到結果,因為它的任務還在執行,但是我們可以得到一個對這個執行緒的監控物件,我們可以對執行緒的執行做一些判斷,甚至是控制,比如,如果我們覺得我們等了太久,並且我們覺得沒有必要再等待下去的時候,就可以將這個Task取消,還有一點需要提到的是,Future代表它可能正在執行,也可能已經返回,當然Future更多的暗示你可以在等待這個結果的同時可以使用其他的執行緒做一些其他的事情,當你真的需要這個結果的時候再來獲取就可以了,這就是併發,理解這一點非常重要。

本小節通過介紹三種建立並啟動一個新執行緒的方法,為進行併發程式設計開了一個頭,目前,我們還只是在能建立多個執行緒,然後讓多個執行緒做不同個的事情的階段,當然,這是學習併發程式設計最為基礎的,無論如何,現在,我們可以讓我們的應用執行多個執行緒了,下面的文章將會基於這個假設(一個應用開啟了多個執行緒)討論一些併發程式設計中值得關注的內容。關於本小節更為詳細的內容,可以參考文章Java CompletableFuture中的部分內容。

執行緒模型

我們現在可以啟動多個執行緒,但是好像並沒有形成一種類似於模型的東西,非常混亂,並且到目前為止我們的多個執行緒依然只是各自做各自的事情,互不相干,多個執行緒之間並沒有互動(通訊),這是最簡單的模型,也是最基礎的模型,本小節試圖介紹執行緒模型,一種指導我們的程式碼組織的思想,執行緒模型確定了我們需要處理那些多執行緒的問題,在一個系統中,多個執行緒之間沒有通訊是不太可能的,更為一般的情況是,多個執行緒共享一些資源,然後相互競爭來獲取資源許可權,多個執行緒相互配合,來提高系統的處理能力。正因為多個執行緒之間會有通訊互動,所以本文接下來的討論才有了意義,如果我們的系統裡面有幾百個執行緒在工作,但是這些執行緒互不相干,那麼這樣的系統要麼實現的功能非常單一,要麼毫無意義(當然不是絕對的,比如Netty的執行緒模型)。

繼續來討論執行緒模型,上面說到執行緒模型是一種指導程式碼組織的思想,這是我自己的理解,不同的執行緒模型需要我們使用不同的程式碼組織,好的執行緒模型可以提高系統的併發度,並且可以使得系統的複雜度降低,這裡需要提一下Netty 4的執行緒模型,Netty 4的執行緒模型使得我們可以很容易的理解Netty的事件處理機制,這種優秀的設計基於Reactor執行緒模型,Reactor執行緒模型分為單執行緒模型、多執行緒模型以及主從多執行緒模型,Netty的執行緒模型類似於Reactor主從多執行緒模型。

當然執行緒模型是一種更高級別的併發程式設計內容,它是一種程式設計指導思想,尤其在我們進行底層框架設計的時候特別需要注意執行緒模型,因為一旦執行緒模型設計不合理,可能會導致後面框架程式碼過於複雜,並且可能因為執行緒同步等問題造成問題不可控,最終導致系統執行失控。類似於Netty的執行緒模型是一種好的執行緒模型,下面展示了這種模型:

簡單來說,Netty為每個新建立的Channel分配一個NioEventLoop,而每個NioEventLoop內部僅使用一個執行緒,這就避免了多執行緒併發的同步問題,因為為每個Channel處理的執行緒僅有一個,所以不需要使用鎖等執行緒同步手段來做執行緒同步,在我們的系統設計的時候應該借鑑這種執行緒模型的設計思路,可以避免我們走很多彎路。關於執行緒池以及Netty執行緒池這部分的內容,可以參考文章Netty執行緒模型及EventLoop詳解。

Java執行緒池

池化技術是一種非常有用的技術,對於執行緒來說,建立一個執行緒的代價是很高的,如果我們在建立了一個執行緒,並且讓這個執行緒做一個任務之後就回收的話,那麼下次要使用執行緒來執行我們的任務的時候又需要建立一個新的執行緒,是否可以在建立完成一個執行緒之後一直緩衝,直到系統關閉的時候再進行回收呢?java執行緒池就是這樣的元件,使用執行緒池,就沒必要頻繁建立執行緒,執行緒池會為我們管理執行緒,當我們需要一個新的執行緒來執行我們的任務的時候,就向執行緒池申請,而執行緒池會從池子裡面找到一個空閒的執行緒返回給請求者,如果池子裡面沒有可用的執行緒,那麼執行緒池會根據一些引數指標來建立一個新的執行緒,或者將我們的任務提交到任務佇列中去,等待一個空閒的執行緒來執行這個任務。細節內容在下文中進行分析,目前我們只需要明白,執行緒池裡面有很多執行緒,這些執行緒會一直到系統關係才會被回收,否則一直會處於處理任務或者等待處理任務的狀態。

首先,如何使用執行緒池呢?下面的程式碼展示瞭如何使用java執行緒池的例子:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by hujian06 on 2017/10/31.
 *
 * the demo of Executors
 */
public class ExecutorsDemo {

    public static void main(String ... args) {

        int cpuCoreCount = Runtime.getRuntime().availableProcessors();
        AThreadFactory threadFactory = new AThreadFactory();
        ARunnanle runnanle = new ARunnanle();

        ExecutorService fixedThreadPool=
                Executors.newFixedThreadPool(cpuCoreCount, threadFactory);

        ExecutorService cachedThreadPool = 
                Executors.newCachedThreadPool(threadFactory);

        ScheduledExecutorService newScheduledThreadPool = 
                Executors.newScheduledThreadPool(cpuCoreCount, threadFactory);

        ScheduledExecutorService singleThreadExecutor = 
                Executors.newSingleThreadScheduledExecutor(threadFactory);

        fixedThreadPool.submit(runnanle);
        cachedThreadPool.submit(runnanle);
        newScheduledThreadPool.scheduleAtFixedRate(runnanle, 0, 1, TimeUnit.SECONDS);
        singleThreadExecutor.scheduleWithFixedDelay(runnanle, 0, 100, TimeUnit.MILLISECONDS);

        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        fixedThreadPool.shutdownNow();
        cachedThreadPool.shutdownNow();
        newScheduledThreadPool.shutdownNow();
        singleThreadExecutor.shutdownNow();
    }

}

class ARunnable implements Runnable {

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Current Thread Name:" + 
                Thread.currentThread().getName());
    }
}

/**
 * the thread factory
 */
class AThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    @Override
    public Thread newThread(Runnable r) {
        return new Thread("aThread-" + threadNumber.incrementAndGet());
    }
}

更為豐富的應用應該自己去探索,結合自身的需求來藉助執行緒池來實現,下面來分析一下Java執行緒池實現中幾個較為重要的內容。

ThreadPoolExecutor和ScheduledThreadPoolExecutor

ThreadPoolExecutor和ScheduledThreadPoolExecutor是java實現執行緒池的核心類,不同型別的執行緒池其實就是在使用不同的建構函式,以及不同的引數來構造出ThreadPoolExecutor或者ScheduledThreadPoolExecutor,所以,學習java執行緒池的重點也在於學習這兩個核心類。前者適用於構造一般的執行緒池,而後者繼承了前者,並且很多內容是通用的,但是ScheduledThreadPoolExecutor增加了schedule功能,也就是說,ScheduledThreadPoolExecutor使用於構造具有排程功能的執行緒池,在需要週期性排程執行的場景下就可以使用ScheduledThreadPoolExecutor。關於ThreadPoolExecutor與ScheduledThreadPoolExecutor較為詳細深入的分析可以參考下面的文章:

  • Java執行緒池詳解(一)
  • Java執行緒池詳解(二)
  • Java排程執行緒池ScheduleExecutorService
  • Java排程執行緒池ScheduleExecutorService(續)

下面展示了ThreadPoolExecutor和ScheduledThreadPoolExecutor的類圖,可以看出他們的關係,以及他們的繼承關係:

ThreadPoolExecutor類圖

ScheduledThreadPoolExecutor類圖

關於較為細節的內容不再本文的敘述範圍之內,如果想要了解這些內容的詳細內容,可以參考文章中給出的連結,這些文章較為深入的分析和總結了相關的內容。

上文中提到,執行緒池會管理著一些執行緒,這些執行緒要麼處於執行狀態,要麼處於等待任務的狀態,當然這只是我們較為形象的描述,一個執行緒的狀態不僅有執行態與等待狀態,還有其他的狀態,但是對我我們來說,執行緒池裡面的執行緒確實是要麼處於執行狀態,要麼處於等待任務的狀態,這體現在,當我們向一個執行緒池提交一個任務的時候,可能會被等待任務的執行緒立即執行,但是可能執行緒池裡面的執行緒都處於忙碌狀態,那麼我們提交的任務就會被加入到等待執行的任務佇列中去,當有空閒執行緒了,或者佇列也滿了,那麼執行緒池就會採用一些策略來執行任務,並且在某些時刻會拒絕提交的任務,這些細節都可以在ThreadPoolExecutor的實現中找到。

線上程池的實現中,有一個角色特別重要,那就是任務佇列,當執行緒池裡面沒有空閒的執行緒來執行我們的任務的時候,我們的任務就會被新增到任務佇列中去等待執行,而這個任務佇列可能會被多個執行緒併發讀寫,所以需要支援多執行緒安全訪問,java提供了一類支援併發環境的佇列,稱為阻塞佇列,這是一類特殊的佇列,他們的使用時非常廣泛的,特別是在jdk自身的類庫建設上,當然在我們實際的工作中也是有很多使用場景的。

關於ThreadPoolExecutor是如何處理一個提交的任務的細節,可以參考下面的程式碼:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

下面來看一下java中藉助ThreadPoolExecutor來構造的幾個執行緒池的特性:

1、newFixedThreadPool

使用ThreadPoolExecutor構造一個newCachedThreadPool的流程如下:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }


    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

在任意時刻,newFixedThreadPool構造出來的執行緒池中最多隻可能存活著nThreads個執行緒,如果所有的執行緒都在執行任務,那麼這個時候提交的任務將會被新增到任務佇列中去等待執行。我們可以控制corePoolSize和maximumPoolSize來使得通過ThreadPoolExecutor構造出來的執行緒池具有一些不一樣的特性,但是需要注意的是,當我們設定的maximumPoolSize大於corePoolSize的時候,如果當前執行緒池裡面的執行緒數量已經達到了corePoolSize了,並且當前所以執行緒都處於執行任務的狀態,那麼在這個時候提交的任務會被新增到任務佇列中去,只有在任務佇列滿了的時候,才會去建立新的執行緒,如果執行緒數量已經達到了maximumPoolSize了,那麼到此就會拒絕提交的任務,這些流程可以參考上面展示出來的execute方法的實現。該型別的執行緒池使用的任務佇列是LinkedBlockingQueue型別的阻塞佇列。

2、newCachedThreadPool

通過ThreadPoolExecutor構造一個newCachedThreadPool執行緒池的流程如下:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

newCachedThreadPool適合於類似秒殺系統中,它可以按需建立執行緒。每個執行緒在空閒了一段時間之後會被回收,然後需要建立的時候再創建出來,在使用的時候應該使用合適的構造引數。該型別使用的任務佇列是SynchronousQueue這種同步佇列,這是一種特別的佇列,每個執行緒都是有使命的,每個執行緒都會等待另外一個執行緒和自己交易,在交易完成之前都會阻塞住執行緒,他們之間有一種傳遞關係,資料是從一個執行緒直接傳遞到例外一個執行緒中去的,SynchronousQueue這種佇列不儲存實際的資料,而是儲存著一些執行緒的資訊,而SynchronousQueue管理著這些執行緒之間的交易,更為詳細的細節參考後面的文章。

上面提到,ScheduleThreadPoolExecutor是繼承自ThreadPoolExecutor的,而且從類圖中也可以看出來這種關係,所以其實ScheduleThreadPoolExecutor是對ThreadPoolExecutor的增強,它增加了schedule功能,使用與那些需要週期性排程執行,或者是延時執行的任務,在ScheduleThreadPoolExecutor中使用了一種阻塞佇列稱為延時阻塞佇列,這種佇列有能力持有一段時間資料,我們可以設定這種時間,時間沒到的時候嘗試獲取資料的執行緒會被阻塞,直到設定的時間到了,執行緒才會被喚醒來消費資料。而關於ScheduleThreadPoolExecutor是如何運作的,包括他的週期性任務排程是如何工作的,可以參考上面提到的連結。

Future

Future代表一種未來某個時刻會發生的事情,在併發環境下使用Future是非常重要的,使用Future的前提是我們可以容許執行緒執行一段時間來完成這個任務,但是需要在我們提交了任務的時候就返回一個Future,這樣在接下來的時間程式設計師可以根據實際情況來取消任務或者獲取任務,在多個任務沒有相互依賴關係的時候,使用Future可以實現多執行緒的併發執行,多個執行緒可以執行在不同的處理器上,然後在某個時間點來統一獲取結果就可以了。上文中已經提到了FutureTask,FutureTask既是一種Runnable,也是一種Future,並且結合了兩種型別的特性。下面展示了Future提供的一些方法,使用這些方法可以很方便的進行任務控制:

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

在java 8中增加了一個新的類CompletableFuture,這是對Future的極大增強,CompletableFuture提供了非常豐富的操作可以來控制我們的任務,並且可以根據多種規則來關聯多個Future。關於CompletableFuture的詳細分析總結可以參考文章Java CompletableFuture。

Fork/Join框架

Fork/Join框架是一種並行框架,它可以將一個較大的任務切分成一些小任務來執行,並且多個執行緒之間會相互配合,每個執行緒都會有一個任務佇列,對於某些執行緒來說它們可能很快完成了自己的任務佇列中的任務,但是其他的執行緒還沒有完成,那麼這些執行緒就會去竊取那些還沒有完成任務執行的執行緒的任務來執行,這成為“工作竊取”演算法,關於Fork/Join中的工作竊取,其實現還是較為複雜的,如果想對Fork/Join框架有一個大致的認識,可以參考文章Java Fork/Join並行框架。下面展示了Fork/Join框架的工作模式:

可以從上面的圖中看出,一個較大的任務會被切分為一個小任務,並且小任務還會繼續切分,直到符合我們設定的執行閾值,然後就會執行,執行完成之後會進行join,也就是將小任務的結果組合起來,組裝出我們提交的整個任務的結果,這是一種非常先進的工作模式,非常有借鑑意義。當然,使用Fork/Join框架的前提是我們的任務時可以拆分成小任務來執行的,並且小人物的結果可以組裝出整個大任務的結果,歸併排序是一種可以藉助Fork/Join框架來提供處理速度的演算法,下面展示了使用Fork/Join框架來執行歸併排序的程式碼,可以試著調整引數來進行效能測試:

import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

/**
 * Created by hujian06 on 2017/10/23.
 *
 * merge sort by fork/join
 */
public class ForkJoinMergeSortDemo {

    public static void main(String ... args) {
        new Worker().runWork();
    }

}

class Worker {

    private static final boolean isDebug = false;

    public void runWork() {

        int[] array = mockArray(200000000, 1000000); // mock the data

        forkJoinCase(array);
        normalCase(array);

    }

    private void printArray(int[] arr) {

        if (isDebug == false) {
            return;
        }

        for (int i = 0; i < arr.length; i ++) {
            System.out.print(arr[i] + " ");
        }

        System.out.println();
    }

    private void forkJoinCase(int[] array) {
        ForkJoinPool pool = new ForkJoinPool();

        MergeSortTask mergeSortTask = new MergeSortTask(array, 0, array.length - 1);

        long start = System.currentTimeMillis();

        pool.invoke(mergeSortTask);

        long end = System.currentTimeMillis();

        printArray(array);

        System.out.println("[for/join mode]Total cost: " + (end - start) / 1000.0 + " s, for " +
                array.length + " items' sort work.");
    }

    private void normalCase(int[] array) {

        long start = System.currentTimeMillis();

        new MergeSortWorker().sort(array, 0, array.length - 1);

        long end = System.currentTimeMillis();

        printArray(array);

        System.out.println("[normal mode]Total cost: " + (end - start) / 1000.0 + " s, for " +
                array.length + " items' sort work.");
    }

    private static final  int[] mockArray(int length, int up) {
        if (length <= 0) {
            return null;
        }

        int[] array = new int[length];

        Random random = new Random(47);

        for (int i = 0; i < length; i ++) {
            array[i] = random.nextInt(up);
        }

        return array;
    }
}

class MergeSortTask extends RecursiveAction {

    private static final int threshold = 100000;
    private final MergeSortWorker mergeSortWorker = new MergeSortWorker();

    private int[] data;

    private int left;
    private int right;

    public MergeSortTask(int[] array, int l, int r) {
        this.data = array;
        this.left = l;
        this.right = r;
    }

    @Override
    protected void compute() {
        if (right - left < threshold) {
            mergeSortWorker.sort(data, left, right);
        } else {
            int mid = left + (right - left) / 2;
            MergeSortTask l = new MergeSortTask(data, left, mid);
            MergeSortTask r = new MergeSortTask(data, mid + 1, right);

            invokeAll(l, r);

            mergeSortWorker.merge(data, left, mid, right);
        }
    }
}

class MergeSortWorker {

    // Merges two subarrays of arr[].
    // First subarray is arr[l..m]
    // Second subarray is arr[m+1..r]
    void merge(int arr[], int l, int m, int r) {
        // Find sizes of two subarrays to be merged
        int n1 = m - l + 1;
        int n2 = r - m;

        /* Create temp arrays */
        int L[] = new int[n1];
        int R[] = new int[n2];

        /*Copy data to temp arrays*/
        for (int i = 0; i < n1; ++i)
            L[i] = arr[l + i];
        for (int j = 0; j < n2; ++j)
            R[j] = arr[m + 1 + j];

        /* Merge the temp arrays */

        // Initial indexes of first and second subarrays
        int i = 0, j = 0;

        // Initial index of merged subarry array
        int k = l;
        while (i < n1 && j < n2) {
            if (L[i] <= R[j]) {
                arr[k ++] = L[i ++];
            } else {
                arr[k ++] = R[j ++];
            }
        }

        /* Copy remaining elements of L[] if any */
        while (i < n1) {
            arr[k ++] = L[i ++];
        }

        /* Copy remaining elements of R[] if any */
        while (j < n2) {
            arr[k ++] = R[j ++];
        }
    }

    // Main function that sorts arr[l..r] using
    // merge()
    void sort(int arr[], int l, int r) {
        if (l < r) {
            // Find the middle point
            int m = l + (r - l) / 2;

            // Sort first and second halves
            sort(arr, l, m);
            sort(arr, m + 1, r);

            // Merge the sorted halves
            merge(arr, l, m, r);
        }
    }
}

在jdk中,使用Fork/Join框架的一個典型案例是Streams API,Streams API試圖簡化我們的併發程式設計,可以使用很簡單的流式API來處理我們的資料流,在我們無感知的狀態下,其實Streams的實現上藉助了Fork/Join框架來實現了併發計算,所以強烈建議使用Streams API來處理我們的流式資料,這樣可以充分的利用機器的多核心資源,來提高資料處理的速度。關於java的Streams API的分析總結可以參考文章Java Streams API,關於Stream API是如何使用Fork/Join框架來實現平行計算的內容可以參考文章Java Stream的並行實現。鑑於Fork/Join框架的先進思想,理解並且學會使用Fork/Join框架來處理我們的實際問題是非常有必要的。

Java volatile關鍵字

volatile解決的問題是多個執行緒的記憶體可見性問題,在併發環境下,每個執行緒都會有自己的工作空間,每個執行緒只能訪問各自的工作空間,而一些共享變數會被載入到每個執行緒的工作空間中,所以這裡面就有一個問題,記憶體中的資料什麼時候被載入到執行緒的工作快取中,而執行緒工作空間中的內容什麼時候會回寫到記憶體中去。這兩個步驟處理不當就會造成記憶體可加性問題,也就是資料的不一致,比如某個共享變數被執行緒A修改了,但是沒有回寫到記憶體中去,而執行緒B在載入了記憶體中的資料之後讀取到的共享變數是髒資料,正確的做法應該是執行緒A的修改應該對執行緒B是可見的,更為通用一些,就是在併發環境下共享變數對多個執行緒是一致的。

對於記憶體可見性的一點補充是,之所以會造成多個執行緒看到的共享變數的值不一樣,是因為執行緒在佔用CPU時間的時候,cpu為了提高處理速度不會直接和記憶體互動,而是會先將記憶體中的共享內容讀取到內部快取中(L1,L2),然後cpu在處理的過程中就只會和內部快取互動,在多核心的機器中這樣的處理方式就會造成記憶體可見性問題。

volatile可以解決併發環境下的記憶體可見性問題,只需要在共享變數前面加上volatile關鍵字就可以解決,但是需要說明的是,volatile僅僅是解決記憶體可見性問題,對於像i++這樣的問題還是需要使用其他的方式來保證執行緒安全。使用volatile解決記憶體可見性問題的原理是,如果對被volatile修飾的共享變數執行寫操作的話,JVM就會向cpu傳送一條Lock字首的指令,cpu將會這個變數所在的快取行(快取中可以分配的最小快取單位)寫回到記憶體中去。但是在多處理器的情況下,將某個cpu上的快取行寫回到系統記憶體之後,其他cpu上該變數的快取還是舊的,這樣再進行後面的操作的時候就會出現問題,所以為了使得所有執行緒看到的內容都是一致的,就需要實現快取一致性協議,cpu將會通過監控總線上傳遞過來的資料來判斷自己的快取是否過期,如果過期,就需要使得快取失效,如果cpu再來訪問該快取的時候,就會發現快取失效了,這時候就會重新從記憶體載入快取。 總結一下,volatile的實現原則有兩條: 1、JVM的Lock字首的指令將使得cpu快取寫回到系統記憶體中去 2、為了保證快取一致性原則,在多cpu的情景下,一個cpu的快取回寫記憶體會導致其他的cpu上的快取都失效,再次訪問會重新從系統記憶體載入新的快取內容。

原子操作CAS

原子操作表達的意思是要麼一個操作成功,要麼失敗,中間過程不會被其他的執行緒中斷,這一點對於併發程式設計來說非常重要,在java中使用了大量的CAS來做併發程式設計,包括jdk的ConcurrentHsahMap的實現,還有AtomicXXX的實現等其他一些併發工具的實現都使用了CAS這種技術,CAS包括兩部分,也就是Compare and swap,首先是比較,然後再互動,這樣做的原因是,在併發環境下,可能不止一個執行緒想要來改變某個共享變數的值,那麼在進行操作之前使用一個比較,而這個比較的值是當前執行緒認為(知道)該共享變數最新的值,但是可能其他執行緒已經改變了這個值,那麼此時CAS操作就會失敗,只有在共享變數的值等於執行緒提供的用於比較的值的時候才會進行原子改變操作。

java中有一個類是專門用於提供CAS操作支援的,那就是Unsafe類,但是我們不能直接使用Unsafe類,因為Unsafe類提供的一些底層的操作,需要非常專業的人才能使用好,並且Unsafe類可能會造成一些安全問題,所以不建議直接使用Unsafe類,但是如果想使用Unsafe類的話還是有方法的,那就是通過反射來獲取Unsafe例項,類似於下面的程式碼:

class UnsafeHolder {

    private static Unsafe U = null;

    public static Unsafe getUnsafe() {
        if (U == null) {
            synchronized (UnsafeHolder.class) {
                if (U == null) {

                    List<Exception> exception = null;
                    try {
                        Field field = Unsafe.class.getDeclaredField("theUnsafe");

                        field.setAccessible(true);

                        try {
                            U = (Unsafe) field.get(null);
                        } catch (IllegalAccessException e) {

                            exception.add(e);
                        }
                    } catch (NoSuchFieldException e) {

                        exception.add(e);
                    } finally {

                        if (exception != null) {
                            reportException(exception);
                        }

                    }
                }
            }
        }

        return U;
    }

    /**
     * handler the exception in this method .
     * @param e The exception
     */
    private static void reportException(List<Exception> e) {
        e.forEach(System.out::println);
    }

}

如果想要了解Unsafe類到底提供了哪些較為底層的操作,可以直接參考Unsafe的原始碼。CAS操作解決了原子操作問題,只要進行操作,CAS就會保證操作會成功,不會被中斷,這是一種非常好非常強大的特性,下面就java 8中的ConcurrentHashMap的size實現來談談CAS操作在併發環境下的使用案例。

在java 7中,ConcurrentHashMap的實現是基於分段鎖協議的實現,本質上還是使用了鎖,只是基於一種考慮,就是多個執行緒訪問雜湊桶具有隨機性,基於這種考慮來將資料儲存在不同的雜湊段上面,然後每一個段配有一把鎖,在需要寫某個段的時候需要加鎖,而在這個時候,其他訪問其他段的執行緒是不需要阻塞的,但是對於該段的執行緒訪問就需要等待,直到這個加鎖的執行緒釋放了鎖,其他執行緒才能進行訪問。在java 8中,ConcurrentHashMap的實現拋棄了這種複雜的架構設計,但是繼承了這種分散執行緒競爭壓力的思想,其實就提高系統的併發度這一維度來說,分散競爭壓力是一種最為直接明瞭的解決方案,而java 8在實現ConcurrentHashMap的時候大量使用了CAS操作,減少了使用鎖的頻度來提高系統的響應度,其實使用鎖和使用CAS來做併發在複雜度上不是一個數量級的,使用鎖在很大程度上假設了多個執行緒的排斥性,並且使用鎖會將執行緒阻塞等待,也就是說使用鎖來做執行緒同步的時候,執行緒的狀態是會改變的,但是使用CAS是不會改變執行緒的狀態的(不太嚴謹的說),所以使用CAS比起使用synchronized或者使用Lcok來說更為輕量級。

現在就ConcurrentHashMap的size方法來分析一下如何將執行緒競爭的壓力分散出去。在java 7的實現上,在呼叫size方法之後,ConcurrentHashMap會進行兩次對雜湊桶中的記錄累加的操作,這兩次累加的操作是不加鎖的,然後判斷兩次結果是否一致,如果一致就說明目前的系統是讀多寫少的場景,並且可能目前沒有執行緒競爭,所以直接返回就可以,這就避免了使用鎖,但是如果兩次累加結果不一致,那就說明此時可能寫的執行緒較多,或者執行緒競爭較為嚴重,那麼此時ConcurrentHashMap就會進行一個重量級的操作,對所有段進行加鎖,然後對每一個段進行記錄計數,然後求得最終的結果返回。在最有情況下,size方法需要做兩次累加計數,最壞情況需要三次,並且會涉及全域性加鎖這種重量級的加鎖操作,效能肯定是不高的。而在java 8的實現上,ConcurrentHashMap的size方法實際上是與ConcurrentHashMap是解耦的,size方法更像是接入了一個額外的併發計數系統,在進行size方法呼叫的時候是不會影響資料的存取的,這其實是一種非常先進的思想,就是一個系統模組化,然後模組可以進行更新,系統解耦,比如java 8中接入了併發計陣列件Striped64來作為size方法的支撐,可能未來出現了比Striped64更為高效的演算法來計數,那麼只需要將Striped64模組換成新的模組就可以了,對原來的核心操作是不影響的,這種模組化系統設定的思想應該在我們的專案中具體實踐。

上面說到java 8在進行size方法的設計上引入了Striped64這種併發計陣列件,這種元件的計數思想其實也是分散競爭,Striped64的實現上使用了volatile和CAS,在Striped64的實現中是看不到鎖的使用的,但是Striped64確實是一種高效的適用於併發環境下的計陣列件,它會基於請求計數的執行緒,Striped64的計數會根據兩部分的內容來得到最後的結果,類似於java 7中ConcurrentHashMap的size方法的實現,在Striped64的實現上也是借鑑了這種思想的,Striped64會首先嚐試將某個執行緒的計數請求累加到一個base共享變數上,如果成功了,那麼說明目前的競爭不是很激烈,也就沒必要後面的操作了,但是很多情況下,併發環境下的執行緒競爭是很激烈的,所以嘗試累加到base上的計數請求很大概率是會失敗的,那麼Striped64會維護一個Cell陣列,每個Cell是一個計陣列件,Striped64會為每個請求計數的執行緒計算一個雜湊值,然後雜湊到Cell陣列中的某個位置上,然後這個執行緒的計數就會累加到該Cell上面去。當然,Striped64的實現細節是非常複雜的,想要了解Striped64的實現細節的讀者可以參考文章Java 併發計陣列件Striped64詳解,配合Striped64的原始碼效果更佳。

併發同步框架AQS

AQS是java中實現Lock的基礎,也是實現執行緒同步的基礎,AQS提供了鎖的語義,並且支援獨佔模式和共享模式,對應於悲觀鎖和樂觀鎖,獨佔模式的含義是說同一時刻只能有一個執行緒獲取鎖,而其他試圖獲取鎖的執行緒都需要阻塞等待,而共享鎖的含義是說可以有多個執行緒獲得鎖,兩種模式在不同的場景下使用。

而鎖在併發程式設計中的地位不言而喻,多個執行緒的同步很多時候是需要鎖來做同步的,比如對於某些資源,我們希望可以有多個執行緒獲得鎖來讀取,但是隻允許有一個執行緒獲得鎖來執行寫操作,這種鎖稱為讀寫鎖,它的實現上結合了AQS的共享模式和獨佔模式,共享模式對應於可以使得多個執行緒獲得鎖來進行讀操作,獨佔模式對應於只允許有一個執行緒獲得鎖來進行寫操作。關於java中多個Lock的實現細節,以及是如何藉助AQS來實現其具體邏輯的內容,可以參考文章ava可重入鎖詳解。該文章詳細講述了多個Lock介面的實現類,以及他們是如何藉助AQS來實現的具體細節。

某些時候,我們需要定製我們自己的執行緒同步策略,個性化的執行緒同步藉助AQS可以很容易的實現,比如我們的需求是允許限定個數的執行緒獲得鎖來進行一些操作,想要實現這樣的語義,只需要實現一個類,繼承AQS,然後重寫方法下面兩個方法:

protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

關於AQS的具體分析,可以參考文章Java同步框架AbstractQueuedSynchronizer。

還需要提到的一點是,鎖分為公平鎖和非公平鎖,java中大多數時候會使用佇列來實現公平鎖,而使用棧來實現非公平鎖,當然這是基於佇列和棧這兩種資料結構的特點來實現的,直觀的來說,使用佇列的FIFO的特性就可以實現類似排隊的效果,也就保證了公平性,而棧是一個後進先出的資料結構,它的這種結構造成的結果就是,最新進入的執行緒可能比那些等待過一段時間的執行緒更早的獲得鎖,更為具體的內容可以參考上面的文章進行了解。

synchronized(同步鎖)

相對於volatile,synchronized就顯得比較重量級了。 首先,我們應該知道,在java中,所有的物件都可以作為鎖。可以分為下面三種情況:

  1. 普通方法同步,鎖是當前物件
  2. 靜態方法同步,鎖是當前類的Class物件
  3. 普通塊同步,鎖是synchronize裡面配置的物件

當一個執行緒試圖訪問同步程式碼時,必須要先獲得鎖,退出或者丟擲異常時必須要釋放鎖。

JVM基於進入和退出Monitor物件來實現方法同步和程式碼塊同步,可以使用monitorenter和monitorexit指令實現。monitorenter指令是在編譯後插入到同步程式碼塊的開始位置,而monitorexit指令則插入到方法結束和異常處,JVM保證每個monitorenter都有一個monitorexit閾值相對應。執行緒執行到monitorenter的時候,會嘗試獲得物件所對應的monitor的鎖,然後才能獲得訪問許可權,synchronize使用的鎖儲存在Java物件頭中。

併發佇列(阻塞佇列,同步佇列)

併發佇列,也就是可以在併發環境下使用的佇列,為什麼一般的佇列不能再併發環境下使用呢?因為在併發環境下,可能會有多個執行緒同時來訪問一個佇列,這個時候因為上下文切換的原因可能會造成資料不一致的情況,併發佇列解決了這個問題,並且java中的併發佇列的使用時非常廣泛的,比如在java的執行緒池的實現上使用了多種不同特性的阻塞佇列來做任務佇列,對於阻塞佇列來說,它要解決的首要的兩個問題是:

  • 多執行緒環境支援,多個執行緒可以安全的訪問佇列
  • 支援生產和消費等待,多個執行緒之間互相配合,當佇列為空的時候,消費執行緒會阻塞等待佇列不為空;當佇列滿了的時候,生產執行緒就會阻塞直到佇列不滿。

java中提供了豐富的併發佇列實現,下面展示了這些併發佇列的概覽:

根據上面的圖可以將java中實現的併發佇列分為幾類:

  • 一般的阻塞佇列
  • 支援雙端存取的併發佇列
  • 支援延時獲取資料的延時阻塞佇列
  • 支援優先順序的阻塞佇列

這些佇列的區別就在於從佇列中存取資料時的具體表現,比如對於延時佇列來說,獲取資料的執行緒可能被阻塞等待一段時間,也可能立刻返回,對於優先順序阻塞佇列,獲取的資料是根據一定的優先順序取到的。下面展示了一些佇列操作的具體表現:

操作型別

Throws Exception

Special Value

Blocked

Timed out

插入

add(o)

offer(o)

put(o)

offer(o, timeout, unit)

取出(刪除)

remove(o)

poll()

take()

poll(timeout, unit)

  • Throws Exception 型別的插入和取出在不能立即被執行的時候就會丟擲異常。
  • Special Value 型別的插入和取出在不能被立即執行的情況下會返回一個特殊的值(true 或者 false)
  • Blocked 型別的插入和取出操作在不能被立即執行的時候會阻塞執行緒直到可以操作的時候會被其他執行緒喚醒
  • Timed out 型別的插入和取出操作在不能立即執行的時候會被阻塞一定的時候,如果在指定的時間內沒有被執行,那麼會返回一個特殊值

關於更為具體的分析總結可以參考文章Java 併發佇列詳解。

總結

本文總結了java併發程式設計中的若干核心技術,並且對每一個核心技術都做了一些分析,並給出了參考連結,可以在參考連結中查詢到更為具體深入的分析總結內容。java併發程式設計需要解決一些問題,比如執行緒間同步問題,如何保證資料可見性問題,以及如何高效的協調多個執行緒工作等內容,本文在這些維度上都有所設計,本文作為對閱讀java.util.Concurrent包的原始碼閱讀的一個總結,同時本文也作為一個起點,一個開始更高層次分析總結的起點,之前的分析都是基於jdk原始碼來進行的,並且某些細節的內容還沒有完全搞明白,其實在閱讀了一些原始碼之後就會發現,如果想要深入分析某個方面的內容,就需要一些底層的知識,否則很難完整的分析總結出來,但是這種不徹底的分析又是很有必要的,至少可以對這些內容有一些大概的瞭解,並且知道自己的不足,以及未來需要了解的底層內容。

對於java併發包的分析研究,深入到底層就是對jvm如何管理內容,如何管理執行緒的分析,在深入下去,就是作業系統對記憶體的管理,對執行緒的管理等內容,從作業系統再深入下去,就是去理解cpu的指令系統,學習磁碟知識等內容,當然,知識的關聯是無止境的,學習也是無止境的,目前來說,首要解決的問題是可以熟練的使用java提供的併發包內容來進行併發程式設計,在業務上提高併發處理能力,在出現問題的時候可以很快找到問題並且解決問題,在達到這個要求之後,可以去了解一些jvm層次的內容,比如jvm的記憶體模型,以及執行緒的實現,並且可以與學習作業系統的相關內容並行進行。

覺得本文對你有幫助?請分享給更多人。