1. 程式人生 > >java併發程式設計一一執行緒池原理分析(一)

java併發程式設計一一執行緒池原理分析(一)

1、併發包

1、CountDownLatch(計數器)

CountDownLatch 類位於 java.util.concurrent 包下,利用它可以實現類似於計數器的功能。
比如有一個任務A,它要等待其他4個任務執行完成之後才能執行,此時就可以利用CountDownLatch
來實現這種功能了。CountDownLatch是通過一個計數器來實現的,計數器的初始值為執行緒的數量。
每當一個執行緒完成了自己的任務後,計數器的值就會減1,當計數器的值到達0 時,他表示所有的執行緒已經完成了任務,然後在閉鎖上等待的執行緒就可以恢復執行任務了。

package com.javagi.test.thread;
import java.util.concurrent.CountDownLatch;
/**
 * @author kuiwang Created by Administrator on 2018/10/19.
 */
public class CountDownLatchTest {
	public static void main(String[] args) throws InterruptedException {
		CountDownLatch countDownLatch = new CountDownLatch(2);
		new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println(Thread.currentThread().getName() + ",子執行緒開始執行...");
				countDownLatch.countDown();
				System.out.println(Thread.currentThread().getName() + ",子執行緒結束執行...");
			}
		}).start();

		new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println(Thread.currentThread().getName() + ",子執行緒開始執行...");
				countDownLatch.countDown();// 計數器值每次減去1
				System.out.println(Thread.currentThread().getName() + ",子執行緒結束執行...");
			}
		}).start();

		countDownLatch.await();// 減去為0,恢復任務繼續執行
		System.out.println("兩個子執行緒執行完畢....");
		System.out.println("主執行緒繼續執行.....");
		for (int i = 0; i < 10; i++) {
			System.out.println("main,i:" + i);
		}
	}
}

2、CyclicBarrier(屏障)

CyclicBarrier初始化時規定一個數目,然後計算呼叫了CyclicBarrier.wait() 進入等帶的執行緒數。
當執行緒數達到了這個數目時,所有進入等帶狀態的執行緒數被喚醒並繼續。
CyclicBarrier 就像他的名字一樣,可以看成是個障礙,所有的執行緒必須到齊後才能一起通過這
障礙。
CyclicBarrier初始化時還可以帶一個Runnable的引數,此Runnable任務在CyclicBarrier的數目達到後,
所有其它執行緒被喚醒前被執行。

package com.javagi.test.thread;
import java.util.concurrent.CyclicBarrier;
class Writer extends Thread {
	private CyclicBarrier cyclicBarrier;

	public Writer(CyclicBarrier cyclicBarrier) {
		this.cyclicBarrier = cyclicBarrier;
	}

	@Override
	public void run() {
		System.out.println("執行緒" + Thread.currentThread().getName() + ",正在寫入資料");
		try {
			Thread.sleep(3000);
		} catch (Exception e) {
			// TODO: handle exception
		}
		System.out.println("執行緒" + Thread.currentThread().getName() + ",寫入資料成功.....");

		try {
			cyclicBarrier.await();
		} catch (Exception e) {
		}
		System.out.println("所有執行緒執行完畢..........");
	}
}

public class CyclicBarrierTest {
	public static void main(String[] args) {
		CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
		for (int i = 0; i < 5; i++) {
			Writer writer = new Writer(cyclicBarrier);
			writer.start();
		}
	}
}

3、Semaphore(計數訊號量)

Semaphore是一種基於計數的訊號量。它可以設定一個閾值,基於此,多個執行緒競爭獲取許可訊號,
做自己的申請後歸還,超過閾值後,執行緒申請許可訊號將會被阻塞。Semaphore可以用來構建一些
物件池和資源池之類的,比如資料連線池,我們也可以建立計數為1 的Semaphore,將其作為
一種類似於互斥鎖的機制,這也叫二元訊號量,便是兩種互斥狀態。用法如下:
avaliablePermits 函式用來獲取當前可用的資源數量
wc.acquire(); // 申請資源
wc.release(); // 釋放資源

package com.javagi.test.thread;

import java.util.concurrent.Semaphore;
/**
 * 可以用來控制程式的併發量
 * 當多餘的執行緒進來時 在後面排隊等待執行。
 * @author kuiwang
 *
 */
