c++ 多线程并发

article/2025/10/7 18:10:51

基于C++11的线程池(threadpool),简洁且可以带任意多的参数 - _Ong - 博客园

C++11多线程编程(六)——线程池的实现

1.

① thread

#include <iostream>
#include <thread>class A
{
public:void operator()() {std::cout << "11111\n";}
};int main()
{A a;std::thread t(a);if(t.joinable()){std::cout << "t.joinable() == true\n";t.join();}else{std::cout << "t.joinable() == false\n";}std::thread t0((A())); // 多组括号if(t0.joinable()){std::cout << "t0.joinable() == true\n";t0.join();}else{std::cout << "t0.joinable() == false\n";}std::thread t1{(A())}; // 统一的初始化语法if(t1.joinable()){std::cout << "t1.joinable() == true\n";t1.join();}else{std::cout << "t1.joinable() == false\n";}std::thread t2([]{std::cout << "11111\n";}); // Lamda表达式if(t2.joinable()){std::cout << "t2.joinable() == true\n";t2.join();}else{std::cout << "t2.joinable() == false\n";}return 0;
}

 

下面这个相当于声明了一个名为t1的函数,这个函数带有一个参数(函数指针指向没有参数并返回A对象的函数),返回一个std::thread对象的函数:

 

 

 

2.竞争冒险(竞态条件)

程序的输出依赖于一个或多个线程的执行顺序,最常见的例子是多个线程同时向同一个cout输出。就像下面这样,竞态条件会使得两个线程交替打印,打出来的信息会混到一起:

#include <iostream>
#include <thread>
#include <mutex>
using namespace std;void func1()    {for (int i = 0; i > -10; i--)cout << "t1 thread: "<< i << endl;
}int main()  {thread t1(func1);for (int i = 0; i < 10; i++)cout << "main thread: " << i << endl;t1.join();return 0;
}

① 使用mutex(互斥锁)解决

解决竞态条件,可以理解为进(线)程同步的方法。

每次调用cout时都使用mutex mu,就能使得每个时间点都只有一个线程在使用mutex,就能防止每行打印混乱。

#include <iostream>
#include <thread>
#include <mutex>
using namespace std;
mutex mu;void shared_print(string msg, int id){mu.lock();cout << msg << id << endl;mu.unlock();
}void func1() {for (int i = 0; i > -10; i--)shared_print("t1 thread: ", i);// cout << "t1 thread: "<< i << endl;
}int main()  {thread t1(func1);for (int i = 0; i < 10; i++)shared_print("main thread: ", i);// cout << "main thread: " << i << endl;t1.join();return 0;
}

② 使用lock_guard解决

直接使用mutex是有缺陷的。如果mutex.lock()和mutex.unlock()之间的代码出现异常的话,那么mutex就不会被unlock,就会死锁了。因此,不推荐直接使用mutex。因此,推荐使用std::lock_guard<std::mutex> guard(mu)

#include <iostream>
#include <thread>
#include <mutex>
using namespace std;
mutex mu;void shared_print(string msg, int id){lock_guard<mutex> guard(mu); // RAII// mu.lock();cout << msg << id << endl;// mu.unlock();
}void func1() {for (int i = 0; i > -10; i--)shared_print("t1 thread: ", i);// cout << "t1 thread: "<< i << endl;
}int main()  {thread t1(func1);for (int i = 0; i < 10; i++)shared_print("main thread: ", i);// cout << "main thread: " << i << endl;t1.join();return 0;
}

注:lock_guard在离开作用域时,会自动释放锁,因此使用大括号包围lock之后,调用完lock_guard语句之后会自动析构,释放锁。

虽然lock_guard挺好用的,但是有个很大的缺陷,在定义lock_guard的地方会调用构造函数加锁,在离开定义域的话lock_guard就会被销毁,调用析构函数解锁。这就产生了一个问题,如果这个定义域范围很大的话,那么锁的粒度就很大,很大程序上会影响效率。所以为了解决lock_guard锁的粒度过大的原因,unique_lock就出现了 unique_lock<mutex> unique(mt);

