[Python] 使用futures模块处理并发(超好用的并发库)

2024-01-25 03:08

本文主要是介绍[Python] 使用futures模块处理并发(超好用的并发库),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

使用futures模块处理并发

concurrent.futures模块的主要特色是ThreadPoolExecutor和ProcessPoolExecutor类,这两个类实现的接口能分别在不同的线程或进程中执行可调用的对象。这两个类在内部维护着一个工作线程或进程池,以及要执行的任务队列。ThreadPoolExecutor和ProcessPoolExecutor的API接口一样,本文重点讲解ThreadPoolExecutor的用法。

一、用法示例

1. 使用 map() 处理多个任务

使用两行代码,就能帮我们调用多线程完成任务:

with futures.ThreadPoolExecutor(max_workers=5) as executor: 	# max_workers 最大线程数res = executor.map(fn, params)		# fn 是被调用函数,params 是 fn 的参数

例子如下:

from concurrent import futuresdef download_source(url):print(url)return url * 100urls = range(1, 20)with futures.ThreadPoolExecutor(max_workers=5) as executor:res = executor.map(download_source, urls)print(list(res))
1
2
3
4
...	# 省略了输出
18
19
[100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1100, 1200, 1300, 1400, 1500, 1600, 1700, 1800, 1900]

注意事项: map() 返回结果的顺序和调用开始的顺序一直,如果第一个调用生成结果用时10秒,而其他调用只用1秒,代码会阻塞10秒以获取第一个结果。在此之后,获取后续结果不会阻塞,因为后续调用已经结束。更可取的方式是,不管提交顺序如何,只要有结果就获取。而使用 Executor.submit 方法和 futures.as_completed 函数可以帮我们完成这种方式。

2. 使用 submit() 提交单个任务

map() 可以帮我们快速完成一批任务,但是我们有时候需要的是单个处理,可以使用 submit():

import time
from concurrent import futuresdef sleep_and_print(number):time.sleep(1)print('I am number', number)return number * 100numbers = range(1, 100)with futures.ThreadPoolExecutor(max_workers=5) as executor:for num in numbers:future = executor.submit(sleep_and_print, num)	# 当submit时,executor会创建线程,并将事件加入_work_queue(threading.Queue),多个线程从_work_queue获取Work并执行。

此处 submit() 返回的 future 并不是结果,而是指待完成的操作。future 封装待完成的操作,可以放入队列,完成的状态可以查询,得到结果后可以获取结果(或者异常)。

3. 使用 as_completed() 逐个获取已完成的任务

as_completed() 返回一个包含 fs 所指定的 Future 实例(可能由不同的 Executor 实例创建)的迭代器,这些实例会在完成时生成 future 对象(包括正常结束或被取消的 future 对象)。 任何由 fs 所指定的重复 future 对象将只被返回一次。 任何在 as_completed() 被调用之前完成的 future 对象将优先被生成。 如果 __next__() 被调用并且在对 as_completed() 的原始调用 timeout 秒之后结果仍不可用,则返回的迭代器将引发 concurrent.futures.TimeoutErrortimeout 可以为整数或浮点数。 如果 timeout 未指定或为 None,则不限制等待时间。

3.1 获取运行结果

我们尝试获取结果:

with futures.ThreadPoolExecutor(max_workers=3) as executor:to_do = []		# 待完成的队列,用于保存所有 futurefor _i in numbers:future = executor.submit(sleep_and_print, _i)	# 加入执行to_do.append(future)results = []	for future in futures.as_completed(to_do):	# 等待完成res = future.result()	# 接收结果results.append(res)print("Already Finished", res)print(results)
3.2 获取异常
import time
from concurrent import futuresclass DemoException(Exception):		# 自定义异常passdef sleep_and_print(number):time.sleep(1)print('I am number', number)raise DemoException				# 产出异常numbers = range(1, 10)with futures.ThreadPoolExecutor(max_workers=3) as executor:to_do = []for _i in numbers:future = executor.submit(sleep_and_print, _i)  to_do.append(future)exceptions = []for future in futures.as_completed(to_do):res = future.exception()	# future.exception() 获取异常exceptions.append(res)print(exceptions)
I am numberI am number 1 3I am number 2
I am number 4
I am number 5
I am number 6
I am numberI am number I am number 7
8 9[DemoException(), DemoException(), DemoException(), DemoException(), DemoException(), DemoException(), DemoException(), DemoException(), DemoException()]

4. 使用 Lock 解决(打印混乱)争用问题

上面的代码是有问题的,我们访问同一个打印控制台,打印会造成混乱。ThreadPoolExecutor 内部是使用的原生的threading去创建线程,所以我们使用 threading.Lock 就可以解决争用问题:

from concurrent import futures
from threading import Lock
import timeg_Lock = Lock()     # 多线程互斥访问 print 打印控制台,需要加锁def sleep_and_print(number):time.sleep(1)with g_Lock:	# 尝试获取锁print('I am number', number)return number * 100numbers = range(1, 1000)with futures.ThreadPoolExecutor(max_workers=3) as executor:results = executor.map(sleep_and_print, numbers)

