[Python] 使用futures模块处理并发(超好用的并发库)

2024-01-25 03:08

本文主要是介绍[Python] 使用futures模块处理并发(超好用的并发库),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

使用futures模块处理并发

concurrent.futures模块的主要特色是ThreadPoolExecutor和ProcessPoolExecutor类,这两个类实现的接口能分别在不同的线程或进程中执行可调用的对象。这两个类在内部维护着一个工作线程或进程池,以及要执行的任务队列。ThreadPoolExecutor和ProcessPoolExecutor的API接口一样,本文重点讲解ThreadPoolExecutor的用法。

一、用法示例

1. 使用 map() 处理多个任务

使用两行代码,就能帮我们调用多线程完成任务:

with futures.ThreadPoolExecutor(max_workers=5) as executor: 	# max_workers 最大线程数res = executor.map(fn, params)		# fn 是被调用函数,params 是 fn 的参数

例子如下:

from concurrent import futuresdef download_source(url):print(url)return url * 100urls = range(1, 20)with futures.ThreadPoolExecutor(max_workers=5) as executor:res = executor.map(download_source, urls)print(list(res))
1
2
3
4
...	# 省略了输出
18
19
[100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1100, 1200, 1300, 1400, 1500, 1600, 1700, 1800, 1900]

注意事项: map() 返回结果的顺序和调用开始的顺序一直,如果第一个调用生成结果用时10秒,而其他调用只用1秒,代码会阻塞10秒以获取第一个结果。在此之后,获取后续结果不会阻塞,因为后续调用已经结束。更可取的方式是,不管提交顺序如何,只要有结果就获取。而使用 Executor.submit 方法和 futures.as_completed 函数可以帮我们完成这种方式。

2. 使用 submit() 提交单个任务

map() 可以帮我们快速完成一批任务,但是我们有时候需要的是单个处理,可以使用 submit():

import time
from concurrent import futuresdef sleep_and_print(number):time.sleep(1)print('I am number', number)return number * 100numbers = range(1, 100)with futures.ThreadPoolExecutor(max_workers=5) as executor:for num in numbers:future = executor.submit(sleep_and_print, num)	# 当submit时,executor会创建线程,并将事件加入_work_queue(threading.Queue),多个线程从_work_queue获取Work并执行。

此处 submit() 返回的 future 并不是结果,而是指待完成的操作。future 封装待完成的操作,可以放入队列,完成的状态可以查询,得到结果后可以获取结果(或者异常)。

3. 使用 as_completed() 逐个获取已完成的任务

as_completed() 返回一个包含 fs 所指定的 Future 实例(可能由不同的 Executor 实例创建)的迭代器,这些实例会在完成时生成 future 对象(包括正常结束或被取消的 future 对象)。 任何由 fs 所指定的重复 future 对象将只被返回一次。 任何在 as_completed() 被调用之前完成的 future 对象将优先被生成。 如果 __next__() 被调用并且在对 as_completed() 的原始调用 timeout 秒之后结果仍不可用,则返回的迭代器将引发 concurrent.futures.TimeoutErrortimeout 可以为整数或浮点数。 如果 timeout 未指定或为 None,则不限制等待时间。

3.1 获取运行结果

我们尝试获取结果:

with futures.ThreadPoolExecutor(max_workers=3) as executor:to_do = []		# 待完成的队列,用于保存所有 futurefor _i in numbers:future = executor.submit(sleep_and_print, _i)	# 加入执行to_do.append(future)results = []	for future in futures.as_completed(to_do):	# 等待完成res = future.result()	# 接收结果results.append(res)print("Already Finished", res)print(results)
3.2 获取异常
import time
from concurrent import futuresclass DemoException(Exception):		# 自定义异常passdef sleep_and_print(number):time.sleep(1)print('I am number', number)raise DemoException				# 产出异常numbers = range(1, 10)with futures.ThreadPoolExecutor(max_workers=3) as executor:to_do = []for _i in numbers:future = executor.submit(sleep_and_print, _i)  to_do.append(future)exceptions = []for future in futures.as_completed(to_do):res = future.exception()	# future.exception() 获取异常exceptions.append(res)print(exceptions)
I am numberI am number 1 3I am number 2
I am number 4
I am number 5
I am number 6
I am numberI am number I am number 7
8 9[DemoException(), DemoException(), DemoException(), DemoException(), DemoException(), DemoException(), DemoException(), DemoException(), DemoException()]

4. 使用 Lock 解决(打印混乱)争用问题

上面的代码是有问题的,我们访问同一个打印控制台,打印会造成混乱。ThreadPoolExecutor 内部是使用的原生的threading去创建线程,所以我们使用 threading.Lock 就可以解决争用问题:

from concurrent import futures
from threading import Lock
import timeg_Lock = Lock()     # 多线程互斥访问 print 打印控制台,需要加锁def sleep_and_print(number):time.sleep(1)with g_Lock:	# 尝试获取锁print('I am number', number)return number * 100numbers = range(1, 1000)with futures.ThreadPoolExecutor(max_workers=3) as executor:results = executor.map(sleep_and_print, numbers)

