实现生产者消费者模式的三种方式

article/2025/9/29 9:27:05

什么是生产者消费者模式

简单来说,生产者消费者模式就是缓冲区。

image.png
那么这么做有两个好处,一个是解耦,第二个是平衡生产能力和消费能力的差,因为生产者和消费者的速度是不一样的,有了这个缓冲区就可以平衡这样一个落差,达到动态平衡。

那么这个缓冲区其实就是一个队列,它的规则就是当队列是满的时候,生产者会被阻塞。当队列为空的时候,消费者会被阻塞, 在java中实现生产者消费者模式有多种方式,主要是线程间的通信,这里介绍三种:wait/notify、await/signal和阻塞队列。

wait/notify

wait的时候阻塞当前线程,然后释放锁,这时候消费者就有机会抢占到锁了,代码如下:

//waitnotify模式的生产者消费者,泛型类
public class WaitNotifyProducerConsumer<T> {//储存元素的队列public ArrayList<T> list = new ArrayList<>();//队列最大长度5private int maxSize = 5;//生产者public void put(T item) throws InterruptedException {//锁住synchronized (list) {//如果已经达到5了,证明满了,阻塞线程,释放锁if (list.size() == maxSize) {list.wait();}//添加队列元素list.add(item);System.out.println(Thread.currentThread().getName() + "生产元素" + item + ",队列里还剩" + list.size() + "个元素");//通知消费者list.notify();}}public T take() throws InterruptedException {//锁住,和生产者共用一把锁synchronized (list) {//如果队列为空,阻塞线程释放锁,等待生产者放进去了唤醒if (list.isEmpty()) {list.wait();}//队列,先进先出,取出第一个元素T item = list.remove(0);System.out.println(Thread.currentThread().getName() + "消费元素" + item + ",队列里还剩" + list.size() + "个元素");//通知生产者list.notify();return item;}}public static void main(String[] args) throws InterruptedException {WaitNotifyProducerConsumer<String> wnpc = new WaitNotifyProducerConsumer<>();//生产者线程,十轮Thread t1 = new Thread(() -> {Random random = new Random();for (int i = 1; i <= 10; i++) {//元素String item = "item-" + i;try {//如果队列满了,put会阻塞wnpc.put(item);//随机休息Thread.sleep(random.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}});t1.start();//让生产者先启动生产数据Thread.sleep(1000);Thread t2 = new Thread(() -> {Random random = new Random();//自旋for (; ; ) {try {//消费元素String item = wnpc.take();//随机休息Thread.sleep(random.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}});t2.start();}
}

执行一下,生产者和消费者轮流生产消费元素。

image.png

await/signal

await和signal是Condition里面的,作用和synchronized一样,await让线程阻塞,并且释放锁,signal唤醒阻塞的线程,代码如下:

//Condition实现生产者消费者模式,泛型类
public class ConditionProducerConsumer<T> {//锁private Lock lock = new ReentrantLock();//储存元素的队列public ArrayList<T> list = new ArrayList<>();//队列最大长度5private int maxSize = 5;//使用两个条件队列condition来实现精确通知,生产者conditionprivate final Condition producerCondition = lock.newCondition();//消费者conditionprivate final Condition consumerCondition = lock.newCondition();//生产者public void put(T item) throws InterruptedException {//锁住lock.lock();try {//如果队列已满,阻塞当前线程,添加到生产者Condition等待队列,释放锁if (list.size() == maxSize) {producerCondition.await();}//添加元素list.add(item);System.out.println(Thread.currentThread().getName() + "生产元素" + item + ",队列里还剩" + list.size() + "个元素");//唤醒消费者Condition队列阻塞的线程consumerCondition.signal();} finally {lock.unlock();}}public T take() throws InterruptedException {//加锁lock.lock();try {//如果队列为空,释放锁,阻塞线程,添加到消费者等待队列if (list.isEmpty()) {consumerCondition.await();}//取出第一个元素T item = list.remove(0);System.out.println(Thread.currentThread().getName() + "消费元素" + item + ",队列里还剩" + list.size() + "个元素");//唤醒生产者队列阻塞的线程producerCondition.signal();return item;} finally {lock.unlock();}}public static void main(String[] args) throws InterruptedException {ConditionProducerConsumer<String> cpc = new ConditionProducerConsumer<>();//生产者线程,十轮Thread t1 = new Thread(() -> {Random random = new Random();for (int i = 1; i <= 10; i++) {//元素String item = "item-" + i;try {//如果队列满了,put会阻塞cpc.put(item);//随机休息Thread.sleep(random.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}});t1.start();//让生产者先启动生产数据Thread.sleep(1000);Thread t2 = new Thread(() -> {Random random = new Random();//自旋for (; ; ) {try {//消费元素String item = cpc.take();//随机休息Thread.sleep(random.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}});t2.start();}}

运行一下:

image.png

阻塞队列

JUC中的阻塞队列