5. 使用 add_done_callback() 回调函数

import time
from concurrent import futures
from threading import Lockg_Lock = Lock()def sleep_and_print(number):time.sleep(1)with g_Lock:print('I am number', number)return number * 100def _done(f):		# f 是 future 对象,是唯一参数with g_Lock:print(f, 'Done')numbers = range(1, 1000)with futures.ThreadPoolExecutor(max_workers=3) as executor:to_do = []for _i in numbers:future = executor.submit(sleep_and_print, _i)  # 当submit时,executor会创建线程,并将事件加入_work_queue(threading.Queue),多个线程从_work_queue获取Work并执行。to_do.append(future)for future in to_do:future.add_done_callback(_done)		# 回调函数
I am number 1
<Future at 0x25154d1ae08 state=finished returned int> Done
I am number 2
<Future at 0x25154d6a508 state=finished returned int> Done
I am number 3
<Future at 0x25154d75108 state=finished returned int> Done
I am number 4
<Future at 0x25154d753c8 state=finished returned int> Done
I am number 5
I am number 6
<Future at 0x25154d75748 state=finished returned int> Done
<Future at 0x25154d75588 state=finished returned int> Done
...

6. 使用 wait() 获取所有 future 的状态

wait() 会返回一个命名2元组,包含两个 set(),一个 set 包含已完成的 future (finished or cancelled) ,另一个包含未完成的 future:

import time
from concurrent import futures
from threading import Lockg_Lock = Lock()def sleep_and_print(number):time.sleep(1)with g_Lock:print('I am number', number)return number * 100numbers = range(1, 10)with futures.ThreadPoolExecutor(max_workers=3) as executor:to_do = []for _i in numbers:future = executor.submit(sleep_and_print, _i)  # 当submit时,executor会创建线程,并将事件加入_work_queue(threading.Queue),多个线程从_work_queue获取Work并执行。to_do.append(future)time.sleep(2)with g_Lock:print(futures.wait(to_do, timeout=1))
I am number 2
I am number 1
I am number 3
I am number 5
DoneAndNotDoneFutures(done={<Future at 0x2724b629748 state=finished returned int>, <Future at 0x2724b635488 state=finished returned int>, <Future at 0x2724b5b1ac8 state=finished returned int>, <Future at 0x2724b62cfc8 state=finished returned int>}, not_done={<Future at 0x2724b635648 state=running>, <Future at 0x2724b6352c8 state=running>, <Future at 0x2724b635b08 state=pending>, <Future at 0x2724b635948 state=pending>, <Future at 0x2724b6357c8 state=running>})
I am number 6
I am number 4
I am number 7
I am number 9
I am number 8

二、核心内容

1. Executor 对象详解

concurrent.futures 模块中的 Executor 提供了异步执行调用方法,是 ThreadPoolExecutor 和 ProcessPoolExecutor 的父类。Executor 提供了如下方法:

  • submit(fn, *args, **kwargs):调度可调用对象 fn,以 fn(*args **kwargs) 方式执行并返回 Future 对象代表可调用对象的执行。
  • map(func, *iterables, timeout=None, chunksize=1) :类似于全局函数 map(func, *iterables), 将 iterables 和 func 组成任务立即丢入任务池,生成多线程或多进程从任务池中取任务。func 会被异步执行。
  • shutdown(wait=True):关闭 Executor,释放资源。不管 wait 的值是什么,整个 Python 程序将等到所有待执行的 future 对象完成执行后才退出。

2. future 对象详解

future 是 concurrent.futures 模块和 asyncio 包的重要组件。

future 封装待完成的操作,可以放入队列,完成的状态可以查询,得到结果(或抛出异常)后可以获取结果(或异常)。Future 提供了如下方法:

  • future.result(timeout=None) :返回可调用对象的结果,或者重新抛出执行可调用的对象时抛出的异常。result()会阻塞调用方所在的线程,直到有结果可返回,但是result()可以接收可选的timeout参数,如果在指定的时间内future没有运行完毕,会抛出TimeoutError异常

    try:print(to_do[9].result(timeout=1))
    except futures.TimeoutError:print('TimeoutError')
    
  • future.add_done_callback(fn) :当 future 对象被取消或完成运行时,将会调用 fn,而这个 future 对象将作为它唯一的参数。

  • future.cancel() :尝试取消调用。 如果调用正在执行或已结束运行不能被取消则该方法将返回 False,否则调用会被取消并且该方法将返回 True

  • future.done():返回布尔值,指明future链接的可调用对象是否已经执行,不会阻塞

  • future.cancelled():返回布尔值,指明future链接的可调用对象是否已经成功取消,不会阻塞

  • future.running():返回布尔值,指明future链接的可调用对象是否已经正在执行而且不能被取消,不会阻塞

3. map() 源码剖析

我们知道 map() 是这么调用的:

with futures.ThreadPoolExecutor(workers) as executor: res = executor.map(download_source, urls)

