Python最广为使用的并发库futures使用入门与内部原理

2023-11-28 02:08

本文主要是介绍Python最广为使用的并发库futures使用入门与内部原理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在使用Python处理任务时,限于单线程处理能力有限,需要将任务并行化,分散到多个线程或者是多个进程去执行。

concurrent.futures就是这样一种库,它可以让用户可以非常方便的将任务并行化。这个名字有点长,后面我直接使用词汇concurrent来代替concurrent.futures。

640?wx_fmt=jpeg

concurrent提供了两种并发模型,一个是多线程ThreadPoolExecutor,一个是多进程ProcessPoolExecutor。对于IO密集型任务宜使用多线程模型。对于计算密集型任务应该使用多进程模型。

为什么要这样选择呢?是因为Python GIL的存在让Python虚拟机在进行运算时无法有效利用多核心。对于纯计算任务,它永远最多只能榨干单个CPU核心。如果要突破这个瓶颈,就必须fork出多个子进程来分担计算任务。而对于IO密集型任务,CPU使用率往往是极低的,使用多线程虽然会加倍CPU使用率,但是还远远到不了饱和(100%)的地步,在单核心可以应付整体计算的前提下,自然是应该选择资源占用少的模式,也就是多线程模式。

接下来我们分别尝试一下两种模式来进行并行计算。

多线程

640?wx_fmt=png

多线程模式适合IO密集型运算,这里我要使用sleep来模拟一下慢速的IO任务。同时为了方便编写命令行程序,这里使用Google fire开源库来简化命令行参数处理。

# coding: utf8
# t.py
import time
import fire
import threading
from concurrent.futures import ThreadPoolExecutor, wait# 分割子任务
def each_task(index):time.sleep(1)  # 睡1s,模拟IOprint "thread %s square %d" % (threading.current_thread().ident, index)return index * index  # 返回结果def run(thread_num, task_num):# 实例化线程池,thread_num个线程executor = ThreadPoolExecutor(thread_num)start = time.time()fs = []  # future列表for i in range(task_num):fs.append(executor.submit(each_task, i))  # 提交任务wait(fs)  # 等待计算结束end = time.time()duration = end - starts = sum([f.result() for f in fs])  # 求和print "total result=%s cost: %.2fs" % (s, duration)executor.shutdown()  # 销毁线程池if __name__ == '__main__':fire.Fire(run)

运行python t.py 2 10,也就是2个线程跑10个任务,观察输出

thread 123145422131200 square 0thread 123145426337792 square 1
thread 123145426337792 square 2thread 123145422131200 square 3
thread 123145426337792 square 4
thread 123145422131200 square 5
thread 123145426337792 square 6
thread 123145422131200 square 7
thread 123145426337792 square 8
thread 123145422131200 square 9
total result=285 cost: 5.02s

我们看到计算总共花费了大概5s,总共sleep了10s由两个线程分担,所以是5s。读者也许会问,为什么输出乱了,这是因为print操作不是原子的,它是两个连续的write操作合成的,第一个write输出内容,第二个write输出换行符,write操作本身是原子的,但是在多线程环境下,这两个write操作会交错执行,所以输出就不整齐了。如果将代码稍作修改,将print改成单个write操作,输出就整齐了(关于write是否绝对原子性还需要进一步深入讨论)

# 分割子任务
def each_task(index):time.sleep(1)  # 睡1s,模拟IOimport syssys.stdout.write("thread %s square %d\n" % (threading.current_thread().ident, index))return index * index  # 返回结果

我们再跑一下python t.py 2 10,观察输出

thread 123145438244864 square 0
thread 123145442451456 square 1
thread 123145442451456 square 2
thread 123145438244864 square 3
thread 123145438244864 square 4
thread 123145442451456 square 5
thread 123145438244864 square 6
thread 123145442451456 square 7
thread 123145442451456 square 9
thread 123145438244864 square 8
total result=285 cost: 5.02s

接下来,我们改变参数,扩大到10个线程,看看所有任务总共需要多久完成

> python t.py 10 10
thread 123145327464448 square 0
thread 123145335877632 square 2
thread 123145331671040 square 1
thread 123145344290816 square 4
thread 123145340084224 square 3
thread 123145348497408 square 5
thread 123145352704000 square 6
thread 123145356910592 square 7
thread 123145365323776 square 9
thread 123145361117184 square 8
total result=285 cost: 1.01s

