1. 程式人生 > 實用技巧 > 生產者和消費者簡單實現

生產者和消費者簡單實現

一、synchronized實現

package com.duchong.queue;

/**
 * Producer/consumer demo coordinated with an intrinsic lock ({@code synchronized}) and
 * {@code wait()}/{@code notifyAll()}.
 *
 * <p>One producer thread raises {@link #leftCount} up to {@link #maxLimit}; one consumer
 * thread lowers it back to zero. Each side waits in a loop on its own condition
 * (producer: "not full", consumer: "not empty"), so neither the thread start order nor a
 * spurious wakeup can push the counter out of the range {@code [0, maxLimit]} or make a
 * thread exit early. Both threads run until they are interrupted.
 *
 * <p>NOTE: {@code maxLimit} must be greater than zero, otherwise both sides wait forever.
 *
 * @author DUCHONG
 * @since 2020-09-17 18:23
 **/
public class SynchronizedDemo {

    // Monitor guarding leftCount; final so every thread always synchronizes on the same object.
    static final Object lock=new Object();
    // Number of resources currently available; only touched while holding lock.
    static int leftCount=0;
    // Upper bound for leftCount.
    static int maxLimit=5;

    /**
     * Starts one producer thread and one consumer thread; neither terminates on its own.
     *
     * @param args unused
     * @throws Exception declared for signature compatibility; nothing is thrown here
     */
    public static void main(String[] args)throws Exception {

        SynchronizedDemo synchronizedDemo = new SynchronizedDemo();

        new Thread(synchronizedDemo.new SyncProvider(),"provider-thread").start();
        new Thread(synchronizedDemo.new SyncConsumer(),"consumer-thread").start();

    }

    /**
     * Producer: adds one resource per iteration and waits while the limit is reached.
     */
    class SyncProvider implements Runnable{

