第二十章 Concurrent Executors

2023-11-11 14:50

本文主要是介绍第二十章 Concurrent Executors,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

抨击线程的人往往是系统程序员,他们考虑的用例是一般的应用程序程序员在她的生活中永远不会遇到的用例。在应用程序程序员可能会遇到 的用例中, 99%的情况下只需知道如何派生一堆独立的线程,然后用队列收集结果。

                                ------Michele Simionato, Python deep thinker

本章重点介绍了 concurrent.futures.Executor 类,这些类封装了 Michele Simionato 描述的“生成一堆独立线程并将结果收集到队列中”的模式。并发执行器使这种模式几乎可以轻松使用,不仅适用于线程,而且适用于进程——对于计算密集型任务非常有用。

在这里,我还介绍了futures的概念——代表操作的异步执行的对象,类似于 JavaScript 的promise。这个原始想法不仅是 concurrent.futures 的基础,也是第 21 章主题的 asyncio 包的基础。

本章的新内容

我将这一章从 Concurrency with Futures 重命名为 Concurrent Executors,因为 executors 是这里涵盖的最重要的高级功能。Futures 是低级对象,集中在““Where Are the Futures?”中,但在本章的其余部分中大部分是看不到的。

所有 HTTP 客户端示例现在都使用新的 HTTPX 库,该库提供同步和异步 API。由于多线程服务器添加到 Python 3.7 中的 http.server 包, “Downloads with Progress Display and Error Handling”中的实验设置现在更加简单。以前,标准库只有单线程的BaseHttpServer,不利于并发客户端的实验,所以在第一版中不得不借助于外部工具。

“使用 concurrent.futures 启动进程”现在演示了执行器如何简化我们在 “Code for the Multi-core Prime Checker”中看到的代码。

最后,我将大部分理论知识转移到新的第 19 章——Python 中的并发模型。

并发 Web 下载

并发性对于高效的网络 I/O 是必不可少的:应用程序应该在响应返回前做其他事情,而不是无所事事地等待远程机器。

为了用代码进行演示,我编写了三个简单的程序来从 Web 下载 20 个国家/地区的国旗图像。第一个,flags.py,按顺序运行:它下载一个图像并在本地保存后才请求下一个图像。另外两个脚本进行并发下载:它们几乎同时请求多个图像,每下载一个文件就保存一个文件。flags_threadpool.py 脚本使用 concurrent.futures 包,而 flags_asyncio.py 使用 asyncio。

示例 20-1 显示了运行三个脚本的结果,每个脚本 运行3 次。我还在 YouTube 上发布了一个 73 秒的视频,这样您就可以在 MacOS Finder 窗口中观看它们运行时显示它们保存的国旗图像文件。脚本从 fluentpython.com 下载图像,这个网站架设在CDN 后面,因此您可能会在第一次运行时看到较慢的结果。示例 20-1 中的结果是在多次运行后获得的,因为CDN此时已经有了缓存。

例 20-1。脚本 flags.py、flags_threadpool.py 和 flags_asyncio.py 得到的结果

$ python3 flags.py
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN  1
20 flags downloaded in 7.26s  2
$ python3 flags.py
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN
20 flags downloaded in 7.20s
$ python3 flags.py
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN
20 flags downloaded in 7.09s
$ python3 flags_threadpool.py
DE BD CN JP ID EG NG BR RU CD IR MX US PH FR PK VN IN ET TR
20 flags downloaded in 1.37s  3
$ python3 flags_threadpool.py
EG BR FR IN BD JP DE RU PK PH CD MX ID US NG TR CN VN ET IR
20 flags downloaded in 1.60s
$ python3 flags_threadpool.py
BD DE EG CN ID RU IN VN ET MX FR CD NG US JP TR PK BR IR PH
20 flags downloaded in 1.22s
$ python3 flags_asyncio.py  4
BD BR IN ID TR DE CN US IR PK PH FR RU NG VN ET MX EG JP CD
20 flags downloaded in 1.36s
$ python3 flags_asyncio.py
RU CN BR IN FR BD TR EG VN IR PH CD ET ID NG DE JP PK MX US
20 flags downloaded in 1.27s
$ python3 flags_asyncio.py
RU IN ID DE BR VN PK MX US IR ET EG NG BD FR CN JP PH CD TR  5
20 flags downloaded in 1.42s
  1. 每次运行的输出显示下载完毕的国家代码,并在结束时说明耗时。
  2. flags.py 下载 20 张图片平均需要 7.18 秒。
  3. flags_threadpool.py 的平均值为 1.40 秒。
  4. 对于 flags_asyncio.py平均用时1.35秒。
  5. 请注意国家/地区代码的顺序:对于并发脚本,每次下载的顺序都不相同。

并发脚本之间的性能差异并不显着,但它们都比顺序脚本快 5 倍以上——这仅适用于下载 20 个文件的小任务,每个文件只有几千字节。如果您将任务扩展到数百次下载,并发脚本的速度可能会超过顺序代码 20 倍或更多。

Warning:

在针对公共 Web 服务器测试并发 HTTP 客户端时,您可能会无意中发起拒绝服务 (DoS) 攻击,或者有这么做的嫌疑。在示例 20-1 的情况下,这样做是可以的,因为这些脚本被硬编码为仅发送20 个请求。我们将在本章后面使用 Python 的 http.server 包来运行测试。

现在让我们研究示例 20-1 中测试的两个脚本的实现:flags.py 和 flags_threadpool.py。我将把第三个脚本 flags_asyncio.py 留在第 21 章,但我想将这三个脚本一起演示以说明两点:

  1. 无论您使用何种并发构造——线程或协程——如果代码正确,您将看到网络 I/O 操作中相较于顺序代码的吞吐量大大提高。
  2. 对于可以控制自己发出多少请求的 HTTP 客户端,线程和协程之间的性能没有显着差异。

下面分析代码

顺序下载脚本

示例 20-2 包含 flags.py 的实现,这是我们在上面的示例 20-1 中运行的第一个脚本。这不是很有趣,但我们将重用其大部分代码和设置来实现并发脚本,因此值得关注。

Note:

为清楚起见,示例 20-2 中没有异常。我们稍后会处理异常,但在这里我想关注代码的基本结构,以便更容易地将此​​脚本与并发脚本进行对比。

例 20-2。 flags.py:顺序下载脚本;一些函数将被其他脚本重用

