生产者消费者模型你知道多少

article/2025/9/12 16:33:25

背景

        进入正题之前先说点故事。从最开始学java的那里开始:我是从08年下半年开始学Java,在《我的六年程序之路》中提到了一些。当时比较简单,每天看尚学堂的视频(对于初学者而言看视频好一些。),然后写代码。比较清楚的记得马士兵讲到生产者消费者模型的时候还大谈特谈要是掌握了这个工资可以+1000(现在回忆起有点像历史一样,多少有些伤感)。那时候已经过年了,我在家的时候把那段代码敲了很多遍,不过基本上是默写下来的,也没有仔细的想过其中的一些细节,不久后我就忘记了其中的写法。这里又衍生出一个学习方法的问题。我在学习上一直有一个毛病:学习的过程中不会刨根问底,这导致很多问题思考的不透彻,在后面很大程度上影响我整个知识体系的扎实程度,也应了那句话:出来混迟早要还的,在学习技术的过程中就是要一步一个脚印,总有一天会豁然开朗,不知不觉的发现所有知识都串起来了,一件很神奇的事情。在技术学习的过程中还是需要有点钻牛角尖的精神,基本上技术上的大牛都有钻牛角尖的“毛病”。

        第二段故事是在2012年,这是正儿八经的使用生产者消费者模型在项目中进行开发,中间也写出了一大堆问题,这个不是写出来的,是慢慢调试出来的。最开始使用wait(),notify(),后来用CountDownLatch。

        第三个故事是前几天的事情,这也是我为什么要写这篇文章的原因。一个同事问我代码的问题,他里面就是用到了生产者消费者,当时我整了半天都没整利索,很没面子。后面虽然找出了问题(这个问题我今天也碰到了),就是习惯性的把整个容器(下面代码的Container)锁住,然后发现只要一个线程wait住,所有线程都不动了,因为这个wait住的线程还持有Container的锁没有释放,其他线程也就进不来。我发现自己对于这个模型并没有完全摸透,就在周末的时候好好的把这个模型从头到尾的写了写,当然从中又有一些新的收获。


正题

上面说了一大堆故事。下面就开始今天的主题: 生产者消费者模型。这里先问两个问题:1、 生产者消费者模型旨在解决什么问题。2、 在现有的场景下哪里会用到

模型图

从下面图中可以发现生产者和消费者之间用中间类似一个队列一样的东西串起来。这个队列可以想像成一个存放产品的“ 仓库”,生产者只需要关心这个“ 仓库”,并不需要关心具体的消费者,对于生产者而言甚至都不知道有这些消费者存在。对于消费者而言他也不需要关心 具体的生产者,到底有多少生产者也不是他关心的事情,他只要关心这个“ 仓库”中还有没有东西。这种模型是一种松耦合模型。这样可以回答我上面提出的第一个问题。这个模型的产生就是为了 复用和解耦。比如常见的消息框架(非常经典的一种生产者消费者模型的使用场景) ActiveMQ。发送端和接收端用Topic进行关联。这个Topic可以理解为我们这里“ 仓库”的地址,这样就可以实现点对点和广播两种方式进行消息的分发。

具体实现

       我的代码里只是简单的模拟了这样一个模型的实现。在写的过程中还是碰到一些问题。编程过程中一旦用多线程复杂度就会陡升。所以编码原则是 能用单线程解决问题就不要去用多线程

UML类图

图中比较简单的画了生产者和消费者之间的关系,如果看到这里还比较迷惑没有关系,下面会有具体的代码展示。当然最好的方式就是自己从头开始写,这样可以知道一些在看文章中遗漏的一些细节。在学习的过程中往往会有这样的感受: 你以为你看懂了,理解了,自己写的时候就发现傻眼了,这也是为什么代码一定要多写,写的过程是即是思考的过程

逻辑分析

下面我会重点讲两个类的代码,Consumer.java和Producer.java,以及我在写的过程中碰到了些什么问题。现在有点发现用一篇的篇幅有点讲不清楚,代码我会在后面贴出来,我估计在这里贴出来也很少人会去看这个代码。从我自己的经验来看一看到一大坨的代码就傻眼了。
Producer.java

Consumer.java

