生產者和消費者簡單實現
阿新 • • 發佈:2020-10-09
一、synchronized實現
package com.duchong.queue;

/**
 * Producer/consumer demo built on an intrinsic lock with {@code wait()} / {@code notifyAll()}.
 *
 * <p>One producer thread and one consumer thread share the counter {@link #leftCount}.
 * Each worker keeps the monitor for its whole loop and only gives it up while waiting,
 * so the producer fills the counter up to {@link #maxLimit} before the consumer drains
 * it back to zero.
 *
 * @author DUCHONG
 * @since 2020-09-17 18:23
 **/
public class SynchronizedDemo {

    // Monitor that guards leftCount; final so every thread always locks the same object.
    static final Object lock = new Object();

    // Number of resources currently available.
    static int leftCount = 0;

    // Maximum number of resources that may be available at once.
    static int maxLimit = 5;

    /**
     * Starts one producer thread and one consumer thread.
     *
     * @param args unused
     * @throws Exception declared for compatibility; nothing in the body throws a checked exception
     */
    public static void main(String[] args) throws Exception {
        SynchronizedDemo synchronizedDemo = new SynchronizedDemo();
        new Thread(synchronizedDemo.new SyncProvider(), "provider-thread").start();
        new Thread(synchronizedDemo.new SyncConsumer(), "consumer-thread").start();
    }

    /**
     * Producer: increments {@code leftCount} until it reaches {@code maxLimit},
     * then waits for the consumer to make room.
     */
    class SyncProvider implements Runnable {

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "---啟動");
            synchronized (lock) {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        // Guarded wait: re-check the condition after every wakeup so a
                        // spurious wakeup can never push leftCount past maxLimit.
                        while (leftCount >= maxLimit) {
                            System.out.println(Thread.currentThread().getName() + "---生產者開始阻塞---釋放鎖---" + leftCount);
                            lock.wait();
                        }
                        leftCount++;
                        System.out.println(Thread.currentThread().getName() + "---生產者生產資源---" + leftCount);
                        // Wake the consumer; it can only proceed once this thread waits.
                        lock.notifyAll();
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt status and stop producing.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Consumer: decrements {@code leftCount} until it reaches zero,
     * then waits for the producer to refill it.
     */
    class SyncConsumer implements Runnable {

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "---啟動");
            synchronized (lock) {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        // Wait (instead of terminating) while nothing is available, so the
                        // consumer also works when it acquires the monitor before the producer.
                        while (leftCount <= 0) {
                            System.out.println(Thread.currentThread().getName() + "---消費者開始阻塞---" + leftCount);
                            lock.wait();
                        }
                        System.out.println(Thread.currentThread().getName() + "---消費者消費資源---" + leftCount);
                        leftCount--;
                        // Wake the producer; it can only proceed once this thread waits.
                        lock.notifyAll();
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt status and stop consuming.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
二、ReentrantLock實現
package com.duchong.queue;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Producer/consumer demo built on a {@link ReentrantLock} with two {@link Condition}s.
 *
 * <p>One producer thread and one consumer thread share the counter {@link #leftCount}.
 * Each worker holds the lock for its whole loop and only releases it while awaiting its
 * condition, so the producer fills the counter up to {@link #maxLimit} before the
 * consumer drains it back to zero.
 *
 * <p>Note: the counters are static (shared by every instance) while the lock is an
 * instance field, so only a single {@code ReentrantLockDemo} instance should be used.
 *
 * @author DUCHONG
 * @since 2020-09-18 14:52
 **/
public class ReentrantLockDemo {

    // Number of resources currently available.
    static int leftCount = 0;

    // Maximum number of resources that may be available at once.
    static int maxLimit = 5;

    // Single lock guarding leftCount.
    final Lock lock = new ReentrantLock();

    // The producer awaits this condition while the counter is at its limit.
    final Condition provider = lock.newCondition();

    // The consumer awaits this condition while the counter is zero.
    final Condition consumer = lock.newCondition();

    /**
     * Starts one producer thread and one consumer thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ReentrantLockDemo lockDemo = new ReentrantLockDemo();
        new Thread(lockDemo.new LockProvider(), "provider-thread").start();
        new Thread(lockDemo.new LockConsumer(), "consumer-thread").start();
    }

    /**
     * Producer: increments {@code leftCount} until it reaches {@code maxLimit},
     * then awaits {@code provider} until the consumer makes room.
     */
    class LockProvider implements Runnable {

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "---啟動");
            lock.lock();
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // Guarded await: re-check the condition after every wakeup so a
                    // spurious wakeup can never push leftCount past maxLimit.
                    while (leftCount >= maxLimit) {
                        System.out.println(Thread.currentThread().getName() + "---生產者開始阻塞---釋放鎖---" + leftCount);
                        provider.await();
                    }
                    leftCount++;
                    System.out.println(Thread.currentThread().getName() + "---生產者生產資源---" + leftCount);
                    // Wake the consumer; it can only proceed once this thread awaits.
                    consumer.signalAll();
                }
            } catch (InterruptedException e) {
                // Restore the interrupt status and stop producing.
                Thread.currentThread().interrupt();
            } finally {
                // Always release the lock, even when interrupted.
                lock.unlock();
            }
        }
    }

    /**
     * Consumer: decrements {@code leftCount} until it reaches zero,
     * then awaits {@code consumer} until the producer refills it.
     */
    class LockConsumer implements Runnable {

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "---啟動");
            lock.lock();
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // Await (instead of terminating) while nothing is available, so the
                    // consumer also works when it acquires the lock before the producer.
                    while (leftCount <= 0) {
                        System.out.println(Thread.currentThread().getName() + "---消費者開始阻塞---" + leftCount);
                        consumer.await();
                    }
                    System.out.println(Thread.currentThread().getName() + "---消費者消費資源---" + leftCount);
                    leftCount--;
                    // Wake the producer; it can only proceed once this thread awaits.
                    provider.signalAll();
                }
            } catch (InterruptedException e) {
                // Restore the interrupt status and stop consuming.
                Thread.currentThread().interrupt();
            } finally {
                // Always release the lock, even when interrupted.
                lock.unlock();
            }
        }
    }
}
三、BlockingQueue實現
package com.duchong.queue;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Producer/consumer demo built on a bounded {@link LinkedBlockingQueue}.
 *
 * <p>The queue does the blocking itself: {@code put} blocks while the queue is full
 * and {@code take} blocks while it is empty, so no explicit locking is needed.
 *
 * @author DUCHONG
 * @since 2020-09-17 18:23:09
 **/