可以看到1s中就完成了所有的任务。这就是多线程的魅力,可以将多个IO操作并行化,减少整体处理时间。

多进程

640?wx_fmt=png

相比多线程适合处理IO密集型任务,多进程适合计算密集型。接下来我们要模拟一下计算密集型任务。我的个人电脑有2个核心,正好可以体验多核心计算的优势。

那这个密集型计算任务怎么模拟呢,我们可以使用圆周率计算公式。

640?wx_fmt=png

通过扩大级数的长度n,就可以无限逼近圆周率。当n特别大时,计算会比较缓慢,这时候CPU就会一直处于繁忙状态,这正是我们所期望的。

好,下面开写多进程并行计算代码

# coding: utf8
# p.py
import os
import sys
import math
import time
import fire
from concurrent.futures import ProcessPoolExecutor, wait# 分割子任务
def each_task(n):# 按公式计算圆周率s = 0.0for i in range(n):s += 1.0/(i+1)/(i+1)pi = math.sqrt(6*s)# os.getpid可以获得子进程号sys.stdout.write("process %s n=%d pi=%s\n" % (os.getpid(), n, pi))return pidef run(process_num, *ns):  # 输入多个n值,分成多个子任务来计算结果# 实例化进程池,process_num个进程executor = ProcessPoolExecutor(process_num)start = time.time()fs = []  # future列表for n in ns:fs.append(executor.submit(each_task, int(n)))  # 提交任务wait(fs)  # 等待计算结束end = time.time()duration = end - startprint "total cost: %.2fs" % durationexecutor.shutdown()  # 销毁进程池if __name__ == '__main__':fire.Fire(run)

通过代码可以看出多进程模式在代码的编写上和多线程没有多大差异,仅仅是换了一个类名,其它都一摸一样。这也是concurrent库的魅力所在,将多线程和多进程模型抽象出了一样的使用接口。

接下来我们运行一下python p.py 1 5000000 5001000 5002000 5003000,总共计算4次pi,只用一个进程。观察输出

process 96354 n=5000000 pi=3.1415924626
process 96354 n=5001000 pi=3.14159246264
process 96354 n=5002000 pi=3.14159246268
process 96354 n=5003000 pi=3.14159246272
total cost: 9.45s

可以看出来随着n的增大,结果越来越逼近圆周率,因为只用了一个进程,所以任务是串行执行,总共花了大约9.5s。

接下来再增加一个进程,观察输出

> python p.py 2 5000000 5001000 5002000 5003000
process 96529 n=5001000 pi=3.14159246264
process 96530 n=5000000 pi=3.1415924626
process 96529 n=5002000 pi=3.14159246268
process 96530 n=5003000 pi=3.14159246272
total cost: 4.98s

从耗时上看缩短了接近1半,说明多进程确实起到了计算并行化的效果。此刻如果使用top命令观察进程的CPU使用率,这两个进程的CPU使用率都占到了接近100%。

如果我们再增加2个进程,是不是还能继续压缩计算时间呢

> python p.py 4 5000000 5001000 5002000 5003000
process 96864 n=5002000 pi=3.14159246268
process 96862 n=5000000 pi=3.1415924626
process 96863 n=5001000 pi=3.14159246264
process 96865 n=5003000 pi=3.14159246272
total cost: 4.86s

看来耗时不能继续节约了,因为只有2个计算核心,2个进程已经足以榨干它们了,即使再多加进程也只有2个计算核心可用。

深入原理

concurrent用的时候非常简单,但是内部实现并不是很好理解。在深入分析内部的结构之前,我们需要先理解一下Future这个对象。在前面的例子中,executor提交(submit)任务后都会返回一个Future对象,它表示一个结果的坑,在任务刚刚提交时,这个坑是空的,一旦子线程运行任务结束,就会将运行的结果塞到这个坑里,主线程就可以通过Future对象获得这个结果。简单一点说,Future对象是主线程和子线程通信的媒介。

640?wx_fmt=png

Future对象的内部逻辑简单一点可以使用下面的代码进行表示

