生产者消费者模型详解以及实现

article/2025/9/28 8:07:37

生产者消费者模式

我们先来看看什么是生产者消费者模式,生产者消费者模式是程序设计中非常常见的一种设计模式,被广泛运用在解耦、消息队列等场景。在现实世界中,我们把生产商品的一方称为生产者,把消费商品的一方称为消费者,有时生产者的生产速度特别快,但消费者的消费速度跟不上,俗称“产能过剩”,又或是多个生产者对应多个消费者时,大家可能会手忙脚乱。如何才能让大家更好地配合呢?这时在生产者和消费者之间就需要一个中介来进行调度,于是便诞生了生产者消费者模式。

使用生产者消费者模式通常需要在两者之间增加一个阻塞队列作为媒介,有了媒介之后就相当于有了一个缓冲,平衡了两者的能力,整体的设计如图所示,最上面是阻塞队列,右侧的 1 是生产者线程,生产者在生产数据后将数据存放在阻塞队列中,左侧的 2 是消费者线程,消费者获取阻塞队列中的数据。而中间的 3 和 4 分别代表生产者消费者之间互相通信的过程,因为无论阻塞队列是满还是空都可能会产生阻塞,阻塞之后就需要在合适的时机去唤醒被阻塞的线程。

 

那么什么时候阻塞线程需要被唤醒呢?有两种情况。第一种情况是当消费者看到阻塞队列为空时,开始进入等待,这时生产者一旦往队列中放入数据,就会通知所有的消费者,唤醒阻塞的消费者线程。另一种情况是如果生产者发现队列已经满了,也会被阻塞,而一旦消费者获取数据之后就相当于队列空了一个位置,这时消费者就会通知所有正在阻塞的生产者进行生产,这便是对生产者消费者模式的简单介绍。

 

如何用 BlockingQueue 实现生产者消费者模式

我们接下来看如何用 wait/notify/Condition/BlockingQueue 实现生产者消费者模式,先从最简单的 BlockingQueue 开始讲起:

 

public static void main(String[] args) {BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);Runnable producer = () -> {while (true) {queue.put(new Object());}};new Thread(producer).start();
new Thread(producer).start();Runnable consumer = () -> {while (true) {queue.take();
}};
new Thread(consumer).start();
new Thread(consumer).start();
}

如代码所示,首先,创建了一个 ArrayBlockingQueue 类型的 BlockingQueue,命名为 queue 并将它的容量设置为 10;其次,创建一个简单的生产者,while(true) 循环体中的queue.put() 负责往队列添加数据;然后,创建两个生产者线程并启动;同样消费者也非常简单,while(true) 循环体中的 queue.take() 负责消费数据,同时创建两个消费者线程并启动。为了代码简洁并突出设计思想,代码里省略了 try/catch 检测,我们不纠结一些语法细节。以上便是利用 BlockingQueue 实现生产者消费者模式的代码。虽然代码非常简单,但实际上 ArrayBlockingQueue 已经在背后完成了很多工作,比如队列满了就去阻塞生产者线程,队列有空就去唤醒生产者线程等。

 

生产者消费者模型首先要创建一个生产者线程,一个消费者线程,阻塞队列为共享资源,生产者不产者向阻塞队列放东西,消费者从阻塞队列拿东西。这里实际上 ArrayBlockingQueue 已经在背后完成了很多工作,比如队列满了就去阻塞生产者线程,队列有空就去唤醒生产者线程等。

 

如何用 Condition 实现生产者消费者模式

BlockingQueue 实现生产者消费者模式看似简单,背后却暗藏玄机,我们在掌握这种方法的基础上仍需要掌握更复杂的实现方法。我们接下来看如何在掌握了 BlockingQueue 的基础上利用 Condition 实现生产者消费者模式,它们背后的实现原理非常相似,相当于我们自己实现一个简易版的 BlockingQueue:

public class MyBlockingQueueForCondition {private Queue queue;private int max = 16;private ReentrantLock lock = new ReentrantLock();private Condition notEmpty = lock.newCondition();private Condition notFull = lock.newCondition();public MyBlockingQueueForCondition(int size) {this.max = size;queue = new LinkedList();}public void put(Object o) throws InterruptedException {lock.lock();try {while (queue.size() == max) {notFull.await();}queue.add(o);notEmpty.signalAll();} finally {lock.unlock();}}public Object take() throws InterruptedException {lock.lock();try {while (queue.size() == 0) {notEmpty.await();}Object item = queue.remove();notFull.signalAll();return item;} finally {lock.unlock();}}
}