5. 使用 add_done_callback() 回调函数

import time
from concurrent import futures
from threading import Lockg_Lock = Lock()def sleep_and_print(number):time.sleep(1)with g_Lock:print('I am number', number)return number * 100def _done(f):		# f 是 future 对象,是唯一参数with g_Lock:print(f, 'Done')numbers = range(1, 1000)with futures.ThreadPoolExecutor(max_workers=3) as executor:to_do = []for _i in numbers:future = executor.submit(sleep_and_print, _i)  # 当submit时,executor会创建线程,并将事件加入_work_queue(threading.Queue),多个线程从_work_queue获取Work并执行。to_do.append(future)for future in to_do:future.add_done_callback(_done)		# 回调函数
I am number 1
<Future at 0x25154d1ae08 state=finished returned int> Done
I am number 2
<Future at 0x25154d6a508 state=finished returned int> Done
I am number 3
<Future at 0x25154d75108 state=finished returned int> Done
I am number 4
<Future at 0x25154d753c8 state=finished returned int> Done
I am number 5
I am number 6
<Future at 0x25154d75748 state=finished returned int> Done
<Future at 0x25154d75588 state=finished returned int> Done
...

6. 使用 wait() 获取所有 future 的状态

wait() 会返回一个命名2元组,包含两个 set(),一个 set 包含已完成的 future (finished or cancelled) ,另一个包含未完成的 future:

import time
from concurrent import futures
from threading import Lockg_Lock = Lock()def sleep_and_print(number):time.sleep(1)with g_Lock:print('I am number', number)return number * 100numbers = range(1, 10)with futures.ThreadPoolExecutor(max_workers=3) as executor:to_do = []for _i in numbers:future = executor.submit(sleep_and_print, _i)  # 当submit时,executor会创建线程,并将事件加入_work_queue(threading.Queue),多个线程从_work_queue获取Work并执行。to_do.append(future)time.sleep(2)with g_Lock:print(futures.wait(to_do, timeout=1))
I am number 2
I am number 1
I am number 3
I am number 5
DoneAndNotDoneFutures(done={<Future at 0x2724b629748 state=finished returned int>, <Future at 0x2724b635488 state=finished returned int>, <Future at 0x2724b5b1ac8 state=finished returned int>, <Future at 0x2724b62cfc8 state=finished returned int>}, not_done={<Future at 0x2724b635648 state=running>, <Future at 0x2724b6352c8 state=running>, <Future at 0x2724b635b08 state=pending>, <Future at 0x2724b635948 state=pending>, <Future at 0x2724b6357c8 state=running>})
I am number 6
I am number 4
I am number 7
I am number 9
I am number 8

二、核心内容

1. Executor 对象详解

concurrent.futures 模块中的 Executor 提供了异步执行调用方法,是 ThreadPoolExecutor 和 ProcessPoolExecutor 的父类。Executor 提供了如下方法:

  • submit(fn, *args, **kwargs):调度可调用对象 fn,以 fn(*args **kwargs) 方式执行并返回 Future 对象代表可调用对象的执行。
  • map(func, *iterables, timeout=None, chunksize=1) :类似于全局函数 map(func, *iterables), 将 iterables 和 func 组成任务立即丢入任务池,生成多线程或多进程从任务池中取任务。func 会被异步执行。
  • shutdown(wait=True):关闭 Executor,释放资源。不管 wait 的值是什么,整个 Python 程序将等到所有待执行的 future 对象完成执行后才退出。

2. future 对象详解

future 是 concurrent.futures 模块和 asyncio 包的重要组件。

future 封装待完成的操作,可以放入队列,完成的状态可以查询,得到结果(或抛出异常)后可以获取结果(或异常)。Future 提供了如下方法:

  • future.result(timeout=None) :返回可调用对象的结果,或者重新抛出执行可调用的对象时抛出的异常。result()会阻塞调用方所在的线程,直到有结果可返回,但是result()可以接收可选的timeout参数,如果在指定的时间内future没有运行完毕,会抛出TimeoutError异常

    try:print(to_do[9].result(timeout=1))
    except futures.TimeoutError:print('TimeoutError')
    
  • future.add_done_callback(fn) :当 future 对象被取消或完成运行时,将会调用 fn,而这个 future 对象将作为它唯一的参数。

  • future.cancel() :尝试取消调用。 如果调用正在执行或已结束运行不能被取消则该方法将返回 False,否则调用会被取消并且该方法将返回 True

  • future.done():返回布尔值,指明future链接的可调用对象是否已经执行,不会阻塞

  • future.cancelled():返回布尔值,指明future链接的可调用对象是否已经成功取消,不会阻塞

  • future.running():返回布尔值,指明future链接的可调用对象是否已经正在执行而且不能被取消,不会阻塞

3. map() 源码剖析