这个会在构造函数加锁,然后可以利用unique.unlock()来解锁,所以当你觉得锁的粒度太多的时候,可以利用这个来解锁,而析构的时候会判断当前锁的状态来决定是否解锁,如果当前状态已经是解锁状态了,那么就不会再次解锁,而如果当前状态是加锁状态,就会自动调用unique.unlock()来解锁。而lock_guard在析构的时候一定会解锁,也没有中途解锁的功能。当然,方便肯定是有代价的,unique_lock内部会维护一个锁的状态,所以在效率上肯定会比lock_guard慢。

所以,以上两种加锁解锁的方法,加上前面文章介绍的mutex方法,具体该使用哪一个,要依照具体的业务需求来决定,当然mt.lock()和mt.unlock()不管是哪种情况,是肯定都可以使用的。

③  原子操作

互斥锁可以实现数据的同步,但同时是以牺牲性能为代价的。举个例子:我们把0加一再减一,循环一定的次数,开启20个线程来观察,这个正确的结果应该还是等于0的。

首先是不加任何互斥锁同步:

#include <iostream>
#include <thread>
#include <atomic>
#include <time.h>
#include <mutex>
using namespace std;#define MAX 100000
#define THREAD_COUNT 20
int total = 0;void thread_task()
{for (int i = 0; i < MAX; i++){total += 1;total -= 1;}
}int main()
{clock_t start = clock();thread t[THREAD_COUNT];for (int i = 0; i < THREAD_COUNT; ++i){t[i] = thread(thread_task);}for (int i = 0; i < THREAD_COUNT; ++i){t[i].join();}clock_t finish = clock();cout << "result:" << total << endl;cout << "duration:" << finish - start << "ms" << endl;return 0;
}

可见,以上程序运行很快,但是结果却不正确的。

那么我们将线程加上互斥锁mutex再来看看:

#include <iostream>
#include <thread>
#include <atomic>
#include <time.h>
#include <mutex>
using namespace std;#define MAX 100000
#define THREAD_COUNT 20
int total = 0;
mutex mu;void thread_task()
{for (int i = 0; i < MAX; i++){mu.lock();total += 1;total -= 1;mu.unlock();}
}int main()
{clock_t start = clock();thread t[THREAD_COUNT];for (int i = 0; i < THREAD_COUNT; ++i){t[i] = thread(thread_task);}for (int i = 0; i < THREAD_COUNT; ++i){t[i].join();}clock_t finish = clock();cout << "result:" << total << endl;cout << "duration:" << finish - start << "ms" << endl;return 0;
}

我们可以看到运行结果是正确的,但是时间比原来慢太多了。虽然很无奈,但这也是没有办法的,因为只有在保证准确的前提才能去追求性能。

那有没有什么办法在保证准确的同时,又能提高性能呢?通过原子操作。

