三种方式实现生产者-消费者模型

article/2025/9/12 16:59:33

前言

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

看完了定义,相信懵逼的依然懵逼,那我就来说人话吧。
生产者消费者模型需要抓住“三个主体,三个要点“,三个主体是指:生产者消费者缓冲区。生产者往缓冲区放数据,消费者从缓冲区取数据。
生产者-消费者模型
整个模型大致就是上面图示的结构。
三个要点是指:

  • 缓冲区有固定大小
  • 缓冲区满时,生产者不能再往缓冲区放数据(产品),而是被阻塞,直到缓冲区不是满的
  • 缓冲区为空时,消费者不能再从缓冲区取数据,而是被阻塞,直到缓冲区不是空的。

因为数据(产品)往往是先生产出来的先被消费。所以缓冲区一般用有界队列实现,又由于生产者、消费者在特定情况下需要被阻塞,所以更具体一点,缓冲区一般用有界阻塞队列来实现。
本篇用三种方式实现生产者-消费者模型:wait/notify + 队列、Lock/Condition + 队列、有界阻塞队列。

wait/notify + 队列

实现生产者-消费者模型,主要是实现两个核心方法:往缓冲区中放元素、从缓冲区中取元素。
以下是缓冲区的代码实现,是生产者-消费者模型的核心。

