生产者-消费者模型

article/2025/9/28 9:17:30

什么是生产者消费者模型

生产者 - 消费者模型( Producer-consumer problem) 是一个非常经典的多线程并发协作的模型,在分布式系统里非常常见。
在这里插入图片描述

这个模型由两类线程和一个缓冲区组成来组成

  • 生产者线程:生产数据,并把数据放在这个队列里面
  • 缓冲区:存放生产者的数据的地方
  • 消费者线程:从队列里面取数据,消费数据

运行流程

  • 生产者和消费者在同一时间段内共用同一个存储空间
  • 生产者往存储空间中添加产品
  • 消费者从存储空间中取走产品
  • 当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。

为什么要用生产者消费者模型

在多线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中

  • 如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。
  • 如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

实现方式

wait()和notify()方法的实现

  • wait方法:执行该方法的对象释放同步锁,JVM把该线程存放到等待池中,等待其他线程唤醒

  • notify方法:唤醒在等待池中的等待的任意一个线程,把线程转移到锁池中

  • notifyAll方法:唤醒在等待池中等待的所有线程,把线程转移到锁池中

public class WaitTest {private static int count = 0;private static final int buffCount = 10;private static String lock = "lock";class Producer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}synchronized (lock) {while (count == buffCount) {try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}count++;System.out.println(Thread.currentThread().getName() + "-生产者生产,数量为:" + count);lock.notifyAll();}}}}class Consumer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}synchronized (lock) {while (count == 0) {try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}count--;System.out.println(Thread.currentThread().getName() + "-消费者消费,数量为:"+ count);lock.notifyAll();}}}}public static void main(String[] args) {WaitTest waitTest = new WaitTest();new Thread(waitTest.new Producer()).start();new Thread(waitTest.new Consumer()).start();new Thread(waitTest.new Producer()).start();new Thread(waitTest.new Consumer()).start();new Thread(waitTest.new Producer()).start();new Thread(waitTest.new Consumer()).start();}}

使用ReentrantLock实现

juc并发包中的 Lock 框架是锁的一个抽象,通过对lock的lock()方法和unlock()方法实现了对锁的显示控制,而synchronize()则是对锁的隐性控制。

可重入锁,也叫做递归锁,指的是同一线程的外层函数获得锁之后 ,内层递归函数仍然有获取该锁的代码,但不受影响

简单来说,该锁维护这一个与获取锁相关的计数器,如果拥有锁的某个线程再次得到锁,那么获取计数器就加1,函数调用结束计数器就减1,然后锁需要被释放两次才能获得真正释放。

已经获取锁的线程进入其他需要相同锁的同步代码块不会被阻塞。


public class LockTest {private static int count = 0;private static final int buffCount = 10;private static Lock lock = new ReentrantLock();//创建两个条件变量,一个为缓冲区非满,一个为缓冲区非空private final Condition notFull = lock.newCondition();private final Condition notEmpty = lock.newCondition();class Producer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}lock.lock();try {while (count == buffCount) {try {notFull.await();} catch (InterruptedException e) {e.printStackTrace();}}count++;System.out.println(Thread.currentThread().getName() + "-生产者生产,数量为:" + count);notEmpty.signal();} finally {lock.unlock();}}}}class Consumer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}lock.lock();try {while (count == 0) {try {notEmpty.await();} catch (InterruptedException e) {e.printStackTrace();}}count--;System.out.println(Thread.currentThread().getName() + "-消费者消费,数量为:"+ count);notFull.signal();} finally {lock.unlock();}}}}public static void main(String[] args) {LockTest lockTest = new LockTest();new Thread(lockTest.new Producer()).start();new Thread(lockTest.new Consumer()).start();new Thread(lockTest.new Producer()).start();new Thread(lockTest.new Consumer()).start();new Thread(lockTest.new Producer()).start();new Thread(lockTest.new Consumer()).start();}
}

使用阻塞队列BlockingQueue实现

BlockingQueue即阻塞队列,拥有队列先进先出的特性,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种:

  • 当队列满了的时候进行入队列操作
  • 当队列空了的时候进行出队列操作

