一: 什么是生产者消费者模型
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
简单来说:
生产者消费者模型就是指,在一个系统中,存在两种角色,一个为生产者,一个为消费者,通过一个缓冲区(仓库)进行通信,生产者将生产的产品放入仓库,消费者从仓库中取产品。当仓库满时,生产者阻塞,当仓库空时,消费者阻塞。
二: 关系图
三: 实现方式
- 采用 wait—notify 的方式
- 采用 阻塞队列 方式
3.1 wait—notify 方式
3.1.1 举例1
生产者类
/*** 生产者类* 实现runnable接口* @author DH**/
public class Producer implements Runnable{private BufferArea ba;//通过传入参数的方式是使得对象相同,具有互斥锁的效果。public Producer(BufferArea ba){this.ba = ba;}@Overridepublic void run() {while(true){setIntervalTime();ba.set();//生产产品}}//设置时间间隔public void setIntervalTime(){try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}
消费者类
/*** 消费者类* 实现runnable接口* @author DH**/
public class Consumer implements Runnable{private BufferArea ba;public Consumer(BufferArea ba){this.ba = ba;}@Overridepublic void run() {while(true){setIntervalTime();ba.get();//消费产品}}//设置时间间隔public void setIntervalTime(){try {Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}}
}
仓库
/*** 仓库* 缓冲区* wait()/notify()* @author DH**/
public class BufferArea {private int currNum = 0;//当前仓库的产品数量private int maxNum = 10;//仓库最大产品容量public synchronized void set(){if(currNum<maxNum){currNum++;System.out.println(Thread.currentThread().getName()+" 生产了一件产品!当前产品数为:"+currNum);notifyAll();}else{//当前产品数大于仓库的最大容量try {System.out.println(Thread.currentThread().getName()+" 开始等待!当前仓库已满,产品数为:"+currNum);wait();} catch (InterruptedException e) {e.printStackTrace();}}}public synchronized void get(){if(currNum>0){//仓库中有产品currNum--;System.out.println(Thread.currentThread().getName()+" 获得了一件产品!当前产品数为:"+currNum);notifyAll();}else{try {System.out.println(Thread.currentThread().getName()+" 开始等待!当前仓库为空,产品数为:"+currNum);wait();} catch (InterruptedException e) {e.printStackTrace();}}}
}
测试类
/*** 测试类* @author DH**/
public class MainCode {public static void main(String[] args) {//同一个仓库BufferArea ba = new BufferArea();//三个生产者Producer p1 = new Producer(ba);Producer p2 = new Producer(ba);Producer p3 = new Producer(ba);//三个消费者Consumer c1 = new Consumer(ba);Consumer c2 = new Consumer(ba);Consumer c3 = new Consumer(ba);//创建线程,并给线程命名Thread t1 = new Thread(p1,"生产者1");Thread t2 = new Thread(p2,"生产者2");Thread t3 = new Thread(p3,"生产者3");Thread t4 = new Thread(c1,"消费者1");Thread t5 = new Thread(c2,"消费者2");Thread t6 = new Thread(c3,"消费者3");//使线程进入就绪状态t1.start();t2.start();t3.start();t4.start();t5.start();t6.start();}
}
通过设置生产者消费者的时间间隔,可以测试仓库满和空时的情况,当生产者时间间隔小,表示生产的快,会出现仓库满了的情况。测试结果如下。
生产者2 生产了一件产品!当前产品数为:1
生产者1 生产了一件产品!当前产品数为:2
生产者3 生产了一件产品!当前产品数为:3
生产者3 生产了一件产品!当前产品数为:4
生产者1 生产了一件产品!当前产品数为:5
生产者2 生产了一件产品!当前产品数为:6
生产者1 生产了一件产品!当前产品数为:7
生产者3 生产了一件产品!当前产品数为:8
生产者2 生产了一件产品!当前产品数为:9
生产者3 生产了一件产品!当前产品数为:10
生产者2 开始等待!当前仓库已满,产品数为:10
生产者1 开始等待!当前仓库已满,产品数为:10
消费者1 获得了一件产品!当前产品数为:9
消费者2 获得了一件产品!当前产品数为:8
当消费者时间间隔小,表示消费的快,会出现仓库为空的情况。测试结果如下。
消费者2 开始等待!当前仓库为空,产品数为:0
生产者3 生产了一件产品!当前产品数为:1
生产者2 生产了一件产品!当前产品数为:2
生产者1 生产了一件产品!当前产品数为:3
消费者2 获得了一件产品!当前产品数为:2
消费者1 获得了一件产品!当前产品数为:1
消费者3 获得了一件产品!当前产品数为:0
消费者3 开始等待!当前仓库为空,产品数为:0
3.1.2 举例2
产品类(仓库)
package test.exception.producer_consumer_model;/*
假设为产品为笔*/public class Production {private String type = "";private String color = "";private long code = 0; // 产品编号private boolean isProduced = false; // 是否生产完成 初始状态为未生产状态private boolean isContinueProduce = true; // 是否停产该产品public void setContinueProduce(boolean continueProduce) {isContinueProduce = continueProduce;}public void setCode(long code) {this.code = code;}public Production(){}public boolean isContinueProduce() {return isContinueProduce;}public void setType(String type) {this.type = type;}public void setColor(String color) {this.color = color;}public void setProduced(boolean produced) {isProduced = produced;}public boolean isProduced() {return isProduced;}@Overridepublic String toString() {return color + type + "-" + code;}
}
生产者
package test.exception.producer_consumer_model;public class Producer implements Runnable {private final Production pen; // 产品public Producer(Production pen) {this.pen = pen;}// 生产public void produce() {long code = 0;while (this.pen.isContinueProduce()) {synchronized (this.pen) {if (this.pen.isProduced()) {try {this.pen.wait(); // 等待消费者消费} catch (InterruptedException e) {e.printStackTrace();}}// 开始生产this.pen.setType("铅笔");this.pen.setColor("蓝色");this.pen.setCode(code++);this.pen.setProduced(true);System.out.println(this.pen + " is produced");this.pen.notify();}}System.out.println("finish producing");}@Overridepublic void run() {produce();}
}
消费者
package test.exception.producer_consumer_model;public class Consumer implements Runnable {private final Production pen;public Consumer(Production pen) {this.pen = pen;}// 持续消费public void consumer() {while (this.pen.isContinueProduce()) {synchronized (this.pen) {if (!this.pen.isProduced()) {try {this.pen.wait(); // 等待生产者生产} catch (InterruptedException e) {e.printStackTrace();}}System.out.println(this.pen + " is consumed"); // 使用this.pen.setProduced(false); // 使用完后更新状态this.pen.notify();}}// 确保停止生产后,能够使用最后生产的一支笔if (this.pen.isProduced()) {System.out.println(this.pen + " is consumed");}System.out.println("finish using");}@Overridepublic void run() {consumer();}
}
测试类
package test.exception.producer_consumer_model;public class Demo {public static void main(String[] args) throws InterruptedException {Production pen = new Production();Consumer consumer = new Consumer(pen);Producer producer = new Producer(pen);new Thread(producer).start(); // 开启生产者线程new Thread(consumer).start(); // 开启消费者线程Thread.sleep(10000);pen.setContinueProduce(false); // 10s后停止生产该类型的笔}
}
运行结果
3.2 阻塞队列方式
这里因为篇幅原因, 详细的实现方式可以看我的这篇博客; https://blog.csdn.net/m0_50370837/article/details/124339524
参考文章: Java的生产者消费者模型_Hi--Man的博客-CSDN博客
https://www.jb51.net/article/187908.htm