1. 程式人生 > >java併發api總結

java併發api總結

開發十年,就只剩下這套架構體系了! >>>   

1.java.util.concurrent包

1.1 Executors

  • Executor:介面,僅有一個方法為execute(Runnable)
  • ExecutorService:Executor的子介面,擴充套件了Executor的方法,如submit/shutdown等。
  • Executors:工廠類,提供生成各種執行緒池的靜態方法
  • ScheduledExecutorService:ExecutorService的子介面,提供定時排程功能,如schedule方法
  • Callable:介面,可以執行返回一個結果的任務(執行緒),僅有call方法。
  • Future:介面,主要方法為get,阻塞的方法,一般的呼叫:
     Future<String> future = executor.submit(new Callable<String>() {
         public String call() {
             return searcher.search(target);
         }
        });
        String str = future.get();
    
  • FutureTask:類,實現了Future,可取消的非同步計算,此類提供了對 Future 的基本實現.
  • ScheduledFuture:Future的子介面,接收ScheduledExecutorService執行的結果,該介面中沒有新增新方法。
  • Delayed :介面,用來標記那些應該在給定延遲時間之後執行的物件,主要方法為getDelay
  • CompletionService:介面,將生產新的非同步任務與使用已完成任務的結果分離開來的服務。生產者submit執行的任務。 使用者take已完成的任務,並按照完成這些任務的順序處理它們的結果。
  • ExecutorCompletionService:類,實現了CompletionService,使用提供的 Executor 來執行任務的 CompletionService。使用示例:
       void
    solve(Executor e, Collection<Callable<Result>> solvers) throws Exception
    { CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e); for (Callable<Result> s : solvers) ecs.submit(s); int n = solvers.size(); for (int i = 0; i < n; ++i) { Result r = ecs.take().get(); if (r != null) use(r); } }
  • AbstractExecutorService:抽象類,ExecutorService的子類,提供ExecutorService執行方法的預設實現。此類使用newTaskFor返回的
  • RunnableFuture:實現submit、invokeAny和invokeAll方法,預設情況下,RunnableFuture是此包中提供的 FutureTask類。
  • ThreadPoolExecutor:執行緒池核心類,繼承了AbstractExecutorService,自定義執行緒池會用到,Executors中建立的執行緒池均為該類物件,如Executors.newCachedThreadPool()建立程式碼為:
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());

以使用此類的構造方法來自主實現Executors建立的執行緒池。構造方法介紹如下:

public ThreadPoolExecutor(
    //達到執行條件的執行緒數,未達到則先建立執行緒
    int corePoolSize, 
    //最大執行緒數,達到後再來新的請求則不會再建立
    int maximumPoolSize,          
    //執行緒生存時間,如CacheThreadPool生存時間為60s
    long keepAliveTime,           
    //列舉,設定時間單位  
    TimeUnit unit,              
    /**
     * 工作佇列,直接提交:SynchronousQueue,
     * 無界佇列:LinkedBlockingQueue,
     * 有界佇列:ArrayBlockingQueue
     */
    BlockingQueue<Runnable> workQueue,    
    /** 
      * 使用ThreadFactory建立新執行緒。如果沒有另外說明,則在同一個
      * ThreadGroup中一律使用Executors.defaultThreadFactory()建立執行緒,
      * 並且這些執行緒具有相同的和非守護程序狀態。通過提供不同的ThreadFactory,
      * 可以改變執行緒的名稱、執行緒組、優先順序、守護程序狀態,等等。如果從
      * newThread返回null時ThreadFactory未能建立執行緒,則執行程式將繼續執行,
      * 但不能執行任何任務。
      */
    ThreadFactory threadFactory
    /** 
      * 建立執行緒池並且提交任務失敗時,執行緒池會回撥RejectedExecutionHandler
      * 介面的rejectedExecution(Runnable task, ThreadPoolExecutor
      * executor)方法來處理執行緒池處理失敗的任務,其中task是使用者提交的任務,
      * 而executor是當前執行的任務的執行緒池 
      */
    RejectedExecutionHandler handler
   ){
        ......
   
   }

也可以繼承ThreadPoolExecutor,可重寫的 beforeExecute(java.lang.Thread, java.lang.Runnable) 和 afterExecute(java.lang.Runnable, java.lang.Throwable) 方法,這兩種方法分別在執行每個任務之前和之後呼叫。

  • ScheduledThreadPoolExecutor:ThreadPoolExecutor的子類,使用schedule方法設定定時排程,該方法的返回引數為ScheduledFuture

