Java生产者消费者模型的五种实现方式

article/2025/9/28 8:07:35

转自:https://juejin.im/entry/596343686fb9a06bbd6f888c

 

前言

生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。
生产者消费者.png
现在用四种方式来实现生产者消费者模型

wait()和notify()方法的实现

这也是最简单最基础的实现,缓冲区满和为空时都调用wait()方法等待,当生产者生产了一个产品或者消费者消费了一个产品之后会唤醒所有线程。

/*** 生产者和消费者,wait()和notify()的实现* @author ZGJ* @date 2017年6月22日*/
public class Test1 {private static Integer count = 0;private static final Integer FULL = 10;private static String LOCK = "lock";public static void main(String[] args) {Test1 test1 = new Test1();new Thread(test1.new Producer()).start();new Thread(test1.new Consumer()).start();new Thread(test1.new Producer()).start();new Thread(test1.new Consumer()).start();new Thread(test1.new Producer()).start();new Thread(test1.new Consumer()).start();new Thread(test1.new Producer()).start();new Thread(test1.new Consumer()).start();}class Producer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (Exception e) {e.printStackTrace();}synchronized (LOCK) {while (count == FULL) {try {LOCK.wait();} catch (Exception e) {e.printStackTrace();}}count++;System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count);LOCK.notifyAll();}}}}class Consumer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}synchronized (LOCK) {while (count == 0) {try {LOCK.wait();} catch (Exception e) {}}count--;System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有" + count);LOCK.notifyAll();}}}}
}

结果:

Thread-0生产者生产,目前总共有1
Thread-4生产者生产,目前总共有2
Thread-3消费者消费,目前总共有1
Thread-1消费者消费,目前总共有0
Thread-2生产者生产,目前总共有1
Thread-6生产者生产,目前总共有2
Thread-7消费者消费,目前总共有1
Thread-5消费者消费,目前总共有0
Thread-0生产者生产,目前总共有1
Thread-4生产者生产,目前总共有2
Thread-3消费者消费,目前总共有1
Thread-6生产者生产,目前总共有2
Thread-1消费者消费,目前总共有1
Thread-7消费者消费,目前总共有0
Thread-2生产者生产,目前总共有1
Thread-5消费者消费,目前总共有0
Thread-0生产者生产,目前总共有1
Thread-4生产者生产,目前总共有2
Thread-3消费者消费,目前总共有1
Thread-7消费者消费,目前总共有0
Thread-6生产者生产,目前总共有1
Thread-2生产者生产,目前总共有2
Thread-1消费者消费,目前总共有1
Thread-5消费者消费,目前总共有0
Thread-0生产者生产,目前总共有1
Thread-4生产者生产,目前总共有2
Thread-3消费者消费,目前总共有1
Thread-1消费者消费,目前总共有0
Thread-6生产者生产,目前总共有1
Thread-7消费者消费,目前总共有0
Thread-2生产者生产,目前总共有1

可重入锁ReentrantLock的实现

java.util.concurrent.lock 中的 Lock 框架是锁定的一个抽象,通过对lock的lock()方法和unlock()方法实现了对锁的显示控制,而synchronize()则是对锁的隐性控制。
可重入锁,也叫做递归锁,指的是同一线程 外层函数获得锁之后 ,内层递归函数仍然有获取该锁的代码,但不受影响,简单来说,该锁维护这一个与获取锁相关的计数器,如果拥有锁的某个线程再次得到锁,那么获取计数器就加1,函数调用结束计数器就减1,然后锁需要被释放两次才能获得真正释放。已经获取锁的线程进入其他需要相同锁的同步代码块不会被阻塞。

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/*** 生产者和消费者,ReentrantLock的实现* * @author ZGJ* @date 2017年6月22日*/
public class Test2 {private static Integer count = 0;private static final Integer FULL = 10;//创建一个锁对象private Lock lock = new ReentrantLock();//创建两个条件变量,一个为缓冲区非满,一个为缓冲区非空private final Condition notFull = lock.newCondition();private final Condition notEmpty = lock.newCondition();public static void main(String[] args) {Test2 test2 = new Test2();new Thread(test2.new Producer()).start();new Thread(test2.new Consumer()).start();new Thread(test2.new Producer()).start();new Thread(test2.new Consumer()).start();new Thread(test2.new Producer()).start();new Thread(test2.new Consumer()).start();new Thread(test2.new Producer()).start();new Thread(test2.new Consumer()).start();}class Producer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (Exception e) {e.printStackTrace();}//获取锁lock.lock();try {while (count == FULL) {try {notFull.await();} catch (InterruptedException e) {e.printStackTrace();}}count++;System.out.println(Thread.currentThread().getName()+ "生产者生产,目前总共有" + count);//唤醒消费者notEmpty.signal();} finally {//释放锁lock.unlock();}}}}class Consumer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (InterruptedException e1) {e1.printStackTrace();}lock.lock();try {while (count == 0) {try {notEmpty.await();} catch (Exception e) {e.printStackTrace();}}count--;System.out.println(Thread.currentThread().getName()+ "消费者消费,目前总共有" + count);notFull.signal();} finally {lock.unlock();}}}}
}

