Java多种方式解决生产者消费者问题(十分详细)

article/2025/11/10 0:41:54

一、问题描述

生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。生产者生成一定量的数据放到缓冲区中,然后重复此过程;与此同时,消费者也在缓冲区消耗这些数据。生产者和消费者之间必须保持同步,要保证生产者不会在缓冲区满时放入数据,消费者也不会在缓冲区空时消耗数据。不够完善的解决方法容易出现死锁的情况,此时进程都在等待唤醒。

示意图:
生产者消费者


二、解决方法

思路

  1. 采用某种机制保护生产者和消费者之间的同步。有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式。

  2. 在生产者和消费者之间建立一个管道。管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。

解决问题的核心

   保证同一资源被多个线程并发访问时的完整性。常用的同步方法是采用信号或加锁机制,保证资源在任意时刻至多被一个线程访问。

Java能实现的几种方法

  1. wait() / notify()方法

  2. await() / signal()方法

  3. BlockingQueue阻塞队列方法

  4. 信号量

  5. 管道


三、代码实现

1. wait() / notify()方法

当缓冲区已满时,生产者线程停止执行,放弃锁,使自己处于等状态,让其他线程执行;
当缓冲区已空时,消费者线程停止执行,放弃锁,使自己处于等状态,让其他线程执行。

当生产者向缓冲区放入一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态;
当消费者从缓冲区取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

仓库Storage.java

import java.util.LinkedList;public class Storage {// 仓库容量private final int MAX_SIZE = 10;// 仓库存储的载体private LinkedList<Object> list = new LinkedList<>();public void produce() {synchronized (list) {while (list.size() + 1 > MAX_SIZE) {System.out.println("【生产者" + Thread.currentThread().getName()+ "】仓库已满");try {list.wait();} catch (InterruptedException e) {e.printStackTrace();}}list.add(new Object());System.out.println("【生产者" + Thread.currentThread().getName()+ "】生产一个产品,现库存" + list.size());list.notifyAll();}}public void consume() {synchronized (list) {while (list.size() == 0) {System.out.println("【消费者" + Thread.currentThread().getName() + "】仓库为空");try {list.wait();} catch (InterruptedException e) {e.printStackTrace();}}list.remove();System.out.println("【消费者" + Thread.currentThread().getName()+ "】消费一个产品,现库存" + list.size());list.notifyAll();}}
}

生产者

public class Producer implements Runnable{private Storage storage;public Producer(){}public Producer(Storage storage){this.storage = storage;}@Overridepublic void run(){while(true){try{Thread.sleep(1000);storage.produce();}catch (InterruptedException e){e.printStackTrace();}}}
}

消费者

public class Consumer implements Runnable{private Storage storage;public Consumer(){}public Consumer(Storage storage){this.storage = storage;}@Overridepublic void run(){while(true){try{Thread.sleep(3000);storage.consume();}catch (InterruptedException e){e.printStackTrace();}}}
}

主函数

public class Main {public static void main(String[] args) {Storage storage = new Storage();Thread p1 = new Thread(new Producer(storage));Thread p2 = new Thread(new Producer(storage));Thread p3 = new Thread(new Producer(storage));Thread c1 = new Thread(new Consumer(storage));Thread c2 = new Thread(new Consumer(storage));Thread c3 = new Thread(new Consumer(storage));p1.start();p2.start();p3.start();c1.start();c2.start();c3.start();}
}

运行结果

【生产者p1】生产一个产品,现库存1
【生产者p2】生产一个产品,现库存2
【生产者p3】生产一个产品,现库存3
【生产者p1】生产一个产品,现库存4
【生产者p2】生产一个产品,现库存5
【生产者p3】生产一个产品,现库存6
【生产者p1】生产一个产品,现库存7
【生产者p2】生产一个产品,现库存8
【消费者c1】消费一个产品,现库存7
【生产者p3】生产一个产品,现库存8
【消费者c2】消费一个产品,现库存7
【消费者c3】消费一个产品,现库存6
【生产者p1】生产一个产品,现库存7
【生产者p2】生产一个产品,现库存8
【生产者p3】生产一个产品,现库存9
【生产者p1】生产一个产品,现库存10
【生产者p2】仓库已满
【生产者p3】仓库已满
【生产者p1】仓库已满
【消费者c1】消费一个产品,现库存9
【生产者p1】生产一个产品,现库存10
【生产者p3】仓库已满
。。。。。。以下省略

