python使用多进程multiprocessing

2024-08-30 04:20

本文主要是介绍python使用多进程multiprocessing,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

python使用多进程multiprocessing

  • 1 多进程解释
  • 2 进程的演示
  • 3 进程池方法
  • 4 pool.map()的解析
      • pool.map() 的基本用法
      • 返回值
      • 语法
      • 示例
      • 注意事项
      • 适用场景
  • 5 pool.join()详解
      • 示例
      • 注意事项
      • pool.join()的运行逻辑
      • 阻塞特性的影响
      • 对计算速度的影响
      • 示例
      • 总结
  • 6 apply_async(), apply(), 和 pool.map()
      • `apply_async()`
        • 特性:
        • 语法:
      • `apply()`
        • 特性:
        • 语法:
      • `pool.map()`
        • 特性:
        • 语法:
      • 原理
      • 示例
        • 使用 `apply_async()`
        • 使用 `apply()`
        • 使用 `pool.map()`
      • 总结
  • 7callback和get方法
      • `callback` 方法
        • 使用示例
      • `get()` 方法
        • 使用示例
      • 总结
      • 何时使用
  • 8 start的使用
      • 使用 `start` 方法
        • 示例
      • 何时使用 `start`
      • 使用 `run` 方法
      • 总结

1 多进程解释

在Python中,多进程是一种利用操作系统的多核或多处理器能力来并行执行任务的方法。Python的标准库提供了multiprocessing模块来支持多进程编程。
多进程

多进程是指使用多个进程同时运行程序的不同部分。在Python中,由于全局解释器锁(GIL)的存在,即使在多线程环境中,也不能充分利用多核CPU的优势。因此,使用多进程可以绕过GIL,从而实现真正的并行计算。
进程池

进程池是一个包含多个工作进程的对象。它会预先创建一定数量的进程,并将任务分发给这些进程处理。这样可以避免频繁创建和销毁进程带来的开销。当一个任务完成时,该进程就可以被用来执行下一个任务。

2 进程的演示

import multiprocessingimport timedef worker(num):"""模拟任务"""print(f"Worker {num} started")time.sleep(2)  # 模拟耗时任务print(f"Worker {num} finished")if __name__ == "__main__":processes = []# 创建进程for i in range(4):p = multiprocessing.Process(target=worker, args=(i,))processes.append(p)p.start()# 等待所有进程结束for p in processes:p.join()print("All workers finished.")

在这个例子中,我们创建了四个进程,每个进程都执行worker函数。我们首先创建了这些进程,并使用start()方法启动它们。然后使用join()方法等待所有子进程完成。然后结束主进程。

3 进程池方法

进程池则是一种更加高效的方式来管理多个进程。进程池会在开始时创建一定数量的进程,并将任务分配给这些进程执行。

示例代码

import multiprocessingdef square(x):return x * xif __name__ == "__main__":with multiprocessing.Pool(processes=4) as pool:numbers = [1, 2, 3, 4, 5]# 使用 map 方法result = pool.map(square, numbers)print("Results:", result)# 使用 apply_async 方法async_result = []for number in numbers:res = pool.apply_async(square, (number,))async_result.append(res)for r in async_result:print(r.get())  # 获取结果pool.close()pool.join()

在这个例子中,我们创建了一个包含4个进程的进程池。我们使用pool.map()方法来并行地对列表中的每一个元素应用square函数。此外,我们还展示了如何使用apply_async()方法异步地执行任务,并通过get()方法获取结果。

4 pool.map()的解析

pool.map() 是 Python 的 multiprocessing 模块中的一个方法,用于将一个可迭代对象中的元素分布到进程池中的各个进程上进行并行处理。它类似于内置的 map() 函数,但专门设计用于多进程环境。

pool.map() 的基本用法

pool.map() 接受两个参数:

  1. function: 一个可调用对象,将在每个进程上并行执行。
  2. iterable: 一个可迭代对象,其元素将作为参数传递给 function