阻塞队列BlockingQueue的实现

BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种:

  1. 当队列满了的时候进行入队列操作
  2. 当队列空了的时候进行出队列操作
    因此,当一个线程对已经满了的阻塞队列进行入队操作时会阻塞,除非有另外一个线程进行了出队操作,当一个线程对一个空的阻塞队列进行出队操作时也会阻塞,除非有另外一个线程进行了入队操作。
    从上可知,阻塞队列是线程安全的。
    下面是BlockingQueue接口的一些方法:
操作抛异常特定值阻塞超时
插入add(o)offer(o)put(o)offer(o, timeout, timeunit)
移除remove(o)poll(o)take(o)poll(timeout, timeunit)
检查element(o)peek(o)  

这四类方法分别对应的是:
1 . ThrowsException:如果操作不能马上进行,则抛出异常
2 . SpecialValue:如果操作不能马上进行,将会返回一个特殊的值,一般是true或者false
3 . Blocks:如果操作不能马上进行,操作会被阻塞
4 . TimesOut:如果操作不能马上进行,操作会被阻塞指定的时间,如果指定时间没执行,则返回一个特殊值,一般是true或者false
下面来看由阻塞队列实现的生产者消费者模型,这里我们使用take()和put()方法,这里生产者和生产者,消费者和消费者之间不存在同步,所以会出现连续生成和连续消费的现象

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/*** 使用BlockingQueue实现生产者消费者模型* @author ZGJ* @date 2017年6月29日*/
public class Test3 {private static Integer count = 0;//创建一个阻塞队列final BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10);public static void main(String[] args) {Test3 test3 = new Test3();new Thread(test3.new Producer()).start();new Thread(test3.new Consumer()).start();new Thread(test3.new Producer()).start();new Thread(test3.new Consumer()).start();new Thread(test3.new Producer()).start();new Thread(test3.new Consumer()).start();new Thread(test3.new Producer()).start();new Thread(test3.new Consumer()).start();}class Producer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (Exception e) {e.printStackTrace();}try {blockingQueue.put(1);count++;System.out.println(Thread.currentThread().getName()+ "生产者生产,目前总共有" + count);} catch (InterruptedException e) {e.printStackTrace();}}}}class Consumer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (InterruptedException e1) {e1.printStackTrace();}try {blockingQueue.take();count--;System.out.println(Thread.currentThread().getName()+ "消费者消费,目前总共有" + count);} catch (InterruptedException e) {e.printStackTrace();}}}}
}

信号量Semaphore的实现

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源,在操作系统中是一个非常重要的问题,可以用来解决哲学家就餐问题。Java中的Semaphore维护了一个许可集,一开始先设定这个许可集的数量,可以使用acquire()方法获得一个许可,当许可不足时会被阻塞,release()添加一个许可。在下列代码中,还加入了另外一个mutex信号量,维护生产者消费者之间的同步关系,保证生产者和消费者之间的交替进行

import java.util.concurrent.Semaphore;
/*** 使用semaphore信号量实现* @author ZGJ* @date 2017年6月29日*/
public class Test4 {private static Integer count = 0;//创建三个信号量final Semaphore notFull = new Semaphore(10);final Semaphore notEmpty = new Semaphore(0);final Semaphore mutex = new Semaphore(1);public static void main(String[] args) {Test4 test4 = new Test4();new Thread(test4.new Producer()).start();new Thread(test4.new Consumer()).start();new Thread(test4.new Producer()).start();new Thread(test4.new Consumer()).start();new Thread(test4.new Producer()).start();new Thread(test4.new Consumer()).start();new Thread(test4.new Producer()).start();new Thread(test4.new Consumer()).start();}class Producer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}try {notFull.acquire();mutex.acquire();count++;System.out.println(Thread.currentThread().getName()+ "生产者生产,目前总共有" + count);} catch (InterruptedException e) {e.printStackTrace();} finally {mutex.release();notEmpty.release();}}}}class Consumer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (InterruptedException e1) {e1.printStackTrace();}try {notEmpty.acquire();mutex.acquire();count--;System.out.println(Thread.currentThread().getName()+ "消费者消费,目前总共有" + count);} catch (InterruptedException e) {e.printStackTrace();} finally {mutex.release();notFull.release();}}}}
}