一个生产者线程运行produce方法,睡眠1s;一个消费者运行一次consume方法,睡眠3s。此次实验过程中,有3个生产者和3个消费者,也就是我们说的多对多的情况。仓库的容量为10,可以看出消费的速度明显慢于生产的速度,符合设定。

注意:

notifyAll()方法可使所有正在等待队列中等待同一共享资源的“全部”线程从等待状态退出,进入可运行状态。此时,优先级最高的哪个线程最先执行,但也有可能是随机执行的,这要取决于JVM虚拟机的实现。即最终也只有一个线程能被运行,上述线程优先级都相同,每次运行的线程都不确定是哪个,后来给线程设置优先级后也跟预期不一样,还是要看JVM的具体实现吧。

2. await() / signal()方法

在JDK5中,用ReentrantLock和Condition可以实现等待/通知模型,具有更大的灵活性。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。

在这里只需改动Storage类

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class Storage {// 仓库最大存储量private final int MAX_SIZE = 10;// 仓库存储的载体private LinkedList<Object> list = new LinkedList<Object>();// 锁private final Lock lock = new ReentrantLock();// 仓库满的条件变量private final Condition full = lock.newCondition();// 仓库空的条件变量private final Condition empty = lock.newCondition();public void produce(){// 获得锁lock.lock();while (list.size() + 1 > MAX_SIZE) {System.out.println("【生产者" + Thread.currentThread().getName()+ "】仓库已满");try {full.await();} catch (InterruptedException e) {e.printStackTrace();}}list.add(new Object());System.out.println("【生产者" + Thread.currentThread().getName() + "】生产一个产品,现库存" + list.size());empty.signalAll();lock.unlock();}public void consume(){// 获得锁lock.lock();while (list.size() == 0) {System.out.println("【消费者" + Thread.currentThread().getName()+ "】仓库为空");try {empty.await();} catch (InterruptedException e) {e.printStackTrace();}}list.remove();System.out.println("【消费者" + Thread.currentThread().getName()+ "】消费一个产品,现库存" + list.size());full.signalAll();lock.unlock();}
}

运行结果与wait()/notify()类似

3. BlockingQueue阻塞队列方法

BlockingQueue是JDK5.0的新增内容,它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await() / signal()方法。它可以在生成对象时指定容量大小,用于阻塞操作的是put()和take()方法。

put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。
take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。

import java.util.concurrent.LinkedBlockingQueue;public class Storage {// 仓库存储的载体private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<>(10);public void produce() {try{list.put(new Object());System.out.println("【生产者" + Thread.currentThread().getName()+ "】生产一个产品,现库存" + list.size());} catch (InterruptedException e){e.printStackTrace();}}public void consume() {try{list.take();System.out.println("【消费者" + Thread.currentThread().getName()+ "】消费了一个产品,现库存" + list.size());} catch (InterruptedException e){e.printStackTrace();}}
}

可能会出现put()或take()和System.out.println()输出不匹配的情况,是由于它们之间没有同步造成的。BlockingQueue可以放心使用,这可不是它的问题,只是在它和别的对象之间的同步有问题。

4. 信号量

Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池,我们也可以创建计数为1的Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。计数为0的Semaphore是可以release的,然后就可以acquire(即一开始使线程阻塞从而完成其他执行。)。