阻塞队列是线程安全的,因为:

  • 当一个线程对已经满了的阻塞队列进行入队操作时会阻塞,除非有另外一个线程进行了出队操作,
  • 当一个线程对一个空的阻塞队列进行出队操作时也会阻塞,除非有另外一个线程进行了入队操作。

public class BlockingQueueTest {private static int count = 0;private final BlockingQueue blockingQueue = new LinkedBlockingQueue(10);class Producer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}try {blockingQueue.put(1);count++;System.out.println(Thread.currentThread().getName() + "-生产者生产,数量为:" + count);} catch (InterruptedException e) {e.printStackTrace();}}}}class Consumer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}try {blockingQueue.take();count--;System.out.println(Thread.currentThread().getName() + "-消费者消费,数量为:"+ count);} catch (InterruptedException e) {e.printStackTrace();}}}}public static void main(String[] args) {BlockingQueueTest blockingQueueTest = new BlockingQueueTest();new Thread(blockingQueueTest.new Producer()).start();new Thread(blockingQueueTest.new Consumer()).start();new Thread(blockingQueueTest.new Producer()).start();new Thread(blockingQueueTest.new Consumer()).start();new Thread(blockingQueueTest.new Producer()).start();new Thread(blockingQueueTest.new Consumer()).start();}
}

信号量Semaphore的实现

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源,在操作系统中是一个非常重要的问题,用来解决哲学家就餐问题。

Java中的Semaphore维护了一个许可集,一开始先设定这个许可集的数量,可以使用acquire()方法获得一个许可,当许可不足时会被阻塞,release()添加一个许可。

在下列代码中,还加入了另外一个mutex信号量,维护生产者消费者之间的同步关系,保证生产者和消费者之间的交替进行


public class SemaphoreTest {private static int count = 0;//创建三个信号量private final Semaphore notFull = new Semaphore(10);private final Semaphore notEmpty = new Semaphore(0);private final Semaphore mutex = new Semaphore(1);class Producer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}try {notFull.acquire();//获取许可mutex.acquire();count++;System.out.println(Thread.currentThread().getName() + "-生产者生产,数量为:" + count);} catch (InterruptedException e) {e.printStackTrace();}finally {mutex.release();//释放notEmpty.release();}}}}class Consumer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}try {notEmpty.acquire();mutex.acquire();count--;System.out.println(Thread.currentThread().getName() + "-消费者消费,数量为:"+ count);} catch (InterruptedException e) {e.printStackTrace();} finally {mutex.release();notFull.release();}}}}public static void main(String[] args) {SemaphoreTest semaphoreTest = new SemaphoreTest();new Thread(semaphoreTest.new Producer()).start();new Thread(semaphoreTest.new Consumer()).start();new Thread(semaphoreTest.new Producer()).start();new Thread(semaphoreTest.new Consumer()).start();new Thread(semaphoreTest.new Producer()).start();new Thread(semaphoreTest.new Consumer()).start();}
}

管道输入输出流实现

在java的io包下,PipedOutputStream和PipedInputStream分别是管道输出流和管道输入流。
它们的作用是让多线程可以通过管道进行线程间的通讯。在使用管道通信时,必须将PipedOutputStream和PipedInputStream配套使用。

  • 先创建一个管道输入流和管道输出流,然后将输入流和输出流进行连接,
  • 用生产者线程往管道输出流中写入数据,消费者在管道输入流中读取数据,
  • 这样就可以实现了不同线程间的相互通讯,
  • 但是这种方式在生产者和生产者、消费者和消费者之间不能保证同步,也就是说在一个生产者和一个消费者的情况下是可以生产者和消费者之间交替运行的,多个生成者和多个消费者者之间则不行