import time
from pathlib import Path
from typing import Callableimport httpx  1POP20_CC = ('CN IN US ID BR PK NG BD RU JP ''MX PH VN ET EG DE IR TR CD FR').split()  2BASE_URL = 'https://www.fluentpython.com/data/flags'  3
DEST_DIR = Path('downloaded')                         4def save_flag(img: bytes, filename: str) -> None:     5(DEST_DIR / filename).write_bytes(img)def get_flag(cc: str) -> bytes:  6url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()resp = httpx.get(url, timeout=6.1,       7follow_redirects=True)  8resp.raise_for_status()  9return resp.contentdef download_many(cc_list: list[str]) -> int:  10for cc in sorted(cc_list):                 11image = get_flag(cc)save_flag(image, f'{cc}.gif')print(cc, end=' ', flush=True)         12return len(cc_list)def main(downloader: Callable[[list[str]], int]) -> None:  13DEST_DIR.mkdir(exist_ok=True)                          14t0 = time.perf_counter()                               15count = downloader(POP20_CC)elapsed = time.perf_counter() - t0print(f'\n{count} downloads in {elapsed:.2f}s')if __name__ == '__main__':main(download_many)     16
  1. 导入httpx库;它不是标准库的一部分,所以按照惯例,在标准库模块后添加一个空行再导入。
  2. 20 个人口最多的国家的 ISO 3166 国家代码列表,按人口递减顺序排列。
  3. 获取国旗图像的网站
  4. 保存图像的本地目录。
  5. 将 img 字节保存到 DEST_DIR 中的filename
  6. 根据传入的国家代码,构建 URL 并下载图像,返回响应的二进制内容。
  7. 为网络操作添加合理的超时时间是一种很好的做法,以避免长时间阻塞。
  8. 默认情况下,HTTPX 不遵循重定向
  9. 此脚本中没有错误处理,但如果 HTTP 状态不在 2XX 范围内,则此方法会抛出异常 - 强烈建议避免静默失败。
  10. download_many 是与并发实现进行比较的关键函数。
  11. 按字母顺序循环遍历国家/地区代码列表,以便于查看输出中保留的排序;返回下载的国家代码数量。
  12. 在同一行中一次显示一个国家/地区代码,以便我们可以在每次下载时看​​到进度。end=' ' 参数替换了每行末尾的通常用空格字符打印的换行符,因此所有国家/地区代码都在同一行中渐进显示。需要 flush=True 参数,因为默认情况下,Python 输出是行缓冲的,这意味着 Python 仅在换行符后显示打印的字符。
  13. 必须将进行下载的函数作为参数传入 main;这样,我们可以将 main 作为库函数与threadpool和 ascyncio 示例中的其他 download_many 实现一起使用。
  14. 如果需要,创建 DEST_DIR;如果目录存在,则不抛出异常。
  15. 记录并报告运行download_many函数后经过的时间。
  16. 使用 download_many 函数作为参数调用 main。

TIP:

HTTPX 库的灵感来自 Pythonic requests 包,但建立在更现代的基础上。HTTPX 提供了同步和异步 API,因此我们可以在本章和下一章的所有 HTTP 客户端示例中使用它。Python 的标准库提供了 urllib.request 模块,但它的 API 只是同步的且对用户并不友好。

flags.py 真的没什么新东西。它用作比较其他脚本的基线,我将其用作库以避免在实现它们时出编写重复代码。现在让我们看一下使用 concurrent.futures 的重新实现。

使用 concurrent.futures 下载

concurrent.futures 包的主要功能是 ThreadPoolExecutor 和 ProcessPoolExecutor 类,它们实现了一个 API 能分别在不同的线程/进程执行可调用对象。这些类在内部管理工作线程或进程池,以及用于分发任务和收集结果的队列。但是这个接口的层级很高,对于像我们的下载国旗这样的简单用例,无需关心任何实现细节。

示例 20-3 显示了使用 ThreadPoolExecutor.map 方法并发实现下载的最简单方法。

例 20-3。 flags_threadpool.py:使用 futures.ThreadPoolExecutor 实现多线程下载的脚本

from concurrent import futuresfrom flags import save_flag, get_flag, main  1def download_one(cc: str):  2image = get_flag(cc)save_flag(image, f'{cc}.gif')print(cc, end=' ', flush=True)return ccdef download_many(cc_list: list[str]) -> int:with futures.ThreadPoolExecutor() as executor:         3res = executor.map(download_one, sorted(cc_list))  4return len(list(res))                                  5if __name__ == '__main__':main(download_many)  6
  1. 重用 flags 模块中的一些函数(示例 20-2)。
  2. 下载单个图像的函数;这是每个worker线程将执行的函数。
  3. 将 ThreadPoolExecutor 实例化为上下文管理器; executor.__exit__ 方法将调用 executor.shutdown(wait=True),它将阻塞直到所有线程完成。
  4. map 方法与内置的map 方法类似,只是download_one 函数会被多个线程并发调用;它返回一个生成器,您可以迭代该生成器以检索每个函数调用返回的值——在这种情况下,对 download_one 的每次调用都将返回一个国家/地区代码。
  5. 返回获得的结果数量;当列表构造函数中的隐式 next() 调用尝试从 executor.map 返回的迭代器中检索相应的返回值时,如果任何线程调用抛出异常,在这里会抛出这个异常。
  6. 从 flags 模块调用 main 函数,传递 download_many 的并发版本。

请注意,示例 20-3 中的 download_one 函数本质上是示例 20-2 中的 download_many 函数中 for 循环的主体。编写并发代码时经常这样重构:将顺序 for 循环的主体转换为要并发调用的函数。

TIP:

示例 20-3 非常简短,因为我能够重用顺序 flags.py 脚本中的大多数函数。 concurrent.futures 的最佳功能之一是可以方便的在遗留的顺序代码之上添加并发执行。

ThreadPoolExecutor 构造函数有几个未显示的参数,但第一个也是最重要的一个是 max_workers,设置要执行的最大工作线程数。当 max_workers 为 None(默认值)时,ThreadPoolExecutor 使用以下表达式决定其值——自 Python 3.8 起:

max_workers = min(32, os.cpu_count() + 4)

ThreadPoolExecutordocumentation中解释了基本原理:

        此默认值至少为 I/O 密集型任务保留 5 个工作线程。它最多使用 32 个 CPU 内核来执行释放 GIL 的 CPU 密集型任务。它避免在多核机器上隐式使用非常大的资源。

        ThreadPoolExecutor 现在在启动 max_workers 工作线程之前会重用空闲工作线程。

总结:max_workers 的计算默认值是合理的,并且 ThreadPoolExecutor 避免了不必要地启动新的 worker。了解 max_workers 背后的逻辑可能会帮助您决定何时以及如何自己设置最大线程数。

我们用的库名为 concurrency.futures,但在示例 20-3 中没有可以看到的futures,因此您可能想知道它们在哪里。下一节解释这个问题。

future在哪里

Futures 是 concurrent.futures 和 asyncio 的核心组件,但作为这些库的用户,我们有时看不到它们。示例 20-3 依赖于幕后的future,但我写的代码并没有直接使用它们。本节是对future的概述,并通过一个示例展示了它们的实际作用。