import java.util.LinkedList;
import java.util.concurrent.Semaphore;public class Storage {// 仓库存储的载体private LinkedList<Object> list = new LinkedList<Object>();// 仓库的最大容量final Semaphore notFull = new Semaphore(10);// 将线程挂起,等待其他来触发final Semaphore notEmpty = new Semaphore(0);// 互斥锁final Semaphore mutex = new Semaphore(1);public void produce(){try {notFull.acquire();mutex.acquire();list.add(new Object());System.out.println("【生产者" + Thread.currentThread().getName()+ "】生产一个产品,现库存" + list.size());}catch (Exception e) {e.printStackTrace();} finally {mutex.release();notEmpty.release();}}public void consume(){try {notEmpty.acquire();mutex.acquire();list.remove();System.out.println("【消费者" + Thread.currentThread().getName()+ "】消费一个产品,现库存" + list.size());} catch (Exception e) {e.printStackTrace();} finally {mutex.release();notFull.release();}}
}

5. 管道

一种特殊的流,用于不同线程间直接传送数据,一个线程发送数据到输出管道,另一个线程从输入管道中读数据。

inputStream.connect(outputStream)或outputStream.connect(inputStream)作用是使两个Stream之间产生通信链接,这样才可以将数据进行输出与输入。

这种方式只适用于两个线程之间通信,不适合多个线程之间通信。

1. PipedInputStream / PipedOutputStream (操作字节流)

Producer

import java.io.IOException;
import java.io.PipedOutputStream;public class Producer implements Runnable {private PipedOutputStream pipedOutputStream;public Producer() {pipedOutputStream = new PipedOutputStream();}public PipedOutputStream getPipedOutputStream() {return pipedOutputStream;}@Overridepublic void run() {try {for (int i = 1; i <= 5; i++) {pipedOutputStream.write(("This is a test, Id=" + i + "!\n").getBytes());}pipedOutputStream.close();} catch (IOException e) {e.printStackTrace();}}
}

Consumer

import java.io.IOException;
import java.io.PipedInputStream;public class Consumer implements Runnable {private PipedInputStream pipedInputStream;public Consumer() {pipedInputStream = new PipedInputStream();}public PipedInputStream getPipedInputStream() {return pipedInputStream;}@Overridepublic void run() {int len = -1;byte[] buffer = new byte[1024];try {while ((len = pipedInputStream.read(buffer)) != -1) {System.out.println(new String(buffer, 0, len));}pipedInputStream.close();} catch (IOException e) {e.printStackTrace();}}
}

主函数

import java.io.IOException;public class Main {public static void main(String[] args) {Producer p = new Producer();Consumer c = new Consumer();Thread t1 = new Thread(p);Thread t2 = new Thread(c);try {p.getPipedOutputStream().connect(c.getPipedInputStream());t2.start();t1.start();} catch (IOException e) {e.printStackTrace();}}
}
2. PipedReader / PipedWriter (操作字符流)

Producer

import java.io.IOException;
import java.io.PipedWriter;public class Producer implements Runnable {private PipedWriter pipedWriter;public Producer() {pipedWriter = new PipedWriter();}public PipedWriter getPipedWriter() {return pipedWriter;}@Overridepublic void run() {try {for (int i = 1; i <= 5; i++) {pipedWriter.write("This is a test, Id=" + i + "!\n");}pipedWriter.close();} catch (IOException e) {e.printStackTrace();}}
}

Consumer

import java.io.IOException;
import java.io.PipedReader;public class Consumer implements Runnable {private PipedReader pipedReader;public Consumer() {pipedReader = new PipedReader();}public PipedReader getPipedReader() {return pipedReader;}@Overridepublic void run() {int len = -1;char[] buffer = new char[1024];try {while ((len = pipedReader.read(buffer)) != -1) {System.out.println(new String(buffer, 0, len));}pipedReader.close();} catch (IOException e) {e.printStackTrace();}}
}

主函数

import java.io.IOException;public class Main {public static void main(String[] args) {Producer p = new Producer();Consumer c = new Consumer();Thread t1 = new Thread(p);Thread t2 = new Thread(c);try {p.getPipedWriter().connect(c.getPipedReader());t2.start();t1.start();} catch (IOException e) {e.printStackTrace();}}
}

想查看上面几种方式的完整代码,请点击这里:生产者消费者问题的一些实验


参考