public class PipedTest {private final PipedInputStream  pis = new PipedInputStream();private final PipedOutputStream pos = new PipedOutputStream();{try {pis.connect(pos);} catch (IOException e) {e.printStackTrace();}}class Producer implements Runnable {@Overridepublic void run() {try {while(true) {Thread.sleep(1000);int num = (int) (Math.random() * 255);System.out.println(Thread.currentThread().getName() + "生产者生产了一个数字,该数字为: " + num);pos.write(num);pos.flush();}} catch (Exception e) {e.printStackTrace();} finally {try {pos.close();pis.close();} catch (IOException e) {e.printStackTrace();}}}}class Consumer implements Runnable {@Overridepublic void run() {try {while(true) {Thread.sleep(1000);int num = pis.read();System.out.println("消费者消费了一个数字,该数字为:" + num);}} catch (Exception e) {e.printStackTrace();} finally {try {pos.close();pis.close();} catch (IOException e) {e.printStackTrace();}}}}public static void main(String[] args) {PipedTest pipedTest = new PipedTest();new Thread(pipedTest.new Producer()).start();new Thread(pipedTest.new Consumer()).start();}
}

http://chatgpt.dhexx.cn/article/UIglQLb4.shtml

相关文章

最长上升子序列和最长公共子序列

文章目录 文章目录 文章目录一、基本知识二、最长上升子序列1.朴素版2.二分版 三、最长公共子序列 一、基本知识 1.子串和子序列的区别&#xff1a; 子串必须连续,子序列可以不连续。 2.最长上升子序列(LIS)&#xff1a; 是指一个序列中最长的单调递增的子序列。 3.最长公共…

求最长子序列及回溯

D - 最长公共子序列问题 Description 给定两个序列 X{x1,x2,…,xm} 和 Y{y1,y2,…,yn}&#xff0c;找出X和Y的最长公共子序列。 Input 输入数据有多组&#xff0c;每组有两行 &#xff0c;每行为一个长度不超过500的字符串&#xff08;输入全是大写英文字母&#xff08;A,Z…

算法14-求最长子序列

题目&#xff1a; 给定数组arr。求最长的有序子序列的长度&#xff0c;返回长度int 分析&#xff1a; 1 要求的子串是有序的&#xff0c;就要比大小 2 用最暴力大方法&#xff0c;看成窗口问题&#xff0c;每一个元素求出它左边的最长序列子串&#xff0c;写入一个数组dp&…

最长子序列——动态规划

动态规划算法通常用于求解具有某种最优性质的问题。动态规划算法与分治法类似&#xff0c;其基本思想也是将待求解问题分解成若干个子问题&#xff0c;先求解子问题&#xff0c;然后从这些子问题的解得到原问题的解。与分治法不同的是&#xff0c;适合于用动态规划求解的问题&a…

最长公共子串与最长子序列

一 序 本文属于《图解算法》系列&#xff0c;上一篇整理了动态规划&#xff0c;动态规划可以帮助我们解决给定约束条件下找到最优解&#xff0c;例如背包问题。 在问题可分解为彼此独立且离散的子问题时&#xff0c;就可使用动态规划来解决。 在看个例子&#xff0c;求两个字…

动态规划:最长子序列

最长公共子序列&#xff1a; 链接&#xff1a;https://www.nowcoder.com/questionTerminal/9ae56e5bdf4f480387df781671db5172 题目&#xff1a; 我们有两个字符串m和n&#xff0c;如果它们的子串a和b内容相同&#xff0c;则称a和b是m和n的公共子序列。子串中的字符不一定在原字…

C++最长子序列

LeedCode-300. 最长上升子序列 LeetCode-300. 最长上升子序列 方法一&#xff1a;O(n^2)可能会超时&#xff1b;方法二&#xff1a;贪心二分法&#xff0c;使用lower_bound()&#xff1b; 下面是贪心二分算法&#xff1a; 由于要得到最长的递增子序列&#xff0c;就要让序列…

数组:最长子序列问题四种解法

数组&#xff1a;最长子序列问题四种解法 问题描述&#xff1a; 给定一个整数数组 nums &#xff0c;找到一个具有最大和的连续子数组&#xff08;子数组最少包含一个元素&#xff09;&#xff0c;返回其最大和。 示例 1 : 输入: [-2,1,-3,4,-1,2,1,-5,4], 输出: 6 解释: 连续子…

动态规划:最长子序列问题

