Linux生产者消费者模型

article/2025/9/12 16:48:59

文章目录

  • 生产者消费者模型
    • 生产者消费者模型的概念
    • 生产者消费者模型的特点
    • 生产者消费者模型优点
  • 基于BlockingQueue的生产者消费者模型
    • 基于阻塞队列的生产者消费者模型
    • 模拟实现基于阻塞队列的生产消费模型

生产者消费者模型

生产者消费者模型的概念

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。

生产者和消费者彼此之间不直接通讯,而通过这个容器来通讯,所以生产者生产完数据之后不用等待消费者处理,直接将生产的数据放到这个容器当中,消费者也不用找生产者要数据,而是直接从这个容器里取数据,这个容器就相当于一个缓冲区,平衡了生产者和消费者的处理能力,这个容器实际上就是用来给生产者和消费者解耦的。
在这里插入图片描述

生产者消费者模型的特点

生产者消费者模型是多线程同步与互斥的一个经典场景,其特点如下:

  • 三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)。
  • 两种角色: 生产者和消费者。(通常由进程或线程承担)
  • 一个交易场所: 通常指的是内存中的一段缓冲区。(可以自己通过某种方式组织起来)

我们用代码编写生产者消费者模型的时候,本质就是对这三个特点进行维护。

生产者和生产者、消费者和消费者、生产者和消费者,它们之间为什么会存在互斥关系?

介于生产者和消费者之间的容器可能会被多个执行流同时访问,因此我们需要将该临界资源用互斥锁保护起来。

其中,所有的生产者和消费者都会竞争式的申请锁,因此生产者和生产者、消费者和消费者、生产者和消费者之间都存在互斥关系。

生产者和消费者之间为什么会存在同步关系?

  • 如果让生产者一直生产,那么当生产者生产的数据将容器塞满后,生产者再生产数据就会生产失败。
  • 反之,让消费者一直消费,那么当容器当中的数据被消费完后,消费者再进行消费就会消费失败。

虽然这样不会造成任何数据不一致的问题,但是这样会引起另一方的饥饿问题,是非常低效的。我们应该让生产者和消费者访问该容器时具有一定的顺序性,比如让生产者先生产,然后再让消费者进行消费。

注意: 互斥关系保证的是数据的正确性,而同步关系是为了让多线程之间协同起来。

生产者消费者模型优点

  • 解耦。
  • 支持并发。
  • 支持忙闲不均。

如果我们在主函数中调用某一函数,那么我们必须等该函数体执行完后才继续执行主函数的后续代码,因此函数调用本质上是一种紧耦合。

对应到生产者消费者模型中,函数传参实际上就是生产者生产的过程,而执行函数体实际上就是消费者消费的过程,但生产者只负责生产数据,消费者只负责消费数据,在消费者消费期间生产者可以同时进行生产,因此生产者消费者模型本质是一种松耦合。

基于BlockingQueue的生产者消费者模型

基于阻塞队列的生产者消费者模型

在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。

在这里插入图片描述
其与普通的队列的区别在于:

  • 当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中放入了元素。
  • 当队列满时,往队列里存放元素的操作会被阻塞,直到有元素从队列中取出。

知识联系: 看到以上阻塞队列的描述,我们很容易想到的就是管道,而阻塞队列最典型的应用场景实际上就是管道的实现。

模拟实现基于阻塞队列的生产消费模型

为了方便理解,下面我们以单生产者、单消费者为例进行实现。
在这里插入图片描述
其中的BlockQueue就是生产者消费者模型当中的交易场所,我们可以用C++STL库当中的queue进行实现。