class Future(object):def __init__(self):self._condition = threading.Condition()  # 条件变量self._result = Nonedef result(self, timeout=None):self._condition.wait(timeout)return self._resultdef set_result(self, result):self._result = resultself._condition.notify_all()

主线程将任务塞进线程池后得到了这个Future对象,它内部的_result还是空的。如果主线程调用result()方法获取结果,就会阻塞在条件变量上。如果子线程计算任务完成了就会立即调用set_result()方法将结果填充进future对象,并唤醒阻塞在条件变量上的线程,也就是主线程。这时主线程立即醒过来并正常返回结果。

线程池内部结构

主线程和子线程交互分为两部分,第一部分是主线程如何将任务传递给子线程,第二部分是子线程如何将结果传递给主线程。第二部分已经讲过了是通过Future对象来完成的。那第一部分是怎么做到的呢?

640?wx_fmt=png

如上图所示,秘密就在于这个队列,主线程是通过队列将任务传递给多个子线程的。一旦主线程将任务塞进任务队列,子线程们就会开始争抢,最终只有一个线程能抢到这个任务,并立即进行执行,执行完后将结果放进Future对象就完成了这个任务的完整执行过程。

线程池的缺点

concurrent的线程池有个重大的设计问题,那就是任务队列是无界的。如果队列的生产者任务生产的太快,而线程池消费太慢处理不过来,任务就会堆积。如果堆积一直持续下去,内存就会持续增长直到OOM,任务队列里堆积的所有任务全部彻底丢失。用户使用时一定要注意这点,并做好适当的控制。

进程池内部结构

进程池内部结构复杂,连concurent库的作者自己也觉得特别复杂,所以在代码里专门画了一张ascii图来讲解模型内部结构

640?wx_fmt=png
640?wx_fmt=png

我觉得作者的这张图还不够好懂,所以也单独画了一张图,请读者们仔细结合上面两张图,一起来过一边完整的任务处理过程。

  1. 主线程将任务塞进TaskQueue(普通内存队列),拿到Future对象

  2. 唯一的管理线程从TaskQueue获取任务,塞进CallQueue(分布式跨进程队列)

  3. 子进程从CallQueue中争抢任务进行处理

  4. 子进程将处理结果塞进ResultQueue(分布式跨进程队列)

  5. 管理线程从ResultQueue中获取结果,塞进Future对象

  6. 主线程从Future对象中拿到结果

这个复杂的流程中涉及到3个队列,还有中间附加的管理线程。那为什么作者要设计的这么复杂,这样的设计有什么好处?

首先,我们看这张图的左半边,它和线程池的处理流程没有太多区别,区别仅仅是管理线程只有一个,而线程池的子线程会有多个。这样设计可以使得多进程模型和多线程模型的使用方法保持一致,这就是为什么两个模型使用起来没有任何区别的原因所在——通过中间的管理线程隐藏了背后的多进程交互逻辑。

然后我们再看这张图的右半边,管理线程通过两个队列来和子进程们进行交互,这两个队列都是跨进程队列(multiprocessing.Queue)。CallQueue是单生产者多消费者,ResultQueue是多生产者单消费者。

CallQueue是个有界队列,它的上限在代码里写死了为「子进程数+1」。如果子进程们处理不过来,CallQueue就会变满,管理线程就会停止往里面塞数据。但是这里也遭遇了和线程池一样的问题,TaskQueue是无界队列,它的内容可不管消费者是否在持续(管理线程)消费,TaskQueue会无限制的持续生长,于是最终也会会导致OOM。

跨进程队列

进程池模型中的跨进程队列是用multiprocessing.Queue实现的。那这个跨进程队列内部细节是怎样的,它又是用什么高科技来实现的呢

笔者仔细阅读了multiprocessing.Queue的源码发现,它使用无名套接字sockerpair来完成的跨进程通信,socketpair和socket的区别就在于socketpair不需要端口,不需要走网络协议栈,通过内核的套接字读写缓冲区直接进行跨进程通信。

640?wx_fmt=png

当父进程要传递任务给子进程时,先使用pickle将任务对象进行序列化成字节数组,然后将字节数组通过socketpair的写描述符写入内核的buffer中。子进程接下来就可以从buffer中读取到字节数组,然后再使用pickle对字节数组进行反序列化来得到任务对象,这样总算可以执行任务了。同样子进程将结果传递给父进程走的也是一样的流程,只不过这里的socketpair是ResultQueue内部创建的无名套接字。

