什么!学Python多进程的你还不知道multiprocessing模块?该充电了>_(Python编程 | 系统编程 | 并行系统工具 | multiprocessing模块)

article/2025/7/27 21:42:05

我要学习

文章目录

  • `multiprocessing`模块
    • 基本操作:进程和锁
      • 关于实现和用法的规则
    • *IPC*工具:管道、共享内存和队列
      • 管道
      • 共享内存和全局对象
      • 队列和子类
    • 启动独立程序
    • 其他更多

multiprocessing模块

Python标准库中的multiprocessing模块允许脚本通过与threading模块非常类似的API来派生进程,且在UnixWindows下皆可工作,因为它支持一个基本上与平台无关的进程派生模型。

基本操作:进程和锁

multiprocessing模块的Process类被设计为模拟threading.Thread类,它允许启动一个与调用者脚本并行运行函数的进程。

示例:multi_1.py

#!/usr/bin/env python
"multiprocessing模块基本操作"from multiprocessing import Process, Lock
import osdef who_am_i(label_str, lock):"打印信息"msg_str = '{}: name={} process={}'.format(label_str, __name__, os.getpid())with lock:print(msg_str)def main():lock = Lock()who_am_i('function call', lock)process = Process(target=who_am_i, args=('spawned child', lock))process.start()process.join()for i_id_int in range(5):Process(target=who_am_i, args=('run process {}'.format(i_id_int), lock)).start()with lock:print('Main process exit')if __name__ == '__main__':main()

输出:multi_1.py

function call: name=__main__ process=10416
spawned child: name=__main__ process=10417
run process 0: name=__main__ process=10418
run process 1: name=__main__ process=10419
run process 2: name=__main__ process=10420
Main process exit
run process 3: name=__main__ process=10421
run process 4: name=__main__ process=10422
  • 注意一下,这个脚本最后派生出的5个进程其中部分比其父进程运行的时间更长。
  • Process.start:该方法在一个新进程中调用其run方法
  • Process.run:该方法仅仅调用传入的目标函数
  • Process.join:方法等待派生进程退出
  • 我们可以如示例一样传入target,也可以定义子类的run方法。

关于实现和用法的规则

multiprocessing模块针对不同的平台有不同的可移植方案:

  • UNIX下,它分支一个新的子进程并在其中调用Process对象的run方法。
  • Windows下,它通过Windows下特有的进程创建工具来派生一个新的解释器,通过管道向新进程中传入pickle后的Process对象,并在新进程中运行python -c命令行,后者运行这个包里一个特殊的Python编码的函数来读取和unpickle这个Process对象并调用其run方法。

multiprocessing模块的基本构架仍然可以对您能够使用它的方法产生微妙的影响:

  • Windows下,主进程的逻辑业务通常嵌套在一个__name__==__main__的测试中。
  • Windows下子进程访问全局对象的时候,后者的值可能与其在父进程的起始时间不同。
  • Windows下,Process接受的所有参数必须能接受pickle操作,因此target不能是绑定或者非绑定对象的方法,也不能是lambda语句创建的函数,还不能是带有系统状态的对象,如连接上的套接字。
  • Windows下,定制的Process子类中的所有东西也必须是可被pickle的。
  • 这个包中的IPC对象,如PipeQueue也只接受可pickle的对象。
  • UNIX下,最好将对象作为参数传入子进程的构造器,这样对Windows来说有更好的可移植性,如果这种对象是父进程收集的垃圾的话,还能避免某些潜在问题。

IPC工具:管道、共享内存和队列

multiprocessing模块为它派生的进程提供了可跨平台移植的消息传递工具:

  • 模块的Pipe对象提供了一个可连接两个进程的双向匿名管道,返回两个Connection对象,代表管道的两端,发送和接受任何可被pickle操作的Python对象。目前在UNIX下,它们在内部有一对连接上的套接字或我们之前看到的os.pipe调用得以实现,而在Windows下由平台特异的具名管道实现。
  • 模块的ValueArray对象实现共享的进程、线程安全的内存以用于进程间通信。这些调用返回基于ctypes模块并在共享内存中创建的标量和数组对象,默认带有访问同步化设置。
  • 模块的Queue对象可用作Python对象一个先进先出(FIFO)的列表,其中允许多个生产者和消费者。从本质上来说,队列是一个管道加上用于协调随机访问的锁机制,并继承了Pipe可进行pickle操作的限制。

