生产者消费者模型java实现

article/2025/9/28 7:31:11

做题的时候遇到了生产者消费者问题,这个问题可以说是线程学习的经典题目了,就忍不住研究了一波。它描述是有一块缓冲区(队列实现)作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品。在Java中这个数组线程阻塞的问题,多个用户同时发送多个请求,怎么保证不发生线程死锁,是我们要考虑的问题。

生产者消费者模式说明:

1.生产者只在仓库未满时进行生产,仓库满时生产者进程被阻塞;

2.消费者只在仓库非空时进行消费,仓库为空时消费者进程被阻塞;

3.当消费者发现仓库为空时会通知生产者生产;

3.当生产者发现仓库满时会通知消费者消费;

实现的关键:

我们知道在JAVA环境中,线程Thread有如下几个状态:

1.新建状态

2.就绪状态

3.运行状态

4.阻塞状态

5.死亡状态

生产者消费者问题就是要控制线程的阻塞状态,保证生产者和消费者进程在一定条件下,一直稳定运行,不出现没有商品但是消费者还是一直购买,商品满了但是生产者还是不断生产导致浪费的情况。

 

我们考虑线程常用的Sychronized、RetrenLock还有阻塞队列来实现。

(1)Object的wait() / notify()方法 

wait(): wait()方法可以让线程进入等待状态,当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等待状态,让其他线程执行。

notify():notify随机选择一个在该对象上调用wait方法的线程,解除其阻塞状态。当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

 

 

代码实现:

import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;/*** 生产者消费者模式:使用Object.wait() / notify()方法实现*/
public class ProducerConsumer {private static final int CAPACITY = 5;
//申请一个容量最大的仓库public static void main(String args[]){Queue<Integer> queue = new LinkedList<Integer>();Thread producer1 = new Producer("P1", queue, CAPACITY);Thread producer2 = new Producer("P2", queue, CAPACITY);Thread consumer1 = new Consumer("C1", queue, CAPACITY);Thread consumer2 = new Consumer("C2", queue, CAPACITY);Thread consumer3 = new Consumer("C3", queue, CAPACITY);producer1.start();producer2.start();consumer1.start();consumer2.start();consumer3.start();}/*** 生产者*/public static class Producer extends Thread{private Queue<Integer> queue;//队列作为仓库String name;int maxSize;int i = 0;public Producer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){
//while(condition)为自旋锁,为防止该线程没有收到notify()调用也从wait()中返回
//(也称作虚假唤醒),这个线程会重新去检查condition条件以决定当前是否可以安全
//地继续执行还是需要重新保持等待,而不是认为线程被唤醒了就可以安全地继续执行
//了,自旋锁当终止条件满足时,才会停止自旋,这里设置了一直执行,直到程序手动停
//止。synchronized(queue){//给队列加锁,保证线程安全while(queue.size() == maxSize){//当队列是满的时候,生产者线程等待,由消费者线程进行操作try {System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");queue.wait();} catch (Exception ex) {ex.printStackTrace();}}//队列不为空的时候,生产者被唤醒进行操作System.out.println("[" + name + "] Producing value : +" + i);queue.offer(i++);//因此如果想在一个满的队列中加入一个新项,调用 add() 方法就会抛出一//个 unchecked 异常,而调用 offer() 方法会返回 falsequeue.notifyAll();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}}/*** 消费者*/public static class Consumer extends Thread{private Queue<Integer> queue;String name;int maxSize;public Consumer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){synchronized(queue){while(queue.isEmpty()){try {//队列为空,说明没有生产者生产的商品,消费者进行等待System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");queue.wait();} catch (Exception ex) {ex.printStackTrace();}}int x = queue.poll();//如果队列元素为空,调用remove() 的行为与 Collection 接口的版本相似会抛出异常,这里是模拟消费者取走商品的过程// 但是新的 poll() 方法在用空集合调用时只是返回 null。因此新的方法更适合容易出现异常条件的情况。System.out.println("[" + name + "] Consuming value : " + x);queue.notifyAll();//唤醒所有队列,消费者和生产者根据队列情况进行操作try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}}
}

 

2. 使用Lock和Condition的await() / signal()方法

Condition接口的await()和signal()是用来做同步的两种方法,它们的功能基本上和Object的wait()/ nofity()相同,完全可以取代它们,但是它们和新引入的锁定机制Lock直接挂钩,具有更大的灵活性。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。

