【python百宝箱】抛开GIL束缚:线程、进程、异步实现高效编程

本文主要是介绍【python百宝箱】抛开GIL束缚:线程、进程、异步实现高效编程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Python并发编程大揭秘:线程、进程、异步

前言

在当今计算机科学领域,处理大规模任务并提高程序性能的需求越来越迫切。Python作为一种流行而灵活的编程语言,提供了多种处理并发的工具和库。本文将深入探讨Python中的并发编程,主要聚焦于concurrent.futuresthreadingmultiprocessing以及asyncio等关键库,通过实例和详细介绍,帮助读者更好地理解并发编程的核心概念和应用场景。

文章目录

  • Python并发编程大揭秘:线程、进程、异步
    • 前言
      • 1. `concurrent.futures`
        • 1.1 简介
        • 1.2 主要特性
        • 1.3 应用场景
        • 1.4 批量提交任务
        • 1.5 Timeout 控制
        • 1.6 处理异常
        • 1.7 使用 `as_completed`
        • 1.8 取消任务
      • 2. `threading`
        • 2.1 概述
        • 2.2 主要功能
        • 2.3 适用场景
        • 2.4 线程间通信与队列
        • 2.5 定时线程
        • 2.6 定时线程与 `Timer`
        • 2.7 线程安全的数据结构
        • 2.8 `threading` 中的事件
      • 3. `multiprocessing`
        • 3.1 概述
        • 3.2 关键特性
        • 3.3 应用场景
        • 3.4 进程间通信
        • 3.5 进程池
        • 3.6 `Value` 和 `Array` 实现共享内存
        • 3.7 异步执行任务
        • 3.8 进程间事件通知
        • 3.9 进程间共享资源
      • 4. `asyncio`
        • 4.1 简介
        • 4.2 主要组件
        • 4.3 优势与应用
        • 4.4 异步I/O与协程
        • 4.5 异步任务与`gather`
        • 4.6 异步I/O与`async with`
        • 4.7 异步网络请求
      • 5. 其他相关库
        • 5.1 `queue`
        • 5.2 `async/await`
        • 5.3 `threading` 和 `multiprocessing` 的替代品
        • 5.4 `asyncpg`
      • 6. 并发和多线程的最佳实践
        • 6.1 尽量使用线程池和进程池
        • 6.2 注意线程同步
        • 6.3 使用 `async/await` 进行异步编程
        • 6.4 谨慎处理共享资源
        • 6.5 选择适当的工具和库
    • 总结

1. concurrent.futures

1.1 简介

Python的concurrent.futures模块为异步执行函数提供了一个高层次的接口,使并发编程更加便捷。这个模块主要关注于任务的并行执行,通过提供线程池和进程池的支持,使得开发者能够更轻松地实现并发操作。

1.2 主要特性
  • 线程池和进程池的支持: concurrent.futures通过ThreadPoolExecutorProcessPoolExecutor两个类实现线程池和进程池。这使得开发者能够根据任务的性质选择适当的并发方式,充分利用多核资源。

  • 异步执行任务: 使用submit函数可以异步提交任务,该函数返回一个Future对象,代表将来会返回结果的承诺。

  • 获取结果和处理异常: Future对象提供了一种轻松获取任务执行结果的方式,同时也能捕获任务中可能出现的异常。通过result方法,我们能够同步等待任务的完成并获取最终结果。

1.3 应用场景

concurrent.futures适用于需要并发执行独立任务的场景,从而提高程序的性能。以下是一个简单的示例,展示了如何使用线程池计算数字的平方:

from concurrent.futures import ThreadPoolExecutordef square(n):return n * nwith ThreadPoolExecutor() as executor:# 异步提交任务future = executor.submit(square, 5)# 获取结果result = future.result()print(result)  # 输出:25

这个例子展示了如何使用concurrent.futures创建一个线程池,并通过submit方法提交任务,然后通过result方法获取任务执行的结果。这种模式特别适用于需要并行执行多个相互独立任务的情景。

1.4 批量提交任务