管道

示例:multi_2.py

#!/usr/bin/env python
"测试multiprocessing.Pipe"from multiprocessing import Pipe, Processdef sender(pipe):"通过管道向父进程发送对象"pipe.send(['spam'] + [42, 'eggs'])pipe.close()def talker(pipe):"通过管道向父进程发送和接受对象"pipe.send(dict(name='Bob', spam=42))print('talker got:', pipe.recv())def main():parent_pipe, child_pipe = Pipe()Process(target=sender, args=(child_pipe,)).start()print('parent got:', parent_pipe.recv())parent_pipe.close()  # 或者在程序运行完后自动关闭parent_pipe, child_pipe = Pipe()child_process = Process(target=talker, args=(child_pipe,))child_process.start()print('parent got:', parent_pipe.recv())parent_pipe.send([x_str * 2 for x_str in 'spam'])child_process.join()print('Parent exit')if __name__ == '__main__':main()

输出:multi_2.py

parent got: ['spam', 42, 'eggs']
parent got: {'name': 'Bob', 'spam': 42}
talker got: ['ss', 'pp', 'aa', 'mm']
Parent exit

共享内存和全局对象

示例:multi_3.py

#!/usr/bin/env python
"测试multiprocessing的共享内存"import os
from multiprocessing import Process, Value, ArrayNUMS_PROC_INT = 3  # 每个进程各自的全局对象,并非共享
COUNT_INT = 0def show_data(label_str, value, array):"打印数据"msg = '{}: pid={} global={} value={} array={}'.format(label_str, os.getpid(), COUNT_INT, value.value, list(array))print(msg)def updater(value, array):"更新数据"global COUNT_INTCOUNT_INT += 1  # 全局计数器,非共享value.value += 1  # 传入的对象是共享的for i_index_int in range(NUMS_PROC_INT):array[i_index_int] += 1def main():global COUNT_INTvalue = Value('i', 0)  # 共享内存是进程、线程安全的array = Array('d', NUMS_PROC_INT)  # ctypes中的类型代码:就像int和doubleshow_data('parent start', value, array)  # 在父进程中显示初始值# 派生子进程,传入共享内存process = Process(target=show_data, args=('child', value, array))process.start()process.join()# 传入父进程中更新过的共享内存,等待每次进程结束# 每个子进程看到了父进程到现在为止对args的更新(但全局变量看不到)print('\nloop 1 (updates in parent, serial children)...')for i_int in range(NUMS_PROC_INT):COUNT_INT += 1value.value += 1array[i_int] += 1process = Process(target=show_data, args=('process {}'.format(i_int), value, array))process.start()process.join()# 同上,不过允许所有子进程并行运行# 所有进程都看到了最近一次迭代的结果,因为它们都共享这个对象print('\nloop 2 (updates in parent, parallel children)...')listprocess = []for i_int in range(NUMS_PROC_INT):COUNT_INT += 1value.value += 1array[i_int] += 1process = Process(target=show_data, args=('process {}'.format(i_int), value, array))process.start()listprocess.append(process)for process in listprocess:process.join()# 共享内存在派生子进程中进行更新,等待每个更新结束print('\nloop 3 (updates in serial children)...')for i_int in range(NUMS_PROC_INT):process = Process(target=updater, args=(value, array))process.start()process.join()show_data('parent temp', value, array)  # 仅在父进程中全局变量COUNT_INT=6# 同上,不过允许子进程并行的更新print('\nloop 4 (updates in parallel children)...')listprocess = []for i_int in range(NUMS_PROC_INT):process = Process(target=updater, args=(value, array))process.start()listprocess.append(process)for process in listprocess:process.join()show_data('parent temp', value, array)  # 在此打印最终结果# value=12:父进程+6,6个子进程+6# array=[8.0, 8.0, 8.0]:父进程+2,6个子进程+6if __name__ == '__main__':main()

输出:multi_3.py