返回值

pool.map() 返回一个列表,其中的元素是按照输入可迭代对象的顺序排列的结果。

语法

result = pool.map(function, iterable[, chunksize])
  • chunksize 是一个可选参数,用于控制每次提交给进程池的任务的数量。如果不指定,将自动计算一个合适的值。

示例

下面是一个使用 pool.map() 的简单示例:

import multiprocessingdef square(x):return x * xif __name__ == "__main__":with multiprocessing.Pool(processes=4) as pool:numbers = [1, 2, 3, 4, 5]# 使用 map 方法result = pool.map(square, numbers)print("Results:", result)

在这个例子中,我们定义了一个简单的函数 square,它返回传入数字的平方。然后我们创建了一个包含4个进程的进程池,并使用 pool.map()numbers 列表中的每个元素传递给 square 函数。

注意事项

  1. 阻塞性pool.map() 是一个阻塞调用,意味着它会等待所有任务完成后再返回结果。
  2. 数据类型pool.map() 只能处理可以序列化的输入参数和返回值。这是因为数据需要在进程间传输,而 Python 的进程间通信机制(如 Pipe 和 Queue)只能传输可以序列化的数据。
  3. 错误处理:如果在某个进程中发生错误,pool.map() 会抛出异常。可以通过 try-except 块来捕获这些异常。

适用场景

pool.map() 最适合用于那些可以很容易地将任务分解成独立单元的情况,例如对一个列表中的每个元素进行相同的计算操作。这种方法非常适合 CPU 密集型任务,如数值计算或图像处理。

通过使用 pool.map(),你可以有效地利用多核处理器的并行处理能力来加速程序的执行。

5 pool.join()详解

pool.join()的作用是在使用Python的multiprocessing模块中的Pool类时确保所有的子进程都已经完成执行。当你使用进程池(Pool)来执行任务时,通常会先调用close()方法来禁止向进程池添加新的任务,然后再调用join()方法来等待所有已经提交的任务完成。

pool.join()详解

  1. 关闭进程池

    • 在调用join()之前,通常需要先调用pool.close()close()方法确保不会再有新的任务提交到进程池中。
    pool.close()
    
  2. 等待任务完成

    • join()方法则等待所有已提交的任务完成执行。这意味着主进程会暂停执行,直到进程池中的所有工作进程都完成了它们的任务。
    pool.join()
    

示例

以下是一个使用Pool类的示例,演示了如何正确使用close()join()方法:

import multiprocessingdef square(x):return x * xif __name__ == "__main__":with multiprocessing.Pool(processes=4) as pool:numbers = [1, 2, 3, 4, 5]# 使用 map 方法result = pool.map(square, numbers)print("Results:", result)# 使用 apply_async 方法async_result = []for number in numbers:res = pool.apply_async(square, (number,))async_result.append(res)for r in async_result:print(r.get())  # 获取结果# 关闭进程池,不再接受新任务pool.close()# 等待所有任务完成pool.join()

在这个例子中,我们首先使用pool.map()来并行处理一组数字。然后,我们使用apply_async()来异步地执行任务,并通过get()方法获取结果。最后,我们调用pool.close()来阻止进一步的任务提交,然后调用pool.join()来等待所有任务完成。

注意事项

  • 调用join()前务必先调用close(),否则可能会引发异常。
  • 如果不使用with语句管理进程池,那么在调用join()之后,通常还需要调用pool.terminate()来清理进程池资源。

通过这种方式,你可以确保在继续执行主程序之前,所有的任务都已经完成执行。这对于确保数据一致性以及避免潜在的资源泄漏非常重要。

pool.join()的运行逻辑

pool.map() 方法确实是一个阻塞调用,这意味着它会等待所有任务完成并收集结果后才会返回。尽管如此,这并不一定会显著影响整体计算速度,特别是对于CPU密集型任务而言。下面是一些关键点来说明这一点:

阻塞特性的影响

  1. 等待所有任务完成pool.map() 会等待所有提交给进程池的任务完成,并且会按顺序返回结果。这意味着在调用 pool.map() 后,主进程会暂停执行,直到所有任务完成。

  2. CPU 密集型任务:对于 CPU 密集型任务来说,阻塞特性不会显著影响性能,因为主要瓶颈在于 CPU 计算而非 I/O 或其他延迟操作。在这种情况下,进程池可以充分利用多核处理器的能力来并行执行任务。

对计算速度的影响

  1. 并行计算的优势:对于能够有效利用多核 CPU 的任务,pool.map() 提供的并行计算可以大大加快计算速度。每个进程都会独立执行任务的一部分,最终合并结果。

  2. 任务分解:如果任务可以很好地分解为独立的小任务,那么使用 pool.map() 就非常合适。这种方法可以提高整体效率,尤其是当任务数量大于处理器核心数时。

  3. 任务粒度:任务的粒度(即单个任务的大小)也会影响性能。较大的任务粒度可能更适合并行化,因为这样可以减少任务调度的开销。

示例

假设我们有一个计算密集型任务,比如计算一个大列表中每个元素的平方。我们可以使用 pool.map() 来加速这个过程:

import multiprocessingdef square(x):return x * xif __name__ == "__main__":with multiprocessing.Pool(processes=4) as pool:numbers = list(range(1, 1000001))  # 生成一个从 1 到 1000000 的列表# 使用 map 方法start_time = time.time()result = pool.map(square, numbers)end_time = time.time()print("Results:", result[:5])  # 输出前五个结果print("Time taken:", end_time - start_time, "seconds")

在这个例子中,我们使用 pool.map() 来并行计算列表中每个元素的平方。尽管 pool.map() 是阻塞的,但由于计算密集型任务的性质,这种阻塞并不会导致显著的性能损失。

总结

  • 对于 CPU 密集型任务,pool.map() 的阻塞性不会显著影响计算速度,反而可以大大提高处理速度。
  • 对于 I/O 密集型任务或需要频繁交互的任务,可能需要考虑非阻塞方法或使用其他并行技术。

因此,在选择是否使用 pool.map() 时,请根据具体的任务类型和要求来决定。如果你的任务主要是计算密集型的,那么 pool.map() 是一个很好的选择。

6 apply_async(), apply(), 和 pool.map()

apply_async(), apply(), 和 pool.map() 是 Python 的 multiprocessing 模块中用于并行处理任务的三种不同方法。它们在使用场景、参数处理、结果获取等方面有所不同。下面我将详细介绍它们的异同点及原理。

apply_async()

apply_async() 方法用于异步地将参数传递给一个函数并在进程池中的一个进程中执行该函数。它可以处理任意数量的参数,并允许指定回调函数来处理结果。

特性:
  1. 异步调用apply_async() 是一个非阻塞调用,它可以立即返回一个结果对象,而不需要等待任务完成。
  2. 参数灵活性:可以接受任意数量的位置参数和关键字参数。
  3. 单一进程:每次调用 apply_async() 会将任务分配给进程池中的一个进程,而不是像 pool.map() 那样将多个任务分发给多个进程。
  4. 结果获取:通过 get() 方法从结果对象中获取结果。也可以通过 callback 参数指定一个回调函数来处理结果。
语法:
result_object = pool.apply_async(function, args[, kwds][, callback][, error_callback])

apply()

apply() 方法用于同步地将参数传递给一个函数并在进程池中的一个进程中执行该函数。它可以处理任意数量的参数。

特性:
  1. 阻塞调用apply() 是一个阻塞调用,它会等待任务完成并返回结果。
  2. 参数灵活性:可以接受任意数量的位置参数和关键字参数。
  3. 单一进程:每次调用 apply() 会将任务分配给进程池中的一个进程。
  4. 结果获取:直接返回结果。
语法:
result = pool.apply(function, args[, kwds])