  • ArrayBlockingQueue 基于数组结构
  • LinkedBlockingQueue 基于链表结构
  • PriorityBlcokingQueue 基于优先级队列
  • DelayQueue 允许延时执行的队列
  • SynchronousQueue 没有任何存储结构的的队列,它的应用场景是newCachedThreadPool()线程池。可以处理非常大请求的任务,1000个任务过来,那么线程池需要分配1000个线程来执行。关于线程池的原理可以查看《线程池源码精讲》 。

image.png

阻塞队列中的方法

添加元素

  • add -> 如果队列满了,抛出异常
  • offer -> true/false , 添加成功返回true,否则返回false
  • put -> 如果队列满了,则一直阻塞
  • offer(timeout) -> 带了一个超时时间。如果添加一个元素,队列满了,此时会阻塞timeout时长,超过阻塞时长,返回false。

移除元素

  • element-> 队列为空,抛异常
  • peek -> true/false , 移除成功返回true,否则返回false
  • take -> 一直阻塞
  • poll(timeout) -> 如果超时了,还没有元素,则返回null

实现生产者消费者

上面用Condition中的await和signal其实是就是在手写阻塞队列,但是我们可以不用这么麻烦,直接用java中封装好的阻塞队列,这些阻塞队列的源码点进去看其实用的就是await和signal,只不过封装好的代码比我们自己写的代码要健壮。我们这里用的是阻塞队列提供的put()和take()方法,代码如下:

public class BlockingQueueProducerConsumer<T> {//阻塞队列,长度是5BlockingQueue<T> blockingQueue = new ArrayBlockingQueue(5);//生产者,调阻塞队列的put方法public void put(T item) throws InterruptedException {blockingQueue.put(item);System.out.println(Thread.currentThread().getName() + "生产元素" + item + ",队列里还剩" + blockingQueue.size() + "个元素");}//消费者,调阻塞队列的take方法public T take() throws InterruptedException {T item = blockingQueue.take();System.out.println(Thread.currentThread().getName() + "消费元素" + item + ",队列里还剩" + blockingQueue.size() + "个元素");return item;}public static void main(String[] args) throws InterruptedException {BlockingQueueProducerConsumer<String> bqpc = new BlockingQueueProducerConsumer<>();//生产者线程,十轮Thread t1 = new Thread(() -> {Random random = new Random();for (int i = 1; i <= 10; i++) {//元素String item = "item-" + i;try {//如果队列满了,put会阻塞bqpc.put(item);//随机休息Thread.sleep(random.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}});t1.start();//让生产者启动生产数据Thread.sleep(1000);Thread t2 = new Thread(() -> {Random random = new Random();//自旋for (; ; ) {try {//消费元素String item = bqpc.take();//随机休息Thread.sleep(random.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}});t2.start();}
}

运行一下:
image.png

我们可以看到用java中自带的阻塞队列实现生产者消费者是最简单的,两行代码搞定,但是另外两种写法也得懂,主要是线程间的通信,毕竟要知其然知其所以然,最后感谢大家收看~


http://chatgpt.dhexx.cn/article/ivBDhYiH.shtml

相关文章

生产者消费者模式最佳实践

测试环境&#xff1a;ubuntu18.04opencv4.2Qt 一个生产者-消费者模式下的视频处理框架。基础结构&#xff1a;视频读取类线程不断读取视频帧&#xff0c;处理类线程对图像进行处理&#xff0c;之后通过信号与槽机制在主线程中显示。特点&#xff1a;视频读取、处理为独立线程&a…

生产者、消费者模式

架构设计&#xff1a;生产者/消费者模式[0]&#xff1a;概述 今天打算来介绍一下“生产者&#xff0f;消费者模式”&#xff0c;这玩意儿在很多开发领域都能派上用场。由于该模式很重要&#xff0c;打算分几个帖子来介绍。今天这个帖子先来扫盲一 把。如果你对这个模式已经比较…

生产者和消费者模式详解

★简介 生产者消费者模式并不是GOF提出的23种设计模式之一&#xff0c;23种设计模式都是建立在面向对象的基础之上的&#xff0c;但其实面向过程的编程中也有很多高效的编程模式&#xff0c;生产者消费者模式便是其中之一&#xff0c;它是我们编程过程中最常用的一种设计模式。…

生产者消费者模式+代码实现

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。 为什么要使用生产者和消费者模式 在线程世界里&#xff0c;生产者就是生产数据的线程&#xff0c;消费者就是消费数据的线程。在…

生产者消费者模式详解(转载)

★简介 在实际的软件开发过程中&#xff0c;经常会碰到如下场景&#xff1a;某个模块负责产生数据&#xff0c;这些数据由另一个模块来负责处理&#xff08;此处的模块是广义的&#xff0c;可以是类、函数、线程、进程等&#xff09;。产生数据的模块&#xff0c;就形象地称为生…

多线程之生产者消费者模式

文章目录 基本组成阻塞队列有界队列与无界队列ArrayBlockingQueueLinkedBlockingQueueSynchronousQueue 流量控制与信号量(Semaphore)双缓冲与Exchanger 基本组成 生产者&#xff1a;生产者的任务是生产产品&#xff0c;产品可以是数据&#xff0c;也可以是任务。(将产品存入传…

java 生产者消费者模式

java的生产者消费者模式&#xff0c;有三个部分组成&#xff0c;一个是生产者&#xff0c;一个是消费者&#xff0c;一个是缓存。 这么做有什么好处呢&#xff1f; 1.解耦(去依赖)&#xff0c;如果是消费者直接调用生产者&#xff0c;那如果生产者的代码变动了&#xff0c;消费…

生产者消费者模式浅析

由于最近工作中&#xff0c;涉及到生产者消费者设计模式&#xff0c;对此有一些体会&#xff0c;所以总结一下&#xff0c;与大家分享。 什么是生产者消费者模式 在工作中&#xff0c;大家可能会碰到这样一种情况&#xff1a;某个模块负责产生数据&#xff0c;这些数据由另一个…

java实现生产者消费者模式

一: 什么是生产者消费者模型 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯&#xff0c;而通过阻塞队列来进行通讯&#xff0c;所以生产者生产完数据之后不用等待消费者处理&#xff0c;直接扔给阻塞队列&#xff0c;消费…

【Java】生产者消费者模式的实现

前言 生产者消费者问题是线程模型中的经典问题&#xff1a;生产者和消费者在同一时间段内共用同一存储空间&#xff0c;生产者向空间里生产数据&#xff0c;而消费者取走数据。 阻塞队列就相当于一个缓冲区&#xff0c;平衡了生产者和消费者的处理能力。这个阻塞队列就是用来…

生产者/消费者模式

[0]&#xff1a;概述 今天打算来介绍一下“生产者&#xff0f;消费者模式”&#xff0c;这玩意儿在很多开发领域都能派上用场。由于该模式很重要&#xff0c;打算分几个帖子来介绍。今天这个帖子先来扫盲一把。如果你对这个模式已经比较了解&#xff0c;请跳过本扫盲帖&#x…

(四)生产者消费者模式

&#xff08;一)生产者消费者模式原理&#xff1a; 在一个系统中&#xff0c;存在生产者和消费者两种角色&#xff0c;他们通过内存缓冲区进行通信&#xff0c;生产者生产消费者需要的资料&#xff0c;消费者把资料做成产品。生产消费者模式如下图&#xff1a; &#xff08;二…

【C++】【设计模式之】生产者-消费者模型(理论讲解及实现)

一、什么是生产者-消费者模型 1、简单理解生产者-消费者模型 假设有两个进程&#xff08;或线程&#xff09;A、B和一个固定大小的缓冲区&#xff0c;A进程生产数据放入缓冲区&#xff0c;B进程从缓冲区中取出数据进行计算&#xff0c;这就是一个简单的生产者-消费者模型。这里…

设计模式——生产者消费者模式

1 基本概括 2 主要介绍 2.1 概念 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯&#xff0c;而通过阻塞队列来进行通讯&#xff0c;所以生产者生产完数据之后不用等待消费者处理&#xff0c;直接扔给阻塞队列&#xff…

生产者消费者模式三种实现方式

目录 1.什么是生产者消费者模式&#xff1a;2.生产者消费者模型的实现&#xff1a;第一种&#xff1a;使用 synchronized和wait、notify第二种&#xff1a;使用 Lock和await、signal第三种&#xff1a;使用 阻塞队列 BlockingQueue 1.什么是生产者消费者模式&#xff1a; 生产…

t-SNE算法

t-SNE(t-distributed stochastic neighbor embedding)是用于降维的一种机器学习算法&#xff0c;是由 Laurens van der Maaten 和 Geoffrey Hinton在 08 年提出来。t-SNE 是一种非线性降维算法&#xff0c;非常适用于高维数据降维到 2 维或者 3 维&#xff0c;进行可视化。在实…

t-SNE概述

为了循序渐进, 先来学习SNE. SNE 无论是多维数据还是词向量, 都是一个个散落在空间中的点, 点与点之间距离近的, 就可以看作属于同一分类或近义词. 衡量两点距离有很多种手段, 但最常用的还是欧式距离, 所以欧氏距离与相似度的关系可以用某种公式近似表达, 这样就可以把空间信…

机器学习笔记 - 什么是t-SNE?

1、t-SNE概述 t-Distributed Stochastic Neighbor Embedding (t-SNE) 是一种无监督的非线性技术,主要用于数据探索和高维数据的可视化。简单来说,t-SNE 让您对数据在高维空间中的排列方式有一种感觉或直觉。它由 Laurens van der Maatens 和 Geoffrey Hinton 于 2008 年提出。…

可视化降维方法 t-SNE

本篇主要介绍很好的降维方法t-SNE的原理 详细介绍了困惑度perplexity对有效点的影响首先介绍了SNE然后在SNE的基础上进行改进&#xff1a;1.使用对称式。2.低维空间概率计算使用t分布 t-SNE&#xff08;t分布和SNE的组合&#xff09; 以前的方法有个问题&#xff1a;只考虑相…

t-SNE非线性降维

TSNE&#xff08;t-Distributed Stochastic Neighbor Embedding &#xff09;是对SNE的改进&#xff0c;SNE最早出现在2002年&#xff0c;改变了MDN和ISOMAP中基于距离不变的思想&#xff0c;将高维映射到低维的同时&#xff0c;尽量保证相互之间的分布概率不变&#xff0c;SNE…