从 Python 3.4 开始,标准库中有两个名为 Future 的类: concurrent.futures.Future 和 asyncio.Future。它们具有相同的目的:任一 Future 类的实例表示可能已完成或可能未完成的延迟计算。这有点类似于 Twisted 中的 Deferred 类、Tornado 中的 Future 类和现代 JavaScript 中的 Promise。

Futures 封装待处理的操作,以便我们可以将它们放入队列中,检查它们是否完成,并在它们可用时获取结果(或异常)。

关于future要知道的一件重要的事情是你我不应该创建它们:它们旨在由并发框架进行实例化,无论是 concurrent.futures 还是 asyncio。原因如下:Future 代表最终会运行的事情,因此它必须被排期运行,这就是框架的工作。concurrent.futures.Future 实例仅作为使用 concurrent.futures.Executor 子类提交可调用对象执行的结果而创建。例如,Executor.submit() 方法接受一个可调用对象,对这个对象进行排期执行,并返回一个 Future。

客户端代码不应该改变future的状态:并发框架在future代表的计算完成时改变future的状态,我们无法控制这个计算何时发生。

两种类型的 Future 都有一个 .done() 方法,该方法是非阻塞的,并返回一个布尔值,告诉您由该 Future 包装的可调用对象是否已执行。然而,客户端代码通常会要求获得通知,而不是轮询future是否完成。这就是为什么两个 Future 类都有一个 .add_done_callback() 方法:传入一个可调用对象,当future完成时,这个可调用对象将future作为参数进行调用。请注意,回调一个可调用对象将在运行该函数的同一个工作线程或进程中运行。

还有一个 .result() 方法,当future完成时,它在两个类中的工作方式相同:它返回可调用对象的结果,或者重新抛出执行可调用时可能抛出的任何异常。但是,当 future未完成 时,两种风格的 Future 之间的result方法的行为非常不同。在 concurrency.futures.Future 实例中,调用 f.result() 将阻塞调用者的线程,直到结果计算完成。可以传递一个可选的timeout参数,如果future没有在指定的时间内完成,则结果方法会抛出 TimeoutError异常。asyncio.Future.result 方法不支持超时,而 await 是在 asyncio 中获取future结果的首选方式——但 await 不适用于 concurrency.futures.Future 实例。

两个库中的几个函数都返回future;其他函数以对用户透明的方式在他们的实现中使用future。后者的一个例子是我们在例 20-3 中看到的 Executor.map:它返回一个迭代器,其中 __next__ 调用每个 future 的 result 方法,所以我们得到了 future 的结果,而不是 future 本身。

为了从实践上了解future,我们可以重写示例 20-3 以使用 concurrent.futures.as_completed 函数,该函数接受future组成的可迭代对象并返回一个迭代器,该迭代器在future完成时产出future。

使用 futures.as_completed 只需要更改 download_many 函数。更高级别的 executor.map 调用被两个 for 循环替换:一个用于创建和编排future,另一个用于检索其结果。在此期间,我们将添加一些打印调用以显示每个future完成前后的状态。示例 20-4 显示了新的 download_many 函数的代码。download_many 的代码从 5 行增加到 17 行,但现在我们可以一窥神秘的future。其余功能与示例 20-3 中的相同。

例 20-4。 flags_threadpool_futures.py:在download_many函数中用executor.submit和futures.as_completed替换executor.map

def download_many(cc_list: list[str]) -> int:cc_list = cc_list[:5]  1with futures.ThreadPoolExecutor(max_workers=3) as executor:  2to_do: list[futures.Future] = []for cc in sorted(cc_list):  3future = executor.submit(download_one, cc)  4to_do.append(future)  5print(f'Scheduled for {cc}: {future}')  6for count, future in enumerate(futures.as_completed(to_do), 1):  7res: str = future.result()  8print(f'{future} result: {res!r}')  9return count
  1. 对于此演示,仅使用前五个人口最多的国家
  2. 将 max_workers 设置为 3,以便我们可以在输出中看到挂起的future。
  3. 按字母顺序遍历国家/地区代码,以明确结果的顺序是无序的。
  4. executor.submit 调度要执行的可调用对象,并返回表示这个待执行操作的future。

  5. 存储每个future,以便我们以后可以使用 as_completed进行检索。

  6. 显示带有国家代码和对应的future的信息。

  7. as_completed 在完成时产出future。

  8. 得到这个future的结果。

  9. 打印future和他的结果

请注意,在此示例中,future.result() 调用永远不会阻塞,因为 future 来自 as_completed。示例 20-5 显示了示例 20-4 的一次运行的输出。

$ python3 flags_threadpool_futures.py
Scheduled for BR: <Future at 0x100791518 state=running>  1
Scheduled for CN: <Future at 0x100791710 state=running>
Scheduled for ID: <Future at 0x100791a90 state=running>
Scheduled for IN: <Future at 0x101807080 state=pending>  2
Scheduled for US: <Future at 0x101807128 state=pending>
CN <Future at 0x100791710 state=finished returned str> result: 'CN'  3
BR ID <Future at 0x100791518 state=finished returned str> result: 'BR'  4
<Future at 0x100791a90 state=finished returned str> result: 'ID'
IN <Future at 0x101807080 state=finished returned str> result: 'IN'
US <Future at 0x101807128 state=finished returned str> result: 'US'5 downloads in 0.70s
  1. future按字母顺序排列;future的 repr() 显示其状态:前三个正在运行,因为有三个工作线程。
  2. 最后两个future正在等待中,等待工作线程
  3. 这里的第一个 CN 是一个工作线程中的 download_one 的输出;该行的其余部分是 download_many 的输出。
  4. 之前的两个线程输出国家代码,这里主线程中download_many函数显示第一个线程的结果。

TIP:

我建议您去自己试验 flags_threadpool_futures.py。如果您多次运行它,您会看到结果的顺序有所不同。将 max_workers 增加到 5 将增加结果顺序的变化。将max_workers 减少为 1 将使此脚本按顺序运行,并且结果的顺序将始终是提交调用的顺序。

我们看到了使用 concurrent.futures 的下载脚本的两种变体:使用 ThreadPoolExecutor.map 的示例 20-3 和使用 futures.as_completed 的示例 20-4。如果您对 flags_asyncio.py 的代码感到好奇,您可以查看第 21 章中的示例 21-3,在那里对其进行了解释。

现在让我们简单地看一下使用 concurrent.futures 解决 CPU 密集型任务的 GIL 的简单方法。

使用 concurrent.futures 启动进程

concurrent.futures documentation page 的副标题是“执行并行任务”。该包支持在多核机器上进行并行计算,因为它支持使用 ProcessPoolExecutor 类在多个 Python 进程之间分配工作。

