【Java程式效能優化 第一版】第四章(不變模式, 生產者-消費者模式)
4.1.4 不變模式
不變模式天生就是多執行緒友好的,它的核心思想是,一個物件一旦被建立,則它的內部狀態將永遠不會發生改變。所以,沒有一個執行緒可以修改其內部狀態和資料,同時其內部狀態也絕不會自行發生改變。基於這個特性,對不變物件的多執行緒操作將不需要進行同步控制。
△ 注意:不變模式和只讀屬性是有一定區別的。不變模式比只讀屬性具有更強的一致性和不變性。對只讀屬性的物件而言,物件本身不能被其它執行緒修改,但是物件的自身狀態卻可能自行修改。
比如,一個物件的存活時間(物件的建立時間和當前時間的時間差)是隻讀的,因為任何一個第三方執行緒都不能修改這個屬性,但是這是一個可變的屬性,因為隨著時間的推移,它的值會發生改變。而不變模式要求,無論出於什麼原因,物件自建立後,其內部狀態和資料保持絕對的穩定
綜上所述,不變模式的主要使用場景主要需要滿足以下兩個條件:
□當物件建立後,其內部狀態和資料不再發生任何變化。
□ 物件需要被共享,被多執行緒頻繁訪問。
在Java語言中,不變模式的實現很簡單。為確保物件建立後,不發生任何改變,並保證不變模式正常工作,只需要注意一下4點:
□去除setter方法以及所有修改自身屬性的方法
□將所有屬性設定為私有,並用final標記,確保其不可修改
□確保沒有子類可以過載修改它的行為
□有一個可以建立完整物件的建構函式。
public final class Product {// final確保無子類 private final String no;// 私有屬性,不會被其它物件修改,final確保屬性不會被2次修改 private final String name; private final double price; // 在建立物件後, 需指定資料,因為建立後, 無法進行修改 public Product(String no, String name, double price) { super(); this.no = no; this.name = name; this.price = price; } public String getNo() { return no; } public String getName() { return name; } public double getPrice() { return price; } }
在不變模式的實現中,final關鍵字起到了重要的作用。對class的final定義保證了不變類沒有子類,確保其所有的getter不會被修改。對屬性的final定義確保所有的資料只能在物件被構造時賦值1次,之後,永遠不會發生改變。
JDK中,不變模式的使用非常廣泛。其中,最為典型的就是java.lang.String類。此外,所有元資料類的包裝類,都是使用不變模式實現的。由於基本資料型別和String型別在實際的軟體開發中應用極其廣泛,使用不變模式後,所有的例項的方法均不需要進行同步操作,保證了他們在多執行緒環境下的效能。
△ 注意:不變模式通過迴避問題而不是解決問題的態度來處理多執行緒併發訪問空置。不變物件是不需要進行同步操作的。由於併發同步會對效能產生不良的影響,因此,在需求允許的情況下,不變模式可以提高系統的併發效能和併發量
4.1.5 生產者-消費者模式
生產者-消費者模式是一個經典的多執行緒設計模式,它為多執行緒間的協作提供了良好的解決方案。在生產者-消費者模式中,通常有兩類執行緒,即若干個生產者執行緒和若干個消費者執行緒。生產者執行緒負責提交使用者請求,消費者執行緒則負責具體處理生產者提交的任務。生產者和消費者通過共享的緩衝區進行通訊。
△ 注意:生產者-消費者模式中的記憶體緩衝區的主要功能是資料在多執行緒間的共享。此外,通過該緩衝區,可以緩解生產者和消費者間的效能差。緩衝區是生產者和消費者的通訊橋樑,避免了生產者和消費者之間直接通訊,從而降低了它們之間的耦合。生產者不需要知道消費者的存在,消費者也不需要知道生產者的存在。
同時,由於記憶體緩衝區的存在,允許生產者和消費者在執行速度上存在時間差,無論是生產者在某一區域性時間內速度高於消費者,或者消費者在區域性時間內高於生產者,都可以通過共享記憶體緩衝區得到緩解,確保系統正常執行。
程式碼如下:BlockingQueue充當了共享記憶體緩衝區,用於維護任務或資料佇列(PCData物件)。PCData物件表示一個生產任務,或者相關任務的資料。生產者物件和消費者物件均引用同一個BlockingQueue例項。生產者負責建立PCData物件,並將它加入BlockingQueue中,消費者則從BlockingQueue佇列中獲取資料。
public class Producer implements Runnable{
private volatile boolean isRunning = true;
private BlockingDeque<PCData> queue;// 記憶體緩衝區
private static AtomicInteger count = new AtomicInteger();// 總數,原子操作
private static final int SLEEPTIME = 1000;
public Producer(
BlockingDeque<PCData> queue) {
this.queue = queue;
}
@Override
public void run() {
PCData data = null;
Random r = new Random();
System.out.println("start producer id="+Thread.currentThread().getId());
try {
while(isRunning) {
Thread.sleep(r.nextInt(SLEEPTIME));
data = new PCData(count.incrementAndGet());// 構造任務資料
System.out.println(data + " is put into queue");
if(!queue.offer(data, 2, TimeUnit.SECONDS)) {// 提交到緩衝區中
System.out.println("failed to put data:" + data);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupted();
}
}
public void stop() {
isRunning = false;
}
}
public class Consumer implements Runnable{
private BlockingDeque<PCData> queue;// 緩衝區
private static final int SLEEPTIME = 1000;
public Consumer(
BlockingDeque<PCData> queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println("start Consumer id="+Thread.currentThread().getId());
Random r = new Random();// 隨機等待時間
try {
while(true) {
PCData data = queue.take();// 提取任務
if(null != data) {
int d = data.getData();
int re = d * d;// 計算平方
System.out.println(MessageFormat.format("{0}*{1}={2}", d, d, re));
Thread.sleep(r.nextInt(SLEEPTIME));
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
//任務相關的資料
public class PCData {
private final int intData;// 資料
public PCData(int intData) {
this.intData = intData;
}
public PCData(String str) {
this.intData = Integer.valueOf(str);
}
public int getData() {
return intData;
}
@Override
public String toString() {
return "PCData{" +
"intData=" + intData +
'}';
}
public static void main(String[] args) throws InterruptedException{
// 建立緩衝區
BlockingDeque<PCData> queue = new LinkedBlockingDeque<PCData>(10);
// 建立生產者
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Producer producer4 = new Producer(queue);
// 建立消費者
Consumer consumer1 = new Consumer(queue);
Consumer consumer2 = new Consumer(queue);
Consumer consumer3 = new Consumer(queue);
Consumer consumer4 = new Consumer(queue);
ExecutorService service = Executors.newCachedThreadPool();// 建立執行緒池
// 執行生產者
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(producer4);
// 執行消費者
service.execute(consumer1);
service.execute(consumer2);
service.execute(consumer3);
service.execute(consumer4);
Thread.sleep(10 * 1000);
// 停止生產者
producer1.stop();
producer2.stop();
producer3.stop();
producer4.stop();
Thread.sleep(4000);
service.shutdown();
}
}