pool.map()

pool.map() 方法用于将一个可迭代对象中的元素分布到进程池中的各个进程上进行并行处理。它类似于内置的 map() 函数,但专门设计用于多进程环境。

特性:
  1. 阻塞调用pool.map() 是一个阻塞调用,它会等待所有任务完成并返回结果。
  2. 参数限制:只能处理单一可迭代参数。如果需要处理多个参数,需要先将它们组合成一个元组或列表。
  3. 结果顺序:返回的结果列表与输入的可迭代对象保持相同的顺序。
  4. 并行处理:每个元素会被分配给进程池中的一个进程进行处理。
语法:
result = pool.map(function, iterable[, chunksize])

原理

  • apply_async()

    • 使用 apply_async() 时,你可以在等待任务完成的同时继续执行其他代码。它返回一个结果对象,你可以通过调用 get() 方法来获取结果,或者通过 callback 参数指定一个回调函数来处理结果。
  • apply()

    • 使用 apply() 时,它会等待任务完成并直接返回结果。这是一种简单的同步调用,适合处理单一任务。
  • pool.map()

    • 使用 pool.map() 时,它会等待所有任务完成并返回一个结果列表。它适合处理可以容易地分解为独立任务的情况,特别是当任务数量较大时。

示例

使用 apply_async()
import multiprocessingdef square(x):return x * xdef callback(result):print("Result:", result)if __name__ == "__main__":with multiprocessing.Pool(processes=4) as pool:for i in range(5):pool.apply_async(square, (i,), callback=callback)pool.close()pool.join()
使用 apply()
import multiprocessingdef square(x):return x * xif __name__ == "__main__":with multiprocessing.Pool(processes=4) as pool:for i in range(5):result = pool.apply(square, (i,))print("Result:", result)
使用 pool.map()
import multiprocessingdef square(x):return x * xif __name__ == "__main__":with multiprocessing.Pool(processes=4) as pool:numbers = [1, 2, 3, 4, 5]# 使用 map 方法result = pool.map(square, numbers)print("Results:", result)

总结

  • apply_async()

    • 适用于需要异步执行任务的情况,可以立即返回结果对象。
    • 适合处理单个任务,可以处理任意数量的参数。
  • apply()

    • 适用于需要同步执行任务的情况,会等待任务完成并直接返回结果。
    • 适合处理单个任务,可以处理任意数量的参数。
  • pool.map()

    • 适用于将单一可迭代参数中的每个元素并行处理。
    • 返回结果列表,结果顺序与输入参数一致。
    • 通常用于处理大量相似任务。

根据你的具体需求选择合适的方法。如果你的任务可以很容易地分解为独立的、可以并行处理的任务,并且这些任务只需要单一参数,那么 pool.map() 是一个很好的选择。如果你的任务参数较为复杂,或者你需要更灵活地处理任务,那么 apply_async()apply() 可能更适合。

7callback和get方法

在 Python 的 multiprocessing 模块中,callbackget() 方法通常与 apply_async() 方法一起使用,用于处理异步任务的结果。

callback 方法

callback 是一个可选参数,可以在调用 apply_async() 时指定。当异步任务完成后,callback 函数会被调用,并将任务的结果作为参数传递给它。

使用示例
import multiprocessingdef square(x):return x * xdef callback(result):print("Result:", result)if __name__ == "__main__":with multiprocessing.Pool(processes=4) as pool:for i in range(5):pool.apply_async(square, (i,), callback=callback)pool.close()pool.join()

在这个例子中,我们定义了一个简单的函数 square,它返回传入数字的平方。我们使用 apply_async() 方法来异步地执行任务,并通过 callback 参数来指定一个回调函数来处理结果。当我们调用 pool.close()pool.join() 时,所有任务都已经被提交,并且主程序会等待所有任务完成。

get() 方法

get() 方法用于从异步任务的结果对象中获取结果。当你使用 apply_async() 方法时,它会返回一个结果对象。你可以通过调用 get() 方法来等待任务完成并获取结果。