ProcessPoolExecutor 和 ThreadPoolExecutor 都实现了 Executor 接口,因此使用 concurrent.futures 很容易从基于线程的解决方案切换到基于进程的解决方案。

将 ProcessPoolExecutor 用于国旗下载的示例或任何 I/O 密集型任务没有任何优势。验证这一点很容易;只需更改示例 20-3 中的这些行:

def download_many(cc_list: list[str]) -> int:with futures.ThreadPoolExecutor() as executor:

改成这样:

def download_many(cc_list: list[str]) -> int:with futures.ProcessPoolExecutor() as executor:

ProcessPoolExecutor 的构造函数还有一个默认为 None 的 max_workers 参数。在这种情况下,执行程序将worker进程的数量设置为 os.cpu_count() 返回的数量。

与线程相比,进程使用更多内存并且启动时间更长,因此 ProcessPoolExecutor 的真正价值在于 CPU 密集型任务。让我们回到“A Homegrown Process Pool”的素数测试示例,用 concurrent.futures 重写它。

最终版的多核素数检查

在“多核素数检查器的代码”中,我们研究了 procs.py,这是一个使用多处理检查一些大的数字是否为素数的脚本。在示例 20-6 中,我们使用 ProcessPoolExecutor 解决了 proc_pool.py 程序中的相同问题。从第一次导入到最后的 main() 调用,procs.py 有 43 行非空白代码,而 proc_pool.py 缩短了 28%-31%左右。

例 20-6。 proc_pool.py: procs.py 用 ProcessPoolExecutor 重写

import sys
from concurrent import futures  1
from time import perf_counter
from typing import NamedTuplefrom primes import is_prime, NUMBERSclass PrimeResult(NamedTuple):  2n: intflag: boolelapsed: floatdef check(n: int) -> PrimeResult:t0 = perf_counter()res = is_prime(n)return PrimeResult(n, res, perf_counter() - t0)def main() -> None:if len(sys.argv) < 2:workers = None      3else:workers = int(sys.argv[1])executor = futures.ProcessPoolExecutor(workers)  4actual_workers = executor._max_workers  # type: ignore  5print(f'Checking {len(NUMBERS)} numbers with {actual_workers} processes:')t0 = perf_counter()numbers = sorted(NUMBERS, reverse=True)  6with executor:  7for n, prime, elapsed in executor.map(check, numbers):  8label = 'P' if prime else ' 'print(f'{n:16}  {label} {elapsed:9.6f}s')time = perf_counter() - t0print(f'Total time: {time:.2f}s')if __name__ == '__main__':main()
  1. 无需导入 multiprocessing、SimpleQueue 等; concurrent.futures不需要这些导入类型。
  2. PrimeResult 元组和check函数与我们在 procs.py 中看到的相同,但我们不再需要队列和worker函数。
  3. 如果没有给出命令行参数,我们不是自己决定要使用多少worker进程,而是将workers设置为 None 并让 ProcessPoolExecutor 决定。
  4. 在这里,我在 ➐ 中的 with 块之前构建了 ProcessPoolExecutor,以便我可以在下一行显示实际的worker进程数量。
  5. _max_workers 是 ProcessPoolExecutor 的一个未公开的实例属性。当workers变量为None时,我使用这个变量来显示worker的数量; mypy 在我访问它时正确地报警,所以我在后面加上了 type: ignore 评论以使其静音。
  6. 按降序对要检查的数字进行排序。这将暴露 proc_pool.py与 procs.py 相比 的行为差异。见下文。
  7. 将executor用作上下文管理器。
  8. executor.map 调用以和 numbers 参数相同的顺序返回 check 返回的 PrimeResult 实例。

如果您运行示例 20-6,您将看到结果是绝对的降序排列,如示例 20-7 所示。相比之下,procs.py 的输出顺序(如“基于进程的解决方案”所示)在很大程度上受到检查每个数字是否为素数的复杂度的影响。例如,procs.py 在顶部附近显示 777777777777777 的结果,因为它可以被一个较小的7整除,所以 is_prime 很快确定它不是素数。相比之下,7777777536340681 是 88191709的平方, 所以 is_prime 将需要更长的时间来确定它是一个合数,甚至需要更长的时间才能发现 7777777777777753 是素数——因此这两个数字都出现在 procs.py 输出的末尾附近。

运行 proc_pool.py,你不仅会看到结果按照降序排列,还会看到程序在显示 9999999999999999 的结果后似乎卡住了。

例 20-7。 proc_pool.py 的输出

$ ./proc_pool.py
Checking 20 numbers with 12 processes:
9999999999999999     0.000024s  1
9999999999999917  P  9.500677s  2
7777777777777777     0.000022s  3
7777777777777753  P  8.976933s
7777777536340681     8.896149s
6666667141414921     8.537621s
6666666666666719  P  8.548641s
6666666666666666     0.000002s
5555555555555555     0.000017s
5555555555555503  P  8.214086s
5555553133149889     8.067247s
4444444488888889     7.546234s
4444444444444444     0.000002s
4444444444444423  P  7.622370s
3333335652092209     6.724649s
3333333333333333     0.000018s
3333333333333301  P  6.655039s299593572317531  P  2.072723s142702110479723  P  1.461840s2  P  0.000001s
Total time: 9.65s
  1. 这行出现得很快。
  2. 此行需要 9.5 秒以上才能显示出来。
  3. 所有剩余的行几乎立即显示出现。

下面是 proc_pool.py 这种行为的原因:

  • 如前所述, executor.map(check, numbers) 以与numbers相同的顺序返回结果。
  • 默认情况下,proc_pool.py 使用与 CPU 一样多的工作线程——这就是当 max_workers 为 None 时 ProcessPoolExecutor 所做的。这台笔记本电脑中有 12 个进程。
  • 因为我们按降序提交数字,第一个是 9999999999999999,这个数可以被9整除,所以很快就可以返回
  • 第二个数字是 9999999999999917,样本中最大的素数。它的检查将比所有其他检查需要更长的时间。
  • 同时,其余 11 个进程将检查其他数字,这些数字要么是质数,要么是具有大因数的合数,要么是具有非常小的因数的合数。
  • 当负责 9999999999999917 的worker进程最终确定这是一个质数时,所有其他进程都完成了他们最后的工作,因此结果紧随其后立刻打印出来。

Note:

尽管 proc_pool.py 的执行过程不像 procs.py 那样可见,但对于相同数量的工作线程和 CPU 内核,总体执行时间实际上与图 19-2 中所示相同。

理解并发程序的行为并不简单,所以这里是第二个实验,可以帮助您可视化 Executor.map 的操作。

对 Executor.map 进行试验