关于动态规划中的最长子序列问题有很多优秀的解读&#xff0c;在这里推荐一位博主的关于最长子序列的文章&#xff0c;非常不错&#xff0c;配有大量的图片和文字解答&#xff0c;在这里推荐给大家。本文章转载自这里 1.基本概念 首先需要科普一下&#xff0c;最长公共子序列…

一道有趣的最长子序列问题

一道有趣的最长子序列问题 – 潘登同学的金融经济学笔记 文章目录 一道有趣的最长子序列问题 -- 潘登同学的金融经济学笔记 来源求解递推公式算法实现 来源 前几天在刷视频的时候&#xff0c;发现了这样一道题 所谓子序列就是一个序列 a i 1 , a i 2 , ⋯ , a i n a_{i1},a_{…

最长子序列最长子串的题型汇总

1.最长公共子序列的长度 题目&#xff1a;对于两个字符串&#xff0c;请设计一个高效算法&#xff0c;求他们的最长公共子序列的长度&#xff0c;这里的最长公共子序列定义为有两个序列U1,U2,U3...Un和V1,V2,V3...Vn,其中Ui&ltUi1&#xff0c;Vi&ltVi1。且A[Ui] B[Vi…

动态规划解最长子序列问题

动态规划法 经常会遇到复杂问题不能简单地分解成几个子问题&#xff0c;而会分解出一系列的子问题。简单地采用把大问题分解成子问题&#xff0c;并综合子问题的解导出大问题的解的方法&#xff0c;问题求解耗时会按问题规模呈幂级数增加。 为了节约重复求相同子问题的时间&…

最长****子序列

&#xff08;在研读大佬写的博客后&#xff0c;打算记录下自己学习过程&#xff09; 通过最长上升子序列的拓展了解到&#xff0c;其实这是一个系列的问题主要代表有&#xff1a; 1 最长上升子序列 2 最长不上升子序列 3 最长下降子序列 4 最长不下降子序列 就以最长上升子…

最长公共子序列

最长公共子序列&#xff08;Longest Common Subsequence&#xff0c;简称 LCS&#xff09;是一道非常经典的面试题目&#xff0c;因为它的解法是典型的二维动态规划&#xff0c;大部分比较困难的字符串问题都和这个问题一个套路&#xff0c;比如说编辑距离。而且&#xff0c;这…

最长公共子序列(LCS) 过程图解

1.基本概念 首先需要科普一下&#xff0c;最长公共子序列&#xff08;longest common sequence&#xff09;和最长公共子串&#xff08;longest common substring&#xff09;不是一回事儿。什么是子序列呢&#xff1f;即一个给定的序列的子序列&#xff0c;就是将给定序列中零…

最长子序列问题详解

提到最长子序列问题&#xff0c;想必大家都不陌生&#xff0c;今天我主要想分享一下我对子序列问题的一些理解&#xff1a; 先拿最长上升子序列问题来说吧&#xff1a; 很明显这是一个动态规化问题&#xff0c;仔细想想也不难得出其状态转移方程 首先介绍一下dp[]数组的含义…

如何查看pip版本

Windows系统如何查看pip版本 直接运行pip show pip

python之pip版本问题

我们在默认安装python软件之后&#xff0c;其自行安装的pip的版本可能比较落后。我们使用pip安装模块的时候经常提示安装失败&#xff0c;查看安装失败原因&#xff0c;部分是因为pip不是最新版本需要升级版本才能继续安装&#xff0c;因此写下一点小心得&#xff0c;让我们都能…

pip 查看可安装版本

pip 查看可安装版本 pip版本&#xff1a; pip -V pip 20.1.1 from /usr/lib/python2.7/site-packages/pip (python 2.7) 查看pip可安装版本 import pip._internal.utils.compatibility_tags print(pip._internal.utils.compatibility_tags.get_supported())

python如何查看pip版本并且升级pip

第一次写Python的学习经历 我之前也安装过Python,今天&#xff0c;终于重新安装了64位的windows的Python,于是在命令行输入&#xff1a;“pip list”出现以下的提示&#xff1a; 这个提示以前也出现过&#xff0c;但是看不懂&#xff0c;也不知道怎么处理&#xff0c;然后又胡…