当您从事计算机视觉项目时,您可能需要预处理大量图像数据。这很耗时,如果能够并行处理多个图像,那将是非常棒的。多进程是一个系统同时运行多个处理器的能力。如果您的计算机只有一个处理器,它会在多个进程之间切换以保持所有进程运行。然而,今天大多数计算机至少都有多核处理器,允许同时执行多个进程。Python 多进程模块是一个工具,您可以通过将任务分配给不同的进程来提高脚本的效率。
完成本教程后,您将了解:
- 为什么我们要使用多进程
- 如何使用 Python 多进程模块中的基本工具
使用我的新书《Python for Machine Learning》启动您的项目,其中包括逐步教程和所有示例的 Python 源代码文件。
让我们开始吧。
Python 中的多进程处理
图片来源:Thirdman。保留部分权利。
概述
本教程分为四个部分;它们是
- 多进程的优点
- 基本多进程
- 实际应用中的多进程
- 使用 joblib
多进程的优点
您可能会问:“为什么是多进程?” 多进程可以通过并行而不是顺序运行多个任务,从而使程序效率大幅提高。一个类似的术语是多线程,但它们是不同的。
进程是一个加载到内存中运行的程序,它不与其他进程共享内存。线程是进程中的一个执行单元。多个线程在一个进程中运行并相互共享该进程的内存空间。
Python 的全局解释器锁(GIL)只允许在解释器下一次运行一个线程,这意味着如果需要 Python 解释器,您无法享受多线程的性能优势。这就是多进程在 Python 中优于线程的地方。多个进程可以并行运行,因为每个进程都有自己的解释器来执行分配给它的指令。此外,操作系统会将您的程序视为多个进程并分别调度它们,即您的程序总共获得了更大份额的计算机资源。因此,当程序是 CPU 密集型时,多进程更快。在程序中存在大量 I/O 的情况下,多线程可能更高效,因为大多数时候您的程序都在等待 I/O 完成。然而,多进程通常更高效,因为它并发运行。
基本多进程
让我们使用 Python 多进程模块来编写一个演示如何进行并发编程的基本程序。
让我们看看这个函数 task()
,它休眠 0.5 秒,然后在休眠前后打印
1 2 3 4 5 6 |
import time def task(): print('Sleeping for 0.5 seconds') time.sleep(0.5) print('Finished sleeping') |
要创建一个进程,我们只需使用多进程模块进行说明
1 2 3 4 |
... import multiprocessing p1 = multiprocessing.Process(target=task) p2 = multiprocessing.Process(target=task) |
Process()
的 target
参数指定了进程运行的目标函数。但这些进程不会立即运行,直到我们启动它们
1 2 3 |
... p1.start() p2.start() |
一个完整的并发程序如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
import multiprocessing import time def task(): print('Sleeping for 0.5 seconds') time.sleep(0.5) print('Finished sleeping') if __name__ == "__main__": start_time = time.perf_counter() # 创建两个进程 p1 = multiprocessing.Process(target=task) p2 = multiprocessing.Process(target=task) # 启动两个进程 p1.start() p2.start() finish_time = time.perf_counter() print(f"Program finished in {finish_time-start_time} seconds") |
我们必须将主程序围在 if __name__ == "__main__"
下,否则 multiprocessing
模块会报错。这种安全机制保证 Python 在子进程创建之前完成程序分析。
然而,这段代码有一个问题,因为程序计时器在创建的进程执行之前就打印出来了。以下是上述代码的输出
1 2 3 4 5 |
程序在 0.012921249988721684 秒内完成 休眠 0.5 秒 休眠 0.5 秒 休眠结束 休眠结束 |
我们需要在两个进程上调用 join()
函数,使它们在时间打印之前运行。这是因为有三个进程正在运行:p1
、p2
和主进程。主进程负责计时并打印执行所需的时间。我们应该确保 finish_time
这行代码不早于 p1
和 p2
进程完成之后运行。我们只需在 start()
函数调用之后立即添加以下代码片段
1 2 3 |
... p1.join() p2.join() |
join()
函数允许我们让其他进程等待,直到被调用 join()
的进程完成。以下是添加了 join 语句后的输出
1 2 3 4 5 |
休眠 0.5 秒 休眠 0.5 秒 休眠结束 休眠结束 程序在 0.5688213340181392 秒内完成 |
同理,我们可以让更多的进程运行。以下是从上面修改过的完整代码,它包含 10 个进程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
import multiprocessing import time def task(): print('Sleeping for 0.5 seconds') time.sleep(0.5) print('Finished sleeping') if __name__ == "__main__": start_time = time.perf_counter() processes = [] # 创建 10 个进程并启动它们 for i in range(10): p = multiprocessing.Process(target = task) p.start() processes.append(p) # 加入所有进程 for p in processes: p.join() finish_time = time.perf_counter() print(f"Program finished in {finish_time-start_time} seconds") |
想开始学习机器学习 Python 吗?
立即参加我为期7天的免费电子邮件速成课程(附示例代码)。
点击注册,同时获得该课程的免费PDF电子书版本。
实际应用中的多进程
启动一个新进程,然后将其重新连接到主进程,这就是 Python(和许多其他语言一样)中多进程的工作方式。我们想要运行多进程的原因可能是为了提高速度而并发执行许多不同的任务。这可能是一个图像处理函数,我们需要对成千上万张图像进行处理。也可能是为了后续的自然语言处理任务将 PDF 转换为纯文本,我们需要处理一千个 PDF。通常,我们会为这些任务创建一个接受参数(例如文件名)的函数。
我们来看一个函数
1 2 |
def cube(x): return x**3 |
如果我们要使用参数 1 到 1000 运行它,我们可以创建 1000 个进程并并行运行它们
1 2 3 4 5 6 7 8 9 10 11 |
import multiprocessing def cube(x): return x**3 if __name__ == "__main__": # 这不起作用 processes = [multiprocessing.Process(target=cube, args=(x,)) for x in range(1,1000)] [p.start() for p in processes] result = [p.join() for p in processes] print(result) |
然而,这行不通,因为您的计算机中可能只有少数几个核心。运行 1000 个进程会产生太多的开销,并使您的操作系统不堪重负。此外,您可能已经耗尽了内存。更好的方法是运行一个进程池来限制一次可以运行的进程数量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
import multiprocessing import time def cube(x): return x**3 if __name__ == "__main__": pool = multiprocessing.Pool(3) start_time = time.perf_counter() processes = [pool.apply_async(cube, args=(x,)) for x in range(1,1000)] result = [p.get() for p in processes] finish_time = time.perf_counter() print(f"Program finished in {finish_time-start_time} seconds") print(result) |
multiprocessing.Pool()
的参数是要在池中创建的进程数量。如果省略,Python 将使其等于您计算机中的核心数量。
我们使用 apply_async()
函数以列表推导式的方式将参数传递给 cube
函数。这将为池创建要运行的任务。之所以称为“async
”(异步),是因为我们没有等待任务完成,并且主进程可能继续运行。因此,apply_async()
函数不返回结果,而是返回一个我们可以使用 get()
来等待任务完成并检索结果的对象。由于我们在列表推导式中获取结果,因此结果的顺序与我们在异步任务中创建的参数相对应。然而,这并不意味着进程在池中是按此顺序启动或完成的。
如果您认为编写代码行来启动进程并加入它们过于冗长,可以考虑使用 map()
代替
1 2 3 4 5 6 7 8 9 10 11 12 13 |
import multiprocessing import time def cube(x): return x**3 if __name__ == "__main__": pool = multiprocessing.Pool(3) start_time = time.perf_counter() result = pool.map(cube, range(1,1000)) finish_time = time.perf_counter() print(f"Program finished in {finish_time-start_time} seconds") print(result) |
这里没有启动和加入,因为它隐藏在 pool.map()
函数后面。它的作用是将可迭代对象 range(1,1000)
分割成多个块,并在池中运行每个块。map 函数是列表推导式的并行版本
1 |
result = [cube(x) for x in range(1,1000)] |
但是,现代替代方案是使用 concurrent.futures
中的 map
,如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 |
import concurrent.futures import time def cube(x): return x**3 if __name__ == "__main__": with concurrent.futures.ProcessPoolExecutor(3) as executor: start_time = time.perf_counter() result = list(executor.map(cube, range(1,1000))) finish_time = time.perf_counter() print(f"Program finished in {finish_time-start_time} seconds") print(result) |
这段代码在底层运行着 multiprocessing
模块。这样做的好处是,我们可以通过简单地将 ProcessPoolExecutor
替换为 ThreadPoolExecutor
,将程序从多进程更改为多线程。当然,您必须考虑全局解释器锁是否是您代码的问题。
使用 joblib
joblib
包是一套使并行计算更容易的工具。它是一个常见的多进程第三方库。它还提供了缓存和序列化功能。要安装 joblib
包,请在终端中使用命令
1 |
pip install joblib |
我们可以将之前的示例转换为以下内容以使用 joblib
1 2 3 4 5 6 7 8 9 10 11 |
import time from joblib import Parallel, delayed def cube(x): return x**3 start_time = time.perf_counter() result = Parallel(n_jobs=3)(delayed(cube)(i) for i in range(1,1000)) finish_time = time.perf_counter() print(f"Program finished in {finish_time-start_time} seconds") print(result) |
确实,它的作用很直观。delayed()
函数是另一个函数的包装器,用于创建函数的“延迟”版本调用。这意味着它在调用时不会立即执行该函数。
然后,我们用我们想要传递给它的不同参数集多次调用延迟函数。例如,当我们向函数 cube
的延迟版本提供整数 1
时,我们不会计算结果,而是生成一个元组 (cube, (1,), {})
,分别表示函数对象、位置参数和关键字参数。
我们使用 Parallel()
创建了引擎实例。当它以列表作为参数被调用时,它将实际并行执行每个元组指定的作业,并在所有作业完成后将结果收集为列表。这里我们使用 n_jobs=3
创建了 Parallel()
实例,因此将有三个进程并行运行。
我们也可以直接编写元组。因此,上面的代码可以改写为
1 |
result = Parallel(n_jobs=3)((cube, (i,), {}) for i in range(1,1000)) |
使用 joblib
的好处是,我们只需添加一个额外的参数就可以在多线程中运行代码
1 |
result = Parallel(n_jobs=3, prefer="threads")(delayed(cube)(i) for i in range(1,1000)) |
这隐藏了并行运行函数的所有细节。我们使用的语法与普通的列表推导式没有太大区别。
进一步阅读
如果您想深入了解,本节提供了更多关于该主题的资源。
书籍
- 高性能 Python,第二版,作者:Micha Gorelick 和 Ian Ozsvald
API
- joblib
- Python 标准库中的 multiprocessing
- Python 标准库中的 concurrent.futures
总结
在本教程中,您学习了如何并行运行 Python 函数以提高速度。特别是,您学习了
- 如何在 Python 中使用
multiprocessing
模块创建运行函数的进程 - 启动和完成进程的机制
- 在
multiprocessing
中使用进程池进行受控多进程以及concurrent.futures
中对应的语法 - 如何使用第三方库
joblib
进行多进程
太棒了
爱了,杰森!
非常欢迎你,Soc!
您知道差分进化算法的任何并行实现吗?
非常棒的内容。感谢您的整理!
爱了!
谢谢您的反馈,Jaron!
以前没用过 joblib,非常方便!
我建议简要地多提及一些多线程的性能优势(例如,避免内存分叉),以及在哪些情况下多进程性能不佳,因此更适合使用像 Julia、C++ 等支持线程的语言(其中许多语言可以与 Python 互操作)。
Jesse,很棒的反馈!
亲爱的 Daniel,
首先感谢您的多进程教程。
Python 中的多进程和 Python 中的多线程有什么区别?
虽然多进程和多线程都旨在有独立的执行流,但多进程似乎是按执行顺序进行的,而线程的执行顺序由计算机操作系统决定。
您能做一个关于线程的教程,以便您能比较和对比结果吗?
谢谢你,
悉尼的Anthony
很抱歉告诉您,您误解了线程和进程的运行方式。两者都不是确定性的,而是取决于操作系统。由操作系统调度程序控制哪个进程运行,以及进程中的哪个线程获得 CPU。多进程和多线程的区别在于是否共享“上下文”。每个进程都有独立的内存空间,但所有线程共享同一块内存。因此,在多线程中,您会关心访问变量时的竞态条件。但您可能会问,如果两个进程不能看到对方的变量,它们如何通信。
尊敬的 Adrian 博士,
谢谢你,
尽管如此,一个关于多线程的教程,加上多线程和多进程在教程中的比较,会有所帮助。
非常感谢
悉尼的Anthony
你好,
我使用 joblib 和 multiprocessing 运行了立方体示例,但对我来说,multiprocessing 总是比简单地在 for 循环中调用函数慢。我是做错了什么,还是你能解释一下为什么会这样?
嗨 Taaresh……请详细说明您的输入数据的特征,以便我们更好地帮助您。
作者是 Daniel Chung,而不是 Jason Brownlee,对吗?如果真是这样,旁边的图片在文章归属上令人困惑。(当然,Jason 也写了很多很棒的内容。)
感谢 Daniel 提供的简洁有用的文章。