1.2 Queues

  • BolckingQueue:介面,Queue的子介面,支援兩個附加操作的Queue,這兩個操作是:獲取元素時等待佇列變為非空(take方法),以及儲存元素時等待空間變得可用(put方法)。
  • LinkedBlockingQueue:類,BlockingQueue的連結串列實現,一般用來實現無界佇列。
  • ArrayBlockingQueue:類,BlockingQueue的陣列實現,一般用來實現有界佇列。
  • ConcurrentLinkedQueue:java.util.AbstractQueue的子類,一個基於連結節點的無界執行緒安全佇列。此佇列按照 FIFO(先進先出)原則對元素進行排序,此實現採用了有效的“無等待 (wait-free)”演算法
  • SynchronousQueue:類,BlockingQueue的實現,一般用來實現任務即時提交。
    • 注意1:它一種阻塞佇列,其中每個 put 必須等待一個 take,反之亦然。同步佇列沒有任何內部容量,甚至連一個佇列的容量都沒有。
    • 注意2:它是執行緒安全的,是阻塞的。
    • 注意3:不允許使用 null 元素。
    • 注意4:公平排序策略是指呼叫put的執行緒之間,或take的執行緒之間。
    • 注意5:SynchronousQueue的以下方法很有趣:
      • iterator() 永遠返回空,因為裡面沒東西。
      • peek() 永遠返回null。
      • put() 往queue放進去一個element以後就一直wait直到有其他thread進來把這個element取走。
      • offer() 往queue裡放一個element後立即返回,如果碰巧這個element被另一個thread取走了,offer方法返回true,認為offer成功;否則返回false。
      • offer(2000, TimeUnit.SECONDS) 往queue裡放一個element但是等待指定的時間後才返回,返回的邏輯和offer()方法一樣。
      • take() 取出並且remove掉queue裡的element(認為是在queue裡的。。。),取不到東西他會一直等。
      • poll() 取出並且remove掉queue裡的element(認為是在queue裡的。。。),只有到碰巧另外一個執行緒正在往 queue裡offer資料或者put資料的時候,該方法才會取到東西。否則立即返回null。
      • poll(2000, TimeUnit.SECONDS) 等待指定的時間然後取出並且remove掉queue裡的element,其實就是再等其他的thread來往裡塞。
      • isEmpty()永遠是true。
      • remainingCapacity() 永遠是0。
      • remove()和removeAll() 永遠是false。
  • PriorityBlockingQueue:類,BlockingQueue的實現,一個無界阻塞佇列,它使用與類PriorityQueue相同的順序規則,並且提供了阻塞獲取操作。
  • DelayQueue:類,BlockingQueue的實現,Delayed元素的一個無界阻塞佇列,只有在延遲期滿時才能從中提取元素。該佇列的頭部是延遲期滿後儲存時間最長的 Delayed元素。如果延遲都還沒有期滿,則佇列沒有頭部,並且poll將返回null。當一個元素的getDelay(TimeUnit.NANOSECONDS)方法返回一個小於等於0的值時,將發生到期。即使無法使用take或poll移除未到期的元素,也不會將這些元素作為正常元素對待。

1.3 Concurrent Collections

  • ConcurrentMap:介面,java.util.Map的子介面,提供其他原子putIfAbsent、remove、replace方法的Map。
  • ConcurrentHashMap:類,支援獲取的完全併發和更新的所期望可調整併發的雜湊表。此類遵守與Hashtable相同的功能規範,並且包括對應於Hashtable的每個方法的方法版本。使用分段鎖來進行執行緒訪問控制,單執行緒環境下只損失非常小的效能。
  • CopyOnWriteArrayList:類,實現了List介面,ArrayList的一個執行緒安全的變體,其中所有可變操作(add、set 等等)都是通過對底層陣列進行一次新的複製來實現的。這一般需要很大的開銷,但是當遍歷操作的數量大大超過可變操作的數量時,這種方法可能比其他替代方法更 有效。
  • CopyOnWriteArraySet:類,實現了Set介面,對其所有操作使用內部CopyOnWriteArrayList的Set。因此,它共享以下相同的基本屬性:
    • 它最適合於具有以下特徵的應用程式:set 大小通常保持很小,只讀操作遠多於可變操作,需要在遍歷期間防止執行緒間的衝突。
    • 它是執行緒安全的。
    • 因為通常需要複製整個基礎陣列,所以可變操作(add、set 和 remove 等等)的開銷很大。
    • 迭代器不支援可變 remove 操作。
    • 使用迭代器進行遍歷的速度很快,並且不會與其他執行緒發生衝突。在構造迭代器時,迭代器依賴於不變的陣列快照。

