生产者消费者模型详解

article/2025/9/12 16:43:04

生产者消费者模型

文章目录

  • 生产者消费者模型
    • 什么是生产者消费者模型
    • 基于BlockingQueue的生产者消费者模型
      • 单生产者单消费者模型
      • 多生产者多消费者模型

什么是生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

我们首先举个例子便于理解生产者消费者模型:

我们平常都会去超市买一些需要的物品,给我们提供产品的是供应商,我们是消费者,我们是去超市消费,给我们提供产品的而是供应商,为什么我们会去超市呢?因为超市给我们提供了交易场所,生产者消费者模型本质是生产者把产品放到超市里面,消费者把产品从超市中拿出去。

计算机中的数据相当于产品,数据的加工处理是CPU以任务形式加工处理,生产者和消费者代表的是多线程或者多进程,超市相当于一段内存

生活中生产者将产品放在一个交易场所,让我们自己去拿,为什么这样呢?是因为效率高,比如菜鸟驿站的场景,快递公司把一批包裹放进菜鸟驿站,然后我们去拿就好了,提高了效率

厂商将交易场所当作缓冲区来用,生产者和消费者一般是进程和线程,空间或者交易场所:一块"内存块",产品相当于数据

生产者消费者321原则(自己总结:3种关系,2种角色。一个交易场所):

3种关系,生产者和生产者,生产者和消费者,消费者和消费者

生产者和生产者竞争关系(互斥),生产者和消费者(同步),消费者和消费者竞争关系(互斥)

2种角色,生产者和消费者

1个交易场所:一块"内存块"

int add(int a,int b)
{return a+b;
}
int main()
{int a = 10;int b = 20;int c = add(a,b);
}

方式一:

image-20220405161145924

方式二:

image-20220405161949769

第一种方式main函数生产a和b后调用add,main函数需要等待add函数执行完才能继续工作,这种称为强耦合,这种工作方式是串行,这种方式耦合度高,而第二种方式是线程1去执行main函数生成a和b,线程2去执行add函数,它们直接有一块区域来给线程1放数据和给线程2取数据,当线程2在add的时候,线程1也可以放数据,这种方式称为并行,也称之为解耦

并行是一种需求,表示有很多业务活动同时进行。异步是一种代码编写方式,同步和串行:同步是一种代码编写方式,串行是一种需求

基于BlockingQueue的生产者消费者模型

阻塞队列有什么用?

image-20220405163412522

如果不把数据往阻塞队列里面放,假设数据通过网络给服务器用了10ms,服务器让线程1把数据写入数据库用了5ms,一共15ms,用户一共等待15ms,如果把数据往阻塞队列里面放,因为阻塞队列是在内存当中的,而数据库是文件是需要进行IO的,放在阻塞队列的时间要比放在数据库的时间少一些,所以用户等待的时间就少一些。

下面我们写一个单生产者单消费者模型:

单生产者单消费者模型

Makefile的编写:

main:main.ccg++ $^ -o $@ -lpthread
.PHONY:clean
clean:rm -f main

BlockQueue.hpp

