python并发与并行(十一) ———— 让asyncio的事件循环保持畅通,以便进一步提升程序的响应能力

本文主要是介绍python并发与并行(十一) ———— 让asyncio的事件循环保持畅通,以便进一步提升程序的响应能力,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前一篇blog说明了怎样把采用线程所实现的项目逐步迁移到asyncio方案上面。迁移后的run_tasks协程,可以将多份输入文件通过tail_async协程正确地合并成一份输出文件。

import asyncio# On Windows, a ProactorEventLoop can't be created within
# threads because it tries to register signal handlers. This
# is a work-around to always use the SelectorEventLoop policy
# instead. See: https://bugs.python.org/issue33792
policy = asyncio.get_event_loop_policy()
policy._loop_factory = asyncio.SelectorEventLoopasync def run_tasks(handles, interval, output_path):with open(output_path, 'wb') as output:async def write_async(data):output.write(data)tasks = []for handle in handles:coro = tail_async(handle, interval, write_async)task = asyncio.create_task(coro)tasks.append(task)await asyncio.gather(*tasks)

但这样写有个大问题,就是针对输出文件所做的open、close以及write操作,全都要放在主线程中执行,而这些操作又需要在程序所处的操作系统执行系统调用,这些调用可能会让事件循环阻塞很长一段时间,导致其他协程没办法推进。这会降低程序的总体响应能力,而且会增加延迟,对于高并发服务器来说,这个问题尤其严重。

调用asyncio.run函数时,把debug参数设为True,可以帮助我们发现这种问题。例如,下面这种写法就能显示出,slow_coroutine协程所执行的系统调用耗时比较长,这可以提醒我们注意,要读取的文件是否已经损坏,或者其中某一行是否读不出来。

import timeasync def slow_coroutine():time.sleep(0.5)  # Simulating slow I/Oasyncio.run(slow_coroutine(), debug=True)

为了进一步提升程序的响应能力,我们可以想办法把那些有可能会执行系统调用的操作从程序本身的事件循环里面拿走。例如,新建这样一个Thread子类,让它把那种给输出文件写入数据的操作封装到自己的事件循环里面,这样就不会阻塞程序本身的事件循环了。

其他线程中的协程,可以直接调用这个线程类的write方法,并对该方法做await。其实这个write方法,只不过是把真正负责执行写入操作的那个real_write封装了起来。这种封装方式能够确保线程安全,因此不需要再通过Lock加锁.

然后,我们按照相似的思路,编写真正负责停止本线程的real_stop方法,并把它封装到stop里面,这样的话,其他协程就可以通过stop方法告知本线程应该结束工作。这项操作同样是线程安全的。

另外,还可以定义__aenter__与__aexit__方法,让我们的线程能够用在异步版本的with语句之中,以确保该线程的启动与关闭会安排在适当的时机执行,而不拖慢主事件循环所在的那条线程。

from threading import Threadclass WriteThread(Thread):def __init__(self, output_path):super().__init__()self.output_path = output_pathself.output = Noneself.loop = asyncio.new_event_loop()def run(self):asyncio.set_event_loop(self.loop)with open(self.output_path, 'wb') as self.output:self.loop.run_forever()# Run one final round of callbacks so the await on# stop() in another event loop will be resolved.self.loop.run_until_complete(asyncio.sleep(0))# Example 4async def real_write(self, data):self.output.write(data)async def write(self, data):coro = self.real_write(data)future = asyncio.run_coroutine_threadsafe(coro, self.loop)await asyncio.wrap_future(future)# Example 5async def real_stop(self):self.loop.stop()async def stop(self):coro = self.real_stop()future = asyncio.run_coroutine_threadsafe(coro, self.loop)await asyncio.wrap_future(future)# Example 6async def __aenter__(self):loop = asyncio.get_event_loop()await loop.run_in_executor(None, self.start)return selfasync def __aexit__(self, *_):await self.stop()

这段代码演示了如何结合使用 Python 的 asyncio 模块和线程 (threading.Thread) 来创建一个在单独线程中运行的异步事件循环。这在需要在异步环境中执行 I/O 操作,但又不想阻塞主事件循环时非常有用。下面是对代码中关键部分的解释:

WriteThread 类 (继承自 Thread)

  1. 初始化 (__init__ 方法):

    • 构造函数设置了 output_path,这是输出文件的路径。
    • self.loop 创建了一个新的异步事件循环。
  2. run 方法:

    • 这个方法是线程的入口点,它设置当前线程的事件循环,并打开输出文件。
    • self.loop.run_forever() 使事件循环持续运行,直到调用 stop 方法。
    • 最后,通过 self.loop.run_until_complete(asyncio.sleep(0)) 确保事件循环能够完成所有挂起的协程。