先回答我上面的两把锁的问题,对于生产者和消费者的唤醒和挂起操作分别用了两个监控器。原因很简单,就像我在生产者消费者模型图里面看到的生产者和消费者均可以是一个或者多个。如果这里用一个对象控制,在多个生产者和多个消费者的时候发现:本来要唤醒生产者会把消费者也唤醒,就会比较混乱。
再看第二个问题是: 为什么在三个地方都需要做判满操作?这里看生产者那张图,仔细的看这一段代码,这里的操作 并非原子操作。图中标识的1处判满操作后,到了第3处后并不意味着容器还是满的。如果只是在1处作了校验,那么很有可能会出现这样的情况:在2处把消费者唤醒后,等运行到了第3处发现 这时候的容器可能已经空了!这样就不需要把生产者进行挂起。而如果没有这段校验逻辑很有可能就会出现 生产者和消费者同时挂起的情况,这里可以细细体会一下,或者写代码试验一下。
         这里再说一下wait()和notify()/notifyAll()。这两个操作是一对。这里要注意的第一个问题是对于线程进行wait()和notify()操作都要 锁住持有对象。图中都可以看出代码中对两个monitor都加了锁,不加锁的话就会抛出 java.lang.IllegalMonitorStateException异常。另外一个问题很容易的一个惯性思维就是线程挂起不就是this.wait(),这样可以吗?答案是可以,你只要锁住自己就OK。但是问题来了, 谁来唤醒它?这也是为什么用一个公用对象进行唤醒和挂起线程操作,wait()和notify()属于Object类,任何对象都可以进行这个操作。

代码展示