#ifndef __BLOCK_QUEUE_H__
#define __BLOCK_QUEUE_H__
#include<iostream>
#include<queue>
#include<pthread.h>
#include<unistd.h>
class BlockQueue
{
private:std::queue<int> q;size_t _cap;pthread_mutex_t lock;pthread_cond_t c_cond;//消费者的条件不满足时,将来消费者在该条件变量下等pthread_cond_t p_cond;//生产者的条件不满足时,将来生产者在该条件下等
public:bool IsFull(){return q.size() >= _cap;}bool IsEmpty(){return q.empty();}void LockQueue(){pthread_mutex_lock(&lock);}void UnLockQueue(){pthread_mutex_unlock(&lock);}void WakeUpComsumer(){pthread_cond_signal(&c_cond);}void WakeUpProductor(){pthread_cond_signal(&p_cond);}void ProducterWait(){pthread_cond_wait(&p_cond,&lock);//这里为什么要传锁,我们在等待时肯定是条件不满足了,我们通过判断才知道条件满不满足,//判断就需要保证进入临界区,我们是持有锁进入的,wait的时候必须要释放锁//在调用该函数的时候,自动会释放lock//当该函数被返回时,返回到了临界区内,所以,该函数会让该线程重新持有该锁}void ComsumerWait(){pthread_cond_wait(&c_cond,&lock);//在消费者释放锁时,生产者正申请锁,而消费者在等待}
public:BlockQueue(size_t cap):_cap(cap){pthread_mutex_init(&lock,nullptr);pthread_cond_init(&c_cond,nullptr);pthread_cond_init(&p_cond,nullptr);}void Put(int in){LockQueue();//if(isFull())while(isFull()){WakeUpComsumor();//唤醒消费者ProducterWait();//生产者等待}q.push(in);UnLockQueue();}void Get(int& out){LockQueue();//if(IsEmpty)while(IsEmpty()){WakeUpProductor();ComsumerWait();//消费者者等待}out = q.front();q.pop();LockQueue();}~BlockQueue(){pthread_cond_destroy(&lcok);pthread_cond_destroy(&c_cond);pthread_cond_destroy(&p_cond);   }
};
#endif

main.cc

#include"BlockQueue.hpp"
using namespace std;
void* consumer_run(void* arg)
{BlockQueue *bq = (BlockQueue*)arg;while(true){int n = 0;bq->Get(n);cout<<"consumer data is : " << n <<endl;}
}
void* productor_run(void* arg)
{BlockQueue *bq = (BlockQueue*)arg;while(true){int data = rand()%10+1;bq->Put(data);cout<<"product data is : "<<data<<endl;}
}
int main()
{BlockQueue *bq = new BlockQueue(5);pthread_t c,p;pthread_create(&c,nullptr,consumer_run,(void*)bq);pthread_create(&p,nullptr,productor_run,(void*)bq);pthread_join(c,nullptr);pthread_join(p,nullptr);delete bq;return 0;
}
image-20220405173132183

上面代码有个细节不能用if判断满和空,万一被提前唤醒或者等待函数调用失败,它们会继续执行push数据和pop数据,此时就是非法操作了

多生产者多消费者模型

上面是单生产者单消费者模型,只解决了生产者和消费者之间的关系,如果是多生产者多消费者模型那么需要添加另外两种关系:还有生产者和生产者的关系(互斥),以及消费者和消费者的关系(互斥)

所以我们只需要在外面生产者和消费者的例程中放数据和取数据加锁即可:

#include"BlockQueue.hpp"
using namespace std;
pthread_mutex_t c_lock;
pthread_mutex_t p_lock;void* consumer_run(void* arg)
{BlockQueue *bq = (BlockQueue*)arg;while(true){int n = 0;pthread_mutex_lock(&c_lock);bq->Get(n);pthread_mutex_unlock(&c_lock);cout<<"consumer data is : " << n <<endl;}
}
void* productor_run(void* arg)
{BlockQueue *bq = (BlockQueue*)arg;while(true){pthread_mutex_lock(&p_lock);int data = rand()%10+1;bq->Put(data);pthread_mutex_unlock(&p_lock);cout<<"product data is : "<<data<<endl;}
}
int main()
{BlockQueue *bq = new BlockQueue(5);pthread_t c,p;pthread_mutex_init(&c_lock,nullptr);pthread_mutex_init(&p_lock,nullptr);pthread_create(&c,nullptr,consumer_run,(void*)bq);pthread_create(&c,nullptr,consumer_run,(void*)bq);pthread_create(&c,nullptr,consumer_run,(void*)bq);pthread_create(&c,nullptr,consumer_run,(void*)bq);pthread_create(&p,nullptr,productor_run,(void*)bq);pthread_create(&p,nullptr,productor_run,(void*)bq);pthread_create(&p,nullptr,productor_run,(void*)bq);pthread_create(&p,nullptr,productor_run,(void*)bq);pthread_create(&p,nullptr,productor_run,(void*)bq);pthread_join(c,nullptr);pthread_join(p,nullptr);delete(bq);pthread_mutex_destroy(&c_lock);pthread_mutex_destroy(&p_lock);return 0;
}

那么上面做的事情有什么用呢?下面我们来看下面这个代码:

#ifndef __QUEUE_BLOCK_H__
#define __QUEUE_BLOCK_H__
#include<iostream>
#include<queue>
#include<pthread.h>
#include<unistd.h>
class Task
{
public:int _x;int _y;
public:Task(){}Task(int x,int y):_x(x),_y(y){}int Run(){return _x+_y;}~Task(){}
};
class BlockQueue
{
private:std::queue<Task> q;size_t _cap;pthread_mutex_t lock;pthread_cond_t c_cond;//消费者的条件不满足时,将来消费者在该条件变量下等pthread_cond_t p_cond;//生产者的条件不满足时,将来生产者在该条件下等
public:bool IsFull(){return q.size() >= _cap;}bool IsEmpty(){return q.empty();}void LockQueue(){pthread_mutex_lock(&lock);}void UnLockQueue(){pthread_mutex_unlock(&lock);}void WakeUpComsumer(){pthread_cond_signal(&c_cond);}void WakeUpProductor(){pthread_cond_signal(&p_cond);}void ProducterWait(){pthread_cond_wait(&p_cond,&lock);//这里为什么要传锁,我们在等待时肯定是条件不满足了,我们通过判断才知道条件满不满足,//判断就需要保证进入临界区,我们是持有锁进入的,wait的时候必须要释放锁//在调用该函数的时候,自动会释放lock//当该函数被返回时,返回到了临界区内,所以,该函数会让该线程重新持有该锁}void ComsumerWait(){pthread_cond_wait(&c_cond,&lock);//在消费者释放锁时,生产者正申请锁,而消费者在等待}
public:BlockQueue(size_t cap):_cap(cap){pthread_mutex_init(&lock,nullptr);pthread_cond_init(&c_cond,nullptr);pthread_cond_init(&p_cond,nullptr);}void Put(Task t){LockQueue();//if(isFull())while(isFull()){WakeUpComsumor();//唤醒消费者ProducterWait();//生产者等待}q.push(t);UnLockQueue();}void Get(Task& t){LockQueue();//if(IsEmpty)while(IsEmpty()){WakeUpProductor();ComsumerWait();//消费者者等待}t = q.front();q.pop();UnLockQueue();}~BlockQueue(){pthread_cond_destroy(&lcok);pthread_cond_destroy(&c_cond);pthread_cond_destroy(&p_cond);   }
};#endif
#include"BlockQueue.hpp"pthread_mutex_t c_lock;
pthread_mutex_t p_lock;
void* r1(void* arg)
{//生产者BlockQueue* bq =(BlockQueue*)arg;while(true){pthread_mutex_lock(&p_lock);int x = rand()%10+1;int y = rand()%100+1;Task t(x,y);bq->Put(t);pthread_mutex_unlock(&p_lock);cout<<"Product Task is: "<<x<<'+'<<y<<"= ?"<<endl;}
}
void* r2(void* arg)
{//消费者BlockQueue* bq = (BlockQueue*)arg;while(true){//取数据Task t;pthread_mutex_lock(&c_lock);bq->Get(t);pthread_mutex_unlock(&c_lock);cout<<"Consumer: "<<t._x<<"+"<<t._y<<"="<<t.Run()<<endl;}
}
int main()
{BlockQueue* bq = new BlockQueue(5);pthread_t c,p;pthread_mutex_init(&p_lock,NULL);pthread_mutex_init(&c_lock,NULL);pthread_create(&p,NULL,r1,(void*)bq);pthread_create(&p,NULL,r1,(void*)bq);pthread_create(&p,NULL,r1,(void*)bq);pthread_create(&p,NULL,r1,(void*)bq);pthread_create(&c,NULL,r2,(void*)bq);pthread_create(&c,NULL,r2,(void*)bq);pthread_create(&c,NULL,r2,(void*)bq);pthread_create(&c,NULL,r2,(void*)bq);pthread_create(&c,NULL,r2,(void*)bq);pthread_join(p,nullptr);pthread_join(c,nullptr);pthread_mutex_destroy(&c_lock);pthread_mutex_destroy(&p_lock);delete bq;return 0;
}

image-20220405190040430
这样就做到了一些线程放数据,另一些线程帮我们计算数据,也就是完成相应的任务。


http://chatgpt.dhexx.cn/article/aPQNVQQ0.shtml

相关文章

Python -- 生产者消费者

代码 # -*- coding: utf-8 -*- # Author : markadc # Time : 2021/4/14 11:43from queue import Queue import time import threading# maxsize: 指定队列最大长度 q Queue(maxsize10)# 生产者 def product(name):count 0while True:# 只要队列没有满&#xff0c;就一直…

生产者与消费者

生产者和消费者 目录 生产者和消费者1.什么是生产者和消费者2.生产者和消费者(不加唤醒机制)3.生产者和消费者(加唤醒机制)4.解决虚假唤醒5.使用lock锁6.面试题 1.什么是生产者和消费者 ​ 在日常生活中&#xff0c;我们去商店买东西&#xff0c;我们就是消费者&#xff0c;商…

三种方式实现生产者-消费者模型

前言 生产者消费者问题&#xff08;英语&#xff1a;Producer-consumer problem&#xff09;&#xff0c;也称有限缓冲问题&#xff08;英语&#xff1a;Bounded-buffer problem&#xff09;&#xff0c;是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的…

生产者消费者模型

目录 一、生产者消费者模型的概念 二、生产者消费者模型的特点 三、生产者消费者模型优点 四、基于BlockingQueue的生产者消费者模型 4.1 基本认识 4.2 模拟实现 五、POSIX信号量 5.1 信号量概念 5.2 信号量函数 5.2.1 初始化信号量 5.2.2 销毁信号量 5.2.3 等待信…

打家劫舍问题

打家劫舍问题 最近碰见这种问题实在是太多了,感觉还是有必要学习一下打家劫舍以及其变种问题这一类问题采用的都是动态规划的解法 一些练习题目 6378. 最小化旅行的价格总和 198. 打家劫舍I 213. 打家劫舍 II 337. 打家劫舍 III 2560. 打家劫舍 IV 1 、打家劫舍I 题目…

经典动态规划:打家劫舍系列问题

打家劫舍系列总共有三道&#xff0c;难度设计非常合理&#xff0c;层层递进。第一道是比较标准的动态规划问题&#xff0c;而第二道融入了环形数组的条件&#xff0c;第三道更绝&#xff0c;让盗贼在二叉树上打劫. House Robber | public int rob(int[] nums);题目很容易理解…

【算法】动态规划(三)——打家劫舍系列问题

目录 一、前言 二、打家劫舍 &#xff08;1&#xff09;198. 打家劫舍Ⅰ • 整体代码&#xff1a; &#xff08;2&#xff09;213. 打家劫舍 II • 题目分析 • 整体代码&#xff1a; &#xff08;3&#xff09;337. 打家劫舍Ⅲ • 思路分析 • 整体代码&#xff1a; 三、补充知…

动态规划之打家劫舍系列

前言 打家劫舍问题是一种非常经典的有限制条件的动态规划问题&#xff0c;按理说&#xff0c;不是一种特殊的类型&#xff0c;但是因为力扣上纯纯的出了三道题&#xff08;1&#xff0c;2&#xff0c;3&#xff09;来考察&#xff0c;题目的难度是依次递进的&#xff0c;还结合…

动态规划之打家劫舍

动态规划之打家劫舍 文章目录 动态规划之打家劫舍1. "198. 打家劫舍"2. "198. 打家劫舍&#xff08;变种&#xff1a;输出路径&#xff09;"3. "213. 打家劫舍 II"4. "337. 打家劫舍 III" 1. “198. 打家劫舍” dp数组定义&#xff1a…

oracle 根据部分字段去重

问题&#xff1a;在oracle中使用group by分组&#xff0c;group by子句中必须包含所有的select中的字段和order by子句中的字段。 在不使用group by子句的情况下&#xff0c;进行分组。&#xff08;根据部分字段分组&#xff09; over()分析函数 原sql SELECTIM. ID mediaGrou…

oracle字段去重查询,oracle怎么去重查询

oracle去重查询的方法是&#xff1a; oracle 数据库多字段去重 方法介绍&#xff1a;distinct 关键字、group by 、row_number ()over(partition by 列 order by 列 desc) 我的需求是&#xff1a;根据某几列去重 查询出去重后的全部信息。最后我选择的是第三种方法。 我的想法&…

oracle 数据去重方法

1. 创建表&#xff1a; -- Create table create table TEST_USER (user_id NUMBER(3),user_name VARCHAR2(20),user_age NUMBER(3) ) tablespace GUAN_TABLESPACEpctfree 10initrans 1maxtrans 255storage(initial 64Knext 1Mminextents 1maxextents unlimited);--测试数据…

oracle 字符串去重

select regexp_replace(1,1,3,5,5, ([^,])(,\1)*(,|$), \1\3) from dual;注意&#xff1a; 但是&#xff0c;这个去重&#xff0c;必须建立在排序的基础上&#xff0c;如果listagg拼接出来的数值像 a, b, a, c 这时候&#xff0c;该正则就会失效。

MYSQL/ORACLE多字段去重-根据某字段去重

通过百度上的答案多数无效 自己搞了个 使用oracle row_number()函数&#xff0c;给每个同名的加一个序号&#xff0c;最后筛选第n个想同的即可 oracle与mysql不同 1.oracel 多字段distinct(字段名去重) group by去重失效 可以用row_number() over(partition) 给同名列加个序号…

Oracle 数据去重

在Oracle数据库中删除重复数据 一&#xff0c;查询及删除重复记录的SQL语句 Person01表&#xff1a; 1. 查询表中多余的重复数据&#xff0c;根据ID字段来判断是否重复 SELECT * FROM PERSON01 WHERE ID IN (SELECT ID FROM PERSON01 GROUP BY ID HAVING COUNT(ID) > 1)…

Oracle根据多列去重

&#xff08;1&#xff09;distinct 关键词 distinct用于返回唯一不同的值&#xff0c;可作用于单列和多列 但必须将其放在开头&#xff0c;否则会提示错误 而若在其后添加多个变量名&#xff0c;则返回的将是这多个变量名不同时重复的列&#xff0c;因而使用distinct筛选某…

oracle 数据库去重查询

oracle数据库中有如下一张表&#xff0c;包含id,loginid,name,researchtime等字段&#xff0c;其中name字段中的数据有重复&#xff0c;查询数据时要重复数据只取一条&#xff0c;利用row_number ()over(partition by 列 order by 列 desc)方法实现 1:select a.,row_number() o…

oracle去重函数

1、distinct &#xff08;1&#xff09;、常用的distinct select distinct column from table; &#xff08;2&#xff09;、统计去重后数量 select count(distinct column) from table;–查去重后数量 &#xff08;3&#xff09;、distinct必须放在开头 select id, distinct n…

oracle 数据库 去重查询

oracle 数据库多字段去重 方法介绍&#xff1a;distinct 关键字、group by 、row_number ()over(partition by 列 order by 列 desc) 我的需求是&#xff1a;根据某几列去重 查询出去重后的全部信息。最后我选择的是第三种方法。 我的想法&#xff1a;我想找出一种更简单的方…

Oracle实现去重的两种方式总结

业务场景 需要查询某数据&#xff0c;由于需要三张表关联查询&#xff0c;查询结果如下&#xff1a; 原始SQL语句 SELECT D.ORDER_NUM AS "申请单号" ,D.CREATE_TIME ,D.EMP_NAME AS "申请人",(SELECT extractvalue(t1.row_data,/root/row/FI13_wasteNam…