《Java多线程编程核心技术》 高洪岩
生产者/消费者问题的多种Java实现方式
Producer–consumer problem – Wikipedia
Semaphore的一种使用方法
Semaphore实现的生产者消费者程序


http://chatgpt.dhexx.cn/article/OhZWd9VW.shtml

相关文章

生产者消费者问题(多生产多消费,java实现)

生产者消费者问题有多种,本文阐述的是多个生产者生产商品,多个消费者消费商品,缓冲区中有多个商品,这种情况下应该怎么处理线程安全问题 首先,具体用一张图描述一下这种情形,达到的效果是,多个生产者一边生产,多个生产者一边消费。 需要注意两个临界情况 1.缓冲区满的…

Java可视化实现生产者消费者问题

引言:生产者消费者问题是一个十分经典的多线程问题。为了更加形象地描述这个问题&#xff0c;采用可视化的形式展示此过程。 1、问题重述 生产者消费者问题也称有限缓冲问题。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”在实际运行时会发生的…

C语言 生产者消费者问题

目录 生产者消费者问题算法设计实现源程序测试日志总结 生产者消费者问题 算法设计 实现 1.编写所需头文件 #include<stdio.h> #include<Windows.h>2.定义全局变量 #define productor 2 //生产者数目 为2 #define consumers 3 //消费者数目 为3 #define buffe…

操作系统_生产者消费者问题

目录 1&#xff0c;生产者消费者问题 问题的提出 初步思考 进程资源共享关系和同步关系分析 问题的具体解决 第一搏 存在的问题 第二搏 多维度思考 1&#xff0c;单生产者、单消费者、多缓冲区 2&#xff0c;多生产者、多消费者、单缓冲 3&#xff0c;单生产者、单…

超详解“生产者消费者问题”【操作系统】

目录 一.生产者消费者问题&#xff08;问题描述&#xff09; 二.问题分析 三.背景知识 四.代码实现 五.实验结论 一.生产者消费者问题&#xff08;问题描述&#xff09; 有一个生产者在生产产品&#xff0c;这些产品将提供给一个消费者去消费&#xff0c;为了使生产者和消…

生产者消费者问题

文章目录 1.生产者消费者问题1.1 问题描述1.2 问题分析1.3 如何实现1.4 思考① -> ② -> ③③ -> ④ -> ① 1.5 小结 2.多生产者 - 多消费者2.1 问题描述2.2 问题分析2.3 如何实现2.4 小结 1.生产者消费者问题 1.1 问题描述 系统中有一组生产者进程和一组消费者进…

《操作系统》-生产者消费者问题

什么是生产者消费者问题&#xff1f; 系统中有一组生产者进程和一组消费者进程。生产者进程每次生产一个产品放入缓冲区&#xff0c;消费者进程每次从缓冲区中取出一个进程并使用&#xff0c;那么他们之间具有这样一层关系 生产者、消费者共享一个初始为空、大小为n的缓冲区。…

生产者-消费者问题(操作系统)

生产者-消费者问题从特殊到一般(从易到难)可以分3种形式&#xff1a; 一个生产者、一个消费者、一个缓冲区的问题&#xff1b; 一个生产者、一个消费者、n个缓冲区的问题&#xff1b; k个生产者、m个消费者、n个缓冲区的问题&#xff1b; ★当缓冲区空时&#xff0c;生产者可…

Java多线程——生产者消费者问题

创建多个线程去执行不同的任务&#xff0c;如果这些任务之间有着某种关系&#xff0c;那么线程之间必须能够通信来协调完成工作。 生产者消费者问题&#xff08;英语&#xff1a;Producer-consumer problem&#xff09;就是典型的多线程同步案例&#xff0c;它也被称为有限缓冲…

生产者-消费者问题(详解)

目录 1.问题描述 2.问题分析 3.问题实现 3.1 初始化 3.2 生产者 3.3 消费者 1.问题描述 要求如下&#xff1a; 只要缓冲区没满&#xff0c;生产者才能把产品放入缓冲区&#xff0c;否则必须等待。只有缓冲区不空时&#xff0c;消费者才能从中取出产品&#xff0c;否则必…