Producer.java
/*** 生产者* @author 百恼  2013-11-7下午12:18:39**/
public class Producer implements Runnable {//简单的模拟,这里一个生产容器,设置成final类型的话不允许再次赋值private final Container<Bread> container;//生产者线程监听器private final Object producerMonitor;//消费者线程监听器private final Object consumerMonitor;public Producer(Object producerMonitor,Object consumerMonitor,Container<Bread> container){this.producerMonitor = producerMonitor;this.consumerMonitor = consumerMonitor;this.container = container;}/* (non-Javadoc)* @see java.lang.Runnable#run()*/@Overridepublic void run() {while(true){produce();}}public void produce(){//这里为了形象,模拟几个制作面包的步骤step1();Bread bread = step2();//如果发现容器已经满了,生产者要停if(container.isFull()){//唤醒消费者synchronized(consumerMonitor){if(container.isFull()){consumerMonitor.notify();}}//生产者挂起,两把锁的问题synchronized(producerMonitor){try {if(container.isFull()){System.out.println("生产者挂起...");producerMonitor.wait();                        }} catch (InterruptedException e) {e.printStackTrace();}}}else{//容器中还有容量,把面包放到容器内,这里可能会有丢失boolean result = container.add(bread);System.out.println("Producer:"+result);}}public void step1(){}public Bread step2(){return new Bread();}
}
Consumer.java
/*** 消费者* @author 百恼  2013-11-7下午12:18:50**/
public class Consumer implements Runnable{//简单的模拟,这里一个生产容器,设置成final类型的话不允许再次赋值private final Container<Bread> container;//生产者线程监听器private final Object producerMonitor;//消费者线程监听器private final Object consumerMonitor;public Consumer(Object producerMonitor,Object consumerMonitor,Container<Bread> container){this.producerMonitor = producerMonitor;this.consumerMonitor = consumerMonitor;this.container = container;}@Overridepublic void run() {while(true){consume();}}//消费,两把锁的问题public void consume(){//如果发现容器已经满了,生产者要停if(container.isEmpty()){//唤醒生产者synchronized(producerMonitor){if(container.isEmpty()){producerMonitor.notify();}}                //消费者挂起synchronized(consumerMonitor){try {if(container.isEmpty()){System.out.println("消费者挂起。。。");consumerMonitor.wait();                        }} catch (InterruptedException e) {e.printStackTrace();}}}else{//还有面包可以进行消费Bread bread = container.get();System.out.println("bread:"+bread);}       }
}
Container.java
/*** 装产品的容器* @author 百恼 2013-11-9下午02:06:59**/
public class Container<T> {private final int capacity;private final List<T> list;public Container(int capacity){this.capacity = capacity;list = new ArrayList<T>(capacity);}public List<T> getList(){return list;}/*** 添加产品 * @param product*/public synchronized boolean add(T product){if(list.size()<capacity){list.add(product);return true;}return false;}/*** 满* @return*/public synchronized boolean isFull(){if(list.size()>=capacity){return true;}return false;}public synchronized boolean isEmpty(){return list.isEmpty();}public synchronized T get(){if(list.size()>0){return list.remove(0);}return null;}public synchronized int getSize(){return list.size();}public int getCapacity(){return capacity;}
}
Client.java
/*** TODO Comment of Client* @author 百恼 2013-11-7下午12:20:08**/
public class Client {public static void main(String[] args){Object producerMonitor = new Object();Object consumerMonitor = new Object();Container<Bread> container = new Container<Bread>(10);//生产者开动new Thread(new Producer(producerMonitor,consumerMonitor,container)).start();new Thread(new Producer(producerMonitor,consumerMonitor,container)).start();new Thread(new Producer(producerMonitor,consumerMonitor,container)).start();new Thread(new Producer(producerMonitor,consumerMonitor,container)).start();//消费者开动new Thread(new Consumer(producerMonitor,consumerMonitor,container)).start();new Thread(new Consumer(producerMonitor,consumerMonitor,container)).start();}
}

总结

整个生产者消费者模型写到这里就要收尾了,这里可以用concurrent包中的CountDownLatch去实现,原理一样,在控制粒度上会更细一些。有兴趣的同学也可以去实现一下。另外我的模型中有明显的漏洞,代码中也注释出来了,就是有可能出现数据丢失的情况(在把面包加入到容器的过程中)。这又应该怎么处理?


http://chatgpt.dhexx.cn/article/FsfH6QZa.shtml

相关文章

生产者消费者模型详解

生产者消费者模型 文章目录 生产者消费者模型什么是生产者消费者模型基于BlockingQueue的生产者消费者模型单生产者单消费者模型多生产者多消费者模型 什么是生产者消费者模型 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接…

Python -- 生产者消费者

代码 # -*- coding: utf-8 -*- # Author : markadc # Time : 2021/4/14 11:43from queue import Queue import time import threading# maxsize: 指定队列最大长度 q Queue(maxsize10)# 生产者 def product(name):count 0while True:# 只要队列没有满&#xff0c;就一直…

生产者与消费者

生产者和消费者 目录 生产者和消费者1.什么是生产者和消费者2.生产者和消费者(不加唤醒机制)3.生产者和消费者(加唤醒机制)4.解决虚假唤醒5.使用lock锁6.面试题 1.什么是生产者和消费者 ​ 在日常生活中&#xff0c;我们去商店买东西&#xff0c;我们就是消费者&#xff0c;商…

三种方式实现生产者-消费者模型

前言 生产者消费者问题&#xff08;英语&#xff1a;Producer-consumer problem&#xff09;&#xff0c;也称有限缓冲问题&#xff08;英语&#xff1a;Bounded-buffer problem&#xff09;&#xff0c;是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的…

生产者消费者模型

目录 一、生产者消费者模型的概念 二、生产者消费者模型的特点 三、生产者消费者模型优点 四、基于BlockingQueue的生产者消费者模型 4.1 基本认识 4.2 模拟实现 五、POSIX信号量 5.1 信号量概念 5.2 信号量函数 5.2.1 初始化信号量 5.2.2 销毁信号量 5.2.3 等待信…

打家劫舍问题

打家劫舍问题 最近碰见这种问题实在是太多了,感觉还是有必要学习一下打家劫舍以及其变种问题这一类问题采用的都是动态规划的解法 一些练习题目 6378. 最小化旅行的价格总和 198. 打家劫舍I 213. 打家劫舍 II 337. 打家劫舍 III 2560. 打家劫舍 IV 1 、打家劫舍I 题目…

经典动态规划:打家劫舍系列问题

打家劫舍系列总共有三道&#xff0c;难度设计非常合理&#xff0c;层层递进。第一道是比较标准的动态规划问题&#xff0c;而第二道融入了环形数组的条件&#xff0c;第三道更绝&#xff0c;让盗贼在二叉树上打劫. House Robber | public int rob(int[] nums);题目很容易理解…

【算法】动态规划(三)——打家劫舍系列问题

目录 一、前言 二、打家劫舍 &#xff08;1&#xff09;198. 打家劫舍Ⅰ • 整体代码&#xff1a; &#xff08;2&#xff09;213. 打家劫舍 II • 题目分析 • 整体代码&#xff1a; &#xff08;3&#xff09;337. 打家劫舍Ⅲ • 思路分析 • 整体代码&#xff1a; 三、补充知…

动态规划之打家劫舍系列

前言 打家劫舍问题是一种非常经典的有限制条件的动态规划问题&#xff0c;按理说&#xff0c;不是一种特殊的类型&#xff0c;但是因为力扣上纯纯的出了三道题&#xff08;1&#xff0c;2&#xff0c;3&#xff09;来考察&#xff0c;题目的难度是依次递进的&#xff0c;还结合…

动态规划之打家劫舍

动态规划之打家劫舍 文章目录 动态规划之打家劫舍1. "198. 打家劫舍"2. "198. 打家劫舍&#xff08;变种&#xff1a;输出路径&#xff09;"3. "213. 打家劫舍 II"4. "337. 打家劫舍 III" 1. “198. 打家劫舍” dp数组定义&#xff1a…

oracle 根据部分字段去重

问题&#xff1a;在oracle中使用group by分组&#xff0c;group by子句中必须包含所有的select中的字段和order by子句中的字段。 在不使用group by子句的情况下&#xff0c;进行分组。&#xff08;根据部分字段分组&#xff09; over()分析函数 原sql SELECTIM. ID mediaGrou…

oracle字段去重查询,oracle怎么去重查询

oracle去重查询的方法是&#xff1a; oracle 数据库多字段去重 方法介绍&#xff1a;distinct 关键字、group by 、row_number ()over(partition by 列 order by 列 desc) 我的需求是&#xff1a;根据某几列去重 查询出去重后的全部信息。最后我选择的是第三种方法。 我的想法&…

oracle 数据去重方法

1. 创建表&#xff1a; -- Create table create table TEST_USER (user_id NUMBER(3),user_name VARCHAR2(20),user_age NUMBER(3) ) tablespace GUAN_TABLESPACEpctfree 10initrans 1maxtrans 255storage(initial 64Knext 1Mminextents 1maxextents unlimited);--测试数据…

oracle 字符串去重

select regexp_replace(1,1,3,5,5, ([^,])(,\1)*(,|$), \1\3) from dual;注意&#xff1a; 但是&#xff0c;这个去重&#xff0c;必须建立在排序的基础上&#xff0c;如果listagg拼接出来的数值像 a, b, a, c 这时候&#xff0c;该正则就会失效。

MYSQL/ORACLE多字段去重-根据某字段去重

通过百度上的答案多数无效 自己搞了个 使用oracle row_number()函数&#xff0c;给每个同名的加一个序号&#xff0c;最后筛选第n个想同的即可 oracle与mysql不同 1.oracel 多字段distinct(字段名去重) group by去重失效 可以用row_number() over(partition) 给同名列加个序号…

Oracle 数据去重

在Oracle数据库中删除重复数据 一&#xff0c;查询及删除重复记录的SQL语句 Person01表&#xff1a; 1. 查询表中多余的重复数据&#xff0c;根据ID字段来判断是否重复 SELECT * FROM PERSON01 WHERE ID IN (SELECT ID FROM PERSON01 GROUP BY ID HAVING COUNT(ID) > 1)…

Oracle根据多列去重

&#xff08;1&#xff09;distinct 关键词 distinct用于返回唯一不同的值&#xff0c;可作用于单列和多列 但必须将其放在开头&#xff0c;否则会提示错误 而若在其后添加多个变量名&#xff0c;则返回的将是这多个变量名不同时重复的列&#xff0c;因而使用distinct筛选某…

oracle 数据库去重查询

oracle数据库中有如下一张表&#xff0c;包含id,loginid,name,researchtime等字段&#xff0c;其中name字段中的数据有重复&#xff0c;查询数据时要重复数据只取一条&#xff0c;利用row_number ()over(partition by 列 order by 列 desc)方法实现 1:select a.,row_number() o…

oracle去重函数

1、distinct &#xff08;1&#xff09;、常用的distinct select distinct column from table; &#xff08;2&#xff09;、统计去重后数量 select count(distinct column) from table;–查去重后数量 &#xff08;3&#xff09;、distinct必须放在开头 select id, distinct n…

oracle 数据库 去重查询

oracle 数据库多字段去重 方法介绍&#xff1a;distinct 关键字、group by 、row_number ()over(partition by 列 order by 列 desc) 我的需求是&#xff1a;根据某几列去重 查询出去重后的全部信息。最后我选择的是第三种方法。 我的想法&#xff1a;我想找出一种更简单的方…