除了单个任务的提交,concurrent.futures还支持批量提交任务,这在需要同时处理多个任务的情况下非常实用。使用map方法,可以方便地将一个函数应用于可迭代的多个参数,实现并行处理。

from concurrent.futures import ThreadPoolExecutordef square(n):return n * nnumbers = [1, 2, 3, 4, 5]with ThreadPoolExecutor() as executor:# 批量异步提交任务results = executor.map(square, numbers)# 获取结果for result in results:print(result)
# 输出:
# 1
# 4
# 9
# 16
# 25

这个例子中,我们使用map方法一次性提交了多个任务,每个任务计算一个数字的平方。通过map返回的生成器,我们可以轻松地迭代并获取每个任务的结果。

1.5 Timeout 控制

在实际应用中,有时我们希望对任务的执行时间进行控制,以防止某个任务因为执行时间过长而影响整体程序的性能。concurrent.futures允许我们设置timeout参数,限定任务的最大执行时间。

from concurrent.futures import ThreadPoolExecutor, TimeoutError
import timedef long_running_task():time.sleep(5)return "Task completed"with ThreadPoolExecutor() as executor:# 异步提交任务并设置 timeout 为 3 秒future = executor.submit(long_running_task)try:result = future.result(timeout=3)print(result)except TimeoutError:print("Task timed out")

在这个例子中,long_running_task函数模拟一个耗时的任务,但我们通过设置timeout参数为3秒,使得即便任务未完成,也会在指定时间内抛出TimeoutError异常。

这些用法展示了concurrent.futures模块的灵活性和实用性,为开发者提供了强大的工具来处理并发任务。

1.6 处理异常

concurrent.futures模块提供了方便的异常处理机制。通过add_done_callback方法,我们能够注册一个回调函数,用于处理任务执行完成时的结果或异常。

from concurrent.futures import ThreadPoolExecutordef task_with_exception():raise ValueError("Simulated error in the task")def handle_result(future):try:result = future.result()print("Task result:", result)except Exception as e:print("Task raised an exception:", e)with ThreadPoolExecutor() as executor:# 异步提交任务future = executor.submit(task_with_exception)# 注册异常处理回调函数future.add_done_callback(handle_result)

在这个例子中,task_with_exception函数模拟了一个会抛出异常的任务。我们使用add_done_callback方法注册了一个回调函数handle_result,该函数会在任务执行完成时被调用,以处理任务的结果或异常。

1.7 使用 as_completed

concurrent.futures还提供了as_completed函数,它接受一组Future对象,并在它们完成时生成这些Future对象的迭代器,使得我们能够按照完成的顺序获取结果。

from concurrent.futures import ThreadPoolExecutor, as_completeddef square(n):return n * nnumbers = [3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5]with ThreadPoolExecutor() as executor:# 异步提交任务futures = [executor.submit(square, num) for num in numbers]# 按照完成顺序获取结果for future in as_completed(futures):result = future.result()print(result)

在这个例子中,我们使用as_completed函数,它返回一个迭代器,按照任务完成的顺序产生Future对象。这样,我们可以立即处理已完成任务的结果,而不必等待所有任务都完成。

1.8 取消任务

concurrent.futures模块允许我们取消尚未开始的任务或正在运行的任务。通过cancel方法,我们可以在提交任务后取消它,但需要注意,已经开始执行的任务无法取消。

from concurrent.futures import ThreadPoolExecutor
import timedef long_running_task():time.sleep(5)return "Task completed"with ThreadPoolExecutor() as executor:# 异步提交任务future = executor.submit(long_running_task)# 取消任务cancelled = future.cancel()if cancelled:print("Task successfully cancelled")else:print("Task could not be cancelled as it is already running")

在这个例子中,我们通过cancel方法尝试取消一个正在休眠的任务。由于任务已经开始执行,所以取消操作无法生效。在实际应用中,需要根据任务的性质谨慎使用任务取消功能。

这些进一步拓展了对 concurrent.futures 模块的理解,为读者提供了更多处理异步任务的技巧和手段。

2. threading

2.1 概述