parent start: pid=26201 global=0 value=0 array=[0.0, 0.0, 0.0]
child: pid=26202 global=0 value=0 array=[0.0, 0.0, 0.0]loop 1 (updates in parent, serial children)...
process 0: pid=26203 global=1 value=1 array=[1.0, 0.0, 0.0]
process 1: pid=26204 global=2 value=2 array=[1.0, 1.0, 0.0]
process 2: pid=26205 global=3 value=3 array=[1.0, 1.0, 1.0]loop 2 (updates in parent, parallel children)...
process 0: pid=26206 global=4 value=6 array=[2.0, 2.0, 2.0]
process 1: pid=26207 global=5 value=6 array=[2.0, 2.0, 2.0]
process 2: pid=26208 global=6 value=6 array=[2.0, 2.0, 2.0]loop 3 (updates in serial children)...
parent temp: pid=26201 global=6 value=9 array=[5.0, 5.0, 5.0]loop 4 (updates in parallel children)...
parent temp: pid=26201 global=6 value=12 array=[8.0, 8.0, 8.0]
  • 最后loop 4测试代表了共享内存最常用的的用例——在数个平行进程间分配计算工作,最后在父进程中统计结果

队列和子类

multiprocessing模块还拥有以下特性:

  • 允许模块的Process类创建子类,并提供架构和状态保留(很像threading.Thread,不过是用于进程的)。
  • 提供进程安全的Queue对象,可以再任意数量的进程间共享,满足更广泛的通信需求(很像queue.Queue,不过是用于进程的)。

示例:multi_4.py

#!/usr/bin/env python
"multiprocessing模块的子类和队列"import os
import time
import queue
from multiprocessing import Process, Lock, Queueclass CounterProcess(Process):"Process的子类"label_str = '\t@'def __init__(self, start_int, queue, stdout_lock):  # 为运行中的用处保留状态self.state_int = start_intself.post_queue = queueself.stdout_lock = stdout_lockProcess.__init__(self)def run(self):for i_int in range(3):time.sleep(1)self.state_int += 1with self.stdout_lock:print(self.label_str, self.pid, self.state_int)  # self.pid为进程号self.post_queue.put([self.pid, self.state_int])with self.stdout_lock:print(self.label_str, self.pid, '-')def main():print('start', os.getpid())expected_int = 9post_queue = Queue()stdout_lock = Lock()a_counterprocess = CounterProcess(0, post_queue, stdout_lock)b_counterprocess = CounterProcess(100, post_queue, stdout_lock)c_counterprocess = CounterProcess(10000, post_queue, stdout_lock)a_counterprocess.start()b_counterprocess.start()c_counterprocess.start()while expected_int:time.sleep(0.5)try:data_listint = post_queue.get(block=False)except queue.Empty:with stdout_lock:print('no data...')else:with stdout_lock:print('posted:', data_listint)expected_int -= 1a_counterprocess.join()b_counterprocess.join()c_counterprocess.join()print('finish', os.getpid(), c_counterprocess.exitcode)if __name__ == '__main__':main()

输出:multi_4.py

start 33926
no data...@ 33927 1
posted: [33927, 1]@ 33928 101@ 33929 10001
posted: [33928, 101]@ 33927 2
posted: [33929, 10001]@ 33928 102@ 33929 10002
posted: [33927, 2]@ 33927 3@ 33927 -
posted: [33928, 102]@ 33928 103@ 33928 -@ 33929 10003@ 33929 -
posted: [33929, 10002]
posted: [33927, 3]
posted: [33928, 103]
posted: [33929, 10003]
finish 33926 0
  • 生产者调用time.sleep是为了模拟长时运行任务。
  • 所有4个进程共享一个输出流。
  • 子进程结束后由其exitcode属性提供退出状态。

启动独立程序

在派生子进程中可以使用我们之前见过的os.exec调用等工具来启动一个真正的独立程序。

示例:multi_5.py

#!/usr/bin/env python
"使用multiprocessing和os.exec起始一个新程序"import os
from multiprocessing import Processdef run_program(id_int):os.execlp('python', 'python', 'child.py', str(id_int))def main():for i_id_int in range(5):Process(target=run_program, args=(i_id_int,)).start()print('parent exit')if __name__ == '__main__':main()

输出:multi_5.py

parent exit
Hello from child! 39635 1
Hello from child! 39636 2
Hello from child! 39634 0
Hello from child! 39637 3
Hello from child! 39638 4

其他更多

示例:multi_6.py