multiprocessing.Queue是支持双工通信,数据流向可以是父到子,也可以是子到父,只不过在concurrent的进程池实现中只用到了单工通信。CallQueue是从父到子,ResultQueue是从子到父。

总结

concurrent.futures框架非常好用,虽然内部实现机制异常复杂,读者也无需完全理解内部细节就可以直接使用了。但是需要特别注意的是不管是线程池还是进程池其内部的任务队列都是无界的,一定要避免消费者处理不及时内存持续攀升的情况发生。

近日, 老钱的新书《深入理解RPC》正式上线,限时优惠9.9元,感兴趣的读者扫描下面的二维码进入小册首页来阅读。还可以点击阅读原文进入小册首页。老钱的文章内容一如既往——带读者透过技术表面形式看穿深度内核本质。

640?wx_fmt=other

640?wx_fmt=jpeg

名人推荐

640?wx_fmt=other

640?wx_fmt=other

读者福利

作者决定拿出一部分自己收藏的好书作为福利,以抽奖的形式免费赠予购买小册的用户(国内用户包邮),数量有限,大家就拼人品吧(抽奖规则时间在读者微信群同步)。

  1. 《C Primer Plus》

  2. 《动手玩转 Arduino》

  3. 《白帽子讲 Web 安全》

  4. 《深入理解程序设计——使用 Linux 汇编语言》

  5. 《图解服务器网络架构》

  6. 《构建 Oracle 高可用环境》

等等,除了以上 6 本外,还有很多其它的好书,这里就不一一列举了。

640?wx_fmt=jpeg

高级文章,关注公众号【码洞】

点击阅读原文,进入小册首页

这篇关于Python最广为使用的并发库futures使用入门与内部原理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/428895

相关文章

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

python: 多模块(.py)中全局变量的导入

文章目录 global关键字可变类型和不可变类型数据的内存地址单模块(单个py文件)的全局变量示例总结 多模块(多个py文件)的全局变量from x import x导入全局变量示例 import x导入全局变量示例 总结 global关键字 global 的作用范围是模块(.py)级别: 当你在一个模块(文件)中使用 global 声明变量时,这个变量只在该模块的全局命名空

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

Makefile简明使用教程

文章目录 规则makefile文件的基本语法:加在命令前的特殊符号:.PHONY伪目标: Makefilev1 直观写法v2 加上中间过程v3 伪目标v4 变量 make 选项-f-n-C Make 是一种流行的构建工具,常用于将源代码转换成可执行文件或者其他形式的输出文件(如库文件、文档等)。Make 可以自动化地执行编译、链接等一系列操作。 规则 makefile文件

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于

使用opencv优化图片(画面变清晰)

文章目录 需求影响照片清晰度的因素 实现降噪测试代码 锐化空间锐化Unsharp Masking频率域锐化对比测试 对比度增强常用算法对比测试 需求 对图像进行优化,使其看起来更清晰,同时保持尺寸不变,通常涉及到图像处理技术如锐化、降噪、对比度增强等 影响照片清晰度的因素 影响照片清晰度的因素有很多,主要可以从以下几个方面来分析 1. 拍摄设备 相机传感器:相机传

hdu4407(容斥原理)

题意:给一串数字1,2,......n,两个操作:1、修改第k个数字,2、查询区间[l,r]中与n互质的数之和。 解题思路:咱一看,像线段树,但是如果用线段树做,那么每个区间一定要记录所有的素因子,这样会超内存。然后我就做不来了。后来看了题解,原来是用容斥原理来做的。还记得这道题目吗?求区间[1,r]中与p互质的数的个数,如果不会的话就先去做那题吧。现在这题是求区间[l,r]中与n互质的数的和

【Python编程】Linux创建虚拟环境并配置与notebook相连接

1.创建 使用 venv 创建虚拟环境。例如,在当前目录下创建一个名为 myenv 的虚拟环境: python3 -m venv myenv 2.激活 激活虚拟环境使其成为当前终端会话的活动环境。运行: source myenv/bin/activate 3.与notebook连接 在虚拟环境中,使用 pip 安装 Jupyter 和 ipykernel: pip instal