文章目录
- `multiprocessing`模块
- 基本操作:进程和锁
- 关于实现和用法的规则
- *IPC*工具:管道、共享内存和队列
- 管道
- 共享内存和全局对象
- 队列和子类
- 启动独立程序
- 其他更多
multiprocessing
模块
Python标准库中的multiprocessing
模块允许脚本通过与threading
模块非常类似的API来派生进程,且在Unix和Windows下皆可工作,因为它支持一个基本上与平台无关的进程派生模型。
基本操作:进程和锁
multiprocessing
模块的Process
类被设计为模拟threading.Thread
类,它允许启动一个与调用者脚本并行运行函数的进程。
示例:multi_1.py
#!/usr/bin/env python
"multiprocessing模块基本操作"from multiprocessing import Process, Lock
import osdef who_am_i(label_str, lock):"打印信息"msg_str = '{}: name={} process={}'.format(label_str, __name__, os.getpid())with lock:print(msg_str)def main():lock = Lock()who_am_i('function call', lock)process = Process(target=who_am_i, args=('spawned child', lock))process.start()process.join()for i_id_int in range(5):Process(target=who_am_i, args=('run process {}'.format(i_id_int), lock)).start()with lock:print('Main process exit')if __name__ == '__main__':main()
输出:multi_1.py
function call: name=__main__ process=10416
spawned child: name=__main__ process=10417
run process 0: name=__main__ process=10418
run process 1: name=__main__ process=10419
run process 2: name=__main__ process=10420
Main process exit
run process 3: name=__main__ process=10421
run process 4: name=__main__ process=10422
- 注意一下,这个脚本最后派生出的5个进程其中部分比其父进程运行的时间更长。
Process.start
:该方法在一个新进程中调用其run
方法Process.run
:该方法仅仅调用传入的目标函数Process.join
:方法等待派生进程退出- 我们可以如示例一样传入
target
,也可以定义子类的run
方法。
关于实现和用法的规则
multiprocessing
模块针对不同的平台有不同的可移植方案:
- 在UNIX下,它分支一个新的子进程并在其中调用
Process
对象的run
方法。 - 在Windows下,它通过Windows下特有的进程创建工具来派生一个新的解释器,通过管道向新进程中传入
pickle
后的Process
对象,并在新进程中运行python -c
命令行,后者运行这个包里一个特殊的Python编码的函数来读取和unpickle
这个Process
对象并调用其run
方法。
multiprocessing
模块的基本构架仍然可以对您能够使用它的方法产生微妙的影响:
- 在Windows下,主进程的逻辑业务通常嵌套在一个
__name__==__main__
的测试中。 - 当Windows下子进程访问全局对象的时候,后者的值可能与其在父进程的起始时间不同。
- 在Windows下,
Process
接受的所有参数必须能接受pickle
操作,因此target
不能是绑定或者非绑定对象的方法,也不能是lambda
语句创建的函数,还不能是带有系统状态的对象,如连接上的套接字。 - 在Windows下,定制的
Process
子类中的所有东西也必须是可被pickle
的。 - 这个包中的IPC对象,如
Pipe
和Queue
也只接受可pickle
的对象。 - 在UNIX下,最好将对象作为参数传入子进程的构造器,这样对Windows来说有更好的可移植性,如果这种对象是父进程收集的垃圾的话,还能避免某些潜在问题。
IPC工具:管道、共享内存和队列
multiprocessing
模块为它派生的进程提供了可跨平台移植的消息传递工具:
- 模块的
Pipe
对象提供了一个可连接两个进程的双向匿名管道,返回两个Connection
对象,代表管道的两端,发送和接受任何可被pickle
操作的Python
对象。目前在UNIX下,它们在内部有一对连接上的套接字或我们之前看到的os.pipe
调用得以实现,而在Windows下由平台特异的具名管道实现。 - 模块的
Value
和Array
对象实现共享的进程、线程安全的内存以用于进程间通信。这些调用返回基于ctypes
模块并在共享内存中创建的标量和数组对象,默认带有访问同步化设置。 - 模块的
Queue
对象可用作Python对象一个先进先出(FIFO)的列表,其中允许多个生产者和消费者。从本质上来说,队列是一个管道加上用于协调随机访问的锁机制,并继承了Pipe
可进行pickle
操作的限制。
管道
示例:multi_2.py
#!/usr/bin/env python
"测试multiprocessing.Pipe"from multiprocessing import Pipe, Processdef sender(pipe):"通过管道向父进程发送对象"pipe.send(['spam'] + [42, 'eggs'])pipe.close()def talker(pipe):"通过管道向父进程发送和接受对象"pipe.send(dict(name='Bob', spam=42))print('talker got:', pipe.recv())def main():parent_pipe, child_pipe = Pipe()Process(target=sender, args=(child_pipe,)).start()print('parent got:', parent_pipe.recv())parent_pipe.close() # 或者在程序运行完后自动关闭parent_pipe, child_pipe = Pipe()child_process = Process(target=talker, args=(child_pipe,))child_process.start()print('parent got:', parent_pipe.recv())parent_pipe.send([x_str * 2 for x_str in 'spam'])child_process.join()print('Parent exit')if __name__ == '__main__':main()
输出:multi_2.py
parent got: ['spam', 42, 'eggs']
parent got: {'name': 'Bob', 'spam': 42}
talker got: ['ss', 'pp', 'aa', 'mm']
Parent exit
共享内存和全局对象
示例:multi_3.py
#!/usr/bin/env python
"测试multiprocessing的共享内存"import os
from multiprocessing import Process, Value, ArrayNUMS_PROC_INT = 3 # 每个进程各自的全局对象,并非共享
COUNT_INT = 0def show_data(label_str, value, array):"打印数据"msg = '{}: pid={} global={} value={} array={}'.format(label_str, os.getpid(), COUNT_INT, value.value, list(array))print(msg)def updater(value, array):"更新数据"global COUNT_INTCOUNT_INT += 1 # 全局计数器,非共享value.value += 1 # 传入的对象是共享的for i_index_int in range(NUMS_PROC_INT):array[i_index_int] += 1def main():global COUNT_INTvalue = Value('i', 0) # 共享内存是进程、线程安全的array = Array('d', NUMS_PROC_INT) # ctypes中的类型代码:就像int和doubleshow_data('parent start', value, array) # 在父进程中显示初始值# 派生子进程,传入共享内存process = Process(target=show_data, args=('child', value, array))process.start()process.join()# 传入父进程中更新过的共享内存,等待每次进程结束# 每个子进程看到了父进程到现在为止对args的更新(但全局变量看不到)print('\nloop 1 (updates in parent, serial children)...')for i_int in range(NUMS_PROC_INT):COUNT_INT += 1value.value += 1array[i_int] += 1process = Process(target=show_data, args=('process {}'.format(i_int), value, array))process.start()process.join()# 同上,不过允许所有子进程并行运行# 所有进程都看到了最近一次迭代的结果,因为它们都共享这个对象print('\nloop 2 (updates in parent, parallel children)...')listprocess = []for i_int in range(NUMS_PROC_INT):COUNT_INT += 1value.value += 1array[i_int] += 1process = Process(target=show_data, args=('process {}'.format(i_int), value, array))process.start()listprocess.append(process)for process in listprocess:process.join()# 共享内存在派生子进程中进行更新,等待每个更新结束print('\nloop 3 (updates in serial children)...')for i_int in range(NUMS_PROC_INT):process = Process(target=updater, args=(value, array))process.start()process.join()show_data('parent temp', value, array) # 仅在父进程中全局变量COUNT_INT=6# 同上,不过允许子进程并行的更新print('\nloop 4 (updates in parallel children)...')listprocess = []for i_int in range(NUMS_PROC_INT):process = Process(target=updater, args=(value, array))process.start()listprocess.append(process)for process in listprocess:process.join()show_data('parent temp', value, array) # 在此打印最终结果# value=12:父进程+6,6个子进程+6# array=[8.0, 8.0, 8.0]:父进程+2,6个子进程+6if __name__ == '__main__':main()
输出:multi_3.py
parent start: pid=26201 global=0 value=0 array=[0.0, 0.0, 0.0]
child: pid=26202 global=0 value=0 array=[0.0, 0.0, 0.0]loop 1 (updates in parent, serial children)...
process 0: pid=26203 global=1 value=1 array=[1.0, 0.0, 0.0]
process 1: pid=26204 global=2 value=2 array=[1.0, 1.0, 0.0]
process 2: pid=26205 global=3 value=3 array=[1.0, 1.0, 1.0]loop 2 (updates in parent, parallel children)...
process 0: pid=26206 global=4 value=6 array=[2.0, 2.0, 2.0]
process 1: pid=26207 global=5 value=6 array=[2.0, 2.0, 2.0]
process 2: pid=26208 global=6 value=6 array=[2.0, 2.0, 2.0]loop 3 (updates in serial children)...
parent temp: pid=26201 global=6 value=9 array=[5.0, 5.0, 5.0]loop 4 (updates in parallel children)...
parent temp: pid=26201 global=6 value=12 array=[8.0, 8.0, 8.0]
- 最后loop 4测试代表了共享内存最常用的的用例——在数个平行进程间分配计算工作,最后在父进程中统计结果
队列和子类
multiprocessing
模块还拥有以下特性:
- 允许模块的
Process
类创建子类,并提供架构和状态保留(很像threading.Thread
,不过是用于进程的)。 - 提供进程安全的
Queue
对象,可以再任意数量的进程间共享,满足更广泛的通信需求(很像queue.Queue
,不过是用于进程的)。
示例:multi_4.py
#!/usr/bin/env python
"multiprocessing模块的子类和队列"import os
import time
import queue
from multiprocessing import Process, Lock, Queueclass CounterProcess(Process):"Process的子类"label_str = '\t@'def __init__(self, start_int, queue, stdout_lock): # 为运行中的用处保留状态self.state_int = start_intself.post_queue = queueself.stdout_lock = stdout_lockProcess.__init__(self)def run(self):for i_int in range(3):time.sleep(1)self.state_int += 1with self.stdout_lock:print(self.label_str, self.pid, self.state_int) # self.pid为进程号self.post_queue.put([self.pid, self.state_int])with self.stdout_lock:print(self.label_str, self.pid, '-')def main():print('start', os.getpid())expected_int = 9post_queue = Queue()stdout_lock = Lock()a_counterprocess = CounterProcess(0, post_queue, stdout_lock)b_counterprocess = CounterProcess(100, post_queue, stdout_lock)c_counterprocess = CounterProcess(10000, post_queue, stdout_lock)a_counterprocess.start()b_counterprocess.start()c_counterprocess.start()while expected_int:time.sleep(0.5)try:data_listint = post_queue.get(block=False)except queue.Empty:with stdout_lock:print('no data...')else:with stdout_lock:print('posted:', data_listint)expected_int -= 1a_counterprocess.join()b_counterprocess.join()c_counterprocess.join()print('finish', os.getpid(), c_counterprocess.exitcode)if __name__ == '__main__':main()
输出:multi_4.py
start 33926
no data...@ 33927 1
posted: [33927, 1]@ 33928 101@ 33929 10001
posted: [33928, 101]@ 33927 2
posted: [33929, 10001]@ 33928 102@ 33929 10002
posted: [33927, 2]@ 33927 3@ 33927 -
posted: [33928, 102]@ 33928 103@ 33928 -@ 33929 10003@ 33929 -
posted: [33929, 10002]
posted: [33927, 3]
posted: [33928, 103]
posted: [33929, 10003]
finish 33926 0
- 生产者调用
time.sleep
是为了模拟长时运行任务。 - 所有4个进程共享一个输出流。
- 子进程结束后由其
exitcode
属性提供退出状态。
启动独立程序
在派生子进程中可以使用我们之前见过的os.exec
调用等工具来启动一个真正的独立程序。
示例:multi_5.py
#!/usr/bin/env python
"使用multiprocessing和os.exec起始一个新程序"import os
from multiprocessing import Processdef run_program(id_int):os.execlp('python', 'python', 'child.py', str(id_int))def main():for i_id_int in range(5):Process(target=run_program, args=(i_id_int,)).start()print('parent exit')if __name__ == '__main__':main()
输出:multi_5.py
parent exit
Hello from child! 39635 1
Hello from child! 39636 2
Hello from child! 39634 0
Hello from child! 39637 3
Hello from child! 39638 4
其他更多
示例:multi_6.py
#!/usr/bin/env python
"multiprocessing.Pool类"import os
from multiprocessing import Pooldef powers(x):'返回2的x次方'# print('\n\t@' + str(os.getpid()), x) # 能够监视子进程return 2 ** xdef main():workers_pool = Pool(processes=5)results_list = workers_pool.map(powers, [2] * 100)print(results_list[:16])print(results_list[-2:])results_list = workers_pool.map(powers, list(range(1, 101)))print(results_list[:16])print(results_list[-2:])if __name__ == '__main__':main()
输出:multi_6.py
[4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4]
[4, 4]
[2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536]
[633825300114114700748351602688, 1267650600228229401496703205376]
———————————————————————————————————————————
😃 学完博客后,是不是有所启发呢?如果对此还有疑问,欢迎在评论区留言哦。
如果还想了解更多的信息,欢迎大佬们关注我哦,也可以查看我的个人博客网站BeacherHou。