异步写入方法

  1. real_write 协程:

    • 这是一个普通的协程,用于执行实际的写入操作。
  2. write 协程:

    • 这个方法使用 asyncio.run_coroutine_threadsafe 安全地从另一个线程运行 real_write 协程。
    • 它等待 real_write 完成,并使用 await asyncio.wrap_future(future)Future 对象包装为协程。

停止事件循环

  1. real_stop 协程:

    • 这个方法简单地调用 self.loop.stop() 来停止事件循环。
  2. stop 协程:

    • 类似于 write 方法,它使用 asyncio.run_coroutine_threadsafe 来安排 real_stop 在事件循环中运行。

上下文管理器协议

  1. __aenter__ 协程:

    • 这个方法实现了上下文管理器协议的 __enter__ 方法,允许使用 with 语句来管理 WriteThread 对象的生命周期。
    • 它使用 await 来启动线程。
  2. __aexit__ 协程:

    • 这个方法实现了上下文管理器协议的 __exit__ 方法,用于清理操作,比如停止事件循环。

关键点

  • 事件循环在新线程中运行:通过在 Thread 的子类中创建和运行自己的事件循环,可以在不阻塞主线程的情况下执行异步操作。
  • 线程安全地运行协程:使用 asyncio.run_coroutine_threadsafe 可以在不同的线程中安排协程的执行。
  • 上下文管理器:通过实现 __aenter____aexit__ 方法,WriteThread 对象可以在 with 语句中使用,这提供了一种优雅的资源管理方式。

这种模式允许你将异步 I/O 操作与线程结合使用,充分利用 asyncio 的优势,同时避免在 I/O 密集型操作中阻塞主事件循环。

写好了新的线程类之后,我们可以重构run_tasks,把它变成纯粹的异步版本。这个版本更易读懂,而且完全避免了那些耗时较长的系统调用把主事件循环所在的线程拖慢。

class NoNewData(Exception):passdef readline(handle):offset = handle.tell()handle.seek(0, 2)length = handle.tell()if length == offset:raise NoNewDatahandle.seek(offset, 0)return handle.readline()async def tail_async(handle, interval, write_func):loop = asyncio.get_event_loop()while not handle.closed:try:line = await loop.run_in_executor(None, readline, handle)except NoNewData:await asyncio.sleep(interval)else:await write_func(line)async def run_fully_async(handles, interval, output_path):async with WriteThread(output_path) as output:tasks = []for handle in handles:coro = tail_async(handle, interval, output.write)task = asyncio.create_task(coro)tasks.append(task)await asyncio.gather(*tasks)

现在验证这样写是否正确。我们把一批输入文件所对应的句柄放在handles里面,交给run_fully_async去合并,然后调用confirm_merge函数,以确认这些文件之中的内容,已经合并到了输出文件里面。

import collections
import os
import random
import string
from tempfile import TemporaryDirectorydef write_random_data(path, write_count, interval):with open(path, 'wb') as f:for i in range(write_count):time.sleep(random.random() * interval)letters = random.choices(string.ascii_lowercase, k=10)data = f'{path}-{i:02}-{"".join(letters)}\n'f.write(data.encode())f.flush()def start_write_threads(directory, file_count):paths = []for i in range(file_count):path = os.path.join(directory, str(i))with open(path, 'w'):# Make sure the file at this path will exist when# the reading thread tries to poll it.passpaths.append(path)args = (path, 10, 0.1)thread = Thread(target=write_random_data, args=args)thread.start()return pathsdef close_all(handles):time.sleep(1)for handle in handles:handle.close()def setup():tmpdir = TemporaryDirectory()input_paths = start_write_threads(tmpdir.name, 5)handles = []for path in input_paths:handle = open(path, 'rb')handles.append(handle)Thread(target=close_all, args=(handles,)).start()output_path = os.path.join(tmpdir.name, 'merged')return tmpdir, input_paths, handles, output_path# Example 9
def confirm_merge(input_paths, output_path):found = collections.defaultdict(list)with open(output_path, 'rb') as f:for line in f:for path in input_paths:if line.find(path.encode()) == 0:found[path].append(line)expected = collections.defaultdict(list)for path in input_paths:with open(path, 'rb') as f:expected[path].extend(f.readlines())for key, expected_lines in expected.items():found_lines = found[key]assert expected_lines == found_linesinput_paths = ...
handles = ...
output_path = ...tmpdir, input_paths, handles, output_path = setup()asyncio.run(run_fully_async(handles, 0.1, output_path))confirm_merge(input_paths, output_path)tmpdir.cleanup()

