Python 中 concurrent.futures 模块使用说明

article/2025/8/23 13:25:28

Python 中 concurrent.futures 模块使用说明

转载请注明出处:https://blog.csdn.net/jpch89/article/details/87643972


文章目录

  • Python 中 concurrent.futures 模块使用说明
    • 0. 参考资料
    • 1. 概述
    • 2. Executor Object 执行器对象
    • 3. ThreadPoolExecutor 线程池执行器
    • 4. ThreadPoolExecutor 例子
    • 5. ProcessPoolExecutor 进程池执行器
    • 6. ProcessPoolExecutor 例子
    • 7. Future 对象
    • 8. 模块函数
    • 9. 待完善


0. 参考资料

  • concurrent.futures — Launching parallel tasks
  • concurrent.futures — Manage Pools of Concurrent Tasks

1. 概述

concurrent.futures3.2 中引入的新模块,它为异步执行可调用对象提供了高层接口。
可以使用 ThreadPoolExecutor 来进行多线程编程,ProcessPoolExecutor 进行多进程编程,两者实现了同样的接口,这些接口由抽象类 Executor 定义。
这个模块提供了两大类型,一个是执行器类 Executor,另一个是 Future 类。
执行器用来管理工作池,future 用来管理工作计算出来的结果,通常不用直接操作 future 对象,因为有丰富的 API


2. Executor Object 执行器对象

concurrent.futures.Executor
这个抽象类提供了一系列方法,可以用于异步执行调用。
它不能直接使用,只能通过子类化出来的具体类来使用。

它定义的方法有:
submit(fn, *args, **kwargs)
安排可调用对象 fnfn(*args, **kwargs) 的形式执行,并返回 Future 对象来表示它的执行。

with ThreadPoolExecutor(max_workers=1) as executor:future = executor.submit(pow, 323, 1235)print(future.result())

map(func, *iterables, timeout=None, chunksize=1)
类似内置函数 map(func, *iterables),但是有两点不同:

  1. 立即获取 iterables 而不会惰性获取;
  2. 异步执行 func,并支持多次并发调用。

它返回一个迭代器。
从调用 Executor.map() 开始的 timeout 秒之后,如果在迭代器上调用了 __next__() 并且无可用结果的话,迭代器会抛出 concurrent.futures.TimeoutError 异常。
timeout 秒数可以是浮点数或者整数,如果设置为 None 或者不指定,则不限制等待时间。

如果 func 调用抛出了异常,那么该异常会在从迭代器获取值的时候抛出。

当使用 ProcessPoolExecutor 的时候,这个方法会把 iterables 划分成多个块,作为独立的任务提交到进程池。这些块的近似大小可以通过给 chunksize 指定一个正整数。对于很长的 iterables,使用较大的 chunksize 而不是采用默认值 1,可以显著提高性能。对于 ThreadPoolExecutorchunksize 不起作用。

chunksize3.5 加入的新参数。

注意:不管并发任务的执行次序如何,map 总是基于输入顺序来返回值。map 返回的迭代器,在主程序迭代的时候,会等待每一项的响应。

shutdown(wait=True)
告诉执行器 executor 在当前所有等待的 future 对象运行完毕后,应该释放执行器用到的所有资源。
shutdown 之后再调用 Executor.submit()Executor.map() 会报运行时错误 RuntimeError
如果 waitTrue,那么这个方法会在所有等待的 future 都执行完毕,并且属于执行器 executor 的资源都释放完之后才会返回。
如果 waitFalse,本方法会立即返回。属于执行器的资源会在所有等待的 future 执行完毕之后释放。
不管 wait 取值如何,整个 Python 程序在等待的 future 执行完毕之前不会退出。
你可以通过 with 语句来避免显式调用本方法。with 语句会用 wait=True 的默认参数调用 Executor.shutdown() 方法。

import shutil
with ThreadPoolExecutor(max_workers=4) as e:e.submit(shutil.copy, 'src1.txt', 'dest1.txt')e.submit(shutil.copy, 'src2.txt', 'dest2.txt')e.submit(shutil.copy, 'src3.txt', 'dest3.txt')e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

执行器类 Executor 实现了上下文协议,可以用做上下文管理器。它能并发执行任务,等待它们全部完成。当上下文管理器退出时,自动调用 shutdown() 方法。


