Skip to content

并行计算

在 Python 中,可以通过 内置模块 或 第三方库 实现并行循环,核心思路是利用多线程、多进程或协程(根据任务类型选择)。

一、concurrent.futures(内置模块,无需额外安装)‘

核心逻辑:

用 ThreadPoolExecutor(多线程)或 ProcessPoolExecutor(多进程)提交循环任务,自动并行执行。

适用场景:

  • 多线程(ThreadPoolExecutor):适合 IO 密集型任务(如网络请求、文件读写、数据库操作),线程切换成本低。
  • 多进程(ProcessPoolExecutor):适合 CPU 密集型任务(如数学计算、数据处理),规避 Python GIL 锁限制。
python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

# 1. 定义要并行执行的任务(循环体逻辑)
def process_task(x):
    """模拟任务:IO 密集型(sleep)或 CPU 密集型(计算)"""
    time.sleep(0.1)  # 模拟 IO 操作(换为计算逻辑就是 CPU 密集型)
    return x * 2  # 任务结果

if __name__ == "__main__":
    # 待处理的循环数据(相当于 for 循环的迭代对象)
    data = range(10)  # 0~9,共 10 个任务

    # --------------------------
    # 方式1:多线程(IO 密集型首选)
    # --------------------------
    start = time.time()
    with ThreadPoolExecutor(max_workers=5) as executor:  # 最多 5 个线程并行
        # map:自动将 data 中的元素逐个传给 process_task,返回结果列表
        results = list(executor.map(process_task, data))
    print("多线程耗时:", time.time() - start)  # 约 0.2 秒(10 个任务/5 线程 ≈ 2 批)
    print("结果:", results)  # [0, 2, 4, ..., 18]

    # --------------------------
    # 方式2:多进程(CPU 密集型首选)
    # --------------------------
    start = time.time()
    with ProcessPoolExecutor(max_workers=4) as executor:  # 最多 4 个进程并行
        results = list(executor.map(process_task, data))
    print("多进程耗时:", time.time() - start)  # 约 0.3 秒(进程启动有少量开销)
    print("结果:", results)

二、高效并行:multiprocessing(内置模块,更灵活)

python
from multiprocessing import Pool
import time

def process_task(x):
    time.sleep(0.1)
    return x * 2

if __name__ == "__main__":
    data = range(10)
    start = time.time()

    # 创建进程池,指定 4 个进程
    with Pool(processes=4) as pool:
        # map 方法:批量执行任务
        results = pool.map(process_task, data)

    print("耗时:", time.time() - start)  # 约 0.3 秒
    print("结果:", results)

三、第三方神器:joblib(专为数据处理优化)

joblib 是 sklearn 生态的工具,对 multiprocessing 做了封装,处理大数据 / 数组时更高效(支持内存共享),语法极简。

安装:

bash
pip install joblib

代码示例:

python
from joblib import Parallel, delayed
import time

def process_task(x):
    time.sleep(0.1)
    return x * 2

if __name__ == "__main__":
    data = range(10)
    start = time.time()

    # n_jobs:并行数(-1 表示自动使用所有 CPU 核心)
    #默认 `backend="loky"`(多进程,是joblib推荐的进程后端,比`multiprocessing`更稳定)
    results = Parallel(n_jobs=-1)(delayed(process_task)(x) for x in data)

    print("耗时:", time.time() - start)  # 约 0.1 秒(满核心运行)
    print("结果:", results)

参数:

"loky"(默认):多进程,CPU 密集型(计算多)、数据处理(如 numpy/pandas 运算) 自动规避 GIL 锁,支持内存共享(大数据高效) "threading" : 多线程,IO 密集型(等待多,如文件读写、网络请求),切换开销低,共享进程内存(无需 IPC 通信) "multiprocessing" :多进程,兼容旧版本场景,基于 Python 内置 multiprocessing,功能不如 loky 完善