使用示例
import multiprocessingdef square(x):return x * xif __name__ == "__main__":with multiprocessing.Pool(processes=4) as pool:results = []for i in range(5):result = pool.apply_async(square, (i,))results.append(result)for r in results:print("Result:", r.get())  # 获取结果pool.close()pool.join()

在这个例子中,我们同样定义了一个简单的函数 square。我们使用 apply_async() 方法来异步地执行任务,并将返回的结果对象存储在一个列表中。然后我们遍历这个列表,通过调用 get() 方法来获取每个任务的结果。

总结

  • callback

    • 用于指定一个回调函数来处理异步任务的结果。
    • 回调函数会在任务完成后被自动调用。
  • get()

    • 用于从结果对象中获取异步任务的结果。
    • 会等待任务完成并返回结果。

何时使用

  • 如果你需要在任务完成后立即处理结果,并且想要避免阻塞主程序的执行,那么使用 callback 是一个好选择。
  • 如果你只需要在所有任务完成后处理结果,并且不介意等待所有任务完成,那么使用 get() 方法更为简单。

根据你的具体需求选择合适的方法。如果你的任务可以很容易地分解为独立的、可以并行处理的任务,并且需要立即处理结果,那么使用 callback 可能更合适。如果你的任务参数较为复杂,或者只需要在所有任务完成后处理结果,那么使用 get() 方法可能更方便。

8 start的使用

在 Python 的 multiprocessing 模块中,start 方法通常用于启动一个单独的进程。当你创建了一个 Process 对象时,你需要调用 start 方法来启动这个进程。

使用 start 方法

当你创建了一个 Process 对象时,你需要调用它的 start 方法来启动进程。一旦调用了 start 方法,进程就会开始执行 target 函数。

示例
import multiprocessing
import timedef worker():print("Worker started")time.sleep(2)  # 模拟耗时操作print("Worker finished")if __name__ == "__main__":p = multiprocessing.Process(target=worker)p.start()  # 启动进程# 主程序可以继续执行其他操作print("Main program continues...")p.join()  # 等待进程完成print("Process finished.")

在这个例子中,我们创建了一个名为 worker 的简单函数,它打印一条消息,然后模拟一个耗时的操作,最后再次打印一条消息。我们创建了一个 Process 对象,并指定了 worker 函数作为目标函数。接着,我们调用 start 方法来启动进程,并在主程序中继续执行其他操作。最后,我们调用 join 方法来等待进程完成。

何时使用 start

  • 启动独立进程:当你需要创建一个新的进程来执行特定任务时,你应该使用 start 方法。
  • 并行执行任务:当你希望在主线程继续执行的同时,另一个进程也在后台执行时,你应该使用 start 方法。
  • 避免阻塞:如果你不希望主线程等待子进程完成,那么应该使用 start 方法,而不是 run 方法。

使用 run 方法

需要注意的是,run 方法并不是用来启动进程的。run 方法是 Process 类的一个内部方法,它实际上包含了进程执行的逻辑。当你创建一个 Process 对象时,实际上是 run 方法在内部被调用来执行 target 函数。通常情况下,你不应该直接调用 run 方法。

总结

  • start 方法

    • 用于启动进程。
    • 应该在创建 Process 对象后调用。
    • 允许进程在后台执行,不影响主线程的继续执行。
  • run 方法

    • Process 类的内部方法,不应直接调用。
    • 包含了执行 target 函数的逻辑。

当你需要创建并启动一个单独的进程来执行特定任务时,你应该使用 start 方法。这使得你可以在主线程继续执行其他操作的同时,让新进程在后台执行任务。

这篇关于python使用多进程multiprocessing的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1119781

相关文章

python管理工具之conda安装部署及使用详解