public class LinkBlockQueueDemo {

    // Capacity of the shared queue.
    private static final int MAX_CAPACITY = 20;

    // Queue shared by the producer and the consumer thread.
    private static final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(MAX_CAPACITY);

    /**
     * Starts the producer, sleeps two seconds, then starts the consumer.
     *
     * @param args unused
     * @throws Exception if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws Exception {
        new Thread(new Provider(queue), "provider-thread").start();
        TimeUnit.SECONDS.sleep(2L);
        new Thread(new Consumer(queue), "consumer-thread").start();
    }
}

/**
 * Takes items from the queue and prints them until the thread is interrupted.
 */
class Consumer implements Runnable {

    final LinkedBlockingQueue<String> queue;

    /**
     * @param queue queue to take items from
     */
    public Consumer(LinkedBlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("---" + Thread.currentThread().getName() + "---啟動");
        try {
            while (true) {
                // take() blocks while the queue is empty.
                System.out.println(Thread.currentThread().getName() + "---消費---" + queue.take());
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status and stop; swallowing the exception inside
            // the loop would make the thread impossible to stop.
            Thread.currentThread().interrupt();
        }
    }
}

/**
 * Repeatedly puts the items food-1 .. food-30 into the queue until the thread is interrupted.
 */
class Provider implements Runnable {

    final LinkedBlockingQueue<String> queue;

    /**
     * @param queue queue to put items into
     */
    public Provider(LinkedBlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("---" + Thread.currentThread().getName() + "---啟動");
        try {
            while (true) {
                for (int i = 1; i <= 30; i++) {
                    // put() blocks while the queue is full.
                    queue.put("food-" + i);
                    System.out.println(Thread.currentThread().getName() + "---生產---food-" + i);
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status and stop; swallowing the exception inside
            // the loop would make the thread impossible to stop.
            Thread.currentThread().interrupt();
        }
    }
}