1.4 Synchronizers

  • CountDownLatch:一個同步輔助類,在完成一組正在其他執行緒中執行的操作之前,它允許一個或多個執行緒一直等待。用給定的計數初始化CountDownLatch。由於呼叫了countDown()方法,所以在當前計數到達零之前,await方法會一直受阻塞。之後,會釋放所有等待的執行緒,await的所有後續呼叫都將立即返回。這種現象只出現一次——計數無法被重置。如果需要重置計數,請考慮使用 CyclicBarrier。
  • Semaphore:類,一個計數訊號量。acquire()獲取許可,release()釋放獲得的許可。若執行緒未獲得許可,會一直阻塞。
  • Exchanger:類,可以在對中對元素進行配對和交換的執行緒的同步點。每個執行緒將條目上的某個方法呈現給exchange方法,與夥伴執行緒進行匹配,並且在返回時接收其夥伴的物件。應用較少。
  • CyclicBarrier:CyclicBarrier類似於CountDownLatch也是個計數器,不同的是CyclicBarrier數的是呼叫了CyclicBarrier.await()進入等待的執行緒數, 當執行緒數達到了CyclicBarrier初始時規定的數目時,所有進入等待狀態的執行緒被喚醒並繼續。CyclicBarrier初始時還可帶一個Runnable的引數 此Runnable任務在CyclicBarrier的數目達到後,在所有其它執行緒被喚醒前被執行。

1.5 Timing

  • TimeUnit --列舉,TimeUnit表示給定單元粒度的時間段,它提供在這些單元中進行跨單元轉換和執行計時及延遲操作的實用工具方法。

  • fork/join平行計算框架 fork/join框架是ExecutorService介面的一種具體實現,目的是為了幫助你更好地利用多處理器帶來的好處。它是為那些能夠被遞迴地拆解成子任務的工作型別量身設計的。其目的在於能夠使用所有可用的運算能力來提升應用的效能。 fork/join框架的核心是ForkJoinPool類,它是對AbstractExecutorService類的擴充套件。ForkJoinPool實現了工作偷取演算法,並可以執行ForkJoinTask任務。程式碼形式:

       if (當前這個任務工作量足夠小)
            直接完成這個任務
        else
            將這個任務或這部分工作分解成兩個部分
            分別觸發(invoke)這兩個子任務的執行,並等待結果
    
    示例1:
       public class Calculator extends RecursiveTask<Integer> {
            private static final int THRESHOLD = 100;
            private int start;
            private int end;
            public Calculator(int start, int end) {
                this.start = start;
                this.end = end;
            }
            @Override
            protected Integer compute() {
                int sum = 0;
                if((start - end) < THRESHOLD){
                    for(int i = start; i< end;i++){
                        sum += i;
                    }
                }else{
                    int middle = (start + end) /2;
                    Calculator left = new Calculator(start, middle);
                    Calculator right = new Calculator(middle + 1, end);
                    left.fork();
                    right.fork();
                    sum = left.join() + right.join();
                }
                return sum;
            }
            public void run() throws Exception{  
                ForkJoinPool forkJoinPool = new ForkJoinPool();  
                Future<Integer> result = forkJoinPool.submit(new Calculator(0, 10000));  
                assertEquals(new Integer(49995000), result.get());  
            } 
        }
    
    示例2:
        public class SortTask extends RecursiveAction {
            final long[] array;
            final int start;
            final int end;
            private int THRESHOLD = 100; //For demo only
            public SortTask(long[] array) {
                this.array = array;
                this.start = 0;
                this.end = array.length - 1;
            }
            public SortTask(long[] array, int start, int end) {
                this.array = array;
                this.start = start;
                this.end = end;
            }
            protected void compute() {
                if (end - start < THRESHOLD)
                    sequentiallySort(array, start, end);
                else {
                    int pivot = partition(array, start, end);
                    new SortTask(array, start, pivot - 1).fork();
                    new SortTask(array, pivot + 1, end).fork();
                }
            }
            private int partition(long[] array, int start, int end) {
                long x = array[end];
                int i = start - 1;
                for (int j = start; j < end; j++) {
                    if (array[j] <= x) {
                        i++;
                        swap(array, i, j);
                    }
                }
                swap(array, i + 1, end);
                return i + 1;
            }
            private void swap(long[] array, int i, int j) {
                if (i != j) {
                    long temp = array[i];
                    array[i] = array[j];
                    array[j] = temp;
                }
            }
            private void sequentiallySort(long[] array, int lo, int hi) {
                Arrays.sort(array, lo, hi + 1);
            }
            @Test
            public void run() throws InterruptedException {
                ForkJoinPool forkJoinPool = new ForkJoinPool();
                Random rnd = new Random();
                long[] array = new long[SIZE];
                for (int i = 0; i < SIZE; i++) {
                    array[i] = rnd.nextInt();
                }
                forkJoinPool.submit(new SortTask(array));
    
                forkJoinPool.shutdown();
                forkJoinPool.awaitTermination(1000, TimeUnit.SECONDS);
    
                for (int i = 1; i < SIZE; i++) {
                    assertTrue(array[i - 1] < array[i]);
                }
            }
        }
    
  • ForkJoinPool:AbstractExecutorService的子類,實現ExecutorService介面和work-stealing演算法,管理工作執行緒和提供關於任務的狀態和它們執行的資訊。

  • ForkJoinTask:類,實現了Future介面,在ForkJoinPool中執行的任務的基類,提供在任務中執行fork()和join()操作的機制,並且這兩個方法控制任務的狀態。

  • ForkJoinWorkerThrea:類,繼承了Thread類,

  • RecursiveAction:ForkJoinTask的子類,不返回執行結果

  • RecursiveTask:ForkJoinTask的子類,返回執行結果