#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>#define NUM 5template<class T>
class BlockQueue
{
private:bool IsFull(){return _q.size() == _cap;}bool IsEmpty(){return _q.empty();}
public:BlockQueue(int cap = NUM): _cap(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_full, nullptr);pthread_cond_init(&_empty, nullptr);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full);pthread_cond_destroy(&_empty);}//向阻塞队列插入数据(生产者调用)void Push(const T& data){pthread_mutex_lock(&_mutex);while (IsFull()){//不能进行生产,直到阻塞队列可以容纳新的数据pthread_cond_wait(&_full, &_mutex);}_q.push(data);pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_empty); //唤醒在empty条件变量下等待的消费者线程}//从阻塞队列获取数据(消费者调用)void Pop(T& data){pthread_mutex_lock(&_mutex);while (IsEmpty()){//不能进行消费,直到阻塞队列有新的数据pthread_cond_wait(&_empty, &_mutex);}data = _q.front();_q.pop();pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_full); //唤醒在full条件变量下等待的生产者线程}
private:std::queue<T> _q; //阻塞队列int _cap; //阻塞队列最大容器数据个数pthread_mutex_t _mutex;pthread_cond_t _full;pthread_cond_t _empty;
};

相关说明:

  • 由于我们实现的是单生产者、单消费者的生产者消费者模型,因此我们不需要维护生产者和生产者之间的关系,也不需要维护消费者和消费者之间的关系,我们只需要维护生产者和消费者之间的同步与互斥关系即可。
  • 将BlockingQueue当中存储的数据模板化,方便以后需要时进行复用。
  • 这里设置BlockingQueue存储数据的上限为5,当阻塞队列中存储了五组数据时生产者就不能进行生产了,此时生产者就应该被阻塞。
  • 阻塞队列是会被生产者和消费者同时访问的临界资源,因此我们需要用一把互斥锁将其保护起来。
  • 生产者线程要向阻塞队列当中Push数据,前提是阻塞队列里面有空间,若阻塞队列已经满了,那么此时该生产者线程就需要进行等待,直到阻塞队列中有空间时再将其唤醒。
  • 消费者线程要从阻塞队列当中Pop数据,前提是阻塞队列里面有数据,若阻塞队列为空,那么此时该消费者线程就需要进行等待,直到阻塞队列中有新的数据时再将其唤醒。
  • 因此在这里我们需要用到两个条件变量,一个条件变量用来描述队列为空,另一个条件变量用来描述队列已满。当阻塞队列满了的时候,要进行生产的生产者线程就应该在full条件变量下进行等待;当阻塞队列为空的时候,要进行消费的消费者线程就应该在empty条件变量下进行等待。
  • 不论是生产者线程还是消费者线程,它们都是先申请到锁进入临界区后再判断是否满足生产或消费条件的,如果对应条件不满足,那么对应线程就会被挂起。但此时该线程是拿着锁的,为了避免死锁问题,在调用pthread_cond_wait函数时就需要传入当前线程手中的互斥锁,此时当该线程被挂起时就会自动释放手中的互斥锁,而当该线程被唤醒时又会自动获取到该互斥锁。
  • 当生产者生产完一个数据后,意味着阻塞队列当中至少有一个数据,而此时可能有消费者线程正在empty条件变量下进行等待,因此当生产者生产完数据后需要唤醒在empty条件变量下等待的消费者线程。
  • 同样的,当消费者消费完一个数据后,意味着阻塞队列当中至少有一个空间,而此时可能有生产者线程正在full条件变量下进行等待,因此当消费者消费完数据后需要唤醒在full条件变量下等待的生产者线程。

判断是否满足生产消费条件时不能用if,而应该用while:

  • pthread_cond_wait函数是让当前执行流进行等待的函数,是函数就意味着有可能调用失败,调用失败后该执行流就会继续往后执行。
  • 其次,在多消费者的情况下,当生产者生产了一个数据后如果使用pthread_cond_broadcast函数唤醒消费者,就会一次性唤醒多个消费者,但待消费的数据只有一个,此时其他消费者就被伪唤醒了。
  • 为了避免出现上述情况,我们就要让线程被唤醒后再次进行判断,确认是否真的满足生产消费条件,因此这里必须要用while进行判断。

