跳到主要内容

Python 多线程编程实践

· 阅读需 4 分钟
素明诚
Full stack development

全局解释器锁(GIL)

在 Python 的并发编程中,全局解释器锁(GIL)是一个重要的考量因素。它限制了在 CPython(解释器)中同时只有一个线程执行字节码,从而影响 CPU 密集型任务的性能。尽管如此,GIL 的存在对 I/O 密集型任务影响较小,因为线程在等待 I/O 操作时可以释放 GIL,允许其他线程运行。在此背景下,开发者通常根据任务性质选择合适的并发模式。

对于 I/O 密集型任务,Threading模块和ThreadPoolExecutor提供了较为简单的实现,允许多个线程并行执行 I/O 操作。而asyncio则利用事件循环和协程来处理高并发 I/O 请求,其非阻塞模型在处理大量连接时更为高效。

在 CPU 密集型任务的场景中,multiprocessing模块是更优的选择。它通过创建多个独立进程,每个进程都有自己的 Python 解释器和内存空间,从而绕过 GIL,实现真正的并行计算。这使得multiprocessing能充分利用多核处理器的能力。

并发编程方式

特性threadingmultiprocessingThreadPoolExecutorasyncio
适用场景I/O 密集型CPU 密集型I/O 密集型I/O 密集型
并行性受 GIL 限制真正的并行受 GIL 限制真正的并行(基于协程)
资源管理共享内存进程间内存隔离线程池自动管理单线程事件循环
管理复杂性简单较复杂简单中等复杂(需要理解异步概念)
性能对于 I/O 效率较高对于计算效率较高对于 I/O 效率较高对于大量并发 I/O 操作效率高
适合并发量较小(线程上下文切换开销)大(多个进程)较大(可配置最大线程数)很大(可处理成千上万的连接)
编程模型基于线程基于进程基于线程基于协程

threading

最适合处理 I/O 密集型任务,但在 CPU 密集型任务中性能受限于 GIL。管理简单,适合小规模并发场景。

import threading
import requests


# 下载单个 URL 的函数
def download_url(url):
try:
response = requests.get(url, timeout=5)
print(f"成功下载: {url}, 内容长度: {len(response.content)}")
except Exception as e:
print(f"下载失败: {url}, 错误: {e}")


def main(urls):
threads = []
for url in urls:
thread = threading.Thread(target=download_url, args=(url,))
threads.append(thread)
thread.start()

for thread in threads:
thread.join() # 等待所有线程完成


if __name__ == "__main__":
urls = [f"https://www.example.com/{i}" for i in range(10)]
main(urls)

multiprocessing

适合 CPU 密集型任务,能够实现真正的并行。进程间内存隔离,适合大规模计算,但管理较复杂。

import multiprocessing
import requests


# 下载单个 URL 的函数
def download_url(url):
try:
response = requests.get(url, timeout=5)
print(f"成功下载: {url}, 内容长度: {len(response.content)}")
except Exception as e:
print(f"下载失败: {url}, 错误: {e}")


def main(urls):
with multiprocessing.Pool(processes=5) as pool: # 使用进程池
pool.map(download_url, urls)


if __name__ == "__main__":
urls = [f"https://www.example.com/{i}" for i in range(10)]
main(urls)

ThreadPoolExecutor

提供了简单的线程池管理,适合 I/O 密集型任务。更高效地管理线程,易于使用。

import requests
from concurrent.futures import ThreadPoolExecutor


# 下载单个 URL 的函数
def download_url(url):
try:
response = requests.get(url, timeout=5)
print(f"成功下载: {url}, 内容长度: {len(response.content)}")
except Exception as e:
print(f"下载失败: {url}, 错误: {e}")


def main(urls):
with ThreadPoolExecutor(max_workers=5) as executor:
executor.map(download_url, urls)


if __name__ == "__main__":
urls = [f"https://www.example.com/{i}" for i in range(10)]
main(urls)

asyncio

语法类似于 JS,通过事件循环和协程处理 I/O 密集型任务,能处理大量并发。编程模型相对复杂,需要理解异步编程的概念,但能显著提升 I/O 性能。

import asyncio
import aiohttp


# 异步下载单个 URL 的函数
async def download_url(session, url):
try:
async with session.get(url, timeout=5) as response:
content = await response.read()
print(f"成功下载: {url}, 内容长度: {len(content)}")
except Exception as e:
print(f"下载失败: {url}, 错误: {e}")


async def main(urls):
async with aiohttp.ClientSession() as session:
tasks = [download_url(session, url) for url in urls]
await asyncio.gather(*tasks)


if __name__ == "__main__":
urls = [f"https://www.example.com/{i}" for i in range(10)]
asyncio.run(main(urls))

四种方式对比

对于 I/O 密集型任务asyncioThreadPoolExecutor 是优秀的选择,但 asyncio 更适合处理高并发连接。

对于 CPU 密集型任务multiprocessing 模块允许在 Python 中创建多个进程,每个进程都有自己的 Python 解释器和内存空间。因此,它可以绕过全局解释器锁(GIL),实现真正的并行计算。这使得 multiprocessing 非常适合 CPU 密集型任务,因为它能够充分利用多核处理器的计算能力。