并行计算
在 Python 中,可以通过 内置模块 或 第三方库 实现并行循环,核心思路是利用多线程、多进程或协程(根据任务类型选择)。
一、concurrent.futures(内置模块,无需额外安装)‘
核心逻辑:
用 ThreadPoolExecutor(多线程)或 ProcessPoolExecutor(多进程)提交循环任务,自动并行执行。
适用场景:
- 多线程(
ThreadPoolExecutor):适合 IO 密集型任务(如网络请求、文件读写、数据库操作),线程切换成本低。 - 多进程(
ProcessPoolExecutor):适合 CPU 密集型任务(如数学计算、数据处理),规避 Python GIL 锁限制。
python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
# 1. 定义要并行执行的任务(循环体逻辑)
def process_task(x):
"""模拟任务:IO 密集型(sleep)或 CPU 密集型(计算)"""
time.sleep(0.1) # 模拟 IO 操作(换为计算逻辑就是 CPU 密集型)
return x * 2 # 任务结果
if __name__ == "__main__":
# 待处理的循环数据(相当于 for 循环的迭代对象)
data = range(10) # 0~9,共 10 个任务
# --------------------------
# 方式1:多线程(IO 密集型首选)
# --------------------------
start = time.time()
with ThreadPoolExecutor(max_workers=5) as executor: # 最多 5 个线程并行
# map:自动将 data 中的元素逐个传给 process_task,返回结果列表
results = list(executor.map(process_task, data))
print("多线程耗时:", time.time() - start) # 约 0.2 秒(10 个任务/5 线程 ≈ 2 批)
print("结果:", results) # [0, 2, 4, ..., 18]
# --------------------------
# 方式2:多进程(CPU 密集型首选)
# --------------------------
start = time.time()
with ProcessPoolExecutor(max_workers=4) as executor: # 最多 4 个进程并行
results = list(executor.map(process_task, data))
print("多进程耗时:", time.time() - start) # 约 0.3 秒(进程启动有少量开销)
print("结果:", results)二、高效并行:multiprocessing(内置模块,更灵活)
python
from multiprocessing import Pool
import time
def process_task(x):
time.sleep(0.1)
return x * 2
if __name__ == "__main__":
data = range(10)
start = time.time()
# 创建进程池,指定 4 个进程
with Pool(processes=4) as pool:
# map 方法:批量执行任务
results = pool.map(process_task, data)
print("耗时:", time.time() - start) # 约 0.3 秒
print("结果:", results)三、第三方神器:joblib(专为数据处理优化)
joblib 是 sklearn 生态的工具,对 multiprocessing 做了封装,处理大数据 / 数组时更高效(支持内存共享),语法极简。
安装:
bash
pip install joblib代码示例:
python
from joblib import Parallel, delayed
import time
def process_task(x):
time.sleep(0.1)
return x * 2
if __name__ == "__main__":
data = range(10)
start = time.time()
# n_jobs:并行数(-1 表示自动使用所有 CPU 核心)
#默认 `backend="loky"`(多进程,是joblib推荐的进程后端,比`multiprocessing`更稳定)
results = Parallel(n_jobs=-1)(delayed(process_task)(x) for x in data)
print("耗时:", time.time() - start) # 约 0.1 秒(满核心运行)
print("结果:", results)参数:
"loky"(默认):多进程,CPU 密集型(计算多)、数据处理(如 numpy/pandas 运算) 自动规避 GIL 锁,支持内存共享(大数据高效) "threading" : 多线程,IO 密集型(等待多,如文件读写、网络请求),切换开销低,共享进程内存(无需 IPC 通信) "multiprocessing" :多进程,兼容旧版本场景,基于 Python 内置 multiprocessing,功能不如 loky 完善