#include <iostream>
#include <thread>
#include <atomic>
#include <time.h>
#include <mutex>
using namespace std;
#include <atomic>#define MAX 100000
#define THREAD_COUNT 20
// int total = 0;
atomic_int total(0); // 原子操作
mutex mu;void thread_task()
{for (int i = 0; i < MAX; i++){// mu.lock();total += 1;total -= 1;// mu.unlock();}
}int main()
{clock_t start = clock();thread t[THREAD_COUNT];for (int i = 0; i < THREAD_COUNT; ++i){t[i] = thread(thread_task);}for (int i = 0; i < THREAD_COUNT; ++i){t[i].join();}clock_t finish = clock();cout << "result:" << total << endl;cout << "duration:" << finish - start << "ms" << endl;return 0;
}

可以看到,我们在这里只需要定义atomic_int total(0)就可以实现原子操作了,就不需要互斥锁了。而性能的提升也是非常明显的,这就是原子操作的魅力所在。

④ 条件变量

当某个线程持有这把锁的时候(就是所谓的加锁),那么这个线程是独占所有的资源,这里的资源指的是执行的权限,其他要抢夺资源的线程都不得不等待。在很多情况下,这都容易适用,但是有些情况下,却会产生一些异常情况。

在生产消费者模型当中,肯定都会用到互斥锁的机制的,当生产者往队列中放数据的瞬间,消费者是不能取数据的,那这时候可能会碰见一个问题,如果生成者因为某些原因,放数据过慢,但是消费者取数据很快,当队列中没有数据了,消费者还去取的话,就会发生异常情况。有些人可能会说,加个条件判断一下队列是否为空不就可以了。

这个肯定是当然可以的,但是在队列依旧没有数据的这一段时间,是要不断的循环判断这个条件,CPU肯定是会飙升的,浪费了很多不必要的资源。这时候我们设想,能否设计这样的一种机制,如果在队列没有数据的时候,消费者线程能一直阻塞在那里,等待着别人给它唤醒,在生产者往队列中放入数据的时候通知一下这个等待线程,唤醒它,告诉它可以来取数据了。

条件变量是多线程数据同步的一种操作,不管是用哪种框架,哪种语言实现多线程的功能,条件变量都是不得不考虑的一种情况。C++中提供了#include <condition_variable>头文件,里面就包含了条件变量的相关类。其中有两个非常重要的接口,wait()和notify_one(),wait()可以让线程陷入休眠状态,意思就是不干活了,notify_one()就是唤醒真正休眠状态的线程,开始干活了。当然还有notify_all()这个接口,顾名思义,就是通知所有正在等待的线程,起来干活了。

#include <iostream>
#include <deque>
#include <thread>
#include <mutex>
#include <condition_variable>
using namespace std;deque<int> q;
mutex mt;
condition_variable cond; // 条件变量void thread_producer()
{int count = 10;while (count > 0){unique_lock<mutex> unique(mt);q.push_front(count);unique.unlock();cout << "producer a value: " << count << endl;cond.notify_one(); // 唤醒this_thread::sleep_for(chrono::seconds(1));count--;}
}void thread_consumer()
{int data = 0;while (data != 1){unique_lock<mutex> unique(mt);while (q.empty())cond.wait(unique); // 休眠data = q.back();q.pop_back();cout << "consumer a value: " << data << endl;unique.unlock();}
}int main()
{thread t1(thread_consumer);thread t2(thread_producer);t1.join();t2.join();return 0;
}

生产者:首先生产者利用unique_lock来加锁,然后将生产的数据放入队列,打印,解锁,一旦解锁之后,消费者获得了执行机会 ——>

消费者:另一方面消费者就会通过unique_lock获得控制权,也就是获得锁,然后判断队列为空的话就一直盗用wait()函数阻塞在那里,等待其他线程来唤醒它。而阻塞该线程时,该函数会自动解锁,允许其他线程执行 ——>

生产者:再次回到生产者这里,生产者线程利用利用条件变量cond.notify_one()来通知阻塞的线程起来干活了 ——>

消费者:阻塞在那里的消费者线程一旦得到notify唤醒,该函数取消阻塞并获取锁,然后取出队列中的数据,并打印,最后解锁 ——>

生产者:再次回到生产者,然后生产者休眠1秒,这里休眠是为了模拟生产者生产慢的情况,实际开发的时候不要去休眠。最后减一,进入下一次生产。

以上就是利用条件变量来实现生产消费者模型,这个会大大降低CPU的占有率,当然代价就是编程稍微有点麻烦,但与这优化程序来比,这肯定是值的。

3.线程池

我们可以看到多线程提高了CPU的使用率和程序的工作效率,但是如果有大量的线程,就会影响性能,因为要大量的创建与销毁,因为CPU需要在它们之间切换。线程池可以想象成一个池子,它的作用就是让每一个线程结束后,并不会销毁,而是放回到线程池中成为空闲状态,等待下一个对象来使用。

但是让人遗憾的是,C++并没有在语言级别上支持线程池技术,虽然无法从语言级别上支持,但是我们可以利用条件变量和互斥锁自己实现一个线程池。这里就不得不啰嗦几句,条件变量和互斥锁就像两把利剑,几乎可以实现多线程技术中的大部分问题,不管是生产消费者模型,还是线程池,亦或是信号量,所以我们必须好好掌握好这两个工具。

思路:

  • 对于线程池ThreadPool,必须要有构造和析构函数,构造函数中,创建N个线程(这个自己指定),插入到工作线程当中,工作线程可以是vector结构。工作线程中的线程具体要做什么呢?进入线程的时候必要用unique_lock进程加锁处理,不能让其他线程以及主线程影响到要处理的这个线程。判断任务队列是否为空,如果为空,则利用条件变量中的wait函数来阻塞该线程,等待任务队列不为空之后唤醒它。然后取出任务队列中的任务,执行任务中的具体操作。

  • 接着将任务放入任务队列taskQueue,这里的任务是外部根据自己的业务自己定义的,可以是对象,可以是函数,结构体等等,而任务队列这里定义为queue结构,一定要记得将任务放入任务队列的时候,要在之前加锁,放入之后在解锁,这里的加锁解锁可以用unique_lock结构,当然也可以用mutex结构,而放入任务队列之后就可以用条件变量的notify_one函数通知阻塞的线程来取任务处理了。

  • 线程池的实现就有点像是生产消费者模型,append()就像是生产者,不断的将任务放入队列,run()函数就像消费者,不断的从任务队列中取出任务来处理,生产消费的两头分别用notify_one()和wait()来唤醒和阻塞。更加详细的介绍可以去看我的上一篇文章。

  • 最后写一个main文件来调用线程池的相关接口,main文件里定义一个任务对象,然后是main函数。

实现:

threadPool.h

/** @Author: error: error: git config user.name & please set dead value or install git && error: git config user.email & please set dead value or install git & please set dead value or install git* @Date: 2023-02-08 15:25:55* @LastEditors: error: error: git config user.name & please set dead value or install git && error: git config user.email & please set dead value or install git & please set dead value or install git* @LastEditTime: 2023-02-08 15:29:36* @FilePath: /cpp_test/threadPool.h* @Description: 这是默认设置,请设置`customMade`, 打开koroFileHeader查看配置 进行设置: https://github.com/OBKoro1/koro1FileHeader/wiki/%E9%85%8D%E7%BD%AE*/
#ifndef _THREADPOOL_H
#define _THREADPOOL_H
#include <vector>
#include <queue>
#include <thread>
#include <iostream>
#include <condition_variable>
using namespace std;const int MAX_THREADS = 1000; //最大线程数目template <typename T>
class threadPool
{
public:threadPool(int number = 1);~threadPool();bool append(T *task);//工作线程需要运行的函数,不断的从任务队列中取出并执行static void *worker(void *arg);void run();private://工作线程vector<thread> workThread;//任务队列queue<T *> taskQueue;mutex mt;condition_variable condition;bool stop;
};template <typename T>
threadPool<T>::threadPool(int number) : stop(false)
{if (number <= 0 || number > MAX_THREADS)throw exception();for (int i = 0; i < number; i++){cout << "create thread:" << i << endl;workThread.emplace_back(worker, this);}
}template <typename T>
inline threadPool<T>::~threadPool()
{{unique_lock<mutex> unique(mt);stop = true;}condition.notify_all();for (auto &wt : workThread)wt.join();
}template <typename T>
bool threadPool<T>::append(T *task)
{//往任务队列添加任务的时候,要加锁,因为这是线程池,肯定有很多线程unique_lock<mutex> unique(mt);taskQueue.push(task);unique.unlock();//任务添加完之后,通知阻塞线程过来消费任务,有点像生产消费者模型condition.notify_one();return true;
}template <typename T>
void *threadPool<T>::worker(void *arg)
{threadPool *pool = (threadPool *)arg;pool->run();return pool;
}template <typename T>
void threadPool<T>::run()
{while (!stop){unique_lock<mutex> unique(this->mt);//如果任务队列为空,就停下来等待唤醒,等待另一个线程发来的唤醒请求while (this->taskQueue.empty())this->condition.wait(unique);      T *task = this->taskQueue.front();this->taskQueue.pop();if (task)task->process();}
}
#endif

main.cpp

#include "threadPool.h"
#include <string>
using namespace std;class Task
{
private:int total = 0;public:void process();
};//任务具体实现什么功能,由这个函数实现
void Task::process()
{//这里就输出一个字符串cout << "task successful!" << endl;this_thread::sleep_for(chrono::seconds(1));
}template class std::queue<Task>;
int main(void)
{threadPool<Task> pool(1);std::string str;while (1){Task *task = new Task();pool.append(task);delete task;}
}


http://chatgpt.dhexx.cn/article/JdisQvgS.shtml

相关文章

python多线程并发请求

再api测试时&#xff0c;避免不了高并发的测试情况。所以以下案例为线程并发请求代码&#xff0c;以请求百度为例 #!/usr/bin/env python #!coding:utf-8 from __future__ import division from threading import Thread import requests import matplotlib.pyplot as plt imp…

多线程并发的一些解决思路

一、利用不变性解决并发问题 不变性&#xff08;Immutability&#xff09;模式。所谓不变性&#xff0c;简单来讲&#xff0c;就是对象一旦被创建之后&#xff0c;状态就不再发生变化。换句话说&#xff0c;就是变量一旦被赋值&#xff0c;就不允许修改了&#xff08;没有写操…

python之多线程并发

前言 今天呢笔者想和大家来聊聊python多线程的并发&#xff0c;废话就不多说了咱们直接进入主题哟。 一、线程执行 python的内置模块提供了两个内置模块&#xff1a;thread和threading&#xff0c;thread是源生模块&#xff0c;threading是扩展模块&#xff0c;在thread的基础…

python多线程并发测试

python多线程 文章目录 python多线程一、并发与并行&#xff1f;二、同步与异步的概念&#xff1f;三、线程与进程的区别&#xff1f;需求1&#xff1a;多线程执行不同任务&#xff1a;需求2&#xff1a;多线程执行相同任务&#xff1a;1.threading并发性2.多线程并发---资源共…

JAVA多线程并发

JAVA并发知识库 JAVA线程实现/创建方式 1.继承Thread类 Thread类本质上时实现了Runnable接口的一个实例&#xff0c;代表一个现成的实例。启动线程的唯一方法就是通过Thread类的start()实例方法。start()方法是一个native方法&#xff0c;它将启动一个新线程&#xff0c;并执…

多线程 与并发

官方文档 https://docs.oracle.com/javase/tutorial/essential/concurrency/index.html 推荐《Java高并发编程详解&#xff1a;多线程与架构设计》 推荐《Java高并发编程详解&#xff1a;深入理解并发核心库》 有很多工具的基准测试 同步和异步 所谓同步就是一个任务的完成需…

多线程和并发问题详解

文章目录 一、进程与线程二、并发与并行1、线程安全问题&#xff1a;2、共享内存不可见性问题 三、创建线程四、Thread类详解五、其他方法六、实例 一、进程与线程 进程&#xff1a;是代码在数据集合上的一次运行活动&#xff0c;是系统进行资源分配和调度的基本单位。 线程&…

面试官:多线程问题你一问三不知,还要我怎么“放水”?

面试官:问你几个多线程相关的问题吧,说一下导致线程死锁的原因,怎么解除线程死锁? 程序员阿里:这个...死锁... (一分钟后) 面试官:不知道?那好,说一下Lock 和 Synchronized 的区别? 程序员阿里:Lock是锁... 面试官:...会就会,不会就说不会,节省一下时间,s…

多线程(并发执行)

一、概念区分 1、并行与并发 并行 ​ 当系统有一个以上CPU时&#xff0c;同一时刻&#xff0c;当一个CPU在执行一个任务时&#xff0c;另一个CPU在执行另一个任务&#xff0c;两个任务互不抢占CPU资源&#xff0c;可以同时进行&#xff08;多核CPU&#xff0c;一个CPU执行一…

2.多线程并发

1.说说你知道的创建线程的方式 1、继承Thread类&#xff0c;重写run方法。2、实现Runnable接口&#xff0c;重写run方法。3、实现Callable接口&#xff0c;重写call方法。4、通过线程池创建线程。 https://blog.csdn.net/u013541140/article/details/95225769 CachedThreadPoo…

C++多线程并发(一)--- 线程创建与管理

文章目录 前言一、何为并发1.1 并发与并行1.2 硬件并发与任务切换1.3 多线程并发与多进程并发 二、如何使用并发2.1 为什么使用并发2.2 在C中使用并发和多线程 三、C线程创建3.1 C11新标准多线程支持库3.2 线程创建的简单示例 更多文章&#xff1a; 前言 我们都听说过摩尔定律…

lrzsz

lrzsz是一款程序&#xff0c;在linux中可以代替ftp的上传和下载 安装lrzsz yum install -y lrzsz上传&#xff1a;rz 将文件上传到本地&#xff0c;默认上传到当前目录 该程序支持拖拽上传&#xff0c;如下图所示 上传成功后查看本地 下载&#xff1a;sz filename 例如&am…

linux之lrzsz

1、lrzsz介绍 我们利用lrzsz进行windows和linux间的文件上传下载 2、安装 在ubuntu系统下 sudo apt-get install lrzsz 在centos系统下 yum install lrzsz 3、使用 1)、windows上传文件到linux环境,使用如下命令 rz

文件传输工具rzsz

mac安装rz sz? 之前在item2上使用使用rz和sz时就直接夯住 发现是需要配置下 mac使用rz&#xff0c;sz进行文件传输(默认使用的终端为iterm2) 一、安装lrzsz brew install lrzsz 二、下载iterm2-zmodem 执行 git clone https://github.com/aikuyun/iterm2-zmodem.git cd …

【lrzsz】Linux上lrzsz的安装和使用

一、lrzsz简介 rz&#xff0c;sz是Linux/Unix同Windows进行ZModem文件传输的命令行工具。 rz 可以很方便的从客户端传文件到服务器&#xff0c;sz也可以很方便的从服务器传文件到客户端&#xff0c;就算中间隔着跳板机也不影响。 rz(receive Zmodem) sz(send Zmodem) 远程文…

Linux lrzsz 详解

和 FileZilla 功能类似用于上传文件&#xff0c;上传速度比较慢适用于比较小的文件 安装指令 $ sudo yum install lrzsz 使用方式 $ rz 注&#xff1a;1> rz 指令在那个目录就在在那个目录上传文件 2> 文件要上的目录一定要有权限&#xff0c;否则上传失败

linux rz命令安装

新搞的云服务器用SecureCRT不支持上传和下载&#xff0c;没有找到rz命令。记录一下如何安装rz/sz命令的方法。 一、工具说明 在SecureCRT这样的ssh登录软件里, 通过在Linux界面里输入rz/sz命令来上传/下载文件. 对于某些linux版本, rz/sz默认没有安装所以需要手工安装。 sz: 将…

rz sz

linux上很方便的上传下载文件工具rz和sz (本文适合linux入门的朋友) ######################################################### #《老男孩linux就业培训中心-初级班第七期第一节内容总结。 #linux上很方便的上传下载文件工具rz和sz #date:2011-06-15 #作者&#xff1a;老男…

Linux的rz命令

linux服务器rz命令上传文件 2016年09月10日 19:56:02 阅读数&#xff1a;11712 1、首先&#xff0c;要是服务器不支持rz命令的话&#xff0c;需要安装执行 yum -y install lrzsz 2、再输入rz -be命令&#xff0c;选择需要上传的本地文件

llrzsz

文章目录 官网解压配置编译安装复制到开发板并执行使用lrz从PC传开发板使用lsz从开发板传PC参考链接 今天给大家推荐一个很好用的工具。 你是不是感觉在没有网络时往linux中下载程序很麻烦&#xff08;SD卡插拔&#xff09;&#xff0c;今天就教大家怎么通过串口和linux文件系统…