完整代码:


# Example 1
import asyncio# On Windows, a ProactorEventLoop can't be created within
# threads because it tries to register signal handlers. This
# is a work-around to always use the SelectorEventLoop policy
# instead. See: https://bugs.python.org/issue33792
policy = asyncio.get_event_loop_policy()
policy._loop_factory = asyncio.SelectorEventLoopasync def run_tasks(handles, interval, output_path):with open(output_path, 'wb') as output:async def write_async(data):output.write(data)tasks = []for handle in handles:coro = tail_async(handle, interval, write_async)task = asyncio.create_task(coro)tasks.append(task)await asyncio.gather(*tasks)# Example 2
import timeasync def slow_coroutine():time.sleep(0.5)  # Simulating slow I/Oasyncio.run(slow_coroutine(), debug=True)# Example 3
from threading import Threadclass WriteThread(Thread):def __init__(self, output_path):super().__init__()self.output_path = output_pathself.output = Noneself.loop = asyncio.new_event_loop()def run(self):asyncio.set_event_loop(self.loop)with open(self.output_path, 'wb') as self.output:self.loop.run_forever()# Run one final round of callbacks so the await on# stop() in another event loop will be resolved.self.loop.run_until_complete(asyncio.sleep(0))async def real_write(self, data):self.output.write(data)async def write(self, data):coro = self.real_write(data)future = asyncio.run_coroutine_threadsafe(coro, self.loop)await asyncio.wrap_future(future)async def real_stop(self):self.loop.stop()async def stop(self):coro = self.real_stop()future = asyncio.run_coroutine_threadsafe(coro, self.loop)await asyncio.wrap_future(future)async def __aenter__(self):loop = asyncio.get_event_loop()await loop.run_in_executor(None, self.start)return selfasync def __aexit__(self, *_):await self.stop()class NoNewData(Exception):passdef readline(handle):offset = handle.tell()handle.seek(0, 2)length = handle.tell()if length == offset:raise NoNewDatahandle.seek(offset, 0)return handle.readline()async def tail_async(handle, interval, write_func):loop = asyncio.get_event_loop()while not handle.closed:try:line = await loop.run_in_executor(None, readline, handle)except NoNewData:await asyncio.sleep(interval)else:await write_func(line)async def run_fully_async(handles, interval, output_path):async with WriteThread(output_path) as output:tasks = []for handle in handles:coro = tail_async(handle, interval, output.write)task = asyncio.create_task(coro)tasks.append(task)await asyncio.gather(*tasks)# This is all code to simulate the writers to the handles
import collections
import os
import random
import string
from tempfile import TemporaryDirectorydef write_random_data(path, write_count, interval):with open(path, 'wb') as f:for i in range(write_count):time.sleep(random.random() * interval)letters = random.choices(string.ascii_lowercase, k=10)data = f'{path}-{i:02}-{"".join(letters)}\n'f.write(data.encode())f.flush()def start_write_threads(directory, file_count):paths = []for i in range(file_count):path = os.path.join(directory, str(i))with open(path, 'w'):# Make sure the file at this path will exist when# the reading thread tries to poll it.passpaths.append(path)args = (path, 10, 0.1)thread = Thread(target=write_random_data, args=args)thread.start()return pathsdef close_all(handles):time.sleep(1)for handle in handles:handle.close()def setup():tmpdir = TemporaryDirectory()input_paths = start_write_threads(tmpdir.name, 5)handles = []for path in input_paths:handle = open(path, 'rb')handles.append(handle)Thread(target=close_all, args=(handles,)).start()output_path = os.path.join(tmpdir.name, 'merged')return tmpdir, input_paths, handles, output_pathdef confirm_merge(input_paths, output_path):found = collections.defaultdict(list)with open(output_path, 'rb') as f:for line in f:for path in input_paths:if line.find(path.encode()) == 0:found[path].append(line)expected = collections.defaultdict(list)for path in input_paths:with open(path, 'rb') as f:expected[path].extend(f.readlines())for key, expected_lines in expected.items():found_lines = found[key]assert expected_lines == found_linesinput_paths = ...
handles = ...
output_path = ...tmpdir, input_paths, handles, output_path = setup()asyncio.run(run_fully_async(handles, 0.1, output_path))confirm_merge(input_paths, output_path)tmpdir.cleanup()

