1. 程式人生 > >多線程-生產者消費者(BlockingQueue實現)

多線程-生產者消費者(BlockingQueue實現)

insert inter exceptio ted 不同的 方法 nta 條件 -i

三、采用BlockingQueue實現

BlockingQueue也是java.util.concurrent下的主要用來控制線程同步的工具。

BlockingQueue有四個具體的實現類,根據不同需求,選擇不同的實現類
1、ArrayBlockingQueue:一個由數組支持的有界阻塞隊列,規定大小的BlockingQueue,其構造函數必須帶一個int參數來指明其大小.其所含的對象是以FIFO(先入先出)順序排序的。


2、LinkedBlockingQueue:大小不定的BlockingQueue,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的對象是以FIFO(先入先出)順序排序的。


3、PriorityBlockingQueue:類似於LinkedBlockQueue,但其所含對象的排序不是FIFO,而是依據對象的自然排序順序或者是構造函數的Comparator決定的順序。


4、SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的。

LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的話,默認最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在隊列滿的時候會阻塞直到有隊列成員被消費,take方法在隊列空的時候會阻塞,直到有隊列成員被放進來。

import java.util.concurrent.BlockingQueue;  
  
public class Producer implements Runnable {  
    BlockingQueue<String> queue;  
  
    public Producer(BlockingQueue<String> queue) {  
        this.queue = queue;  
    }  
  
    @Override  
    public void run() {  
        try {  
            String temp = "A Product, 生產線程:"  
                    + Thread.currentThread().getName();  
            System.out.println("I have made a product:"  
                    + Thread.currentThread().getName());  
            queue.put(temp);//如果隊列是滿的話,會阻塞當前線程  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
  
}  

import java.util.concurrent.BlockingQueue;  
  
public class Consumer implements Runnable{  
    BlockingQueue<String> queue;  
      
    public Consumer(BlockingQueue<String> queue){  
        this.queue = queue;  
    }  
      
    @Override  
    public void run() {  
        try {  
            String temp = queue.take();//如果隊列為空,會阻塞當前線程  
            System.out.println(temp);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}  

import java.util.concurrent.ArrayBlockingQueue;  
import java.util.concurrent.BlockingQueue;  
import java.util.concurrent.LinkedBlockingQueue;  
  
public class Test3 {  
  
    public static void main(String[] args) {  
      BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);  
     // BlockingQueue<String> queue = new LinkedBlockingQueue<String>();  
     //不設置的話,LinkedBlockingQueue默認大小為Integer.MAX_VALUE  
          
    // BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);  
  
        Consumer consumer = new Consumer(queue);  
        Producer producer = new Producer(queue);  
        for (int i = 0; i < 5; i++) {  
            new Thread(producer, "Producer" + (i + 1)).start();  
  
            new Thread(consumer, "Consumer" + (i + 1)).start();  
        }  
    }  
}  

BlockingQueue接口,擴展了Queue接口

package java.util.concurrent;

import java.util.Collection;
import java.util.Queue;

public interface BlockingQueue<E> extends Queue<E> {
    boolean add(E e);

    boolean offer(E e);

    void put(E e) throws InterruptedException;

    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;
    E take() throws InterruptedException;

    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    int remainingCapacity();

    boolean remove(Object o);

    public boolean contains(Object o);

    int drainTo(Collection<? super E> c);

    int drainTo(Collection<? super E> c, int maxElements);
}

我們用到的take() 和put(E e)

兩個方法,在ArrayBlockingQueue中的實現

  public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            insert(e);
        } finally {
            lock.unlock();
        }
  }
 private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
}

 public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }
 private E extract() {
        final E[] items = this.items;
        E x = items[takeIndex];
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    }

看得到其實也是利用了Lock以及Condition條件變量的await()方法和signal()方法實現的,這個實現和我們之前實現的Lock用法區別:

1)使用了兩個條件變量 consume的await放置在notEmpty 之上,喚醒在put的時候,produce的await放置在notfull之上,喚醒在take()的時候,喚醒是signal而不是signalAll,這樣做就不會因為大量喚醒導致競爭從而減低效率,通過鎖對象的分析,減低競爭

優點:更有利於協調生產消費線程的平衡

多線程-生產者消費者(BlockingQueue實現)