threading模块是Python标准库提供的模块,专用于创建和管理线程。线程是程序中执行的最小单位,threading模块允许程序在同一进程中的多个线程中运行,实现并发执行。在处理I/O密集型任务时,使用线程可以有效提高程序的响应性,使得在等待外部资源时,其他线程仍能执行。

2.2 主要功能
  • 创建和启动线程: 使用Thread类可以轻松创建线程对象,然后调用start方法启动线程。下面是一个简单的例子:

    import threadingdef my_function():print("Thread is running!")# 创建线程对象
    my_thread = threading.Thread(target=my_function)# 启动线程
    my_thread.start()
    
  • 线程同步和通信: 在多线程环境下,为了避免竞争条件和确保数据一致性,线程同步非常重要。threading模块提供了多种同步机制,包括锁(Lock)、条件变量(Condition)、信号量(Semaphore)等。这些工具可以用于线程间的同步和通信,确保线程安全。

2.3 适用场景

threading适用于处理I/O密集型任务,其中线程可以在等待外部资源时继续执行其他任务,提高整体程序性能。然而,由于Python的全局解释器锁(GIL)限制,threading对于CPU密集型任务并不是一个理想的选择。以下是一个简单的线程同步的例子:

import threadingshared_variable = 0
lock = threading.Lock()def increment():global shared_variablefor _ in range(100000):with lock:shared_variable += 1# 创建两个线程并启动
thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=increment)
thread1.start()
thread2.start()
thread1.join()
thread2.join()print(shared_variable)  # 输出:200000

在这个例子中,我们通过使用锁(Lock)确保了shared_variable的线程安全访问。两个线程并发执行 increment 函数,通过锁的机制保证了对共享变量的互斥访问。这是一个典型的线程同步例子,适用于需要保证数据一致性的场景。

2.4 线程间通信与队列

线程间通信是多线程编程中常见的需求之一,而threading模块通过提供Queue类,简化了线程之间的通信。Queue是一个线程安全的队列实现,可以用于在不同线程之间传递数据。

import threading
import queue
import timedef producer(q):for i in range(5):time.sleep(1)item = f"Item {i}"q.put(item)print(f"Produced: {item}")def consumer(q):while True:item = q.get()if item is None:breakprint(f"Consumed: {item}")# 创建线程安全的队列
my_queue = queue.Queue()# 创建生产者线程和消费者线程
producer_thread = threading.Thread(target=producer, args=(my_queue,))
consumer_thread = threading.Thread(target=consumer, args=(my_queue,))# 启动线程
producer_thread.start()
consumer_thread.start()# 等待生产者线程完成
producer_thread.join()# 等待队列被消费完(放入 None 表示结束消费)
my_queue.put(None)
consumer_thread.join()

在这个例子中,producer函数模拟了生产者,每秒向队列中放入一个项目。consumer函数模拟了消费者,不断从队列中取出项目并处理。通过Queue的机制,我们实现了线程之间的通信和数据共享。

2.5 定时线程

threading模块还支持定时线程,即可以在指定的时间间隔内重复执行某个任务。这通过使用Timer类实现。

import threading
import timedef print_message():print("Timer expired!")# 创建定时线程,每 2 秒执行一次 print_message 函数
timer_thread = threading.Timer(2, print_message)# 启动定时线程
timer_thread.start()# 主线程等待定时线程完成
timer_thread.join()print("Main thread continues...")

在这个例子中,Timer类被用于创建一个在2秒后执行的定时线程。通过这种方式,我们可以在多线程环境下实现周期性的任务调度。

这些用法展示了threading模块在多线程编程中的灵活性和强大功能,为读者提供了更多处理线程相关任务的工具和技巧。

2.6 定时线程与 Timer

threading模块中的 Timer 类提供了一种在指定时间间隔后执行某个任务的方式。这对于需要周期性执行任务的场景非常有用。

import threadingdef print_message():print("Timer expired!")# 创建定时线程,每 2 秒执行一次 print_message 函数
timer_thread = threading.Timer(2, print_message)# 启动定时线程
timer_thread.start()# 主线程等待定时线程完成
timer_thread.join()print("Main thread continues...")

