一个 FastAPI 服务接了三方 OCR 接口,压测发现单实例 QPS 上不去,p99 居高不下。看代码"全部用了 async",看 CPU 也没满。最后用 py-spy 抓到罪魁:有一处偷偷的 requests.get 阻塞了 event loop,所有协程都被卡在那。本文把 asyncio 里常见的 5 种隐性阻塞讲清楚,带 py-spy 实战。
事故现场
@app.post("/ocr")
async def ocr_endpoint(file: UploadFile):
img_bytes = await file.read()
# 步骤 1: 调三方 OCR
result = await ocr_client.recognize(img_bytes) # ✅ 用了 await
# 步骤 2: 把图压缩存 OSS
compressed = compress_image(img_bytes) # ❌ CPU 密集,阻塞 event loop
# 步骤 3: 写日志到内部日志服务
log_to_internal(result) # ❌ 用的 requests,阻塞
return {"text": result.text}
看起来"加了 async 就够",实际上压缩图片和日志服务调用都阻塞了 event loop。一个请求阻 200ms,event loop 这 200ms 啥也干不了,所有其他请求被排队。
asyncio 的核心模型
Python 的 asyncio 是单线程协作式调度:
[event loop] - [coroutine A] - await io
[coroutine B] - await io
[coroutine C] - await io
...
event loop 调度所有协程,await 时把控制权还回 loop,loop 选下一个就绪的协程跑。
❗ 一旦协程不 await(纯 CPU 或同步 IO),就独占 event loop
其他协程必须等它结束才能跑。整个服务的并发度 → 1。
隐性阻塞 1:用了同步 IO 库
# 错:requests 是同步的
import requests
async def fetch_data():
r = requests.get("https://api.example.com/data") # 阻塞 200ms
return r.json()
# 对:用 httpx 或 aiohttp
import httpx
async def fetch_data():
async with httpx.AsyncClient() as client:
r = await client.get("https://api.example.com/data")
return r.json()
常见误用 + 对应正确库:
同步库 异步替代
requests httpx / aiohttp
psycopg2 asyncpg / aiopg
pymongo motor
redis-py(同步) aioredis / redis.asyncio(redis-py 4.2+ 内置)
elasticsearch elasticsearch-async / async client
boto3 aioboto3
pymysql aiomysql
kafka-python aiokafka
隐性阻塞 2:CPU 密集计算
# 错:在协程里直接做 CPU 密集计算
async def process_image(img_bytes):
img = Image.open(io.BytesIO(img_bytes))
img.thumbnail((800, 800)) # PIL 缩放,纯 CPU
output = io.BytesIO()
img.save(output, format='JPEG', quality=85)
return output.getvalue() # 整个过程阻塞 event loop
# 对:扔到线程池
import asyncio
import concurrent.futures
_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=8)
async def process_image(img_bytes):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(_thread_pool, _process_sync, img_bytes)
def _process_sync(img_bytes):
img = Image.open(io.BytesIO(img_bytes))
img.thumbnail((800, 800))
output = io.BytesIO()
img.save(output, format='JPEG', quality=85)
return output.getvalue()
更进一步:CPU 密集 + GIL,线程池其实并发不上去(Python GIL 限制单核)。重 CPU 用进程池:
_process_pool = concurrent.futures.ProcessPoolExecutor(max_workers=4)
async def process_image_heavy(img_bytes):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(_process_pool, _process_sync, img_bytes)
# 进程池避免 GIL,真并行
隐性阻塞 3:asyncio.sleep 写成 time.sleep
# 错
async def retry_with_delay():
for i in range(3):
try:
return await call_api()
except RetryableError:
time.sleep(1) # ❌ 阻塞整个 event loop 1 秒
raise Exception("max retry exceeded")
# 对
async def retry_with_delay():
for i in range(3):
try:
return await call_api()
except RetryableError:
await asyncio.sleep(1) # 让出 event loop
raise Exception("max retry exceeded")
隐性阻塞 4:同步生成器 + 大数据处理
# 错
async def process_large_file(path):
with open(path, 'rb') as f:
data = f.read() # ❌ 同步读 1GB 文件,阻塞 5 秒
for line in data.split(b'\n'):
process(line)
return "done"
# 对:用 aiofiles + 分块
import aiofiles
async def process_large_file(path):
async with aiofiles.open(path, 'rb') as f:
async for line in f: # 异步迭代
process(line)
return "done"
隐性阻塞 5:不知道某些库底层是同步的
# 看起来异步,实际底层用 thread pool 模拟,IO 真发生时仍是同步
async def hash_file(path):
return await asyncio.to_thread(_hash_sync, path)
# fastapi 的 BackgroundTasks 在主进程里跑,不是真异步
@app.post("/upload")
async def upload(bg: BackgroundTasks):
bg.add_task(send_notification) # 不会阻塞响应,但在 event loop 里跑,长任务还是会阻
return {"ok": True}
# 真的后台任务要用 Celery / Arq / RQ
from arq import create_pool
async def upload_endpoint():
redis = await create_pool(...)
await redis.enqueue_job('send_notification', user_id)
return {"ok": True}
诊断工具:py-spy
# 安装
pip install py-spy
# 抓堆栈采样(60 秒,在 prod 上可以跑,几乎无开销)
py-spy record -o profile.svg --pid 12345 --duration 60
# 火焰图打开看,直接定位哪个函数占了 event loop 大部分时间
# 关键指标:event loop 线程在 read() / send() / time.sleep / requests / json.loads 大数据 等同步调用上的时间
# 也可以 top 模式实时看
py-spy top --pid 12345
# %CPU Function Module
# 45.2 read ssl
# 20.1 loads json
# 10.8 compress_image app.image_utils
# ...
# subprocess 模式:不用 attach
py-spy record -o profile.svg -- python -m uvicorn app:app
uvloop + orjson 性能优化
# uvloop:用 Cython 重写的 event loop,比默认快 2-4 倍
import uvloop
import asyncio
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# FastAPI 启动加 --loop uvloop
# uvicorn app:app --loop uvloop --workers 4
# orjson:比标准 json 快 5-10 倍,用 Rust 写
import orjson
from fastapi.responses import ORJSONResponse
@app.get("/users", response_class=ORJSONResponse)
async def get_users():
return {"data": [...]}
# 替换默认 response_class
app = FastAPI(default_response_class=ORJSONResponse)
asyncpg 连接池配置
import asyncpg
from contextlib import asynccontextmanager
pool: asyncpg.Pool
@asynccontextmanager
async def lifespan(app: FastAPI):
global pool
pool = await asyncpg.create_pool(
dsn="postgresql://user:pass@db/mydb",
min_size=10, # 启动就有 10 个连接
max_size=50, # 高峰最多 50
max_queries=50_000, # 单连接处理 5w 个 query 后回收
max_inactive_connection_lifetime=300, # 空闲 5 分钟回收
command_timeout=30, # 单 query 超时 30 秒
)
yield
await pool.close()
app = FastAPI(lifespan=lifespan)
@app.get("/user/{uid}")
async def get_user(uid: int):
async with pool.acquire() as conn:
row = await conn.fetchrow("SELECT * FROM users WHERE id = $1", uid)
return dict(row) if row else {"error": "not_found"}
压测对比
FastAPI + 同样的业务逻辑,wrk -t 4 -c 200 -d 30s
配置 QPS p99 延迟
=================================================
- requests 同步 + sync sleep + PIL 320 2100ms
+ 换 httpx 950 800ms
+ 用 asyncpg 而不是 psycopg2 1400 550ms
+ PIL 扔线程池 2100 280ms
+ uvloop 3400 150ms
+ orjson 3800 130ms
+ 4 worker(gunicorn) 14500 90ms
总提升:45 倍 QPS,延迟降到 4%
结构化排查路径
- 装 py-spy 抓火焰图,确认是不是有同步函数在 event loop 上
- 检查所有 IO 调用(DB / Redis / HTTP / Kafka)是不是用的异步库
- 检查所有 CPU 密集函数(图片处理 / 加密 / 压缩 / JSON 大对象)有没有 run_in_executor
- 把
time.sleep全局替换为asyncio.sleep - 开 uvloop + orjson
- 合理的 worker 数(一般 = 2 * CPU 数 + 1)
- 数据库连接池 size 配合 worker 数
团队最后做的代码规范
# 1. 禁止在 async def 里调用同步 IO 库
# CI 检查:grep -E "async def" -A 50 *.py | grep -E "requests\.(get|post)|psycopg2|time\.sleep"
# 命中即拒绝合并
# 2. 重 CPU 操作必须扔 executor,用装饰器统一
def run_in_thread(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
return wrapper
@run_in_thread
def heavy_compute(data):
# 这里可以放任何 CPU 操作
return process(data)
# 3. 每个新接口必须跑 py-spy 60 秒看火焰图,event loop 占用不能超 5%
asyncio 是 Python 服务高并发的命脉,但隐性阻塞太容易写错了。一处同步 IO 就能让全体协程一起趴下。装好 py-spy,养成"每个新接口看一眼火焰图"的习惯,事故率能降一个数量级。我们这套规范跑了一年半,服务的 p99 从最早的 2 秒降到 100ms 以内,worker 数还省了一半。
—— 别看了 · 2026