让我们研究 Executor.map,现在使用 三个工作线程的ThreadPoolExecutor ,运行五个输出带时间戳消息的可调用对象。代码在示例 20-8 中,输出在示例 20-9 中。

例 20-8。 demo_executor_map.py:简单演示ThreadPoolExecutor的map方法

from time import sleep, strftime
from concurrent import futuresdef display(*args):  1print(strftime('[%H:%M:%S]'), end=' ')print(*args)def loiter(n):  2msg = '{}loiter({}): doing nothing for {}s...'display(msg.format('\t'*n, n, n))sleep(n)msg = '{}loiter({}): done.'display(msg.format('\t'*n, n))return n * 10  3def main():display('Script starting.')executor = futures.ThreadPoolExecutor(max_workers=3)  4results = executor.map(loiter, range(5))  5display('results:', results)  6display('Waiting for individual results:')for i, result in enumerate(results):  7display(f'result {i}: {result}')if __name__ == '__main__':main()
  1. 这个函数作用很简单,打印传入的任何参数,并在前面加上 [HH:MM:SS] 格式的时间戳。
  2. loiter 除了在启动时打印一条消息,随后休眠 n 秒,然后在结束时打印一条消息外,什么都不做;制表符用于根据 n 的值所谓缩进量缩进消息。
  3. loiter 返回 n * 10 以便我们可以看到如何收集结果。
  4. 创建一个具有三个线程的 ThreadPoolExecutor。
  5. 向executor提交五个任务。由于只有三个线程,其中只有三个任务会立即启动:调用 loiter(0)、loiter(1) 和 loiter(2));这是一个非阻塞调用。
  6. 立即显示调用 executor.map 的结果:它是一个生成器,如示例 20-9 中的输出所示。
  7. for 循环中的 enumerate 调用将隐式调用 next(results),而 next(results) 又将在(内部)_f future 上调用 _f.result(),代表第一个调用 的结果loiter(0)。result 方法将阻塞,直到future完成,因此此循环中的每次迭代都必须等待下一个结果准备就绪。

我建议您运行示例 20-8 并查看结果逐渐显示出来。在此过程中,还可以修改 ThreadPoolExecutor 的 max_workers 参数以及为 executor.map 方法中 range 函数的参数,或者自己挑选几个值,以列表形式传给map方法,以得到不同的延迟。

示例 20-9 显示了示例 20-8 的示例运行。

$ python3 demo_executor_map.py
[15:56:50] Script starting.  1
[15:56:50] loiter(0): doing nothing for 0s...  2
[15:56:50] loiter(0): done.
[15:56:50]      loiter(1): doing nothing for 1s...  3
[15:56:50]              loiter(2): doing nothing for 2s...
[15:56:50] results: <generator object result_iterator at 0x106517168>  4
[15:56:50]                      loiter(3): doing nothing for 3s...  5
[15:56:50] Waiting for individual results:
[15:56:50] result 0: 0  6
[15:56:51]      loiter(1): done. 7
[15:56:51]                              loiter(4): doing nothing for 4s...
[15:56:51] result 1: 10  8
[15:56:52]              loiter(2): done.  9
[15:56:52] result 2: 20
[15:56:53]                      loiter(3): done.
[15:56:53] result 3: 30
[15:56:55]                              loiter(4): done.  10
[15:56:55] result 4: 40
  1. 本次运行于 15:56:50 开始。
  2. 第一个线程执行 loiter(0),所以它会休眠 0s ,有时甚至会在第二个线程启动之前返回,不过具体情况因人而异
  3. loiter(1)和loiter(2)立即启动(因为线程池有3个worker,可以并发运行3个函数)。
  4. 这说明executor.map返回的结果是一个生成器;到目前为止,无论任务数量是多少和 max_workers 的值是多少,都不会阻塞。
  5. 因为 loiter(0) 已经完成,第一个 worker 现在可以启动 loiter(3) 的第四个线程。
  6. 这是执行可能会阻塞的地方,具体取决于给 loiter 调用的参数:results生成器的 __next__ 方法必须等到第一个 future 完成。此时不会阻塞,因为在此循环开始之前对 loiter(0) 的调用已完成。请注意,到目前为止,一切都发生在同一秒内:15:56:50。
  7. loiter(1) 在一秒钟后完成,在 15:56:51。线程被释放以启动 loiter(4)。
  8. loiter(1) 的结果如下所示: 10. 现在 for 循环将阻塞等待 loiter(2) 的结果。
  9. 同上:loiter(2) 完成,显示结果20;loiter(3) 也一样。
  10. 在 loiter(4) 完成之前有 2 秒的延迟,因为它从 15:56:51 开始并且在 4 秒内什么也没做。

Executor.map 函数易于使用,但通常最好在结果准备好时获取它们,而不用关心提交顺序。为此,我们需要将 Executor.submit 方法和 futures.as_completed 函数结合起来,如示例 20-4 中所见。我们将在“使用 futures.as_completed”中讨论这个技术。

TIP:

executor.submit 和 futures.as_completed 的组合比 executor.map 更灵活,因为您可以提交不同的可调用对象和参数,而 executor.map 旨在在不同的参数上运行相同的可调用对象。此外,您传递给 futures.as_completed 的一组future可能来自多个executer程序——也许有些是由 ThreadPoolExecutor 实例创建的,而另一些则来自 ProcessPoolExecutor。

在下一节中,我们将根据最新的需求继续实现下载国旗的示例,这将需要我们迭代 futures.as_completed 的结果,而不是使用 executor.map。

显示下载进度并处理错误

如前所述,“并发 Web 下载”中的脚本没有进行异常处理,使它们更易于阅读并对比了三种方法的结构:顺序、线程和异步。

为了测试对各种异常情况的处理,我创建了flags2示例:

flags2_common.py:

该模块包含所有 flags2 示例使用的通用函数和设置,包括一个main函数,负责命令行解析、计时和报告结果。这些脚本中的代码是提供支持的,与本章的主题没有直接关系,所以我不会在这里列出源代码,但您可以在 fluentpython/example-code-2e 存储库中阅读:20-executors/getflags/flags2_common. py。

flags2_sequential.py:

具有正确错误处理和进度条显示的顺序 HTTP 客户端。它的 download_one 函数也被 flags2_threadpool.py 使用。

flags2_threadpool.py:

并发HTTP客户端基于futures.ThreadPoolExecutor来演示错误处理和进度条的集成。

flags2_asyncio.py:

与前面的示例功能相同,但使用 asyncio 和 httpx 实现。这将在第 21 章的“增强 asyncio 下载器”中介绍。

测试并发客户端时要小心