在这个例子中,Timer 类被用于创建一个在2秒后执行的定时线程。通过这种方式,我们可以在多线程环境下实现周期性的任务调度。

2.7 线程安全的数据结构

threading模块还提供了一些线程安全的数据结构,如 LockSemaphoreCondition,用于协调多个线程对共享资源的访问。这些数据结构可以确保在多线程环境中对共享数据的安全访问。

import threading# 使用 Lock 实现线程安全的计数器
class Counter:def __init__(self):self.value = 0self.lock = threading.Lock()def increment(self):with self.lock:self.value += 1# 创建 Counter 实例
counter = Counter()# 创建多个线程并启动
threads = []
for _ in range(5):thread = threading.Thread(target=counter.increment)threads.append(thread)thread.start()# 主线程等待所有线程完成
for thread in threads:thread.join()print("Counter value:", counter.value)  # 输出:Counter value: 5

在这个例子中,我们使用 Lock 确保了 Counter 类中的 increment 方法的线程安全。多个线程并发执行 increment 方法,但由于使用了锁,共享数据 value 的访问是互斥的,从而避免了竞争条件。

2.8 threading 中的事件

threading 模块中的 Event 类提供了一种线程间通信的方式,用于在多个线程之间设置信号。一个线程可以等待事件的发生,而另一个线程可以触发事件的发生。

import threading
import timedef worker(event):print("Worker is waiting for the event.")event.wait()print("Worker got the event and is now working.")# 创建事件对象
my_event = threading.Event()# 创建并启动线程
my_thread = threading.Thread(target=worker, args=(my_event,))
my_thread.start()# 主线程等待一段时间后设置事件
time.sleep(2)
print("Main thread is setting the event.")
my_event.set()# 主线程等待子线程完成
my_thread.join()

在这个例子中,主线程创建了一个 Event 对象 my_event,并传递给子线程 worker。子线程在 wait 方法上阻塞等待事件的发生,而主线程通过 set 方法设置了事件。一旦事件被设置,子线程将收到通知并执行相应的任务。

这些进一步拓展了对 threading 模块的理解,为读者提供了更多处理线程相关任务的工具和技巧。

3. multiprocessing

3.1 概述

multiprocessing模块处理多进程,与threading相似,但使用多个进程。这使得它适用于CPU密集型任务。

3.2 关键特性
  • 充分利用多核CPU: 通过创建多个进程,每个进程在独立的CPU核心上运行。
  • 进程间通信机制: 使用Queue等机制实现进程间通信。
3.3 应用场景

multiprocessing适用于并行处理独立的计算密集型任务。以下是一个简单的多进程示例:

from multiprocessing import Process, Value, Lockshared_variable = Value('i', 0)
lock = Lock()def increment():global shared_variablefor _ in range(100000):with lock:shared_variable.value += 1# 创建两个进程并启动
process1 = Process(target=increment)
process2 = Process(target=increment)
process1.start()
process2.start()
process1.join()
process2.join()print(shared_variable.value)  # 输出:200000
3.4 进程间通信

在多进程编程中,multiprocessing模块提供了多种进程间通信的机制,其中最常用的是 QueueQueue是一个进程安全的队列,可以在多个进程之间安全地传递数据。

from multiprocessing import Process, Queuedef producer(q):for i in range(5):item = f"Item {i}"q.put(item)print(f"Produced: {item}")def consumer(q):while True:item = q.get()if item is None:breakprint(f"Consumed: {item}")if __name__ == "__main__":# 创建进程安全的队列my_queue = Queue()# 创建生产者进程和消费者进程producer_process = Process(target=producer, args=(my_queue,))consumer_process = Process(target=consumer, args=(my_queue,))# 启动进程producer_process.start()consumer_process.start()# 主进程等待生产者进程完成producer_process.join()# 向队列放入 None 表示结束消费my_queue.put(None)# 主进程等待消费者进程完成consumer_process.join()

在这个例子中,producer 进程负责往队列中放入数据,而 consumer 进程负责从队列中取出数据进行消费。通过 Queue 的机制,我们实现了多进程之间的通信。

3.5 进程池