【操作系统】生产者消费者问题

生产者消费者模型 文章目录 生产者消费者模型 [toc]一、 生产者消费者问题二、 问题分析三、 伪代码实现四、代码实现&#xff08;C&#xff09;五、 互斥锁与条件变量的使用比较 一、 生产者消费者问题 生产者消费者问题&#xff08;英语&#xff1a;Producer-consumer proble…

Sublime Text实现代码自动生成,快速编写HTML/CSS代码

目录 下载Sublime Text安装emmet插件常用自动生成HTML代码实例初始化页面自动补全标签配对自动添加类名和id名自动填充文本内容自动生成同级标签自动生成嵌套标签自动生成提级标签自动生成分组标签自动生成多个元素自动生成带多个属性的元素自动生成隐式标签 常用自动生成CSS代…

MybatisPlus代码自动生成

这里写自定义目录标题 前言一. 什么是 MyBatis-Plus二.MybatisPlus 代码自动生成①idea 插件生成1. 插件2.连接数据源3.生成代码 ②配置工具类生成 前言 最开始&#xff0c;要在 Java 中使用数据库时&#xff0c;需要使用 JDBC&#xff0c;创建 Connection、ResultSet 等&…

Simulink自动代码生成:生成代码的基本设置

Simulink自动代码生成也被称作基于模型开发&#xff08;BMD&#xff09;&#xff0c;相比于传统的手写代码方式能够尽量减少人为错误。模型本身可以用于仿真&#xff0c;单元测试等&#xff0c;更便于提前发现逻辑错误。同时只要约定好模型接口&#xff0c;就可以多人协作&…

C语言代码自动生成工具

一、模型建模模块&#xff1a; 基于开源开发平台Eclipse&#xff0c;以图形方式创建和编辑模型元素&#xff0c;模型元素如下&#xff1a; 活动&#xff1a;初始活动、简单活动、复杂活动、结束活动&#xff1b;状态&#xff1a;初始状态、状态、结束状态&#xff1b;变迁&a…

前端代码自动生成器

场景 1.CodeFun是什么 CodeFun是一款UI 设计稿智能生成源代码的工具,支持微信小程序端、移动端H5和混合APP,上传 Sketch、PSD等形式的设计稿&#xff0c;通过智能化技术一键生成可维护的前端代码. 2.学习成本高吗&#xff1f; 对于前端工程师来说&#xff0c;几乎没有学习成本…

MATLAB/Simulink自动代码生成(一)

Simulink自带了种类繁多、功能强大的模块库&#xff0c;在基于模型设计的开发流程下&#xff0c;Simulink不仅通过仿真可以进行早期设计的验证&#xff0c;还可以生成C/C、PLC等代码直接应用于PC、MCU、DSP等平台。在嵌入式软件开发中发挥着重要的作用&#xff0c;本文以Simuli…

IDEA自动生成代码插件

官方介绍 基于IntelliJ IDEA开发的代码生成插件&#xff0c;支持自定义任意模板&#xff08;Java&#xff0c;html&#xff0c;js&#xff0c;xml&#xff09;。 只要是与数据库相关的代码都可以通过自定义模板来生成。支持数据库类型与java类型映射关系配置。 支持同时生成生…

Matlab/Simulink自动生成C代码实验

目录 0. 概要 1. Matlab /Simulink/Embedded Coder关系与区别 2. 搭建Simulink模型及仿真 2.1 搭建模型 2.2 仿真 3. 生成代码 3.1 求解器设置为定步长 3.2 安装 MinGW-w64 编译器 3.3 调出Simulink Coder 4. 工具都生成了啥呢&#xff1f; 0. 概要 Matlab网站提供了很多…

关于RuoYi自动代码生成功能的使用

为什么要使用代码生成&#xff1f; 答&#xff1a;因为在后端构建的过程中会有许多重复的类似的代码编写&#xff0c;而我们如果一个个去编写&#xff0c;会耗费大量时间与精力&#xff0c;所以我们可以设计一个功能去自动生成这些重复的&#xff0c;简单的代码。而若依系统就…