3. ThreadPoolExecutor 线程池执行器

ThreadPoolExecutor 线程池执行器是 Executor 执行器的子类,通过线程池来执行异步调用。它管理一组工作线程,当工作线程有富余的时候,给它们传递任务。
当属于一个 Future 对象的可调用对象等待另一个 Future 的返回时,会发生死锁 deadlock
举个例子:

import time
def wait_on_b():time.sleep(5)print(b.result())  # b will never complete because it is waiting on a.return 5def wait_on_a():time.sleep(5)print(a.result())  # a will never complete because it is waiting on b.return 6executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

再举一例:

def wait_on_future():f = executor.submit(pow, 5, 2)# This will never complete because there is only one worker thread and# it is executing this function.print(f.result())executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)

concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
这个 Executor 子类最多用 max_workers 个线程来异步执行调用。

initializer 是一个可选的可调用对象,会在每个 worker 线程启动之前调用。
initargs 是传递给 initializer 的参数元组。
如果 initializer 抛出了异常,那么当前所有等待的任务都会抛出 BrokenThreadPool 异常,继续提交 submit 任务也会抛出此异常。

3.5 的变化:如果 max_worker 没有指定或者为 None,则默认为本机处理器数量乘以 5

3.6 新特性:添加了 thread_name_prefix 参数,可以控制由线程池创建的工作线程名称,便于调试。

3.7 的变化:添加了 initializerinitargs 参数。


4. ThreadPoolExecutor 例子

import concurrent.futures
import urllib.requestURLS = ['http://www.foxnews.com/','http://www.cnn.com/','http://europe.wsj.com/','http://www.bbc.co.uk/','http://some-made-up-domain.com/']# Retrieve a single page and report the URL and contents
def load_url(url, timeout):with urllib.request.urlopen(url, timeout=timeout) as conn:return conn.read()# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:# Start the load operations and mark each future with its URLfuture_to_url = {executor.submit(load_url, url, 60): url for url in URLS}for future in concurrent.futures.as_completed(future_to_url):url = future_to_url[future]try:data = future.result()except Exception as exc:print('%r generated an exception: %s' % (url, exc))else:print('%r page is %d bytes' % (url, len(data)))

5. ProcessPoolExecutor 进程池执行器

ProcessPoolExecutor 进程池执行器类是 Executor 执行器类的子类,使用进程池来异步执行调用。
ProcessPoolExecutor 使用了 multiprocessing 模块,这允许它可以规避 Global Interpreter Lock,但是也意味着只能执行和返回可序列化的(picklable)对象。

__main__ 模块必须被 worker 子进程导入,这意味着 ProcessPoolExecutor 在交互解释器中无法工作。

在已经被提交到 ProcessPoolExecutor 中的可调用对象内使用 Executor 或者 Future 方法会导致死锁。

concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
这个 Executor 子类最多用 max_workers 个进程来异步执行调用。
如果不指定 max_workers 或者为 None,它默认为本机的处理器数量。
如果 max_workers 小于等于 0,会抛出 ValueError 异常。
mp_context 是多进程上下文(multiprocessing context)或者 None,它会被用来启动 workers。如果不指定 mp_context 或者为 None,会使用默认的多进程上下文环境。

initializer 是一个可选的可调用对象,会在每个 worker 进程启动之前调用。
initargs 是传递给 initializer 的参数元组。
如果 initializer 抛出了异常,那么当前所有等待的任务都会抛出 BrokenProcessPool 异常,继续提交 submit 任务也会抛出此异常。

3.3 版本的变化:任意一个工作进程突然中止时,会抛出 BrokenProcessPool 异常。之前版本中,行为是未定义的,而且对于执行器或者它的 future 对象的操作通常会无响应或者死锁。

3.7 版本的变化:加入了 mp_context 参数,允许用户控制由进程池创建的工作进程的 start_method 方法。该版本还加入了 initializerinitargs 参数。


6. ProcessPoolExecutor 例子