代码实现:

import java.util.LinkedList;import java.util.Queue;import java.util.Random;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/*** 生产者消费者模式:使用Lock和Condition实现*/public class ProducerConsumer {private static final int CAPACITY = 5;private static final Lock lock = new ReentrantLock();private static final Condition fullCondition = lock.newCondition();//队列满的条件private static final Condition emptyCondition = lock.newCondition();//队列空的条件public static void main(String args[]){Queue<Integer> queue = new LinkedList<Integer>();Thread producer1 = new Producer("P1", queue, CAPACITY);Thread producer2 = new Producer("P2", queue, CAPACITY);Thread consumer1 = new Consumer("C1", queue, CAPACITY);Thread consumer2 = new Consumer("C2", queue, CAPACITY);Thread consumer3 = new Consumer("C3", queue, CAPACITY);producer1.start();producer2.start();consumer1.start();consumer2.start();consumer3.start();}/*** 生产者*/public static class Producer extends Thread{private Queue<Integer> queue;String name;int maxSize;int i = 0;public Producer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){//获得锁lock.lock();while(queue.size() == maxSize){try {System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");//这里可以和wait()进行对比,两种控制线程阻塞的方式fullCondition.await();} catch (InterruptedException ex) {ex.printStackTrace();}}System.out.println("[" + name + "] Producing value : +" + i);queue.offer(i++);//唤醒其他所有生产者、消费者fullCondition.signalAll();emptyCondition.signalAll();//释放锁,Lock不同于Sychronized,需要手动释放锁lock.unlock();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}/*** 消费者*/public static class Consumer extends Thread{private Queue<Integer> queue;String name;int maxSize;public Consumer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){//获得锁lock.lock();while(queue.isEmpty()){try {System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");//队列为空满足条件,消费者线程阻塞emptyCondition.await();} catch (Exception ex) {ex.printStackTrace();}}int x = queue.poll();System.out.println("[" + name + "] Consuming value : " + x);//唤醒其他所有生产者、消费者fullCondition.signalAll();emptyCondition.signalAll();//释放锁lock.unlock();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}}

(3)BlockingQueue阻塞队列方法 

我们采用一个阻塞队列来实现。

 

通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。

我们这里使用LinkedBlockingQueue,它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await()/ signal()方法。它可以在生成对象时指定容量大小。它用于阻塞操作的是put()和take()方法。

  • put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。
  • take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。

 

 

代码实现:

import java.util.LinkedList;import java.util.Queue;import java.util.Random;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/*** 生产者消费者模式:使用Lock和Condition实现*/public class ProducerConsumer {private static final int CAPACITY = 5;private static final Lock lock = new ReentrantLock();private static final Condition fullCondition = lock.newCondition();//队列满的条件private static final Condition emptyCondition = lock.newCondition();//队列空的条件public static void main(String args[]){Queue<Integer> queue = new LinkedList<Integer>();Thread producer1 = new Producer("P1", queue, CAPACITY);Thread producer2 = new Producer("P2", queue, CAPACITY);Thread consumer1 = new Consumer("C1", queue, CAPACITY);Thread consumer2 = new Consumer("C2", queue, CAPACITY);Thread consumer3 = new Consumer("C3", queue, CAPACITY);producer1.start();producer2.start();consumer1.start();consumer2.start();consumer3.start();}/*** 生产者*/public static class Producer extends Thread{private Queue<Integer> queue;String name;int maxSize;int i = 0;public Producer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){//获得锁lock.lock();while(queue.size() == maxSize){try {System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");//这里可以和wait()进行对比,两种控制线程阻塞的方式fullCondition.await();} catch (InterruptedException ex) {ex.printStackTrace();}}System.out.println("[" + name + "] Producing value : +" + i);queue.offer(i++);
//唤醒其他所有生产者、消费者fullCondition.signalAll();emptyCondition.signalAll();//释放锁,Lock不同于Sychronized,需要手动释放锁lock.unlock();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}/*** 消费者*/public static class Consumer extends Thread{private Queue<Integer> queue;String name;int maxSize;public Consumer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){lock.lock();while(queue.isEmpty()){try {System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");
//队列为空满足条件,消费者线程阻塞emptyCondition.await();} catch (Exception ex) {ex.printStackTrace();}}int x = queue.poll();System.out.println("[" + name + "] Consuming value : " + x);//唤醒其他所有生产者、消费者fullCondition.signalAll();emptyCondition.signalAll();//释放锁lock.unlock();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}}

小结:三种实现形式,其实理念都是相同的,都是控制阻塞状态,根据条件去控制线程的运行状态和阻塞状态。生产者消费者模式 为信息传输开辟了一个崭新的概念,因为它的优先级最高,所以即使网络发生堵塞时它也会最先通过,最大程度的保证了设备的安全。也有缺点,就是在网络中的个数是有限制的。生产者消费者模式在设置时比较简单,使用方便安全,在将来的自动化行业必定会大大被人们所认同。

参考资料:

https://blog.csdn.net/u010983881/article/details/78554671#commentBox


http://chatgpt.dhexx.cn/article/Q6o4bnYp.shtml

相关文章

生产者消费者模型详解以及实现

生产者消费者模式 我们先来看看什么是生产者消费者模式&#xff0c;生产者消费者模式是程序设计中非常常见的一种设计模式&#xff0c;被广泛运用在解耦、消息队列等场景。在现实世界中&#xff0c;我们把生产商品的一方称为生产者&#xff0c;把消费商品的一方称为消费者&…

Java生产者消费者模型的五种实现方式

转自&#xff1a;https://juejin.im/entry/596343686fb9a06bbd6f888c 前言 生产者和消费者问题是线程模型中的经典问题&#xff1a;生产者和消费者在同一时间段内共用同一个存储空间&#xff0c;生产者往存储空间中添加产品&#xff0c;消费者从存储空间中取走产品&#xff0c…

生产者消费者模型---详解及代码实现

概念 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯&#xff0c;而通过阻塞队列来进行通讯&#xff0c;所以生产者生产完数据之后不用等待消费者处理&#xff0c;直接扔给阻塞队列&#xff0c;消费者不找生产者要数据…

生产消费者模型

生产消费者模型中包含三个部分&#xff0c;生产者、消费者和交易场所。其中涉及如下的关系&#xff1a; &#xff08;1&#xff09;生产者和生产者之间的关系&#xff1a;由于生产者的生产面向的都是交易场所&#xff0c;所以生产者之间是存在竞争关系的&#xff0c;就像一家超…

生产者-消费者模型

什么是生产者消费者模型 生产者 - 消费者模型&#xff08; Producer-consumer problem&#xff09; 是一个非常经典的多线程并发协作的模型&#xff0c;在分布式系统里非常常见。 这个模型由两类线程和一个缓冲区组成来组成 生产者线程&#xff1a;生产数据&#xff0c;并把…

最长上升子序列和最长公共子序列

文章目录 文章目录 文章目录一、基本知识二、最长上升子序列1.朴素版2.二分版 三、最长公共子序列 一、基本知识 1.子串和子序列的区别&#xff1a; 子串必须连续,子序列可以不连续。 2.最长上升子序列(LIS)&#xff1a; 是指一个序列中最长的单调递增的子序列。 3.最长公共…

求最长子序列及回溯

D - 最长公共子序列问题 Description 给定两个序列 X{x1,x2,…,xm} 和 Y{y1,y2,…,yn}&#xff0c;找出X和Y的最长公共子序列。 Input 输入数据有多组&#xff0c;每组有两行 &#xff0c;每行为一个长度不超过500的字符串&#xff08;输入全是大写英文字母&#xff08;A,Z…

算法14-求最长子序列

题目&#xff1a; 给定数组arr。求最长的有序子序列的长度&#xff0c;返回长度int 分析&#xff1a; 1 要求的子串是有序的&#xff0c;就要比大小 2 用最暴力大方法&#xff0c;看成窗口问题&#xff0c;每一个元素求出它左边的最长序列子串&#xff0c;写入一个数组dp&…

最长子序列——动态规划

动态规划算法通常用于求解具有某种最优性质的问题。动态规划算法与分治法类似&#xff0c;其基本思想也是将待求解问题分解成若干个子问题&#xff0c;先求解子问题&#xff0c;然后从这些子问题的解得到原问题的解。与分治法不同的是&#xff0c;适合于用动态规划求解的问题&a…

最长公共子串与最长子序列

一 序 本文属于《图解算法》系列&#xff0c;上一篇整理了动态规划&#xff0c;动态规划可以帮助我们解决给定约束条件下找到最优解&#xff0c;例如背包问题。 在问题可分解为彼此独立且离散的子问题时&#xff0c;就可使用动态规划来解决。 在看个例子&#xff0c;求两个字…

动态规划:最长子序列

最长公共子序列&#xff1a; 链接&#xff1a;https://www.nowcoder.com/questionTerminal/9ae56e5bdf4f480387df781671db5172 题目&#xff1a; 我们有两个字符串m和n&#xff0c;如果它们的子串a和b内容相同&#xff0c;则称a和b是m和n的公共子序列。子串中的字符不一定在原字…

C++最长子序列

LeedCode-300. 最长上升子序列 LeetCode-300. 最长上升子序列 方法一&#xff1a;O(n^2)可能会超时&#xff1b;方法二&#xff1a;贪心二分法&#xff0c;使用lower_bound()&#xff1b; 下面是贪心二分算法&#xff1a; 由于要得到最长的递增子序列&#xff0c;就要让序列…

数组:最长子序列问题四种解法

数组&#xff1a;最长子序列问题四种解法 问题描述&#xff1a; 给定一个整数数组 nums &#xff0c;找到一个具有最大和的连续子数组&#xff08;子数组最少包含一个元素&#xff09;&#xff0c;返回其最大和。 示例 1 : 输入: [-2,1,-3,4,-1,2,1,-5,4], 输出: 6 解释: 连续子…

动态规划:最长子序列问题

关于动态规划中的最长子序列问题有很多优秀的解读&#xff0c;在这里推荐一位博主的关于最长子序列的文章&#xff0c;非常不错&#xff0c;配有大量的图片和文字解答&#xff0c;在这里推荐给大家。本文章转载自这里 1.基本概念 首先需要科普一下&#xff0c;最长公共子序列…

一道有趣的最长子序列问题

一道有趣的最长子序列问题 – 潘登同学的金融经济学笔记 文章目录 一道有趣的最长子序列问题 -- 潘登同学的金融经济学笔记 来源求解递推公式算法实现 来源 前几天在刷视频的时候&#xff0c;发现了这样一道题 所谓子序列就是一个序列 a i 1 , a i 2 , ⋯ , a i n a_{i1},a_{…

最长子序列最长子串的题型汇总

1.最长公共子序列的长度 题目&#xff1a;对于两个字符串&#xff0c;请设计一个高效算法&#xff0c;求他们的最长公共子序列的长度&#xff0c;这里的最长公共子序列定义为有两个序列U1,U2,U3...Un和V1,V2,V3...Vn,其中Ui&ltUi1&#xff0c;Vi&ltVi1。且A[Ui] B[Vi…

动态规划解最长子序列问题

动态规划法 经常会遇到复杂问题不能简单地分解成几个子问题&#xff0c;而会分解出一系列的子问题。简单地采用把大问题分解成子问题&#xff0c;并综合子问题的解导出大问题的解的方法&#xff0c;问题求解耗时会按问题规模呈幂级数增加。 为了节约重复求相同子问题的时间&…

最长****子序列

&#xff08;在研读大佬写的博客后&#xff0c;打算记录下自己学习过程&#xff09; 通过最长上升子序列的拓展了解到&#xff0c;其实这是一个系列的问题主要代表有&#xff1a; 1 最长上升子序列 2 最长不上升子序列 3 最长下降子序列 4 最长不下降子序列 就以最长上升子…

最长公共子序列

最长公共子序列&#xff08;Longest Common Subsequence&#xff0c;简称 LCS&#xff09;是一道非常经典的面试题目&#xff0c;因为它的解法是典型的二维动态规划&#xff0c;大部分比较困难的字符串问题都和这个问题一个套路&#xff0c;比如说编辑距离。而且&#xff0c;这…

最长公共子序列(LCS) 过程图解

1.基本概念 首先需要科普一下&#xff0c;最长公共子序列&#xff08;longest common sequence&#xff09;和最长公共子串&#xff08;longest common substring&#xff09;不是一回事儿。什么是子序列呢&#xff1f;即一个给定的序列的子序列&#xff0c;就是将给定序列中零…