public class SemaphoreTest {
	public static void main(String[] args) {
		
		// 建立一個計數閾值為5的訊號量物件  
    	// 只能5個執行緒同時訪問  
    	Semaphore semp = new Semaphore(5);  
    	try {  
    	    // 申請許可  
    	    semp.acquire();  
    	    try {  
    	        // 業務邏輯  
    	    } catch (Exception e) {  
    	  
    	    } finally {  
    	        // 釋放許可  
    	        semp.release();  
    	    }  
    	} catch (InterruptedException e) {  
    	  
    	}  
	}
}

案例
需求: 一個廁所只有3個坑位,但是有10個人來上廁所,那怎麼辦?假設10的人的編號分別為1-10,並且1號先到廁所,10號最後到廁所。那麼1-3號來的時候必然有可用坑位,順利如廁,4號來的時候需要看看前面3人是否有人出來了,如果有人出來,進去,否則等待。同樣的道理,4-10號也需要等待正在上廁所的人出來後才能進去,並且誰先進去這得看等待的人是否有素質,是否能遵守先來先上的規則。
程式碼示例:

package com.javagi.test.thread;
import java.util.Random;
import java.util.concurrent.Semaphore;

class ThradDemo001 extends Thread {
	private String name;
	private Semaphore wc;

	public ThradDemo001(String name, Semaphore wc) {
		this.name = name;
		this.wc = wc;
	}

	@Override
	public void run() {
		// 剩下的資源
		int availablePermits = wc.availablePermits();
		if (availablePermits > 0) {
			System.out.println(name + "天助我也,終於有茅坑了.....");
		} else {
			System.out.println(name + "怎麼沒有茅坑了...");
		}
		try {
			// 申請資源
			wc.acquire();
		} catch (InterruptedException e) {

		}
		System.out.println(name + "終於上廁所啦.爽啊" + ",剩下廁所:" + wc.availablePermits());
		try {
			Thread.sleep(new Random().nextInt(1000));
		} catch (Exception e) {
			// TODO: handle exception
		}
		System.out.println(name + "廁所上完啦!");
		// 釋放資源
		wc.release();
	}
}

public class SemaphoreTest2 {
	public static void main(String[] args) {
		Semaphore semaphore = new Semaphore(3);
		for (int i = 1; i <= 10; i++) {
			ThradDemo001 thradDemo001 = new ThradDemo001("第" + i + "個人", semaphore);
			thradDemo001.start();
		}
	}
} 

4、併發佇列

在併發佇列上JDK提供了兩套實現,一個是ConcurrentLinkedQueue 為代表的高效能佇列非阻塞,一個是
以BlockingQueue介面為代表的阻塞佇列,無論哪種都繼承Queue。

1、阻塞佇列與非阻塞佇列

阻塞佇列與普通佇列的區別在於,當佇列是空的時候,從佇列中獲取元素的操作將會被阻塞,或者當佇列是
滿時,往佇列裡面新增元素的操作會內阻塞,檢視從空的阻塞佇列中獲取元素的執行緒將會被阻塞,直到其它的執行緒往空的佇列插入新的元素。同樣,檢視往已滿餓阻塞佇列中新增新元素的執行緒同時也會被阻塞,直到
其它的執行緒使佇列重新變得空閒起來,如從佇列中移除一個或者多個元素,或者完全清空佇列。

  1. ArrayDeque 陣列雙端佇列
  2. PriorityQueue 優先順序佇列
  3. ConcurrentLinkedQueue 基於連結串列的併發佇列
  4. DelayQueue 延期阻塞佇列 阻塞佇列實現了BlockingQueue介面
  5. ArrayBlockingQueue 基於資料的併發阻塞佇列
  6. LinkedBlockingQueue 基於連結串列的FIFO 阻塞佇列
  7. LlinkedBlockingDeque 基於連結串列的FIFO雙端阻塞佇列
  8. PriorityBlockingDeque 待優先順序的無界阻塞佇列
  9. SynchronousQueue 併發同步阻塞佇列

2、ConcurrentLinkedDeque

ConcurrentLinkedDeque:是一個使用於高併發場景下的佇列,通過無鎖的方式,實現了高併發狀態下的
高效能,通常ConcurrentLinkedDeque 效能好於 BlockingQueue 。它是一個基於連結節點的無界執行緒安全佇列。該佇列的元素遵循了先進先出的原則, 頭是最先加入的,尾是最近加入的,該佇列不允許null元素。
ConcurrentLinkedDeque重要方法:
add 和 offer () 都是加入元素的方法(在ConcurrentLinkedDeque 中這兩個方法沒有任何區別)
poll 和 peek() 都是取頭元素節點, 區別在於前者會刪除元素,後者不會。