在主函数中我们就只需要创建一个生产者线程和一个消费者线程,让生产者线程不断生产数据,让消费者线程不断消费数据。

#include "BlockQueue.hpp"void* Producer(void* arg)
{BlockQueue<int>* bq = (BlockQueue<int>*)arg;//生产者不断进行生产while (true){sleep(1);int data = rand() % 100 + 1;bq->Push(data); //生产数据std::cout << "Producer: " << data << std::endl;}
}
void* Consumer(void* arg)
{BlockQueue<int>* bq = (BlockQueue<int>*)arg;//消费者不断进行消费while (true){sleep(1);int data = 0;bq->Pop(data); //消费数据std::cout << "Consumer: " << data << std::endl;}
}
int main()
{srand((unsigned int)time(nullptr));pthread_t producer, consumer;BlockQueue<int>* bq = new BlockQueue<int>;//创建生产者线程和消费者线程pthread_create(&producer, nullptr, Producer, bq);pthread_create(&consumer, nullptr, Consumer, bq);//join生产者线程和消费者线程pthread_join(producer, nullptr);pthread_join(consumer, nullptr);delete bqreturn 0;
}

相关说明:

  • 阻塞队列要让生产者线程向队列中Push数据,让消费者线程从队列中Pop数据,因此这个阻塞队列必须要让这两个线程同时看到,所以我们在创建生产者线程和消费者线程时,需要将该阻塞队列作为线程执行例程的参数进行传入。
  • 代码中生产者生产数据就是将获取到的随机数Push到阻塞队列,而消费者消费数据就是从阻塞队列Pop数据,为了便于观察,我们可以将生产者生产的数据和消费者消费的数据进行打印输出。

生产者消费者步调一致

由于代码中生产者是每隔一秒生产一个数据,而消费者是每隔一秒消费一个数据,因此运行代码后我们可以看到生产者和消费者的执行步调是一致的。
在这里插入图片描述
小贴士:.hpp为后缀的文件也是头文件,该头文件同时包含类的定义与实现,调用者只需include该hpp文件即可。因为开源项目一般不需要进行保护,所以在开源项目中用的比较多。

生产者生产的快,消费者消费的慢

我们可以让生产者不停的进行生产,而消费者每隔一秒进行消费。

void* Producer(void* arg)
{BlockQueue<int>* bq = (BlockQueue<int>*)arg;//生产者不断进行生产while (true){int data = rand() % 100 + 1;bq->Push(data); //生产数据std::cout << "Producer: " << data << std::endl;}
}
void* Consumer(void* arg)
{BlockQueue<int>* bq = (BlockQueue<int>*)arg;//消费者不断进行消费while (true){sleep(1);int data = 0;bq->Pop(data); //消费数据std::cout << "Consumer: " << data << std::endl;}
}

此时由于生产者生产的很快,运行代码后一瞬间生产者就将阻塞队列打满了,此时生产者想要再进行生产就只能在full条件变量下进行等待,直到消费者消费完一个数据后,生产者才会被唤醒进而继续进行生产,生产者生产完一个数据后又会进行等待,因此后续生产者和消费者的步调又变成一致的了。
在这里插入图片描述

生产者生产的慢,消费者消费的快

当然,我们也可以让生产者每隔一秒进行生产,而消费者不停的进行消费。

void* Producer(void* arg)
{BlockQueue<int>* bq = (BlockQueue<int>*)arg;//生产者不断进行生产while (true){sleep(1);int data = rand() % 100 + 1;bq->Push(data); //生产数据std::cout << "Producer: " << data << std::endl;}
}
void* Consumer(void* arg)
{BlockQueue<int>* bq = (BlockQueue<int>*)arg;//消费者不断进行消费while (true){int data = 0;bq->Pop(data); //消费数据std::cout << "Consumer: " << data << std::endl;}
}

