本文主要是介绍async def 异步编程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
Python异步函数即async必须在普通函数的命名前加上async
参考文档: 第二十一章 异步编程_async def-CSDN博客
三 示例一
#!/usr/bin/env python3
import asyncio
import socket
from keyword import kwlistMAX_KEYWORD_LEN = 4 1async def probe(domain: str) -> tuple[str, bool]: 2loop = asyncio.get_running_loop() 3try:await loop.getaddrinfo(domain, None) 4except socket.gaierror:return (domain, False)return (domain, True)async def main() -> None: 5names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN) 6domains = (f'{name}.dev'.lower() for name in names) 7coros = [probe(domain) for domain in domains] 8for coro in asyncio.as_completed(coros): 9domain, found = await coro 10mark = '+' if found else ' 'print(f'{mark} {domain}')if __name__ == '__main__':asyncio.run(main()) 11
四 示例二
import asyncio
import timefrom x_mock.m_mock import m_mockdef get_time():return m_mock.mock('@time("%H:%M:%S.%f")')async def case_a():print('start', get_time(), 'case_a')await asyncio.sleep(2) # 只阻塞当前函数,所以比case_b 多等 1s,下面这句最后打印print('end', get_time(), 'case_a')return 'case_a'async def case_b():print('start', get_time(), 'case_b')await asyncio.sleep(1)print('end', get_time(), 'case_b')return 'case_b'async def main():return await asyncio.gather(case_a(),case_b())if __name__ == '__main__':start = time.time()# asyncio.run(main()) # 运行方式1,不建议,会有一些报错# 运行方式2loop = asyncio.get_event_loop()print(loop.run_until_complete(main()))print(time.time() - start)
这篇关于async def 异步编程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!