import java.util.LinkedList;
import java.util.Queue;/*** wait/notify机制实现生产者-消费者模型*/
public class ProducerConsumerQueue<E> {/*** 队列最大容量*/private final static int QUEUE_MAX_SIZE = 3;/*** 存放元素的队列*/private Queue<E> queue;public ProducerConsumerQueue() {queue = new LinkedList<>();}/*** 向队列中添加元素** @param e* @return*/public synchronized boolean put(E e) {// 如果队列是已满,则阻塞当前线程while (queue.size() == QUEUE_MAX_SIZE) {try {wait();} catch (InterruptedException e1) {e1.printStackTrace();}}// 队列未满,放入元素,并且通知消费线程queue.offer(e);System.out.println(Thread.currentThread().getName() + " -> 生产元素,元素个数为:" + queue.size());notify();return true;}/*** 从队列中获取元素* @return*/public synchronized E get() {// 如果队列是空的,则阻塞当前线程while (queue.isEmpty()) {try {wait();} catch (InterruptedException e) {e.printStackTrace();}}// 队列非空,取出元素,并通知生产者线程E e = queue.poll();System.out.println(Thread.currentThread().getName() + " -> 消费元素,元素个数为:" + queue.size());notify();return e;}
}

实现了缓冲区后,对于生产者、消费者线程的实现就比较简单了

/*** 生产者线程*/
public class Producer implements Runnable {private ProducerConsumerQueue<Integer> queue;public Producer(ProducerConsumerQueue<Integer> queue) {this.queue = queue;}@Overridepublic void run() {for (int i = 0; i < 10; i++) {queue.put(i);}}
}/*** 消费者线程*/
public class Consumer implements Runnable {private ProducerConsumerQueue<Integer> queue;public Consumer(ProducerConsumerQueue<Integer> queue) {this.queue = queue;}@Overridepublic void run() {for (int i = 0; i < 10; i++) {queue.get();}}
}

测试代码如下:

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class ProducerConsumerDemo {private final static ExecutorService service = Executors.newCachedThreadPool();public static void main(String[] args) throws InterruptedException {Random random = new Random();// 生产者-消费者模型缓冲区ProducerConsumerQueue<Integer> queue = new ProducerConsumerQueue<>();Producer producer = new Producer(queue);Consumer consumer = new Consumer(queue);for (int i = 0; i < 3; i++) {// 休眠0-50毫秒,增加随机性Thread.sleep(random.nextInt(50));service.submit(producer);}for (int i = 0; i < 3; i++) {// 休眠0-50毫秒,增加随机性Thread.sleep(random.nextInt(50));service.submit(consumer);}// 关闭线程池service.shutdown();}
}

执行结果(由于执行结果比较长,所以截取部分结果)

pool-1-thread-1 -> 生产元素,元素个数为:1
pool-1-thread-1 -> 生产元素,元素个数为:2
pool-1-thread-1 -> 生产元素,元素个数为:3
pool-1-thread-4 -> 消费元素,元素个数为:2
pool-1-thread-1 -> 生产元素,元素个数为:3
pool-1-thread-4 -> 消费元素,元素个数为:2
pool-1-thread-3 -> 生产元素,元素个数为:3
pool-1-thread-4 -> 消费元素,元素个数为:2
pool-1-thread-4 -> 消费元素,元素个数为:1
pool-1-thread-4 -> 消费元素,元素个数为:0
pool-1-thread-2 -> 生产元素,元素个数为:1
pool-1-thread-2 -> 生产元素,元素个数为:2
pool-1-thread-2 -> 生产元素,元素个数为:3
pool-1-thread-4 -> 消费元素,元素个数为:2
......

虽然是部分结果,但是依然可以看出几点:

  • 由于队列的最大长度是3(QUEUE_MAX_SIZE),所以缓冲区元素不会超过3,说明缓冲区满时,生产者确实被阻塞了
  • 缓冲区元素个数最小为0,不会出现负数,说明缓冲区为空时,消费者被阻塞了

这就是生产者-消费者模型基于wait/notify+队列的基本实现。

Lock/Condition + 队列

同样,核心部分缓冲区的实现代码实现如下:

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;/*** Lock/Condition实现生产者-消费者模型*/
public class ProducerConsumerQueue<E> {/*** 队列最大容量*/private final static int QUEUE_MAX_SIZE = 3;/*** 存放元素的队列*/private Queue<E> queue;private final Lock lock = new ReentrantLock();private final Condition producerCondition = lock.newCondition();private final Condition consumerCondition = lock.newCondition();public ProducerConsumerQueue() {queue = new LinkedList<>();}/*** 向队列中添加元素* @param e* @return*/public boolean put(E e) {final Lock lock = this.lock;lock.lock();try {while (queue.size() == QUEUE_MAX_SIZE) {// 队列已满try {producerCondition.await();} catch (InterruptedException e1) {e1.printStackTrace();}}queue.offer(e);System.out.println(Thread.currentThread().getName() + " -> 生产元素,元素个数为:" + queue.size());consumerCondition.signal();} finally {lock.unlock();}return true;}/*** 从队列中取出元素* @return*/public E get() {final Lock lock = this.lock;lock.lock();try {while (queue.isEmpty()) {// 队列为空try {consumerCondition.await();} catch (InterruptedException e1) {e1.printStackTrace();}}E e = queue.poll();System.out.println(Thread.currentThread().getName() + " -> 消费元素,元素个数为:" + queue.size());producerCondition.signal();return e;} finally {lock.unlock();}}
}

可以看到,代码基本和wait/notify实现方式一致,基本只是API的不同而已。生产者线程、消费者线程、测试代码更是和wait/notify方式一致,所以就不赘述了。

有界阻塞队列

同样,缓冲区的实现也是其核心部分,不过阻塞队列已经提供了相应的阻塞API,所以不需要额外编写阻塞部分的代码

/*** 阻塞队列实现生产者-消费者模型* 对应的阻塞方法是put()/take()*/
public class ProducerConsumerQueue<E> {/*** 队列最大容量*/private final static int QUEUE_MAX_SIZE = 3;/*** 存放元素的队列*/private BlockingQueue<E> queue;public ProducerConsumerQueue() {queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);}/*** 向队列中添加元素* @param e* @return*/public boolean put(E e) {try {queue.put(e);System.out.println(Thread.currentThread().getName() + " -> 生产元素,元素个数为:" + queue.size());} catch (InterruptedException e1) {e1.printStackTrace();}return true;}/*** 从队列中取出元素* @return*/public E get() {try {E e = queue.take();System.out.println(Thread.currentThread().getName() + " -> 消费元素,元素个数为:" + queue.size());return e;} catch (InterruptedException e1) {e1.printStackTrace();}return null;}
}

生产者线程、消费者线程、测试代码也和前面两种一模一样。

总结

通过三种方式实现生产者-消费者模型,可以看出使用阻塞队列的方式最简单,也更安全。其实看看阻塞队列的源码,会发现其内部的实现和这里的前两种差不多,只是JDK提供的阻塞队列健壮性更好。

说完了三种实现方式,再来说说为什么要使用生产者-消费者模式,消费者直接调用生产者不好吗?
回顾文章开始的那张图,试想一下,如果没有生产者-消费者模式会怎样,大概会变成如下这样
没有生产者-消费者模式
可以看到,三个生产者,三个消费者就会产生 3 * 3 = 9条调用关系(箭头方法代表数据走向),还有一点就是消费者也有可能还是生产者,生产者也有可能还是消费者,一旦生产者、消费者的数量多了之后就会形成复杂的调用网。所以生产者-消费者模型的最大好处就是解耦
其次如果生产者和消费者的速度上有较大的差异,就一定会存在一方总是在等待另一方的情况。比如快递小哥如果每一个快递都必须直接送到用户手上,如果某个用户一直联系不上,或者说过了很久才取快递,那么快递小哥就只能一直等待。所以就出现了快递站,快递小哥只需要把快递放在指定位置,用户去指定位置取就行了。所以生产者-消费者模型的第二个好处就是平衡生产能力和消费能力的差异

以上就是本篇关于生产者-消费者模型的全部内容。


http://chatgpt.dhexx.cn/article/tBKZures.shtml

相关文章

生产者消费者模型

目录 一、生产者消费者模型的概念 二、生产者消费者模型的特点 三、生产者消费者模型优点 四、基于BlockingQueue的生产者消费者模型 4.1 基本认识 4.2 模拟实现 五、POSIX信号量 5.1 信号量概念 5.2 信号量函数 5.2.1 初始化信号量 5.2.2 销毁信号量 5.2.3 等待信…

打家劫舍问题

打家劫舍问题 最近碰见这种问题实在是太多了,感觉还是有必要学习一下打家劫舍以及其变种问题这一类问题采用的都是动态规划的解法 一些练习题目 6378. 最小化旅行的价格总和 198. 打家劫舍I 213. 打家劫舍 II 337. 打家劫舍 III 2560. 打家劫舍 IV 1 、打家劫舍I 题目…

经典动态规划:打家劫舍系列问题

打家劫舍系列总共有三道&#xff0c;难度设计非常合理&#xff0c;层层递进。第一道是比较标准的动态规划问题&#xff0c;而第二道融入了环形数组的条件&#xff0c;第三道更绝&#xff0c;让盗贼在二叉树上打劫. House Robber | public int rob(int[] nums);题目很容易理解…

【算法】动态规划(三)——打家劫舍系列问题

目录 一、前言 二、打家劫舍 &#xff08;1&#xff09;198. 打家劫舍Ⅰ • 整体代码&#xff1a; &#xff08;2&#xff09;213. 打家劫舍 II • 题目分析 • 整体代码&#xff1a; &#xff08;3&#xff09;337. 打家劫舍Ⅲ • 思路分析 • 整体代码&#xff1a; 三、补充知…

动态规划之打家劫舍系列

前言 打家劫舍问题是一种非常经典的有限制条件的动态规划问题&#xff0c;按理说&#xff0c;不是一种特殊的类型&#xff0c;但是因为力扣上纯纯的出了三道题&#xff08;1&#xff0c;2&#xff0c;3&#xff09;来考察&#xff0c;题目的难度是依次递进的&#xff0c;还结合…

动态规划之打家劫舍

动态规划之打家劫舍 文章目录 动态规划之打家劫舍1. "198. 打家劫舍"2. "198. 打家劫舍&#xff08;变种&#xff1a;输出路径&#xff09;"3. "213. 打家劫舍 II"4. "337. 打家劫舍 III" 1. “198. 打家劫舍” dp数组定义&#xff1a…

oracle 根据部分字段去重

问题&#xff1a;在oracle中使用group by分组&#xff0c;group by子句中必须包含所有的select中的字段和order by子句中的字段。 在不使用group by子句的情况下&#xff0c;进行分组。&#xff08;根据部分字段分组&#xff09; over()分析函数 原sql SELECTIM. ID mediaGrou…

oracle字段去重查询,oracle怎么去重查询

oracle去重查询的方法是&#xff1a; oracle 数据库多字段去重 方法介绍&#xff1a;distinct 关键字、group by 、row_number ()over(partition by 列 order by 列 desc) 我的需求是&#xff1a;根据某几列去重 查询出去重后的全部信息。最后我选择的是第三种方法。 我的想法&…

oracle 数据去重方法

1. 创建表&#xff1a; -- Create table create table TEST_USER (user_id NUMBER(3),user_name VARCHAR2(20),user_age NUMBER(3) ) tablespace GUAN_TABLESPACEpctfree 10initrans 1maxtrans 255storage(initial 64Knext 1Mminextents 1maxextents unlimited);--测试数据…

oracle 字符串去重

select regexp_replace(1,1,3,5,5, ([^,])(,\1)*(,|$), \1\3) from dual;注意&#xff1a; 但是&#xff0c;这个去重&#xff0c;必须建立在排序的基础上&#xff0c;如果listagg拼接出来的数值像 a, b, a, c 这时候&#xff0c;该正则就会失效。

MYSQL/ORACLE多字段去重-根据某字段去重

通过百度上的答案多数无效 自己搞了个 使用oracle row_number()函数&#xff0c;给每个同名的加一个序号&#xff0c;最后筛选第n个想同的即可 oracle与mysql不同 1.oracel 多字段distinct(字段名去重) group by去重失效 可以用row_number() over(partition) 给同名列加个序号…

Oracle 数据去重

在Oracle数据库中删除重复数据 一&#xff0c;查询及删除重复记录的SQL语句 Person01表&#xff1a; 1. 查询表中多余的重复数据&#xff0c;根据ID字段来判断是否重复 SELECT * FROM PERSON01 WHERE ID IN (SELECT ID FROM PERSON01 GROUP BY ID HAVING COUNT(ID) > 1)…

Oracle根据多列去重

&#xff08;1&#xff09;distinct 关键词 distinct用于返回唯一不同的值&#xff0c;可作用于单列和多列 但必须将其放在开头&#xff0c;否则会提示错误 而若在其后添加多个变量名&#xff0c;则返回的将是这多个变量名不同时重复的列&#xff0c;因而使用distinct筛选某…

oracle 数据库去重查询

oracle数据库中有如下一张表&#xff0c;包含id,loginid,name,researchtime等字段&#xff0c;其中name字段中的数据有重复&#xff0c;查询数据时要重复数据只取一条&#xff0c;利用row_number ()over(partition by 列 order by 列 desc)方法实现 1:select a.,row_number() o…

oracle去重函数

1、distinct &#xff08;1&#xff09;、常用的distinct select distinct column from table; &#xff08;2&#xff09;、统计去重后数量 select count(distinct column) from table;–查去重后数量 &#xff08;3&#xff09;、distinct必须放在开头 select id, distinct n…

oracle 数据库 去重查询

oracle 数据库多字段去重 方法介绍&#xff1a;distinct 关键字、group by 、row_number ()over(partition by 列 order by 列 desc) 我的需求是&#xff1a;根据某几列去重 查询出去重后的全部信息。最后我选择的是第三种方法。 我的想法&#xff1a;我想找出一种更简单的方…

Oracle实现去重的两种方式总结

业务场景 需要查询某数据&#xff0c;由于需要三张表关联查询&#xff0c;查询结果如下&#xff1a; 原始SQL语句 SELECT D.ORDER_NUM AS "申请单号" ,D.CREATE_TIME ,D.EMP_NAME AS "申请人",(SELECT extractvalue(t1.row_data,/root/row/FI13_wasteNam…

mysql默认密码的查找与修改

注&#xff1a;此方法仅可用于初始安装数据库或学习时使用&#xff0c;在实际生产中会使所有数据库文件删除&#xff0c;故应先提前备份相关重要数据&#xff0c;以免造成不必要的损失&#xff0c;请谨慎使用。 若使用mysqld –initialize初始化mysql数据库&#xff0c;会产生一…

rpm安装mysql后密码_CentOs安装Mysql和配置初始密码

装载自&#xff1a;https://www.cnblogs.com/FlyingPuPu/p/7783735.html 一、Mysql下载安装 使用上传命令上传至/home目录&#xff0c;如&#xff1a;rz命令(yum install -y lrzsz) 添加mysql仓库(-Uvh后面接的为你下载的rpm文件名) sudo rpm -Uvh mysql57-community-release-e…

MySQL初始密码的查看

问题&#xff1a;在安装MySQL过程中&#xff0c;以管理员身份运行cmd后进入MySQL的bin目录&#xff0c;然后输入命令“mysqld --initialize”后没有显示初始密码&#xff0c;没办法进行后续的登录怎么办&#xff1f; 1.打开你的MySQL的安装目录下的data文件夹&#xff08;就是…