虽然消费者消费的很快,但一开始阻塞队列中是没有数据的,因此消费者只能在empty条件变量下进行等待,直到生产者生产完一个数据后,消费者才会被唤醒进而进行消费,消费者消费完这一个数据后又会进行等待,因此生产者和消费者的步调就是一致的。
在这里插入图片描述

满足某一条件时再唤醒对应的生产者或消费者

我们也可以当阻塞队列当中存储的数据大于队列容量的一半时,再唤醒消费者线程进行消费;当阻塞队列当中存储的数据小于队列容器的一半时,再唤醒生产者线程进行生产。

//向阻塞队列插入数据(生产者调用)
void Push(const T& data)
{pthread_mutex_lock(&_mutex);while (IsFull()){//不能进行生产,直到阻塞队列可以容纳新的数据pthread_cond_wait(&_full, &_mutex);}_q.push(data);if (_q.size() >= _cap / 2){pthread_cond_signal(&_empty); //唤醒在empty条件变量下等待的消费者线程}pthread_mutex_unlock(&_mutex);
}
//从阻塞队列获取数据(消费者调用)
void Pop(T& data)
{pthread_mutex_lock(&_mutex);while (IsEmpty()){//不能进行消费,直到阻塞队列有新的数据pthread_cond_wait(&_empty, &_mutex);}data = _q.front();_q.pop();if (_q.size() <= _cap / 2){pthread_cond_signal(&_full); //唤醒在full条件变量下等待的生产者线程}pthread_mutex_unlock(&_mutex);
}

我们仍然让生产者生产的快,消费者消费的慢。运行代码后生产者还是一瞬间将阻塞队列打满后进行等待,但此时不是消费者消费一个数据就唤醒生产者线程,而是当阻塞队列当中的数据小于队列容器的一半时,才会唤醒生产者线程进行生产。
在这里插入图片描述

基于计算任务的生产者消费者模型

当然,实际使用生产者消费者模型时可不是简单的让生产者生产一个数字让消费者进行打印而已,我们这样做只是为了测试代码的正确性。
由于我们将BlockingQueue当中存储的数据进行了模板化,此时就可以让BlockingQueue当中存储其他类型的数据。

例如,我们想要实现一个基于计算任务的生产者消费者模型,此时我们只需要定义一个Task类,这个类当中需要包含一个Run成员函数,该函数代表着我们想让消费者如何处理拿到的数据。

#pragma once
#include <iostream>class Task
{
public:Task(int x = 0, int y = 0, int op = 0): _x(x), _y(y), _op(op){}~Task(){}void Run(){int result = 0;switch (_op){case '+':result = _x + _y;break;case '-':result = _x - _y;break;case '*':result = _x * _y;break;case '/':if (_y == 0){std::cout << "Warning: div zero!" << std::endl;result = -1;}else{result = _x / _y;}break;case '%':if (_y == 0){std::cout << "Warning: mod zero!" << std::endl;result = -1;}else{result = _x % _y;}break;default:std::cout << "error operation!" << std::endl;break;}std::cout << _x << _op << _y << "=" << result << std::endl;}
private:int _x;int _y;char _op;
};

此时生产者放入阻塞队列的数据就是一个Task对象,而消费者从阻塞队列拿到Task对象后,就可以用该对象调用Run成员函数进行数据处理。

void* Producer(void* arg)
{BlockQueue<Task>* bq = (BlockQueue<Task>*)arg;const char* arr = "+-*/%";//生产者不断进行生产while (true){int x = rand() % 100;int y = rand() % 100;char op = arr[rand() % 5];Task t(x, y, op);bq->Push(t); //生产数据std::cout << "producer task done" << std::endl;}
}
void* Consumer(void* arg)
{BlockQueue<Task>* bq = (BlockQueue<Task>*)arg;//消费者不断进行消费while (true){sleep(1);Task t;bq->Pop(t); //消费数据t.Run(); //处理数据}
}

运行代码,当阻塞队列被生产者打满后消费者被唤醒,此时消费者在消费数据时执行的就是计算任务,当阻塞队列当中的数据被消费到低于一定阈值后又会唤醒生产者进行生产。
在这里插入图片描述
也就是说,此后我们想让生产者消费者模型处理某一种任务时,就只需要提供对应的Task类,然后让该Task类提供一个对应的Run成员函数告诉我们应该如何处理这个任务即可。


http://chatgpt.dhexx.cn/article/E6FdIyJg.shtml

相关文章

生产者消费者模型你知道多少

背景 进入正题之前先说点故事。从最开始学java的那里开始&#xff1a;我是从08年下半年开始学Java&#xff0c;在《我的六年程序之路》中提到了一些。当时比较简单&#xff0c;每天看尚学堂的视频&#xff08;对于初学者而言看视频好一些。&#xff09;&#xff0c;然后写代码。…

生产者消费者模型详解

生产者消费者模型 文章目录 生产者消费者模型什么是生产者消费者模型基于BlockingQueue的生产者消费者模型单生产者单消费者模型多生产者多消费者模型 什么是生产者消费者模型 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接…

Python -- 生产者消费者

代码 # -*- coding: utf-8 -*- # Author : markadc # Time : 2021/4/14 11:43from queue import Queue import time import threading# maxsize: 指定队列最大长度 q Queue(maxsize10)# 生产者 def product(name):count 0while True:# 只要队列没有满&#xff0c;就一直…

生产者与消费者

生产者和消费者 目录 生产者和消费者1.什么是生产者和消费者2.生产者和消费者(不加唤醒机制)3.生产者和消费者(加唤醒机制)4.解决虚假唤醒5.使用lock锁6.面试题 1.什么是生产者和消费者 ​ 在日常生活中&#xff0c;我们去商店买东西&#xff0c;我们就是消费者&#xff0c;商…

三种方式实现生产者-消费者模型

前言 生产者消费者问题&#xff08;英语&#xff1a;Producer-consumer problem&#xff09;&#xff0c;也称有限缓冲问题&#xff08;英语&#xff1a;Bounded-buffer problem&#xff09;&#xff0c;是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的…

生产者消费者模型

目录 一、生产者消费者模型的概念 二、生产者消费者模型的特点 三、生产者消费者模型优点 四、基于BlockingQueue的生产者消费者模型 4.1 基本认识 4.2 模拟实现 五、POSIX信号量 5.1 信号量概念 5.2 信号量函数 5.2.1 初始化信号量 5.2.2 销毁信号量 5.2.3 等待信…

打家劫舍问题

打家劫舍问题 最近碰见这种问题实在是太多了,感觉还是有必要学习一下打家劫舍以及其变种问题这一类问题采用的都是动态规划的解法 一些练习题目 6378. 最小化旅行的价格总和 198. 打家劫舍I 213. 打家劫舍 II 337. 打家劫舍 III 2560. 打家劫舍 IV 1 、打家劫舍I 题目…

经典动态规划:打家劫舍系列问题

打家劫舍系列总共有三道&#xff0c;难度设计非常合理&#xff0c;层层递进。第一道是比较标准的动态规划问题&#xff0c;而第二道融入了环形数组的条件&#xff0c;第三道更绝&#xff0c;让盗贼在二叉树上打劫. House Robber | public int rob(int[] nums);题目很容易理解…

【算法】动态规划(三)——打家劫舍系列问题

目录 一、前言 二、打家劫舍 &#xff08;1&#xff09;198. 打家劫舍Ⅰ • 整体代码&#xff1a; &#xff08;2&#xff09;213. 打家劫舍 II • 题目分析 • 整体代码&#xff1a; &#xff08;3&#xff09;337. 打家劫舍Ⅲ • 思路分析 • 整体代码&#xff1a; 三、补充知…

动态规划之打家劫舍系列

前言 打家劫舍问题是一种非常经典的有限制条件的动态规划问题&#xff0c;按理说&#xff0c;不是一种特殊的类型&#xff0c;但是因为力扣上纯纯的出了三道题&#xff08;1&#xff0c;2&#xff0c;3&#xff09;来考察&#xff0c;题目的难度是依次递进的&#xff0c;还结合…

动态规划之打家劫舍

动态规划之打家劫舍 文章目录 动态规划之打家劫舍1. "198. 打家劫舍"2. "198. 打家劫舍&#xff08;变种&#xff1a;输出路径&#xff09;"3. "213. 打家劫舍 II"4. "337. 打家劫舍 III" 1. “198. 打家劫舍” dp数组定义&#xff1a…

oracle 根据部分字段去重

问题&#xff1a;在oracle中使用group by分组&#xff0c;group by子句中必须包含所有的select中的字段和order by子句中的字段。 在不使用group by子句的情况下&#xff0c;进行分组。&#xff08;根据部分字段分组&#xff09; over()分析函数 原sql SELECTIM. ID mediaGrou…

oracle字段去重查询,oracle怎么去重查询

oracle去重查询的方法是&#xff1a; oracle 数据库多字段去重 方法介绍&#xff1a;distinct 关键字、group by 、row_number ()over(partition by 列 order by 列 desc) 我的需求是&#xff1a;根据某几列去重 查询出去重后的全部信息。最后我选择的是第三种方法。 我的想法&…

oracle 数据去重方法

1. 创建表&#xff1a; -- Create table create table TEST_USER (user_id NUMBER(3),user_name VARCHAR2(20),user_age NUMBER(3) ) tablespace GUAN_TABLESPACEpctfree 10initrans 1maxtrans 255storage(initial 64Knext 1Mminextents 1maxextents unlimited);--测试数据…

oracle 字符串去重

select regexp_replace(1,1,3,5,5, ([^,])(,\1)*(,|$), \1\3) from dual;注意&#xff1a; 但是&#xff0c;这个去重&#xff0c;必须建立在排序的基础上&#xff0c;如果listagg拼接出来的数值像 a, b, a, c 这时候&#xff0c;该正则就会失效。

MYSQL/ORACLE多字段去重-根据某字段去重

通过百度上的答案多数无效 自己搞了个 使用oracle row_number()函数&#xff0c;给每个同名的加一个序号&#xff0c;最后筛选第n个想同的即可 oracle与mysql不同 1.oracel 多字段distinct(字段名去重) group by去重失效 可以用row_number() over(partition) 给同名列加个序号…

Oracle 数据去重

在Oracle数据库中删除重复数据 一&#xff0c;查询及删除重复记录的SQL语句 Person01表&#xff1a; 1. 查询表中多余的重复数据&#xff0c;根据ID字段来判断是否重复 SELECT * FROM PERSON01 WHERE ID IN (SELECT ID FROM PERSON01 GROUP BY ID HAVING COUNT(ID) > 1)…

Oracle根据多列去重

&#xff08;1&#xff09;distinct 关键词 distinct用于返回唯一不同的值&#xff0c;可作用于单列和多列 但必须将其放在开头&#xff0c;否则会提示错误 而若在其后添加多个变量名&#xff0c;则返回的将是这多个变量名不同时重复的列&#xff0c;因而使用distinct筛选某…

oracle 数据库去重查询

oracle数据库中有如下一张表&#xff0c;包含id,loginid,name,researchtime等字段&#xff0c;其中name字段中的数据有重复&#xff0c;查询数据时要重复数据只取一条&#xff0c;利用row_number ()over(partition by 列 order by 列 desc)方法实现 1:select a.,row_number() o…

oracle去重函数

1、distinct &#xff08;1&#xff09;、常用的distinct select distinct column from table; &#xff08;2&#xff09;、统计去重后数量 select count(distinct column) from table;–查去重后数量 &#xff08;3&#xff09;、distinct必须放在开头 select id, distinct n…