前面说过,map() 返回结果的顺序和调用开始的顺序一直,如果第一个调用生成结果用时10秒,而其他调用只用1秒,代码会阻塞10秒以获取第一个结果。map() 封装很优秀,忍不住看了看源码,Executor.map() 定义在 Lib.concurrent.futures._base 中:

def map(self, fn, *iterables, timeout=None, chunksize=1):...fs = [self.submit(fn, *args) for args in zip(*iterables)]	# 将可迭代对象使用submit()构造生成futuresdef result_iterator():...while fs:if timeout is None:yield fs.pop().result()		# fs.pop()按顺序返回一个future; 而future.result()会阻塞else:yield fs.pop().result(end_time - time.monotonic()) ...return result_iterator()

可以看到,Executor.map() 内部也是使用了 submit() 和 result(),但是它返回一个迭代器,迭代器中包含的是各个 future 的结果,而非 future 本身。

4. ThreadPoolExecutor 核心工作源码剖析

[Python] future线程处理并发的核心源码详解

这篇关于[Python] 使用futures模块处理并发(超好用的并发库)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/641899

相关文章

使用Python删除Excel中的行列和单元格示例详解

《使用Python删除Excel中的行列和单元格示例详解》在处理Excel数据时,删除不需要的行、列或单元格是一项常见且必要的操作,本文将使用Python脚本实现对Excel表格的高效自动化处理,感兴... 目录开发环境准备使用 python 删除 Excphpel 表格中的行删除特定行删除空白行删除含指定

SpringBoot结合Docker进行容器化处理指南

《SpringBoot结合Docker进行容器化处理指南》在当今快速发展的软件工程领域,SpringBoot和Docker已经成为现代Java开发者的必备工具,本文将深入讲解如何将一个SpringBo... 目录前言一、为什么选择 Spring Bootjavascript + docker1. 快速部署与

深入理解Go语言中二维切片的使用

《深入理解Go语言中二维切片的使用》本文深入讲解了Go语言中二维切片的概念与应用,用于表示矩阵、表格等二维数据结构,文中通过示例代码介绍的非常详细,需要的朋友们下面随着小编来一起学习学习吧... 目录引言二维切片的基本概念定义创建二维切片二维切片的操作访问元素修改元素遍历二维切片二维切片的动态调整追加行动态

prometheus如何使用pushgateway监控网路丢包

《prometheus如何使用pushgateway监控网路丢包》:本文主要介绍prometheus如何使用pushgateway监控网路丢包问题,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录监控网路丢包脚本数据图表总结监控网路丢包脚本[root@gtcq-gt-monitor-prome

Python通用唯一标识符模块uuid使用案例详解

《Python通用唯一标识符模块uuid使用案例详解》Pythonuuid模块用于生成128位全局唯一标识符,支持UUID1-5版本,适用于分布式系统、数据库主键等场景,需注意隐私、碰撞概率及存储优... 目录简介核心功能1. UUID版本2. UUID属性3. 命名空间使用场景1. 生成唯一标识符2. 数

SpringBoot中如何使用Assert进行断言校验

《SpringBoot中如何使用Assert进行断言校验》Java提供了内置的assert机制,而Spring框架也提供了更强大的Assert工具类来帮助开发者进行参数校验和状态检查,下... 目录前言一、Java 原生assert简介1.1 使用方式1.2 示例代码1.3 优缺点分析二、Spring Fr

Python办公自动化实战之打造智能邮件发送工具

《Python办公自动化实战之打造智能邮件发送工具》在数字化办公场景中,邮件自动化是提升工作效率的关键技能,本文将演示如何使用Python的smtplib和email库构建一个支持图文混排,多附件,多... 目录前言一、基础配置:搭建邮件发送框架1.1 邮箱服务准备1.2 核心库导入1.3 基础发送函数二、

Android kotlin中 Channel 和 Flow 的区别和选择使用场景分析

《Androidkotlin中Channel和Flow的区别和选择使用场景分析》Kotlin协程中,Flow是冷数据流,按需触发,适合响应式数据处理;Channel是热数据流,持续发送,支持... 目录一、基本概念界定FlowChannel二、核心特性对比数据生产触发条件生产与消费的关系背压处理机制生命周期

java使用protobuf-maven-plugin的插件编译proto文件详解

《java使用protobuf-maven-plugin的插件编译proto文件详解》:本文主要介绍java使用protobuf-maven-plugin的插件编译proto文件,具有很好的参考价... 目录protobuf文件作为数据传输和存储的协议主要介绍在Java使用maven编译proto文件的插件

Python包管理工具pip的升级指南

《Python包管理工具pip的升级指南》本文全面探讨Python包管理工具pip的升级策略,从基础升级方法到高级技巧,涵盖不同操作系统环境下的最佳实践,我们将深入分析pip的工作原理,介绍多种升级方... 目录1. 背景介绍1.1 目的和范围1.2 预期读者1.3 文档结构概述1.4 术语表1.4.1 核