管道输入输出流PipedInputStream和PipedOutputStream实现

在java的io包下,PipedOutputStream和PipedInputStream分别是管道输出流和管道输入流。
它们的作用是让多线程可以通过管道进行线程间的通讯。在使用管道通信时,必须将PipedOutputStream和PipedInputStream配套使用。
使用方法:先创建一个管道输入流和管道输出流,然后将输入流和输出流进行连接,用生产者线程往管道输出流中写入数据,消费者在管道输入流中读取数据,这样就可以实现了不同线程间的相互通讯,但是这种方式在生产者和生产者、消费者和消费者之间不能保证同步,也就是说在一个生产者和一个消费者的情况下是可以生产者和消费者之间交替运行的,多个生成者和多个消费者者之间则不行

/*** 使用管道实现生产者消费者模型* @author ZGJ* @date 2017年6月30日*/
public class Test5 {final PipedInputStream pis = new PipedInputStream();final PipedOutputStream pos = new PipedOutputStream();{try {pis.connect(pos);} catch (IOException e) {e.printStackTrace();}}class Producer implements Runnable {@Overridepublic void run() {try {while(true) {Thread.sleep(1000);int num = (int) (Math.random() * 255);System.out.println(Thread.currentThread().getName() + "生产者生产了一个数字,该数字为: " + num);pos.write(num);pos.flush();} } catch (Exception e) {e.printStackTrace();} finally {try {pos.close();pis.close();} catch (IOException e) {e.printStackTrace();}}}}class Consumer implements Runnable {@Overridepublic void run() {try {while(true) {Thread.sleep(1000);int num = pis.read();System.out.println("消费者消费了一个数字,该数字为:" + num);}} catch (Exception e) {e.printStackTrace();} finally {try {pos.close();pis.close();} catch (IOException e) {e.printStackTrace();}}}}public static void main(String[] args) {Test5 test5 = new Test5();new Thread(test5.new Producer()).start();new Thread(test5.new Consumer()).start();}
}

http://chatgpt.dhexx.cn/article/o5MCRIR2.shtml

相关文章

生产者消费者模型---详解及代码实现

概念 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯&#xff0c;而通过阻塞队列来进行通讯&#xff0c;所以生产者生产完数据之后不用等待消费者处理&#xff0c;直接扔给阻塞队列&#xff0c;消费者不找生产者要数据…

生产消费者模型

生产消费者模型中包含三个部分&#xff0c;生产者、消费者和交易场所。其中涉及如下的关系&#xff1a; &#xff08;1&#xff09;生产者和生产者之间的关系&#xff1a;由于生产者的生产面向的都是交易场所&#xff0c;所以生产者之间是存在竞争关系的&#xff0c;就像一家超…

生产者-消费者模型

什么是生产者消费者模型 生产者 - 消费者模型&#xff08; Producer-consumer problem&#xff09; 是一个非常经典的多线程并发协作的模型&#xff0c;在分布式系统里非常常见。 这个模型由两类线程和一个缓冲区组成来组成 生产者线程&#xff1a;生产数据&#xff0c;并把…

最长上升子序列和最长公共子序列

文章目录 文章目录 文章目录一、基本知识二、最长上升子序列1.朴素版2.二分版 三、最长公共子序列 一、基本知识 1.子串和子序列的区别&#xff1a; 子串必须连续,子序列可以不连续。 2.最长上升子序列(LIS)&#xff1a; 是指一个序列中最长的单调递增的子序列。 3.最长公共…

求最长子序列及回溯

D - 最长公共子序列问题 Description 给定两个序列 X{x1,x2,…,xm} 和 Y{y1,y2,…,yn}&#xff0c;找出X和Y的最长公共子序列。 Input 输入数据有多组&#xff0c;每组有两行 &#xff0c;每行为一个长度不超过500的字符串&#xff08;输入全是大写英文字母&#xff08;A,Z…

算法14-求最长子序列

题目&#xff1a; 给定数组arr。求最长的有序子序列的长度&#xff0c;返回长度int 分析&#xff1a; 1 要求的子串是有序的&#xff0c;就要比大小 2 用最暴力大方法&#xff0c;看成窗口问题&#xff0c;每一个元素求出它左边的最长序列子串&#xff0c;写入一个数组dp&…

最长子序列——动态规划

动态规划算法通常用于求解具有某种最优性质的问题。动态规划算法与分治法类似&#xff0c;其基本思想也是将待求解问题分解成若干个子问题&#xff0c;先求解子问题&#xff0c;然后从这些子问题的解得到原问题的解。与分治法不同的是&#xff0c;适合于用动态规划求解的问题&a…

最长公共子串与最长子序列