在公共 Web 服务器上测试并发 HTTP 客户端时,您可能每秒生成许多请求,这就是拒绝服务 (DoS) 攻击的方式。我们的目的不是攻击任何人而是学习如何开发高性能的客户端。在访问公共服务器时小心地限制您的客户端。为了进行测试,最好在本地 假设HTTP 服务器。有关说明,请参阅“Setting up test servers” 。

flags2 示例最明显的特征是它们具有使用tqdm package实现的动画文本模式进度条。我在 YouTube 上发布了一个 108 秒的视频来显示进度条并对比三个 flags2 脚本的速度。在视频中,我从顺序下载开始,但我在 32 秒后中断它,因为访问 676 个 URL 并获得 194 个国旗需要 5 多分钟;然后我将多线程和异步脚本各运行 3 次,每次它们在 6 秒或更短的时间内完成工作(即快 60 倍以上)。图 20-1 显示了两个屏幕截图: flags2_threadpool.py 脚本运行中和运行后。

最简单的 tqdm 示例出现在项目 README.md 中的动画 .gif 中。如果您在安装 tqdm 包后在 Python 控制台中键入以下代码,您将看到一个动画进度条,注释是: 

>>> import time
>>> from tqdm import tqdm
>>> for i in tqdm(range(1000)):
...     time.sleep(.01)
...
>>> # -> progress bar will appear here <-

除了灵巧的效果,tqdm 函数在概念上也很有趣:

它可以处理任何可迭代对象并生成一个迭代器,当迭代器被消费时,显示进度条并估计完成所有迭代的剩余时间。要计算该估计值,tqdm 需要获得一个具有 len 的可迭代对象,或者另外接收带有预期项数的 total= 参数。将 tqdm 与我们的 flags2 示例集成可以让我们深入了解并发脚本的实际工作方式,因此我们必须使用 futures.as_completed 和 asyncio.as_completed 函数,以便 tqdm 可以在每个 future 完成时显示进度。

flags2 示例的另一个功能是命令行界面。所有三个脚本都接受相同的选项,您可以通过使用 -h 选项运行任何脚本来查看帮助文本。示例 20-10 显示了帮助文本。

例 20-10。 flags2 系列脚本的帮助文档

$ python3 flags2_threadpool.py -h
usage: flags2_threadpool.py [-h] [-a] [-e] [-l N] [-m CONCURRENT] [-s LABEL][-v][CC [CC ...]]Download flags for country codes. Default: top 20 countries by population.positional arguments:CC                    country code or 1st letter (eg. B for BA...BZ)optional arguments:-h, --help            show this help message and exit-a, --all             get all available flags (AD to ZW)-e, --every           get flags for every possible code (AA...ZZ)-l N, --limit N       limit to N first codes-m CONCURRENT, --max_req CONCURRENTmaximum concurrent requests (default=30)-s LABEL, --server LABELServer to hit; one of DELAY, ERROR, LOCAL, REMOTE(default=LOCAL)-v, --verbose         output detailed progress info

所有参数都是可选的。但是 -s/--server 对于测试是必不可少的:它允许您选择在测试中使用的 HTTP 服务器和端口。这个选项的值可以设为下面4个字符串(不区分大小写),以确定脚本将在哪里下载国旗:

LOCAL

使用 http://localhost:8000/flags;这是默认设置。您应该配置一个本地 HTTP 服务器以响应 8000 端口的请求。请参阅下面的说明。

REMOTE

使用 http://fluentpython.com/data/flags;那是我搭建的公共网站,托管在共享服务器上。请不要用太多的并发请求来处理它。fluentpython.com 域由 Cloudflare CDN(Content Delivery Network)的一个免费账号管理,因此您可能会注意到第一次下载速度较慢,但​​当 CDN 缓存生效后请求会变得更快。

DELAY

使用 http://localhost:8001/flags;延迟 HTTP 响应的服务器应该监听端口 8001。我编写了 slow_server.py 以使其更易于实验。您可以在 Fluent Python 2e 代码库的 20-futures/getflags/ 目录中找到它。请参阅下面的说明。

ERROR

使用 http://localhost:8002/flags;返回一些 HTTP 错误的服务器需要监听端口 8002。接下来进行说明。


架设测试服务器

如果您没有用于测试的本地 HTTP 服务器,我在 fluentpython/example-code-2e 仓库中的 20-executors/getflags/README.adoc 中编写了设置说明,只支持 Python ≥ 3.9(无外部库)。简而言之,README.adoc 描述了如何搭建服务:

python3 -m http.server

端口8000上的LOCAL服务器;

python3 slow_server.py

端口 8001上的DELAY服务器, 为每次服务加了0.5s到5s的随机延迟;

python3 slow_server.py 8002 --error-rate .25

端口 8002 上的 ERROR 服务器,除了随机延迟外,还有 25% 的机会返回 418 I'm a teapot 错误响应。