multiprocessing模块提供了 Pool 类,用于创建进程池,进一步提高了多进程编程的便利性。通过进程池,可以重用已创建的进程,降低了进程创建和销毁的开销。

from multiprocessing import Pooldef square(n):return n * nif __name__ == "__main__":with Pool(processes=4) as pool:# 使用 map 方法将 square 函数应用于一组参数results = pool.map(square, [1, 2, 3, 4, 5])print(results)  # 输出:[1, 4, 9, 16, 25]

在这个例子中,我们使用 Pool 类创建了一个包含4个进程的进程池,并通过 map 方法将 square 函数应用于一组参数。这样,square 函数将并行执行在不同进程中,提高了整体计算速度。

3.6 ValueArray 实现共享内存

在多进程编程中,有时需要在不同进程之间共享数据。multiprocessing模块提供了 ValueArray 两种方式实现共享内存。

from multiprocessing import Process, Value, Arraydef modify_shared_data(shared_value, shared_array):with shared_value.get_lock():shared_value.value += 1for i in range(len(shared_array)):shared_array[i] *= 2if __name__ == "__main__":shared_counter = Value("i", 0)  # 共享整数shared_numbers = Array("i", [1, 2, 3, 4, 5])  # 共享数组# 创建两个进程process1 = Process(target=modify_shared_data, args=(shared_counter, shared_numbers))process2 = Process(target=modify_shared_data, args=(shared_counter, shared_numbers))# 启动进程process1.start()process2.start()# 等待进程结束process1.join()process2.join()print("Shared counter:", shared_counter.value)  # 输出:2print("Shared array:", shared_numbers[:])  # 输出:[2, 4, 6, 8, 10]

在这个例子中,通过 Value 创建了一个共享的整数 shared_counter,通过 Array 创建了一个共享的整数数组 shared_numbers。两个进程并行执行 modify_shared_data 函数,通过 get_lock 方法确保了对共享数据的互斥访问。

这些进一步拓展了对 multiprocessing 模块的理解,为读者提供了更多在多进程编程中的实际应用场景和技巧。

3.7 异步执行任务

multiprocessing模块的 map_async 方法允许异步地并行执行函数。这种方式适用于对结果的获取没有先后顺序要求的情况。

from multiprocessing import Pooldef square(n):return n * nif __name__ == "__main__":with Pool(processes=4) as pool:# 使用 map_async 异步执行 square 函数result = pool.map_async(square, [1, 2, 3, 4, 5])# 等待所有进程完成pool.close()pool.join()print(result.get())  # 输出:[1, 4, 9, 16, 25]

在这个例子中,通过 map_async 异步执行了 square 函数,然后通过 get 方法获取结果。这种方式适用于对结果的获取没有先后顺序要求的情况。

3.8 进程间事件通知

multiprocessing模块的 Event 类与 threading 中的 Event 类相似,用于实现进程间的事件通知。

from multiprocessing import Process, Event
import timedef worker(event):print("Worker is waiting for the event.")event.wait()print("Worker got the event and is now working.")if __name__ == "__main__":# 创建事件对象my_event = Event()# 创建并启动进程my_process = Process(target=worker, args=(my_event,))my_process.start()# 主进程等待一段时间后设置事件time.sleep(2)print("Main process is setting the event.")my_event.set()# 主进程等待子进程完成my_process.join()

在这个例子中,主进程创建了一个 Event 对象 my_event,并传递给子进程 worker。子进程在 wait 方法上阻塞等待事件的发生,而主进程通过 set 方法设置了事件。一旦事件被设置,子进程将收到通知并执行相应的任务。

3.9 进程间共享资源

在多进程编程中,有时候需要在不同进程之间共享资源。multiprocessing模块的 Manager 类提供了一种简便的方式来管理进程间共享的数据。

from multiprocessing import Process, Managerdef modify_shared_list(shared_list):shared_list.append(4)shared_list.extend([5, 6, 7])if __name__ == "__main__":with Manager() as manager:# 创建进程共享的列表shared_list = manager.list([1, 2, 3])# 创建并启动进程my_process = Process(target=modify_shared_list, args=(shared_list,))my_process.start()# 主进程等待子进程完成my_process.join()print("Shared list:", shared_list)  # 输出:Shared list: [1, 2, 3, 4, 5, 6, 7]