import concurrent.futures
import mathPRIMES = [112272535095293,112582705942171,112272535095293,115280095190773,115797848077099,1099726899285419]def is_prime(n):if n % 2 == 0:return Falsesqrt_n = int(math.floor(math.sqrt(n)))for i in range(3, sqrt_n + 1, 2):if n % i == 0:return Falsereturn Truedef main():with concurrent.futures.ProcessPoolExecutor() as executor:for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):print('%d is prime: %s' % (number, prime))if __name__ == '__main__':main()

7. Future 对象

Future 类封装了可调用对象的异步执行。
Future 实例通过 Executor.submit() 创建。

concurrent.futures.Future
封装了可调用对象的异步执行。
Future 实例通过 Executor.submit() 创建,除非用于测试,不应该直接手动创建。

  • cancel() 尝试取消调用,如果该调用正在执行中,无法取消,本方法返回 False,其他情况下调用会被取消,并返回 True

理解:当一个 Future 实例已经被提交但是还没开始执行时,可以通过调用该实例的 cancel() 方法取消。

  • cancelled() 如果调用已经被成功取消,返回 True
  • running() 如果调用正在执行,无法被取消,则返回 True
  • done() 如果调用成功被取消或者已经执行完毕,返回 True
  • result(timeout=None) 返回调用的返回值。如果调用还没有完成,则最多等待 timeout 秒。如果 timeout 秒之后还没有完成,抛出 concurrent.futures.TimeoutErrortimeout 可以为整数或者浮点数。如果不指定或者为 None,则不限制等待时间。如果 future 在完成之前被取消了,会抛出 CancelledError 异常。
    如果调用抛出异常,这个方法会抛出同样的异常。

理解result() 阻塞直到任务完成,或者被取消。如果需要按照顺序访问结果,使用执行器的 map 方法,如果不需要按照顺序访问结果,可以使用模块函数 as_completed()

  • exception(timeout=None)
    返回被调用抛出的异常。如果调用还没有执行完毕,则最多等待 timeout 秒。如果 timeout 秒之后还没有完成,抛出 concurrent.futures.TimeoutErrortimeout 可以为整数或者浮点数。如果不指定或者为 None,则不限制等待时间。
    如果 future 在完成之前被取消了,会抛出 CancelledError 异常。
    如果调用完成并且没有抛出异常,返回 None
  • add_done_callback(fn)
    future 附加可调用对象 fn。当 future 运行完毕或者被取消时,它会被用作 fn 的唯一参数,并调用 fn
    可调用对象按照添加顺序依次调用,并且总是在添加时所处进程的一个线程内调用它。如果该可调用对象抛出了属于 Exception 子类的异常,它会被记录并忽略。如果它抛出了属于 BaseException 子类的异常,该行为未定义。
    如果 future 已经完成或者已经取消,fn 会被立即调用。

理解:这个方法不用显式地等待结果返回,可以事先指定当 Future 完成之后应当执行的调用。fn 是一个单参数的可调用对象,使用传入的 Future 对象之前最好先检查它的状态,因为不管是正常结束、抛出异常还是被取消,Future 对象都会被认为执行完毕。


8. 模块函数

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
等待 Future 实例完成,这些实例可能由多个不同的执行器实例创建,通过 fs 指定这些 Future 实例。返回具名元组,该元组有两个元素,每个元素都是一个集合。第一个元素名叫 done,该集合包括已完成的 futures;第二个元素名叫 not_done,该集合包括未完成的 futures
timeout 用来控制返回之前等待的最大秒数,可以是整数或者浮点数。如果不指定或为 None,不限制等待时间。
return_when 指明函数何时应该返回。它必须是下列常量之一:

  • FIRST_COMPLETED:函数在任意一个 future 完成或者被取消时返回。
  • FIRST_EXCEPTION:函数在任意一个 future 因为异常而结束时返回。如果没有 future 抛出异常,它等价于 ALL_COMPLETED
  • ALL_COMPLETED:当所有 future 完成或者被取消时函数才会返回。

concurrent.futures.as_completed(fs, timeout=None)
当通过 fs 指定的 Future 实例全部执行完毕或者被取消后,返回这些 Future 实例组成的迭代器。fs 中的 Future 实例可以被不同的执行器创建。任何在 as_completed() 调用之前就已经完成的 Future 实例会被最先生成。

查看源码发现,实际上这是一个用到了 yield from 的生成器函数,所以调用返回一个生成器。