一 序 本文属于《图解算法》系列&#xff0c;上一篇整理了动态规划&#xff0c;动态规划可以帮助我们解决给定约束条件下找到最优解&#xff0c;例如背包问题。 在问题可分解为彼此独立且离散的子问题时&#xff0c;就可使用动态规划来解决。 在看个例子&#xff0c;求两个字…

动态规划:最长子序列

最长公共子序列&#xff1a; 链接&#xff1a;https://www.nowcoder.com/questionTerminal/9ae56e5bdf4f480387df781671db5172 题目&#xff1a; 我们有两个字符串m和n&#xff0c;如果它们的子串a和b内容相同&#xff0c;则称a和b是m和n的公共子序列。子串中的字符不一定在原字…

C++最长子序列

LeedCode-300. 最长上升子序列 LeetCode-300. 最长上升子序列 方法一&#xff1a;O(n^2)可能会超时&#xff1b;方法二&#xff1a;贪心二分法&#xff0c;使用lower_bound()&#xff1b; 下面是贪心二分算法&#xff1a; 由于要得到最长的递增子序列&#xff0c;就要让序列…

数组:最长子序列问题四种解法

数组&#xff1a;最长子序列问题四种解法 问题描述&#xff1a; 给定一个整数数组 nums &#xff0c;找到一个具有最大和的连续子数组&#xff08;子数组最少包含一个元素&#xff09;&#xff0c;返回其最大和。 示例 1 : 输入: [-2,1,-3,4,-1,2,1,-5,4], 输出: 6 解释: 连续子…

动态规划:最长子序列问题

关于动态规划中的最长子序列问题有很多优秀的解读&#xff0c;在这里推荐一位博主的关于最长子序列的文章&#xff0c;非常不错&#xff0c;配有大量的图片和文字解答&#xff0c;在这里推荐给大家。本文章转载自这里 1.基本概念 首先需要科普一下&#xff0c;最长公共子序列…

一道有趣的最长子序列问题

一道有趣的最长子序列问题 – 潘登同学的金融经济学笔记 文章目录 一道有趣的最长子序列问题 -- 潘登同学的金融经济学笔记 来源求解递推公式算法实现 来源 前几天在刷视频的时候&#xff0c;发现了这样一道题 所谓子序列就是一个序列 a i 1 , a i 2 , ⋯ , a i n a_{i1},a_{…

最长子序列最长子串的题型汇总

1.最长公共子序列的长度 题目&#xff1a;对于两个字符串&#xff0c;请设计一个高效算法&#xff0c;求他们的最长公共子序列的长度&#xff0c;这里的最长公共子序列定义为有两个序列U1,U2,U3...Un和V1,V2,V3...Vn,其中Ui&ltUi1&#xff0c;Vi&ltVi1。且A[Ui] B[Vi…

动态规划解最长子序列问题

动态规划法 经常会遇到复杂问题不能简单地分解成几个子问题&#xff0c;而会分解出一系列的子问题。简单地采用把大问题分解成子问题&#xff0c;并综合子问题的解导出大问题的解的方法&#xff0c;问题求解耗时会按问题规模呈幂级数增加。 为了节约重复求相同子问题的时间&…

最长****子序列

&#xff08;在研读大佬写的博客后&#xff0c;打算记录下自己学习过程&#xff09; 通过最长上升子序列的拓展了解到&#xff0c;其实这是一个系列的问题主要代表有&#xff1a; 1 最长上升子序列 2 最长不上升子序列 3 最长下降子序列 4 最长不下降子序列 就以最长上升子…

最长公共子序列

最长公共子序列&#xff08;Longest Common Subsequence&#xff0c;简称 LCS&#xff09;是一道非常经典的面试题目&#xff0c;因为它的解法是典型的二维动态规划&#xff0c;大部分比较困难的字符串问题都和这个问题一个套路&#xff0c;比如说编辑距离。而且&#xff0c;这…

最长公共子序列(LCS) 过程图解

1.基本概念 首先需要科普一下&#xff0c;最长公共子序列&#xff08;longest common sequence&#xff09;和最长公共子串&#xff08;longest common substring&#xff09;不是一回事儿。什么是子序列呢&#xff1f;即一个给定的序列的子序列&#xff0c;就是将给定序列中零…

最长子序列问题详解

提到最长子序列问题&#xff0c;想必大家都不陌生&#xff0c;今天我主要想分享一下我对子序列问题的一些理解&#xff1a; 先拿最长上升子序列问题来说吧&#xff1a; 很明显这是一个动态规化问题&#xff0c;仔细想想也不难得出其状态转移方程 首先介绍一下dp[]数组的含义…

如何查看pip版本

Windows系统如何查看pip版本 直接运行pip show pip