做题的时候遇到了生产者消费者问题,这个问题可以说是线程学习的经典题目了,就忍不住研究了一波。它描述是有一块缓冲区(队列实现)作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品。在Java中这个数组线程阻塞的问题,多个用户同时发送多个请求,怎么保证不发生线程死锁,是我们要考虑的问题。
生产者消费者模式说明:
1.生产者只在仓库未满时进行生产,仓库满时生产者进程被阻塞;
2.消费者只在仓库非空时进行消费,仓库为空时消费者进程被阻塞;
3.当消费者发现仓库为空时会通知生产者生产;
3.当生产者发现仓库满时会通知消费者消费;
实现的关键:
我们知道在JAVA环境中,线程Thread有如下几个状态:
1.新建状态
2.就绪状态
3.运行状态
4.阻塞状态
5.死亡状态
生产者消费者问题就是要控制线程的阻塞状态,保证生产者和消费者进程在一定条件下,一直稳定运行,不出现没有商品但是消费者还是一直购买,商品满了但是生产者还是不断生产导致浪费的情况。
我们考虑线程常用的Sychronized、RetrenLock还有阻塞队列来实现。
(1)Object的wait() / notify()方法
wait(): wait()方法可以让线程进入等待状态,当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等待状态,让其他线程执行。
notify():notify随机选择一个在该对象上调用wait方法的线程,解除其阻塞状态。当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。
代码实现:
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;/*** 生产者消费者模式:使用Object.wait() / notify()方法实现*/
public class ProducerConsumer {private static final int CAPACITY = 5;
//申请一个容量最大的仓库public static void main(String args[]){Queue<Integer> queue = new LinkedList<Integer>();Thread producer1 = new Producer("P1", queue, CAPACITY);Thread producer2 = new Producer("P2", queue, CAPACITY);Thread consumer1 = new Consumer("C1", queue, CAPACITY);Thread consumer2 = new Consumer("C2", queue, CAPACITY);Thread consumer3 = new Consumer("C3", queue, CAPACITY);producer1.start();producer2.start();consumer1.start();consumer2.start();consumer3.start();}/*** 生产者*/public static class Producer extends Thread{private Queue<Integer> queue;//队列作为仓库String name;int maxSize;int i = 0;public Producer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){
//while(condition)为自旋锁,为防止该线程没有收到notify()调用也从wait()中返回
//(也称作虚假唤醒),这个线程会重新去检查condition条件以决定当前是否可以安全
//地继续执行还是需要重新保持等待,而不是认为线程被唤醒了就可以安全地继续执行
//了,自旋锁当终止条件满足时,才会停止自旋,这里设置了一直执行,直到程序手动停
//止。synchronized(queue){//给队列加锁,保证线程安全while(queue.size() == maxSize){//当队列是满的时候,生产者线程等待,由消费者线程进行操作try {System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");queue.wait();} catch (Exception ex) {ex.printStackTrace();}}//队列不为空的时候,生产者被唤醒进行操作System.out.println("[" + name + "] Producing value : +" + i);queue.offer(i++);//因此如果想在一个满的队列中加入一个新项,调用 add() 方法就会抛出一//个 unchecked 异常,而调用 offer() 方法会返回 falsequeue.notifyAll();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}}/*** 消费者*/public static class Consumer extends Thread{private Queue<Integer> queue;String name;int maxSize;public Consumer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){synchronized(queue){while(queue.isEmpty()){try {//队列为空,说明没有生产者生产的商品,消费者进行等待System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");queue.wait();} catch (Exception ex) {ex.printStackTrace();}}int x = queue.poll();//如果队列元素为空,调用remove() 的行为与 Collection 接口的版本相似会抛出异常,这里是模拟消费者取走商品的过程// 但是新的 poll() 方法在用空集合调用时只是返回 null。因此新的方法更适合容易出现异常条件的情况。System.out.println("[" + name + "] Consuming value : " + x);queue.notifyAll();//唤醒所有队列,消费者和生产者根据队列情况进行操作try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}}
}
2. 使用Lock和Condition的await() / signal()方法
Condition接口的await()和signal()是用来做同步的两种方法,它们的功能基本上和Object的wait()/ nofity()相同,完全可以取代它们,但是它们和新引入的锁定机制Lock直接挂钩,具有更大的灵活性。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。
代码实现:
import java.util.LinkedList;import java.util.Queue;import java.util.Random;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/*** 生产者消费者模式:使用Lock和Condition实现*/public class ProducerConsumer {private static final int CAPACITY = 5;private static final Lock lock = new ReentrantLock();private static final Condition fullCondition = lock.newCondition();//队列满的条件private static final Condition emptyCondition = lock.newCondition();//队列空的条件public static void main(String args[]){Queue<Integer> queue = new LinkedList<Integer>();Thread producer1 = new Producer("P1", queue, CAPACITY);Thread producer2 = new Producer("P2", queue, CAPACITY);Thread consumer1 = new Consumer("C1", queue, CAPACITY);Thread consumer2 = new Consumer("C2", queue, CAPACITY);Thread consumer3 = new Consumer("C3", queue, CAPACITY);producer1.start();producer2.start();consumer1.start();consumer2.start();consumer3.start();}/*** 生产者*/public static class Producer extends Thread{private Queue<Integer> queue;String name;int maxSize;int i = 0;public Producer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){//获得锁lock.lock();while(queue.size() == maxSize){try {System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");//这里可以和wait()进行对比,两种控制线程阻塞的方式fullCondition.await();} catch (InterruptedException ex) {ex.printStackTrace();}}System.out.println("[" + name + "] Producing value : +" + i);queue.offer(i++);//唤醒其他所有生产者、消费者fullCondition.signalAll();emptyCondition.signalAll();//释放锁,Lock不同于Sychronized,需要手动释放锁lock.unlock();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}/*** 消费者*/public static class Consumer extends Thread{private Queue<Integer> queue;String name;int maxSize;public Consumer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){//获得锁lock.lock();while(queue.isEmpty()){try {System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");//队列为空满足条件,消费者线程阻塞emptyCondition.await();} catch (Exception ex) {ex.printStackTrace();}}int x = queue.poll();System.out.println("[" + name + "] Consuming value : " + x);//唤醒其他所有生产者、消费者fullCondition.signalAll();emptyCondition.signalAll();//释放锁lock.unlock();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}}
(3)BlockingQueue阻塞队列方法
我们采用一个阻塞队列来实现。
通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。
我们这里使用LinkedBlockingQueue,它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await()/ signal()方法。它可以在生成对象时指定容量大小。它用于阻塞操作的是put()和take()方法。
- put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。
- take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。
代码实现:
import java.util.LinkedList;import java.util.Queue;import java.util.Random;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/*** 生产者消费者模式:使用Lock和Condition实现*/public class ProducerConsumer {private static final int CAPACITY = 5;private static final Lock lock = new ReentrantLock();private static final Condition fullCondition = lock.newCondition();//队列满的条件private static final Condition emptyCondition = lock.newCondition();//队列空的条件public static void main(String args[]){Queue<Integer> queue = new LinkedList<Integer>();Thread producer1 = new Producer("P1", queue, CAPACITY);Thread producer2 = new Producer("P2", queue, CAPACITY);Thread consumer1 = new Consumer("C1", queue, CAPACITY);Thread consumer2 = new Consumer("C2", queue, CAPACITY);Thread consumer3 = new Consumer("C3", queue, CAPACITY);producer1.start();producer2.start();consumer1.start();consumer2.start();consumer3.start();}/*** 生产者*/public static class Producer extends Thread{private Queue<Integer> queue;String name;int maxSize;int i = 0;public Producer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){//获得锁lock.lock();while(queue.size() == maxSize){try {System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");//这里可以和wait()进行对比,两种控制线程阻塞的方式fullCondition.await();} catch (InterruptedException ex) {ex.printStackTrace();}}System.out.println("[" + name + "] Producing value : +" + i);queue.offer(i++);
//唤醒其他所有生产者、消费者fullCondition.signalAll();emptyCondition.signalAll();//释放锁,Lock不同于Sychronized,需要手动释放锁lock.unlock();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}/*** 消费者*/public static class Consumer extends Thread{private Queue<Integer> queue;String name;int maxSize;public Consumer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){lock.lock();while(queue.isEmpty()){try {System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");
//队列为空满足条件,消费者线程阻塞emptyCondition.await();} catch (Exception ex) {ex.printStackTrace();}}int x = queue.poll();System.out.println("[" + name + "] Consuming value : " + x);//唤醒其他所有生产者、消费者fullCondition.signalAll();emptyCondition.signalAll();//释放锁lock.unlock();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}}
小结:三种实现形式,其实理念都是相同的,都是控制阻塞状态,根据条件去控制线程的运行状态和阻塞状态。生产者消费者模式 为信息传输开辟了一个崭新的概念,因为它的优先级最高,所以即使网络发生堵塞时它也会最先通过,最大程度的保证了设备的安全。也有缺点,就是在网络中的个数是有限制的。生产者消费者模式在设置时比较简单,使用方便安全,在将来的自动化行业必定会大大被人们所认同。
参考资料:
https://blog.csdn.net/u010983881/article/details/78554671#commentBox