这篇关于python并发与并行(十一) ———— 让asyncio的事件循环保持畅通,以便进一步提升程序的响应能力的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1117086

相关文章

python: 多模块(.py)中全局变量的导入

文章目录 global关键字可变类型和不可变类型数据的内存地址单模块(单个py文件)的全局变量示例总结 多模块(多个py文件)的全局变量from x import x导入全局变量示例 import x导入全局变量示例 总结 global关键字 global 的作用范围是模块(.py)级别: 当你在一个模块(文件)中使用 global 声明变量时,这个变量只在该模块的全局命名空

好题——hdu2522(小数问题:求1/n的第一个循环节)

好喜欢这题,第一次做小数问题,一开始真心没思路,然后参考了网上的一些资料。 知识点***********************************无限不循环小数即无理数,不能写作两整数之比*****************************(一开始没想到,小学没学好) 此题1/n肯定是一个有限循环小数,了解这些后就能做此题了。 按照除法的机制,用一个函数表示出来就可以了,代码如下

禁止平板,iPad长按弹出默认菜单事件

通过监控按下抬起时间差来禁止弹出事件,把以下代码写在要禁止的页面的页面加载事件里面即可     var date;document.addEventListener('touchstart', event => {date = new Date().getTime();});document.addEventListener('touchend', event => {if (new

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟 开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚 第一站:海量资源,应有尽有 走进“智听

【Python编程】Linux创建虚拟环境并配置与notebook相连接

1.创建 使用 venv 创建虚拟环境。例如,在当前目录下创建一个名为 myenv 的虚拟环境: python3 -m venv myenv 2.激活 激活虚拟环境使其成为当前终端会话的活动环境。运行: source myenv/bin/activate 3.与notebook连接 在虚拟环境中,使用 pip 安装 Jupyter 和 ipykernel: pip instal

【机器学习】高斯过程的基本概念和应用领域以及在python中的实例

引言 高斯过程(Gaussian Process,简称GP)是一种概率模型,用于描述一组随机变量的联合概率分布,其中任何一个有限维度的子集都具有高斯分布 文章目录 引言一、高斯过程1.1 基本定义1.1.1 随机过程1.1.2 高斯分布 1.2 高斯过程的特性1.2.1 联合高斯性1.2.2 均值函数1.2.3 协方差函数(或核函数) 1.3 核函数1.4 高斯过程回归(Gauss

【学习笔记】 陈强-机器学习-Python-Ch15 人工神经网络(1)sklearn

系列文章目录 监督学习:参数方法 【学习笔记】 陈强-机器学习-Python-Ch4 线性回归 【学习笔记】 陈强-机器学习-Python-Ch5 逻辑回归 【课后题练习】 陈强-机器学习-Python-Ch5 逻辑回归(SAheart.csv) 【学习笔记】 陈强-机器学习-Python-Ch6 多项逻辑回归 【学习笔记 及 课后题练习】 陈强-机器学习-Python-Ch7 判别分析 【学

高并发环境中保持幂等性

在高并发环境中保持幂等性是一项重要的挑战。幂等性指的是无论操作执行多少次,其效果都是相同的。确保操作的幂等性可以避免重复执行带来的副作用。以下是一些保持幂等性的常用方法: 唯一标识符: 请求唯一标识:在每次请求中引入唯一标识符(如 UUID 或者生成的唯一 ID),在处理请求时,系统可以检查这个标识符是否已经处理过,如果是,则忽略重复请求。幂等键(Idempotency Key):客户端在每次

nudepy,一个有趣的 Python 库!

更多资料获取 📚 个人网站:ipengtao.com 大家好,今天为大家分享一个有趣的 Python 库 - nudepy。 Github地址:https://github.com/hhatto/nude.py 在图像处理和计算机视觉应用中,检测图像中的不适当内容(例如裸露图像)是一个重要的任务。nudepy 是一个基于 Python 的库,专门用于检测图像中的不适当内容。该

pip-tools:打造可重复、可控的 Python 开发环境,解决依赖关系,让代码更稳定

在 Python 开发中,管理依赖关系是一项繁琐且容易出错的任务。手动更新依赖版本、处理冲突、确保一致性等等,都可能让开发者感到头疼。而 pip-tools 为开发者提供了一套稳定可靠的解决方案。 什么是 pip-tools? pip-tools 是一组命令行工具,旨在简化 Python 依赖关系的管理,确保项目环境的稳定性和可重复性。它主要包含两个核心工具:pip-compile 和 pip