默认情况下,每个 flags2*.py 脚本将使用默认的并发连接数从 LOCAL 服务器 (http://localhost:8000/flags) 中获取 20 个人口最多的国家的国旗,每个脚本的并发连接数都不同。示例 20-11 显示了使用所有默认值的 flags2_sequential.py 脚本的示例运行。要运行它,您需要使用本地服务器,如 “Be Careful When Testing Concurrent Clients”中所述。

例 20-11。使用所有默认值运行 flags2_sequential.py:LOCAL服务器、人口最多的前20个国家的国旗、1 个并发连接

$ python3 flags2_sequential.py
LOCAL site: http://localhost:8000/flags
Searching for 20 flags: from BD to VN
1 concurrent connection will be used.
--------------------
20 flags downloaded.
Elapsed time: 0.10s

您可以通过多种方式选择要下载的国旗。示例 20-12 展示了如何下载国家代码以字母 A、B 或 C 开头的所有国家的国旗。

例 20-12。运行 flags2_threadpool.py 以从 DELAY 服务器获取带有国家/地区代码前缀 A、B 或 C 的所有国旗

$ python3 flags2_threadpool.py -s DELAY a b c
DELAY site: http://localhost:8001/flags
Searching for 78 flags: from AA to CZ
30 concurrent connections will be used.
--------------------
43 flags downloaded.
35 not found.
Elapsed time: 1.72s

不管如何选择国家/地区代码,都可以使用 -l/--limit 选项来限制要获取的国旗的数量。示例 20-13 演示了如何恰好运行 100 个请求,将 -a 选项与 -l 100 结合使用以获取所有国旗。

例 20-13。运行 flags2_asyncio.py 从 ERROR 服务器获取 100 个国旗(-al 100),一共100次并发请求(-m 100)

$ python3 flags2_asyncio.py -s ERROR -al 100 -m 100
ERROR site: http://localhost:8002/flags
Searching for 100 flags: from AD to LK
100 concurrent connections will be used.
--------------------
73 flags downloaded.
27 errors.
Elapsed time: 0.64s

这是 flags2 示例的用户界面。让我们看看它们是如何实现的。

flags2系列 示例中的错误处理

所有三个示例中处理 HTTP 错误的通用策略是由负责下载单个文件的函数 (download_one) 处理404 错误(Not Found)。任何其他异常都会传播以由 download_many 函数或supervisor协程处理(在 asyncio 示例中)。

再一次,我们将从研究顺序代码开始,它更容易理解——并且大部分都由线程池脚本重用。示例 20-14 显示了在 flags2_sequential.py 和 flags2_threadpool.py 脚本中执行实际下载的函数。

例 20-14。 flags2_sequential.py:负责下载的基本函数;两者都在 flags2_threadpool.py 中重用

from collections import Counter
from http import HTTPStatusimport httpx
import tqdm  # type: ignore  1from flags2_common import main, save_flag, DownloadStatus  2DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1def get_flag(base_url: str, cc: str) -> bytes:url = f'{base_url}/{cc}/{cc}.gif'.lower()resp = httpx.get(url, timeout=3.1, follow_redirects=True)resp.raise_for_status()  3return resp.contentdef download_one(cc: str, base_url: str, verbose: bool = False) -> DownloadStatus:try:image = get_flag(base_url, cc)except httpx.HTTPStatusError as exc:  4res = exc.responseif res.status_code == HTTPStatus.NOT_FOUND:status = DownloadStatus.NOT_FOUND  5msg = f'not found: {res.url}'else:raise  6else:save_flag(image, f'{cc}.gif')status = DownloadStatus.OKmsg = 'OK'if verbose:  7print(cc, msg)return status
  1. 导入tqdm进度条显示库,告诉mypy跳过检查
  2. 从 flags2_common 模块导入几个函数和一个Enum。
  3. 如果 HTTP 状态码不在范围 (200, 300) 内,则抛出 HTTPStatusError。
  4. download_one 捕获 HTTPstatusError 来专门处理 HTTP 代码 404...
  5. ...通过将其本地状态设置为 DownloadStatus.NOT_FOUND; DownloadStatus 是从 flags2_common.py 导入的 Enum。
  6. 任何其他 HTTPstetusError 异常都会重新抛出冒泡给调用者。
  7. 如果设置了 -v/--verbose 命令行选项,则显示国家代码和状态消息;这就是您在详细模式下看到进度信息。

示例 20-15 列出了 download_many 函数的顺序版本。这段代码很简单,但值得研究和即将出现的并发版本进行对比。关注它如何报告进度、处理错误和统计下载量。

def download_many(cc_list: list[str],base_url: str,verbose: bool,_unused_concur_req: int) -> Counter[DownloadStatus]:counter: Counter[DownloadStatus] = Counter()  1cc_iter = sorted(cc_list)  2if not verbose:cc_iter = tqdm.tqdm(cc_iter)  3for cc in cc_iter:try:status = download_one(cc, base_url, verbose)  4except httpx.HTTPStatusError as exc:  5error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'error_msg = error_msg.format(resp=exc.response)except httpx.RequestError as exc:  6error_msg = f'{exc} {type(exc)}'.strip()except KeyboardInterrupt:  7breakelse:  8error_msg = ''if error_msg:status = DownloadStatus.ERROR  9counter[status] += 1           10if verbose and error_msg:      11print(f'{cc} error: {error_msg}')return counter  12
  1. 这个Counter将记录不同的下载结果:DownloadStatus.OK、DownloadStatus.NOT_FOUND 或 DownloadStatus.ERROR。
  2. cc_iter 保存作为参数接收的国家代码列表,按字母顺序排列。
  3. 如果不是在详细模式下运行,cc_iter 将传递给 tqdm,它返回一个迭代器,生成 cc_iter 中的项目,同时还为进度条设置动画。
  4. 连续调用 download_one。
  5. 由 get_flag 抛出但未由 download_one 处理的 HTTP 状态码异常在此处处理。
  6. 其他与网络相关的异常在这里处理。任何其他异常都会中止脚本,因为调用 download_many 的 flags2_common.main 函数没有 try/except。
  7. 如果用户按下 CTRL-C,则退出循环
  8. 如果 download_one 没有发生异常,则清除error_message。
  9. 如果出现错误异常,把局部变量status设置为相应的状态
  10. Counter中这个status的值+1。
  11. 在详细模式下,显示当前国家/地区代码的错误消息(如果有)
  12. 返回Counter,以便 main 可以显示最终报告中的数字。

我们现在将研究重构的线程池示例 flags2_threadpool.py。

使用 futures.as_completed函数

为了集成 tqdm 进度条并处理每个请求的错误,flags2_threadpool.py 脚本使用了 futures.ThreadPoolExecutor 和我们已经看到的 futures.as_completed 函数。示例 20-16 是 flags2_threadpool.py 的完整代码列表。只实现了download_many函数;其他函数从 flags2_common.py 和 flags2_sequential.py 中重用。

例 20-16。 flags2_threadpool.py:完整代码列表

from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completedimport httpx
import tqdm  # type: ignorefrom flags2_common import main, DownloadStatus
from flags2_sequential import download_one  1DEFAULT_CONCUR_REQ = 30  2
MAX_CONCUR_REQ = 1000  3def download_many(cc_list: list[str],base_url: str,verbose: bool,concur_req: int) -> Counter[DownloadStatus]:counter: Counter[DownloadStatus] = Counter()with ThreadPoolExecutor(max_workers=concur_req) as executor:  4to_do_map = {}  5for cc in sorted(cc_list):  6future = executor.submit(download_one, cc,base_url, verbose)  7to_do_map[future] = cc  8done_iter = as_completed(to_do_map)  9if not verbose:done_iter = tqdm.tqdm(done_iter, total=len(cc_list))  10for future in done_iter:  11try:status = future.result()  12except httpx.HTTPStatusError as exc:  13error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'error_msg = error_msg.format(resp=exc.response)except httpx.RequestError as exc:error_msg = f'{exc} {type(exc)}'.strip()except KeyboardInterrupt:breakelse:error_msg = ''if error_msg:status = DownloadStatus.ERRORcounter[status] += 1if verbose and error_msg:cc = to_do_map[future]  14print(f'{cc} error: {error_msg}')return counterif __name__ == '__main__':main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
  1. 重用 flags2_sequential 中的 download_one(示例 20-14)。
  2. 如果没有给出 -m/--max_req 命令行选项,这将是最大并发请求数,实现为线程池的大小;如果要下载的国旗数量较少,则实际数量可能较小。
  3. MAX_CONCUR_REQ 限制并发请求的最大数量,无视要下载的国旗数量或 -m/--max_req 命令行选项;这是一种安全预防措施,可避免启动过多具有显着内存开销的线程。
  4. 创建执行器,将 max_workers 设置为 concur_req,由main函数计算为:MAX_CONCUR_REQ、cc_list 的长度和 -m/--max_req 命令行选项的值中的较小者。这避免了创建不必要的线程。
  5. 这个 dict 将每个 Future 实例(代表一次下载)映射到相应的国家代码,在错误报告中使用。
  6. 按字母顺序遍历国家/地区代码列表。结果的顺序将取决于 HTTP 响应的时间,但是如果线程池的大小(由 concur_req 给出)远小于 len(cc_list),您可能会注意到按字母顺序分批下载的现象。
  7. 每次调用 executor.submit 都会调度一个可调用对象的执行并返回一个 Future 实例。第一个参数是可调用的对象,其余的是这个可调用对象接收的参数。
  8. 将future和国家/地区代码存储在 dict 中。
  9. futures.as_completed 返回一个迭代器,该迭代器在每个任务完成时产出future。
  10. 如果不是verbose模式,将as_completed的结果传入tqdm函数以显示进度条;因为 done_iter 没有 len,我们必须告诉 tqdm 作为 total= 参数的预期项数是多少,所以 tqdm 可以估计剩余的工作。
  11. 迭代运行结束后的future。
  12. 在future调用 result 方法要么返回可调用对象返回的值,要么引发在执行可调用对象时捕获的任何异常。此方法可能会阻塞等待结果,但在此示例中不会,因为 as_completed 仅返回已完成的future。
  13. 处理可能出现的异常;此函数的其余部分与示例 20-15 中的顺序 download_many 相同,除了下一个标注。
  14. 要为错误消息提供上下文,使用当前future作为键从 to_do_map 中检索国家/地区代码。在顺序版本中不是必需的,因为我们正在迭代国家代码列表,所以我们知道当前的 cc;而这里,我们正在迭代future。

TIP:

示例 20-16 使用了一个对 futures.as_completed 非常有用的习惯:构建一个 dict 来将每个 future 映射到其他可能在 future 完成时有用的数据。这里 to_do_map 将每个future映射到分配给它的国家/地区代码。这使得对future的结果进行后续处理变得容易,尽管它们是无序生成的。

Python 线程非常适合 I/O 密集型应用程序,并且 concurrent.futures 包使其在某些用例中使用起来相对简单。使用 ProcessPoolExecutor,您还可以解决多核上的 CPU 密集型问题——如果计算是“embarrassingly parallel”.。我们对 concurrent.futures 的基本介绍到此结束。

这篇关于第二十章 Concurrent Executors的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/390822

相关文章

Python 中考虑 concurrent.futures 实现真正的并行计算

Python 中考虑 concurrent.futures 实现真正的并行计算 思考,如何将代码所要执行的计算任务划分成多个独立的部分并在各自的核心上面平行地运行。 Python 的全局解释器锁(global interpreter lock,GIL)导致没办法用线程来实现真正的并行​,所以先把这种方案排除掉。另一种常见的方案,是把那些对性能要求比较高的(performance-critica

java多线程学习--java.util.concurrent

题记:util和concurrent 包是后续重点先看的和学习的模块 原文地址:http://www.cnblogs.com/sunhan/p/3817806.html   CountDownLatch,api 文档:http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html

C++ tbb::concurrent_hash_map怎么用

Intel TBB 提供高并发的容器类,Windows或者Linux线程能使用这些容器类或者和基于task编程相结合(TBB)。 concurrent_hash_map<Key,T,HashCompare>是一个hash表,允许并行访问,表是一个从Key到类型T的映射,类型HashCompare定义怎样hash一个Key和怎样比较2个Key。 concurrent_hash_map i

C++ tbb 并发容器适用场景 concurrent_set concurrent_map concurrent_queue

比如 tbb::concurrent_set is a class template that represents an unordered sequence of unique elements. It supports concurrent insertion, lookup and traversal, but does not support concurrent erasure.

第二十章 rust多平台编译

注意 本系列文章已升级、转移至我的自建站点中,本章原文为:rust多平台编译 目录 注意一、前言二、跨平台代码编写三、跨平台编译四、静态编译 一、前言 相比于C/C++,rust最方便的一个东西除了包管理之外,便是跨平台编译了。 rust提供了一系列的工具可以帮助我们非常容易的完成跨平台编译的目的。 但经过我的实际使用经验来说,你想要实现在单一的物理机上跨平台编译其它系

concurrent库学习之ThreadPoolExecutor模块

concurrent库学习之ThreadPoolExecutor模块 一、简介 concurrent.futures.ThreadPoolExecutor 是 Python 标准库中的一个模块,用于管理线程池并行执行任务。它提供了一种高层次的接口来启动和管理线程,简化了并发编程的复杂性。 二、语法和参数 语法 concurrent.futures.ThreadPoolExecuto

python并发与并行(十二) ———— 考虑用concurrent.futures实现真正的并行计算

有些Python程序写到一定阶段,性能就再也上不去了。即便优化了代码,程序的执行速度可能还是达不到要求。考虑到现在的计算机所装配的CPU核心数量越来越多,所以我们很自然地就想到用并行方式来解决这个问题。那么接下来就必须思考,如何将代码所要执行的计算任务划分成多个独立的部分并在各自的核心上面平行地运行。 Python的全局解释器锁(global interpreter lock,GIL)导致我们没

java通过Executors创建多线程

java 线程池创建多线程 多线程开发,我这里就不说理论直接上代码。 java通过Executors可以创建五种方式的对线程: 第一种: 创建一个单线程。 第二种: 创建一个可变的多线程。: 第三种: 创建一个固定线程数的多线程。 第四种:创建一个定时的多线程。 第五种:创建可以调参数的多线程。 四种线程的创建方式: //创建固定的线程池executorService = Executors

python——concurrent.futures

concurrent.futures 是 Python 标准库中用于并行编程的高级模块,它提供了一种高级别的接口来管理线程和进程。通过这个模块,你可以轻松地利用多线程和多进程来并行执行任务,进而提高程序的执行效率。 1. concurrent.futures 概述 concurrent.futures 提供了两种执行器类型: ThreadPoolExecutor:用于管理线程池。Proces

《第二十章 多媒体 - 图形绘制》

《第二十章 多媒体 - 图形绘制》 在 Android 开发中,图形绘制是创建丰富用户界面和实现独特视觉效果的关键技能。本章将深入探讨图形绘制的两个重要方面:Canvas 绘图基础和自定义 View 绘图。 一、图形绘制在 Android 中的重要性 图形绘制能够让应用展现出独特的视觉效果,增强用户体验。无论是创建游戏界面、设计数据可视化图表,还是构建个性化的用户界面组件,都离不