1. 程式人生 > 程式設計 >Java併發程式設計入門(十二)生產者和消費者模式-程式碼模板

Java併發程式設計入門(十二)生產者和消費者模式-程式碼模板

Java極客  |  作者  /  鏗然一葉
這是Java極客的第 40 篇原創文章

一、應用場景

生產者和消費者模式應用於非同步處理場景,非同步處理的好處是生產者和消費者解耦,不互相依賴,生產者不需要等待消費者處理完,就可以持續生產消費內容,效率大大提高。

二、程式碼類結構

生產者和消費者程式碼類結構如下:


1.BlockedQueue是一個阻塞的有界佇列,用於存、取消費內容。

2.Producer是生產者,在這裡是一個抽象類,子類需要實現generateTask方法。

3.Consumer是消費者,在這裡是一個抽象類,子類需要實現exec方法。

4.這裡的Producer和Consumer只是一個抽象後的程式碼模板,邏輯比較簡單,落地時可根據實際需要編寫合適的模板。

三、Show me code

I、BlockedQueue.java

import java.util.Vector;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @ClassName BlockedQueue
 * @Description 阻塞任務佇列,新增任務時如果已經達到容量上限,則會阻塞等待
 * @Author 鏗然一葉
 * @Date 2019/10/5 11:32
 * @Version
1.0 * javashizhan.com **/
public class BlockedQueue<T>{ //鎖 private final Lock lock = new ReentrantLock(); // 條件變數:佇列不滿 private final Condition notFull = lock.newCondition(); // 條件變數:佇列不空 private final Condition notEmpty = lock.newCondition(); //任務集合 private Vector<T> taskQueue = new
Vector<T>(); //佇列容量 private final int capacity; /** * 構造器 * @param capacity 佇列容量 */ public BlockedQueue(int capacity) { this.capacity = capacity; } /** * 入隊操作 * @param t */ public void enq(T t) { lock.lock(); try { System.out.println("size: " + taskQueue.size() + " capacity: " + capacity); while (taskQueue.size() == this.capacity) { // 佇列滿了之後等待,等待佇列不滿 notFull.await(); } System.out.println(Thread.currentThread().getName() + " add task: " + t.toString()); taskQueue.add(t); // 入隊後,通知佇列不空了,可以出隊 notEmpty.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } /** * 出隊操作 * @return */ public T deq(){ lock.lock(); try { try { while (taskQueue.size() == 0) { // 佇列為空時等待,等待佇列不空 notEmpty.await(); } } catch (InterruptedException e) { e.printStackTrace(); } T t = taskQueue.remove(0); // 出隊後,通知佇列不滿,可以繼續入隊 notFull.signal(); return t; }finally { lock.unlock(); } } } 複製程式碼

II、Producer.java

/**
 * @ClassName Producer
 * @Description 生產者,這個類比較簡單,使用繼承也省不了多少程式碼,可繼承,也可以自行實現。
 * @Author 鏗然一葉
 * @Date 2019/10/5 11:19
 * @Version 1.0
 * javashizhan.com
 **/
public abstract class Producer<T> implements Runnable {

    private BlockedQueue<T> taskQueue;

    public Producer(BlockedQueue<T> taskQueue) {
        this.taskQueue = taskQueue;
    }

    public void run() {
        while(true) {
            T[] tasks = generateTask();
            if (null != tasks && tasks.length > 0) {
                for(T task: tasks) {
                    if (null != task) {
                        this.taskQueue.enq(task);
                    }
                }
            }
        }
    }

    /**
     * 生成任務,使用了“模板方法”設計模式,子類只要實現此方法則可。
     * @return
     */
    public abstract T[] generateTask();
}
複製程式碼

III、Consumer.java

/**
 * @ClassName Consumer
 * @Description 消費者,這個類比較簡單,使用繼承也省不了多少程式碼,可繼承,也可以自行實現。
 * @Author 鏗然一葉
 * @Date 2019/10/5 11:10
 * @Version 1.0
 * javashizhan.com
 **/
public abstract class Consumer<T> implements Runnable {

    private BlockedQueue<T> taskQueue;

    public Consumer(BlockedQueue<T> taskQueue) {
        this.taskQueue = taskQueue;
    }

    public void run() {
        while(true) {
            T task = taskQueue.deq();
            exec(task);
        }
    }

