Python并发系统学习路线第255讲_核心原理与实战案例详解【指导】

Python并发的核心难点在于任务调度、状态共享与I/O阻塞点控制,而非语法本身;需区分I/O密集型(用asyncio)与CPU密集型(用multiprocessing),并严格管理线程/进程/事件循环边界及资源安全。

Python 并发不是靠“多开几个线程”就能解决的,真正卡住人的从来不是 threadingasyncio 的语法,而是任务调度时机、状态共享边界、I/O 阻塞点是否真的被绕过——第255讲这个编号本身没意义,但背后暴露的问题很真实:很多人学完 ThreadPoolExecutor 还是写不出稳定爬虫,跑通 async def 仍会遇到 RuntimeError: Event loop is closed

为什么 concurrent.futures.ThreadPoolExecutor 跑着跑着就卡死?

这不是线程池坏了,而是你没管住「阻塞源」。常见于混合使用同步 I/O(比如 requests.get())和线程池时,未设超时、未捕获异常、或共享了非线程安全对象(如全局 sqlite3.Connection)。

  • max_workers 设太高反而触发系统级连接数限制(尤其 HTTP),建议从 min(32, os.cpu_count() + 4) 起调
  • 务必给每个 submit() 包裹 try/except,否则一个任务崩溃会导致 as_completed() 提前退出
  • 避免在线程内复用单例 Session(如 requests.Session()),每个线程应持有一个独立实例

asyncio.run() 为什么不能在已运行的 event loop 里调用?

这是初学者最常撞上的墙:RuntimeError: asyncio.run() cannot be called from a running event loop。根本原因是 Python 的 event loop 是 per-thread 且不可重入的——你在 Jupyter、FastAPI 中间件、或已启动的 asyncio.create_task() 里再调 asyncio.run(),等于试图嵌套启动主循环。

  • 替代方案:用 asyncio.create_task() 提交协程,或直接 await 已有协程对象
  • Jupyter 中可用 await coro(需 IPython ≥ 7.0),而非强行 asyncio.run(coro)
  • 若必须从同步上下文进异步,且确定当前无 loop,才用 asyncio.run();否则检查是否已处于 async def 内部

什么时候该用 multiprocessing 而不是 asyncio

别被“异步更快”带偏。CPU 密集型任务(如图像处理、数值计算)用 asyncio 不仅没提速,还会因协程切换增加开销。真正的分水岭在「等待 vs 计算」:

  • I/O 密集(HTTP 请求、数据库查询、文件读写)→ 优先 asyncio + aiohttp/aiomysql
  • CPU 密集(numpy 矩阵运算、PIL 图像缩放)→ 必须 multiprocessing,且注意 spawn 启动方式比 fork 更安全(尤其 Windows/macOS)
  • 混合场景(如下载+解析 HTML)→ 拆:下载用 asyncio,解析用 multiprocessing.Pool.map(),用 queue.Queuemultiprocessing.Queue 传数据
import asyncio
import aiohttp
from multiprocessing import Pool

def cpu_heavy_task(data):

纯计算,不 await,不 IO

return sum(x ** 2 for x in data)

async def fetch_url(session, url): async with session.get(url) as resp: return await resp.text()

async def main(): urls = ["https://www./link/5f69e19efaba426d62faeab93c308f5c"] 10 async with aiohttp.ClientSession() as session: htmls = await asyncio.gather([fetch_url(session, u) for u in urls])

# 把 HTML 列表交给多进程解析
with Pool() as pool:
    results = pool.map(cpu_heavy_task, [list(h.encode()) for h in htmls])
return results

并发系统的复杂性不在语法,而在你能否清晰画出「哪段代码在哪个线程/进程/event loop 里执行」「数据在哪儿被读写」「错误发生时控制权落在谁手里」。很多问题其实只需要加一行 print(f"pid={os.getpid()}, tid={threading.get_ident()}") 就能定位。