如代码所示,首先,定义了一个队列变量 queue 并设置最大容量为 16;其次,定义了一个 ReentrantLock 类型的 Lock 锁,并在 Lock 锁的基础上创建两个 Condition,一个是 notEmpty,另一个是 notFull,分别代表队列没有空和没有满的条件;最后,声明了 put 和 take 这两个核心方法。

 

因为生产者消费者模式通常是面对多线程的场景,需要一定的同步措施保障线程安全,所以在 put 方法中先将 Lock 锁上,然后,在 while 的条件里检测 queue 是不是已经满了,如果已经满了,则调用 notFull 的 await() 阻塞生产者线程并释放 Lock,如果没有满,则往队列放入数据并利用 notEmpty.signalAll() 通知正在等待的所有消费者并唤醒它们。最后在 finally 中利用 lock.unlock() 方法解锁,把 unlock 方法放在 finally 中是一个基本原则,否则可能会产生无法释放锁的情况。

 

下面再来看 take 方法,take 方法实际上是与 put 方法相互对应的,同样是通过 while 检查队列是否为空,如果为空,消费者开始等待,如果不为空则从队列中获取数据并通知生产者队列有空余位置,最后在 finally 中解锁。

 

这里需要注意,我们在 take() 方法中使用 while( queue.size() == 0 ) 检查队列状态,而不能用 if( queue.size() == 0 )。为什么呢?大家思考这样一种情况,因为生产者消费者往往是多线程的,我们假设有两个消费者,第一个消费者线程获取数据时,发现队列为空,便进入等待状态;因为第一个线程在等待时会释放 Lock 锁,所以第二个消费者可以进入并执行 if( queue.size() == 0 ),也发现队列为空,于是第二个线程也进入等待;而此时,如果生产者生产了一个数据,便会唤醒两个消费者线程,而两个线程中只有一个线程可以拿到锁,并执行 queue.remove 操作,另外一个线程因为没有拿到锁而卡在被唤醒的地方,而第一个线程执行完操作后会在 finally 中通过 unlock 解锁,而此时第二个线程便可以拿到被第一个线程释放的锁,继续执行操作,也会去调用 queue.remove 操作,然而这个时候队列已经为空了,所以会抛出 NoSuchElementException 异常,这不符合我们的逻辑。而如果用 while 做检查,当第一个消费者被唤醒得到锁并移除数据之后,第二个线程在执行 remove 前仍会进行 while 检查,发现此时依然满足 queue.size() == 0 的条件,就会继续执行 await 方法,避免了获取的数据为 null 或抛出异常的情况。

 

如何用 wait/notify 实现生产者消费者模式

最后我们再来看看使用 wait/notify 实现生产者消费者模式的方法,实际上实现原理和Condition 是非常类似的,它们是兄弟关系:

class MyBlockingQueue {private int maxSize;private LinkedList<Object> storage;public MyBlockingQueue(int size) {this.maxSize = size;storage = new LinkedList<>();}public synchronized void put() throws InterruptedException {while (storage.size() == maxSize) {wait();}storage.add(new Object());notifyAll();}public synchronized void take() throws InterruptedException {while (storage.size() == 0) {wait();}System.out.println(storage.remove());notifyAll();}
}

如代码所示,最主要的部分仍是 take 与 put 方法,我们先来看 put 方法,put 方法被 synchronized 保护,while 检查队列是否为满,如果不满就往里放入数据并通过 notifyAll() 唤醒其他线程。同样,take 方法也被 synchronized 修饰,while 检查队列是否为空,如果不为空就获取数据并唤醒其他线程。使用这个 MyBlockingQueue 实现的生产者消费者代码如下:

 

生成者消费者完整代码,写三个类的实现方法,

阻塞队列类

/**
* 描述:     wait形式实现生产者消费者模式
*/
public class WaitStyle {public static void main(String[] args) {MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10);Producer producer = new Producer(myBlockingQueue);Consumer consumer = new Consumer(myBlockingQueue);new Thread(producer).start();new Thread(consumer).start();}
}

生产者