如果从 as_completed() 调用开始,经过 timeout 秒之后,对返回的迭代器调用 __next__() 时结果仍不可用,则会抛出 concurrent.futures.TimeoutError 异常。timeout 可以是整数或者浮点数,如果 timeout 没有指定或者为 None,则不限制等待时间。


9. 待完善

  • Future 内部方法:用于单元测试和实现自定义执行器的方法。
    • set_running_or_notify_cancel()
    • set_result(result)
    • set_exception(exception)
  • 相关异常类

完成于 2019.02.18

公众号


http://chatgpt.dhexx.cn/article/WcGof8PW.shtml

相关文章

【ruoyi】java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoo

前言 ruoyi 4.6.0jdk1.8 错误 11:48:16.879 [http-nio-9031-exec-25] INFO c.r.f.s.r.UserRealm - [doGetAuthenticationInfo,128] - 对用户[admin]进行登录验证..验证未通过{} java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThre…

【学习积累】Queue 与 ConcurrentQueue性能测试

在 C# 中&#xff0c;关于队列&#xff08;Queue&#xff09;有两种&#xff0c;一种就是我们普通使用的队列&#xff0c;另一种是线程安全的队列 ConcurrentQueue<T> 。 ConcurrentQueue表示线程安全的先进先出 (FIFO) 集合。https://learn.microsoft.com/zh-cn/dotnet…

Python报错ModuleNotFoundError: No module named ‘concurrent‘

在测试Python的多线程时&#xff0c;根据官方的说法&#xff0c;concurrent.futures在Python3中已经内置了&#xff0c;不需要下载安装&#xff0c;如果是Python2则需要运行pip install futures进行安装。。。 这样导入&#xff0c;两种写法均可 import concurrent.futures #…

go语言工具_Concurrent Map

Concurrent Map 背景 map是平时项目中经常用到的数据类型&#xff0c;但是如果多个协程去读写同一个map时&#xff0c;为了不发生数据错误&#xff0c;经常去将其和锁封装成一个新的map。像以下两种示例。 type LockMap struct { m map[interface{}]interface{} l…

C#线程安全队列ConcurrentQueue

ConcurrentQueue成员函数 入队(EnQueue) 、出队(TryDequeue) 、是否为空(IsEmpty)、获取队列内元素数量(Count)。 void Enqueue(T item) 入队函数&#xff0c;当队列已满时会自动增加队列容量。 bool TryDequeue(T* result) 尝试移除并返回并发队列开头处对象&#xff0c;…

项目优化>C++,concurrentqueue(高性能并发队列)

项目中的数据队列基于轮询和selep的实时性及CUP性能差&#xff0c;需要进行优化&#xff0c;尝试使用concurrentqueue进行优化。网上有一些资料介绍,可供参考。 使用后的个人理解:一个线程安全的queue&#xff0c;并且concurrentqueue的线程安全并不是一味的加锁&#xff0c;它…

ConcurrentMap

ConcurrentMap&#xff0c;它是一个接口&#xff0c;是一个能够支持并发访问的java.util.map集合&#xff1b; ConcurrentHashMap是一个线程安全&#xff0c;并且是一个高效的HashMap。 spring 缓存注解 通过查看源代码发现将数据存在ConcurrentMap中 1 Map并发集合 1.1 Co…

学习线程安全队列ConcurrentQueue

首先,基本使用&#xff1a;入队(EnQueue) 、出队(TryDequeue) 、是否为空(IsEmpty)、获取队列内元素数量(Count)。 一、ConcurrentQueue内部结构: 1.实现原理 众所周知&#xff0c;在普通的非线程安全队列有两种实现方式: 1.使用数组实现的循环队列。 2.使用链表实现的队列…

并发系列(六)-----concurrent的简单介绍

一 简介 concurrent包是jdk1.5引入的重要的包&#xff0c;主要代码由大牛Doug Lea完成。这个包下的一些类如果用好了可以很方便的保证数据在多线程下操作的正确性。就比如说线程共享的i&#xff0c;如果使用concurrent包下的Atomic系列类可以很方便的解决这个问题。这篇文章简单…

python并发之concurrent快速入门