《python管理工具之conda安装部署及使用详解》这篇文章详细介绍了如何安装和使用conda来管理Python环境,它涵盖了从安装部署、镜像源配置到具体的conda使用方法,包括创建、激活、安装包... 目录pytpshheraerUhon管理工具:conda部署+使用一、安装部署1、 下载2、 安装3

Mysql虚拟列的使用场景

《Mysql虚拟列的使用场景》MySQL虚拟列是一种在查询时动态生成的特殊列,它不占用存储空间,可以提高查询效率和数据处理便利性,本文给大家介绍Mysql虚拟列的相关知识,感兴趣的朋友一起看看吧... 目录1. 介绍mysql虚拟列1.1 定义和作用1.2 虚拟列与普通列的区别2. MySQL虚拟列的类型2

Python进阶之Excel基本操作介绍

《Python进阶之Excel基本操作介绍》在现实中,很多工作都需要与数据打交道,Excel作为常用的数据处理工具,一直备受人们的青睐,本文主要为大家介绍了一些Python中Excel的基本操作,希望... 目录概述写入使用 xlwt使用 XlsxWriter读取修改概述在现实中,很多工作都需要与数据打交

使用MongoDB进行数据存储的操作流程

《使用MongoDB进行数据存储的操作流程》在现代应用开发中,数据存储是一个至关重要的部分,随着数据量的增大和复杂性的增加,传统的关系型数据库有时难以应对高并发和大数据量的处理需求,MongoDB作为... 目录什么是MongoDB?MongoDB的优势使用MongoDB进行数据存储1. 安装MongoDB

关于@MapperScan和@ComponentScan的使用问题

《关于@MapperScan和@ComponentScan的使用问题》文章介绍了在使用`@MapperScan`和`@ComponentScan`时可能会遇到的包扫描冲突问题,并提供了解决方法,同时,... 目录@MapperScan和@ComponentScan的使用问题报错如下原因解决办法课外拓展总结@

mysql数据库分区的使用

《mysql数据库分区的使用》MySQL分区技术通过将大表分割成多个较小片段,提高查询性能、管理效率和数据存储效率,本文就来介绍一下mysql数据库分区的使用,感兴趣的可以了解一下... 目录【一】分区的基本概念【1】物理存储与逻辑分割【2】查询性能提升【3】数据管理与维护【4】扩展性与并行处理【二】分区的

使用Python实现在Word中添加或删除超链接

《使用Python实现在Word中添加或删除超链接》在Word文档中,超链接是一种将文本或图像连接到其他文档、网页或同一文档中不同部分的功能,本文将为大家介绍一下Python如何实现在Word中添加或... 在Word文档中,超链接是一种将文本或图像连接到其他文档、网页或同一文档中不同部分的功能。通过添加超

Linux使用fdisk进行磁盘的相关操作

《Linux使用fdisk进行磁盘的相关操作》fdisk命令是Linux中用于管理磁盘分区的强大文本实用程序,这篇文章主要为大家详细介绍了如何使用fdisk进行磁盘的相关操作,需要的可以了解下... 目录简介基本语法示例用法列出所有分区查看指定磁盘的区分管理指定的磁盘进入交互式模式创建一个新的分区删除一个存

C#使用HttpClient进行Post请求出现超时问题的解决及优化

《C#使用HttpClient进行Post请求出现超时问题的解决及优化》最近我的控制台程序发现有时候总是出现请求超时等问题,通常好几分钟最多只有3-4个请求,在使用apipost发现并发10个5分钟也... 目录优化结论单例HttpClient连接池耗尽和并发并发异步最终优化后优化结论我直接上优化结论吧,

SpringBoot使用Apache Tika检测敏感信息

《SpringBoot使用ApacheTika检测敏感信息》ApacheTika是一个功能强大的内容分析工具,它能够从多种文件格式中提取文本、元数据以及其他结构化信息,下面我们来看看如何使用Ap... 目录Tika 主要特性1. 多格式支持2. 自动文件类型检测3. 文本和元数据提取4. 支持 OCR(光学