public class ConcurrentLinkedDequeTest {
    public static void main(String[] args) {
        ConcurrentLinkedDeque q = new ConcurrentLinkedDeque();
        q.offer("若成風");
        q.offer("碼雲");
        q.offer("javagi");
        q.offer("張傑");
        q.offer("艾姐");
        //從頭獲取元素,刪除該元素
        System.out.println(q.poll());
        //獲取總長度
        System.out.println(q.size());
        //從頭獲取元素,不刪除該元素
        System.out.println(q.peek());
        //獲取總長度
        System.out.println(q.size());
    }
}

3、BlockingQueue

阻塞佇列BlockingQueue:是一個支援兩個附件操作的佇列。這兩個附件的操作是:
在佇列為空時,獲取元素的執行緒會等待佇列變為非空。
當佇列滿時,儲存元素的執行緒會等待佇列可用。
阻塞佇列通常用於生產者和消費者的場景, 生產者是往佇列裡面新增元素的執行緒,
消費者是從佇列裡面拿出元素的執行緒。阻塞佇列就是生產者存放元素的容器,而消費者
也只是從容器裡拿出元素
BlockingQueue 即阻塞佇列,從阻塞這個詞可以看出,在某些情況下對阻塞佇列的訪問可能會造成阻塞。
被阻塞的情況主要分為兩種:

  1. 當佇列滿了的時候進行入佇列的操作
  2. 當佇列空了的時候進行初初佇列的操作
    因此,當一個執行緒試圖對一個已經滿了的佇列進行入佇列操作時,他將會被阻塞,除非有另一個執行緒做了
    出佇列的操作,同樣。當一個執行緒檢視對一個空佇列進行出佇列操作時,他將會被阻塞, 除非有另一個執行緒進行了入佇列的操作。
    在Java中,BlockingQueue的介面位於java.util.concurrent 包中(在Java5版本開始提供),由上面介紹的阻塞佇列的特性可知,阻塞佇列是執行緒安全的。
    在新增的Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何高效安全“傳輸”資料的問題。通過這些高效並且執行緒安全的佇列類,為我們快速搭建高質量的多執行緒程式帶來極大的便利。本文詳細介紹了BlockingQueue家庭中的所有成員,包括他們各自的功能以及常見使用場景。
    認識BlockingQueue

阻塞佇列,顧名思義,首先它是一個佇列,而一個佇列在資料結構中所起的作用大致如下圖所示:
從上圖我們可以很清楚看到,通過一個共享的佇列,可以使得資料由佇列的一端輸入,從另外一端輸出;
常用的佇列主要有以下兩種:(當然通過不同的實現方式,還可以延伸出很多不同型別的佇列,DelayQueue就是其中的一種)
  先進先出(FIFO):先插入的佇列的元素也最先出佇列,類似於排隊的功能。從某種程度上來說這種佇列也體現了一種公平性。
  後進先出(LIFO):後插入佇列的元素最先出佇列,這種佇列優先處理最近發生的事件。
多執行緒環境中,通過佇列可以很容易實現資料共享,比如經典的“生產者”和“消費者”模型中,通過佇列可以很便利地實現兩者之間的資料共享。假設我們有若干生產者執行緒,另外又有若干個消費者執行緒。如果生產者執行緒需要把準備好的資料共享給消費者執行緒,利用佇列的方式來傳遞資料,就可以很方便地解決他們之間的資料共享問題。但如果生產者和消費者在某個時間段內,萬一發生資料處理速度不匹配的情況呢?理想情況下,如果生產者產出資料的速度大於消費者消費的速度,並且當生產出來的資料累積到一定程度的時候,那麼生產者必須暫停等待一下(阻塞生產者執行緒),以便等待消費者執行緒把累積的資料處理完畢,反之亦然。然而,在concurrent包釋出以前,在多執行緒環境下,我們每個程式設計師都必須去自己控制這些細節,尤其還要兼顧效率和執行緒安全,而這會給我們的程式帶來不小的複雜度。好在此時,強大的concurrent包橫空出世了,而他也給我們帶來了強大的BlockingQueue。(在多執行緒領域:所謂阻塞,在某些情況下會掛起執行緒(即阻塞),一旦條件滿足,被掛起的執行緒又會自動被喚醒)
下面兩幅圖演示了BlockingQueue的兩個常見阻塞場景:

1.ArrayBlockingQueue

ArrayBlockingQueue 是一個有邊界的阻塞佇列,他的內部實現是一個數組,有邊界的意思是他的容量是有險的
我們必須在其初始化的時候指定它的容量大小,容量大小一旦指定就不可以改變。
ArrayBlockingQueue 是以先進先出的方式儲存資料,最新插入的物件是尾部,最新移出的物件是頭部,下面是初始化和使用ArrayBlockingQueue 的例子:

public class ArrayBlockingQueueTest {
    public static void main(String[] args) {
        ArrayBlockingQueue<String> arrays = new ArrayBlockingQueue<String>(3);
        arrays.add("張三");
        arrays.add("李四");
        arrays.add("王五");
        // 新增阻塞佇列
        try {
            arrays.offer("若成風", 1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
2.LinkedBlockingQueue

LinkedBlockingQueue阻塞佇列大小的配置是可選的,如果我們初始化時指定一個大小,它就是有邊界的,如果不指定,它就是無邊界的。說是無邊界,其實是採用了預設大小為Integer.MAX_VALUE的容量 。它的內部實現是一個連結串列。
和ArrayBlockingQueue一樣,LinkedBlockingQueue 也是以先進先出的方式儲存資料,最新插入的物件是尾部,最新移出的物件是頭部。下面是一個初始化和使LinkedBlockingQueue的例子:

LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(3);
linkedBlockingQueue.add("張三");
linkedBlockingQueue.add("李四");
linkedBlockingQueue.add("李四");
System.out.println(linkedBlockingQueue.size());

3.PriorityBlockingQueue

PriorityBlockingQueue是一個沒有邊界的佇列,它的排序規則和 java.util.PriorityQueue一樣。需要注
意,PriorityBlockingQueue中允許插入null物件。
所有插入PriorityBlockingQueue的物件必須實現 java.lang.Comparable介面,佇列優先順序的排序規則就
是按照我們對這個介面的實現來定義的。
另外,我們可以從PriorityBlockingQueue獲得一個迭代器Iterator,但這個迭代器並不保證按照優先順序順
序進行迭代。
下面我們舉個例子來說明一下,首先我們定義一個物件型別,這個物件需要實現Comparable介面:

4. SynchronousQueue

SynchronousQueue佇列內部僅允許容納一個元素。當一個執行緒插入一個元素後會被阻塞,除非這個元素被另一個執行緒消費。

4、使用BlockingQueue模擬生產者和消費者

package com.javagi.threadpool;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by Administrator on 2018/10/25.
 */
class ProducerThread implements Runnable {
    private BlockingQueue<String> blockingQueue;

    private AtomicInteger count = new AtomicInteger();

    private volatile boolean FLAG = true;

    public ProducerThread(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "生產者開始啟動。。。");
        while (true) {
            String data = count.incrementAndGet() + "";
            try {
                boolean offer = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
                if (offer) {
                    System.out.println(Thread.currentThread().getName() + ",生產佇列" + data + "成功..");
                } else {
                    System.out.println(Thread.currentThread().getName() + ",生產佇列" + data + "失敗..");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "生產者執行緒結束。。。");
        }
    }
    public void stop() {
        this.FLAG = FLAG;
    }
}

class ConsumerThread implements Runnable {
    private volatile boolean FLAG = true;
    private BlockingQueue<String> blockingQueue;

    public ConsumerThread(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "消費者開始啟動....");
        while (FLAG) {
            try {
                String data = blockingQueue.poll(2, TimeUnit.SECONDS);
                if (data == null || data == "") {
                    FLAG = false;
                    System.out.println("消費者超過2秒時間未獲取到訊息.");
                    return;
                }
                System.out.println("消費者獲取到佇列資訊成功,data:" + data);

            } catch (Exception e) {
                // TODO: handle exception
            }
        }
    }
}


public class BlockingQueueTest {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(3);
        ProducerThread producerThread = new ProducerThread(blockingQueue);
        ConsumerThread consumerThread = new ConsumerThread(blockingQueue);
        Thread t1 = new Thread(producerThread);
        Thread t2 = new Thread(consumerThread);
        t1.start();
        t2.start();
        //10秒後 停止執行緒..
        try {
            Thread.sleep(10*1000);
            producerThread.stop();
        } catch (Exception e) {
            // TODO: handle exception
        }
    }
}