2. java.util.concurrent.lock包

2.1 Locks

  • Lock:介面,Lock實現提供了比使用synchronized方法和語句可獲得的更廣泛的鎖定操作。此實現允許更靈活的結構,可以具有差別很大的屬性,可以支援多個相關的Condition物件。方法介紹如下:

    • void lock():獲取鎖。
    • void lockInterruptibly():如果當前執行緒未被中斷,則獲取鎖。
    • Condition newCondition():返回繫結到此Lock例項的新Condition例項。
    • boolean tryLock():僅在呼叫時鎖為空閒狀態才獲取該鎖。
    • boolean tryLock(long time, TimeUnit unit):如果鎖在給定的等待時間內空閒,並且當前執行緒未被中斷,則獲取鎖。
    • void unlock():釋放鎖。
  • Condition:Condition將Object 監視器方法(wait、notify 和 notifyAll)分解成截然不同的物件,以便通過將這些物件與任意Lock實現組合使用,為每個物件提供多個等待set(wait-set)。其中,Lock替代了synchronized方法和語句的使用,Condition替代了Object監視器方法的使用。不同於其他執行緒,使用此類時,等待與喚醒不再使用wait與notity方法,而是使用await()與signal()。

  • ReentrantLock:類,Lock的實現,一個可重入的互斥鎖Lock,此類的構造方法接受一個可選的公平 引數。

  • ReadWriteLock:口,ReadWriteLock維護了一對相關的鎖,一個用於只讀操作,另一個用於寫入操作。只要沒有writer,讀取鎖可以由多個reader執行緒同時保持。寫入鎖是獨佔的。

  • ReentrantReadWriteLock:類,ReadWriteLock的實現,此類不會將讀取者優先或寫入者優先強加給鎖訪問的排序。但是支援可選的公平策略。

  • AbstractQueuedSynchronizer:抽象類,為實現依賴於先進先出 (FIFO) 等待佇列的阻塞鎖和相關同步器(訊號量、事件,等等)提供一個框架。

  • LockSupport:類,用來建立鎖和其他同步類的基本執行緒阻塞原語。park和unpark方法提供了阻塞和解除阻塞執行緒的有效方法

3. java.util.concurrent.atomic包

3.1 Atomics:提供原子操作的包,

  • AtomicBoolean:Boolean的原子類
  • AtomicInteger:Integer的原子類,java.lang.Number的子類
  • AtomicIntegerArray:可以用原子方式更新其元素的int陣列
  • AtomicIntegerFieldUpdater:基於反射的實用工具,可以對指定類的指定volatile int欄位進行原子更新。
  • AtomicLong:Long的原子類,java.lang.Number的子類
  • AtomicLongArray:可以用原子方式更新其元素的 long 陣列。
  • AtomicLongFieldUpdater:基於反射的實用工具,可以對指定類的指定 volatile long 欄位進行原子更新。
  • AtomicMarkableReference:維護帶有標記位的物件引用,可以原子方式對其進行更新。
  • AtomicReference:可以用原子方式更新的物件引用。
  • AtomicReferenceArray:可以用原子方式更新其元素的物件引用陣列。
  • AtomicReferenceFieldUpdater:基於反射的實用工具,可以對指定類的指定 volatile 欄位進行原子更新。

原文地址:https://my.oschina.net/funcy/blog/1928224