我们知道 map() 是这么调用的:

with futures.ThreadPoolExecutor(workers) as executor: res = executor.map(download_source, urls)

前面说过,map() 返回结果的顺序和调用开始的顺序一直,如果第一个调用生成结果用时10秒,而其他调用只用1秒,代码会阻塞10秒以获取第一个结果。map() 封装很优秀,忍不住看了看源码,Executor.map() 定义在 Lib.concurrent.futures._base 中:

def map(self, fn, *iterables, timeout=None, chunksize=1):...fs = [self.submit(fn, *args) for args in zip(*iterables)]	# 将可迭代对象使用submit()构造生成futuresdef result_iterator():...while fs:if timeout is None:yield fs.pop().result()		# fs.pop()按顺序返回一个future; 而future.result()会阻塞else:yield fs.pop().result(end_time - time.monotonic()) ...return result_iterator()

可以看到,Executor.map() 内部也是使用了 submit() 和 result(),但是它返回一个迭代器,迭代器中包含的是各个 future 的结果,而非 future 本身。

4. ThreadPoolExecutor 核心工作源码剖析

[Python] future线程处理并发的核心源码详解

这篇关于[Python] 使用futures模块处理并发(超好用的并发库)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/641899

相关文章

Java中的ConcurrentBitSet使用小结

《Java中的ConcurrentBitSet使用小结》本文主要介绍了Java中的ConcurrentBitSet使用小结,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,... 目录一、核心澄清:Java标准库无内置ConcurrentBitSet二、推荐方案:Eclipse

Go语言结构体标签(Tag)的使用小结

《Go语言结构体标签(Tag)的使用小结》结构体标签Tag是Go语言中附加在结构体字段后的元数据字符串,用于提供额外的属性信息,这些信息可以通过反射在运行时读取和解析,下面就来详细的介绍一下Tag的使... 目录什么是结构体标签?基本语法常见的标签用途1.jsON 序列化/反序列化(最常用)2.数据库操作(

Java中ScopeValue的使用小结

《Java中ScopeValue的使用小结》Java21引入的ScopedValue是一种作用域内共享不可变数据的预览API,本文就来详细介绍一下Java中ScopeValue的使用小结,感兴趣的可以... 目录一、Java ScopedValue(作用域值)详解1. 定义与背景2. 核心特性3. 使用方法

spring中Interceptor的使用小结

《spring中Interceptor的使用小结》SpringInterceptor是SpringMVC提供的一种机制,用于在请求处理的不同阶段插入自定义逻辑,通过实现HandlerIntercept... 目录一、Interceptor 的核心概念二、Interceptor 的创建与配置三、拦截器的执行顺

基于Python编写一个git自动上传的脚本(打包成exe)

《基于Python编写一个git自动上传的脚本(打包成exe)》这篇文章主要为大家详细介绍了如何基于Python编写一个git自动上传的脚本并打包成exe,文中的示例代码讲解详细,感兴趣的小伙伴可以跟... 目录前言效果如下源码实现利用pyinstaller打包成exe利用ResourceHacker修改e

Python在二进制文件中进行数据搜索的实战指南

《Python在二进制文件中进行数据搜索的实战指南》在二进制文件中搜索特定数据是编程中常见的任务,尤其在日志分析、程序调试和二进制数据处理中尤为重要,下面我们就来看看如何使用Python实现这一功能吧... 目录简介1. 二进制文件搜索概述2. python二进制模式文件读取(rb)2.1 二进制模式与文本

Python中Tkinter GUI编程详细教程

《Python中TkinterGUI编程详细教程》Tkinter作为Python编程语言中构建GUI的一个重要组件,其教程对于任何希望将Python应用到实际编程中的开发者来说都是宝贵的资源,这篇文... 目录前言1. Tkinter 简介2. 第一个 Tkinter 程序3. 窗口和基础组件3.1 创建窗

Django调用外部Python程序的完整项目实战

《Django调用外部Python程序的完整项目实战》Django是一个强大的PythonWeb框架,它的设计理念简洁优雅,:本文主要介绍Django调用外部Python程序的完整项目实战,文中通... 目录一、为什么 Django 需要调用外部 python 程序二、三种常见的调用方式方式 1:直接 im

Python字符串处理方法超全攻略

《Python字符串处理方法超全攻略》字符串可以看作多个字符的按照先后顺序组合,相当于就是序列结构,意味着可以对它进行遍历、切片,:本文主要介绍Python字符串处理方法的相关资料,文中通过代码介... 目录一、基础知识:字符串的“不可变”特性与创建方式二、常用操作:80%场景的“万能工具箱”三、格式化方法

Spring Boot 处理带文件表单的方式汇总

《SpringBoot处理带文件表单的方式汇总》本文详细介绍了六种处理文件上传的方式,包括@RequestParam、@RequestPart、@ModelAttribute、@ModelAttr... 目录方式 1:@RequestParam接收文件后端代码前端代码特点方式 2:@RequestPart接