#!/usr/bin/env python
"multiprocessing.Pool类"import os
from multiprocessing import Pooldef powers(x):'返回2的x次方'# print('\n\t@' + str(os.getpid()), x)  # 能够监视子进程return 2 ** xdef main():workers_pool = Pool(processes=5)results_list = workers_pool.map(powers, [2] * 100)print(results_list[:16])print(results_list[-2:])results_list = workers_pool.map(powers, list(range(1, 101)))print(results_list[:16])print(results_list[-2:])if __name__ == '__main__':main()

输出:multi_6.py

[4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4]
[4, 4]
[2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536]
[633825300114114700748351602688, 1267650600228229401496703205376]

———————————————————————————————————————————

😃 学完博客后,是不是有所启发呢?如果对此还有疑问,欢迎在评论区留言哦。
如果还想了解更多的信息,欢迎大佬们关注我哦,也可以查看我的个人博客网站BeacherHou。


http://chatgpt.dhexx.cn/article/b4Ii8TQa.shtml

相关文章

一文入门Python基础

Python基础 python中的输出函数 print()函数 可以输出的内容 数字字符串含有运算符的表达式(会返回表达式计算的结果) 内容输出的目的地 显示器文件 # 将数据输入文件中,注意点——所指的盘必须存在——使用filefp fp open(路径,模式) print(hello,file fp) fp.…

深入理解Linux进程管理(1.0)

学习方法论 写作原则 标题括号中的数字代表完成度与完善度 0.0-1.0 代表完成度,1.1-1.5 代表完善度 0.0 :还没开始写 0.1 :写了一个简介 0.3 :写了一小部分内容 0.5 :写了一半内容 0.9 :还有个别内容没写 1…

一、Linux系统编程:进程基础

1 进程基础 1.1 概念 定义 程序在计算机上的一次执行过程,执行中的程序。本质 1、程序在地址空间中按照代码逻辑控制流执行 2、资源分配最小单位 进程是一个抽象概念 1.2 进程与程序 区别 进程程序动态静态有生命周期指令集合只能对应一个程序可以对应多个进程…

YARN源码解析之NodeManager中的ContainerExecutor

在NodeManager中,有三种运行Container的方式,它们分别是: DefaultContainerExecutorLinuxContainerExecutorDockerContainerExecutor 从它们的名字中,我们就能看得出来,默认情况下,一定使用的是DefaultContainerExec…

linux进程状态怎么手动切换,二十六、Linux 进程与信号---system 函数 和进程状态切换...

26.1 system 函数 26.1.1 函数说明 system(执行shell 命令) 相关函数 fork,execve,waitpid,popen #include int system(const char * string); 函数功能:简化 exec 函数 函数说明 system()会调用 fork() 产生子进程,由…

进程控制(详解)

进程控制 上篇文章介绍了进程的相关概念,形如进程的内核数据结构task_struct 、进程是如何被操作系统管理的、进程的查看、进程标识符、进程状态、进程优先级、已经环境变量和进程地址空间等知识点; 本篇文章接着上篇文章继续对进程的控制进行展开&#…

c++跨平台技术学习(三)--使用标准API

Posix.1 API定义了大量的函数,在各方面的功能都很丰富,下面对其进行介绍 System V接口定义 它是一份描述了AT&T UNIX System V操作系统的文档,与POSIX.1保持一致,是它的一个超集。SVID由基础系统和扩展定义,它只…

BUUCTF刷题记录 Ping Ping Ping

[GXYCTF2019]Ping Ping Ping 进入页面 然后/?ip127.0.0.1|ls 进入 读取flag.php 再cat$IFS$1indnx.php 再变量拼接 ?ip127.0.0.1;ag;cat$IFS 1 f l a 1fla 1flaa.php 转至 http://1d22a0a5-c6a0-43f1-8e52-e5a33ec7e044.node3.buuoj.cn/?ip127.0.0.1;ag;cat$IFS 1 f l a …

JVM 答疑解惑

JVM是什么? 平常接触的东西都存在哪里? 类如何加载? 怎么运行? 清洁工怎么工作? JVM是什么? Java 虚拟机屏蔽了与具体操作系统平台相关的信息,使得 Java 语言编译程序只需生成在 Java 虚拟机上运行的目…

2021-CISCN西南赛区线下-misc-stealer