class Producer implements Runnable {private MyBlockingQueue storage;public Producer(MyBlockingQueue storage) {this.storage = storage;}@Overridepublic void run() {for (int i = 0; i < 100; i++) {try {storage.put();} catch (InterruptedException e) {e.printStackTrace();}}}
}

 

消费者

class Consumer implements Runnable {private MyBlockingQueue storage;public Consumer(MyBlockingQueue storage) {this.storage = storage;}@Overridepublic void run() {for (int i = 0; i < 100; i++) {try {storage.take();} catch (InterruptedException e) {e.printStackTrace();}}}
}

 

思考:为什么只需要wait和notify就可以代替两个条件变量

1. 当队列为空时,消费者线程阻塞,否则唤醒消费者线程

2. 当队列为满时,生产者线程阻塞,否则唤醒生成者线程

 

因为消费者只从队列里面拿数据,调用take方法,而生产者只从队列放东西,调用put方法,这两个方法是独立的。

 


http://chatgpt.dhexx.cn/article/3fx6TQff.shtml

相关文章

Java生产者消费者模型的五种实现方式

转自&#xff1a;https://juejin.im/entry/596343686fb9a06bbd6f888c 前言 生产者和消费者问题是线程模型中的经典问题&#xff1a;生产者和消费者在同一时间段内共用同一个存储空间&#xff0c;生产者往存储空间中添加产品&#xff0c;消费者从存储空间中取走产品&#xff0c…

生产者消费者模型---详解及代码实现

概念 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯&#xff0c;而通过阻塞队列来进行通讯&#xff0c;所以生产者生产完数据之后不用等待消费者处理&#xff0c;直接扔给阻塞队列&#xff0c;消费者不找生产者要数据…

生产消费者模型

生产消费者模型中包含三个部分&#xff0c;生产者、消费者和交易场所。其中涉及如下的关系&#xff1a; &#xff08;1&#xff09;生产者和生产者之间的关系&#xff1a;由于生产者的生产面向的都是交易场所&#xff0c;所以生产者之间是存在竞争关系的&#xff0c;就像一家超…

生产者-消费者模型

什么是生产者消费者模型 生产者 - 消费者模型&#xff08; Producer-consumer problem&#xff09; 是一个非常经典的多线程并发协作的模型&#xff0c;在分布式系统里非常常见。 这个模型由两类线程和一个缓冲区组成来组成 生产者线程&#xff1a;生产数据&#xff0c;并把…

最长上升子序列和最长公共子序列

文章目录 文章目录 文章目录一、基本知识二、最长上升子序列1.朴素版2.二分版 三、最长公共子序列 一、基本知识 1.子串和子序列的区别&#xff1a; 子串必须连续,子序列可以不连续。 2.最长上升子序列(LIS)&#xff1a; 是指一个序列中最长的单调递增的子序列。 3.最长公共…

求最长子序列及回溯

D - 最长公共子序列问题 Description 给定两个序列 X{x1,x2,…,xm} 和 Y{y1,y2,…,yn}&#xff0c;找出X和Y的最长公共子序列。 Input 输入数据有多组&#xff0c;每组有两行 &#xff0c;每行为一个长度不超过500的字符串&#xff08;输入全是大写英文字母&#xff08;A,Z…

算法14-求最长子序列

题目&#xff1a; 给定数组arr。求最长的有序子序列的长度&#xff0c;返回长度int 分析&#xff1a; 1 要求的子串是有序的&#xff0c;就要比大小 2 用最暴力大方法&#xff0c;看成窗口问题&#xff0c;每一个元素求出它左边的最长序列子串&#xff0c;写入一个数组dp&…

最长子序列——动态规划

动态规划算法通常用于求解具有某种最优性质的问题。动态规划算法与分治法类似&#xff0c;其基本思想也是将待求解问题分解成若干个子问题&#xff0c;先求解子问题&#xff0c;然后从这些子问题的解得到原问题的解。与分治法不同的是&#xff0c;适合于用动态规划求解的问题&a…

最长公共子串与最长子序列

一 序 本文属于《图解算法》系列&#xff0c;上一篇整理了动态规划&#xff0c;动态规划可以帮助我们解决给定约束条件下找到最优解&#xff0c;例如背包问题。 在问题可分解为彼此独立且离散的子问题时&#xff0c;就可使用动态规划来解决。 在看个例子&#xff0c;求两个字…