在这个例子中,通过 Manager 类创建了一个进程共享的列表 shared_list,子进程执行了 modify_shared_list 函数,往列表中添加了新元素。由于使用了 Manager,确保了进程间共享资源的安全访问。

这些进一步拓展了对 multiprocessing 模块的理解,为读者提供了更多在多进程编程中的实际应用场景和技巧。

4. asyncio

4.1 简介

asyncio是Python的异步I/O和事件循环库,用于编写高效的异步代码。它基于协程和事件循环的概念。

4.2 主要组件
  • 事件循环(Event Loop): 负责调度和执行异步任务。
  • 协程: 通过async def定义的异步函数。
4.3 优势与应用

asyncio适用于高效处理大量I/O密集型任务,例如网络应用。以下是一个简单的异步I/O的例子:

import asyncioasync def main():print("Start")async def foo():print("Foo")await asyncio.sleep(2)print("End Foo")await asyncio.gather(foo(), foo(), foo())asyncio.run(main())
4.4 异步I/O与协程

asyncio 的核心概念之一是协程(coroutine)。通过 async def 关键字定义的函数可以被视为协程,它可以在遇到异步操作时挂起自身的执行,让事件循环去执行其他任务。

import asyncioasync def example_coroutine():print("Start Coroutine")await asyncio.sleep(2)print("End Coroutine")async def main():await example_coroutine()asyncio.run(main())

在这个例子中,example_coroutine 函数中的 await asyncio.sleep(2) 是一个异步操作,它会挂起协程的执行,然后让事件循环去执行其他任务。2秒后,协程恢复执行,打印 “End Coroutine”。

4.5 异步任务与gather

asyncio.gather 函数用于并行运行多个异步任务,并等待它们全部完成。

import asyncioasync def foo():print("Foo")await asyncio.sleep(2)print("End Foo")async def bar():print("Bar")await asyncio.sleep(1)print("End Bar")async def main():await asyncio.gather(foo(), bar(), foo())asyncio.run(main())

在这个例子中,foo()bar() 是两个异步任务,通过 asyncio.gather(foo(), bar(), foo()) 并行执行。由于 asyncio.sleep 是异步操作,整个过程将在几秒内完成。

4.6 异步I/O与async with

asyncio 还提供了 async with 语法,用于在协程中使用异步上下文管理器。这在需要在异步代码中进行资源管理时非常有用。

import asyncioclass AsyncResource:async def __aenter__(self):print("Enter AsyncResource")await asyncio.sleep(1)return selfasync def __aexit__(self, exc_type, exc, tb):print("Exit AsyncResource")async def main():async with AsyncResource():print("Inside async with block")asyncio.run(main())

在这个例子中,AsyncResource 是一个异步上下文管理器,通过 async with AsyncResource(): 进行使用。在 __aenter__ 中进行资源的初始化,而在 __aexit__ 中进行资源的清理。await asyncio.sleep(1) 模拟了异步操作。

4.7 异步网络请求

asyncio 在处理异步网络请求时非常强大。以下是一个使用 aiohttp 库进行异步网络请求的简单例子。

首先,确保你已经安装了 aiohttp

pip install aiohttp

然后,可以使用以下代码进行异步网络请求:

import asyncio
import aiohttpasync def fetch(url):async with aiohttp.ClientSession() as session:async with session.get(url) as response:return await response.text()async def main():url = "https://www.example.com"response = await fetch(url)print(response)asyncio.run(main())

在这个例子中,fetch 函数使用 aiohttp 发起异步的GET请求,而 main 函数则等待这个请求的完成。这允许在异步代码中高效地处理多个并发的网络请求。

这些例子展示了 asyncio 在异步I/O编程中的应用,为读者提供了基础和实际应用的示例。

5. 其他相关库

5.1 queue
  • queue模块提供了线程安全的队列数据结构,用于线程间通信。以下是一个简单的队列使用例子:
from queue import Queue
import threadingdef worker(queue):while True:item = queue.get()if item is None:break# 处理任务print(item)# 创建队列和线程
my_queue = Queue()
my_thread = threading.Thread(target=worker, args=(my_queue,))
my_thread.start()# 向队列中添加任务
for item in range(5):my_queue.put(item)# 关闭队列
my_queue.put(None)
my_thread.join()
5.2 async/await
  • async/await是Python3.5之后引入的关键字,用于定义异步函数。以下是一个简单的异步函数的例子:
async def async_example():print("Start")await asyncio.sleep(2)print("End")asyncio.run(async_example())
5.3 threadingmultiprocessing 的替代品
  • ray: 用于构建分布式应用程序的通用框架。
  • dask: 提供并行计算和分布式计算的灵活框架。
5.4 asyncpg
  • asyncpg是一个异步的PostgreSQL数据库驱动,适用于asyncio环境。以下是一个简单的异步数据库查询的例子:
import asyncpg
import asyncioasync def query_example():# 连接到数据库connection = await asyncpg.connect(user='user', password='password',database='mydatabase', host='localhost')# 执行查询result = await connection.fetch('SELECT * FROM mytable')# 处理查询结果for row in result:print(row)# 关闭数据库连接await connection.close()# 运行异步查询
asyncio.run(query_example())

通过了解这些库和模块,你可以更深入地了解Python中并发和多线程编程的不同方面,以及如何选择适当的工具来处理特定类型的并发任务。

6. 并发和多线程的最佳实践

在进行并发和多线程编程时,一些最佳实践可以帮助确保代码的可维护性、稳定性和性能。以下是一些建议:

6.1 尽量使用线程池和进程池

使用 concurrent.futures 模块中的线程池和进程池,而不是手动创建线程和进程。线程池和进程池提供了高级别的接口,简化了并发编程的复杂性,同时有效地管理资源。

from concurrent.futures import ThreadPoolExecutordef square(n):return n * nwith ThreadPoolExecutor() as executor:# 异步提交任务future = executor.submit(square, 5)# 获取结果result = future.result()print(result)  # 输出:25
6.2 注意线程同步

在多线程编程中,确保对共享数据的访问是线程安全的至关重要。使用锁 (Lock) 或其他同步机制确保多个线程之间的数据一致性。

import threadingshared_variable = 0
lock = threading.Lock()def increment():global shared_variablefor _ in range(100000):with lock:shared_variable += 1# 创建两个线程并启动
thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=increment)
thread1.start()
thread2.start()
thread1.join()
thread2.join()print(shared_variable)  # 输出:200000
6.3 使用 async/await 进行异步编程

在处理异步编程时,使用 async/await 关键字定义异步函数,并使用 asyncio 模块进行任务调度。这可以有效地处理大量 I/O 密集型任务,提高程序性能。

import asyncioasync def async_example():print("Start")await asyncio.sleep(2)print("End")asyncio.run(async_example())
6.4 谨慎处理共享资源

在多进程编程中,共享资源的处理需要特别小心。使用 multiprocessing 模块提供的 ValueArray 等数据结构,以及 Manager 类来实现共享资源的安全访问。

from multiprocessing import Process, Value, Lockshared_variable = Value('i', 0)
lock = Lock()def increment():global shared_variablefor _ in range(100000):with lock:shared_variable.value += 1# 创建两个进程并启动
process1 = Process(target=increment)
process2 = Process(target=increment)
process1.start()
process2.start()
process1.join()
process2.join()print(shared_variable.value)  # 输出:200000
6.5 选择适当的工具和库

根据项目的需求和复杂性,选择适当的工具和库。例如,对于分布式应用程序,考虑使用 raydask;对于异步数据库操作,考虑使用像 asyncpg 这样的异步数据库驱动。

这些建议可以帮助你在并发和多线程编程中更好地组织和管理代码,确保代码的可维护性和性能。

总结

通过本文的阅读,读者将获得全面的Python并发编程知识,从concurrent.futures的任务异步执行,到threadingmultiprocessing的多线程与多进程处理,再到asyncio的异步编程范式,深入理解并发编程的核心思想和实际应用。通过实例演示和详细介绍,读者将能够更自信地选择适当的工具来处理各种并发任务,提升程序性能,迎接复杂应用场景的挑战。