stealer 题目描述: stealer 那女孩对我说说我是一个小偷(本题flag 格式为 DASCTF{},提交时只需要提交括号中间的字符。flag需小写) hint:MISC-stealer: focus on DNS CRYPTO 知识点 1.DNS后面的base64 2.然后base6…

CVPR 2019|PoolNet:基于池化技术的显著性检测 论文解读

作者 | 文永亮 研究方向 | 目标检测、GAN 研究动机 ​ 这是一篇发表于CVPR2019的关于显著性目标检测的paper,在U型结构的特征网络中,高层富含语义特征捕获的位置信息在自底向上的传播过程中可能会逐渐被稀释,另外卷积神经网络的感受野大小与深…

请问做亚马逊,注册P卡是用个人名义还是公司名义?

请问做亚马逊,注册P卡是用个人名义还是公司名义?亚马逊将Payoneer作为亚马逊卖家平台里的推荐收款方式。24个不同国家的卖家能够使用亚马逊卖家中心唯一推荐的收款方式——Payoneer来收款,接收、使用亚马逊货款变得前所未有地简便。 个人、公…

解决CentOS7 Ping不了外网的问题 ping:baidu.com: 未知的名称或服务

在CentOS7中遇到个问题,ping外网地址时候提示“未知的名称或服务” 先使用该命令查看路由网关信息 route -n 发现没有配置网关地址: ​​​​​​​ 去虚拟机里的虚拟网络编辑器里查看下网关地址 然后将网关地址配置到路由中 (临时有效&…

BUUCTF Web [GXYCTF2019]Ping Ping Ping

「作者主页」:士别三日wyx 此文章已录入专栏《网络攻防》,持续更新热门靶场的通关教程 「未知攻,焉知收」,在一个个孤独的夜晚,你完成了几百个攻防实验,回过头来才发现,已经击败了百分之九十九…

[GXYCTF 2019]Ping Ping Ping

前言 之前没总结过关于命令执行的绕过姿势,借着今天做的这个命令执行的题目来总结一下。 先看题目 题目 题目很单一,目的就是为了让我们通过参数传入内容来执行代码。因为题目是与ping有关,当我们输入127.0.0.1时,它会进行ping…

[GXYCTF2019]Ping Ping Ping(命令执行)

命令执行绕过 常见写法 127.0.0.1&&code 只有在 && 左边的命令返回真(命令返回值 $? 0),&& 右边的命令才 会被执行。 127.0.0.1&code &表示将任务置于后台执行 127.0.0.1||code 只有在 || 左边的命令返回…

CTF_Web_[GXYCTF2019]Ping Ping Ping

一、题目 Ping Ping Ping 二、靶机信息链接 靶机信息 剩余时间: 10072s http://70284b15-7c4e-4548-8b04-aadbc6e669f5.node4.buuoj.cn:81 三、靶机链接页面 四、 分析 因题目是Ping...再加上靶机页面有“/?ip” 可能是 ping地址,尝试按照所给的内容在url中加…

BUUCTF [GXYCTF2019]Ping Ping Ping easywill

题目地址:BUUCTF在线评测 考点:ping命令相关命令执行 这里过滤了flag和空格 绕过空格可以使用 $IFS$1 使用ls命令查询目录 ?ip127.0.0.1;ls 发现有两个文件,一个是flag.php,另一个是index.php。 cat获取文件内容&#xff…

BUUctf [GXYCTF2019]Ping Ping Ping

根据题目和页面的提示猜测是命令执行漏洞 ;前面和后面命令都要执行,无论前面真假 |直接执行后面的语句 ||如果前面命令是错的那么就执行后面的语句,否则只执行前面的语句 &前面和后面命令都要执行,无论前面真假 &&如果前面为假&a…

BUUCTF——web([GXYCTF2019]Ping Ping Ping、[极客大挑战 2019]Knife、[极客大挑战 2019]Http)

BUUCTF-web [GXYCTF2019]Ping Ping Ping做题思路 [极客大挑战 2019]Knife做题思路 [极客大挑战 2019]Http做题思路 [GXYCTF2019]Ping Ping Ping 做题思路 打开看题目 熟悉的样子,ping本地加查看命令,得到两个php文件 接着查看一下文件内容 奇奇怪…