动态规划:最长子序列

最长公共子序列&#xff1a; 链接&#xff1a;https://www.nowcoder.com/questionTerminal/9ae56e5bdf4f480387df781671db5172 题目&#xff1a; 我们有两个字符串m和n&#xff0c;如果它们的子串a和b内容相同&#xff0c;则称a和b是m和n的公共子序列。子串中的字符不一定在原字…

C++最长子序列

LeedCode-300. 最长上升子序列 LeetCode-300. 最长上升子序列 方法一&#xff1a;O(n^2)可能会超时&#xff1b;方法二&#xff1a;贪心二分法&#xff0c;使用lower_bound()&#xff1b; 下面是贪心二分算法&#xff1a; 由于要得到最长的递增子序列&#xff0c;就要让序列…

数组:最长子序列问题四种解法

数组&#xff1a;最长子序列问题四种解法 问题描述&#xff1a; 给定一个整数数组 nums &#xff0c;找到一个具有最大和的连续子数组&#xff08;子数组最少包含一个元素&#xff09;&#xff0c;返回其最大和。 示例 1 : 输入: [-2,1,-3,4,-1,2,1,-5,4], 输出: 6 解释: 连续子…

动态规划:最长子序列问题

关于动态规划中的最长子序列问题有很多优秀的解读&#xff0c;在这里推荐一位博主的关于最长子序列的文章&#xff0c;非常不错&#xff0c;配有大量的图片和文字解答&#xff0c;在这里推荐给大家。本文章转载自这里 1.基本概念 首先需要科普一下&#xff0c;最长公共子序列…

一道有趣的最长子序列问题

一道有趣的最长子序列问题 – 潘登同学的金融经济学笔记 文章目录 一道有趣的最长子序列问题 -- 潘登同学的金融经济学笔记 来源求解递推公式算法实现 来源 前几天在刷视频的时候&#xff0c;发现了这样一道题 所谓子序列就是一个序列 a i 1 , a i 2 , ⋯ , a i n a_{i1},a_{…

最长子序列最长子串的题型汇总

1.最长公共子序列的长度 题目&#xff1a;对于两个字符串&#xff0c;请设计一个高效算法&#xff0c;求他们的最长公共子序列的长度&#xff0c;这里的最长公共子序列定义为有两个序列U1,U2,U3...Un和V1,V2,V3...Vn,其中Ui&ltUi1&#xff0c;Vi&ltVi1。且A[Ui] B[Vi…

动态规划解最长子序列问题

动态规划法 经常会遇到复杂问题不能简单地分解成几个子问题&#xff0c;而会分解出一系列的子问题。简单地采用把大问题分解成子问题&#xff0c;并综合子问题的解导出大问题的解的方法&#xff0c;问题求解耗时会按问题规模呈幂级数增加。 为了节约重复求相同子问题的时间&…

最长****子序列

&#xff08;在研读大佬写的博客后&#xff0c;打算记录下自己学习过程&#xff09; 通过最长上升子序列的拓展了解到&#xff0c;其实这是一个系列的问题主要代表有&#xff1a; 1 最长上升子序列 2 最长不上升子序列 3 最长下降子序列 4 最长不下降子序列 就以最长上升子…

最长公共子序列

最长公共子序列&#xff08;Longest Common Subsequence&#xff0c;简称 LCS&#xff09;是一道非常经典的面试题目&#xff0c;因为它的解法是典型的二维动态规划&#xff0c;大部分比较困难的字符串问题都和这个问题一个套路&#xff0c;比如说编辑距离。而且&#xff0c;这…

最长公共子序列(LCS) 过程图解

1.基本概念 首先需要科普一下&#xff0c;最长公共子序列&#xff08;longest common sequence&#xff09;和最长公共子串&#xff08;longest common substring&#xff09;不是一回事儿。什么是子序列呢&#xff1f;即一个给定的序列的子序列&#xff0c;就是将给定序列中零…

最长子序列问题详解

提到最长子序列问题&#xff0c;想必大家都不陌生&#xff0c;今天我主要想分享一下我对子序列问题的一些理解&#xff1a; 先拿最长上升子序列问题来说吧&#xff1a; 很明显这是一个动态规化问题&#xff0c;仔细想想也不难得出其状态转移方程 首先介绍一下dp[]数组的含义…