这篇关于【python百宝箱】抛开GIL束缚:线程、进程、异步实现高效编程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/402005

相关文章

Java实现MD5加密的四种方式

《Java实现MD5加密的四种方式》MD5是一种广泛使用的哈希算法,其输出结果是一个128位的二进制数,通常以32位十六进制数的形式表示,MD5的底层实现涉及多个复杂的步骤和算法,本文给大家介绍了Ja... 目录MD5介绍Java 中实现 MD5 加密方式方法一:使用 MessageDigest方法二:使用

Python如何获取域名的SSL证书信息和到期时间

《Python如何获取域名的SSL证书信息和到期时间》在当今互联网时代,SSL证书的重要性不言而喻,它不仅为用户提供了安全的连接,还能提高网站的搜索引擎排名,那我们怎么才能通过Python获取域名的S... 目录了解SSL证书的基本概念使用python库来抓取SSL证书信息安装必要的库编写获取SSL证书信息

mysql删除无用用户的方法实现

《mysql删除无用用户的方法实现》本文主要介绍了mysql删除无用用户的方法实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 1、删除不用的账户(1) 查看当前已存在账户mysql> select user,host,pa

Nginx配置location+rewrite实现隐性域名配置

《Nginx配置location+rewrite实现隐性域名配置》本文主要介绍了Nginx配置location+rewrite实现隐性域名配置,包括基于根目录、条件和反向代理+rewrite配置的隐性... 目录1、配置基于根目录的隐性域名(就是nginx反向代理)2、配置基于条件的隐性域名2.1、基于条件

Linux配置IP地址的三种实现方式

《Linux配置IP地址的三种实现方式》:本文主要介绍Linux配置IP地址的三种实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录环境RedHat9第一种安装 直接配置网卡文件第二种方式 nmcli(Networkmanager command-line

Java实现将Markdown转换为纯文本

《Java实现将Markdown转换为纯文本》这篇文章主要为大家详细介绍了两种在Java中实现Markdown转纯文本的主流方法,文中的示例代码讲解详细,大家可以根据需求选择适合的方案... 目录方法一:使用正则表达式(轻量级方案)方法二:使用 Flexmark-Java 库(专业方案)1. 添加依赖(Ma

使用EasyExcel实现简单的Excel表格解析操作

《使用EasyExcel实现简单的Excel表格解析操作》:本文主要介绍如何使用EasyExcel完成简单的表格解析操作,同时实现了大量数据情况下数据的分次批量入库,并记录每条数据入库的状态,感兴... 目录前言固定模板及表数据格式的解析实现Excel模板内容对应的实体类实现AnalysisEventLis

Mybatis从3.4.0版本到3.5.7版本的迭代方法实现

《Mybatis从3.4.0版本到3.5.7版本的迭代方法实现》本文主要介绍了Mybatis从3.4.0版本到3.5.7版本的迭代方法实现,包括主要的功能增强、不兼容的更改和修复的错误,具有一定的参考... 目录一、3.4.01、主要的功能增强2、selectCursor example3、不兼容的更改二、

如何使用C#串口通讯实现数据的发送和接收

《如何使用C#串口通讯实现数据的发送和接收》本文详细介绍了如何使用C#实现基于串口通讯的数据发送和接收,通过SerialPort类,我们可以轻松实现串口通讯,并结合事件机制实现数据的传递和处理,感兴趣... 目录1. 概述2. 关键技术点2.1 SerialPort类2.2 异步接收数据2.3 数据解析2.

详解如何使用Python提取视频文件中的音频

《详解如何使用Python提取视频文件中的音频》在多媒体处理中,有时我们需要从视频文件中提取音频,本文为大家整理了几种使用Python编程语言提取视频文件中的音频的方法,大家可以根据需要进行选择... 目录引言代码部分方法扩展引言在多媒体处理中,有时我们需要从视频文件中提取音频,以便进一步处理或分析。本文