导读&#xff1a;我很笨&#xff0c;但是我很快——计算机之所以计算能力如此出众&#xff0c;不在于其有多智能&#xff0c;而是因为它超快的执行速度&#xff0c;而多核心则可以进一步成倍的提高效率。在python中&#xff0c;concurrent库就是用于完成并发的模块之一。 01 初…

Java 并发工具包(concurrent)详解

目录 一、concurrent并发包 二、ReentrantLock&#xff08;可重入锁&#xff09; 1、锁状态中断与可重入 2、尝试非阻塞地获取锁 3、等待可中断 4、设置公平锁 三、CountDownLatch&#xff08;门栓&#xff09; 四、cyclicBarrier&#xff08;栅栏&#xff09; 五、…

JAVA中split函数的用法

JAVA中split函数的用法 只写经常使用的&#xff0c;并不完整。 1.基本用法&#xff0c;将字符串按照指定字符串进行分割&#xff0c;例如&#xff1a; public class Main {public static void main(String[] args) {String ss "abcabcdefg";String[] split ss.sp…

C语言实现split函数

实现类似JAVA编程语言中split函数&#xff1a; &#xff08;这里以空格为分隔符进行演示&#xff09; 函数的声明&#xff1a;void split(char *src,const char *separator,char **dest,int *num) {}变量&#xff1a; 1.*src&#xff1a;要进行分割的字符串地址&#xff0c; 2…

mysql实现自定义split函数

1、自定义split函数脚本 CREATE DEFINER root% FUNCTION tjdemo.fun_get_split_string_total(f_string varchar(1000),f_delimiter varchar(5)) RETURNS int(11) LANGUAGE SQL NOT DETERMINISTIC CONTAINS SQL SQL SECURITY DEFINER COMMENT BEGIN declare returnInt int(11…

Oracle实现split函数

创建TYPE CREATE OR REPLACE TYPE TYPE_SPLIT AS TABLE OF VARCHAR2 (4000);创建函数 CREATE OR REPLACE FUNCTION SPLIT(P_STRING VARCHAR2, P_SEP VARCHAR2 : ,)RETURN TYPE_SPLITPIPELINED ISIDX PLS_INTEGER;V_STRING VARCHAR2(4000) : P_STRING; BEGINLOOPIDX : INSTR(…

java split函数的用法_java中split函数用法以及注意事项

java中split函数用法以及注意事项 发布时间&#xff1a;2020-04-23 10:28:23 来源&#xff1a;亿速云 阅读&#xff1a;215 作者&#xff1a;小新 本篇文章和大家了解一下java中split函数用法以及注意事项。有一定的参考价值&#xff0c;有需要的朋友可以参考一下&#xff0c;希…

mysql 创建函数 split_在mysql中实现split函数的几种方法

在mysql中实现split函数的几种方法 关注:98 答案:2 mip版 解决时间 2021-02-07 11:27 提问者夜落花台 2021-02-07 02:11 在mysql中实现split函数的几种方法 最佳答案 二级知识专家蓝莓九栀 2021-02-07 03:28 mysql 5.* 的版本现在没有split 函数,以下是几个自定义的split函数…

Oracle split函数

一、创建split函数 1、创建TYPE CREATE OR REPLACE TYPE TYPE_SPLIT AS TABLE OF VARCHAR2 (4000); / 2、创建split函数 CREATE OR REPLACE FUNCTION SPLIT(P_STRING VARCHAR2, P_SEP VARCHAR2 : ,)RETURN TYPE_SPLITPIPELINED ISIDX PLS_INTEGER;V_STRING VARCHAR2(4000)…

mysql有split函数么_mysql中split函数

在mysql中并没有split函数,需要自己写: 1)获得按指定字符分割的字符串的个数: Sql代码 DELIMITER$$ DROP FUNCTION IFEXISTS`sims`.`func_get_split_string_total`$$ CREATE DEFINER=`root`@`localhost` FUNCTION `func_get_split_string_total`( f_strin 在mysql中并没有sp…

Python之split函数的详解

目录 一、split函数的官方定义 二、split函数的深刻理解 二、split函数的深刻理解 split函数主要应用场景是Python对字符串的处理中&#xff08;数据分析&#xff0c;数据处理&#xff09;&#xff0c;以及计算机二级考试的常考基础知识点。 一、split函数的官方定义 定义…