    /**
     * 執行任務,使用了“模板方法”設計模式,子類只要實現此方法則可
     * @param task
     */
    public abstract void exec(T task);
}
複製程式碼

IV、使用程式碼例子

import java.util.Vector;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @ClassName BlockedQueue
 * @Description TODO
 * @Author 鏗然一葉
 * @Date 2019/10/5 9:13
 * @Version 1.0
 * javashizhan.com
 **/

public class LockTest {
    public static void main(String[] args) {
        BlockedQueue<String> taskQueue = new BlockedQueue<String>(10);

        for (int i = 0; i < 3; i++) {
            String producerName = "Producder-" + i;
            Thread producer = new Thread(new Producer<String>(taskQueue) {
                @Override
                public String[] generateTask() {
                    String[] tasks = new String[20];
                    for (int i = 0; i < tasks.length; i++) {
                        long timestamp = System.currentTimeMillis();
                        tasks[i] = "Task_" + timestamp + "_" + i;
                    }
                    return tasks;
                }
            },producerName);
            producer.start();
        }

        for (int i = 0; i < 5; i++) {
            String consumerName = "Consumer-" + i;
            Thread consumer = new Thread(new Consumer<String>(taskQueue) {
                @Override
                public void exec(String task) {
                    System.out.println(Thread.currentThread().getName() + " do task [" + task + "]");
                    //休眠一會,模擬任務執行耗時
                    sleep(2000);
                }

                private void sleep(long millis) {
                    try {
                        Thread.sleep(millis);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },consumerName);
            consumer.start();
        }
    }
}
複製程式碼

輸出日誌:

size: 0 capacity: 10
Producder-1 add task: Task_1570250409102_0
size: 1 capacity: 10
Producder-1 add task: Task_1570250409103_1
size: 2 capacity: 10
Producder-1 add task: Task_1570250409103_2
size: 3 capacity: 10
Producder-1 add task: Task_1570250409103_3
size: 4 capacity: 10
Producder-1 add task: Task_1570250409103_4
size: 5 capacity: 10
Producder-1 add task: Task_1570250409103_5
size: 6 capacity: 10
Producder-1 add task: Task_1570250409103_6
size: 7 capacity: 10
Producder-1 add task: Task_1570250409103_7
size: 8 capacity: 10
Producder-1 add task: Task_1570250409103_8
size: 9 capacity: 10
Producder-1 add task: Task_1570250409103_9
size: 10 capacity: 10
size: 10 capacity: 10
size: 10 capacity: 10
Consumer-0 do task [Task_1570250409102_0]
Consumer-4 do task [Task_1570250409103_1]
Consumer-3 do task [Task_1570250409103_2]
Producder-1 add task: Task_1570250409103_10
Consumer-1 do task [Task_1570250409103_3]
Producder-0 add task: Task_1570250409102_0
size: 8 capacity: 10
Producder-0 add task: Task_1570250409103_1
size: 9 capacity: 10
Producder-0 add task: Task_1570250409103_2
size: 10 capacity: 10
size: 10 capacity: 10
Consumer-2 do task [Task_1570250409103_4]
Producder-0 add task: Task_1570250409103_3
size: 10 capacity: 10
Consumer-3 do task [Task_1570250409103_6]
Producder-2 add task: Task_1570250409103_0
Consumer-1 do task [Task_1570250409103_5]
size: 9 capacity: 10
Producder-2 add task: Task_1570250409103_1
size: 10 capacity: 10
Consumer-4 do task [Task_1570250409103_7]
Consumer-0 do task [Task_1570250409103_8]
Producder-1 add task: Task_1570250409103_11
size: 9 capacity: 10
Producder-1 add task: Task_1570250409103_12
size: 10 capacity: 10
Consumer-2 do task [Task_1570250409103_9]
Producder-1 add task: Task_1570250409103_13
size: 10 capacity: 10
複製程式碼

四、其他說明

1.這裡用到了Lock來加鎖,Lock相比synchronized關鍵字加鎖更靈活一些,如果有特殊需要,方便改造。

2.synchronized實現生產者和消費者模式的例子可參考“Java併發程式設計入門(七)輕鬆理解wait和notify以及使用場景”,那個程式碼還不夠通用,你可以修改得通用一些。

3.就當前這個例子而言,使用Lock加鎖和“Java併發程式設計入門(七)輕鬆理解wait和notify以及使用場景”中使用synchronized加鎖沒有多大區別,這裡僅僅是為了體會下Lock的使用方法。

4.使用有界阻塞佇列時需要注意生產者生產任務過程是否可控,如果是第三方不可控呼叫,當生產任務速度遠遠大於消費者處理任務速度時,可能由於阻塞導致長時間掛起,要麼掛起時間過長,導致等待執行緒太多,要麼超時失敗。這時就不適合使用阻塞方式,應該在佇列滿時丟擲異常以通知呼叫方不要再等待。

end.


相關閱讀:
Java併發程式設計(一)知識地圖
Java併發程式設計(二)原子性
Java併發程式設計(三)可見性
Java併發程式設計(四)有序性
Java併發程式設計(五)建立執行緒方式概覽
Java併發程式設計入門(六)synchronized用法
Java併發程式設計入門(七)輕鬆理解wait和notify以及使用場景
Java併發程式設計入門(八)執行緒生命週期
Java併發程式設計入門(九)死鎖和死鎖定位
Java併發程式設計入門(十)鎖優化
Java併發程式設計入門(十一)限流場景和Spring限流器實現
Java併發程式設計入門(十三)讀寫鎖和快取模板
Java併發程式設計入門(十四)CountDownLatch應用場景
Java併發程式設計入門(十五)CyclicBarrier應用場景
Java併發程式設計入門(十六)秒懂執行緒池差別
Java併發程式設計入門(十七)一圖掌握執行緒常用類和介面
Java併發程式設計入門(十八)再論執行緒安全


Java極客站點: javageektour.com/