        /**
         * Produces until the thread is interrupted. The monitor is held for the whole loop
         * and is only released inside {@code lock.wait()}, so resources are produced in
         * batches up to {@code maxLimit}.
         */
        @Override
        public void run() {

            final String threadName = Thread.currentThread().getName();
            System.out.println(threadName+"---啟動");

            synchronized (lock) {
                try {
                    while (true) {
                        // Re-check the condition after every wakeup: wait() may return
                        // spuriously or after a notifyAll() meant for another state change.
                        while (leftCount >= maxLimit) {
                            System.out.println(threadName+"---生產者開始阻塞---釋放鎖---" + leftCount);
                            lock.wait();
                        }

                        leftCount++;
                        System.out.println(threadName+"---生產者生產資源---" + leftCount);

                        // Wake the consumer; it proceeds once this thread releases the monitor.
                        lock.notifyAll();
                    }
                }
                catch (InterruptedException e) {
                    // Interruption is the stop signal: restore the flag and leave run().
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Consumer: removes one resource per iteration and waits while none are available.
     */
    class SyncConsumer implements Runnable{

        /**
         * Consumes until the thread is interrupted. When no resource is available the
         * thread waits instead of returning, so it works even if it starts before the
         * producer has produced anything.
         */
        @Override
        public void run() {

            final String threadName = Thread.currentThread().getName();
            System.out.println(threadName+"---啟動");

            synchronized (lock){
                try {
                    while (true) {
                        // Wait (in a loop, see producer) while there is nothing to consume.
                        while (leftCount <= 0) {
                            System.out.println(threadName+"---消費者開始阻塞---" + leftCount);
                            lock.wait();
                        }

                        System.out.println(threadName+"---消費者消費資源---"+leftCount);
                        leftCount--;

                        // Wake the producer; it proceeds once this thread releases the monitor.
                        lock.notifyAll();
                    }
                }
                catch (InterruptedException e) {
                    // Interruption is the stop signal: restore the flag and leave run().
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}


二、ReentrantLock實現

package com.duchong.queue;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Producer/consumer demo coordinated with a {@link ReentrantLock} and two
 * {@link Condition}s.
 *
 * <p>One producer thread raises {@link #leftCount} up to {@link #maxLimit}; one consumer
 * thread lowers it back to zero. Each side awaits its own condition in a loop
 * (producer: "not full", consumer: "not empty"), so neither the thread start order nor a
 * spurious wakeup can push the counter out of the range {@code [0, maxLimit]} or make a
 * thread exit early. Both threads run until they are interrupted.
 *
 * <p>NOTE: {@code maxLimit} must be greater than zero, otherwise both sides wait forever.
 *
 * @author DUCHONG
 * @since 2020-09-18 14:52
 **/
public class ReentrantLockDemo {

    // Number of resources currently available; only touched while holding lock.
    static int leftCount=0;
    // Upper bound for leftCount.
    static int maxLimit=5;
    // Single lock guarding leftCount.
    final Lock lock=new ReentrantLock();
    // The producer waits here while the limit is reached; signalled by the consumer.
    final Condition provider=lock.newCondition();
    // The consumer waits here while nothing is available; signalled by the producer.
    final Condition consumer=lock.newCondition();

    /**
     * Starts one producer thread and one consumer thread; neither terminates on its own.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ReentrantLockDemo lockDemo = new ReentrantLockDemo();

        new Thread(lockDemo.new LockProvider(),"provider-thread").start();
        new Thread(lockDemo.new LockConsumer(),"consumer-thread").start();
    }

    /**
     * Producer: adds one resource per iteration and waits while the limit is reached.
     */
    class LockProvider implements Runnable{

        /**
         * Produces until the thread is interrupted. The lock is held for the whole loop
         * and is only released inside {@code provider.await()}, so resources are produced
         * in batches up to {@code maxLimit}.
         */
        @Override
        public void run() {

            final String threadName = Thread.currentThread().getName();
            System.out.println(threadName+"---啟動");

            lock.lock();

            try {
                while (true) {
                    // Re-check the condition after every wakeup: await() may return spuriously.
                    while (leftCount >= maxLimit) {
                        System.out.println(threadName+"---生產者開始阻塞---釋放鎖---" + leftCount);
                        provider.await();
                    }

                    leftCount++;
                    System.out.println(threadName+"---生產者生產資源---" + leftCount);

                    // Wake the consumer; it proceeds once this thread releases the lock.
                    consumer.signalAll();
                }
            }
            catch (InterruptedException e) {
                // Interruption is the stop signal: restore the flag and leave run().
                Thread.currentThread().interrupt();
            }
            finally {
                // Always release the lock, including on interruption.
                lock.unlock();
            }
        }
    }

    /**
     * Consumer: removes one resource per iteration and waits while none are available.
     */
    class LockConsumer implements Runnable{

        /**
         * Consumes until the thread is interrupted. When no resource is available the
         * thread awaits instead of returning, so it works even if it starts before the
         * producer has produced anything.
         */
        @Override
        public void run() {

            final String threadName = Thread.currentThread().getName();
            System.out.println(threadName+"---啟動");

            lock.lock();
            try {
                while (true) {
                    // Wait (in a loop, see producer) while there is nothing to consume.
                    while (leftCount <= 0) {
                        System.out.println(threadName+"---消費者開始阻塞---" + leftCount);
                        consumer.await();
                    }

                    System.out.println(threadName+"---消費者消費資源---"+leftCount);
                    leftCount--;

                    // Wake the producer; it proceeds once this thread releases the lock.
                    provider.signalAll();
                }
            }
            catch (InterruptedException e) {
                // Interruption is the stop signal: restore the flag and leave run().
                Thread.currentThread().interrupt();
            }
            finally {
                // Always release the lock, including on interruption.
                lock.unlock();
            }
        }
    }
}

三、BlockingQueue實現

package com.duchong.queue;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Launcher for the {@link LinkedBlockingQueue} based producer/consumer demo.
 *
 * <p>A single bounded queue is shared by one {@code Provider} thread and one
 * {@code Consumer} thread. The consumer is started two seconds after the provider.
 *
 * @author DUCHONG
 * @since 2020-09-17 18:23:09
 **/
public class LinkBlockQueueDemo {

    // Capacity bound of the shared queue.
    private static final int MAX_CAPACITY = 20;
    // Queue shared between the provider and the consumer thread.
    private static LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(MAX_CAPACITY);

    /**
     * Starts the provider thread, sleeps for two seconds, then starts the consumer thread.
     *
     * @param args unused
     * @throws Exception if the two-second sleep is interrupted
     */
    public static void main(String[] args)throws Exception {
        Thread providerThread = new Thread(new Provider(queue),"provider-thread");
        providerThread.start();

        // Give the provider a head start before any element is taken from the queue.
        TimeUnit.SECONDS.sleep(2L);

        Thread consumerThread = new Thread(new Consumer(queue),"consumer-thread");
        consumerThread.start();
    }
}

/**
 * Takes elements from a {@link LinkedBlockingQueue} and prints each one, blocking while
 * the queue is empty. Runs until the executing thread is interrupted.
 */
class Consumer implements Runnable {

    // Source queue; the element type is irrelevant because elements are only printed.
    final LinkedBlockingQueue<?> queue;

    /**
     * @param queue the queue to take elements from
     */
    public Consumer(LinkedBlockingQueue<?> queue) {
        this.queue=queue;
    }

    /**
     * Prints a start banner, then takes and prints elements until interrupted.
     * On interruption the interrupt flag is restored and the method returns.
     */
    @Override
    public void run() {

        final String threadName = Thread.currentThread().getName();
        System.out.println("---"+threadName+"---啟動");

        // The try block wraps the whole loop so that an interrupt ends the loop
        // instead of being swallowed and followed by another blocking take().
        try {
            while (true) {
                System.out.println(threadName+"---消費---" + queue.take());
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}


 /**
  * Puts the strings {@code food-1} .. {@code food-30} into a {@link LinkedBlockingQueue}
  * over and over, blocking while the queue is full. Runs until the executing thread is
  * interrupted.
  */
 class Provider implements Runnable {

     // Destination queue; must accept String elements.
     final LinkedBlockingQueue<? super String> queue;

     /**
      * @param queue the queue to put the produced strings into
      */
     public Provider(LinkedBlockingQueue<? super String> queue) {
         this.queue=queue;
     }

     /**
      * Prints a start banner, then produces in rounds of 30 elements until interrupted.
      * On interruption the interrupt flag is restored and the method returns.
      */
     @Override
    public void run() {

         final String threadName = Thread.currentThread().getName();
         System.out.println("---"+threadName+"---啟動");

        // The try block wraps the whole loop so that an interrupt ends production
        // instead of being swallowed and restarting the round from food-1.
        try {
            while (true) {
                for (int i = 1; i <= 30; i++) {
                    queue.put("food-"+i);
                    System.out.println(threadName+"---生產---food-"+i);
                }
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}